fix kafka unmarshaller args typing and defaults

Signed-off-by: Christoph Hösler <christoph.hoesler@inovex.de>
This commit is contained in:
Christoph Hösler 2024-10-24 10:27:25 +02:00
parent c6c7e8c2f9
commit 80fcc1dd44
No known key found for this signature in database
GPG Key ID: 1BB5983E090D1197
1 changed files with 12 additions and 6 deletions

View File

@ -21,9 +21,15 @@ from cloudevents.abstract import AnyCloudEvent
from cloudevents.kafka.exceptions import KeyMapperError
from cloudevents.sdk import types
DEFAULT_MARSHALLER: types.MarshallerType = json.dumps
DEFAULT_UNMARSHALLER: types.MarshallerType = json.loads
DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = lambda x: x
JSON_MARSHALLER: types.MarshallerType = json.dumps
JSON_UNMARSHALLER: types.UnmarshallerType = json.loads
IDENTITY_MARSHALLER = IDENTITY_UNMARSHALLER = lambda x: x
DEFAULT_MARSHALLER: types.MarshallerType = JSON_MARSHALLER
DEFAULT_UNMARSHALLER: types.UnmarshallerType = JSON_UNMARSHALLER
DEFAULT_EMBEDDED_DATA_MARSHALLER: types.MarshallerType = IDENTITY_MARSHALLER
DEFAULT_EMBEDDED_DATA_UNMARSHALLER: types.UnmarshallerType = IDENTITY_UNMARSHALLER
class KafkaMessage(typing.NamedTuple):
@ -109,7 +115,7 @@ def to_binary(
def from_binary(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.MarshallerType] = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
"""
Returns a CloudEvent from a KafkaMessage in binary format.
@ -208,7 +214,7 @@ def to_structured(
def from_structured(
message: KafkaMessage,
event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
data_unmarshaller: typing.Optional[types.MarshallerType] = None,
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent:
"""
@ -222,7 +228,7 @@ def from_structured(
:returns: CloudEvent
"""
data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_UNMARSHALLER
envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER
try:
structure = envelope_unmarshaller(message.value)