11package com .avsystem .commons
22package redis .actor
33
4- import akka .actor .{Actor , ActorRef }
4+ import akka .actor .{Actor , ActorRef , Cancellable }
55import akka .stream .scaladsl ._
66import akka .stream .{CompletionStrategy , IgnoreComplete , Materializer , SystemMaterializer }
77import akka .util .ByteString
@@ -18,6 +18,7 @@ import java.nio.{Buffer, ByteBuffer}
1818import scala .annotation .tailrec
1919import scala .collection .mutable
2020import scala .concurrent .duration .Duration
21+ import scala .util .Random
2122
2223final class RedisConnectionActor (address : NodeAddress , config : ConnectionConfig )
2324 extends Actor with ActorLazyLogging { actor =>
@@ -35,13 +36,15 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
3536 }
3637 }
3738
39+ private val random = new Random
3840 private var mustInitiallyConnect : Boolean = _
3941 private var initPromise : Promise [Unit ] = _
4042
4143 // indicates how many times connection was restarted
4244 private var incarnation = 0
4345 private var reservedBy = Opt .empty[ActorRef ]
4446 private var reservationIncarnation = Opt .empty[Int ]
47+ private var currentConnectionId = Opt .empty[Int ]
4548
4649 private val queuedToReserve = new JArrayDeque [QueuedPacks ]
4750 private val queuedToWrite = new JArrayDeque [QueuedPacks ]
@@ -102,14 +105,24 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
102105 case Connect =>
103106 log.debug(s " Connecting to $address" )
104107 doConnect()
105- case Connected (connection, remoteAddress, localAddress) =>
106- log.debug(s " Connected to Redis at $address" )
107- // TODO: use dedicated retry strategy for initialization instead of reconnection strategy
108- new ConnectedTo (connection, localAddress, remoteAddress).initialize(config.reconnectionStrategy)
109- readInitSender.foreach(_ ! ReadAck )
110- case _ : ConnectionFailed =>
111- log.error(s " Connection attempt to Redis at $address failed " )
112- tryReconnect(retryStrategy, new ConnectionFailedException (address))
108+ case Connected (connection, remoteAddress, localAddress, connectionId) =>
109+ if (currentConnectionId.forall(_ != connectionId)) {
110+ log.error(s " Received Connected for connection different than currently trying to establish " )
111+ connection ! CloseConnection (immediate = true )
112+ } else {
113+ log.debug(s " Connected to Redis at $address" )
114+ // TODO: use dedicated retry strategy for initialization instead of reconnection strategy
115+ new ConnectedTo (connection, localAddress, remoteAddress).initialize(retryStrategy)
116+ readInitSender.foreach(_ ! ReadAck )
117+ }
118+ case cf : ConnectionFailure =>
119+ if (currentConnectionId.forall(_ != cf.connectionId)) {
120+ log.error(s " Received ConnectionFailure for connection different than currently trying to establish " )
121+ } else {
122+ currentConnectionId = Opt .Empty
123+ log.error(s " Connection attempt to Redis at $address failed or was closed " )
124+ tryReconnect(retryStrategy, new ConnectionFailedException (address))
125+ }
113126 case Close (cause, stopSelf) =>
114127 close(cause, stopSelf, tcpConnecting = true )
115128 case ReadInit =>
@@ -153,20 +166,26 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
153166 )
154167 }
155168
169+ // create connection Id to match connection message with
170+ // corresponding connections. It prevents against handling old failures
171+ // in context of new connections.
172+ val connectionId = random.nextInt()
173+ currentConnectionId = connectionId.opt
174+
156175 val sink = Sink .actorRefWithBackpressure(
157176 ref = self,
158177 onInitMessage = ReadInit ,
159178 ackMessage = ReadAck ,
160- onCompleteMessage = ConnectionClosed (Opt .Empty ),
161- onFailureMessage = cause => ConnectionClosed (Opt (cause))
179+ onCompleteMessage = ConnectionClosed (Opt .Empty , connectionId ),
180+ onFailureMessage = cause => ConnectionClosed (Opt (cause), connectionId )
162181 )
163182
164183 val (actorRef, connFuture) = src.viaMat(conn)((_, _)).to(sink).run()
165184 connFuture.onCompleteNow {
166185 case Success (Tcp .OutgoingConnection (remoteAddress, localAddress)) =>
167- self ! Connected (actorRef, remoteAddress, localAddress)
186+ self ! Connected (actorRef, remoteAddress, localAddress, connectionId )
168187 case Failure (cause) =>
169- self ! ConnectionFailed (cause)
188+ self ! ConnectionFailed (cause, connectionId )
170189 }
171190 }
172191
@@ -206,37 +225,41 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
206225
207226 def become (receive : Receive ): Unit =
208227 context.become(receive unless {
209- case Connected (oldConnection, _, _) if oldConnection != connection =>
228+ case Connected (oldConnection, _, _, _ ) if oldConnection != connection =>
210229 oldConnection ! CloseConnection (immediate = true )
211230 })
212231
213232 def initialize (retryStrategy : RetryStrategy ): Unit = {
214233 // Make sure that at least PING is sent so that LOADING errors are detected
215234 val initBatch = config.initCommands *> RedisApi .Batches .StringTyped .ping
216235 val initBuffer = ByteBuffer .allocate(initBatch.rawCommandPacks.encodedSize)
236+ // schedule a Cancellable RetryInit in case we do not receive a response for our request
237+ val scheduledRetry = system.scheduler.scheduleOnce(config.initResponseTimeout, self, RetryInit (retryStrategy.next))
217238 new ReplyCollector (initBatch.rawCommandPacks, initBuffer, onInitResult(_, retryStrategy))
218239 .sendEmptyReplyOr { collector =>
219240 flip(initBuffer)
220241 val data = ByteString (initBuffer)
221242 logWrite(data)
222243 connection ! data
223- become(initializing(collector, retryStrategy))
244+ become(initializing(collector, retryStrategy, scheduledRetry ))
224245 }
225246 }
226247
227- def initializing (collector : ReplyCollector , retryStrategy : RetryStrategy ): Receive = {
248+ def initializing (collector : ReplyCollector , retryStrategy : RetryStrategy , scheduledRetry : Cancellable ): Receive = {
228249 case open : Open =>
229250 onOpen(open)
230251 case IncomingPacks (packs) =>
231252 handlePacks(packs)
232253 case Release if reservedBy.contains(sender()) =>
233254 handleRelease()
234255 case cc : ConnectionClosed =>
256+ scheduledRetry.cancel()
235257 onConnectionClosed(cc)
236- tryReconnect(config.reconnectionStrategy , new ConnectionClosedException (address, cc.error))
258+ tryReconnect(retryStrategy , new ConnectionClosedException (address, cc.error))
237259 case WriteAck =>
238260 case data : ByteString =>
239261 logReceived(data)
262+ scheduledRetry.cancel()
240263 try decoder.decodeMore(data)(collector.processMessage(_, this )) catch {
241264 case NonFatal (cause) =>
242265 // TODO: is there a possibility to NOT receive WriteAck up to this point? currently assuming that no
@@ -247,8 +270,10 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
247270 case ReadInit =>
248271 sender() ! ReadAck
249272 case RetryInit (newStrategy) =>
273+ scheduledRetry.cancel()
250274 initialize(newStrategy)
251275 case Close (cause, stop) =>
276+ scheduledRetry.cancel()
252277 close(cause, stop)
253278 }
254279
@@ -453,6 +478,9 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
453478 }
454479
455480 def onConnectionClosed (cc : ConnectionClosed ): Unit = {
481+ if (currentConnectionId.forall(_ != cc.connectionId)) {
482+ log.error(s " Received ConnectionClosed for connection different than currently handling " )
483+ }
456484 open = false
457485 incarnation += 1
458486 subscribed.foreach { receiver =>
@@ -505,7 +533,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig)
505533 case IncomingPacks (packs) =>
506534 packs.reply(PacksResult .Failure (cause))
507535 case Release => // ignore
508- case Connected (connection, _, _) if tcpConnecting =>
536+ case Connected (connection, _, _, _ ) if tcpConnecting =>
509537 // failure may have happened while connecting, simply close the connection
510538 connection ! CloseConnection (immediate = true )
511539 become(closed(cause, tcpConnecting = false ))
@@ -532,11 +560,13 @@ object RedisConnectionActor {
532560 private case class Connected (
533561 connection : ActorRef ,
534562 remoteAddress : InetSocketAddress ,
535- localAddress : InetSocketAddress
563+ localAddress : InetSocketAddress ,
564+ connectionId : Int ,
536565 ) extends TcpEvent
537566
538- private case class ConnectionFailed (cause : Throwable ) extends TcpEvent
539- private case class ConnectionClosed (error : Opt [Throwable ]) extends TcpEvent
567+ private sealed abstract class ConnectionFailure (val connectionId : Int ) extends TcpEvent
568+ private case class ConnectionFailed (cause : Throwable , id : Int ) extends ConnectionFailure (id)
569+ private case class ConnectionClosed (error : Opt [Throwable ], id : Int ) extends ConnectionFailure (id)
540570
541571 private case class CloseConnection (immediate : Boolean = false ) extends TcpEvent
542572
0 commit comments