From 97e320b218c7e39c68b65372e2711e59a59098dd Mon Sep 17 00:00:00 2001 From: Diogo Autilio Date: Wed, 1 Oct 2025 08:10:27 -0300 Subject: [PATCH] =?UTF-8?q?feat:=20add=20resilient=20SSE=20session=20with?= =?UTF-8?q?=20reconnection/backoff=20=E2=80=A2=20Implement=20exponential?= =?UTF-8?q?=20backoff=20reconnection=20with=20configurable=20limits=20(max?= =?UTF-8?q?ReconnectAttempts,=20reconnectInitialDelay,=20reconnectBackoffF?= =?UTF-8?q?actor)=20=E2=80=A2=20Track=20reconnect=20attempts=20and=20ready?= =?UTF-8?q?State=20using=20thread-safe=20Mutex;=20reset=20attempts=20on=20?= =?UTF-8?q?successful=20.open?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Sources/EventSource/EventSource.swift | 149 +++++++++++++++++--------- 1 file changed, 100 insertions(+), 49 deletions(-) diff --git a/Sources/EventSource/EventSource.swift b/Sources/EventSource/EventSource.swift index b0419de..0f1c4a4 100644 --- a/Sources/EventSource/EventSource.swift +++ b/Sources/EventSource/EventSource.swift @@ -43,6 +43,9 @@ public struct EventSource: Sendable { private let eventParser: @Sendable () -> EventParser public var timeoutInterval: TimeInterval + public var maxReconnectAttempts: Int + public var reconnectInitialDelay: TimeInterval + public var reconnectBackoffFactor: Double public init(mode: Mode = .default, timeoutInterval: TimeInterval = 300) { self.init(mode: mode, eventParser: ServerEventParser(mode: mode), timeoutInterval: timeoutInterval) @@ -51,18 +54,27 @@ public struct EventSource: Sendable { public init( mode: Mode = .default, eventParser: @autoclosure @escaping @Sendable () -> EventParser, - timeoutInterval: TimeInterval = 300 + timeoutInterval: TimeInterval = 300, + maxReconnectAttempts: Int = 5, + reconnectInitialDelay: TimeInterval = 1.0, + reconnectBackoffFactor: Double = 2.0 ) { self.mode = mode self.eventParser = eventParser self.timeoutInterval = timeoutInterval + self.maxReconnectAttempts = maxReconnectAttempts + self.reconnectInitialDelay = reconnectInitialDelay + self.reconnectBackoffFactor = reconnectBackoffFactor } public func dataTask(for urlRequest: URLRequest) -> DataTask { DataTask( urlRequest: urlRequest, eventParser: eventParser(), - timeoutInterval: timeoutInterval + timeoutInterval: timeoutInterval, + maxReconnectAttempts: maxReconnectAttempts, + reconnectInitialDelay: reconnectInitialDelay, + reconnectBackoffFactor: reconnectBackoffFactor ) } } @@ -74,8 +86,76 @@ public extension EventSource { /// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task, /// it can be started by iterating event stream returned by ``DataTask/events()``. final class DataTask: Sendable { + /// Initializes or reinitializes the SSE session + private func startSession(stream continuation: AsyncStream.Continuation) { + let sessionDelegate = SessionDelegate() + let urlSession = URLSession( + configuration: urlSessionConfiguration, + delegate: sessionDelegate, + delegateQueue: nil + ) + let urlSessionDataTask = urlSession.dataTask(with: urlRequest) + let sessionDelegateTask = Task { [weak self] in + for await event in sessionDelegate.eventStream { + guard let self else { return } + switch event { + case let .didCompleteWithError(error): + self.handleSessionError(error, stream: continuation, urlSession: urlSession) + case let .didReceiveResponse(response, completionHandler): + self.handleSessionResponse( + response, + stream: continuation, + urlSession: urlSession, + completionHandler: completionHandler + ) + case let .didReceiveData(data): + self.parseMessages(from: data, stream: continuation, urlSession: urlSession) + } + } + } + #if compiler(>=6.0) + continuation.onTermination = { @Sendable [weak self] _ in + sessionDelegateTask.cancel() + Task { self?.close(stream: continuation, urlSession: urlSession) } + } + #else + continuation.onTermination = { @Sendable _ in + sessionDelegateTask.cancel() + Task { [weak self] in + await self?.close(stream: continuation, urlSession: urlSession) + } + } + #endif + + urlSessionDataTask.resume() + } + + /// Helper method for reconnection + private func attemptReconnect(stream continuation: AsyncStream.Continuation) { + let delay = reconnectInitialDelay * pow(reconnectBackoffFactor, Double(reconnectAttempts - 1)) + DispatchQueue.global().asyncAfter(deadline: .now() + delay) { [weak self] in + self?.startSession(stream: continuation) + self?.readyState = .connecting + self?.consumed = true + } + } private let _readyState: Mutex = Mutex(.none) + // Reconnection properties + private let maxReconnectAttempts: Int + private let reconnectInitialDelay: TimeInterval + private let reconnectBackoffFactor: Double + + private let _reconnectAttempts: Mutex = Mutex(0) + private var reconnectAttempts: Int { + get { + _reconnectAttempts.withLock { $0 } + } + set { + _reconnectAttempts.withLock { $0 = newValue } + } + } + /// A value representing the state of the connection. public var readyState: ReadyState { get { @@ -153,11 +233,17 @@ public extension EventSource { internal init( urlRequest: URLRequest, eventParser: EventParser, - timeoutInterval: TimeInterval + timeoutInterval: TimeInterval, + maxReconnectAttempts: Int, + reconnectInitialDelay: TimeInterval, + reconnectBackoffFactor: Double ) { self.urlRequest = urlRequest self._eventParser = Mutex(eventParser) self.timeoutInterval = timeoutInterval + self.maxReconnectAttempts = maxReconnectAttempts + self.reconnectInitialDelay = reconnectInitialDelay + self.reconnectBackoffFactor = reconnectBackoffFactor } /// Creates and returns event stream. @@ -170,49 +256,7 @@ public extension EventSource { } return AsyncStream { continuation in - let sessionDelegate = SessionDelegate() - let urlSession = URLSession( - configuration: urlSessionConfiguration, - delegate: sessionDelegate, - delegateQueue: nil - ) - let urlSessionDataTask = urlSession.dataTask(with: urlRequest) - - let sessionDelegateTask = Task { [weak self] in - for await event in sessionDelegate.eventStream { - guard let self else { return } - - switch event { - case let .didCompleteWithError(error): - handleSessionError(error, stream: continuation, urlSession: urlSession) - case let .didReceiveResponse(response, completionHandler): - handleSessionResponse( - response, - stream: continuation, - urlSession: urlSession, - completionHandler: completionHandler - ) - case let .didReceiveData(data): - parseMessages(from: data, stream: continuation, urlSession: urlSession) - } - } - } - - #if compiler(>=6.0) - continuation.onTermination = { @Sendable [weak self] _ in - sessionDelegateTask.cancel() - Task { self?.close(stream: continuation, urlSession: urlSession) } - } - #else - continuation.onTermination = { @Sendable _ in - sessionDelegateTask.cancel() - Task { [weak self] in - await self?.close(stream: continuation, urlSession: urlSession) - } - } - #endif - - urlSessionDataTask.resume() + startSession(stream: continuation) readyState = .connecting consumed = true } @@ -232,9 +276,15 @@ public extension EventSource { if let error { sendErrorEvent(with: error, stream: continuation) } - - // Close connection - close(stream: continuation, urlSession: urlSession) + + // Attempts to reconnect if the limit has not been exceeded + if reconnectAttempts < maxReconnectAttempts { + reconnectAttempts += 1 + attemptReconnect(stream: continuation) + } else { + // Close connection if attempts exceeded + close(stream: continuation, urlSession: urlSession) + } } private func handleSessionResponse( @@ -312,6 +362,7 @@ public extension EventSource { private func setOpen(stream continuation: AsyncStream.Continuation) { readyState = .open + reconnectAttempts = 0 // reset attempts when opening continuation.yield(.open) }