Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions Sources/Valkey/Node/ValkeyNodeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,10 @@ extension ValkeyNodeClient {
isolation: isolated (any Actor)? = #isolation,
operation: (ValkeyConnection) async throws -> sending Value
) async throws -> Value {
let connection = try await self.leaseConnection()
let lease = try await self.connectionPool.leaseConnection()
defer { lease.release() }

defer { self.connectionPool.releaseConnection(connection) }

return try await operation(connection)
}

private func leaseConnection() async throws -> ValkeyConnection {
try await self.connectionPool.leaseConnection()
return try await operation(lease.connection)
}
}

Expand Down
17 changes: 17 additions & 0 deletions Sources/ValkeyConnectionPool/ConnectionLease.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
public struct ConnectionLease<Connection: PooledConnection>: Sendable {
public var connection: Connection

@usableFromInline
let _release: @Sendable (Connection) -> Void

@inlinable
public init(connection: Connection, release: @escaping @Sendable (Connection) -> Void) {
self.connection = connection
self._release = release
}

@inlinable
public func release() {
self._release(self.connection)
}
}
22 changes: 14 additions & 8 deletions Sources/ValkeyConnectionPool/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public protocol ConnectionRequestProtocol: Sendable {

/// A function that is called with a connection or a
/// `PoolError`.
func complete(with: Result<Connection, ConnectionPoolError>)
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>)
}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
Expand Down Expand Up @@ -275,6 +275,7 @@ where
}
}

@inlinable
public func run() async {
await withTaskCancellationHandler {
if #available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) {
Expand Down Expand Up @@ -319,13 +320,15 @@ where
}

@available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *)
private func run(in taskGroup: inout DiscardingTaskGroup) async {
@inlinable
/* private */ func run(in taskGroup: inout DiscardingTaskGroup) async {
for await event in self.eventStream {
self.runEvent(event, in: &taskGroup)
}
}

private func run(in taskGroup: inout TaskGroup<Void>) async {
@inlinable
/* private */ func run(in taskGroup: inout TaskGroup<Void>) async {
var running = 0
for await event in self.eventStream {
running += 1
Expand All @@ -338,7 +341,8 @@ where
}
}

private func runEvent(_ event: NewPoolActions, in taskGroup: inout some TaskGroupProtocol) {
@inlinable
/* private */ func runEvent(_ event: NewPoolActions, in taskGroup: inout some TaskGroupProtocol) {
switch event {
case .makeConnection(let request):
self.makeConnection(for: request, in: &taskGroup)
Expand Down Expand Up @@ -405,8 +409,11 @@ where
/*private*/ func runRequestAction(_ action: StateMachine.RequestAction) {
switch action {
case .leaseConnection(let requests, let connection):
let lease = ConnectionLease(connection: connection) { connection in
self.releaseConnection(connection)
}
for request in requests {
request.complete(with: .success(connection))
request.complete(with: .success(lease))
}

case .failRequest(let request, let error):
Expand All @@ -430,7 +437,7 @@ where
self.connectionEstablished(bundle)

// after the connection has been established, we keep the task open. This ensures
// that the pools run method cannot be exited before all connections have been
// that the pools run method can not be exited before all connections have been
// closed.
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
bundle.connection.onClose {
Expand Down Expand Up @@ -458,7 +465,7 @@ where
}

@inlinable
/*private*/ func connectionEstablishFailed(_ error: any Error, for request: StateMachine.ConnectionRequest) {
/*private*/ func connectionEstablishFailed(_ error: Error, for request: StateMachine.ConnectionRequest) {
self.observabilityDelegate.connectFailed(id: request.connectionID, error: error)

self.modifyStateAndRunActions { state in
Expand Down Expand Up @@ -586,7 +593,6 @@ extension DiscardingTaskGroup: TaskGroupProtocol {
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension TaskGroup<Void>: TaskGroupProtocol {
@inlinable
mutating func addTask_(operation: @escaping @Sendable () async -> Void) {
Expand Down
17 changes: 13 additions & 4 deletions Sources/ValkeyConnectionPool/ConnectionPoolError.swift
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
public struct ConnectionPoolError: Error, Hashable {
enum Base: Error, Hashable {
@usableFromInline
enum Base: Error, Hashable, Sendable {
case requestCancelled
case poolShutdown
}

private let base: Base
@usableFromInline
let base: Base

@inlinable
init(_ base: Base) { self.base = base }

/// The connection requests got cancelled
public static let requestCancelled = ConnectionPoolError(.requestCancelled)
@inlinable
public static var requestCancelled: Self {
ConnectionPoolError(.requestCancelled)
}
/// The connection requests can't be fulfilled as the pool has already been shutdown
public static let poolShutdown = ConnectionPoolError(.poolShutdown)
@inlinable
public static var poolShutdown: Self {
ConnectionPoolError(.poolShutdown)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public protocol ConnectionPoolObservabilityDelegate: Sendable {
/// time and is reported via ````. The
func connectSucceeded(id: ConnectionID, streamCapacity: UInt16)

/// The utilization of the connection changed; a stream may have been used, returned or the
/// The utlization of the connection changed; a stream may have been used, returned or the
/// maximum number of concurrent streams available on the connection changed.
func connectionUtilizationChanged(id: ConnectionID, streamsUsed: UInt16, streamCapacity: UInt16)

Expand Down
17 changes: 8 additions & 9 deletions Sources/ValkeyConnectionPool/ConnectionRequest.swift
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequestProtocol {
public typealias ID = Int

public var id: ID

@usableFromInline
private(set) var continuation: CheckedContinuation<Connection, any Error>
private(set) var continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>

@inlinable
init(
id: Int,
continuation: CheckedContinuation<Connection, any Error>
continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>
) {
self.id = id
self.continuation = continuation
}

public func complete(with result: Result<Connection, ConnectionPoolError>) {
public func complete(with result: Result<ConnectionLease<Connection>, ConnectionPoolError>) {
self.continuation.resume(with: result)
}
}
Expand Down Expand Up @@ -46,15 +45,15 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {
}

@inlinable
public func leaseConnection() async throws -> Connection {
public func leaseConnection() async throws -> ConnectionLease<Connection> {
let requestID = requestIDGenerator.next()

let connection = try await withTaskCancellationHandler {
if Task.isCancelled {
throw CancellationError()
}

return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Connection, Error>) in
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ConnectionLease<Connection>, Error>) in
let request = Request(
id: requestID,
continuation: continuation
Expand All @@ -71,8 +70,8 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {

@inlinable
public func withConnection<Result>(_ closure: (Connection) async throws -> Result) async throws -> Result {
let connection = try await self.leaseConnection()
defer { self.releaseConnection(connection) }
return try await closure(connection)
let lease = try await self.leaseConnection()
defer { lease.release() }
return try await closure(lease.connection)
}
}
4 changes: 2 additions & 2 deletions Sources/ValkeyConnectionPool/NIOLock.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2022 the SwiftNIO project authors
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -145,7 +145,7 @@ final class LockStorage<Value>: ManagedBuffer<Value, LockPrimitive> {
let buffer = Self.create(minimumCapacity: 1) { _ in
value
}
// Intentionally using a force cast here to avoid a miss compilation in 5.10.
// Intentionally using a force cast here to avoid a miss compiliation in 5.10.
// This is as fast as an unsafeDownCast since ManagedBuffer is inlined and the optimizer
// can eliminate the upcast/downcast pair
let storage = buffer as! Self
Expand Down
4 changes: 2 additions & 2 deletions Sources/ValkeyConnectionPool/NIOLockedValueBox.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022 the SwiftNIO project authors
// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
Expand Down Expand Up @@ -51,7 +51,7 @@ struct NIOLockedValueBox<Value> {

/// Provides an unsafe view over the lock and its value.
///
/// This can be beneficial when you require fine-grained control over the lock in some
/// This can be beneficial when you require fine grained control over the lock in some
/// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by
/// switching to ``NIOLock``.
var unsafe: Unsafe {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,13 @@ extension PoolStateMachine {
@inlinable
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
// because of a race, this connection (connection close runs against trigger of timeout)
// because of a race this connection (connection close runs against trigger of timeout)
// was already removed from the state machine.
return nil
}

if index < self.minimumConcurrentConnections {
// because of a race, a connection might receive an idle timeout after it was moved into
// because of a race a connection might receive a idle timeout after it was moved into
// the persisted connections. If a connection is now persisted, we now need to ignore
// the trigger
return nil
Expand Down
3 changes: 2 additions & 1 deletion Sources/ValkeyConnectionPool/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct PoolConfiguration: Sendable {
@usableFromInline
var minimumConnectionCount: Int = 0

/// The maximum number of connections for this pool, to be preserved.
/// The maximum number of connections to for this pool, to be preserved.
@usableFromInline
var maximumConnectionSoftLimit: Int = 10

Expand Down Expand Up @@ -434,6 +434,7 @@ struct PoolStateMachine<
fatalError("Unimplemented")
}

@usableFromInline
mutating func triggerForceShutdown() -> Action {
switch self.poolState {
case .running:
Expand Down
8 changes: 7 additions & 1 deletion Sources/ValkeyConnectionPool/TinyFastSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ struct TinyFastSequence<Element>: Sequence {
self.base = .none(reserveCapacity: 0)
case 1:
self.base = .one(collection.first!, reserveCapacity: 0)
case 2:
self.base = .two(
collection.first!,
collection[collection.index(after: collection.startIndex)],
reserveCapacity: 0
)
default:
if let collection = collection as? [Element] {
self.base = .n(collection)
Expand All @@ -46,7 +52,7 @@ struct TinyFastSequence<Element>: Sequence {
case 1:
self.base = .one(max2Sequence.first!, reserveCapacity: 0)
case 2:
self.base = .n(Array(max2Sequence))
self.base = .two(max2Sequence.first!, max2Sequence.second!, reserveCapacity: 0)
default:
fatalError()
}
Expand Down
Loading