Skip to content

Conversation

@qjpcpu
Copy link

@qjpcpu qjpcpu commented Jan 2, 2026

Context / Problem

  • Under “simultaneous connect” (both nodes dialing each other at the same time) and/or during pool re-dial, the acceptor side can receive a Join attempt for an already-existing logical connection but with a different ConnectionID .

  • Today this surfaces as noisy logs like unable to join ... connection id mismatch , and on the dialing side a failed Join can be misinterpreted as a successful recovery of a pool link, causing repeated retries and log storms.
    What This PR Changes

  • Deterministic connection collision handling (simultaneous connect)

    • Adds collision arbitration to converge on a single connection when both sides initiate connections concurrently.
    • When a collision is detected, the code deterministically prefers one side (based on node name ordering) and terminates/replaces the other to prevent “two competing connections” and follow-up Join mismatches.
    • Ensures serve() only unregisters a connection if it still owns the slot (prevents accidental unregister of a newer connection).
  • Explicit Join accept/reject signal for pool re-dial (protocol change within EHS Join flow)

    • Extends the Join flow to include a 1-byte status immediately after the Accept handshake message.
    • Join() now reads this status and returns a clearer error:
      • join rejected: connection id mismatch
      • join rejected: no existing connection
      • join rejected (generic)
    • The acceptor side writes back the status for Join handshakes (identified by PeerCreation == 0 ), so the dialing side can stop treating a rejected join as a success.
      Files Changed
  • node/network.go

    • Implements collision arbitration and safer unregister behavior.
    • Adds accept-side Join status responses (success / mismatch / no existing).
  • net/handshake/join.go

    • Reads the 1-byte status after Accept in the Join() path and converts it into explicit errors.
  • testing/tests/002_distributed/t008_simultaneous_connect_test.go (new)

    • Adds regression tests covering simultaneous connect, including a higher-parallelism case.
      Compatibility Notes (Important)
  • This PR changes the wire behavior of the Ergo Handshake Join path by requiring an additional 1-byte status after Accept .

  • That means it is not backward compatible with older Ergo nodes that don’t send/read this byte during Join() . Mixed-version clusters may fail to re-dial/join pool links until all nodes are upgraded.

@CLAassistant
Copy link

CLAassistant commented Jan 2, 2026

CLA assistant check
All committers have signed the CLA.

@qjpcpu
Copy link
Author

qjpcpu commented Jan 2, 2026

@halturin hi, If this PR change is not very elegant, I'd be very happy if you could submit a more elegant fix.

@halturin
Copy link
Collaborator

halturin commented Jan 3, 2026

Thanks for the report, it's a great catch. May you please check my fix for your use case? see #245

@qjpcpu
Copy link
Author

qjpcpu commented Jan 3, 2026

Thanks for the report, it's a great catch. May you please check my fix for your use case? see #245

@halturin great, it works in my case. I'll close my pr later

@qjpcpu
Copy link
Author

qjpcpu commented Jan 3, 2026

@halturin wait, still throw those errors and never stop

2026-01-03 15:28:21 [trace] 62264BA1: unable to join 192.168.192.3:51058 to the existing connection with 'node37@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 009882F8: unable to join 192.168.192.3:45946 to the existing connection with 'node63@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 13F1C4C8: unable to join 192.168.192.3:55194 to the existing connection with 'node40@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 13F1C4C8-8E882EA9: re-dialing 192.168.192.3:11195
2026-01-03 15:28:21 [trace] 13F1C4C8: unable to join 192.168.192.3:55198 to the existing connection with 'node40@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 62264BA1: unable to join 192.168.192.3:51068 to the existing connection with 'node37@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 13F1C4C8: unable to join 192.168.192.3:55202 to the existing connection with 'node40@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 8E882EA9-13F1C4C8: re-dialing 192.168.192.3:11198
2026-01-03 15:28:21 [trace] 009882F8-62264BA1: re-dialing 192.168.192.3:11200
2026-01-03 15:28:21 [trace] 8E882EA9: unable to join 192.168.192.3:38576 to the existing connection with 'node58@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 62264BA1-009882F8: re-dialing 192.168.192.3:11185
2026-01-03 15:28:21 [trace] 8E882EA9-13F1C4C8: re-dialing 192.168.192.3:11198
2026-01-03 15:28:21 [trace] 009882F8-62264BA1: re-dialing 192.168.192.3:11200
2026-01-03 15:28:21 [trace] 8E882EA9-13F1C4C8: re-dialing 192.168.192.3:11198
2026-01-03 15:28:21 [trace] 8E882EA9: unable to join 192.168.192.3:38578 to the existing connection with 'node58@actor-server1': connection id mismatch
2026-01-03 15:28:21 [trace] 009882F8: unable to join 192.168.192.3:45956 to the existing connection with 'node63@actor-server1': connection id mismatch

I push a simple demo https://github.com/qjpcpu/simultaneous-connect , you can reproduce this issue by it.

go build && ./simultaneous-connect

qjpcpu added a commit to qjpcpu/simultaneous-connect that referenced this pull request Jan 3, 2026
@halturin
Copy link
Collaborator

halturin commented Jan 3, 2026

Actually, the logic is correct. You can see that by adding this line

a.SendAfter(a.PID(), "send_msg", time.Second*3)
+ a.Log().Info("Total peers: %d", len(a.Node().Network().Nodes()))

and disable "trace" level. it takes some time since it is establishing ~15K tcp connection (3 per "node-to-node") but in the end you will see

1767467648466099000 [info] <D7D79671.0.1004>: Total peers: 99
1767467648466303000 [info] <630C9CC5.0.1004>: Total peers: 99
1767467648468082000 [info] <0DFE46A1.0.1004>: Total peers: 99
1767467648471253000 [info] <83D38CC2.0.1004>: Total peers: 99
1767467648472515000 [info] <D9113B2F.0.1004>: Total peers: 99
1767467648478249000 [info] <F320AC17.0.1004>: Total peers: 99
1767467648482102000 [info] <6252F9AB.0.1004>: Total peers: 99
1767467648482166000 [info] <50F09554.0.1004>: Total peers: 99
1767467648487690000 [info] <38904E46.0.1004>: Total peers: 99
1767467648489684000 [info] <C64EA4F0.0.1004>: Total peers: 99
1767467648503820000 [info] <1849BF8A.0.1004>: Total peers: 99
1767467648506806000 [info] <893BEA36.0.1004>: Total peers: 99
1767467648507729000 [info] <39CE2B28.0.1004>: Total peers: 99
1767467648508012000 [info] <659E9DB3.0.1004>: Total peers: 99
1767467648515249000 [info] <88658F58.0.1004>: Total peers: 99
1767467648527499000 [info] <428B0867.0.1004>: Total peers: 99
1767467648528195000 [info] <2BB5B61B.0.1004>: Total peers: 99
1767467648534599000 [info] <088EE865.0.1004>: Total peers: 99
1767467648539598000 [info] <E005544A.0.1004>: Total peers: 99
1767467648540250000 [info] <08151198.0.1004>: Total peers: 99
1767467648544622000 [info] <3A2C849A.0.1004>: Total peers: 99
1767467648554681000 [info] <43D56D09.0.1004>: Total peers: 99
1767467648557951000 [info] <7EEFC9C6.0.1004>: Total peers: 99
1767467648558452000 [info] <D3F95DDB.0.1004>: Total peers: 99
1767467648560491000 [info] <F529549C.0.1004>: Total peers: 99
1767467648566613000 [info] <C0DCA586.0.1004>: Total peers: 99
1767467648566678000 [info] <77E50080.0.1004>: Total peers: 99
1767467648570051000 [info] <3E024F30.0.1004>: Total peers: 99
1767467648570098000 [info] <EC7F33C8.0.1004>: Total peers: 99
1767467648573966000 [info] <7295AE44.0.1004>: Total peers: 99
1767467648574574000 [info] <68BA9F5F.0.1004>: Total peers: 99
1767467648574803000 [info] <ADCCD050.0.1004>: Total peers: 99

Those "mismatch" messages are part of the logic that handles collisions. Can be ignored.

PS: Not sure why a full-mesh topology is needed here - it’s generally not considered a good design choice.

@qjpcpu
Copy link
Author

qjpcpu commented Jan 4, 2026

@halturin I have a large cluster with over 2,000 nodes, and simultaneous interconnections between nodes are almost inevitable. However, the fix you proposed converges too slowly and doesn’t seem to address my issue. The occurrence of an "id mismatch" indicates that there is silent failure in Send operations, which the upper-layer application cannot detect (if everything falls back to SendImportant, performance becomes too poor). Additionally, during stress testing, about 6 out of 100 nodes consistently fail to achieve convergence, causing communication between these nodes to remain unrecoverable, unlike what you described—that convergence and self-healing would eventually occur. The fix I suggested earlier achieves convergence much more quickly. Could you please review whether there is a better implementation approach?

@halturin
Copy link
Collaborator

halturin commented Jan 4, 2026

The main problem is that we are closing tcp link (in both approaches), which means we might lose a message anyway.

I'll try to redesign the solution to merge tcp link into a single connection which basically the framework does, but there are two roles - dialer and acceptor. In the case of a collision, both nodes are dialers and acceptors simultaneously.

Here is the diagram of my current fix, which is more like a workaround.

sequenceDiagram
    box alice@localhost
        participant AliceMgr as Network Manager<br/>(n.connections)
        participant AliceOut as Outbound Dialer
        participant AliceAcc as Acceptor Loop
    end

    box bob@localhost
        participant BobAcc as Acceptor Loop
        participant BobOut as Outbound Dialer
        participant BobMgr as Network Manager<br/>(n.connections)
    end

    Note over AliceMgr,BobMgr: T0: Both nodes call GetNode() simultaneously

    par alice dials bob
        AliceMgr->>AliceOut: GetNode(bob) → connect()
        AliceOut->>BobAcc: TCP Connect
        BobAcc-->>AliceOut: TCP Accept
        AliceOut->>BobAcc: MessageHello (Start)<br/>{Creation: 123}
        BobAcc-->>AliceOut: Handshake response
        AliceOut->>AliceMgr: ✅ registerConnection(bob)<br/>LoadOrStore(bob) → SUCCESS
        Note over AliceMgr: bob registered in n.connections
    and bob dials alice
        BobMgr->>BobOut: GetNode(alice) → connect()
        BobOut->>AliceAcc: TCP Connect
        AliceAcc-->>BobOut: TCP Accept
        BobOut->>AliceAcc: MessageHello (Start)<br/>{Creation: 456}
        AliceAcc-->>BobOut: Handshake response
        BobOut->>BobMgr: ✅ registerConnection(alice)<br/>LoadOrStore(alice) → SUCCESS
        Note over BobMgr: alice registered in n.connections
    end

    Note over AliceMgr,BobMgr: T1: Both have registered each other from OUTBOUND connections

    par alice's acceptor receives inbound from bob
        Note over AliceAcc: Inbound TCP from bob
        BobOut->>AliceAcc: MessageHello (Start)<br/>{Creation: 456}
        rect rgb(240, 240, 255)
            Note over AliceAcc: Handshake complete<br/>result.Peer = bob<br/>result.PeerCreation = 456
            AliceAcc->>AliceMgr: Check: n.connections.Load(bob)
            AliceMgr-->>AliceAcc: ❌ bob ALREADY EXISTS
            Note over AliceAcc: PeerCreation = 456 ≠ 0<br/>→ Start handshake<br/>🔴 COLLISION DETECTED
            Note over AliceAcc: Tie-breaker:<br/>bob > alice<br/>→ alice wins
            AliceAcc->>AliceAcc: Close inbound TCP
            Note over AliceAcc: Keep existing outbound
        end
    and bob's acceptor receives inbound from alice
        Note over BobAcc: Inbound TCP from alice
        AliceOut->>BobAcc: MessageHello (Start)<br/>{Creation: 123}
        rect rgb(255, 240, 240)
            Note over BobAcc: Handshake complete<br/>result.Peer = alice<br/>result.PeerCreation = 123
            BobAcc->>BobMgr: Check: n.connections.Load(alice)
            BobMgr-->>BobAcc: ❌ alice ALREADY EXISTS
            Note over BobAcc: PeerCreation = 123 ≠ 0<br/>→ Start handshake<br/>🔴 COLLISION DETECTED
            Note over BobAcc: Tie-breaker:<br/>alice < bob<br/>→ alice wins
            BobMgr->>BobMgr: connections.Delete(alice)
            Note over BobAcc: Terminate outbound<br/>Accept inbound
            BobAcc->>BobMgr: registerConnection(alice)<br/>from inbound
        end
    end

    Note over AliceMgr,BobMgr: T2: Collision resolved<br/>alice: outbound connection to bob<br/>bob: inbound connection from alice

    AliceMgr->>AliceOut: Join pool members
    AliceOut->>BobAcc: MessageJoin(ConnectionID)<br/>{PeerCreation: 0}
    BobAcc->>BobMgr: Check: n.connections.Load(alice)
    BobMgr-->>BobAcc: ✅ alice exists
    Note over BobAcc: PeerCreation = 0<br/>→ Join (pool member)
    BobAcc->>BobMgr: conn.Join() - add pool member
    BobMgr-->>BobAcc: Success
    BobAcc-->>AliceOut: ACK
    AliceOut-->>AliceMgr: Pool ready

    rect rgb(240, 255, 240)
        Note over AliceMgr: RemoteNode{bob}<br/>PoolDSN: [127.0.0.1:11146]<br/>Role: DIALER
    end
    rect rgb(240, 255, 240)
        Note over BobMgr: RemoteNode{alice}<br/>PoolDSN: []<br/>Role: ACCEPTOR
    end

    Note over AliceMgr,BobMgr: ✅ Single logical connection with connection pool<br/>alice is dialer, bob is acceptor
Loading

@qjpcpu
Copy link
Author

qjpcpu commented Jan 4, 2026

Waiting for your new solution ^_^

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants