diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c7a85e..15eab2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.7.0] — 2022-11-17 +### Added +- Added [Kafka](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md) + support ([#197], thanks [David Martines](https://github.com/davidwmartines)) + ## [1.6.2] — 2022-10-18 ### Added - Added `get_attributes` API to the `CloudEvent` API. The method returns a read-only @@ -152,6 +157,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Initial release +[1.7.0]: https://github.com/cloudevents/sdk-python/compare/1.6.0...1.7.0 [1.6.2]: https://github.com/cloudevents/sdk-python/compare/1.6.1...1.6.2 [1.6.1]: https://github.com/cloudevents/sdk-python/compare/1.6.0...1.6.1 [1.6.0]: https://github.com/cloudevents/sdk-python/compare/1.5.0...1.6.0 @@ -218,3 +224,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#188]: https://github.com/cloudevents/sdk-python/pull/188 [#191]: https://github.com/cloudevents/sdk-python/pull/191 [#195]: https://github.com/cloudevents/sdk-python/pull/195 +[#197]: https://github.com/cloudevents/sdk-python/pull/197 diff --git a/cloudevents/__init__.py b/cloudevents/__init__.py index e74d8c0..95bd03d 100644 --- a/cloudevents/__init__.py +++ b/cloudevents/__init__.py @@ -12,4 +12,4 @@ # License for the specific language governing permissions and limitations # under the License. -__version__ = "1.6.2" +__version__ = "1.7.0" diff --git a/cloudevents/kafka/conversion.py b/cloudevents/kafka/conversion.py index 60a9f23..45e63a7 100644 --- a/cloudevents/kafka/conversion.py +++ b/cloudevents/kafka/conversion.py @@ -38,18 +38,18 @@ class KafkaMessage(typing.NamedTuple): The dictionary of message headers key/values. """ - key: typing.Optional[typing.Union[bytes, str]] + key: typing.Optional[typing.AnyStr] """ The message key. """ - value: typing.Union[bytes, str] + value: typing.AnyStr """ The message value. """ -KeyMapper = typing.Callable[[AnyCloudEvent], typing.Union[bytes, str]] +KeyMapper = typing.Callable[[AnyCloudEvent], typing.AnyStr] """ A callable function that creates a Kafka message key, given a CloudEvent instance. """ @@ -174,7 +174,7 @@ def to_structured( f"Failed to map message key with error: {type(e).__name__}('{e}')" ) - attrs = event.get_attributes().copy() + attrs: dict[str, typing.Any] = dict(event.get_attributes()) try: data = data_marshaller(event.data) @@ -208,7 +208,7 @@ def from_structured( message: KafkaMessage, event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, data_unmarshaller: typing.Optional[types.MarshallerType] = None, - envelope_unmarshaller: typing.Optional[types.MarshallerType] = None, + envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> AnyCloudEvent: """ Returns a CloudEvent from a KafkaMessage in structured format. @@ -232,20 +232,20 @@ def from_structured( "Failed to unmarshall message with error: " f"{type(e).__name__}('{e}')" ) - attributes = {} + attributes: dict[str, typing.Any] = {} if message.key is not None: attributes["partitionkey"] = message.key + data: typing.Optional[typing.Any] = None for name, value in structure.items(): - decoder = lambda x: x - if name == "data": - decoder = lambda v: data_unmarshaller(v) - if name == "data_base64": - decoder = lambda v: data_unmarshaller(base64.b64decode(v)) - name = "data" - try: - decoded_value = decoder(value) + if name == "data": + decoded_value = data_unmarshaller(value) + elif name == "data_base64": + decoded_value = data_unmarshaller(base64.b64decode(value)) + name = "data" + else: + decoded_value = value except Exception as e: raise cloud_exceptions.DataUnmarshallerError( "Failed to unmarshall data with error: " f"{type(e).__name__}('{e}')" diff --git a/cloudevents/tests/test_kafka_conversions.py b/cloudevents/tests/test_kafka_conversions.py index b631e55..97900ee 100644 --- a/cloudevents/tests/test_kafka_conversions.py +++ b/cloudevents/tests/test_kafka_conversions.py @@ -71,7 +71,7 @@ class KafkaConversionTestBase: return simple_serialize @pytest.fixture - def custom_unmarshaller(self) -> types.MarshallerType: + def custom_unmarshaller(self) -> types.UnmarshallerType: return simple_deserialize def test_custom_marshaller_can_talk_to_itself(