-
Notifications
You must be signed in to change notification settings - Fork 26
feat: transition from SubstreamSet to FuturesStream #462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
lexnv
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @iTranscend for contributing 🙏
This looks good to me, it is nice that we have tests for our FuturesStream implementation
dmitry-markin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests were testing the SubstreamSet, implemented in this file. As it was replaced with FuturesStream, the equivalent tests should go to utils/futures_stream.rs.
Also, the SubstreamSet should be completely removed.
|
regarding the complete removal of Removing it will render the In the implementation of Lines 152 to 178 in 3f13b10
This behavior can't be replicated with My intuition on this may be wrong and I may be looking at this from the wrong angle so please lmk what you think. |
|
@iTranscend Thanks for the explanation! @lexnv What was the motivation behind the removal of If it's used in the other parts of the codebase, we should either rewrite these parts without its use, or keep it with all the tests. May be finding a better place for it. |
The core logic of litep2p has moved away from Offhand, I believe we can adjust the logic to achieve a similar output: We'd need to keep polling the futuresStream as before: event = self.substreams.next()There needs to be a mapping between the peer ID and the equivalent tokio channel for forwarding commands: struct CustomProtocol {
mappings: HashMap<PeerId, Sender<Command>>,When we populate the FuturesStream, we'd need to create a new channel: let (tx, rx) = channel(1024);
self.mappings.insert(peer_id, tx);
let substream_work_futures = async move { ... };
self.substreams.push(async move {
loop {
tokio::select {
// Handle command
command = rx => ... { }
// Handle substream messages
message = substream.next() => {
return Ok((peer, message, rx))
}If this turns to be to complicated, we could also move |
in partial resolution of #342
This PR replaces the use of the SubstreamSet hashmap in substream's test with the FuturesStream type.