Add Airbyte protocol v2 compliance (stream status, per-stream state)#149
Add Airbyte protocol v2 compliance (stream status, per-stream state)#149Smidge wants to merge 3 commits intoplanetscale:mainfrom
Conversation
Airbyte 2.x requires sources to emit STREAM_STATUS trace messages (STARTED, COMPLETE, INCOMPLETE) for each stream. Without these, every sync fails with: "streams did not receive a terminal stream status message" Changes: - Add TRACE message type and stream status constants to types.go - Add StreamDescriptor, AirbyteStreamStatus, AirbyteTraceMessage types - Replace legacy global State() with per-stream StreamState() that emits state.type=STREAM (required by Airbyte 2.x, which rejects the LEGACY format with IllegalArgumentException) - Add StreamStatus() method to emit STARTED/COMPLETE/INCOMPLETE traces - Update AirbyteLogger interface and test mock accordingly
Update the read command to be fully compatible with Airbyte 2.x: Read loop changes: - Emit STARTED before reading each stream - Emit COMPLETE after successful read, INCOMPLETE on error - Replace os.Exit(1) with break on per-stream errors so remaining streams still get status messages - Emit per-stream STATE (type=STREAM) after each stream completes instead of one global state blob at the end State parsing changes: - Handle Airbyte v2 per-stream state format on incremental syncs. Airbyte 2.x passes state back as a JSON array of per-stream state objects, not the legacy global SyncState blob. Without this, the second sync always fails because json.Unmarshal fails on the array format, causing os.Exit(1) before any streams are processed. - Fall back to legacy format for backwards compatibility - Default empty namespace to source database name to prevent state key mismatches
Logger tests: - StreamState emits correct per-stream format with type=STREAM - Multiple shards included in state output - No legacy "data" field present (would cause LEGACY rejection) - StreamStatus emits TRACE messages with correct status values - JSON round-trip matches exact Airbyte protocol v2 structure Read protocol tests: - Read emits per-stream STATE, not legacy global state - STARTED and COMPLETE emitted for each configured stream - Correct message ordering: STARTED -> STATE -> COMPLETE - Multi-shard state contains all shard cursors - Read errors emit INCOMPLETE and skip state emission
maxenglander
left a comment
There was a problem hiding this comment.
thanks @Smidge! couple potential issues:
• 1. cmd/airbyte-source/read.go:L118-146 loses checkpointed progress for multi-shard streams. The PR updates syncState.Streams[streamStateKey].Shards[shardName] inside the shard loop, but only emits StreamState(...) after the whole loop succeeds. If shard A advances and shard B then fails, the stream is marked INCOMPLETE and no state message is emitted for shard A’s new cursor, so the next retry re-reads already-synced data.
2. cmd/airbyte-source/read.go:131-147 no longer fails the command when a stream read errors. On ch.Database.Read(...) error, the code emits INCOMPLETE, breaks out of the shard loop, and then continues without returning an error, so read exits successfully overall.
Problem
The connector does not work with Airbyte 2.x. Every sync fails with one of two errors:
Missing stream status messages:
LEGACY state rejection (on incremental syncs):
These were optional in Airbyte 1.x but are now enforced in 2.x (protocol v0.2.0+).
Root Cause
Three issues in the connector:
No stream status trace messages. Airbyte 2.x requires
STARTEDandCOMPLETE/INCOMPLETEstatus messages for every stream. The connector emits none.Legacy global state format. The connector emits a single global state blob (
{"data": {"streams": {...}}}). Airbyte 2.x requires per-stream state messages with{"type": "STREAM", "stream": {"stream_descriptor": {...}, "stream_state": {...}}}.Cannot parse v2 state input. On incremental syncs, Airbyte 2.x passes state back as a JSON array of per-stream state objects. The connector only understands the legacy global format, so
json.Unmarshalfails and the process exits before reading any data.Changes
Commit 1: Add stream status trace messages
TRACEmessage type andSTREAM_STATUSconstants (STARTED,COMPLETE,INCOMPLETE)StreamDescriptor,AirbyteStreamStatus,AirbyteTraceMessagetypesState(SyncState)withStreamState(namespace, streamName, ShardStates)that emits per-stream state withtype=STREAMStreamStatus()method to emit stream lifecycle trace messagesAirbyteLoggerinterface and test mockCommit 2: Update read loop and state parsing
STARTEDbefore reading each stream,COMPLETEon success,INCOMPLETEon erroros.Exit(1)withbreakon per-stream errors so remaining streams still get status messagesSTATEafter each stream completes (not one global blob at the end)Commit 3: Tests
Testing
docker run ... read | grep STATEshows"type":"STREAM"per-stream stateSTARTEDandCOMPLETEtrace messages appear for each streamCompatibility