diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..ff6415d --- /dev/null +++ b/.coveragerc @@ -0,0 +1,7 @@ +[report] +exclude_lines = + # Have to re-enable the standard pragma + pragma: no cover + + # Don't complain if tests don't hit defensive assertion code: + raise NotImplementedError diff --git a/cloudevents/abstract/__init__.py b/cloudevents/abstract/__init__.py new file mode 100644 index 0000000..c4c7336 --- /dev/null +++ b/cloudevents/abstract/__init__.py @@ -0,0 +1,15 @@ +# Copyright 2018-Present The CloudEvents Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from cloudevents.abstract.event import AnyCloudEvent, CloudEvent # noqa diff --git a/cloudevents/abstract/event.py b/cloudevents/abstract/event.py new file mode 100644 index 0000000..f6fe732 --- /dev/null +++ b/cloudevents/abstract/event.py @@ -0,0 +1,137 @@ +# Copyright 2018-Present The CloudEvents Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import typing +from abc import abstractmethod +from typing import TypeVar + + +class CloudEvent: + """ + The CloudEvent Python wrapper contract exposing generically-available + properties and APIs. + + Implementations might handle fields and have other APIs exposed but are + obliged to follow this contract. + """ + + @classmethod + def create( + cls, + attributes: typing.Dict[str, typing.Any], + data: typing.Optional[typing.Any], + ) -> "AnyCloudEvent": + """ + Creates a new instance of the CloudEvent using supplied `attributes` + and `data`. + + This method should be preferably used over the constructor to create events + while custom framework-specific implementations may require or assume + different arguments. + + :param attributes: The attributes of the CloudEvent instance. + :param data: The payload of the CloudEvent instance. + :returns: A new instance of the CloudEvent created from the passed arguments. + """ + raise NotImplementedError() + + @abstractmethod + def _get_attributes(self) -> typing.Dict[str, typing.Any]: + """ + Returns the attributes of the event. + + The implementation MUST assume that the returned value MAY be mutated. + + Having a function over a property simplifies integration for custom + framework-specific implementations. + + :returns: Attributes of the event. + """ + raise NotImplementedError() + + @abstractmethod + def _get_data(self) -> typing.Optional[typing.Any]: + """ + Returns the data of the event. + + The implementation MUST assume that the returned value MAY be mutated. + + Having a function over a property simplifies integration for custom + framework-specific implementations. + + :returns: Data of the event. + """ + raise NotImplementedError() + + def __eq__(self, other: typing.Any) -> bool: + if isinstance(other, CloudEvent): + same_data = self._get_data() == other._get_data() + same_attributes = self._get_attributes() == other._get_attributes() + return same_data and same_attributes + return False + + def __getitem__(self, key: str) -> typing.Any: + """ + Returns a value of an attribute of the event denoted by the given `key`. + + The `data` of the event should be accessed by the `.data` accessor rather + than this mapping. + + :param key: The name of the event attribute to retrieve the value for. + :returns: The event attribute value. + """ + return self._get_attributes()[key] + + def get( + self, key: str, default: typing.Optional[typing.Any] = None + ) -> typing.Optional[typing.Any]: + """ + Retrieves an event attribute value for the given `key`. + + Returns the `default` value if the attribute for the given key does not exist. + + The implementation MUST NOT throw an error when the key does not exist, but + rather should return `None` or the configured `default`. + + :param key: The name of the event attribute to retrieve the value for. + :param default: The default value to be returned when + no attribute with the given key exists. + :returns: The event attribute value if exists, default value or None otherwise. + """ + return self._get_attributes().get(key, default) + + def __iter__(self) -> typing.Iterator[typing.Any]: + """ + Returns an iterator over the event attributes. + """ + return iter(self._get_attributes()) + + def __len__(self) -> int: + """ + Returns the number of the event attributes. + """ + return len(self._get_attributes()) + + def __contains__(self, key: str) -> bool: + """ + Determines if an attribute with a given `key` is present + in the event attributes. + """ + return key in self._get_attributes() + + def __repr__(self) -> str: + return str({"attributes": self._get_attributes(), "data": self._get_data()}) + + +AnyCloudEvent = TypeVar("AnyCloudEvent", bound=CloudEvent) diff --git a/cloudevents/conversion.py b/cloudevents/conversion.py new file mode 100644 index 0000000..b5f9eb9 --- /dev/null +++ b/cloudevents/conversion.py @@ -0,0 +1,224 @@ +# Copyright 2018-Present The CloudEvents Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import json +import typing + +from cloudevents import exceptions as cloud_exceptions +from cloudevents.abstract import AnyCloudEvent +from cloudevents.http import is_binary +from cloudevents.http.mappings import _marshaller_by_format, _obj_by_version +from cloudevents.http.util import _json_or_string +from cloudevents.sdk import converters, marshaller, types + + +def to_json( + event: AnyCloudEvent, + data_marshaller: types.MarshallerType = None, +) -> typing.Union[str, bytes]: + """ + Converts given `event` to a JSON string. + + :param event: A CloudEvent to be converted into a JSON string. + :param data_marshaller: Callable function which will cast `event.data` + into a JSON string. + :returns: A JSON string representing the given event. + """ + return to_structured(event, data_marshaller=data_marshaller)[1] + + +def from_json( + event_type: typing.Type[AnyCloudEvent], + data: typing.Union[str, bytes], + data_unmarshaller: types.UnmarshallerType = None, +) -> AnyCloudEvent: + """ + Parses JSON string `data` into a CloudEvent. + + :param data: JSON string representation of a CloudEvent. + :param data_unmarshaller: Callable function that casts `data` to a + Python object. + :param event_type: A concrete type of the event into which the data is + deserialized. + :returns: A CloudEvent parsed from the given JSON representation. + """ + return from_http( + headers={}, + data=data, + data_unmarshaller=data_unmarshaller, + event_type=event_type, + ) + + +def from_http( + event_type: typing.Type[AnyCloudEvent], + headers: typing.Dict[str, str], + data: typing.Union[str, bytes, None], + data_unmarshaller: types.UnmarshallerType = None, +) -> AnyCloudEvent: + """ + Parses CloudEvent `data` and `headers` into an instance of a given `event_type`. + + The method supports both binary and structured representations. + + :param headers: The HTTP request headers. + :param data: The HTTP request body. If set to None, "" or b'', the returned + event's `data` field will be set to None. + :param data_unmarshaller: Callable function to map data to a python object + e.g. lambda x: x or lambda x: json.loads(x) + :param event_type: The actual type of CloudEvent to deserialize the event to. + :returns: A CloudEvent instance parsed from the passed HTTP parameters of + the specified type. + """ + if data is None or data == b"": + # Empty string will cause data to be marshalled into None + data = "" + + if not isinstance(data, (str, bytes, bytearray)): + raise cloud_exceptions.InvalidStructuredJSON( + "Expected json of type (str, bytes, bytearray), " + f"but instead found type {type(data)}" + ) + + headers = {key.lower(): value for key, value in headers.items()} + if data_unmarshaller is None: + data_unmarshaller = _json_or_string + + marshall = marshaller.NewDefaultHTTPMarshaller() + + if is_binary(headers): + specversion = headers.get("ce-specversion", None) + else: + try: + raw_ce = json.loads(data) + except json.decoder.JSONDecodeError: + raise cloud_exceptions.MissingRequiredFields( + "Failed to read specversion from both headers and data. " + f"The following can not be parsed as json: {data}" + ) + if hasattr(raw_ce, "get"): + specversion = raw_ce.get("specversion", None) + else: + raise cloud_exceptions.MissingRequiredFields( + "Failed to read specversion from both headers and data. " + f"The following deserialized data has no 'get' method: {raw_ce}" + ) + + if specversion is None: + raise cloud_exceptions.MissingRequiredFields( + "Failed to find specversion in HTTP request" + ) + + event_handler = _obj_by_version.get(specversion, None) + + if event_handler is None: + raise cloud_exceptions.InvalidRequiredFields( + f"Found invalid specversion {specversion}" + ) + + event = marshall.FromRequest( + event_handler(), headers, data, data_unmarshaller=data_unmarshaller + ) + attrs = event.Properties() + attrs.pop("data", None) + attrs.pop("extensions", None) + attrs.update(**event.extensions) + + if event.data == "" or event.data == b"": + # TODO: Check binary unmarshallers to debug why setting data to "" + # returns an event with data set to None, but structured will return "" + data = None + else: + data = event.data + return event_type.create(attrs, data) + + +def _to_http( + event: AnyCloudEvent, + format: str = converters.TypeStructured, + data_marshaller: types.MarshallerType = None, +) -> typing.Tuple[dict, typing.Union[bytes, str]]: + """ + Returns a tuple of HTTP headers/body dicts representing this Cloud Event. + + :param format: The encoding format of the event. + :param data_marshaller: Callable function that casts event.data into + either a string or bytes. + :returns: (http_headers: dict, http_body: bytes or str) + """ + if data_marshaller is None: + data_marshaller = _marshaller_by_format[format] + + if event["specversion"] not in _obj_by_version: + raise cloud_exceptions.InvalidRequiredFields( + f"Unsupported specversion: {event['specversion']}" + ) + + event_handler = _obj_by_version[event["specversion"]]() + for attribute_name in event: + event_handler.Set(attribute_name, event[attribute_name]) + event_handler.data = event.data + + return marshaller.NewDefaultHTTPMarshaller().ToRequest( + event_handler, format, data_marshaller=data_marshaller + ) + + +def to_structured( + event: AnyCloudEvent, + data_marshaller: types.MarshallerType = None, +) -> typing.Tuple[dict, typing.Union[bytes, str]]: + """ + Returns a tuple of HTTP headers/body dicts representing this Cloud Event. + + If event.data is a byte object, body will have a `data_base64` field instead of + `data`. + + :param event: The event to be converted. + :param data_marshaller: Callable function to cast event.data into + either a string or bytes + :returns: (http_headers: dict, http_body: bytes or str) + """ + return _to_http(event=event, data_marshaller=data_marshaller) + + +def to_binary( + event: AnyCloudEvent, data_marshaller: types.MarshallerType = None +) -> typing.Tuple[dict, typing.Union[bytes, str]]: + """ + Returns a tuple of HTTP headers/body dicts representing this Cloud Event. + + Uses Binary conversion format. + + :param event: The event to be converted. + :param data_marshaller: Callable function to cast event.data into + either a string or bytes. + :returns: (http_headers: dict, http_body: bytes or str) + """ + return _to_http( + event=event, + format=converters.TypeBinary, + data_marshaller=data_marshaller, + ) diff --git a/cloudevents/http/event.py b/cloudevents/http/event.py index ee78cff..d14f9fc 100644 --- a/cloudevents/http/event.py +++ b/cloudevents/http/event.py @@ -17,15 +17,22 @@ import typing import uuid import cloudevents.exceptions as cloud_exceptions +from cloudevents import abstract from cloudevents.http.mappings import _required_by_version -class CloudEvent: +class CloudEvent(abstract.CloudEvent): """ Python-friendly cloudevent class supporting v1 events Supports both binary and structured mode CloudEvents """ + @classmethod + def create( + cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any] + ) -> "CloudEvent": + return cls(attributes, data) + def __init__(self, attributes: typing.Dict[str, str], data: typing.Any = None): """ Event Constructor @@ -67,46 +74,14 @@ class CloudEvent: f"Missing required keys: {required_set - self._attributes.keys()}" ) - def __eq__(self, other: typing.Any) -> bool: - if isinstance(other, CloudEvent): - return self.data == other.data and self._attributes == other._attributes - return False + def _get_attributes(self) -> typing.Dict[str, typing.Any]: + return self._attributes - # Data access is handled via `.data` member - # Attribute access is managed via Mapping type - def __getitem__(self, key: str) -> typing.Any: - return self._attributes[key] - - def get( - self, key: str, default: typing.Optional[typing.Any] = None - ) -> typing.Optional[typing.Any]: - """ - Retrieves an event attribute value for the given key. - Returns the default value if not attribute for the given key exists. - - MUST NOT throw an exception when the key does not exist. - - :param key: The event attribute name. - :param default: The default value to be returned when - no attribute with the given key exists. - :returns: The event attribute value if exists, default value otherwise. - """ - return self._attributes.get(key, default) + def _get_data(self) -> typing.Optional[typing.Any]: + return self.data def __setitem__(self, key: str, value: typing.Any) -> None: self._attributes[key] = value def __delitem__(self, key: str) -> None: del self._attributes[key] - - def __iter__(self) -> typing.Iterator[typing.Any]: - return iter(self._attributes) - - def __len__(self) -> int: - return len(self._attributes) - - def __contains__(self, key: str) -> bool: - return key in self._attributes - - def __repr__(self) -> str: - return str({"attributes": self._attributes, "data": self.data}) diff --git a/cloudevents/http/http_methods.py b/cloudevents/http/http_methods.py index e403611..61fc1ab 100644 --- a/cloudevents/http/http_methods.py +++ b/cloudevents/http/http_methods.py @@ -12,24 +12,21 @@ # License for the specific language governing permissions and limitations # under the License. -import json import typing from deprecation import deprecated -import cloudevents.exceptions as cloud_exceptions +from cloudevents.conversion import from_http as _abstract_from_http +from cloudevents.conversion import to_binary, to_structured from cloudevents.http.event import CloudEvent -from cloudevents.http.event_type import is_binary -from cloudevents.http.mappings import _marshaller_by_format, _obj_by_version -from cloudevents.http.util import _json_or_string -from cloudevents.sdk import converters, marshaller, types +from cloudevents.sdk import types def from_http( headers: typing.Dict[str, str], data: typing.Union[str, bytes, None], data_unmarshaller: types.UnmarshallerType = None, -): +) -> CloudEvent: """ Unwrap a CloudEvent (binary or structured) from an HTTP request. :param headers: the HTTP headers @@ -41,138 +38,13 @@ def from_http( e.g. lambda x: x or lambda x: json.loads(x) :type data_unmarshaller: types.UnmarshallerType """ - if data is None or data == b"": - # Empty string will cause data to be marshalled into None - data = "" - - if not isinstance(data, (str, bytes, bytearray)): - raise cloud_exceptions.InvalidStructuredJSON( - "Expected json of type (str, bytes, bytearray), " - f"but instead found type {type(data)}" - ) - - headers = {key.lower(): value for key, value in headers.items()} - if data_unmarshaller is None: - data_unmarshaller = _json_or_string - - marshall = marshaller.NewDefaultHTTPMarshaller() - - if is_binary(headers): - specversion = headers.get("ce-specversion", None) - else: - try: - raw_ce = json.loads(data) - except json.decoder.JSONDecodeError: - raise cloud_exceptions.MissingRequiredFields( - "Failed to read specversion from both headers and data. " - f"The following can not be parsed as json: {data}" - ) - if hasattr(raw_ce, "get"): - specversion = raw_ce.get("specversion", None) - else: - raise cloud_exceptions.MissingRequiredFields( - "Failed to read specversion from both headers and data. " - f"The following deserialized data has no 'get' method: {raw_ce}" - ) - - if specversion is None: - raise cloud_exceptions.MissingRequiredFields( - "Failed to find specversion in HTTP request" - ) - - event_handler = _obj_by_version.get(specversion, None) - - if event_handler is None: - raise cloud_exceptions.InvalidRequiredFields( - f"Found invalid specversion {specversion}" - ) - - event = marshall.FromRequest( - event_handler(), headers, data, data_unmarshaller=data_unmarshaller - ) - attrs = event.Properties() - attrs.pop("data", None) - attrs.pop("extensions", None) - attrs.update(**event.extensions) - - if event.data == "" or event.data == b"": - # TODO: Check binary unmarshallers to debug why setting data to "" - # returns an event with data set to None, but structured will return "" - data = None - else: - data = event.data - return CloudEvent(attrs, data) + return _abstract_from_http(CloudEvent, headers, data, data_unmarshaller) -def _to_http( - event: CloudEvent, - format: str = converters.TypeStructured, - data_marshaller: types.MarshallerType = None, -) -> typing.Tuple[dict, typing.Union[bytes, str]]: - """ - Returns a tuple of HTTP headers/body dicts representing this cloudevent - - :param format: constant specifying an encoding format - :type format: str - :param data_marshaller: Callable function to cast event.data into - either a string or bytes - :type data_marshaller: types.MarshallerType - :returns: (http_headers: dict, http_body: bytes or str) - """ - if data_marshaller is None: - data_marshaller = _marshaller_by_format[format] - - if event._attributes["specversion"] not in _obj_by_version: - raise cloud_exceptions.InvalidRequiredFields( - f"Unsupported specversion: {event._attributes['specversion']}" - ) - - event_handler = _obj_by_version[event._attributes["specversion"]]() - for k, v in event._attributes.items(): - event_handler.Set(k, v) - event_handler.data = event.data - - return marshaller.NewDefaultHTTPMarshaller().ToRequest( - event_handler, format, data_marshaller=data_marshaller - ) - - -def to_structured( - event: CloudEvent, data_marshaller: types.MarshallerType = None -) -> typing.Tuple[dict, typing.Union[bytes, str]]: - """ - Returns a tuple of HTTP headers/body dicts representing this cloudevent. If - event.data is a byte object, body will have a data_base64 field instead of - data. - - :param event: CloudEvent to cast into http data - :type event: CloudEvent - :param data_marshaller: Callable function to cast event.data into - either a string or bytes - :type data_marshaller: types.MarshallerType - :returns: (http_headers: dict, http_body: bytes or str) - """ - return _to_http(event=event, data_marshaller=data_marshaller) - - -def to_binary( - event: CloudEvent, data_marshaller: types.MarshallerType = None -) -> typing.Tuple[dict, typing.Union[bytes, str]]: - """ - Returns a tuple of HTTP headers/body dicts representing this cloudevent - - :param event: CloudEvent to cast into http data - :type event: CloudEvent - :param data_marshaller: Callable function to cast event.data into - either a string or bytes - :type data_marshaller: types.UnmarshallerType - :returns: (http_headers: dict, http_body: bytes or str) - """ - return _to_http( - event=event, - format=converters.TypeBinary, - data_marshaller=data_marshaller, - ) +# backwards compatibility +to_binary = to_binary +# backwards compatibility +to_structured = to_structured @deprecated(deprecated_in="1.0.2", details="Use to_binary function instead") diff --git a/cloudevents/http/json_methods.py b/cloudevents/http/json_methods.py index 728516e..1f04431 100644 --- a/cloudevents/http/json_methods.py +++ b/cloudevents/http/json_methods.py @@ -14,26 +14,12 @@ import typing +from cloudevents.conversion import from_json as _abstract_from_json +from cloudevents.conversion import to_json from cloudevents.http.event import CloudEvent -from cloudevents.http.http_methods import from_http, to_structured from cloudevents.sdk import types -def to_json( - event: CloudEvent, data_marshaller: types.MarshallerType = None -) -> typing.Union[str, bytes]: - """ - Cast an CloudEvent into a json object - :param event: CloudEvent which will be converted into a json object - :type event: CloudEvent - :param data_marshaller: Callable function which will cast event.data - into a json object - :type data_marshaller: typing.Callable - :returns: json object representing the given event - """ - return to_structured(event, data_marshaller=data_marshaller)[1] - - def from_json( data: typing.Union[str, bytes], data_unmarshaller: types.UnmarshallerType = None, @@ -41,10 +27,13 @@ def from_json( """ Cast json encoded data into an CloudEvent :param data: json encoded cloudevent data - :type event: typing.Union[str, bytes] :param data_unmarshaller: Callable function which will cast data to a python object :type data_unmarshaller: typing.Callable :returns: CloudEvent representing given cloudevent json object """ - return from_http(headers={}, data=data, data_unmarshaller=data_unmarshaller) + return _abstract_from_json(CloudEvent, data, data_unmarshaller) + + +# backwards compatibility +to_json = to_json