Skip to content

Commit 32bfb39

Browse files
committed
Add composite type support
User-defined composite types (CREATE TYPE ... AS) can now be decoded as PgComposite in query results and sent as query parameters. Register composite types with CodecRegistry.with_composite_type() providing the OID and field descriptors (name/OID pairs). Both PreparedQuery (binary format) and SimpleQuery (text format) decode registered composites. Nested composites and composite arrays are supported. Depth-guarded recursion prevents stack overflow from pathological schemas. PgComposite.from_fields provides a safe construction API for query parameters — each field's name, OID, and value stay together, eliminating the misalignment risk of parallel arrays. Design: #188
1 parent 393682f commit 32bfb39

17 files changed

Lines changed: 2917 additions & 48 deletions
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
## Add composite type support
2+
3+
The driver now supports PostgreSQL composite types (user-defined structured types created with `CREATE TYPE ... AS (...)`). Composite values are decoded as `PgComposite` in query results and can be sent as query parameters.
4+
5+
Register composite types with `CodecRegistry.with_composite_type()`:
6+
7+
```pony
8+
// CREATE TYPE address AS (street text, city text, zip_code int4)
9+
// OID discovered via: SELECT oid FROM pg_type WHERE typname = 'address'
10+
11+
let registry = CodecRegistry
12+
.with_composite_type(16400,
13+
recover val
14+
[as (String, U32): ("street", 25); ("city", 25); ("zip_code", 23)]
15+
end)?
16+
.with_array_type(16401, 16400)? // address[]
17+
```
18+
19+
Access fields by position or name:
20+
21+
```pony
22+
match field.value
23+
| let addr: PgComposite =>
24+
match try addr(0)? end // positional
25+
| let street: String => // ...
26+
end
27+
match try addr.field("city")? end // named
28+
| let city: String => // ...
29+
end
30+
end
31+
```
32+
33+
Send composites as query parameters using `from_fields` for safe construction:
34+
35+
```pony
36+
let addr = PgComposite.from_fields(16400,
37+
recover val
38+
[as (String, U32, (FieldData | None)):
39+
("street", 25, "123 Main St")
40+
("city", 25, "Springfield")
41+
("zip_code", 23, I32(62704))]
42+
end)
43+
session.execute(PreparedQuery("INSERT INTO users (home) VALUES ($1)",
44+
recover val [as FieldDataTypes: addr] end), receiver)
45+
```
46+
47+
Nested composites and composite arrays are supported. Both `PreparedQuery` (binary format) and `SimpleQuery` (text format) decode composites.

CLAUDE.md

Lines changed: 13 additions & 11 deletions
Large diffs are not rendered by default.

examples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ Query cancellation using `Session.cancel()`. Executes a long-running query (`SEL
4646

4747
Connection timeout using the `connection_timeout` parameter on `ServerConnectInfo`. Connects to a configurable host and port with a 3-second timeout via `lori.MakeConnectionTimeout(3000)`, and handles `ConnectionFailedTimeout` in `pg_session_connection_failed`. Shows how the driver reports unreachable servers without hanging indefinitely.
4848

49+
## composite-type
50+
51+
Composite type support via `CodecRegistry.with_composite_type()`. Creates a PostgreSQL composite type, discovers its OID from `pg_type`, registers it with field descriptors, and queries it with `PreparedQuery` to get `PgComposite` results. Uses two sessions to demonstrate the typical two-phase pattern for dynamic OIDs: the first session discovers the OID, the second uses a `CodecRegistry` built with that OID. Shows positional access via `apply()`, named access via `field()`, and sending a `PgComposite` as a query parameter.
52+
4953
## crud
5054

5155
Multi-query workflow mixing `SimpleQuery` and `PreparedQuery`. Creates a table, inserts rows with parameterized INSERTs, selects them back, deletes, and drops the table. Demonstrates all three `Result` types (`ResultSet`, `RowModifying`, `SimpleResult`) and `ErrorResponseMessage` error handling.
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
"""
2+
Composite type support via `CodecRegistry.with_composite_type()`. Creates a
3+
PostgreSQL composite type, discovers its OID from `pg_type`, registers it
4+
with field descriptors, and queries it with `PreparedQuery` to get
5+
`PgComposite` results. Uses two sessions to demonstrate the typical
6+
two-phase pattern for dynamic OIDs: the first session discovers the OID,
7+
the second uses a `CodecRegistry` built with that OID.
8+
9+
Shows positional access via `apply()`, named access via `field()`, and
10+
sending a `PgComposite` as a query parameter.
11+
"""
12+
use "cli"
13+
use "collections"
14+
use lori = "lori"
15+
// in your code this `use` statement would be:
16+
// use "postgres"
17+
use "../../postgres"
18+
19+
actor Main
20+
new create(env: Env) =>
21+
let server_info = ServerInfo(env.vars)
22+
let auth = lori.TCPConnectAuth(env.root)
23+
Client(auth, server_info, env.out)
24+
25+
actor Client is (SessionStatusNotify & ResultReceiver)
26+
let _auth: lori.TCPConnectAuth
27+
let _info: ServerInfo
28+
let _out: OutStream
29+
var _session: Session
30+
var _phase: USize = 0
31+
var _composite_oid: U32 = 0
32+
33+
new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) =>
34+
_auth = auth
35+
_info = info
36+
_out = out
37+
_session = Session(
38+
ServerConnectInfo(_auth, _info.host, _info.port),
39+
DatabaseConnectInfo(_info.username, _info.password, _info.database),
40+
this)
41+
42+
be close() =>
43+
_session.close()
44+
45+
be pg_session_authenticated(session: Session) =>
46+
match _phase
47+
| 0 =>
48+
// Phase 1: set up the composite type and discover its OID.
49+
_out.print("Authenticated (phase 1: discover composite OID).")
50+
session.execute(
51+
SimpleQuery("DROP TYPE IF EXISTS address"), this)
52+
| 4 =>
53+
// Phase 2: query with composite-aware registry.
54+
_out.print("Authenticated (phase 2: query with registered composite).")
55+
_phase = 5
56+
57+
// SELECT a composite literal
58+
session.execute(PreparedQuery(
59+
"SELECT ROW('123 Main St','Springfield',62704)::address AS addr",
60+
recover val Array[FieldDataTypes] end), this)
61+
end
62+
63+
be pg_session_authentication_failed(
64+
s: Session,
65+
reason: AuthenticationFailureReason)
66+
=>
67+
_out.print("Failed to authenticate.")
68+
69+
be pg_query_result(session: Session, result: Result) =>
70+
_phase = _phase + 1
71+
72+
match _phase
73+
| 1 =>
74+
// Old type dropped. Create the composite type.
75+
_out.print("Creating composite type 'address'...")
76+
session.execute(SimpleQuery(
77+
"CREATE TYPE address AS (street text, city text, zip_code int4)"),
78+
this)
79+
| 2 =>
80+
// Type created. Query its OID from pg_type.
81+
_out.print("Querying composite OID from pg_type...")
82+
session.execute(
83+
SimpleQuery("SELECT oid FROM pg_type WHERE typname = 'address'"),
84+
this)
85+
| 3 =>
86+
// Got the OID. Parse it.
87+
var oid: U32 = 0
88+
match result
89+
| let rs: ResultSet =>
90+
try
91+
match rs.rows()(0)?.fields(0)?.value
92+
| let s: String => oid = s.u32()?
93+
end
94+
end
95+
end
96+
if oid == 0 then
97+
_out.print("Failed to discover composite OID.")
98+
close()
99+
return
100+
end
101+
_composite_oid = oid
102+
_out.print("Discovered composite OID: " + oid.string())
103+
104+
// Register and query the array OID too if needed
105+
_out.print("Querying array OID from pg_type...")
106+
session.execute(
107+
SimpleQuery("SELECT typarray FROM pg_type WHERE oid = "
108+
+ oid.string()),
109+
this)
110+
| 4 =>
111+
// Got the array OID. Build registry, close, reconnect.
112+
var array_oid: U32 = 0
113+
match result
114+
| let rs: ResultSet =>
115+
try
116+
match rs.rows()(0)?.fields(0)?.value
117+
| let s: String => array_oid = s.u32()?
118+
end
119+
end
120+
end
121+
122+
let descriptors: Array[(String, U32)] val = recover val
123+
[as (String, U32): ("street", 25); ("city", 25); ("zip_code", 23)]
124+
end
125+
let registry = try
126+
let r = CodecRegistry
127+
.with_composite_type(_composite_oid, descriptors)?
128+
if array_oid > 0 then
129+
r.with_array_type(array_oid, _composite_oid)?
130+
else
131+
r
132+
end
133+
else
134+
_out.print("Failed to register composite type.")
135+
close()
136+
return
137+
end
138+
session.close()
139+
_session = Session(
140+
ServerConnectInfo(_auth, _info.host, _info.port),
141+
DatabaseConnectInfo(_info.username, _info.password, _info.database),
142+
this where registry = registry)
143+
| 6 =>
144+
// PreparedQuery result. The composite value arrives as PgComposite.
145+
match result
146+
| let rs: ResultSet =>
147+
_out.print("ResultSet (" + rs.rows().size().string() + " rows):")
148+
for row in rs.rows().values() do
149+
for field in row.fields.values() do
150+
match field.value
151+
| let c: PgComposite =>
152+
_out.print(" " + field.name + " (composite):")
153+
// Positional access
154+
try
155+
match c(0)?
156+
| let s: String => _out.print(" [0] street = " + s)
157+
| None => _out.print(" [0] street = NULL")
158+
end
159+
end
160+
// Named access
161+
try
162+
match c.field("city")?
163+
| let s: String => _out.print(" city = " + s)
164+
| None => _out.print(" city = NULL")
165+
end
166+
end
167+
try
168+
match c.field("zip_code")?
169+
| let v: I32 =>
170+
_out.print(" zip_code = " + v.string())
171+
| None => _out.print(" zip_code = NULL")
172+
end
173+
end
174+
// String representation
175+
_out.print(" string() = " + c.string())
176+
| None => _out.print(" " + field.name + " = NULL")
177+
end
178+
end
179+
end
180+
end
181+
182+
// Now send a PgComposite as a parameter
183+
_out.print("Sending PgComposite as query parameter...")
184+
let addr = PgComposite.from_fields(_composite_oid,
185+
recover val
186+
[as (String, U32, (FieldData | None)):
187+
("street", 25, "42 Elm St")
188+
("city", 25, "Portland")
189+
("zip_code", 23, I32(97201))]
190+
end)
191+
session.execute(PreparedQuery(
192+
"SELECT $1::address AS roundtrip",
193+
recover val [as FieldDataTypes: addr] end), this)
194+
| 7 =>
195+
// Roundtrip result
196+
match result
197+
| let rs: ResultSet =>
198+
_out.print("Roundtrip result:")
199+
try
200+
match rs.rows()(0)?.fields(0)?.value
201+
| let c: PgComposite =>
202+
_out.print(" " + c.string())
203+
end
204+
end
205+
end
206+
// Clean up
207+
_out.print("Dropping composite type...")
208+
session.execute(SimpleQuery("DROP TYPE address"), this)
209+
| 8 =>
210+
_out.print("Done.")
211+
close()
212+
end
213+
214+
be pg_query_failed(session: Session, query: Query,
215+
failure: (ErrorResponseMessage | ClientQueryError))
216+
=>
217+
match failure
218+
| let e: ErrorResponseMessage =>
219+
_out.print("Query failed: [" + e.severity + "] " + e.code + ": "
220+
+ e.message)
221+
| let _: SessionClosed =>
222+
return
223+
| let e: ClientQueryError =>
224+
_out.print("Query failed: client error")
225+
end
226+
close()
227+
228+
class val ServerInfo
229+
let host: String
230+
let port: String
231+
let username: String
232+
let password: String
233+
let database: String
234+
235+
new val create(vars: (Array[String] val | None)) =>
236+
let e = EnvVars(vars)
237+
host = try e("POSTGRES_HOST")? else "127.0.0.1" end
238+
port = try e("POSTGRES_PORT")? else "5432" end
239+
username = try e("POSTGRES_USERNAME")? else "postgres" end
240+
password = try e("POSTGRES_PASSWORD")? else "postgres" end
241+
database = try e("POSTGRES_DATABASE")? else "postgres" end

postgres/_array_encoder.pony

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ primitive _ArrayEncoder
1414
Byte[length] data
1515
```
1616
17-
Coupling: element encoding must stay in sync with
18-
`_FrontendMessage.bind()` and `_binary_codecs.pony`. Changes to scalar
17+
Coupling: element encoding must stay in sync with `_CompositeEncoder`,
18+
`_FrontendMessage.bind()`, and `_binary_codecs.pony`. Changes to scalar
1919
binary encoding in those files must be mirrored here.
2020
"""
21-
fun apply(a: PgArray): Array[U8] val ? =>
21+
fun apply(a: PgArray, registry: CodecRegistry = CodecRegistry)
22+
: Array[U8] val ?
23+
=>
2224
if a.elements.size() == 0 then
2325
return recover val
2426
let msg = Array[U8].init(0, 12)
@@ -38,7 +40,8 @@ primitive _ArrayEncoder
3840
for elem in a.elements.values() do
3941
match elem
4042
| None => enc.push(None)
41-
| let fd: FieldData => enc.push(_encode_element(fd, a.element_oid)?)
43+
| let fd: FieldData =>
44+
enc.push(_encode_element(fd, a.element_oid, registry)?)
4245
end
4346
end
4447
enc
@@ -97,7 +100,9 @@ primitive _ArrayEncoder
97100
msg
98101
end
99102

100-
fun _encode_element(fd: FieldData, element_oid: U32): Array[U8] val ? =>
103+
fun _encode_element(fd: FieldData, element_oid: U32,
104+
registry: CodecRegistry): Array[U8] val ?
105+
=>
101106
match fd
102107
| let v: I16 => _Int2BinaryCodec.encode(v)?
103108
| let v: I32 => _Int4BinaryCodec.encode(v)?
@@ -110,6 +115,7 @@ primitive _ArrayEncoder
110115
| let v: PgTime => _TimeBinaryCodec.encode(v)?
111116
| let v: PgTimestamp => _TimestampBinaryCodec.encode(v)?
112117
| let v: PgInterval => _IntervalBinaryCodec.encode(v)?
118+
| let c: PgComposite => _CompositeEncoder(c, registry)?
113119
| let v: String =>
114120
// Route by element_oid for string-producing types
115121
match element_oid

0 commit comments

Comments
 (0)