From bd13c2adc5f767c5e6971de73439784ffccb42d3 Mon Sep 17 00:00:00 2001 From: michaelr Date: Tue, 16 Feb 2016 15:52:22 +0200 Subject: [PATCH 1/3] optionaly return schema id together with decoded message --- confluent/schemaregistry/serializers/MessageSerializer.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/confluent/schemaregistry/serializers/MessageSerializer.py b/confluent/schemaregistry/serializers/MessageSerializer.py index 5aaec36..7b77af5 100644 --- a/confluent/schemaregistry/serializers/MessageSerializer.py +++ b/confluent/schemaregistry/serializers/MessageSerializer.py @@ -166,7 +166,7 @@ def decoder(p): self.id_to_decoder_func[schema_id] = decoder return self.id_to_decoder_func[schema_id] - def decode_message(self, message): + def decode_message(self, message, with_schema_id=False): """ Decode a message from kafka that has been encoded for use with the schema registry. @@ -179,4 +179,8 @@ def decode_message(self, message): if magic != MAGIC_BYTE: raise SerializerError("message does not start with magic byte") decoder_func = self._get_decoder_func(schema_id, payload) - return decoder_func(payload) + decoded = decoder_func(payload) + if with_schema_id: + return (decoded, schema_id) + else: + return decoded From ad46a279b64dda171accdbae8ddc6ab0390d5e74 Mon Sep 17 00:00:00 2001 From: afsheenb Date: Thu, 28 Apr 2016 10:01:59 -0400 Subject: [PATCH 2/3] adding lame printf debugging for schema_id error --- confluent/schemaregistry/serializers/MessageSerializer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/confluent/schemaregistry/serializers/MessageSerializer.py b/confluent/schemaregistry/serializers/MessageSerializer.py index 7b77af5..7e81e95 100644 --- a/confluent/schemaregistry/serializers/MessageSerializer.py +++ b/confluent/schemaregistry/serializers/MessageSerializer.py @@ -55,6 +55,8 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False): try: schema_id = self.registry_client.register(subject, schema) except: + message_none = "Exception, schema_id will be set to None! %s" % (schema_id) + raise SerializerError(message_none) schema_id = None if not schema_id: From d7a525cb272db2d39ee12b5dbc30c6dfc157c436 Mon Sep 17 00:00:00 2001 From: afsheenb Date: Thu, 28 Apr 2016 10:24:12 -0400 Subject: [PATCH 3/3] remove schema_id reference --- confluent/schemaregistry/serializers/MessageSerializer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/confluent/schemaregistry/serializers/MessageSerializer.py b/confluent/schemaregistry/serializers/MessageSerializer.py index 7e81e95..aee8c56 100644 --- a/confluent/schemaregistry/serializers/MessageSerializer.py +++ b/confluent/schemaregistry/serializers/MessageSerializer.py @@ -55,7 +55,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False): try: schema_id = self.registry_client.register(subject, schema) except: - message_none = "Exception, schema_id will be set to None! %s" % (schema_id) + message_none = "Exception, schema_id will be set to None!" raise SerializerError(message_none) schema_id = None