-
Notifications
You must be signed in to change notification settings - Fork 5
Implement nchain/consumer #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev2
Are you sure you want to change the base?
Conversation
kthomas
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense with regard to cardinality 👍🏻
IIRC it was always implemented in the way you describe.
consumer/consumer.go
Outdated
|
|
||
| payload, _ := json.Marshal(fragment.Reassembly) | ||
| err = natsutil.NatsPublish(natsPacketReassembleSubject, payload) | ||
| remaining := fragment.Cardinality - *i |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this breaks parallelization pretty badly...
We must use an atomic counter here. You can receive all 1000 chunks of a 1000-chunk packet at the same time. Then you are racing for the last one to finish, and whatever cardinality i was is now responsible for setting progress....
This was meant to suggest using an atomic decrement equivalent to remaining--.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I'm following - i here is the redis counter that's atomically incremented when we ingest a fragment.
I should probably rename it, but all this is there for is making sure we send out only one prvd.packet.reassemble.finalize message and only once we've recieved all fragments.
Only concurrency issue here that I can see is that there is a possibility we recieve the last fragment at the same time as the reassembly header in which case we'd run this twice - I am fixing that now (just need to increment on the header and check for (Cardinality + 1) - counter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm
Make use of goroutines in BroadcastFragments
|
@blam23 is this still considered WIP? |
|
No this code is in a good state and can be reviewed/merged - although I'm not sure if it's being merged here? |
No description provided.