Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ui/extension/dist
ui/extension/packages
ui/extension/.env
ui/wp-plugin.zip
ui/public/widget.wasm

# netstated globe web client and build
netstate/d/webclients/globe/node_modules
Expand Down
4 changes: 4 additions & 0 deletions clientcore/broflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt
egOpt = NewDefaultEgressOptions()
}

// obtain the donor's team member ID if they are a donor seeking "credit" for their facilitated connections
// TODO this is a mock/placeholder, replace with real value
egOpt.TeamMemberID = fmt.Sprintf("MOCK-TEAM-ID-%v", time.Now().UTC().Format("2006-01-02T15:04:05"))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A contributing team member's ID could be obtained when the user logs in, and set somewhere that it can be read and added to egress websocket connections.

How should that ID be known here?


// The boot DAG:
// build cTable/pTable -> build the Broflake struct -> run ui.Init -> set up the bus and bind
// the upstream/downstream handlers -> build cRouter/pRouter -> start the bus, init the routers,
Expand Down
12 changes: 11 additions & 1 deletion clientcore/egress_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package clientcore

import (
"context"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -42,7 +43,16 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor

// TODO: WSS

c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, nil)
var opts websocket.DialOptions
if options.TeamMemberID != "" {
headers := http.Header{}
headers.Add("X-Unbounded", options.TeamMemberID)
opts = websocket.DialOptions{
HTTPHeader: headers,
}
}

c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, &opts)
if err != nil {
common.Debugf("Couldn't connect to egress server at %v: %v", options.Addr, err)
<-time.After(options.ErrorBackoff)
Expand Down
1 change: 1 addition & 0 deletions clientcore/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type EgressOptions struct {
Endpoint string
ConnectTimeout time.Duration
ErrorBackoff time.Duration
TeamMemberID string // an optional team member ID recording donated connectivity
}

func NewDefaultEgressOptions() *EgressOptions {
Expand Down
29 changes: 29 additions & 0 deletions diagrams.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diagrams

Temporary file for updated diagrams from the [README](./README.md) done natively in mermaid to allow version-controlled updates.

```mermaid
flowchart LR
subgraph blocked-peer
client
end
subgraph unbounded.lantern.io
widget
leaderboard
end
subgraph matchmaking
freddie <--> widget
end
subgraph lantern-cloud
subgraph http-proxy
widget <==> |WebSocket| egress
end
egress-->redis[(redis)]
redis-.->api
api<-->db[(database)]
end
client <==> |proxy| widget
client <--> freddie
api --> leaderboard
internet((open internet)) <==> egress
```
25 changes: 20 additions & 5 deletions egress/egresslib.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ func (q websocketPacketConn) LocalAddr() net.Addr {

type proxyListener struct {
net.Listener
connections chan net.Conn
tlsConfig *tls.Config
addr net.Addr
closeMetrics func(ctx context.Context) error
connections chan net.Conn
tlsConfig *tls.Config
addr net.Addr
closeMetrics func(ctx context.Context) error
ReportConnection func(ctx context.Context) error // TODO should we use a method here?
ReportBytes func(ctx context.Context) error // TODO same, and do we have a way to track bytes?
Comment on lines +112 to +113
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently unused, but kept in as a placeholder for a "insert reporting func" structure that isn't too specifically tied to lantern-cloud

}

func (l proxyListener) Accept() (net.Conn, error) {
Expand Down Expand Up @@ -191,6 +193,10 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
common.Debugf("Accepted a new WebSocket connection! (%v total)", atomic.AddUint64(&nClients, 1))
nClientsCounter.Add(context.Background(), 1)

// check for optional tracking identifier for donors who wish to be credited the facilitated connections
unboundedID := r.Header.Get("X-Unbounded")
common.Debugf("X-Unbounded: %v", unboundedID)

listener, err := quic.Listen(wspconn, l.tlsConfig, &common.QUICCfg)
if err != nil {
common.Debugf("Error creating QUIC listener: %v", err)
Expand All @@ -208,6 +214,11 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
nQUICConnectionsCounter.Add(context.Background(), 1)
common.Debugf("%v accepted a new QUIC connection!", wspconn.addr)

go func() {
// TODO record a connection
common.Debugf("[placeholder] POST new connection for ID: %v", unboundedID)
Comment on lines +218 to +219
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

placeholder: report to redis

}()

go func() {
for {
stream, err := conn.AcceptStream(context.Background())
Expand All @@ -223,6 +234,10 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
}

common.Debugf("Accepted a new QUIC stream! (%v total)", atomic.AddUint64(&nQUICStreams, 1))

// TODO what is the relationship between a connection and a stream? 1:1?
// if not, should be be recording streams?

Comment on lines +238 to +240
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it matter to count streams?

nQUICStreamsCounter.Add(context.Background(), 1)

l.connections <- common.QUICStreamNetConn{
Expand All @@ -240,9 +255,9 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) {
}

func NewListener(ctx context.Context, ll net.Listener, certPEM, keyPEM string) (net.Listener, error) {
var err error
closeFuncMetric := telemetry.EnableOTELMetrics(ctx)
m := otel.Meter("github.com/getlantern/broflake/egress")
var err error
nClientsCounter, err = m.Int64UpDownCounter("concurrent-websockets")
if err != nil {
closeFuncMetric(ctx)
Expand Down
Binary file modified ui/public/widget.wasm
Binary file not shown.
1 change: 1 addition & 0 deletions ui/src/utils/wasmInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ export class WasmInterface {
[workerIdx]: connection
}
this.connections = this.idxMapToArr(this.connectionMap)

// emit state
connectionsEmitter.update(this.connections)
if (existingState === -1 && state === 1) {
Expand Down