Skip to content

Commit f9e1a15

Browse files
committed
Enhance TcpGetNode with default configuration values and improve message handling
1 parent 5ab3a47 commit f9e1a15

File tree

1 file changed

+19
-5
lines changed
  • crates/core/src/runtime/nodes/network_nodes

1 file changed

+19
-5
lines changed

crates/core/src/runtime/nodes/network_nodes/tcp_get.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ struct TcpGetNode {
4343
base: BaseFlowNodeState,
4444
config: TcpGetNodeConfig,
4545
connections: Arc<DashMap<String, Arc<Mutex<TcpStream>>>>,
46+
reconnect_time: u64,
47+
socket_timeout: Option<u64>,
48+
msg_queue_size: usize,
4649
}
4750

4851
impl TcpGetNode {
@@ -74,7 +77,18 @@ impl TcpGetNode {
7477
_options: Option<&config::Config>,
7578
) -> crate::Result<Box<dyn FlowNodeBehavior>> {
7679
let tcp_config = TcpGetNodeConfig::deserialize(&config.rest)?;
77-
let node = TcpGetNode { base: state, config: tcp_config, connections: Arc::new(DashMap::new()) };
80+
// Set defaults as in Node-RED: reconnect_time=10000, socket_timeout=None, msg_queue_size=1000
81+
let reconnect_time = 10000;
82+
let socket_timeout = None;
83+
let msg_queue_size = 1000;
84+
let node = TcpGetNode {
85+
base: state,
86+
config: tcp_config,
87+
connections: Arc::new(DashMap::new()),
88+
reconnect_time,
89+
socket_timeout,
90+
msg_queue_size,
91+
};
7892
Ok(Box::new(node))
7993
}
8094
}
@@ -336,7 +350,7 @@ impl TcpGetNode {
336350
Ok(buffer)
337351
}
338352

339-
async fn handle_message(&self, msg: MsgHandle, stop_token: CancellationToken) -> crate::Result<()> {
353+
async fn handle_message(self: Arc<Self>, msg: MsgHandle, stop_token: CancellationToken) -> crate::Result<()> {
340354
let msg_guard = msg.read().await;
341355

342356
// Check for reset command
@@ -572,10 +586,10 @@ impl FlowNodeBehavior for TcpGetNode {
572586

573587
async fn run(self: Arc<Self>, stop_token: CancellationToken) {
574588
while !stop_token.is_cancelled() {
575-
let self_clone = self.clone();
589+
let arc_self = Arc::clone(&self);
576590
let stop_token_clone = stop_token.clone();
577-
with_uow(self_clone.as_ref(), stop_token_clone.clone(), |node, msg| async move {
578-
node.handle_message(msg, stop_token_clone).await
591+
with_uow(self.as_ref(), stop_token_clone.clone(), |node, msg| async move {
592+
arc_self.handle_message(msg, stop_token_clone).await
579593
})
580594
.await;
581595
}

0 commit comments

Comments
 (0)