diff --git a/confluent/schemaregistry/serializers/MessageSerializer.py b/confluent/schemaregistry/serializers/MessageSerializer.py index 5aaec36..aee8c56 100644 --- a/confluent/schemaregistry/serializers/MessageSerializer.py +++ b/confluent/schemaregistry/serializers/MessageSerializer.py @@ -55,6 +55,8 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False): try: schema_id = self.registry_client.register(subject, schema) except: + message_none = "Exception, schema_id will be set to None!" + raise SerializerError(message_none) schema_id = None if not schema_id: @@ -166,7 +168,7 @@ def decoder(p): self.id_to_decoder_func[schema_id] = decoder return self.id_to_decoder_func[schema_id] - def decode_message(self, message): + def decode_message(self, message, with_schema_id=False): """ Decode a message from kafka that has been encoded for use with the schema registry. @@ -179,4 +181,8 @@ def decode_message(self, message): if magic != MAGIC_BYTE: raise SerializerError("message does not start with magic byte") decoder_func = self._get_decoder_func(schema_id, payload) - return decoder_func(payload) + decoded = decoder_func(payload) + if with_schema_id: + return (decoded, schema_id) + else: + return decoded