1- use crate :: application:: * ;
1+ use crate :: { application:: * , utils :: get_path_timestamp } ;
22pub use async_trait:: async_trait;
33use futures_lite:: stream:: StreamExt ;
44use lapin:: {
@@ -7,7 +7,11 @@ use lapin::{
77 Connection , ConnectionProperties ,
88} ;
99use prost:: Message ;
10- use std:: { collections:: HashMap , sync:: mpsc:: channel, thread} ;
10+ use std:: {
11+ collections:: { HashMap , HashSet } ,
12+ sync:: { mpsc:: channel, Arc , Mutex } ,
13+ thread,
14+ } ;
1115use structopt:: StructOpt ;
1216use tracing:: { debug, error} ;
1317
@@ -71,11 +75,11 @@ impl CoLinkProtocol {
7175 let list: CoLinkInternalTaskIdList =
7276 Message :: decode ( & * list_entry. payload ) . unwrap ( ) ;
7377 if list. task_ids_with_key_paths . is_empty ( ) {
74- get_timestamp ( & list_entry. key_path )
78+ get_path_timestamp ( & list_entry. key_path )
7579 } else {
7680 list. task_ids_with_key_paths
7781 . iter ( )
78- . map ( |x| get_timestamp ( & x. key_path ) )
82+ . map ( |x| get_path_timestamp ( & x. key_path ) )
7983 . min ( )
8084 . unwrap_or ( i64:: MAX )
8185 }
@@ -151,19 +155,16 @@ impl CoLinkProtocol {
151155 }
152156}
153157
154- fn get_timestamp ( key_path : & str ) -> i64 {
155- let pos = key_path. rfind ( '@' ) . unwrap ( ) ;
156- key_path[ pos + 1 ..] . parse ( ) . unwrap ( )
157- }
158-
159158pub fn _protocol_start (
160159 cl : CoLink ,
161160 user_funcs : HashMap < String , Box < dyn ProtocolEntry + Send + Sync > > ,
162161) -> Result < ( ) , Error > {
163162 let mut operator_funcs: HashMap < String , Box < dyn ProtocolEntry + Send + Sync > > = HashMap :: new ( ) ;
164- let mut protocols = vec ! [ ] ;
163+ let mut protocols = HashSet :: new ( ) ;
164+ let failed_protocols = Arc :: new ( Mutex :: new ( HashSet :: new ( ) ) ) ;
165165 for ( protocol_and_role, user_func) in user_funcs {
166166 let cl = cl. clone ( ) ;
167+ let failed_protocols = failed_protocols. clone ( ) ;
167168 if protocol_and_role. ends_with ( ":@init" ) {
168169 let protocol_name = protocol_and_role[ ..protocol_and_role. len ( ) - 6 ] . to_string ( ) ;
169170 tokio:: runtime:: Builder :: new_multi_thread ( )
@@ -181,19 +182,27 @@ pub fn _protocol_start(
181182 . start ( cl_clone, Default :: default ( ) , Default :: default ( ) )
182183 . await
183184 {
184- Ok ( _) => { }
185- Err ( e) => error ! ( "{}: {}." , protocol_and_role, e) ,
185+ Ok ( _) => {
186+ cl. update_entry ( & is_initialized_key, & [ 1 ] ) . await ?;
187+ }
188+ Err ( e) => {
189+ error ! ( "{}: {}." , protocol_and_role, e) ;
190+ failed_protocols. lock ( ) . unwrap ( ) . insert ( protocol_name) ;
191+ }
186192 }
187- cl. update_entry ( & is_initialized_key, & [ 1 ] ) . await ?;
188193 }
189194 cl. unlock ( lock) . await ?;
190195 Ok :: < ( ) , Box < dyn std:: error:: Error + Send + Sync + ' static > > ( ( ) )
191196 } ) ?;
192197 } else {
193- protocols. push ( protocol_and_role[ ..protocol_and_role. rfind ( ':' ) . unwrap ( ) ] . to_string ( ) ) ;
198+ protocols
199+ . insert ( protocol_and_role[ ..protocol_and_role. rfind ( ':' ) . unwrap ( ) ] . to_string ( ) ) ;
194200 operator_funcs. insert ( protocol_and_role, user_func) ;
195201 }
196202 }
203+ for failed_protocol in & * failed_protocols. lock ( ) . unwrap ( ) {
204+ protocols. remove ( failed_protocol) ;
205+ }
197206 let cl_clone = cl. clone ( ) ;
198207 tokio:: runtime:: Builder :: new_multi_thread ( )
199208 . enable_all ( )
@@ -259,6 +268,7 @@ pub struct CommandLineArgs {
259268}
260269
261270pub fn _colink_parse_args ( ) -> CoLink {
271+ tracing_subscriber:: fmt:: init ( ) ;
262272 let CommandLineArgs {
263273 addr,
264274 jwt,
0 commit comments