Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env_test.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
POSTGRES_HOST=localhost
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_PORT=5432
POSTGRES_DBNAME=psqlpy_test
POSTGRES_CERT_FILE=/home/runner/work/_temp/pgdata/server.crt
12 changes: 12 additions & 0 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import random
from typing import AsyncGenerator
from urllib import parse

import pytest
from psqlpy import ConnectionPool, Cursor
Expand Down Expand Up @@ -78,6 +79,17 @@ def map_parameters_table_name() -> str:
return random_string()


@pytest.fixture
def dsn(
postgres_host: str,
postgres_user: str,
postgres_password: str,
postgres_port: int,
postgres_dbname: str,
) -> str:
return f"postgres://{postgres_user}:{parse.quote(postgres_password)}@{postgres_host}:{postgres_port}/{postgres_dbname}"


@pytest.fixture
def number_database_records() -> int:
return random.randint(10, 35)
Expand Down
76 changes: 31 additions & 45 deletions python/tests/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,23 @@
pytestmark = pytest.mark.anyio


async def test_connect_func() -> None:
async def test_connect_func(dsn: str) -> None:
"""Test that connect function makes new connection pool."""
pg_pool = connect_pool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
)
pg_pool = connect_pool(dsn=dsn)

conn = await pg_pool.connection()
await conn.execute("SELECT 1")


async def test_pool_dsn_startup() -> None:
async def test_pool_dsn_startup(dsn: str) -> None:
"""Test that connection pool can startup with dsn."""
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
)
pg_pool = ConnectionPool(dsn=dsn)

conn = await pg_pool.connection()
await conn.execute("SELECT 1")


async def test_pool_connection(
psql_pool: ConnectionPool,
) -> None:
async def test_pool_connection(psql_pool: ConnectionPool) -> None:
"""Test that ConnectionPool can return single connection from the pool."""
connection = await psql_pool.connection()
assert isinstance(connection, Connection)
Expand All @@ -53,37 +47,29 @@ async def test_pool_connection(
)
async def test_pool_conn_recycling_method(
conn_recycling_method: ConnRecyclingMethod,
dsn: str,
) -> None:
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
dsn=dsn,
conn_recycling_method=conn_recycling_method,
)

conn = await pg_pool.connection()
await conn.execute("SELECT 1")


async def test_build_pool_failure() -> None:
async def test_build_pool_failure(dsn: str) -> None:
with pytest.raises(expected_exception=ConnectionPoolConfigurationError):
ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
connect_timeout_nanosec=12,
)
ConnectionPool(dsn=dsn, connect_timeout_nanosec=12)

with pytest.raises(expected_exception=ConnectionPoolConfigurationError):
ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
connect_timeout_nanosec=12,
)
ConnectionPool(dsn=dsn, connect_timeout_nanosec=12)

with pytest.raises(expected_exception=ConnectionPoolConfigurationError):
ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
keepalives_idle_nanosec=12,
)
ConnectionPool(dsn=dsn, keepalives_idle_nanosec=12)

with pytest.raises(expected_exception=ConnectionPoolConfigurationError):
ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
keepalives_interval_nanosec=12,
)
ConnectionPool(dsn=dsn, keepalives_interval_nanosec=12)


@pytest.mark.parametrize(
Expand All @@ -96,12 +82,18 @@ async def test_build_pool_failure() -> None:
)
async def test_pool_target_session_attrs(
target_session_attrs: TargetSessionAttrs,
postgres_host: str,
postgres_user: str,
postgres_password: str,
postgres_port: int,
postgres_dbname: str,
) -> None:
pg_pool = ConnectionPool(
db_name="psqlpy_test",
host="localhost",
username="postgres",
password="postgres", # noqa: S106
db_name=postgres_dbname,
host=postgres_host,
port=postgres_port,
username=postgres_user,
password=postgres_password,
target_session_attrs=target_session_attrs,
)

Expand All @@ -122,21 +114,17 @@ async def test_pool_target_session_attrs(
)
async def test_pool_load_balance_hosts(
load_balance_hosts: LoadBalanceHosts,
dsn: str,
) -> None:
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
load_balance_hosts=load_balance_hosts,
)
pg_pool = ConnectionPool(dsn=dsn, load_balance_hosts=load_balance_hosts)

conn = await pg_pool.connection()
await conn.execute("SELECT 1")


async def test_close_connection_pool() -> None:
async def test_close_connection_pool(dsn: str) -> None:
"""Test that `close` method closes connection pool."""
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
)
pg_pool = ConnectionPool(dsn=dsn)

conn = await pg_pool.connection()
await conn.execute("SELECT 1")
Expand All @@ -147,11 +135,9 @@ async def test_close_connection_pool() -> None:
await pg_pool.connection()


async def test_connection_pool_as_context_manager() -> None:
async def test_connection_pool_as_context_manager(dsn: str) -> None:
"""Test connection pool as context manager."""
with ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
) as pg_pool:
with ConnectionPool(dsn=dsn) as pg_pool:
conn = await pg_pool.connection()
res = await conn.execute("SELECT 1")
assert res.result()
Expand Down
Loading