Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
34e0b89
implement native async client
joe-clickhouse Jan 13, 2026
3971254
add pylint exceptions
joe-clickhouse Jan 13, 2026
d1f262d
linting
joe-clickhouse Jan 13, 2026
83ad886
improve test config setup
joe-clickhouse Jan 13, 2026
4aff17d
fix some tests
joe-clickhouse Jan 13, 2026
7392e2b
enable log tables on startup
joe-clickhouse Jan 13, 2026
d96d1d5
improve error handling
joe-clickhouse Jan 13, 2026
9019bc2
linting
joe-clickhouse Jan 13, 2026
759c736
fix deprecation warning in aiohttp
joe-clickhouse Jan 13, 2026
4e050ab
error handling
joe-clickhouse Jan 13, 2026
841c5d8
separate sync/async stream failure tests
joe-clickhouse Jan 13, 2026
fb31ccb
more error handling issues
joe-clickhouse Jan 13, 2026
b61a1d2
linting
joe-clickhouse Jan 13, 2026
0f14167
test updates and improvements
joe-clickhouse Jan 14, 2026
74f5737
test adjustments
joe-clickhouse Jan 14, 2026
5781c52
increase async stream buffer size
joe-clickhouse Jan 15, 2026
6334be8
don't parallelize cloud tests
joe-clickhouse Jan 15, 2026
e060dfd
apply consistency settings to client factory fixture
joe-clickhouse Jan 15, 2026
590fd12
up cloud par to 4
joe-clickhouse Jan 15, 2026
1b2578e
enforce sequential consistency in helper
joe-clickhouse Jan 15, 2026
08abfee
update enable_cleanup_closed vers
joe-clickhouse Jan 15, 2026
85f0860
add conn abort to wrong port assertion
joe-clickhouse Jan 15, 2026
1f16500
small test refactor
joe-clickhouse Jan 15, 2026
e95f61b
more consistency
joe-clickhouse Jan 15, 2026
ceb672a
add deadlock check in async queue
joe-clickhouse Jan 16, 2026
038dfb5
enforce consistent use of async iter stream consumption
joe-clickhouse Jan 17, 2026
beaa28a
linting
joe-clickhouse Jan 17, 2026
45c96c1
DO materialize non-streaming queries
joe-clickhouse Jan 18, 2026
66438d8
accept either concurrent session error
joe-clickhouse Jan 19, 2026
2fdae8f
Merge branch 'main' into joe/141-a-database-client-should-be-based-on…
joe-clickhouse Jan 20, 2026
95d30d8
update async client to always reset context
joe-clickhouse Jan 20, 2026
7319d8f
Merge branch 'main' into joe/141-a-database-client-should-be-based-on…
joe-clickhouse Jan 21, 2026
f4e0010
add pipelining to arrow methods
joe-clickhouse Jan 30, 2026
e362dc2
Merge branch 'main' into joe/141-a-database-client-should-be-based-on…
joe-clickhouse Feb 3, 2026
aadd022
Merge branch 'main' into joe/141-a-database-client-should-be-based-on…
joe-clickhouse Feb 11, 2026
f65502a
feature parity with recent sync client changes
joe-clickhouse Feb 11, 2026
46bc597
fix jwt test after bad merge conflict resolution
joe-clickhouse Feb 11, 2026
6af89ee
0.12.0rc1 release prep
joe-clickhouse Feb 12, 2026
7a62f64
fix assertion in racy test
joe-clickhouse Feb 12, 2026
cb03466
clean up streaming source on exception
joe-clickhouse Feb 12, 2026
a5a949f
Merge branch 'main' into joe/141-a-database-client-should-be-based-on…
joe-clickhouse Mar 25, 2026
4a2e74d
linting
joe-clickhouse Mar 25, 2026
dd983aa
more linting
joe-clickhouse Mar 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .docker/clickhouse/single_node/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
<console>1</console>
</logger>

<prepare_system_log_tables_on_startup>1</prepare_system_log_tables_on_startup>

<query_log>
<database>system</database>
<table>query_log</table>
Expand Down
2 changes: 2 additions & 0 deletions .docker/clickhouse/single_node_tls/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
<console>1</console>
</logger>

<prepare_system_log_tables_on_startup>1</prepare_system_log_tables_on_startup>

<openSSL>
<server>
<certificateFile>/etc/clickhouse-server/certs/server.crt</certificateFile>
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/clickhouse_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ jobs:
CLICKHOUSE_CONNECT_TEST_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT_PROD }}
CLICKHOUSE_CONNECT_TEST_JWT_SECRET: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_JWT_DESERT_VM_43 }}
SQLALCHEMY_SILENCE_UBER_WARNING: 1
run: pytest tests/integration_tests
run: pytest tests/integration_tests -n 4

- name: Run ClickHouse Container (HEAD)
run: CLICKHOUSE_CONNECT_TEST_CH_VERSION=head docker compose up -d clickhouse
- name: Run HEAD tests
run: pytest tests/integration_tests
run: pytest tests/integration_tests -n 4
- name: remove HEAD container
run: docker compose down -v
8 changes: 4 additions & 4 deletions .github/workflows/on_push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ jobs:
CLICKHOUSE_CONNECT_TEST_DOCKER: 'False'
CLICKHOUSE_CONNECT_TEST_FUZZ: 50
SQLALCHEMY_SILENCE_UBER_WARNING: 1
run: pytest tests
run: pytest -n 4 tests

pandas-1x-compat-test:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -173,7 +173,7 @@ jobs:
CLICKHOUSE_CONNECT_TEST_TLS: 1
CLICKHOUSE_CONNECT_TEST_DOCKER: 'False'
SQLALCHEMY_SILENCE_UBER_WARNING: 1
run: pytest tests/integration_tests/test_pandas_compat.py tests/integration_tests/test_pandas.py
run: pytest -n 4 tests/integration_tests/test_pandas_compat.py tests/integration_tests/test_pandas.py

sqlalchemy-1x-compat-test:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -211,7 +211,7 @@ jobs:
CLICKHOUSE_CONNECT_TEST_TLS: 1
CLICKHOUSE_CONNECT_TEST_DOCKER: 'False'
SQLALCHEMY_SILENCE_UBER_WARNING: 1
run: pytest tests/integration_tests/test_sqlalchemy
run: pytest -n 4 tests/integration_tests/test_sqlalchemy

check-secret:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -264,4 +264,4 @@ jobs:
CLICKHOUSE_CONNECT_TEST_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT_PROD }}
CLICKHOUSE_CONNECT_TEST_JWT_SECRET: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_JWT_DESERT_VM_43 }}
SQLALCHEMY_SILENCE_UBER_WARNING: 1
run: pytest tests/integration_tests
run: pytest -n 4 tests/integration_tests
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ The supported method of passing ClickHouse server settings is to prefix such arg

### Deprecations
- Pandas 1.x support is now deprecated and will be removed in v1.0.0. A `DeprecationWarning` is emitted at import time for pandas 1.x users.
- The current `AsyncClient` is a thread-pool wrapper around the sync client and now emits a `FutureWarning` on creation, pointing users to the fully native async client available as a prerelease: `pip install 'clickhouse-connect[async]==0.12.0rc1'`. This prerelease branch is based on 0.11.0 and is gathering feedback ahead of 1.0.0, where it will become the default async implementation. It is a drop-in replacement with the same API surface.

### Improvements
- Added support for the `SAMPLE` clause in SQLAlchemy statements. Note: Due to a SQLAlchemy limitation, only one hint (SAMPLE or FINAL) can be applied per table; chaining both will silently ignore one. For now, this change enables use of sample(), but chaining with final() is not yet supported. Closes [#634](https://github.com/ClickHouse/clickhouse-connect/issues/634)
Expand All @@ -67,6 +66,9 @@ are now serialized using their native ClickHouse types client-side (e.g. inserti
- Recognize `UPDATE` as a command so lightweight updates work correctly via `client.query()` and SQLAlchemy.
- SQLAlchemy: `GROUP BY` now renders label aliases instead of full expressions which avoids circular reference errors when an alias shadows a source column name in ClickHouse.

## 0.12.0rc1, 2026-02-11
- Implement a native async client. Closes [#141](https://github.com/ClickHouse/clickhouse-connect/issues/141)

## 0.11.0, 2026-02-10

### Python 3.9 Deprecation
Expand Down
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ ClickHouse Connect currently uses the ClickHouse HTTP interface for maximum comp
pip install clickhouse-connect
```

ClickHouse Connect requires Python 3.9 or higher. We officially test against Python 3.9 through 3.13.
ClickHouse Connect requires Python 3.9 or higher. We officially test against Python 3.9 through 3.14.

### Superset Connectivity

Expand Down Expand Up @@ -45,7 +45,13 @@ that rely on full ORM or advanced dialect functionality.

### Asyncio Support

ClickHouse Connect provides an async wrapper, so that it is possible to use the client in an `asyncio` environment.
ClickHouse Connect provides native async support using aiohttp. For the best performance with async applications,
install the optional async dependency:

```
pip install clickhouse-connect[async]
```

See the [run_async example](./examples/run_async.py) for more details.

### Complete Documentation
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_connect/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CommonSetting:
_common_settings: Dict[str, CommonSetting] = {}


def build_client_name(client_name: str) -> str:
def build_client_name(client_name: Optional[str]) -> str:
product_name = get_setting('product_name')
product_name = product_name.strip() + ' ' if product_name else ''
client_name = client_name.strip() + ' ' if client_name else ''
Expand Down
160 changes: 98 additions & 62 deletions clickhouse_connect/driver/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,72 @@
import asyncio
import warnings
from concurrent.futures import ThreadPoolExecutor
from inspect import signature
from typing import Optional, Union, Dict, Any
from typing import Optional, Union, Dict, Any, Tuple
from urllib.parse import urlparse, parse_qs

import clickhouse_connect.driver.ctypes
import clickhouse_connect.driver.ctypes # noqa: F401 -- side-effect import
from clickhouse_connect.driver.client import Client
from clickhouse_connect.driver.common import dict_copy
from clickhouse_connect.driver.exceptions import ProgrammingError
from clickhouse_connect.driver.httpclient import HttpClient
from clickhouse_connect.driver.asyncclient import AsyncClient, DefaultThreadPoolExecutor, NEW_THREAD_POOL_EXECUTOR

__all__ = ['Client', 'AsyncClient', 'create_client', 'create_async_client']


def default_port(interface: str, secure: bool) -> int:
"""Get default port for the given interface."""
if interface.startswith("http"):
return 8443 if secure else 8123
raise ValueError("Unrecognized ClickHouse interface")


def _parse_connection_params(
host: Optional[str],
username: Optional[str],
password: str,
port: int,
database: str,
interface: Optional[str],
secure: Union[bool, str],
dsn: Optional[str],
kwargs: Dict[str, Any]
) -> Tuple[str, Optional[str], str, int, str, str]:
"""Parse and normalize connection parameters including DSN parsing."""
if dsn:
parsed = urlparse(dsn)
username = username or parsed.username
password = password or parsed.password
host = host or parsed.hostname
port = port or parsed.port
if parsed.path and (not database or database == "__default__"):
database = parsed.path[1:].split("/")[0]
database = database or parsed.path
for k, v in parse_qs(parsed.query).items():
kwargs[k] = v[0]
use_tls = str(secure).lower() == "true" or interface == "https" or (not interface and str(port) in ("443", "8443"))
if not host:
host = "localhost"
if not interface:
interface = "https" if use_tls else "http"
port = port or default_port(interface, use_tls)
if username is None and "user" in kwargs:
username = kwargs.pop("user")
if username is None and "user_name" in kwargs:
username = kwargs.pop("user_name")
if password and username is None:
username = "default"
if "compression" in kwargs and "compress" not in kwargs:
kwargs["compress"] = kwargs.pop("compression")

return host, username, password, port, database, interface


def _validate_access_token(access_token: Optional[str], username: Optional[str], password: str) -> None:
"""Validate that access_token and username/password are not both provided."""
if access_token and (username or password != ""):
raise ProgrammingError("Cannot use both access_token and username/password")


# pylint: disable=too-many-arguments,too-many-locals,too-many-branches
def create_client(*,
host: Optional[str] = None,
Expand Down Expand Up @@ -94,33 +146,11 @@ def create_client(*,
limits. Only available for query operations (not inserts). Default: False
:return: ClickHouse Connect Client instance
"""
if dsn:
parsed = urlparse(dsn)
username = username or parsed.username
password = password or parsed.password
host = host or parsed.hostname
port = port or parsed.port
if parsed.path and (not database or database == '__default__'):
database = parsed.path[1:].split('/')[0]
database = database or parsed.path
for k, v in parse_qs(parsed.query).items():
kwargs[k] = v[0]
use_tls = str(secure).lower() == 'true' or interface == 'https' or (not interface and str(port) in ('443', '8443'))
if not host:
host = 'localhost'
if not interface:
interface = 'https' if use_tls else 'http'
port = port or default_port(interface, use_tls)
if access_token and (username or password != ''):
raise ProgrammingError('Cannot use both access_token and username/password')
if username is None and 'user' in kwargs:
username = kwargs.pop('user')
if username is None and 'user_name' in kwargs:
username = kwargs.pop('user_name')
if password and username is None:
username = 'default'
if 'compression' in kwargs and 'compress' not in kwargs:
kwargs['compress'] = kwargs.pop('compression')
host, username, password, port, database, interface = _parse_connection_params(
host, username, password, port, database, interface, secure, dsn, kwargs
)
_validate_access_token(access_token, username, password)

settings = settings or {}
if interface.startswith('http'):
if generic_args:
Expand All @@ -140,16 +170,11 @@ def create_client(*,
raise ProgrammingError(f'Unrecognized client type {interface}')


def default_port(interface: str, secure: bool):
if interface.startswith('http'):
return 8443 if secure else 8123
raise ValueError('Unrecognized ClickHouse interface')


async def create_async_client(*,
host: Optional[str] = None,
username: Optional[str] = None,
password: str = '',
access_token: Optional[str] = None,
database: str = '__default__',
interface: Optional[str] = None,
port: int = 0,
Expand All @@ -159,6 +184,9 @@ async def create_async_client(*,
generic_args: Optional[Dict[str, Any]] = None,
executor_threads: int = 0,
executor: Union[ThreadPoolExecutor, None, DefaultThreadPoolExecutor] = NEW_THREAD_POOL_EXECUTOR,
connector_limit: int = 100,
connector_limit_per_host: int = 20,
keepalive_timeout: float = 30.0,
**kwargs) -> AsyncClient:
"""
The preferred method to get an async ClickHouse Connect Client instance.
Expand All @@ -169,6 +197,7 @@ async def create_async_client(*,
:param host: The hostname or IP address of the ClickHouse server. If not set, localhost will be used.
:param username: The ClickHouse username. If not set, the default ClickHouse user will be used.
:param password: The password for username.
:param access_token: JWT access token.
:param database: The default database for the connection. If not set, ClickHouse Connect will use the
default database for username.
:param interface: Must be http or https. Defaults to http, or to https if port is set to 8443 or 443
Expand All @@ -180,11 +209,11 @@ async def create_async_client(*,
:param settings: ClickHouse server settings to be used with the session/every request
:param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings.
It is not recommended to use this parameter externally
:param executor_threads: 'max_worker' threads used by the client ThreadPoolExecutor. If not set, the default
of 4 + detected CPU cores will be used
:param executor: Optional `ThreadPoolExecutor` to use for async operations. If not set, a new `ThreadPoolExecutor`
will be created with the number of threads specified by `executor_threads`. If set to `None` it will use the
default executor of the event loop.
:param executor_threads: (LEGACY) 'max_worker' threads used by the client ThreadPoolExecutor.
:param executor: (LEGACY) Optional `ThreadPoolExecutor` to use for async operations.
:param connector_limit: Maximum number of allowable connections to the server (native async)
:param connector_limit_per_host: Maximum number of connections per host (native async)
:param keepalive_timeout: Time limit on idle keepalive connections (native async)
:param kwargs -- Recognized keyword arguments (used by the HTTP client), see below

:param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred
Expand Down Expand Up @@ -226,27 +255,34 @@ async def create_async_client(*,
:param form_encode_query_params If True, query parameters will be sent as form-encoded data in the request body
instead of as URL parameters. This is useful for queries with large parameter sets that might exceed URL length
limits. Only available for query operations (not inserts). Default: False
:return: ClickHouse Connect Client instance
:return: ClickHouse Connect AsyncClient instance
"""

warnings.warn(
"The current async client is a thread-pool wrapper around the sync client. "
"A fully native async client is available for testing as a prerelease: "
"pip install 'clickhouse-connect[async]==0.12.0rc1'. "
"This prerelease branch is based on 0.11.0 and is gathering feedback ahead of 1.0.0, "
"where it will become the default async implementation. It is a drop-in replacement "
"with the same API surface. The main line includes additional updates that the native "
"client will receive when merged into 1.0.0.",
FutureWarning,
stacklevel=2,
host, username, password, port, database, interface = _parse_connection_params(
host, username, password, port, database, interface, secure, dsn, kwargs
)
_validate_access_token(access_token, username, password)

if executor_threads != 0 or executor is not NEW_THREAD_POOL_EXECUTOR:
# LEGACY PATH: User explicitly requested executor-based client
def _create_client():
if 'autogenerate_session_id' not in kwargs:
kwargs['autogenerate_session_id'] = False
return create_client(host=host, username=username, password=password, database=database, interface=interface,
port=port, secure=secure, dsn=None, settings=settings, generic_args=generic_args, **kwargs)

loop = asyncio.get_running_loop()
_client = await loop.run_in_executor(None, _create_client)
return AsyncClient(client=_client, executor_threads=executor_threads, executor=executor)

def _create_client():
if 'autogenerate_session_id' not in kwargs:
kwargs['autogenerate_session_id'] = False
return create_client(host=host, username=username, password=password, database=database, interface=interface,
port=port, secure=secure, dsn=dsn, settings=settings, generic_args=generic_args, **kwargs)
# NATIVE PATH: Default to true async client
# Set autogenerate_session_id to False by default
if "autogenerate_session_id" not in kwargs:
kwargs["autogenerate_session_id"] = False

loop = asyncio.get_running_loop()
_client = await loop.run_in_executor(None, _create_client)
return AsyncClient(client=_client, executor_threads=executor_threads, executor=executor)
client = AsyncClient(host=host, username=username, password=password, access_token=access_token,
database=database, interface=interface,
port=port, secure=secure, dsn=None, settings=settings, generic_args=generic_args,
connector_limit=connector_limit, connector_limit_per_host=connector_limit_per_host,
keepalive_timeout=keepalive_timeout, **kwargs)
await client._initialize() # pylint: disable=protected-access
return client
Loading
Loading