Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/storage/postgres_connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class PostgresConnectionPool {
vector<PostgresConnection> connection_cache;

private:
PostgresPoolConnection GetConnectionInternal();
PostgresPoolConnection GetConnectionInternal(unique_lock<mutex> &lock);
};

} // namespace duckdb
44 changes: 28 additions & 16 deletions src/storage/postgres_connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,31 @@ PostgresConnectionPool::PostgresConnectionPool(PostgresCatalog &postgres_catalog
: postgres_catalog(postgres_catalog), active_connections(0), maximum_connections(maximum_connections_p) {
}

PostgresPoolConnection PostgresConnectionPool::GetConnectionInternal() {
PostgresPoolConnection PostgresConnectionPool::GetConnectionInternal(unique_lock<mutex> &lock) {
active_connections++;
// check if we have any cached connections left
if (!connection_cache.empty()) {
auto connection = PostgresPoolConnection(this, std::move(connection_cache.back()));
connection_cache.pop_back();
return connection;
}

// no cached connections left but there is space to open a new one - open it
// no cached connections left but there is space to open a new one - open it after releasing the cache lock
lock.unlock();
return PostgresPoolConnection(
this, PostgresConnection::Open(postgres_catalog.connection_string, postgres_catalog.attach_path));
}

PostgresPoolConnection PostgresConnectionPool::ForceGetConnection() {
lock_guard<mutex> l(connection_lock);
return GetConnectionInternal();
unique_lock<mutex> l(connection_lock);
return GetConnectionInternal(l);
}

bool PostgresConnectionPool::TryGetConnection(PostgresPoolConnection &connection) {
lock_guard<mutex> l(connection_lock);
unique_lock<mutex> l(connection_lock);
if (active_connections >= maximum_connections) {
return false;
}
connection = GetConnectionInternal();
connection = GetConnectionInternal(l);
return true;
}

Expand All @@ -90,30 +90,42 @@ PostgresPoolConnection PostgresConnectionPool::GetConnection() {
}

void PostgresConnectionPool::ReturnConnection(PostgresConnection connection) {
lock_guard<mutex> l(connection_lock);
unique_lock<mutex> l(connection_lock);
if (active_connections <= 0) {
throw InternalException("PostgresConnectionPool::ReturnConnection called but active_connections is 0");
}
active_connections--;
if (active_connections >= maximum_connections) {
// if the maximum number of connections has been decreased by the user we might need to reclaim the connection
// immediately
return;
}
if (!pg_use_connection_cache) {
// not caching - just return
active_connections--;
return;
}
// we want to cache the connection
// check if the underlying connection is still usable
// avoid holding the lock while doing this
l.unlock();
bool connection_is_bad = false;
auto pg_con = connection.GetConn();
if (PQstatus(connection.GetConn()) != CONNECTION_OK) {
// CONNECTION_BAD! try to reset it
PQreset(pg_con);
if (PQstatus(connection.GetConn()) != CONNECTION_OK) {
// still bad - just abandon this one
return;
connection_is_bad = true;
}
}
if (PQtransactionStatus(pg_con) != PQTRANS_IDLE) {
if (!connection_is_bad && PQtransactionStatus(pg_con) != PQTRANS_IDLE) {
connection_is_bad = true;
}
// lock and return the connection
l.lock();
active_connections--;
if (connection_is_bad) {
// if the connection is bad we cannot cache it
return;
}
if (active_connections >= maximum_connections) {
// if the maximum number of connections has been decreased by the user we might need to reclaim the connection
// immediately
return;
}
connection_cache.push_back(std::move(connection));
Expand Down
Loading