diff --git a/cloudevents/kafka/conversion.py b/cloudevents/kafka/conversion.py index 97c355f..7143362 100644 --- a/cloudevents/kafka/conversion.py +++ b/cloudevents/kafka/conversion.py @@ -21,9 +21,15 @@ from cloudevents.abstract import AnyCloudEvent from cloudevents.kafka.exceptions import KeyMapperError from cloudevents.sdk import types -DEFAULT_MARSHALLER: types.MarshallerType = json.dumps -DEFAULT_UNMARSHALLER: types.MarshallerType = json.loads -DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = lambda x: x + +JSON_MARSHALLER: types.MarshallerType = json.dumps +JSON_UNMARSHALLER: types.UnmarshallerType = json.loads +IDENTITY_MARSHALLER = IDENTITY_UNMARSHALLER = lambda x: x + +DEFAULT_MARSHALLER: types.MarshallerType = JSON_MARSHALLER +DEFAULT_UNMARSHALLER: types.UnmarshallerType = JSON_UNMARSHALLER +DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = IDENTITY_MARSHALLER +DEFAULT_EMBEDDED_DATA_UNMARSHALLER: types.UnmarshallerType = IDENTITY_UNMARSHALLER class KafkaMessage(typing.NamedTuple): @@ -109,7 +115,7 @@ def to_binary( def from_binary( message: KafkaMessage, event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, - data_unmarshaller: typing.Optional[types.MarshallerType] = None, + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> AnyCloudEvent: """ Returns a CloudEvent from a KafkaMessage in binary format. @@ -208,7 +214,7 @@ def to_structured( def from_structured( message: KafkaMessage, event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, - data_unmarshaller: typing.Optional[types.MarshallerType] = None, + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> AnyCloudEvent: """ @@ -222,7 +228,7 @@ def from_structured( :returns: CloudEvent """ - data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER + data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_UNMARSHALLER envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER try: structure = envelope_unmarshaller(message.value)