diff --git a/.gitignore b/.gitignore index 23e2222f..95eeb9c3 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ ui/extension/dist ui/extension/packages ui/extension/.env ui/wp-plugin.zip +ui/public/widget.wasm # netstated globe web client and build netstate/d/webclients/globe/node_modules diff --git a/clientcore/broflake.go b/clientcore/broflake.go index deb97187..54299436 100644 --- a/clientcore/broflake.go +++ b/clientcore/broflake.go @@ -117,6 +117,10 @@ func NewBroflake(bfOpt *BroflakeOptions, rtcOpt *WebRTCOptions, egOpt *EgressOpt egOpt = NewDefaultEgressOptions() } + // obtain the donor's team member ID if they are a donor seeking "credit" for their facilitated connections + // TODO this is a mock/placeholder, replace with real value + egOpt.TeamMemberID = fmt.Sprintf("MOCK-TEAM-ID-%v", time.Now().UTC().Format("2006-01-02T15:04:05")) + // The boot DAG: // build cTable/pTable -> build the Broflake struct -> run ui.Init -> set up the bus and bind // the upstream/downstream handlers -> build cRouter/pRouter -> start the bus, init the routers, diff --git a/clientcore/egress_consumer.go b/clientcore/egress_consumer.go index 1aa5be22..e4053ba3 100644 --- a/clientcore/egress_consumer.go +++ b/clientcore/egress_consumer.go @@ -6,6 +6,7 @@ package clientcore import ( "context" + "net/http" "sync" "time" @@ -42,7 +43,16 @@ func NewEgressConsumerWebSocket(options *EgressOptions, wg *sync.WaitGroup) *Wor // TODO: WSS - c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, nil) + var opts websocket.DialOptions + if options.TeamMemberID != "" { + headers := http.Header{} + headers.Add("X-Unbounded", options.TeamMemberID) + opts = websocket.DialOptions{ + HTTPHeader: headers, + } + } + + c, _, err := websocket.Dial(ctx, options.Addr+options.Endpoint, &opts) if err != nil { common.Debugf("Couldn't connect to egress server at %v: %v", options.Addr, err) <-time.After(options.ErrorBackoff) diff --git a/clientcore/settings.go b/clientcore/settings.go index 7163939b..d4e71bb6 100644 --- a/clientcore/settings.go +++ b/clientcore/settings.go @@ -43,6 +43,7 @@ type EgressOptions struct { Endpoint string ConnectTimeout time.Duration ErrorBackoff time.Duration + TeamMemberID string // an optional team member ID recording donated connectivity } func NewDefaultEgressOptions() *EgressOptions { diff --git a/diagrams.md b/diagrams.md new file mode 100644 index 00000000..ce1c36b5 --- /dev/null +++ b/diagrams.md @@ -0,0 +1,29 @@ +# diagrams + +Temporary file for updated diagrams from the [README](./README.md) done natively in mermaid to allow version-controlled updates. + +```mermaid +flowchart LR +subgraph blocked-peer + client +end +subgraph unbounded.lantern.io + widget + leaderboard +end +subgraph matchmaking + freddie <--> widget +end +subgraph lantern-cloud + subgraph http-proxy + widget <==> |WebSocket| egress + end + egress-->redis[(redis)] + redis-.->api + api<-->db[(database)] +end +client <==> |proxy| widget +client <--> freddie +api --> leaderboard +internet((open internet)) <==> egress +``` diff --git a/egress/egresslib.go b/egress/egresslib.go index dc017eac..a55b59ca 100644 --- a/egress/egresslib.go +++ b/egress/egresslib.go @@ -105,10 +105,12 @@ func (q websocketPacketConn) LocalAddr() net.Addr { type proxyListener struct { net.Listener - connections chan net.Conn - tlsConfig *tls.Config - addr net.Addr - closeMetrics func(ctx context.Context) error + connections chan net.Conn + tlsConfig *tls.Config + addr net.Addr + closeMetrics func(ctx context.Context) error + ReportConnection func(ctx context.Context) error // TODO should we use a method here? + ReportBytes func(ctx context.Context) error // TODO same, and do we have a way to track bytes? } func (l proxyListener) Accept() (net.Conn, error) { @@ -191,6 +193,10 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { common.Debugf("Accepted a new WebSocket connection! (%v total)", atomic.AddUint64(&nClients, 1)) nClientsCounter.Add(context.Background(), 1) + // check for optional tracking identifier for donors who wish to be credited the facilitated connections + unboundedID := r.Header.Get("X-Unbounded") + common.Debugf("X-Unbounded: %v", unboundedID) + listener, err := quic.Listen(wspconn, l.tlsConfig, &common.QUICCfg) if err != nil { common.Debugf("Error creating QUIC listener: %v", err) @@ -208,6 +214,11 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { nQUICConnectionsCounter.Add(context.Background(), 1) common.Debugf("%v accepted a new QUIC connection!", wspconn.addr) + go func() { + // TODO record a connection + common.Debugf("[placeholder] POST new connection for ID: %v", unboundedID) + }() + go func() { for { stream, err := conn.AcceptStream(context.Background()) @@ -223,6 +234,10 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { } common.Debugf("Accepted a new QUIC stream! (%v total)", atomic.AddUint64(&nQUICStreams, 1)) + + // TODO what is the relationship between a connection and a stream? 1:1? + // if not, should be be recording streams? + nQUICStreamsCounter.Add(context.Background(), 1) l.connections <- common.QUICStreamNetConn{ @@ -240,9 +255,9 @@ func (l proxyListener) handleWebsocket(w http.ResponseWriter, r *http.Request) { } func NewListener(ctx context.Context, ll net.Listener, certPEM, keyPEM string) (net.Listener, error) { + var err error closeFuncMetric := telemetry.EnableOTELMetrics(ctx) m := otel.Meter("github.com/getlantern/broflake/egress") - var err error nClientsCounter, err = m.Int64UpDownCounter("concurrent-websockets") if err != nil { closeFuncMetric(ctx) diff --git a/ui/public/widget.wasm b/ui/public/widget.wasm index 94f4c870..a70cff56 100755 Binary files a/ui/public/widget.wasm and b/ui/public/widget.wasm differ diff --git a/ui/src/utils/wasmInterface.ts b/ui/src/utils/wasmInterface.ts index 49c0794a..d3001de5 100644 --- a/ui/src/utils/wasmInterface.ts +++ b/ui/src/utils/wasmInterface.ts @@ -232,6 +232,7 @@ export class WasmInterface { [workerIdx]: connection } this.connections = this.idxMapToArr(this.connectionMap) + // emit state connectionsEmitter.update(this.connections) if (existingState === -1 && state === 1) {