A high-performance, feature-rich OCaml driver for Timeplus Proton - the streaming database built on ClickHouse.
- Streaming Queries - Process large datasets with constant memory usage
- Async Inserts - High-throughput data ingestion with automatic batching
- Compression - LZ4 and ZSTD support for reduced network overhead
- TLS Security - Secure connections with certificate validation
- Connection Pooling - Efficient resource management for high-concurrency applications
- Rich Data Types - Full support for ClickHouse types including Arrays, Maps, Enums, DateTime64
- Idiomatic OCaml - Functional API leveraging OCaml's strengths
- Quick Start
- Parameterized Queries
- Streaming Queries
- Async Inserts
- Data Types
- Configuration Options
- Real-World Examples
- Development Setup
- Testing
- Docker Environment
- TLS Configuration
- Contributing
- License
- Support
opam install proton

open Proton
(* Create a client *)
let client = Client.create ~host:"localhost" ~port:8463 ~database:"default" () in
(* Execute a simple query *)
let%lwt result = Client.execute client "SELECT name, age FROM users LIMIT 10" in
match result with
| Client.NoRows -> Lwt_io.println "No results found"
| Client.Rows (rows, columns) ->
List.iter (fun row ->
let values = List.map Column.value_to_string row in
Printf.printf "%s\n" (String.concat ", " values)
) rows;
Lwt.return_unit

Avoid string interpolation by binding values through named placeholders. Placeholders use
the syntax {{name}} and are supplied as (string * Proton.Column.value) pairs.
open Proton
let client = Client.create () in
let%lwt result =
Client.execute_with_params client
"SELECT * FROM events WHERE tenant = {{tenant}} AND ts >= {{start}}"
~params:[
("tenant", Column.String "acme");
("start", Column.DateTime64 (1_700_000_000_000L, 3, Some "UTC"));
]
in
match result with
| Client.NoRows -> print_endline "no matches"
| Client.Rows (rows, _) -> Printf.printf "fetched %d rows\n" (List.length rows)

open Proton
let client = Client.create () in
let stmt = Client.prepare client
"SELECT count(*) FROM metrics WHERE name = {{metric}} AND ts >= {{window_start}}"
in
let run metric window_start =
Client.execute_prepared client stmt
~params:[
("metric", Column.String metric);
("window_start", Column.DateTime64 (window_start, 3, Some "UTC"));
]
in
let%lwt yesterday = run "cpu.usage" 1_699_913_600_000L in
let%lwt today = run "cpu.usage" 1_699_999_200_000L in
(* ... *)

All streaming helpers have `_with_params` and `_prepared` variants so you can bind values while
folding or iterating over large result sets.
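For example, a parameterized fold over a large result set might look like the sketch below. It assumes a `Client.query_fold_with_params` variant that combines `query_fold` with the same `~params` list used by `execute_with_params`; adjust the name to whatever your driver version exposes.

```ocaml
open Proton

(* Sum amounts for one tenant without materializing the result set.
   query_fold_with_params is assumed to mirror query_fold plus ~params. *)
let total_for_tenant client tenant =
  Client.query_fold_with_params client
    "SELECT amount FROM sales WHERE tenant = {{tenant}}"
    ~params:[ ("tenant", Column.String tenant) ]
    ~init:0.0
    ~f:(fun acc row ->
      match row with
      | [ Column.Float64 amount ] -> Lwt.return (acc +. amount)
      | _ -> Lwt.return acc)
```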
Process arbitrarily large result sets with constant memory usage using OCaml's functional patterns:
(* Calculate total sales amount *)
let%lwt total_sales = Client.query_fold client
"SELECT amount FROM sales WHERE date >= '2024-01-01'"
~init:0.0
~f:(fun acc row -> match row with
| [Column.Float64 amount] -> Lwt.return (acc +. amount)
| _ -> Lwt.return acc)
in
Printf.printf "Total sales: $%.2f\n" total_sales(* Send personalized emails *)
let%lwt () = Client.query_iter client
"SELECT name, email FROM users WHERE active = 1"
~f:(fun row -> match row with
| [Column.String name; Column.String email] ->
send_email ~to_:email ~subject:("Hi " ^ name) ~body:"..."
| _ -> Lwt.return_unit)

(* Process data in pipeline with lazy evaluation *)
let%lwt user_seq = Client.query_to_seq client "SELECT * FROM users" in
let processed = user_seq
|> Seq.filter is_premium_user
|> Seq.map extract_preferences
|> Seq.take 1000
|> Seq.fold_left update_recommendations init_state

(* Get both data and column information *)
let%lwt result = Client.query_fold_with_columns client
"SELECT id, name, created_at FROM products"
~init:[]
~f:(fun acc row columns ->
let product = parse_product_row row columns in
Lwt.return (product :: acc))
in
Printf.printf "Found %d products with columns: %s\n"
(List.length result.rows)
(String.concat "," (List.map fst result.columns))The Proton OCaml driver provides powerful async insert functionality for high-throughput data ingestion scenarios where you need to insert many rows efficiently without blocking your application.
- Batching: Automatically batches multiple rows before sending to reduce network overhead
- Background processing: Inserts are buffered and flushed in the background, so your main application logic never blocks
- Configurable triggers: Flush based on row count, byte size, or time intervals
- Error handling: Automatic retry logic with exponential backoff
- Buffer management: Automatic memory management with configurable limits
- Binary protocol: Uses ClickHouse native binary Data packet protocol for optimal performance
open Proton
let client = Client.create ~host:"localhost" ~database:"mydb" () in
(* Insert a single row *)
let row = [
Column.String "user123";
Column.Int32 42l;
Column.DateTime (Int64.of_float (Unix.gettimeofday ()), None)
] in
Client.insert_row client "events"
~columns:[("user_id", "String"); ("value", "Int32"); ("timestamp", "DateTime")]
row

let rows = [
[Column.String "user1"; Column.Int32 10l];
[Column.String "user2"; Column.Int32 20l];
[Column.String "user3"; Column.Int32 30l];
] in
Client.insert_rows client "users"
~columns:[("name", "String"); ("score", "Int32")]
rows

(* Create custom configuration *)
let config = {
(Async_insert.default_config "events") with
max_batch_size = 10000; (* Batch up to 10k rows *)
max_batch_bytes = 5_000_000; (* Or 5MB of data *)
flush_interval = 10.0; (* Flush every 10 seconds *)
max_retries = 5; (* Retry failed batches 5 times *)
retry_delay = 2.0; (* Start with 2 second retry delay *)
} in
(* Create and start async inserter *)
let inserter = Async_insert.create config client.conn in
Async_insert.start inserter;
(* Add rows continuously *)
let rec add_events () =
let event = generate_event_data () in
let%lwt () = Async_insert.add_row inserter event in
(* Your application logic continues immediately *)
let%lwt () = Lwt_unix.sleep 0.1 in
add_events ()
in
Lwt.async add_events;
(* Later, when shutting down *)
let%lwt () = Async_insert.stop inserter in
Client.disconnect client

- `table_name`: Target table name
- `max_batch_size`: Maximum rows per batch (default: 1000)
- `max_batch_bytes`: Maximum bytes per batch (default: 1MB)
- `flush_interval`: Seconds between automatic flushes (default: 5.0)
- `max_retries`: Maximum retry attempts on failure (default: 3)
- `retry_delay`: Initial retry delay in seconds (default: 1.0)
- `Client.insert_row`: Insert a single row (convenience method)
- `Client.insert_rows`: Insert multiple rows (convenience method)
- `Client.create_async_inserter`: Create a reusable async inserter (see the usage sketch below)

- `Async_insert.create`: Create an async inserter
- `Async_insert.start`: Start background processing
- `Async_insert.stop`: Stop and flush remaining data
- `Async_insert.add_row`: Add a single row to the buffer
- `Async_insert.add_rows`: Add multiple rows to the buffer
- `Async_insert.flush`: Force an immediate flush
- `Async_insert.get_stats`: Get current buffer statistics
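A minimal sketch combining these calls (argument shapes follow the other examples in this README: `create_async_inserter` is assumed to take the client and a table name, and `add_rows`/`flush` are assumed to return Lwt promises like `add_row` and `stop` do):

```ocaml
open Proton

(* One-shot ingestion helper: create an inserter for a hypothetical "events"
   table, push a batch, force a flush, and shut down cleanly. *)
let ingest_batch client rows =
  let inserter = Client.create_async_inserter client "events" in
  Async_insert.start inserter;
  let%lwt () = Async_insert.add_rows inserter rows in
  (* Flush right away instead of waiting for the flush interval. *)
  let%lwt () = Async_insert.flush inserter in
  Async_insert.stop inserter
```

For long-lived pipelines you would normally keep the inserter around and only call `Async_insert.stop` on shutdown, as in the event-pipeline example below.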
- Batch size: Larger batches reduce network overhead but use more memory
- Flush interval: Longer intervals improve throughput but increase latency
- Retry strategy: More retries improve reliability but can delay error detection
- Binary protocol: Uses native ClickHouse binary format for maximum efficiency
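To make the batch-size and flush-interval trade-offs concrete, here is a sketch of two alternative configurations built from `Async_insert.default_config`; the numbers are illustrative, not recommendations:

```ocaml
open Proton

(* Throughput-oriented: large batches, infrequent flushes. *)
let throughput_config =
  { (Async_insert.default_config "events") with
    max_batch_size = 50_000;
    max_batch_bytes = 20_000_000;
    flush_interval = 30.0 }

(* Latency-oriented: small batches, frequent flushes. *)
let latency_config =
  { (Async_insert.default_config "events") with
    max_batch_size = 100;
    flush_interval = 0.5 }
```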
The async inserter handles errors gracefully:
- Network failures trigger automatic retries with exponential backoff
- After max retries, errors are logged but don't crash your application
- Use `Async_insert.stop` to ensure all data is flushed before shutdown
The async inserter is thread-safe and uses Lwt mutexes internally. Multiple threads can safely call add_row concurrently.
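For instance, several Lwt producers can safely share a single inserter. A minimal sketch, where `generate_event` stands in for your own row-building function:

```ocaml
open Proton

(* Three concurrent producers feeding one inserter. generate_event is a
   hypothetical function of type int -> Column.value list. *)
let run_producers inserter generate_event =
  let producer id =
    let rec loop n =
      if n = 0 then Lwt.return_unit
      else
        let%lwt () = Async_insert.add_row inserter (generate_event id) in
        loop (n - 1)
    in
    loop 1_000
  in
  Lwt.join [ producer 1; producer 2; producer 3 ]
```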
(* High-throughput event ingestion system *)
open Lwt.Syntax
let setup_event_pipeline () =
let client = Client.create ~host:"proton-cluster" ~database:"analytics" () in
let config = {
(Async_insert.default_config "events") with
max_batch_size = 5000;
max_batch_bytes = 2_000_000;
flush_interval = 5.0;
max_retries = 3;
} in
let inserter = Async_insert.create config client.conn in
Async_insert.start inserter;
(* Process incoming events *)
let process_event event_json =
let event_row = [
Column.String event_json.user_id;
Column.String event_json.event_type;
Column.DateTime (Int64.of_float event_json.timestamp, Some "UTC");
Column.String (Yojson.to_string event_json.properties);
] in
Async_insert.add_row inserter event_row
in
(* Setup graceful shutdown *)
Lwt_unix.on_signal Sys.sigterm (fun _ ->
Printf.printf "Shutting down event pipeline...\n";
Lwt.async (fun () ->
let* () = Async_insert.stop inserter in
let* () = Client.disconnect client in
Lwt_io.println "Pipeline shutdown complete"
)
);
(inserter, process_event)

Full support for ClickHouse data types with OCaml-native representations:
open Column
let sample_row = [
String "Hello World";
Int32 42l;
Int64 1234567890L;
Float64 3.14159;
DateTime (Int64.of_float (Unix.gettimeofday ()), Some "UTC");
DateTime64 (Int64.of_float (Unix.gettimeofday () *. 1000.), 3, Some "UTC");
Enum8 ("status", 1);
Array [| Int32 1l; Int32 2l; Int32 3l |];
Map [(String "key1", String "value1"); (String "key2", String "value2")];
]

let client = Client.create
~host:"proton.example.com"
~port:8463
~database:"analytics"
~user:"readonly"
~password:"secret123"
~compression:Protocol.LZ4 (* or ZSTD, or None *)
~tls_config:{
ca_file = Some "/path/to/ca.pem";
cert_file = Some "/path/to/client.pem";
key_file = Some "/path/to/client.key";
verify_hostname = true;
}
~settings:[
("max_block_size", "65536");
("connect_timeout", "10");
]
()

(* For high-concurrency applications *)
let pool = Pool_lwt.create
~max_connections:20
~create:(fun () ->
Client.create ~host:"localhost" ~database:"default" ())
~validate:(fun client ->
Lwt.catch
(fun () -> let%lwt _ = Client.execute client "SELECT 1" in Lwt.return true)
(fun _ -> Lwt.return false))
()
in
Pool_lwt.use pool (fun client ->
Client.query_fold client "SELECT * FROM large_table" ~init:0 ~f:(...)
)

(* Process clickstream data *)
let process_clickstream () =
let%lwt () = Client.query_iter client
"SELECT user_id, page, timestamp FROM clicks WHERE date = today()"
~f:(fun row -> match row with
| [String user_id; String page; DateTime (ts, _)] ->
update_user_session user_id page ts
| _ -> Lwt.return_unit)
in
Lwt_io.println "Clickstream processing complete"
(* Real-time aggregation *)
let calculate_metrics () =
let%lwt metrics = Client.query_fold_with_columns client
"SELECT country, COUNT(*) as visits, AVG(duration) as avg_duration
FROM sessions
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY country"
~init:[]
~f:(fun acc row _columns -> match row with
| [String country; Int64 visits; Float64 duration] ->
let metric = { country; visits = Int64.to_int visits; avg_duration = duration } in
Lwt.return (metric :: acc)
| _ -> Lwt.return acc)
in
publish_metrics_to_dashboard metrics.rows

(* Extract, Transform, Load pipeline *)
let etl_pipeline source_query transform_fn target_table =
let inserter = Client.create_async_inserter client target_table in
Async_insert.start inserter;
let%lwt () = Client.query_iter client source_query
~f:(fun row ->
let transformed = transform_fn row in
Async_insert.add_row inserter transformed)
in
Async_insert.stop inserter

# macOS
brew install opam pkg-config openssl@3 lz4
# Ubuntu/Debian
sudo apt install opam pkg-config libssl-dev liblz4-dev build-essential
# Setup OCaml
opam init --disable-sandboxing
opam switch create proton 4.14.0
opam install dune lwt tls-lwt lz4 zstd alcotest

git clone <repository-url>
cd proton-ocaml-driver
dune build
dune runtest

Run all tests:
make test
# or
dune test

# Run all tests
dune test
# Run tests with verbose output
dune test --verbose
# Clean and rebuild before testing
dune clean && dune test# Run tests (always shows output)
make test
# Run tests silently (only shows failures)
make test-silent
# Verbose tests with build info
make test-verbose
# Watch mode (re-runs on file changes)
make test-watch
# Clean build artifacts
make clean

All tests use the Alcotest framework, which provides colored output:
- ✅ `[OK]` - Test passed
- ❌ `[FAIL]` - Test failed with details
test/
└── test_suite.ml        # Main test file with all test cases
- CityHash - Tests the CityHash128 implementation
- Compression - LZ4 compression/decompression tests
- Binary - Binary encoding/decoding tests
- Connection - Connection and client creation tests
- Add your test function to `test/test_suite.ml`:
let test_my_feature () =
Alcotest.(check string) "description" "expected" "actual"

- Add it to a test suite:
let my_tests = [
Alcotest.test_case "My test" `Quick test_my_feature;
]

- Register the suite in the main runner:
let () =
Alcotest.run "Proton OCaml Driver" [
"My Tests", my_tests;
(* ... other suites ... *)
]

To run a specific test suite, you can filter by name:
dune exec test/test_suite.exe -- test "Compression"

The test suite is designed to be CI-friendly:
- Exit code 0 on success, non-zero on failure
- Machine-readable output available
- No external dependencies beyond OCaml libraries
This directory contains a complete Docker-based test environment for the Proton OCaml driver, based on the ServiceRadar project's proven Proton setup.
- Start the environment:

  make -f Makefile.docker up

- Run all tests:

  make -f Makefile.docker test

- Get a development shell:

  make -f Makefile.docker shell
The test environment consists of two main services:
- proton: Timeplus Proton database configured for testing
- ocaml-env: OCaml development environment with all dependencies
- Image: `ghcr.io/timeplus-io/proton:latest`
- Ports:
  - 8123: HTTP interface
  - 8463: Native TCP protocol (primary)
  - 8443: HTTPS (TLS)
  - 9440: Native TCP with TLS
- Configuration: Custom config optimized for testing
- Data: Persisted in the Docker volume `proton-data`
- Base: `ocaml/opam:ubuntu-24.04-ocaml-5.1`
- Features:
  - OCaml 5.1.1
  - All project dependencies pre-installed
  - Development tools (utop, merlin, ocaml-lsp-server)
  - Source code mounted for live development
- Cache: OPAM and Dune caches persisted in volumes
Use make -f Makefile.docker <command>:
| Command | Description |
|---|---|
| `help` | Show all available commands |
| `up` | Start the test environment |
| `down` | Stop the environment |
| `test` | Run all tests |
| `test-simple` | Run simple tests only |
| `test-lwt` | Run Lwt tests only |
| `build-project` | Build the OCaml project |
| `shell` | Get shell in OCaml container |
| `proton-shell` | Connect to Proton CLI |
| `logs` | Show all service logs |
| `clean` | Clean up everything |
| `examples` | Run example programs |
- Start the development environment:

  make -f Makefile.docker dev-setup

- Make changes to your OCaml code (files are mounted, so changes take effect immediately)

- Test your changes:

  make -f Makefile.docker build-project
  make -f Makefile.docker test

- Debug interactively:

  make -f Makefile.docker shell
  # Inside the container:
  dune utop src/
The OCaml container has these environment variables set:
- `PROTON_HOST=proton`
- `PROTON_PORT=8463`
- `PROTON_HTTP_PORT=8123`
- `PROTON_DATABASE=default`
- `PROTON_USER=default`
- `PROTON_PASSWORD=` (empty for testing)
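Test code can pick these up with `Sys.getenv_opt`, falling back to the defaults above. A small sketch (the helper name is ours, not part of the driver):

```ocaml
open Proton

(* Build a client from the container environment, using the defaults listed above. *)
let client_from_env () =
  let getenv name default =
    match Sys.getenv_opt name with
    | Some v when v <> "" -> v
    | _ -> default
  in
  Client.create
    ~host:(getenv "PROTON_HOST" "proton")
    ~port:(int_of_string (getenv "PROTON_PORT" "8463"))
    ~database:(getenv "PROTON_DATABASE" "default")
    ~user:(getenv "PROTON_USER" "default")
    ~password:(getenv "PROTON_PASSWORD" "")
    ()
```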
The Proton database is configured with:
- Memory: Limited to 2GB for development
- Logging: Error level only
- Authentication: Default user with no password
- Compression: LZ4 enabled for testing compression features
- Streaming: Optimized for streaming query testing
The environment supports testing all driver features:
- ✅ Basic Connectivity: TCP and HTTP protocols
- ✅ Data Types: All ClickHouse/Proton types
- ✅ Compression: LZ4 and ZSTD
- ✅ Streaming Queries: Real-time data streaming
- ✅ Async Inserts: Batch and streaming inserts
- ✅ TLS/SSL: Secure connections (ports 8443, 9440)
- ✅ Connection Pooling: Multi-connection scenarios
- ✅ Error Handling: Server exception testing
make -f Makefile.docker logs-proton

make -f Makefile.docker shell
dune clean
dune build --verbose

# Run tests individually
make -f Makefile.docker shell
dune exec test_simple/simple_test.exe
dune test --verbose

# Test from OCaml container
make -f Makefile.docker shell
curl http://proton:8123/?query=SELECT%201

make -f Makefile.docker clean
make -f Makefile.docker up

docker/
├── Dockerfile.ocaml     # OCaml development environment
├── proton-config.xml    # Proton server configuration
├── users.xml            # Proton user configuration
└── test-runner.sh       # Comprehensive test script
This Docker setup can be easily integrated into CI/CD pipelines:
# GitHub Actions example
- name: Start test environment
  run: make -f Makefile.docker up
- name: Run tests
  run: make -f Makefile.docker test
- name: Cleanup
  run: make -f Makefile.docker down

- First startup: ~30-60 seconds (downloading images, building OCaml)
- Subsequent startups: ~10-15 seconds
- Test execution: ~30-60 seconds for full suite
- Memory usage: ~2-3GB total (Proton: 2GB, OCaml: 1GB)
This setup is optimized for testing and development. For production:
- Enable authentication and TLS
- Increase memory limits
- Configure persistent storage
- Set up monitoring and logging
- Use production-grade container orchestration
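On the client side, a production-leaning configuration might combine authentication, mTLS, and compression, reusing the options from the Configuration Options section above. Hosts, paths, and credentials here are placeholders, and the TLS record fields should be checked against your driver version:

```ocaml
open Proton

(* Placeholder host, credentials, and certificate paths; substitute your own. *)
let production_client () =
  Client.create
    ~host:"proton.internal.example.com"
    ~port:8463
    ~database:"analytics"
    ~user:"svc_ingest"
    ~password:(Option.value (Sys.getenv_opt "PROTON_PASSWORD") ~default:"")
    ~compression:Protocol.LZ4
    ~tls_config:{
      ca_file = Some "/etc/ssl/proton/ca.pem";
      cert_file = Some "/etc/ssl/proton/client.pem";
      key_file = Some "/etc/ssl/proton/client-key.pem";
      verify_hostname = true;
    }
    ()
```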
This document describes how to set up and use Timeplus Proton with TLS/mTLS support for the OCaml driver.
./docker/generate-certs.sh

This creates the following certificates in docker/certs/:
- `ca.pem` / `ca-key.pem` - Certificate Authority
- `server.pem` / `server-key.pem` - Server certificate for Proton
- `client.pem` / `client-key.pem` - Client certificate for the OCaml driver
- `dhparam.pem` - DH parameters for TLS
# Start both TLS and non-TLS Proton instances
docker-compose -f docker-compose-tls.yml up -d

This starts:
- proton - Standard Proton without TLS (ports 8123, 8463)
- proton-tls - Proton with mTLS enabled (ports 8443, 9440)
- ocaml-env - OCaml development environment
./docker/test-tls.sh

- HTTP Port: 8123
- Native TCP Port: 8463
- User: default (no password)
- Config: `docker/proton-config.xml`, `docker/users.xml`
- HTTPS Port: 8443
- Native TCP Secure Port: 9440
- HTTP Port (non-TLS): 8124
- Native TCP Port (non-TLS): 8464
- Config: `docker/proton-config-tls.xml`, `docker/users-tls.xml`
Two users are configured for TLS mode:
- `proton_user` (full access)
  - Password: `proton_ocaml_test`
- `readonly_user` (read-only)
  - Password: `readonly_test`
The docker-compose file sets up these environment variables:
# Non-TLS configuration
PROTON_HOST=proton
PROTON_PORT=8463
PROTON_USER=default
PROTON_PASSWORD=
# TLS configuration
PROTON_TLS_HOST=proton-tls
PROTON_TLS_PORT=9440
PROTON_TLS_USER=proton_user
PROTON_TLS_PASSWORD=proton_ocaml_test
PROTON_TLS_CA_CERT=/app/certs/ca.pem
PROTON_TLS_CLIENT_CERT=/app/certs/client.pem
PROTON_TLS_CLIENT_KEY=/app/certs/client-key.pem

(* Non-TLS connection *)
let client = Proton.Client.create
~host:"proton"
~port:8463
()
(* TLS connection with mTLS *)
let tls_config = Proton.Tls.{
ca_cert = "/app/certs/ca.pem";
client_cert = Some "/app/certs/client.pem";
client_key = Some "/app/certs/client-key.pem";
verify_mode = Strict;
}
let tls_client = Proton.Client.create
~host:"proton-tls"
~port:9440
~user:"proton_user"
~password:"proton_ocaml_test"
~tls:tls_config
()

# Test non-TLS
curl "http://localhost:8123/?query=SELECT%201"
# Test TLS with authentication
curl -k "https://localhost:8443/?query=SELECT%201" \
-H "X-ClickHouse-User: proton_user" \
-H "X-ClickHouse-Key: proton_ocaml_test" \
--cert docker/certs/client.pem \
--key docker/certs/client-key.pem \
--cacert docker/certs/ca.pem

# Inside the OCaml container
docker-compose -f docker-compose-tls.yml exec ocaml-env bash
# Run tests
dune test
# Run specific TLS tests (when implemented)
dune test test_tls

- Certificates: The generated certificates are for testing only. In production:
  - Use certificates from a trusted CA
  - Store private keys securely
  - Rotate certificates regularly
- Passwords: The example passwords are hardcoded for testing. In production:
  - Use strong, randomly generated passwords
  - Store passwords in secure vaults
  - Use environment variables or secrets management
- Network: The test setup allows connections from anywhere (`::/0`). In production:
  - Restrict network access to specific IPs/ranges
  - Use firewalls and network policies
  - Enable audit logging
# Verify certificate
openssl x509 -in docker/certs/server.pem -text -noout
# Test TLS connection
openssl s_client -connect localhost:9440 \
-cert docker/certs/client.pem \
-key docker/certs/client-key.pem \
-CAfile docker/certs/ca.pem

- Check if services are running: `docker-compose -f docker-compose-tls.yml ps`
- Check logs: `docker-compose -f docker-compose-tls.yml logs proton-tls`
- Verify ports are exposed: `docker port proton-ocaml-test-tls`
- Verify user exists: Connect with default user first
- Check password hash: `echo -n "password" | sha256sum`
- Review Proton logs for auth errors
- `docker/generate-certs.sh` - Certificate generation script
- `docker/proton-config-tls.xml` - Proton TLS configuration
- `docker/users-tls.xml` - User configuration with passwords
- `docker/Dockerfile.proton-tls` - Docker image for TLS Proton
- `docker-compose-tls.yml` - Docker Compose with TLS services
- `docker/test-tls.sh` - TLS connection test script
- `docker/certs/` - Generated certificates directory
Contributions are welcome! Please see our contribution guidelines.
This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: GitHub Issues
- Timeplus Community: Timeplus Slack