From 5fea1b1840692a4e2c24166fc89e7f6c0a916359 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 7 Sep 2023 18:24:00 +0100 Subject: [PATCH] Improved tree broadcast algorithm This revises the wakeup broadcast algorithm by including the source coordinates and strictly forwarding only towards peers that are further away from the origin of the broadcast than the current forwarding node is. Optionally, this can also only follow on-tree paths, which gets significantly closer to exactly-once delivery across receiving nodes. Signed-off-by: Neil Alexander --- cmd/pineconesim/ui/modules/graph.js | 2 +- router/consts.go | 3 +++ router/state_broadcast.go | 13 ++++------- router/state_forward.go | 34 ++++++++++++++++++++++------- router/version.go | 8 ++++++- types/coordinates.go | 7 ++++++ types/frame.go | 12 ++++++++++ 7 files changed, 60 insertions(+), 19 deletions(-) diff --git a/cmd/pineconesim/ui/modules/graph.js b/cmd/pineconesim/ui/modules/graph.js index 12699289..ebe49a32 100644 --- a/cmd/pineconesim/ui/modules/graph.js +++ b/cmd/pineconesim/ui/modules/graph.js @@ -946,7 +946,7 @@ function handleNodePanelUpdate() { "DestPeer" + snekTable + "" + - "

Broadcasts Received

" + + "

Broadcasts Received (" + broadcasts.size + ")

" + "" + "" + bcastTable + diff --git a/router/consts.go b/router/consts.go index 7fddcd53..5ab5bcb1 100644 --- a/router/consts.go +++ b/router/consts.go @@ -65,6 +65,9 @@ const coordsCacheMaintainInterval = time.Minute // to send broadcast messages into the network. const wakeupBroadcastInterval = time.Minute +// wakeupBroadcastType is the default flooding method. +const wakeupBroadcastType = FloodDefault + // broadcastExpiryPeriod is how long we'll wait to // expire a seen broadcast. const broadcastExpiryPeriod = wakeupBroadcastInterval * 3 diff --git a/router/state_broadcast.go b/router/state_broadcast.go index 3ad9f6de..12fe9ca6 100644 --- a/router/state_broadcast.go +++ b/router/state_broadcast.go @@ -84,6 +84,7 @@ func (s *state) _createBroadcastFrame() (*types.Frame, error) { // Construct the frame. send := getFrame() send.Type = types.TypeWakeupBroadcast + send.Source = s._coords() send.SourceKey = s.r.public send.HopLimit = types.NetworkHorizonDistance send.Payload = append(send.Payload[:0], b[:n]...) @@ -96,8 +97,7 @@ func (s *state) _sendWakeupBroadcasts() { if err != nil { s.r.log.Println("Failed creating broadcast frame:", err) } - - s._flood(s.r.local, broadcast, ClassicFlood) + s._flood(s.r.local, broadcast, wakeupBroadcastType) } func (s *state) _handleBroadcast(p *peer, f *types.Frame) error { @@ -157,13 +157,8 @@ func (s *state) _handleBroadcast(p *peer, f *types.Frame) error { return nil } - // Forward the broadcast to all our peers except for the peer we - // received it from. - if f.HopLimit >= types.NetworkHorizonDistance-1 { - s._flood(p, f, ClassicFlood) - } else { - s._flood(p, f, TreeFlood) - } + // Forward the broadcast onwards. + s._flood(p, f, wakeupBroadcastType) return nil } diff --git a/router/state_forward.go b/router/state_forward.go index 4dbc9c26..42ab7e42 100644 --- a/router/state_forward.go +++ b/router/state_forward.go @@ -25,8 +25,13 @@ import ( type FloodType int const ( - ClassicFlood FloodType = iota - TreeFlood + // Send to all neighbours with a further tree dist than the source. Potential + // for duplicate deliveries but much higher % chance of successful delivery. + FloodDefault FloodType = iota + + // Send only to on-tree neighbours with a further tree dist. Technically this + // eliminates duplicates but it has much weaker delivery guarantees. + FloodTreeOnly ) // _nextHopsFor returns the next-hop for the given frame. It will examine the packet @@ -166,6 +171,8 @@ func (s *state) _forward(p *peer, f *types.Frame) error { // Tree flooding works by only sending frames to peers on the same branch. func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) { floodCandidates := make(map[types.PublicKey]*peer) + ourCoords := s._coords() + ourDist := ourCoords.DistanceTo(f.Source) for _, newCandidate := range s._peers { if newCandidate == nil || newCandidate.proto == nil || !newCandidate.started.Load() { continue @@ -180,15 +187,26 @@ func (s *state) _flood(from *peer, f *types.Frame, floodType FloodType) { continue } - if floodType == TreeFlood { - if coords, err := newCandidate._coords(); err == nil { - if coords.DistanceTo(s._coords()) != 1 { - // This peer is not directly on the same branch. + if coords, err := newCandidate._coords(); err == nil { + if coords.DistanceTo(f.Source) <= ourDist { + // Don't forward the packet to any peers who aren't + // strictly further away from the source coordinates + // than we are. + continue + } + if floodType == FloodTreeOnly { + // Only flood via on-tree paths, i.e. direct parents or + // direct children. In this mode we will pretty much get + // at-most-once delivery, but we won't flood to any peer + // which is a shortcut to another branch. Without this, + // we'll send messages via shortcuts but that may result + // in duplicates. + if !ourCoords.IsChildOf(coords) && !coords.IsChildOf(ourCoords) { continue } - } else { - continue } + } else { + continue } if existingCandidate, ok := floodCandidates[newCandidate.public]; ok { diff --git a/router/version.go b/router/version.go index f93ae905..75739597 100644 --- a/router/version.go +++ b/router/version.go @@ -21,7 +21,13 @@ const ( capabilityDedupedCoordinateInfo capabilitySoftState capabilityHybridRouting + capabilityImprovedTreeBroadcast ) const ourVersion uint8 = 1 -const ourCapabilities uint32 = capabilityLengthenedRootInterval | capabilityCryptographicSetups | capabilityDedupedCoordinateInfo | capabilitySoftState | capabilityHybridRouting +const ourCapabilities uint32 = capabilityLengthenedRootInterval | + capabilityCryptographicSetups | + capabilityDedupedCoordinateInfo | + capabilitySoftState | + capabilityHybridRouting | + capabilityImprovedTreeBroadcast diff --git a/types/coordinates.go b/types/coordinates.go index 11eddb81..30fbbf07 100644 --- a/types/coordinates.go +++ b/types/coordinates.go @@ -117,6 +117,13 @@ func (a Coordinates) DistanceTo(b Coordinates) int { return len(a) + len(b) - 2*ancestor } +func (a Coordinates) IsChildOf(b Coordinates) bool { + if len(a) == 0 || len(a)-1 != len(b) { + return false + } + return a[:len(a)-1].EqualTo(b) +} + func getCommonPrefix(a, b Coordinates) int { c := 0 l := len(a) diff --git a/types/frame.go b/types/frame.go index 33f92a8a..689de3ee 100644 --- a/types/frame.go +++ b/types/frame.go @@ -89,6 +89,8 @@ func (f *Frame) CopyInto(t *Frame) { t.Type = f.Type t.Extra = f.Extra t.HopLimit = f.HopLimit + t.Source = append(t.Source[:0], f.Source...) + t.Destination = append(t.Source[:0], f.Destination...) t.DestinationKey = f.DestinationKey t.SourceKey = f.SourceKey t.Watermark = f.Watermark @@ -134,6 +136,11 @@ func (f *Frame) MarshalBinary(buffer []byte) (int, error) { payloadLen := len(f.Payload) binary.BigEndian.PutUint16(buffer[offset+0:offset+2], uint16(payloadLen)) offset += 2 + sn, err := f.Source.MarshalBinary(buffer[offset:]) + if err != nil { + return 0, fmt.Errorf("f.Source.MarshalBinary: %w", err) + } + offset += sn offset += copy(buffer[offset:], f.SourceKey[:ed25519.PublicKeySize]) if f.Payload != nil { f.Payload = f.Payload[:payloadLen] @@ -231,6 +238,11 @@ func (f *Frame) UnmarshalBinary(data []byte) (int, error) { return 0, fmt.Errorf("payload length exceeds frame capacity") } offset += 2 + srcLen, srcErr := f.Source.UnmarshalBinary(data[offset:]) + if srcErr != nil { + return 0, fmt.Errorf("f.Source.UnmarshalBinary: %w", srcErr) + } + offset += srcLen offset += copy(f.SourceKey[:], data[offset:]) f.Payload = f.Payload[:payloadLen] offset += copy(f.Payload[:payloadLen], data[offset:])
NameTime