Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions rosrust/src/api/raii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::rosxmlrpc::Response;
use crate::tcpros::{Message, PublisherStream, ServicePair, ServiceResult};
use crate::{RawMessageDescription, SubscriptionHandler};
use log::error;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;

#[derive(Clone)]
Expand Down Expand Up @@ -144,13 +144,15 @@ impl Subscriber {
T: Message,
H: SubscriptionHandler<T>,
{
let id = slave.add_subscription::<T, H>(name, queue_size, handler)?;
let unsub_signal = Arc::new(AtomicBool::new(false));
let id = slave.add_subscription::<T, H>(name, queue_size, handler, unsub_signal.clone())?;

let info = Arc::new(InteractorRaii::new(SubscriberInfo {
master,
slave,
name: name.into(),
id,
unsub_signal,
}));

let publishers = info
Expand Down Expand Up @@ -194,10 +196,13 @@ struct SubscriberInfo {
slave: Arc<Slave>,
name: String,
id: usize,
unsub_signal: Arc<AtomicBool>,
}

impl Interactor for SubscriberInfo {
fn unregister(&mut self) -> Response<()> {
self.unsub_signal.store(true, std::sync::atomic::Ordering::Relaxed);

self.slave.remove_subscription(&self.name, self.id);
self.master.unregister_subscriber(&self.name).map(|_| ())
}
Expand Down
4 changes: 3 additions & 1 deletion rosrust/src/api/slave/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use error_chain::bail;
use log::error;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

pub struct Slave {
Expand Down Expand Up @@ -156,13 +157,14 @@ impl Slave {
topic: &str,
queue_size: usize,
handler: H,
unsub_signal: Arc<AtomicBool>,
) -> Result<usize>
where
T: Message,
H: SubscriptionHandler<T>,
{
self.subscriptions
.add(&self.name, topic, queue_size, handler)
.add(&self.name, topic, queue_size, handler, unsub_signal)
}

#[inline]
Expand Down
4 changes: 3 additions & 1 deletion rosrust/src/api/slave/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use log::error;
use std::collections::{BTreeSet, HashMap};
use std::iter::FromIterator;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone, Default)]
pub struct SubscriptionsTracker {
Expand Down Expand Up @@ -51,7 +52,7 @@ impl SubscriptionsTracker {
.collect()
}

pub fn add<T, H>(&self, name: &str, topic: &str, queue_size: usize, handler: H) -> Result<usize>
pub fn add<T, H>(&self, name: &str, topic: &str, queue_size: usize, handler: H, unsub_signal: Arc<AtomicBool>) -> Result<usize>
where
T: Message,
H: SubscriptionHandler<T>,
Expand All @@ -67,6 +68,7 @@ impl SubscriptionsTracker {
msg_definition,
msg_type.clone(),
md5sum.clone(),
unsub_signal,
)
});
let connection_topic = connection.get_topic();
Expand Down
36 changes: 29 additions & 7 deletions rosrust/src/tcpros/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::sync::Arc;
use std::thread;
use std::sync::atomic::{AtomicBool, Ordering};

enum DataStreamConnectionChange {
Connect(
Expand All @@ -37,6 +38,7 @@ impl SubscriberRosConnection {
msg_definition: String,
msg_type: String,
md5sum: String,
unsub_signal: Arc<AtomicBool>,
) -> SubscriberRosConnection {
let subscriber_connection_queue_size = 8;
let (data_stream_tx, data_stream_rx) = bounded(subscriber_connection_queue_size);
Expand All @@ -56,6 +58,7 @@ impl SubscriberRosConnection {
&msg_definition,
&md5sum,
&msg_type,
unsub_signal,
)
}
});
Expand Down Expand Up @@ -203,6 +206,7 @@ fn join_connections(
msg_definition: &str,
md5sum: &str,
msg_type: &str,
unsub_signal: Arc<AtomicBool>,
) {
type Sub = (LossySender<MessageInfo>, Sender<HashMap<String, String>>);
let mut subs: BTreeMap<usize, Sub> = BTreeMap::new();
Expand Down Expand Up @@ -255,6 +259,7 @@ fn join_connections(
msg_definition,
md5sum,
msg_type,
unsub_signal.clone(),
)
.chain_err(|| ErrorKind::TopicConnectionFail(topic.into()));
match result {
Expand Down Expand Up @@ -290,8 +295,11 @@ fn join_connection(
msg_definition: &str,
md5sum: &str,
msg_type: &str,
unsub_signal: Arc<AtomicBool>,
) -> Result<HashMap<String, String>> {
let mut stream = TcpStream::connect(publisher)?;
stream.set_read_timeout(Some(std::time::Duration::from_secs(10)))?;

let headers = exchange_headers::<_>(
&mut stream,
caller_id,
Expand All @@ -302,15 +310,29 @@ fn join_connection(
)?;
let pub_caller_id = headers.get("callerid").cloned();
let target = data_stream.clone();

thread::spawn(move || {
let pub_caller_id = Arc::new(pub_caller_id.unwrap_or_default());
while let Ok(buffer) = package_to_vector(&mut stream) {
if let Err(TrySendError::Disconnected(_)) =
target.try_send(MessageInfo::new(Arc::clone(&pub_caller_id), buffer))
{
// Data receiver has been destroyed after
// Subscriber destructor's kill signal
break;
loop {
match package_to_vector(&mut stream) {
Ok(buffer) => {
if let Err(TrySendError::Disconnected(_)) = target.try_send(MessageInfo::new(Arc::clone(&pub_caller_id), buffer))
{
// Data receiver has been destroyed after
// Subscriber destructor's kill signal
break;
}
}

Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
if unsub_signal.load(Ordering::Relaxed) {
// SubscriberInfo has been dropped, so break out of here to close the
// socket and exit the thread
break;
}
}

Err(_) => break,
}
}
});
Expand Down