
Fix/protobuf schema registry imports#1129

Open
SBALAVIGNESH123 wants to merge 4 commits into timeplus-io:develop from SBALAVIGNESH123:fix/protobuf-schema-registry-imports

Conversation

@SBALAVIGNESH123
Contributor

Fixes #1076

What's the problem?
When using kafka_schema_registry_url with ProtobufSingle, Proton fails to read messages whose schema uses import statements such as import "google/protobuf/timestamp.proto". The query returns Code: 2504. DB::Exception: No message type in schema instead of the actual data.

The root cause is that KafkaSchemaRegistry::fetchSchema() only reads the schema field from the registry's JSON response and ignores the references array entirely. Without the referenced schemas loaded into the DescriptorPool, BuildFile() cannot resolve the import and fails silently.
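For context, a Schema Registry response for a schema that uses imports carries the dependency list alongside the schema text. The shape below follows the Confluent Schema Registry API; the concrete values are illustrative:

```json
{
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\nimport \"google/protobuf/timestamp.proto\";\n...",
  "references": [
    {
      "name": "google/protobuf/timestamp.proto",
      "subject": "google/protobuf/timestamp.proto",
      "version": 1
    }
  ]
}
```

fetchSchema() only reads the "schema" field of this payload, which is why the "references" entries never make it into the DescriptorPool.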

How does this fix work?
I added two new methods to KafkaSchemaRegistry: fetchSchemaWithReferences(), which reads both the schema text and the references[] array from the registry response, and fetchSchemaBySubjectVersion(), which fetches a referenced schema at its pinned version.

In ProtobufRowInputFormat.cpp, fetchSchemaById() now calls a new resolveReferences() function before BuildFile(). For each entry in the references list, this function:

- checks Google's generated_pool() first for well-known types like timestamp.proto, avoiding unnecessary network calls;
- fetches user schemas from the registry by subject + version;
- recursively resolves nested references;
- guards against circular references with a resolved_ids set.

Once all dependencies are in the DescriptorPool, the main schema's BuildFile() succeeds and the import resolves correctly. All new code is thread-safe via the existing std::mutex and wrapped in // proton: starts / // proton: ends markers.

Files changed
src/Formats/KafkaSchemaRegistry.h — Added SchemaReference and SchemaWithReferences structs, declared two new fetch methods
src/Formats/KafkaSchemaRegistry.cpp — Implemented fetchSchemaWithReferences() and fetchSchemaBySubjectVersion() following the same HTTP/JSON pattern as the existing fetchSchema()
src/Processors/Formats/Impl/ProtobufRowInputFormat.cpp — Added resolveReferences() and updated fetchSchemaById() to resolve imports before building

Reproduction on upstream Proton 3.0.17
I set up Redpanda + upstream Proton 3.0.17 in Docker and produced 3 protobuf messages with confluent_kafka.schema_registry.protobuf.ProtobufSerializer using a schema that imports google/protobuf/timestamp.proto.
Schema Registry shows the references are registered:

```
$ curl -s http://localhost:8081/subjects
["test-import-bug-value","events-value","google-protobuf-timestamp","google/protobuf/timestamp.proto"]
```
Producer confirmed messages at offset 0 with schema_id=4:

```
Produced id=100 created_at.seconds=1735100000
Produced id=101 created_at.seconds=1735100001
Produced id=102 created_at.seconds=1735100002
Schema ID = 4
References: [{'name': 'google/protobuf/timestamp.proto', 'subject': 'google/protobuf/timestamp.proto', 'version': 1}]
```
Querying the stream on upstream Proton reproduces the exact bug:

```sql
CREATE EXTERNAL STREAM test_sr (
    id int64, created_at tuple(seconds int64, nanos int32)
) SETTINGS type='kafka', brokers='redpanda:9092', topic='test-import-bug',
    data_format='ProtobufSingle', kafka_schema_registry_url='http://redpanda:8081';

SELECT * FROM test_sr LIMIT 3 SETTINGS seek_to='earliest';
```

```
Code: 2504. DB::Exception: Received from localhost:8463. DB::Exception:
No message type in schema: Failed to parse message topic=test-import-bug
partition=0 offset=0: While executing KafkaSource. (INVALID_DATA)
```
The references array exists in the SR response but upstream Proton ignores it, so BuildFile() fails with "No message type in schema".

Tests
Added tests/queries_ported/0_stateless/99177_protobuf_schema_registry_imports.sh, which creates a protobuf format schema with import "google/protobuf/timestamp.proto", inserts data, and verifies the output.

@SBALAVIGNESH123
Contributor Author

Hi @yuzifeng1984 — I noticed this issue is assigned to you. I submitted this PR to help, but happy to defer if you're already working on a fix. If my approach is useful, I'm glad to adjust it based on your feedback. Let me know!


Development

Successfully merging this pull request may close these issues.

kafka_schema_registry_url cannot resolve protobuf import statements
