+use std::convert::From;
+use std::fmt;
+use std::fmt::{Debug, Display};
+use std::process;
+use std::thread;
+
 use serde::{Deserialize, Serialize};
 use std::{
     collections::HashMap,
+    collections::HashSet,
     error::Error,
     marker::Copy,
     time::{Duration, SystemTime},
@@ -44,6 +53,9 @@ use std::iter::Iterator;
 // }
 
 const NANOS_PER_MICRO: u128 = 1_000;
+const DEFAULT_METADATA_FETCH_TIMEOUT: u64 = 5;
+const DEFAULT_SHOW_PROGRESS_EVERY_SECS: u64 = 60;
+const DEFAULT_CONFIG_MESSAGE_TIMEOUT: &str = "5000";
 
 pub struct ConsumerTestContext {
     pub _n: i64, // Add data for memory access validation
@@ -111,8 +123,10 @@ pub struct Config {
     pub clusters: Vec<Cluster>,
     pub clients: Vec<Client>,
     pub routes: Vec<Route>,
+    pub watchers: Vec<WatcherConfig>,
 }
 
+
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Client {
     #[serde(rename(deserialize = "client"))]
@@ -140,6 +154,16 @@ enum RepartitioningStrategy {
     Random,
 }
 
+impl fmt::Display for RepartitioningStrategy {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{:}", match self {
+            RepartitioningStrategy::StrictP2P => "strict_p2p",
+            RepartitioningStrategy::Random => "random",
+        })
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
 enum BeginOffset {
     #[serde(rename(deserialize = "earliest"))]
@@ -172,23 +196,110 @@ pub struct ReplicationRule {
     downstream_client: FutureProducer<DefaultClientContext>,
     upstream_client_name: String,
     downstream_client_name: String,
-    show_progress_every_secs: Option<u64>, // repartitioning_strategy: RepartitioningStrategy
+    show_progress_every_secs: Option<u64>,
+    repartitioning_strategy: RepartitioningStrategy,
+}
+
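+/// One entry of the new `watchers` config section: which client to poll with,
+/// which topics to observe (an empty list means every topic on the broker),
+/// and optional reporting/timeout intervals.
+///
+/// A hypothetical entry, assuming a YAML config file (the on-disk format is
+/// whatever `Config` is deserialized from):
+///
+/// ```yaml
+/// watchers:
+///   - client: local
+///     topics: ["events", "logs"]
+///     show_progress_every_secs: 60
+///     fetch_timeout_secs: 5
+///     name: "events watcher"
+/// ```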
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct WatcherConfig {
+    client: String,
+    topics: Vec<String>,
+    show_progress_every_secs: Option<u64>,
+    fetch_timeout_secs: Option<u64>,
+    name: Option<String>,
+}
+
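+/// Periodically fetches cluster metadata and logs an approximate message
+/// count per watched topic, derived from partition watermarks.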
+pub struct Watcher {
+    client: StreamConsumer,
+    topics: Vec<String>,
+    topics_set: HashSet<String>,
+    show_progress_every_secs: Option<u64>,
+    fetch_timeout_secs: Option<u64>,
+    name: String,
+}
+
+impl fmt::Debug for Watcher {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "Watcher {{ topics: {:} }}", self.topics.join(", "))
+    }
+}
+
+impl Watcher {
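+    /// Loops forever: logs the current status, then sleeps
+    /// `show_progress_every_secs` (default 60 seconds) between reports.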
+    pub fn start(&self) {
+        if !self.topics.is_empty() {
+            info!("Starting {:} for topics \"{:}\"", self.name, self.topics.join(", "));
+        } else {
+            info!("Starting {:} for all broker topics", self.name);
+        }
+
+        let timeout = Duration::from_secs(self.fetch_timeout_secs.unwrap_or(DEFAULT_METADATA_FETCH_TIMEOUT));
+
+        // When exactly one topic is watched, narrow the metadata request to it.
+        let topic: Option<&str> = if self.topics.len() == 1 {
+            Some(&self.topics[0])
+        } else {
+            None
+        };
+
+        let progress_timeout = Duration::from_secs(self.show_progress_every_secs.unwrap_or(DEFAULT_SHOW_PROGRESS_EVERY_SECS));
+
+        loop {
+            self.print_current_status(topic, timeout);
+            thread::sleep(progress_timeout);
+        }
+    }
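+
+    /// Fetches metadata and, for every watched topic (or all topics when the
+    /// watch list is empty), logs the sum of `high - low` watermarks over all
+    /// partitions: an approximation of the number of retained messages.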
+    pub fn print_current_status(&self, topic: Option<&str>, timeout: Duration) {
+        let metadata = self.client
+            .fetch_metadata(topic, timeout)
+            .expect("Failed to fetch metadata");
+
+        for topic in metadata.topics().iter() {
+            if self.topics_set.contains(topic.name()) || self.topics_set.is_empty() {
+                let mut message_count = 0;
+                for partition in topic.partitions() {
+                    // A failed watermark fetch yields (-1, -1), i.e. adds zero.
+                    let (low, high) = self.client
+                        .fetch_watermarks(topic.name(), partition.id(), Duration::from_secs(1))
+                        .unwrap_or((-1, -1));
+
+                    message_count += high - low;
+                }
+
+                info!("\"{:}\" messages status for topic \"{:}\": {:}", self.name, topic.name(), message_count);
+            }
+        }
+    }
176280}
177281
178282impl ReplicationRule {
179283 pub async fn start ( & self ) {
+        // Messages keep their upstream partition id only under StrictP2P.
+        let strict_p2p = matches!(self.repartitioning_strategy, RepartitioningStrategy::StrictP2P);
+
         info!(
-            "Starting replication {:} [ {:} ] -> {:} [ {:} ]",
+            "Starting replication {:} [ {:} ] -> {:} [ {:} ] strategy={:}",
             &self.upstream_client_name,
             &self.upstream_topics.join(","),
             &self.downstream_client_name,
-            &self.downstream_topic
+            &self.downstream_topic,
+            &self.repartitioning_strategy
         );
 
         let topics: &Vec<&str> = &self.upstream_topics.iter().map(|x| &**x).collect();
 
-        &self
-            .upstream_client
+        &self.upstream_client
             .subscribe(topics)
             .expect("Can't subscribe to specified topics");
 
@@ -262,9 +373,9 @@ impl ReplicationRule {
             record = record.headers(new_headers);
         };
 
-        // if &self.repartitioning_strategy == RepartitioningStrategy.StrictP2P {
-        //     record = record.partition(& unwraped_message.partition())
-        // }
+        // StrictP2P: keep the upstream partition id on the outgoing record.
+        if strict_p2p {
+            record = record.partition(unwraped_message.partition());
+        }
 
         &self.downstream_client.send(record, 0);
 
@@ -318,8 +429,6 @@ impl Config {
     }
 
     pub fn create_client_config(&self, name: &str, group_id: Option<&str>) -> ClientConfig {
-        debug!("Create common kafka client");
-
         let client = self.get_client(&name).unwrap();
         let cluster = self.get_cluster(&client.cluster).unwrap();
 
@@ -332,7 +441,7 @@ impl Config {
         );
         config
             .set("bootstrap.servers", &cluster.hosts.join(","))
-            .set("message.timeout.ms", "5000");
+            .set("message.timeout.ms", DEFAULT_CONFIG_MESSAGE_TIMEOUT);
 
         if let Some(v) = group_id {
             info!("Configure client \"{:}\" group: {:}", &name, v);
@@ -341,7 +450,7 @@ impl Config {
 
         for (key, value) in &client.config {
             info!(
-                "Configure client \"{:}\" option: {:}={:}",
+                "Configure client \"{:}\" option: {:}={:}",
                 &name, &key, &value
             );
             config.set(key, value);
@@ -350,6 +459,43 @@ impl Config {
         config
     }
 
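+    /// StrictP2P replication copies each message to the same partition id it
+    /// came from, which is only sound when the upstream and downstream topics
+    /// have identical partition counts; this verifies that before starting.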
+    pub fn check_partitions(&self, upstream_config: &ClientConfig, downstream_config: &ClientConfig,
+                            upstream_topics: HashSet<String>, downstream_topic: String) -> Result<(), String> {
+        let upstream_consumer: StreamConsumer = upstream_config.create().expect("Can't create consumer.");
+        let downstream_consumer: StreamConsumer = downstream_config.create().expect("Can't create consumer.");
+
+        let downstream_metadata = downstream_consumer
+            .fetch_metadata(Some(&downstream_topic), Duration::from_secs(10))
+            .expect("Failed to fetch metadata");
+        let upstream_metadata = upstream_consumer
+            .fetch_metadata(None, Duration::from_secs(10))
+            .expect("Failed to fetch metadata");
+
+        let md_downstream_topic_size = downstream_metadata.topics().iter()
+            .filter(|topic| topic.name() == downstream_topic)
+            .map(|topic| topic.partitions().len())
+            .next()
+            .expect(&format!("Not found topic: {:}", &downstream_topic));
+
+        let md_upstream_topics = upstream_metadata.topics().iter()
+            .filter(|topic| upstream_topics.contains(topic.name()))
+            .map(|topic| (topic.name(), topic.partitions().len()));
+
+        for (name, size) in md_upstream_topics {
+            if size != md_downstream_topic_size {
+                return Err(format!("Upstream ({:}) and downstream ({:}) topics have different number of partitions.", name, downstream_topic));
+                // error!("Upstream and downstream topics have different number of partitions.");
+                // process::abort();
+            }
+        }
+
+        Ok(())
+    }
+
     pub fn get_route_clients(&self, index: usize) -> ReplicationRule {
         let route = &self.routes[index];
 
@@ -360,6 +506,15 @@ impl Config {
         let downstream_config: ClientConfig =
             self.create_client_config(&route.downstream_client, Option::None);
 
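+        // Fail fast when a strict_p2p route maps topics with mismatched
+        // partition counts.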
+        if let RepartitioningStrategy::StrictP2P = route.repartitioning_strategy {
+            self.check_partitions(&upstream_config, &downstream_config,
+                                  route.upstream_topics.iter().cloned().collect(),
+                                  route.downstream_topic.clone())
+                .expect("Upstream and downstream topics have different number of partitions");
+        }
+
         let mut upstream_client = upstream_config;
         let downstream_client = downstream_config;
 
@@ -374,6 +529,7 @@ impl Config {
         };
 
         ReplicationRule {
+            repartitioning_strategy: route.repartitioning_strategy,
             show_progress_every_secs: route.progress_every_secs.clone(),
             upstream_client_name: route.upstream_client.clone(),
             downstream_client_name: route.downstream_client.clone(),
@@ -395,6 +551,29 @@ impl Config {
             .collect();
         rules
     }
+
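+    /// Builds a `Watcher` for every `watchers` config entry; entries without
+    /// a `name` get a generated "Observer #<index>" label.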
+    pub fn get_watchers(&self) -> Vec<Watcher> {
+        let watchers: Vec<_> = self.watchers.iter().enumerate().map(|(i, x)| {
+            let client: ClientConfig = self.create_client_config(&x.client, None);
+
+            Watcher {
+                client: client.create().expect("Can't create consumer"),
+                topics: x.topics.clone(),
+                topics_set: x.topics.iter().cloned().collect(),
+                show_progress_every_secs: x.show_progress_every_secs,
+                fetch_timeout_secs: x.fetch_timeout_secs,
+                name: match &x.name {
+                    Some(v) => v.clone(),
+                    None => format!("Observer #{:}", i),
+                },
+            }
+        }).collect();
+
+        watchers
+    }
398577}
399578
400579// #[cfg(test)]