@@ -164,7 +164,7 @@ private void Init()
164164 if ( HeartbeatInterval != null )
165165 {
166166 heartbeatTimer = new Timer ( SendHeartbeat , null ,
167- TimeSpan . FromMilliseconds ( 0 ) , HeartbeatInterval . Value ) ;
167+ TimeSpan . FromMilliseconds ( 0 ) , HeartbeatInterval . GetValueOrDefault ( ) ) ;
168168 }
169169
170170 Interlocked . CompareExchange ( ref lastHeartbeatTicks , DateTime . UtcNow . Ticks , lastHeartbeatTicks ) ;
@@ -178,7 +178,7 @@ void SendHeartbeat(object state)
178178 if ( currentStatus != Status . Started )
179179 return ;
180180
181- if ( DateTime . UtcNow - new DateTime ( lastHeartbeatTicks ) < HeartbeatInterval . Value )
181+ if ( DateTime . UtcNow - new DateTime ( lastHeartbeatTicks ) < HeartbeatInterval . GetValueOrDefault ( ) )
182182 return ;
183183
184184 OnHeartbeatSent ? . Invoke ( ) ;
@@ -229,97 +229,95 @@ private void RunLoop()
229229 //RESET
230230 while ( Interlocked . CompareExchange ( ref status , 0 , 0 ) == Status . Started )
231231 {
232- using ( var redis = ClientsManager . GetReadOnlyClient ( ) )
233- {
234- masterClient = redis ;
232+ using var redis = ClientsManager . GetReadOnlyClient ( ) ;
233+ masterClient = redis ;
235234
236- //Record that we had a good run...
237- Interlocked . CompareExchange ( ref noOfContinuousErrors , 0 , noOfContinuousErrors ) ;
235+ //Record that we had a good run...
236+ Interlocked . CompareExchange ( ref noOfContinuousErrors , 0 , noOfContinuousErrors ) ;
238237
239- using ( var subscription = redis . CreateSubscription ( ) )
240- {
241- subscription . OnUnSubscribe = HandleUnSubscribe ;
238+ using var subscription = redis . CreateSubscription ( ) ;
239+ subscription . OnUnSubscribe = HandleUnSubscribe ;
242240
243- if ( OnMessageBytes != null )
244- {
245- bool IsCtrlMessage ( byte [ ] msg )
246- {
247- if ( msg . Length < 4 )
248- return false ;
249- return msg [ 0 ] == 'C' && msg [ 1 ] == 'T' && msg [ 0 ] == 'R' && msg [ 0 ] == 'L' ;
250- }
241+ if ( OnMessageBytes != null )
242+ {
243+ bool IsCtrlMessage ( byte [ ] msg )
244+ {
245+ if ( msg . Length < 4 )
246+ return false ;
247+ return msg [ 0 ] == 'C' && msg [ 1 ] == 'T' && msg [ 0 ] == 'R' && msg [ 0 ] == 'L' ;
248+ }
251249
252- ( ( RedisSubscription ) subscription ) . OnMessageBytes = ( channel , msg ) => {
253- if ( IsCtrlMessage ( msg ) )
254- return ;
250+ ( ( RedisSubscription ) subscription ) . OnMessageBytes = ( channel , msg ) => {
251+ if ( IsCtrlMessage ( msg ) )
252+ return ;
255253
256- OnMessageBytes ( channel , msg ) ;
257- } ;
258- }
254+ OnMessageBytes ( channel , msg ) ;
255+ } ;
256+ }
259257
260- subscription . OnMessage = ( channel , msg ) =>
261- {
262- if ( string . IsNullOrEmpty ( msg ) )
263- return ;
258+ subscription . OnMessage = ( channel , msg ) =>
259+ {
260+ if ( string . IsNullOrEmpty ( msg ) )
261+ return ;
264262
265- var ctrlMsg = msg . LeftPart ( ':' ) ;
266- if ( ctrlMsg == ControlCommand . Control )
267- {
268- var op = Interlocked . CompareExchange ( ref doOperation , Operation . NoOp , doOperation ) ;
263+ var ctrlMsg = msg . LeftPart ( ':' ) ;
264+ if ( ctrlMsg == ControlCommand . Control )
265+ {
266+ var op = Interlocked . CompareExchange ( ref doOperation , Operation . NoOp , doOperation ) ;
269267
270- var msgType = msg . IndexOf ( ':' ) >= 0
271- ? msg . RightPart ( ':' )
272- : null ;
268+ var msgType = msg . IndexOf ( ':' ) >= 0
269+ ? msg . RightPart ( ':' )
270+ : null ;
273271
274- OnControlCommand ? . Invoke ( msgType ?? Operation . GetName ( op ) ) ;
272+ OnControlCommand ? . Invoke ( msgType ?? Operation . GetName ( op ) ) ;
275273
276- switch ( op )
274+ switch ( op )
275+ {
276+ case Operation . Stop :
277+ if ( Log . IsDebugEnabled )
278+ Log . Debug ( "Stop Command Issued" ) ;
279+
280+ Interlocked . CompareExchange ( ref status , Status . Stopping , Status . Started ) ;
281+ try
277282 {
278- case Operation . Stop :
279- if ( Log . IsDebugEnabled )
280- Log . Debug ( "Stop Command Issued" ) ;
281-
282- Interlocked . CompareExchange ( ref status , Status . Stopping , Status . Started ) ;
283- try
284- {
285- if ( Log . IsDebugEnabled )
286- Log . Debug ( "UnSubscribe From All Channels..." ) ;
287-
288- subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
289- }
290- finally
291- {
292- Interlocked . CompareExchange ( ref status , Status . Stopped , Status . Stopping ) ;
293- }
294- return ;
295-
296- case Operation . Reset :
297- subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
298- return ;
299- }
283+ if ( Log . IsDebugEnabled )
284+ Log . Debug ( "UnSubscribe From All Channels..." ) ;
300285
301- switch ( msgType )
286+ // ReSharper disable once AccessToDisposedClosure
287+ subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
288+ }
289+ finally
302290 {
303- case ControlCommand . Pulse :
304- Pulse ( ) ;
305- break ;
291+ Interlocked . CompareExchange ( ref status , Status . Stopped , Status . Stopping ) ;
306292 }
307- }
308- else
309- {
310- OnMessage ( channel , msg ) ;
311- }
312- } ;
313-
314- //blocks thread
315- if ( ChannelsMatching != null && ChannelsMatching . Length > 0 )
316- subscription . SubscribeToChannelsMatching ( ChannelsMatching ) ;
317- else
318- subscription . SubscribeToChannels ( Channels ) ;
319-
320- masterClient = null ;
293+ return ;
294+
295+ case Operation . Reset :
296+ // ReSharper disable once AccessToDisposedClosure
297+ subscription . UnSubscribeFromAllChannels ( ) ; //Un block thread.
298+ return ;
299+ }
300+
301+ switch ( msgType )
302+ {
303+ case ControlCommand . Pulse :
304+ Pulse ( ) ;
305+ break ;
306+ }
321307 }
322- }
308+ else
309+ {
310+ OnMessage ( channel , msg ) ;
311+ }
312+ } ;
313+
314+ //blocks thread
315+ if ( ChannelsMatching != null && ChannelsMatching . Length > 0 )
316+ subscription . SubscribeToChannelsMatching ( ChannelsMatching ) ;
317+ else
318+ subscription . SubscribeToChannels ( Channels ) ;
319+
320+ masterClient = null ;
323321 }
324322
325323 OnStop ? . Invoke ( ) ;
@@ -363,7 +361,7 @@ private void Stop(bool shouldRestart)
363361 if ( Log . IsDebugEnabled )
364362 Log . Debug ( "Stopping RedisPubSubServer..." ) ;
365363
366- //Unblock current bgthread by issuing StopCommand
364+ //Unblock current bg thread by issuing StopCommand
367365 SendControlCommand ( Operation . Stop ) ;
368366 }
369367 }
@@ -382,12 +380,10 @@ private void NotifyAllSubscribers(string commandType=null)
382380
383381 try
384382 {
385- using ( var redis = ClientsManager . GetClient ( ) )
383+ using var redis = ClientsManager . GetClient ( ) ;
384+ foreach ( var channel in Channels )
386385 {
387- foreach ( var channel in Channels )
388- {
389- redis . PublishMessage ( channel , msg ) ;
390- }
386+ redis . PublishMessage ( channel , msg ) ;
391387 }
392388 }
393389 catch ( Exception ex )
@@ -406,13 +402,11 @@ private void HandleFailover(IRedisClientsManager clientsManager)
406402 if ( masterClient != null )
407403 {
408404 //New thread-safe client with same connection info as connected master
409- using ( var currentlySubscribedClient = ( ( RedisClient ) masterClient ) . CloneClient ( ) )
405+ using var currentlySubscribedClient = ( ( RedisClient ) masterClient ) . CloneClient ( ) ;
406+ Interlocked . CompareExchange ( ref doOperation , Operation . Reset , doOperation ) ;
407+ foreach ( var channel in Channels )
410408 {
411- Interlocked . CompareExchange ( ref doOperation , Operation . Reset , doOperation ) ;
412- foreach ( var channel in Channels )
413- {
414- currentlySubscribedClient . PublishMessage ( channel , ControlCommand . Control ) ;
415- }
409+ currentlySubscribedClient . PublishMessage ( channel , ControlCommand . Control ) ;
416410 }
417411 }
418412 else
@@ -466,12 +460,12 @@ private void KillBgThreadIfExists()
466460 private void SleepBackOffMultiplier ( int continuousErrorsCount )
467461 {
468462 if ( continuousErrorsCount == 0 ) return ;
469- const int MaxSleepMs = 60 * 1000 ;
463+ const int maxSleepMs = 60 * 1000 ;
470464
471465 //exponential/random retry back-off.
472466 var nextTry = Math . Min (
473467 rand . Next ( ( int ) Math . Pow ( continuousErrorsCount , 3 ) , ( int ) Math . Pow ( continuousErrorsCount + 1 , 3 ) + 1 ) ,
474- MaxSleepMs ) ;
468+ maxSleepMs ) ;
475469
476470 if ( Log . IsDebugEnabled )
477471 Log . Debug ( "Sleeping for {0}ms after {1} continuous errors" . Fmt ( nextTry , continuousErrorsCount ) ) ;
0 commit comments