@@ -233,33 +233,130 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
233233 }
234234 }
235235
236- typealias PoolManager = ConnectionPoolManager <
236+ typealias ConnectionPoolManager = _ConnectionPoolModule . ConnectionPoolManager <
237237 PostgresConnection ,
238238 PostgresConnection . ID ,
239239 ConnectionIDGenerator ,
240240 Foo ,
241- ConnectionRequest < PostgresConnection > ,
242- ConnectionRequest . ID ,
241+ PostgresConnectionRequest ,
242+ PostgresConnectionRequest . ID ,
243243 PostgresKeepAliveBehavor ,
244244 NothingConnectionPoolExecutor ,
245245 PostgresClientMetrics ,
246246 ContinuousClock
247247 >
248248
249- typealias Pool = ConnectionPool <
249+ typealias ConnectionPool = _ConnectionPoolModule . ConnectionPool <
250250 PostgresConnection ,
251251 PostgresConnection . ID ,
252252 ConnectionIDGenerator ,
253253 Foo ,
254- ConnectionRequest < PostgresConnection > ,
255- ConnectionRequest . ID ,
254+ PostgresConnectionRequest ,
255+ PostgresConnectionRequest . ID ,
256256 PostgresKeepAliveBehavor ,
257257 NothingConnectionPoolExecutor ,
258258 PostgresClientMetrics ,
259259 ContinuousClock
260260 >
261261
262- let pool : PoolManager
262+ enum Pool {
263+ case manager( ConnectionPoolManager )
264+ case pool( ConnectionPool )
265+
266+ init (
267+ configuration: Configuration ,
268+ factory: ConnectionFactory ,
269+ eventLoopGroup: any EventLoopGroup ,
270+ backgroundLogger: Logger
271+ ) {
272+ let idGenerator = ConnectionIDGenerator . globalGenerator
273+
274+ if configuration. options. maximumConnections > 50 {
275+ // make as many executors as we have NIO els
276+ let executors = ( 0 ..< 10 ) . map { _ in NothingConnectionPoolExecutor ( ) }
277+ var poolManagerConfiguration = ConnectionPoolManagerConfiguration ( )
278+ poolManagerConfiguration. minimumConnectionPerExecutorCount = configuration. options. minimumConnections / executors. count
279+ poolManagerConfiguration. maximumConnectionPerExecutorSoftLimit = configuration. options. maximumConnections / executors. count
280+ poolManagerConfiguration. maximumConnectionPerExecutorHardLimit = configuration. options. maximumConnections / executors. count
281+
282+ self = . manager(
283+ ConnectionPoolManager (
284+ configuration: poolManagerConfiguration,
285+ connectionConfiguration: Foo ( ) ,
286+ idGenerator: idGenerator,
287+ requestType: PostgresConnectionRequest . self,
288+ keepAliveBehavior: . init( configuration. options. keepAliveBehavior, logger: backgroundLogger) ,
289+ executors: executors,
290+ observabilityDelegate: . init( logger: backgroundLogger) ,
291+ clock: ContinuousClock ( )
292+ ) { ( connectionID, connectionConfiguration, pool) in
293+ let connection = try await factory. makeConnection ( connectionID, pool: pool)
294+ return ConnectionAndMetadata ( connection: connection, maximalStreamsOnConnection: 1 )
295+ }
296+ )
297+ } else {
298+ self = . pool(
299+ ConnectionPool (
300+ configuration: . init( configuration) ,
301+ connectionConfiguration: Foo ( ) ,
302+ idGenerator: idGenerator,
303+ requestType: PostgresConnectionRequest . self,
304+ keepAliveBehavior: . init( configuration. options. keepAliveBehavior, logger: backgroundLogger) ,
305+ executor: NothingConnectionPoolExecutor ( ) ,
306+ observabilityDelegate: . init( logger: backgroundLogger) ,
307+ clock: ContinuousClock ( )
308+ ) { ( connectionID, connectionConfiguration, pool) in
309+ let connection = try await factory. makeConnection ( connectionID, pool: pool)
310+ return ConnectionAndMetadata ( connection: connection, maximalStreamsOnConnection: 1 )
311+ }
312+ )
313+ }
314+ }
315+
316+ func run( ) async {
317+ switch self {
318+ case . manager( let manager) :
319+ await manager. run ( )
320+ case . pool( let pool) :
321+ await pool. run ( )
322+ }
323+ }
324+
325+ func cancelRequest( id: PostgresConnectionRequest . ID ) {
326+ switch self {
327+ case . pool( let pool) :
328+ pool. cancelLeaseConnection ( id)
329+
330+ case . manager( let manager) :
331+ manager. cancelLeaseConnection ( id)
332+ }
333+ }
334+
335+ func leaseConnection( _ request: PostgresConnectionRequest ) {
336+ switch self {
337+ case . pool( let pool) :
338+ pool. leaseConnection ( request)
339+ case . manager( let manager) :
340+ manager. leaseConnection ( request)
341+ }
342+ }
343+
344+ func leaseConnection( ) async throws -> ConnectionLease < PostgresConnection > {
345+ let requestID = PostgresConnectionRequest . idGenerator. next ( )
346+
347+ return try await withTaskCancellationHandler {
348+ try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < ConnectionLease < PostgresConnection > , Error > ) in
349+ let request = PostgresConnectionRequest ( id: requestID, continuation: continuation)
350+
351+ self . leaseConnection ( request)
352+ }
353+ } onCancel: {
354+ self . cancelRequest ( id: requestID)
355+ }
356+ }
357+ }
358+
359+ let pool : Pool
263360 let factory : ConnectionFactory
264361 let runningAtomic = ManagedAtomic ( false )
265362 let backgroundLogger : Logger
@@ -295,26 +392,12 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
295392 self . factory = factory
296393 self . backgroundLogger = backgroundLogger
297394
298- let executors = ( 0 ..< 10 ) . map { _ in NothingConnectionPoolExecutor ( ) }
299- var poolManagerConfiguration = ConnectionPoolManagerConfiguration ( )
300- poolManagerConfiguration. minimumConnectionPerExecutorCount = configuration. options. minimumConnections / executors. count
301- poolManagerConfiguration. maximumConnectionPerExecutorSoftLimit = configuration. options. maximumConnections / executors. count
302- poolManagerConfiguration. maximumConnectionPerExecutorHardLimit = configuration. options. maximumConnections / executors. count
303-
304- self . pool = ConnectionPoolManager (
305- configuration: poolManagerConfiguration,
306- connectionConfiguration: Foo ( ) ,
307- idGenerator: ConnectionIDGenerator ( ) ,
308- requestType: ConnectionRequest< PostgresConnection> . self ,
309- keepAliveBehavior: . init( configuration. options. keepAliveBehavior, logger: backgroundLogger) ,
310- executors: executors,
311- observabilityDelegate: . init( logger: backgroundLogger) ,
312- clock: ContinuousClock ( )
313- ) { ( connectionID, connectionConfiguration, pool) in
314- let connection = try await factory. makeConnection ( connectionID, pool: pool)
315-
316- return ConnectionAndMetadata ( connection: connection, maximalStreamsOnConnection: 1 )
317- }
395+ self . pool = . init(
396+ configuration: configuration,
397+ factory: factory,
398+ eventLoopGroup: eventLoopGroup,
399+ backgroundLogger: backgroundLogger
400+ )
318401 }
319402
320403 /// Lease a connection for the provided `closure`'s lifetime.
@@ -427,30 +510,22 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
427510 throw PSQLError ( code: . tooManyParameters, query: query, file: file, line: line)
428511 }
429512
430- let lease = try await self . leaseConnection ( )
431- let connection = lease. connection
432-
433- var logger = logger
434- logger [ postgresMetadataKey: . connectionID] = " \( connection. id) "
435-
436- let promise = connection. channel. eventLoop. makePromise ( of: PSQLRowStream . self)
437- let context = ExtendedQueryContext (
438- query: query,
439- logger: logger,
440- promise: promise
441- )
513+ let requestID = PostgresConnectionRequest . idGenerator. next ( )
442514
443- connection. channel. write ( HandlerTask . extendedQuery ( context) , promise: nil )
515+ return try await withTaskCancellationHandler {
516+ try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < PostgresRowSequence , Error > ) in
517+ let request = PostgresConnectionRequest (
518+ id: requestID,
519+ query: query,
520+ continuation: continuation,
521+ logger: logger
522+ )
444523
445- promise. futureResult. whenFailure { _ in
446- lease. release ( )
524+ self . pool. leaseConnection ( request)
525+ }
526+ } onCancel: {
527+ self . pool. cancelRequest ( id: requestID)
447528 }
448-
449- return try await promise. futureResult. map {
450- $0. asyncSequence ( onFinish: {
451- lease. release ( )
452- } )
453- } . get ( )
454529 } catch var error as PSQLError {
455530 error. file = file
456531 error. line = line
@@ -543,7 +618,11 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
543618 PostgresConnection . defaultEventLoopGroup
544619 }
545620
546- static let loggingDisabled = Logger ( label: " Postgres-do-not-log " , factory: { _ in SwiftLogNoOpLogHandler ( ) } )
621+ static let loggingDisabled = {
622+ var logger = Logger ( label: " Postgres-do-not-log " , factory: { _ in SwiftLogNoOpLogHandler ( ) } )
623+ logger. logLevel = . critical
624+ return logger
625+ } ( )
547626}
548627
549628@available ( macOS 13 . 0 , iOS 16 . 0 , tvOS 16 . 0 , watchOS 9 . 0 , * )
0 commit comments