refactor: create abstract cloudevent package

Signed-off-by: Alexander Tkachev <sasha64sasha@gmail.com>
This commit is contained in:
Alexander Tkachev 2022-07-23 00:17:07 +03:00
parent c747f59a29
commit 6588577ffc
7 changed files with 246 additions and 160 deletions

View File

@ -11,3 +11,5 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudevents.abstract.event import CloudEvent, AnyCloudEvent # noqa

View File

@ -3,13 +3,13 @@ import typing
from typing import TypeVar
class AbstractCloudEvent(abc.ABC):
class CloudEvent(abc.ABC):
@classmethod
def create(
cls,
attributes: typing.Dict[str, typing.Any],
data: typing.Optional[typing.Any],
) -> "AbstractCloudEvent":
) -> "CloudEvent":
raise NotImplementedError()
@property
@ -31,7 +31,7 @@ class AbstractCloudEvent(abc.ABC):
raise NotImplementedError()
def __eq__(self, other: typing.Any) -> bool:
if isinstance(other, AbstractCloudEvent):
if isinstance(other, CloudEvent):
return (
self.data == other.data
and self._attributes_read_model == other._attributes_read_model
@ -72,4 +72,4 @@ class AbstractCloudEvent(abc.ABC):
return str({"attributes": self._attributes_read_model, "data": self.data})
AnyCloudEvent = TypeVar("AnyCloudEvent", bound=AbstractCloudEvent)
AnyCloudEvent = TypeVar("AnyCloudEvent", bound=CloudEvent)

View File

@ -0,0 +1,166 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import typing
import cloudevents.exceptions as cloud_exceptions
from cloudevents.abstract.event import AnyCloudEvent
from cloudevents.http.event_type import is_binary
from cloudevents.http.mappings import _marshaller_by_format, _obj_by_version
from cloudevents.http.util import _json_or_string
from cloudevents.sdk import converters, marshaller, types
def from_http(
event_type: typing.Type[AnyCloudEvent],
headers: typing.Dict[str, str],
data: typing.Union[str, bytes, None],
data_unmarshaller: types.UnmarshallerType = None,
) -> AnyCloudEvent:
if data is None or data == b"":
# Empty string will cause data to be marshalled into None
data = ""
if not isinstance(data, (str, bytes, bytearray)):
raise cloud_exceptions.InvalidStructuredJSON(
"Expected json of type (str, bytes, bytearray), "
f"but instead found type {type(data)}"
)
headers = {key.lower(): value for key, value in headers.items()}
if data_unmarshaller is None:
data_unmarshaller = _json_or_string
marshall = marshaller.NewDefaultHTTPMarshaller()
if is_binary(headers):
specversion = headers.get("ce-specversion", None)
else:
try:
raw_ce = json.loads(data)
except json.decoder.JSONDecodeError:
raise cloud_exceptions.MissingRequiredFields(
"Failed to read specversion from both headers and data. "
f"The following can not be parsed as json: {data}"
)
if hasattr(raw_ce, "get"):
specversion = raw_ce.get("specversion", None)
else:
raise cloud_exceptions.MissingRequiredFields(
"Failed to read specversion from both headers and data. "
f"The following deserialized data has no 'get' method: {raw_ce}"
)
if specversion is None:
raise cloud_exceptions.MissingRequiredFields(
"Failed to find specversion in HTTP request"
)
event_handler = _obj_by_version.get(specversion, None)
if event_handler is None:
raise cloud_exceptions.InvalidRequiredFields(
f"Found invalid specversion {specversion}"
)
event = marshall.FromRequest(
event_handler(), headers, data, data_unmarshaller=data_unmarshaller
)
attrs = event.Properties()
attrs.pop("data", None)
attrs.pop("extensions", None)
attrs.update(**event.extensions)
if event.data == "" or event.data == b"":
# TODO: Check binary unmarshallers to debug why setting data to ""
# returns an event with data set to None, but structured will return ""
data = None
else:
data = event.data
return event_type.create(attrs, data)
def _to_http(
event: AnyCloudEvent,
format: str = converters.TypeStructured,
data_marshaller: types.MarshallerType = None,
) -> typing.Tuple[dict, typing.Union[bytes, str]]:
"""
Returns a tuple of HTTP headers/body dicts representing this cloudevent
:param format: constant specifying an encoding format
:type format: str
:param data_marshaller: Callable function to cast event.data into
either a string or bytes
:type data_marshaller: types.MarshallerType
:returns: (http_headers: dict, http_body: bytes or str)
"""
if data_marshaller is None:
data_marshaller = _marshaller_by_format[format]
if event["specversion"] not in _obj_by_version:
raise cloud_exceptions.InvalidRequiredFields(
f"Unsupported specversion: {event['specversion']}"
)
event_handler = _obj_by_version[event["specversion"]]()
for attribute_name in event:
event_handler.Set(attribute_name, event[attribute_name])
event_handler.data = event.data
return marshaller.NewDefaultHTTPMarshaller().ToRequest(
event_handler, format, data_marshaller=data_marshaller
)
def to_structured(
event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None,
) -> typing.Tuple[dict, typing.Union[bytes, str]]:
"""
Returns a tuple of HTTP headers/body dicts representing this cloudevent. If
event.data is a byte object, body will have a data_base64 field instead of
data.
:param event: CloudEvent to cast into http data
:type event: CloudEvent
:param data_marshaller: Callable function to cast event.data into
either a string or bytes
:type data_marshaller: types.MarshallerType
:returns: (http_headers: dict, http_body: bytes or str)
"""
return _to_http(event=event, data_marshaller=data_marshaller)
def to_binary(
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]:
"""
Returns a tuple of HTTP headers/body dicts representing this cloudevent
:param event: CloudEvent to cast into http data
:type event: CloudEvent
:param data_marshaller: Callable function to cast event.data into
either a string or bytes
:type data_marshaller: types.UnmarshallerType
:returns: (http_headers: dict, http_body: bytes or str)
"""
return _to_http(
event=event,
format=converters.TypeBinary,
data_marshaller=data_marshaller,
)

View File

@ -0,0 +1,54 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import typing
from cloudevents.abstract import AnyCloudEvent
from cloudevents.sdk import types
from cloudevents.abstract.http_methods import to_structured, from_http
def to_json(
event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None,
) -> typing.Union[str, bytes]:
"""
Cast an CloudEvent into a json object
:param event: CloudEvent which will be converted into a json object
:type event: CloudEvent
:param data_marshaller: Callable function which will cast event.data
into a json object
:type data_marshaller: typing.Callable
:returns: json object representing the given event
"""
return to_structured(event, data_marshaller=data_marshaller)[1]
def from_json(
event_type: typing.Type[AnyCloudEvent],
data: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType = None,
) -> AnyCloudEvent:
"""
Cast json encoded data into an CloudEvent
:param event_type: Concrete event type to which deserialize the json event
:param data: json encoded cloudevent data
:param data_unmarshaller: Callable function which will cast data to a
python object
:type data_unmarshaller: typing.Callable
:returns: CloudEvent representing given cloudevent json object
"""
return from_http(
event_type, headers={}, data=data, data_unmarshaller=data_unmarshaller
)

View File

@ -18,15 +18,21 @@ import uuid
import cloudevents.exceptions as cloud_exceptions
from cloudevents.http.mappings import _required_by_version
from cloudevents.sdk.event import abstract_event
from cloudevents import abstract
class CloudEvent(abstract_event.AbstractCloudEvent):
class CloudEvent(abstract.CloudEvent):
"""
Python-friendly cloudevent class supporting v1 events
Supports both binary and structured mode CloudEvents
"""
@classmethod
def create(
cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
) -> "CloudEvent":
return cls(attributes, data)
def __init__(self, attributes: typing.Dict[str, str], data: typing.Any = None):
"""
Event Constructor

View File

@ -17,12 +17,14 @@ import typing
from deprecation import deprecated
import cloudevents.exceptions as cloud_exceptions
from cloudevents.http.event import CloudEvent
from cloudevents.http.event_type import is_binary
from cloudevents.http.mappings import _marshaller_by_format, _obj_by_version
from cloudevents.http.util import _json_or_string
from cloudevents.sdk import converters, marshaller, types
from cloudevents.sdk import types
# backwards compatability imports
from cloudevents.abstract.http_methods import to_binary, to_structured # noqa
from cloudevents.abstract.http_methods import from_http as _abstract_from_http
def from_http(
@ -41,138 +43,7 @@ def from_http(
e.g. lambda x: x or lambda x: json.loads(x)
:type data_unmarshaller: types.UnmarshallerType
"""
if data is None or data == b"":
# Empty string will cause data to be marshalled into None
data = ""
if not isinstance(data, (str, bytes, bytearray)):
raise cloud_exceptions.InvalidStructuredJSON(
"Expected json of type (str, bytes, bytearray), "
f"but instead found type {type(data)}"
)
headers = {key.lower(): value for key, value in headers.items()}
if data_unmarshaller is None:
data_unmarshaller = _json_or_string
marshall = marshaller.NewDefaultHTTPMarshaller()
if is_binary(headers):
specversion = headers.get("ce-specversion", None)
else:
try:
raw_ce = json.loads(data)
except json.decoder.JSONDecodeError:
raise cloud_exceptions.MissingRequiredFields(
"Failed to read specversion from both headers and data. "
f"The following can not be parsed as json: {data}"
)
if hasattr(raw_ce, "get"):
specversion = raw_ce.get("specversion", None)
else:
raise cloud_exceptions.MissingRequiredFields(
"Failed to read specversion from both headers and data. "
f"The following deserialized data has no 'get' method: {raw_ce}"
)
if specversion is None:
raise cloud_exceptions.MissingRequiredFields(
"Failed to find specversion in HTTP request"
)
event_handler = _obj_by_version.get(specversion, None)
if event_handler is None:
raise cloud_exceptions.InvalidRequiredFields(
f"Found invalid specversion {specversion}"
)
event = marshall.FromRequest(
event_handler(), headers, data, data_unmarshaller=data_unmarshaller
)
attrs = event.Properties()
attrs.pop("data", None)
attrs.pop("extensions", None)
attrs.update(**event.extensions)
if event.data == "" or event.data == b"":
# TODO: Check binary unmarshallers to debug why setting data to ""
# returns an event with data set to None, but structured will return ""
data = None
else:
data = event.data
return CloudEvent(attrs, data)
def _to_http(
event: CloudEvent,
format: str = converters.TypeStructured,
data_marshaller: types.MarshallerType = None,
) -> typing.Tuple[dict, typing.Union[bytes, str]]:
"""
Returns a tuple of HTTP headers/body dicts representing this cloudevent
:param format: constant specifying an encoding format
:type format: str
:param data_marshaller: Callable function to cast event.data into
either a string or bytes
:type data_marshaller: types.MarshallerType
:returns: (http_headers: dict, http_body: bytes or str)
"""
if data_marshaller is None:
data_marshaller = _marshaller_by_format[format]
if event._attributes["specversion"] not in _obj_by_version:
raise cloud_exceptions.InvalidRequiredFields(
f"Unsupported specversion: {event._attributes['specversion']}"
)
event_handler = _obj_by_version[event._attributes["specversion"]]()
for k, v in event._attributes.items():
event_handler.Set(k, v)
event_handler.data = event.data
return marshaller.NewDefaultHTTPMarshaller().ToRequest(
event_handler, format, data_marshaller=data_marshaller
)
def to_structured(
event: CloudEvent, data_marshaller: types.MarshallerType = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]:
"""
Returns a tuple of HTTP headers/body dicts representing this cloudevent. If
event.data is a byte object, body will have a data_base64 field instead of
data.
:param event: CloudEvent to cast into http data
:type event: CloudEvent
:param data_marshaller: Callable function to cast event.data into
either a string or bytes
:type data_marshaller: types.MarshallerType
:returns: (http_headers: dict, http_body: bytes or str)
"""
return _to_http(event=event, data_marshaller=data_marshaller)
def to_binary(
event: CloudEvent, data_marshaller: types.MarshallerType = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]:
"""
Returns a tuple of HTTP headers/body dicts representing this cloudevent
:param event: CloudEvent to cast into http data
:type event: CloudEvent
:param data_marshaller: Callable function to cast event.data into
either a string or bytes
:type data_marshaller: types.UnmarshallerType
:returns: (http_headers: dict, http_body: bytes or str)
"""
return _to_http(
event=event,
format=converters.TypeBinary,
data_marshaller=data_marshaller,
)
return _abstract_from_http(CloudEvent, headers, data, data_unmarshaller)
@deprecated(deprecated_in="1.0.2", details="Use to_binary function instead")

View File

@ -15,23 +15,11 @@
import typing
from cloudevents.http.event import CloudEvent
from cloudevents.http.http_methods import from_http, to_structured
from cloudevents.sdk import types
def to_json(
event: CloudEvent, data_marshaller: types.MarshallerType = None
) -> typing.Union[str, bytes]:
"""
Cast an CloudEvent into a json object
:param event: CloudEvent which will be converted into a json object
:type event: CloudEvent
:param data_marshaller: Callable function which will cast event.data
into a json object
:type data_marshaller: typing.Callable
:returns: json object representing the given event
"""
return to_structured(event, data_marshaller=data_marshaller)[1]
# backwards compatibility
from cloudevents.abstract.json_methods import to_json # noqa
from cloudevents.abstract.json_methods import from_json as _abstract_from_json
def from_json(
@ -41,10 +29,9 @@ def from_json(
"""
Cast json encoded data into an CloudEvent
:param data: json encoded cloudevent data
:type event: typing.Union[str, bytes]
:param data_unmarshaller: Callable function which will cast data to a
python object
:type data_unmarshaller: typing.Callable
:returns: CloudEvent representing given cloudevent json object
"""
return from_http(headers={}, data=data, data_unmarshaller=data_unmarshaller)
return _abstract_from_json(CloudEvent, data, data_unmarshaller)