Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 81 additions & 154 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -1,214 +1,141 @@
package consumer

import (
"encoding/json"
"encoding/hex"
"sync"
"time"

"github.com/kthomas/go-natsutil"
"github.com/nats-io/stan.go"
"github.com/provideplatform/nchain/common"
"github.com/vmihailenco/msgpack/v5"
)

// TODO: audit arbitrary max in flight & timeouts
const natsPacketFragmentIngestSubject = "prvd.packet.fragment.ingest"
const natsPacketFragmentIngestMaxInFlight = 1024
const natsPacketFragmentIngestInvocationTimeout = time.Second * 5 // FIXME!!! (see above)
const natsPacketFragmentIngestTimeout = int64(time.Second * 8) // FIXME!!! (see above)
const packetFragmentIngestSubject = "prvd.packet.fragment.ingest"
const packetFragmentIngestMaxInFlight = 1024
const packetFragmentIngestInvocationTimeout = time.Second * 5 // FIXME!!! (see above)
const packetFragmentIngestTimeout = int64(time.Second * 8) // FIXME!!! (see above)

// TODO: audit arbitrary max in flight & timeouts; investigate dynamic timeout based on reassembled packet size
const natsPacketReassembleSubject = "prvd.packet.reassemble"
const natsPacketReassembleMaxInFlight = 256
const natsPacketReassembleInvocationTimeout = time.Second * 30 // FIXME!!! (see above)
const natsPacketReassembleTimeout = int64(time.Second * 52) // FIXME!!! (see above)

var (
waitGroup sync.WaitGroup
currencyPairs = []string{}
)
const packetReassembleSubject = "prvd.packet.reassemble.ingest"
const packetReassembleMaxInFlight = 256
const packetReassembleInvocationTimeout = time.Second * 30 // FIXME!!! (see above)
const packetReassembleTimeout = int64(time.Second * 52) // FIXME!!! (see above)

const packetCompleteSubject = "prvd.packet.reassemble.finalize"

// PacketConsumer ingests packets that have been broadcasted out, and sends completion messages when a whole message has been ingested
type PacketConsumer struct {
network INetwork
db IDatabase
waitGroup sync.WaitGroup
}

func init() {
// Setup initialises a PacketConsumer and sets up the network as well as subscribing to the fragment and reassembly messages
func (consumer *PacketConsumer) Setup() {
if !common.ConsumeNATSStreamingSubscriptions {
common.Log.Debug("Consumer package configured to skip NATS streaming subscription setup")
return
}

natsutil.EstablishSharedNatsStreamingConnection(nil)

createNatsPacketReassemblySubscriptions(&waitGroup)
createNatsPacketFragmentIngestSubscriptions(&waitGroup)
consumer.network.Setup()
consumer.setupFragmentIngestSubscription()
consumer.setupReassemblyIngestSubscription()
}

func createNatsPacketFragmentIngestSubscriptions(wg *sync.WaitGroup) {
for i := uint64(0); i < natsutil.GetNatsConsumerConcurrency(); i++ {
natsutil.RequireNatsStreamingSubscription(wg,
natsPacketFragmentIngestInvocationTimeout,
natsPacketFragmentIngestSubject,
natsPacketFragmentIngestSubject,
consumePacketFragmentIngestMsg,
natsPacketFragmentIngestInvocationTimeout,
natsPacketFragmentIngestMaxInFlight,
func (consumer *PacketConsumer) setupFragmentIngestSubscription() {
for i := uint64(0); i < consumer.network.GetConsumerConcurrency(); i++ {
consumer.network.Subscribe(&consumer.waitGroup,
packetFragmentIngestInvocationTimeout,
packetFragmentIngestSubject,
packetFragmentIngestSubject,
consumer.consumePacketFragmentIngestMsg,
packetFragmentIngestInvocationTimeout,
packetFragmentIngestMaxInFlight,
nil,
)
}
}

func createNatsPacketReassemblySubscriptions(wg *sync.WaitGroup) {
for i := uint64(0); i < natsutil.GetNatsConsumerConcurrency(); i++ {
natsutil.RequireNatsStreamingSubscription(wg,
natsPacketReassembleInvocationTimeout,
natsPacketReassembleSubject,
natsPacketReassembleSubject,
consumePacketReassembleMsg,
natsPacketReassembleInvocationTimeout,
natsPacketReassembleMaxInFlight,
func (consumer *PacketConsumer) setupReassemblyIngestSubscription() {
for i := uint64(0); i < consumer.network.GetConsumerConcurrency(); i++ {
consumer.network.Subscribe(&consumer.waitGroup,
packetReassembleInvocationTimeout,
packetReassembleSubject,
packetReassembleSubject,
consumer.consumePacketReassembleMsg,
packetReassembleInvocationTimeout,
packetReassembleMaxInFlight,
nil,
)
}
}

func consumePacketFragmentIngestMsg(msg *stan.Msg) {
common.Log.Debugf("Consuming NATS packet fragment ingest message: %s", msg)
// Common reassembly code - this is called once all fragments & the header have been consumed
func (consumer *PacketConsumer) handleReassembly(msg *stan.Msg, reassembly *packetReassembly) {
common.Log.Debugf("All fragments ingested for packet with checksum %s", hex.EncodeToString(*reassembly.Checksum))

fragment := &packetFragment{}
err := json.Unmarshal(msg.Data, &fragment)
payload, _ := msgpack.Marshal(reassembly)
_, err := consumer.network.Send(packetCompleteSubject, payload)
if err != nil {
common.Log.Warningf("Failed to umarshal packet fragment ingest message; %s", err.Error())
natsutil.Nack(msg)
return
}

if fragment.Checksum == nil {
common.Log.Warning("Failed to ingest packet fragment; nil checksum")
natsutil.Nack(msg)
return
}

if fragment.Payload == nil {
common.Log.Warning("Failed to ingest packet fragment; nil payload")
natsutil.Nack(msg)
return
}

if fragment.Cardinality == 0 {
common.Log.Warning("Failed to ingest packet fragment; cardinality must be greater than zero")
natsutil.Nack(msg)
return
common.Log.Warningf("Failed to publish %d-byte packet reassembly message on subject: %s; %s", len(payload), packetReassembleSubject, err.Error())
natsutil.AttemptNack(msg, packetFragmentIngestTimeout)
}
}

if fragment.Index >= fragment.Cardinality {
common.Log.Warning("Failed to ingest packet fragment; fragment index must be less than the packet cardinality")
natsutil.Nack(msg)
func (consumer *PacketConsumer) consumePacketFragmentIngestMsg(msg *stan.Msg) {
fragment := &packetFragment{}
err := msgpack.Unmarshal(msg.Data, &fragment)
if err != nil {
common.Log.Warningf("Failed to umarshal packet fragment ingest message; %s", err.Error())
consumer.network.Acknowledge(msg) // Acknowledge the bad packet so it's not resent
return
}

if fragment.Index == 0 {
if fragment.Reassembly == nil {
common.Log.Warning("Failed to ingest packet fragment; reassembly 'header' required within 'fragment 0' encapsulation")
natsutil.Nack(msg)
return
}
}

common.Log.Debugf("Attempting to ingest %d-byte packet fragment containing %d-byte fragment payload (%d of %d)", len(msg.Data), len(*fragment.Payload), fragment.Index+1, fragment.Cardinality)
ingestVerified, i, ingestErr := fragment.Ingest()
if ingestErr != nil || !ingestVerified {
common.Log.Warningf("Failed to ingest %d-byte packet fragment containing %d-byte fragment payload (%d of %d); %s", len(msg.Data), len(*fragment.Payload), fragment.Index+1, fragment.Cardinality, ingestErr.Error())
natsutil.AttemptNack(msg, natsPacketFragmentIngestTimeout)
ingestVerified, ingestCounter, err := fragment.Ingest(consumer.db)
if err != nil || !ingestVerified {
common.Log.Warningf("Failed to ingest %d-byte packet fragment containing %d-byte fragment payload (%d of %d); %s", len(msg.Data), len(*fragment.Payload), fragment.Index+1, fragment.Cardinality, err.Error())
// No acknowledgement here - so we can try again
return
}

progress := float64(*i) / float64(fragment.Cardinality)
if progress == 1 {
common.Log.Debugf("All fragments ingested for packet with checksum %s; dispatching reassembly message on subject: %s", *fragment.Checksum, *fragment.Reassembly.Next)

if fragment.Reassembly == nil {
_, err := fragment.FetchReassemblyHeader()
if err != nil {
common.Log.Warningf("Unable to publish packet reassembly message on subject: %s; %s", natsPacketReassembleSubject, err.Error())
natsutil.AttemptNack(msg, natsPacketFragmentIngestTimeout)
return
}
}

payload, _ := json.Marshal(fragment.Reassembly)
err = natsutil.NatsPublish(natsPacketReassembleSubject, payload)
// Add one to cardinality as we need to account for the reassembly message too
common.Log.Tracef("Successfully ingested fragment #%d with checksum %s; %d of %d total fragments needed for reassembly have been ingested", fragment.Index, hex.EncodeToString(*fragment.Checksum), *ingestCounter, fragment.Cardinality+1)
remaining := (fragment.Cardinality + 1) - *ingestCounter
if remaining == 0 {
reassembly, err := fragment.FetchReassemblyHeader(consumer.db)
if err != nil {
common.Log.Warningf("Failed to publish %d-byte packet reassembly message on subject: %s; %s", len(payload), natsPacketReassembleSubject, err.Error())
natsutil.AttemptNack(msg, natsPacketFragmentIngestTimeout)
return
common.Log.Warningf("Unable to publish packet reassembly message on subject: %s; %s", packetReassembleSubject, err.Error())
} else {
consumer.handleReassembly(msg, reassembly)
}
}

common.Log.Debugf("Successfully ingested fragment #%d with checksum %s; %d of %d total fragments needed for reassembly have been ingested (%f%%)", fragment.Index+1, *fragment.Checksum, i, fragment.Cardinality, progress)
msg.Ack()
consumer.network.Acknowledge(msg)
}

func consumePacketReassembleMsg(msg *stan.Msg) {
common.Log.Debugf("Consuming NATS packet reassembly message: %s", msg)

func (consumer *PacketConsumer) consumePacketReassembleMsg(msg *stan.Msg) {
reassembly := &packetReassembly{}
err := json.Unmarshal(msg.Data, &reassembly)
err := msgpack.Unmarshal(msg.Data, &reassembly)
if err != nil {
common.Log.Warningf("Failed to umarshal packet reassembly message; %s", err.Error())
natsutil.Nack(msg)
consumer.network.Acknowledge(msg) // Acknowledge the bad packet so it's not resent
return
}

if reassembly.Checksum == nil {
common.Log.Warning("Failed to reassemble packet; nil checksum")
natsutil.Nack(msg)
ingestVerified, ingestCounter, err := reassembly.Ingest(consumer.db)
if err != nil || !ingestVerified {
common.Log.Warningf("Failed to ingest %d-byte packet header; %s", len(msg.Data), err.Error())
// No acknowledgement here - so we can try again
return
}

if reassembly.Next == nil {
common.Log.Warning("Failed to reassemble packet; next hop not specified") // TODO-- relax this to support pure p2p file transfer
natsutil.Nack(msg)
return
common.Log.Tracef("Successfully ingested reassembly header. %d of %d total fragments needed for reassembly have been ingested", *ingestCounter, reassembly.Cardinality+1)
remaining := (reassembly.Cardinality + 1) - *ingestCounter
if remaining == 0 {
consumer.handleReassembly(msg, reassembly)
}

if reassembly.Cardinality == 0 {
common.Log.Warning("Failed to reassemble packet; cardinality must be greater than zero")
natsutil.Nack(msg)
return
}

fragSize := reassembly.Size / reassembly.Cardinality
progress, i, err := reassembly.fragmentIngestProgress()
if err != nil {
common.Log.Warningf("Failed to reassemble %d-byte packet consisting of %d %d-byte fragment(s); failed atomically reading or parsing fragment ingest progress; %s", reassembly.Size, reassembly.Cardinality, fragSize, err.Error())
natsutil.AttemptNack(msg, natsPacketReassembleTimeout)
return
}

if *progress == 1 {
common.Log.Debugf("All fragments ingested for packet with checksum %s; dispatching next hop with pointer to reconstituted packet as message on subject: %s", *reassembly.Checksum, *reassembly.Next)

reassemblyVerified, reassemblyErr := reassembly.Reassemble()
if reassemblyErr != nil || !reassemblyVerified {
if reassemblyErr != nil {
common.Log.Warningf(reassemblyErr.Error())
} else {
common.Log.Warningf("Failed to reassemble packet with checksum %s; verification failed", *reassembly.Checksum)
}

natsutil.AttemptNack(msg, natsPacketReassembleTimeout)
return
}

payload, _ := json.Marshal(reassembly)
err = natsutil.NatsPublish(*reassembly.Next, payload)
if err != nil {
common.Log.Warningf("Failed to publish %d-byte next hop message on subject: %s; %s", len(payload), *reassembly.Next, err.Error())
natsutil.AttemptNack(msg, natsPacketReassembleTimeout)
return
}

common.Log.Debugf("Published %d-byte next hop message after successful reassembly of packet with checksum %s on subject: %s", len(payload), *reassembly.Checksum, *reassembly.Next)
msg.Ack()
} else {
common.Log.Warningf("Failed to reassemble %d-byte packet consisting of %d %d-byte fragment(s); only %d fragments ingested (%f%%)", reassembly.Size, reassembly.Cardinality, fragSize, *i, *progress)
natsutil.AttemptNack(msg, natsPacketReassembleTimeout)
return
}
consumer.network.Acknowledge(msg)
}
Loading