Introduce typings (#207)

* chore: Add pre-commit hook

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: address typing issues

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: add py.typed meta

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* Add Pydantic plugin

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* Add Pydantic dependency

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* Add MyPy best practices configs

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* Add deprecation MyPy ignore

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: more typing fixes

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: more typings and explicit optionals

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* Use lowest-supported Python version

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: Fix silly `dict` and other MyPy-related issues.

We're now explicitly ensuring codebase supports Python3.7+

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: ignore typing limitation

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: `not` with `dict` returns `false` for an empty dict, so use `is None` check

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* deps: Update hooks

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: Make sure only non-callable unmarshallers are flagged

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* chore: Have some coverage slack

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* deps: bump pre-commit-hooks

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* ci: make sure py.typed is included into the bundle

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

* docs: improve setup.py setup and add missing package metadata

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>

Signed-off-by: Yurii Serhiichuk <savik.ne@gmail.com>
This commit is contained in:
Yurii Serhiichuk 2023-01-04 17:29:41 +02:00 committed by GitHub
parent a02864eaab
commit 5e00c4f41f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 446 additions and 339 deletions

View File

@ -1,17 +1,27 @@
repos: repos:
- repo: https://github.com/pre-commit/pre-commit-hooks - repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0 rev: v4.4.0
hooks: hooks:
- id: trailing-whitespace - id: trailing-whitespace
- id: end-of-file-fixer - id: end-of-file-fixer
- id: check-toml - id: check-toml
- repo: https://github.com/pycqa/isort - repo: https://github.com/pycqa/isort
rev: 5.10.1 rev: 5.11.4
hooks: hooks:
- id: isort - id: isort
args: [ "--profile", "black", "--filter-files" ] args: [ "--profile", "black", "--filter-files" ]
- repo: https://github.com/psf/black - repo: https://github.com/psf/black
rev: 22.10.0 rev: 22.12.0
hooks: hooks:
- id: black - id: black
language_version: python3.10 language_version: python3.10
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v0.991"
hooks:
- id: mypy
files: ^(cloudevents/)
exclude: ^(cloudevents/tests/)
types: [ python ]
args: [ ]
additional_dependencies:
- 'pydantic'

4
MANIFEST.in Normal file
View File

@ -0,0 +1,4 @@
include README.md
include CHANGELOG.md
include LICENSE
include cloudevents/py.typed

View File

@ -14,4 +14,4 @@
from cloudevents.abstract.event import AnyCloudEvent, CloudEvent from cloudevents.abstract.event import AnyCloudEvent, CloudEvent
__all__ = [AnyCloudEvent, CloudEvent] __all__ = ["AnyCloudEvent", "CloudEvent"]

View File

@ -17,6 +17,8 @@ from abc import abstractmethod
from types import MappingProxyType from types import MappingProxyType
from typing import Mapping from typing import Mapping
AnyCloudEvent = typing.TypeVar("AnyCloudEvent", bound="CloudEvent")
class CloudEvent: class CloudEvent:
""" """
@ -29,10 +31,10 @@ class CloudEvent:
@classmethod @classmethod
def create( def create(
cls, cls: typing.Type[AnyCloudEvent],
attributes: typing.Dict[str, typing.Any], attributes: typing.Dict[str, typing.Any],
data: typing.Optional[typing.Any], data: typing.Optional[typing.Any],
) -> "AnyCloudEvent": ) -> AnyCloudEvent:
""" """
Creates a new instance of the CloudEvent using supplied `attributes` Creates a new instance of the CloudEvent using supplied `attributes`
and `data`. and `data`.
@ -70,7 +72,7 @@ class CloudEvent:
raise NotImplementedError() raise NotImplementedError()
@abstractmethod @abstractmethod
def _get_data(self) -> typing.Optional[typing.Any]: def get_data(self) -> typing.Optional[typing.Any]:
""" """
Returns the data of the event. Returns the data of the event.
@ -85,7 +87,7 @@ class CloudEvent:
def __eq__(self, other: typing.Any) -> bool: def __eq__(self, other: typing.Any) -> bool:
if isinstance(other, CloudEvent): if isinstance(other, CloudEvent):
same_data = self._get_data() == other._get_data() same_data = self.get_data() == other.get_data()
same_attributes = self._get_attributes() == other._get_attributes() same_attributes = self._get_attributes() == other._get_attributes()
return same_data and same_attributes return same_data and same_attributes
return False return False
@ -140,7 +142,4 @@ class CloudEvent:
return key in self._get_attributes() return key in self._get_attributes()
def __repr__(self) -> str: def __repr__(self) -> str:
return str({"attributes": self._get_attributes(), "data": self._get_data()}) return str({"attributes": self._get_attributes(), "data": self.get_data()})
AnyCloudEvent = typing.TypeVar("AnyCloudEvent", bound=CloudEvent)

View File

@ -23,7 +23,7 @@ from cloudevents.sdk.converters import is_binary
from cloudevents.sdk.event import v1, v03 from cloudevents.sdk.event import v1, v03
def _best_effort_serialize_to_json( def _best_effort_serialize_to_json( # type: ignore[no-untyped-def]
value: typing.Any, *args, **kwargs value: typing.Any, *args, **kwargs
) -> typing.Optional[typing.Union[bytes, str, typing.Any]]: ) -> typing.Optional[typing.Union[bytes, str, typing.Any]]:
""" """
@ -43,18 +43,18 @@ def _best_effort_serialize_to_json(
return value return value
_default_marshaller_by_format = { _default_marshaller_by_format: typing.Dict[str, types.MarshallerType] = {
converters.TypeStructured: lambda x: x, converters.TypeStructured: lambda x: x,
converters.TypeBinary: _best_effort_serialize_to_json, converters.TypeBinary: _best_effort_serialize_to_json,
} # type: typing.Dict[str, types.MarshallerType] }
_obj_by_version = {"1.0": v1.Event, "0.3": v03.Event} _obj_by_version = {"1.0": v1.Event, "0.3": v03.Event}
def to_json( def to_json(
event: AnyCloudEvent, event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None, data_marshaller: typing.Optional[types.MarshallerType] = None,
) -> typing.Union[str, bytes]: ) -> bytes:
""" """
Converts given `event` to a JSON string. Converts given `event` to a JSON string.
@ -69,7 +69,7 @@ def to_json(
def from_json( def from_json(
event_type: typing.Type[AnyCloudEvent], event_type: typing.Type[AnyCloudEvent],
data: typing.Union[str, bytes], data: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent: ) -> AnyCloudEvent:
""" """
Parses JSON string `data` into a CloudEvent. Parses JSON string `data` into a CloudEvent.
@ -91,9 +91,9 @@ def from_json(
def from_http( def from_http(
event_type: typing.Type[AnyCloudEvent], event_type: typing.Type[AnyCloudEvent],
headers: typing.Dict[str, str], headers: typing.Mapping[str, str],
data: typing.Union[str, bytes, None], data: typing.Optional[typing.Union[str, bytes]],
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> AnyCloudEvent: ) -> AnyCloudEvent:
""" """
Parses CloudEvent `data` and `headers` into an instance of a given `event_type`. Parses CloudEvent `data` and `headers` into an instance of a given `event_type`.
@ -133,14 +133,14 @@ def from_http(
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
raise cloud_exceptions.MissingRequiredFields( raise cloud_exceptions.MissingRequiredFields(
"Failed to read specversion from both headers and data. " "Failed to read specversion from both headers and data. "
f"The following can not be parsed as json: {data}" "The following can not be parsed as json: {!r}".format(data)
) )
if hasattr(raw_ce, "get"): if hasattr(raw_ce, "get"):
specversion = raw_ce.get("specversion", None) specversion = raw_ce.get("specversion", None)
else: else:
raise cloud_exceptions.MissingRequiredFields( raise cloud_exceptions.MissingRequiredFields(
"Failed to read specversion from both headers and data. " "Failed to read specversion from both headers and data. "
f"The following deserialized data has no 'get' method: {raw_ce}" "The following deserialized data has no 'get' method: {}".format(raw_ce)
) )
if specversion is None: if specversion is None:
@ -152,7 +152,7 @@ def from_http(
if event_handler is None: if event_handler is None:
raise cloud_exceptions.InvalidRequiredFields( raise cloud_exceptions.InvalidRequiredFields(
f"Found invalid specversion {specversion}" "Found invalid specversion {}".format(specversion)
) )
event = marshall.FromRequest( event = marshall.FromRequest(
@ -163,20 +163,19 @@ def from_http(
attrs.pop("extensions", None) attrs.pop("extensions", None)
attrs.update(**event.extensions) attrs.update(**event.extensions)
result_data: typing.Optional[typing.Any] = event.data
if event.data == "" or event.data == b"": if event.data == "" or event.data == b"":
# TODO: Check binary unmarshallers to debug why setting data to "" # TODO: Check binary unmarshallers to debug why setting data to ""
# returns an event with data set to None, but structured will return "" # returns an event with data set to None, but structured will return ""
data = None result_data = None
else: return event_type.create(attrs, result_data)
data = event.data
return event_type.create(attrs, data)
def _to_http( def _to_http(
event: AnyCloudEvent, event: AnyCloudEvent,
format: str = converters.TypeStructured, format: str = converters.TypeStructured,
data_marshaller: types.MarshallerType = None, data_marshaller: typing.Optional[types.MarshallerType] = None,
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
""" """
Returns a tuple of HTTP headers/body dicts representing this Cloud Event. Returns a tuple of HTTP headers/body dicts representing this Cloud Event.
@ -196,7 +195,7 @@ def _to_http(
event_handler = _obj_by_version[event["specversion"]]() event_handler = _obj_by_version[event["specversion"]]()
for attribute_name in event: for attribute_name in event:
event_handler.Set(attribute_name, event[attribute_name]) event_handler.Set(attribute_name, event[attribute_name])
event_handler.data = event.data event_handler.data = event.get_data()
return marshaller.NewDefaultHTTPMarshaller().ToRequest( return marshaller.NewDefaultHTTPMarshaller().ToRequest(
event_handler, format, data_marshaller=data_marshaller event_handler, format, data_marshaller=data_marshaller
@ -205,8 +204,8 @@ def _to_http(
def to_structured( def to_structured(
event: AnyCloudEvent, event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None, data_marshaller: typing.Optional[types.MarshallerType] = None,
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
""" """
Returns a tuple of HTTP headers/body dicts representing this Cloud Event. Returns a tuple of HTTP headers/body dicts representing this Cloud Event.
@ -222,8 +221,8 @@ def to_structured(
def to_binary( def to_binary(
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None event: AnyCloudEvent, data_marshaller: typing.Optional[types.MarshallerType] = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
""" """
Returns a tuple of HTTP headers/body dicts representing this Cloud Event. Returns a tuple of HTTP headers/body dicts representing this Cloud Event.
@ -287,19 +286,13 @@ def to_dict(event: AnyCloudEvent) -> typing.Dict[str, typing.Any]:
:returns: The canonical dict representation of the event. :returns: The canonical dict representation of the event.
""" """
result = {attribute_name: event.get(attribute_name) for attribute_name in event} result = {attribute_name: event.get(attribute_name) for attribute_name in event}
result["data"] = event.data result["data"] = event.get_data()
return result return result
def _json_or_string( def _json_or_string(
content: typing.Optional[typing.AnyStr], content: typing.Optional[typing.Union[str, bytes]],
) -> typing.Optional[ ) -> typing.Any:
typing.Union[
typing.Dict[typing.Any, typing.Any],
typing.List[typing.Any],
typing.AnyStr,
]
]:
""" """
Returns a JSON-decoded dictionary or a list of dictionaries if Returns a JSON-decoded dictionary or a list of dictionaries if
a valid JSON string is provided. a valid JSON string is provided.

View File

@ -25,15 +25,15 @@ from cloudevents.http.http_methods import ( # deprecated
from cloudevents.http.json_methods import to_json # deprecated from cloudevents.http.json_methods import to_json # deprecated
__all__ = [ __all__ = [
to_binary, "to_binary",
to_structured, "to_structured",
from_json, "from_json",
from_http, "from_http",
from_dict, "from_dict",
CloudEvent, "CloudEvent",
is_binary, "is_binary",
is_structured, "is_structured",
to_binary_http, "to_binary_http",
to_structured_http, "to_structured_http",
to_json, "to_json",
] ]

View File

@ -23,7 +23,7 @@ from cloudevents.sdk import types
def from_json( def from_json(
data: typing.Union[str, bytes], data: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent: ) -> CloudEvent:
""" """
Parses JSON string `data` into a CloudEvent. Parses JSON string `data` into a CloudEvent.
@ -38,8 +38,8 @@ def from_json(
def from_http( def from_http(
headers: typing.Dict[str, str], headers: typing.Dict[str, str],
data: typing.Union[str, bytes, None], data: typing.Optional[typing.Union[str, bytes]],
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent: ) -> CloudEvent:
""" """
Parses CloudEvent `data` and `headers` into a CloudEvent`. Parses CloudEvent `data` and `headers` into a CloudEvent`.

View File

@ -82,7 +82,7 @@ class CloudEvent(abstract.CloudEvent):
def _get_attributes(self) -> typing.Dict[str, typing.Any]: def _get_attributes(self) -> typing.Dict[str, typing.Any]:
return self._attributes return self._attributes
def _get_data(self) -> typing.Optional[typing.Any]: def get_data(self) -> typing.Optional[typing.Any]:
return self.data return self.data
def __setitem__(self, key: str, value: typing.Any) -> None: def __setitem__(self, key: str, value: typing.Any) -> None:

View File

@ -31,8 +31,8 @@ from cloudevents.sdk import types
details="Use cloudevents.conversion.to_binary function instead", details="Use cloudevents.conversion.to_binary function instead",
) )
def to_binary( def to_binary(
event: AnyCloudEvent, data_marshaller: types.MarshallerType = None event: AnyCloudEvent, data_marshaller: typing.Optional[types.MarshallerType] = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
return _moved_to_binary(event, data_marshaller) return _moved_to_binary(event, data_marshaller)
@ -42,8 +42,8 @@ def to_binary(
) )
def to_structured( def to_structured(
event: AnyCloudEvent, event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None, data_marshaller: typing.Optional[types.MarshallerType] = None,
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
return _moved_to_structured(event, data_marshaller) return _moved_to_structured(event, data_marshaller)
@ -53,21 +53,21 @@ def to_structured(
) )
def from_http( def from_http(
headers: typing.Dict[str, str], headers: typing.Dict[str, str],
data: typing.Union[str, bytes, None], data: typing.Optional[typing.AnyStr],
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent: ) -> CloudEvent:
return _moved_from_http(headers, data, data_unmarshaller) return _moved_from_http(headers, data, data_unmarshaller)
@deprecated(deprecated_in="1.0.2", details="Use to_binary function instead") @deprecated(deprecated_in="1.0.2", details="Use to_binary function instead")
def to_binary_http( def to_binary_http(
event: CloudEvent, data_marshaller: types.MarshallerType = None event: CloudEvent, data_marshaller: typing.Optional[types.MarshallerType] = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
return _moved_to_binary(event, data_marshaller) return _moved_to_binary(event, data_marshaller)
@deprecated(deprecated_in="1.0.2", details="Use to_structured function instead") @deprecated(deprecated_in="1.0.2", details="Use to_structured function instead")
def to_structured_http( def to_structured_http(
event: CloudEvent, data_marshaller: types.MarshallerType = None event: CloudEvent, data_marshaller: typing.Optional[types.MarshallerType] = None
) -> typing.Tuple[dict, typing.Union[bytes, str]]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
return _moved_to_structured(event, data_marshaller) return _moved_to_structured(event, data_marshaller)

View File

@ -31,8 +31,8 @@ from cloudevents.sdk import types
) )
def to_json( def to_json(
event: AnyCloudEvent, event: AnyCloudEvent,
data_marshaller: types.MarshallerType = None, data_marshaller: typing.Optional[types.MarshallerType] = None,
) -> typing.Union[str, bytes]: ) -> bytes:
return _moved_to_json(event, data_marshaller) return _moved_to_json(event, data_marshaller)
@ -42,6 +42,6 @@ def to_json(
) )
def from_json( def from_json(
data: typing.Union[str, bytes], data: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent: ) -> CloudEvent:
return _moved_from_json(data, data_unmarshaller) return _moved_from_json(data, data_unmarshaller)

View File

@ -11,6 +11,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing
from deprecation import deprecated from deprecation import deprecated
from cloudevents.conversion import ( from cloudevents.conversion import (
@ -24,5 +26,7 @@ from cloudevents.conversion import (
deprecated_in="1.6.0", deprecated_in="1.6.0",
details="You SHOULD NOT use the default marshaller", details="You SHOULD NOT use the default marshaller",
) )
def default_marshaller(content: any): def default_marshaller(
content: typing.Any,
) -> typing.Optional[typing.Union[bytes, str, typing.Any]]:
return _moved_default_marshaller(content) return _moved_default_marshaller(content)

View File

@ -22,10 +22,10 @@ from cloudevents.kafka.conversion import (
) )
__all__ = [ __all__ = [
KafkaMessage, "KafkaMessage",
KeyMapper, "KeyMapper",
from_binary, "from_binary",
from_structured, "from_structured",
to_binary, "to_binary",
to_structured, "to_structured",
] ]

View File

@ -38,12 +38,12 @@ class KafkaMessage(typing.NamedTuple):
The dictionary of message headers key/values. The dictionary of message headers key/values.
""" """
key: typing.Optional[typing.AnyStr] key: typing.Optional[typing.Union[str, bytes]]
""" """
The message key. The message key.
""" """
value: typing.AnyStr value: typing.Union[str, bytes]
""" """
The message value. The message value.
""" """
@ -95,7 +95,7 @@ def to_binary(
headers["ce_{0}".format(attr)] = value.encode("utf-8") headers["ce_{0}".format(attr)] = value.encode("utf-8")
try: try:
data = data_marshaller(event.data) data = data_marshaller(event.get_data())
except Exception as e: except Exception as e:
raise cloud_exceptions.DataMarshallerError( raise cloud_exceptions.DataMarshallerError(
f"Failed to marshall data with error: {type(e).__name__}('{e}')" f"Failed to marshall data with error: {type(e).__name__}('{e}')"
@ -121,9 +121,7 @@ def from_binary(
""" """
data_unmarshaller = data_unmarshaller or DEFAULT_UNMARSHALLER data_unmarshaller = data_unmarshaller or DEFAULT_UNMARSHALLER
event_type = event_type or http.CloudEvent attributes: typing.Dict[str, typing.Any] = {}
attributes = {}
for header, value in message.headers.items(): for header, value in message.headers.items():
header = header.lower() header = header.lower()
@ -141,8 +139,11 @@ def from_binary(
raise cloud_exceptions.DataUnmarshallerError( raise cloud_exceptions.DataUnmarshallerError(
f"Failed to unmarshall data with error: {type(e).__name__}('{e}')" f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
) )
if event_type:
return event_type.create(attributes, data) result = event_type.create(attributes, data)
else:
result = http.CloudEvent.create(attributes, data) # type: ignore
return result
def to_structured( def to_structured(
@ -174,10 +175,10 @@ def to_structured(
f"Failed to map message key with error: {type(e).__name__}('{e}')" f"Failed to map message key with error: {type(e).__name__}('{e}')"
) )
attrs: dict[str, typing.Any] = dict(event.get_attributes()) attrs: typing.Dict[str, typing.Any] = dict(event.get_attributes())
try: try:
data = data_marshaller(event.data) data = data_marshaller(event.get_data())
except Exception as e: except Exception as e:
raise cloud_exceptions.DataMarshallerError( raise cloud_exceptions.DataMarshallerError(
f"Failed to marshall data with error: {type(e).__name__}('{e}')" f"Failed to marshall data with error: {type(e).__name__}('{e}')"
@ -223,8 +224,6 @@ def from_structured(
data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER data_unmarshaller = data_unmarshaller or DEFAULT_EMBEDDED_DATA_MARSHALLER
envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER envelope_unmarshaller = envelope_unmarshaller or DEFAULT_UNMARSHALLER
event_type = event_type or http.CloudEvent
try: try:
structure = envelope_unmarshaller(message.value) structure = envelope_unmarshaller(message.value)
except Exception as e: except Exception as e:
@ -232,7 +231,7 @@ def from_structured(
"Failed to unmarshall message with error: " f"{type(e).__name__}('{e}')" "Failed to unmarshall message with error: " f"{type(e).__name__}('{e}')"
) )
attributes: dict[str, typing.Any] = {} attributes: typing.Dict[str, typing.Any] = {}
if message.key is not None: if message.key is not None:
attributes["partitionkey"] = message.key attributes["partitionkey"] = message.key
@ -257,5 +256,8 @@ def from_structured(
for header, val in message.headers.items(): for header, val in message.headers.items():
attributes[header.lower()] = val.decode() attributes[header.lower()] = val.decode()
if event_type:
return event_type.create(attributes, data) result = event_type.create(attributes, data)
else:
result = http.CloudEvent.create(attributes, data) # type: ignore
return result

0
cloudevents/py.typed Normal file
View File

View File

@ -14,4 +14,4 @@
from cloudevents.pydantic.conversion import from_dict, from_http, from_json from cloudevents.pydantic.conversion import from_dict, from_http, from_json
from cloudevents.pydantic.event import CloudEvent from cloudevents.pydantic.event import CloudEvent
__all__ = [CloudEvent, from_json, from_dict, from_http] __all__ = ["CloudEvent", "from_json", "from_dict", "from_http"]

View File

@ -22,7 +22,7 @@ from cloudevents.sdk import types
def from_http( def from_http(
headers: typing.Dict[str, str], headers: typing.Dict[str, str],
data: typing.Union[str, bytes, None], data: typing.Optional[typing.AnyStr],
data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent: ) -> CloudEvent:
""" """
@ -47,7 +47,7 @@ def from_http(
def from_json( def from_json(
data: typing.AnyStr, data: typing.AnyStr,
data_unmarshaller: types.UnmarshallerType = None, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent: ) -> CloudEvent:
""" """
Parses JSON string `data` into a CloudEvent. Parses JSON string `data` into a CloudEvent.

View File

@ -30,17 +30,26 @@ from cloudevents.exceptions import IncompatibleArgumentsError
from cloudevents.sdk.event import attribute from cloudevents.sdk.event import attribute
def _ce_json_dumps(obj: typing.Dict[str, typing.Any], *args, **kwargs) -> str: def _ce_json_dumps( # type: ignore[no-untyped-def]
""" obj: typing.Dict[str, typing.Any],
*args,
**kwargs,
) -> str:
"""Performs Pydantic-specific serialization of the event.
Needed by the pydantic base-model to serialize the event correctly to json. Needed by the pydantic base-model to serialize the event correctly to json.
Without this function the data will be incorrectly serialized. Without this function the data will be incorrectly serialized.
:param obj: CloudEvent represented as a dict. :param obj: CloudEvent represented as a dict.
:param args: User arguments which will be passed to json.dumps function. :param args: User arguments which will be passed to json.dumps function.
:param kwargs: User arguments which will be passed to json.dumps function. :param kwargs: User arguments which will be passed to json.dumps function.
:return: Event serialized as a standard JSON CloudEvent with user specific :return: Event serialized as a standard JSON CloudEvent with user specific
parameters. parameters.
""" """
# Using HTTP from dict due to performance issues. # Using HTTP from dict due to performance issues.
event = http.from_dict(obj)
event_json = conversion.to_json(event)
# Pydantic is known for initialization time lagging. # Pydantic is known for initialization time lagging.
return json.dumps( return json.dumps(
# We SHOULD de-serialize the value, to serialize it back with # We SHOULD de-serialize the value, to serialize it back with
@ -48,27 +57,26 @@ def _ce_json_dumps(obj: typing.Dict[str, typing.Any], *args, **kwargs) -> str:
# This MAY cause performance issues in the future. # This MAY cause performance issues in the future.
# When that issue will cause real problem you MAY add a special keyword # When that issue will cause real problem you MAY add a special keyword
# argument that disabled this conversion # argument that disabled this conversion
json.loads( json.loads(event_json),
conversion.to_json(
http.from_dict(obj),
).decode("utf-8")
),
*args, *args,
**kwargs **kwargs,
) )
def _ce_json_loads( def _ce_json_loads( # type: ignore[no-untyped-def]
data: typing.Union[str, bytes], *args, **kwargs # noqa data: typing.AnyStr, *args, **kwargs # noqa
) -> typing.Dict[typing.Any, typing.Any]: ) -> typing.Dict[typing.Any, typing.Any]:
""" """Perforns Pydantic-specific deserialization of the event.
Needed by the pydantic base-model to de-serialize the event correctly from json. Needed by the pydantic base-model to de-serialize the event correctly from json.
Without this function the data will be incorrectly de-serialized. Without this function the data will be incorrectly de-serialized.
:param obj: CloudEvent encoded as a json string. :param obj: CloudEvent encoded as a json string.
:param args: These arguments SHOULD NOT be passed by pydantic. :param args: These arguments SHOULD NOT be passed by pydantic.
Located here for fail-safe reasons, in-case it does. Located here for fail-safe reasons, in-case it does.
:param kwargs: These arguments SHOULD NOT be passed by pydantic. :param kwargs: These arguments SHOULD NOT be passed by pydantic.
Located here for fail-safe reasons, in-case it does. Located here for fail-safe reasons, in-case it does.
:return: CloudEvent in a dict representation. :return: CloudEvent in a dict representation.
""" """
# Using HTTP from dict due to performance issues. # Using HTTP from dict due to performance issues.
@ -76,7 +84,7 @@ def _ce_json_loads(
return conversion.to_dict(http.from_json(data)) return conversion.to_dict(http.from_json(data))
class CloudEvent(abstract.CloudEvent, pydantic.BaseModel): class CloudEvent(abstract.CloudEvent, pydantic.BaseModel): # type: ignore
""" """
A Python-friendly CloudEvent representation backed by Pydantic-modeled fields. A Python-friendly CloudEvent representation backed by Pydantic-modeled fields.
@ -211,11 +219,11 @@ class CloudEvent(abstract.CloudEvent, pydantic.BaseModel):
), ),
) )
def __init__( def __init__( # type: ignore[no-untyped-def]
self, self,
attributes: typing.Optional[typing.Dict[str, typing.Any]] = None, attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
data: typing.Optional[typing.Any] = None, data: typing.Optional[typing.Any] = None,
**kwargs **kwargs,
): ):
""" """
:param attributes: A dict with CloudEvent attributes. :param attributes: A dict with CloudEvent attributes.
@ -272,7 +280,7 @@ class CloudEvent(abstract.CloudEvent, pydantic.BaseModel):
if key != "data" if key != "data"
} }
def _get_data(self) -> typing.Optional[typing.Any]: def get_data(self) -> typing.Optional[typing.Any]:
return self.data return self.data
def __setitem__(self, key: str, value: typing.Any) -> None: def __setitem__(self, key: str, value: typing.Any) -> None:

View File

@ -16,7 +16,14 @@ from cloudevents.sdk.converters import binary, structured
from cloudevents.sdk.converters.binary import is_binary from cloudevents.sdk.converters.binary import is_binary
from cloudevents.sdk.converters.structured import is_structured from cloudevents.sdk.converters.structured import is_structured
TypeBinary = binary.BinaryHTTPCloudEventConverter.TYPE TypeBinary: str = binary.BinaryHTTPCloudEventConverter.TYPE
TypeStructured = structured.JSONHTTPCloudEventConverter.TYPE TypeStructured: str = structured.JSONHTTPCloudEventConverter.TYPE
__all__ = [binary, structured, is_binary, is_structured, TypeBinary, TypeStructured] __all__ = [
"binary",
"structured",
"is_binary",
"is_structured",
"TypeBinary",
"TypeStructured",
]

View File

@ -18,14 +18,13 @@ from cloudevents.sdk.event import base
class Converter(object): class Converter(object):
TYPE: str = ""
TYPE = None
def read( def read(
self, self,
event, event: typing.Any,
headers: dict, headers: typing.Mapping[str, str],
body: typing.IO, body: typing.Union[str, bytes],
data_unmarshaller: typing.Callable, data_unmarshaller: typing.Callable,
) -> base.BaseEvent: ) -> base.BaseEvent:
raise Exception("not implemented") raise Exception("not implemented")
@ -33,10 +32,14 @@ class Converter(object):
def event_supported(self, event: object) -> bool: def event_supported(self, event: object) -> bool:
raise Exception("not implemented") raise Exception("not implemented")
def can_read(self, content_type: str) -> bool: def can_read(
self,
content_type: typing.Optional[str],
headers: typing.Optional[typing.Mapping[str, str]] = None,
) -> bool:
raise Exception("not implemented") raise Exception("not implemented")
def write( def write(
self, event: base.BaseEvent, data_marshaller: typing.Callable self, event: base.BaseEvent, data_marshaller: typing.Optional[typing.Callable]
) -> (dict, object): ) -> typing.Tuple[typing.Dict[str, str], bytes]:
raise Exception("not implemented") raise Exception("not implemented")

View File

@ -22,16 +22,17 @@ from cloudevents.sdk.event import v1, v03
class BinaryHTTPCloudEventConverter(base.Converter): class BinaryHTTPCloudEventConverter(base.Converter):
TYPE: str = "binary"
TYPE = "binary"
SUPPORTED_VERSIONS = [v03.Event, v1.Event] SUPPORTED_VERSIONS = [v03.Event, v1.Event]
def can_read( def can_read(
self, self,
content_type: str = None, content_type: typing.Optional[str] = None,
headers: typing.Dict[str, str] = {"ce-specversion": None}, headers: typing.Optional[typing.Mapping[str, str]] = None,
) -> bool: ) -> bool:
if headers is None:
headers = {"ce-specversion": ""}
return has_binary_headers(headers) return has_binary_headers(headers)
def event_supported(self, event: object) -> bool: def event_supported(self, event: object) -> bool:
@ -40,8 +41,8 @@ class BinaryHTTPCloudEventConverter(base.Converter):
def read( def read(
self, self,
event: event_base.BaseEvent, event: event_base.BaseEvent,
headers: dict, headers: typing.Mapping[str, str],
body: typing.IO, body: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType, data_unmarshaller: types.UnmarshallerType,
) -> event_base.BaseEvent: ) -> event_base.BaseEvent:
if type(event) not in self.SUPPORTED_VERSIONS: if type(event) not in self.SUPPORTED_VERSIONS:
@ -50,8 +51,10 @@ class BinaryHTTPCloudEventConverter(base.Converter):
return event return event
def write( def write(
self, event: event_base.BaseEvent, data_marshaller: types.MarshallerType self,
) -> typing.Tuple[dict, bytes]: event: event_base.BaseEvent,
data_marshaller: typing.Optional[types.MarshallerType],
) -> typing.Tuple[typing.Dict[str, str], bytes]:
return event.MarshalBinary(data_marshaller) return event.MarshalBinary(data_marshaller)
@ -59,7 +62,7 @@ def NewBinaryHTTPCloudEventConverter() -> BinaryHTTPCloudEventConverter:
return BinaryHTTPCloudEventConverter() return BinaryHTTPCloudEventConverter()
def is_binary(headers: typing.Dict[str, str]) -> bool: def is_binary(headers: typing.Mapping[str, str]) -> bool:
""" """
Determines whether an event with the supplied `headers` is in binary format. Determines whether an event with the supplied `headers` is in binary format.

View File

@ -22,11 +22,16 @@ from cloudevents.sdk.event import base as event_base
# TODO: Singleton? # TODO: Singleton?
class JSONHTTPCloudEventConverter(base.Converter): class JSONHTTPCloudEventConverter(base.Converter):
TYPE: str = "structured"
MIME_TYPE: str = "application/cloudevents+json"
TYPE = "structured" def can_read(
MIME_TYPE = "application/cloudevents+json" self,
content_type: typing.Optional[str] = None,
def can_read(self, content_type: str, headers: typing.Dict[str, str] = {}) -> bool: headers: typing.Optional[typing.Mapping[str, str]] = None,
) -> bool:
if headers is None:
headers = {}
return ( return (
isinstance(content_type, str) isinstance(content_type, str)
and content_type.startswith(self.MIME_TYPE) and content_type.startswith(self.MIME_TYPE)
@ -40,16 +45,18 @@ class JSONHTTPCloudEventConverter(base.Converter):
def read( def read(
self, self,
event: event_base.BaseEvent, event: event_base.BaseEvent,
headers: dict, headers: typing.Mapping[str, str],
body: typing.IO, body: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType, data_unmarshaller: types.UnmarshallerType,
) -> event_base.BaseEvent: ) -> event_base.BaseEvent:
event.UnmarshalJSON(body, data_unmarshaller) event.UnmarshalJSON(body, data_unmarshaller)
return event return event
def write( def write(
self, event: event_base.BaseEvent, data_marshaller: types.MarshallerType self,
) -> typing.Tuple[dict, bytes]: event: event_base.BaseEvent,
data_marshaller: typing.Optional[types.MarshallerType],
) -> typing.Tuple[typing.Dict[str, str], bytes]:
http_headers = {"content-type": self.MIME_TYPE} http_headers = {"content-type": self.MIME_TYPE}
return http_headers, event.MarshalJSON(data_marshaller).encode("utf-8") return http_headers, event.MarshalJSON(data_marshaller).encode("utf-8")
@ -58,7 +65,7 @@ def NewJSONHTTPCloudEventConverter() -> JSONHTTPCloudEventConverter:
return JSONHTTPCloudEventConverter() return JSONHTTPCloudEventConverter()
def is_structured(headers: typing.Dict[str, str]) -> bool: def is_structured(headers: typing.Mapping[str, str]) -> bool:
""" """
Determines whether an event with the supplied `headers` is in a structured format. Determines whether an event with the supplied `headers` is in a structured format.

View File

@ -15,7 +15,7 @@
import typing import typing
def has_binary_headers(headers: typing.Dict[str, str]) -> bool: def has_binary_headers(headers: typing.Mapping[str, str]) -> bool:
"""Determines if all CloudEvents required headers are presents """Determines if all CloudEvents required headers are presents
in the `headers`. in the `headers`.

View File

@ -34,7 +34,7 @@ class SpecVersion(str, Enum):
DEFAULT_SPECVERSION = SpecVersion.v1_0 DEFAULT_SPECVERSION = SpecVersion.v1_0
def default_time_selection_algorithm() -> datetime: def default_time_selection_algorithm() -> datetime.datetime:
""" """
:return: A time value which will be used as CloudEvent time attribute value. :return: A time value which will be used as CloudEvent time attribute value.
""" """

View File

@ -15,6 +15,7 @@
import base64 import base64
import json import json
import typing import typing
from typing import Set
import cloudevents.exceptions as cloud_exceptions import cloudevents.exceptions as cloud_exceptions
from cloudevents.sdk import types from cloudevents.sdk import types
@ -29,106 +30,106 @@ class EventGetterSetter(object): # pragma: no cover
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def specversion(self): def specversion(self) -> str:
return self.CloudEventVersion() return self.CloudEventVersion()
@specversion.setter
def specversion(self, value: str) -> None:
self.SetCloudEventVersion(value)
def SetCloudEventVersion(self, specversion: str) -> object: def SetCloudEventVersion(self, specversion: str) -> object:
raise Exception("not implemented") raise Exception("not implemented")
@specversion.setter
def specversion(self, value: str):
self.SetCloudEventVersion(value)
# ce-type # ce-type
def EventType(self) -> str: def EventType(self) -> str:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def type(self): def type(self) -> str:
return self.EventType() return self.EventType()
@type.setter
def type(self, value: str) -> None:
self.SetEventType(value)
def SetEventType(self, eventType: str) -> object: def SetEventType(self, eventType: str) -> object:
raise Exception("not implemented") raise Exception("not implemented")
@type.setter
def type(self, value: str):
self.SetEventType(value)
# ce-source # ce-source
def Source(self) -> str: def Source(self) -> str:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def source(self): def source(self) -> str:
return self.Source() return self.Source()
@source.setter
def source(self, value: str) -> None:
self.SetSource(value)
def SetSource(self, source: str) -> object: def SetSource(self, source: str) -> object:
raise Exception("not implemented") raise Exception("not implemented")
@source.setter
def source(self, value: str):
self.SetSource(value)
# ce-id # ce-id
def EventID(self) -> str: def EventID(self) -> str:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def id(self): def id(self) -> str:
return self.EventID() return self.EventID()
@id.setter
def id(self, value: str) -> None:
self.SetEventID(value)
def SetEventID(self, eventID: str) -> object: def SetEventID(self, eventID: str) -> object:
raise Exception("not implemented") raise Exception("not implemented")
@id.setter
def id(self, value: str):
self.SetEventID(value)
# ce-time # ce-time
def EventTime(self) -> str: def EventTime(self) -> typing.Optional[str]:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def time(self): def time(self) -> typing.Optional[str]:
return self.EventTime() return self.EventTime()
def SetEventTime(self, eventTime: str) -> object:
raise Exception("not implemented")
@time.setter @time.setter
def time(self, value: str): def time(self, value: typing.Optional[str]) -> None:
self.SetEventTime(value) self.SetEventTime(value)
def SetEventTime(self, eventTime: typing.Optional[str]) -> object:
raise Exception("not implemented")
# ce-schema # ce-schema
def SchemaURL(self) -> str: def SchemaURL(self) -> typing.Optional[str]:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def schema(self) -> str: def schema(self) -> typing.Optional[str]:
return self.SchemaURL() return self.SchemaURL()
def SetSchemaURL(self, schemaURL: str) -> object:
raise Exception("not implemented")
@schema.setter @schema.setter
def schema(self, value: str): def schema(self, value: typing.Optional[str]) -> None:
self.SetSchemaURL(value) self.SetSchemaURL(value)
def SetSchemaURL(self, schemaURL: typing.Optional[str]) -> object:
raise Exception("not implemented")
# data # data
def Data(self) -> object: def Data(self) -> typing.Optional[object]:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def data(self) -> object: def data(self) -> typing.Optional[object]:
return self.Data() return self.Data()
def SetData(self, data: object) -> object:
raise Exception("not implemented")
@data.setter @data.setter
def data(self, value: object): def data(self, value: typing.Optional[object]) -> None:
self.SetData(value) self.SetData(value)
def SetData(self, data: typing.Optional[object]) -> object:
raise Exception("not implemented")
# ce-extensions # ce-extensions
def Extensions(self) -> dict: def Extensions(self) -> dict:
raise Exception("not implemented") raise Exception("not implemented")
@ -137,34 +138,38 @@ class EventGetterSetter(object): # pragma: no cover
def extensions(self) -> dict: def extensions(self) -> dict:
return self.Extensions() return self.Extensions()
@extensions.setter
def extensions(self, value: dict) -> None:
self.SetExtensions(value)
def SetExtensions(self, extensions: dict) -> object: def SetExtensions(self, extensions: dict) -> object:
raise Exception("not implemented") raise Exception("not implemented")
@extensions.setter
def extensions(self, value: dict):
self.SetExtensions(value)
# Content-Type # Content-Type
def ContentType(self) -> str: def ContentType(self) -> typing.Optional[str]:
raise Exception("not implemented") raise Exception("not implemented")
@property @property
def content_type(self) -> str: def content_type(self) -> typing.Optional[str]:
return self.ContentType() return self.ContentType()
def SetContentType(self, contentType: str) -> object:
raise Exception("not implemented")
@content_type.setter @content_type.setter
def content_type(self, value: str): def content_type(self, value: typing.Optional[str]) -> None:
self.SetContentType(value) self.SetContentType(value)
def SetContentType(self, contentType: typing.Optional[str]) -> object:
raise Exception("not implemented")
class BaseEvent(EventGetterSetter): class BaseEvent(EventGetterSetter):
_ce_required_fields = set() """Base implementation of the CloudEvent."""
_ce_optional_fields = set()
def Properties(self, with_nullable=False) -> dict: _ce_required_fields: Set[str] = set()
"""A set of required CloudEvent field names."""
_ce_optional_fields: Set[str] = set()
"""A set of optional CloudEvent field names."""
def Properties(self, with_nullable: bool = False) -> dict:
props = dict() props = dict()
for name, value in self.__dict__.items(): for name, value in self.__dict__.items():
if str(name).startswith("ce__"): if str(name).startswith("ce__"):
@ -174,19 +179,18 @@ class BaseEvent(EventGetterSetter):
return props return props
def Get(self, key: str) -> typing.Tuple[object, bool]: def Get(self, key: str) -> typing.Tuple[typing.Optional[object], bool]:
formatted_key = "ce__{0}".format(key.lower()) formatted_key: str = "ce__{0}".format(key.lower())
ok = hasattr(self, formatted_key) key_exists: bool = hasattr(self, formatted_key)
value = getattr(self, formatted_key, None) if not key_exists:
if not ok:
exts = self.Extensions() exts = self.Extensions()
return exts.get(key), key in exts return exts.get(key), key in exts
value: typing.Any = getattr(self, formatted_key)
return value.get(), key_exists
return value.get(), ok def Set(self, key: str, value: typing.Optional[object]) -> None:
formatted_key: str = "ce__{0}".format(key)
def Set(self, key: str, value: object): key_exists: bool = hasattr(self, formatted_key)
formatted_key = "ce__{0}".format(key)
key_exists = hasattr(self, formatted_key)
if key_exists: if key_exists:
attr = getattr(self, formatted_key) attr = getattr(self, formatted_key)
attr.set(value) attr.set(value)
@ -196,19 +200,20 @@ class BaseEvent(EventGetterSetter):
exts.update({key: value}) exts.update({key: value})
self.Set("extensions", exts) self.Set("extensions", exts)
def MarshalJSON(self, data_marshaller: types.MarshallerType) -> str: def MarshalJSON(
if data_marshaller is None: self, data_marshaller: typing.Optional[types.MarshallerType]
data_marshaller = lambda x: x # noqa: E731 ) -> str:
props = self.Properties() props = self.Properties()
if "data" in props: if "data" in props:
data = props.pop("data") data = props.pop("data")
try: try:
if data_marshaller:
data = data_marshaller(data) data = data_marshaller(data)
except Exception as e: except Exception as e:
raise cloud_exceptions.DataMarshallerError( raise cloud_exceptions.DataMarshallerError(
f"Failed to marshall data with error: {type(e).__name__}('{e}')" f"Failed to marshall data with error: {type(e).__name__}('{e}')"
) )
if isinstance(data, (bytes, bytes, memoryview)): if isinstance(data, (bytes, bytearray, memoryview)):
props["data_base64"] = base64.b64encode(data).decode("ascii") props["data_base64"] = base64.b64encode(data).decode("ascii")
else: else:
props["data"] = data props["data"] = data
@ -221,7 +226,7 @@ class BaseEvent(EventGetterSetter):
self, self,
b: typing.Union[str, bytes], b: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType, data_unmarshaller: types.UnmarshallerType,
): ) -> None:
raw_ce = json.loads(b) raw_ce = json.loads(b)
missing_fields = self._ce_required_fields - raw_ce.keys() missing_fields = self._ce_required_fields - raw_ce.keys()
@ -231,30 +236,27 @@ class BaseEvent(EventGetterSetter):
) )
for name, value in raw_ce.items(): for name, value in raw_ce.items():
decoder = lambda x: x
if name == "data":
# Use the user-provided serializer, which may have customized
# JSON decoding
decoder = lambda v: data_unmarshaller(json.dumps(v))
if name == "data_base64":
decoder = lambda v: data_unmarshaller(base64.b64decode(v))
name = "data"
try: try:
set_value = decoder(value) if name == "data":
decoded_value = data_unmarshaller(json.dumps(value))
elif name == "data_base64":
decoded_value = data_unmarshaller(base64.b64decode(value))
name = "data"
else:
decoded_value = value
except Exception as e: except Exception as e:
raise cloud_exceptions.DataUnmarshallerError( raise cloud_exceptions.DataUnmarshallerError(
"Failed to unmarshall data with error: " "Failed to unmarshall data with error: "
f"{type(e).__name__}('{e}')" f"{type(e).__name__}('{e}')"
) )
self.Set(name, set_value) self.Set(name, decoded_value)
def UnmarshalBinary( def UnmarshalBinary(
self, self,
headers: dict, headers: typing.Mapping[str, str],
body: typing.Union[bytes, str], body: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType, data_unmarshaller: types.UnmarshallerType,
): ) -> None:
required_binary_fields = {f"ce-{field}" for field in self._ce_required_fields} required_binary_fields = {f"ce-{field}" for field in self._ce_required_fields}
missing_fields = required_binary_fields - headers.keys() missing_fields = required_binary_fields - headers.keys()
@ -279,20 +281,25 @@ class BaseEvent(EventGetterSetter):
self.Set("data", raw_ce) self.Set("data", raw_ce)
def MarshalBinary( def MarshalBinary(
self, data_marshaller: types.MarshallerType self, data_marshaller: typing.Optional[types.MarshallerType]
) -> typing.Tuple[dict, bytes]: ) -> typing.Tuple[typing.Dict[str, str], bytes]:
if data_marshaller is None: if not data_marshaller:
data_marshaller = json.dumps data_marshaller = json.dumps
headers = {} headers: typing.Dict[str, str] = {}
if self.ContentType(): content_type = self.ContentType()
headers["content-type"] = self.ContentType() if content_type:
props = self.Properties() headers["content-type"] = content_type
props: typing.Dict = self.Properties()
for key, value in props.items(): for key, value in props.items():
if key not in ["data", "extensions", "datacontenttype"]: if key not in ["data", "extensions", "datacontenttype"]:
if value is not None: if value is not None:
headers["ce-{0}".format(key)] = value headers["ce-{0}".format(key)] = value
extensions = props.get("extensions")
for key, value in props.get("extensions").items(): if extensions is None or not isinstance(extensions, typing.Mapping):
raise cloud_exceptions.DataMarshallerError(
"No extensions are available in the binary event."
)
for key, value in extensions.items():
headers["ce-{0}".format(key)] = value headers["ce-{0}".format(key)] = value
data, _ = self.Get("data") data, _ = self.Get("data")

View File

@ -11,29 +11,36 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing
from typing import Any
class Option(object): class Option:
def __init__(self, name, value, is_required): """A value holder of CloudEvents extensions."""
self.name = name
self.value = value
self.is_required = is_required
def set(self, new_value): def __init__(self, name: str, value: typing.Optional[Any], is_required: bool):
self.name: str = name
"""The name of the option."""
self.value: Any = value
"""The value of the option."""
self.is_required: bool = is_required
"""Determines if the option value must be present."""
def set(self, new_value: typing.Optional[Any]) -> None:
"""Sets given new value as the value of this option."""
is_none = new_value is None is_none = new_value is None
if self.is_required and is_none: if self.is_required and is_none:
raise ValueError( raise ValueError(
"Attribute value error: '{0}', " "Attribute value error: '{0}', invalid new value.".format(self.name)
""
"invalid new value.".format(self.name)
) )
self.value = new_value self.value = new_value
def get(self): def get(self) -> typing.Optional[Any]:
"""Returns the value of this option."""
return self.value return self.value
def required(self): def required(self):
"""Determines if the option value must be present."""
return self.is_required return self.is_required
def __eq__(self, obj): def __eq__(self, obj):

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing
from cloudevents.sdk.event import base, opt from cloudevents.sdk.event import base, opt
@ -41,37 +42,55 @@ class Event(base.BaseEvent):
self.ce__extensions = opt.Option("extensions", dict(), False) self.ce__extensions = opt.Option("extensions", dict(), False)
def CloudEventVersion(self) -> str: def CloudEventVersion(self) -> str:
return self.ce__specversion.get() return str(self.ce__specversion.get())
def EventType(self) -> str: def EventType(self) -> str:
return self.ce__type.get() return str(self.ce__type.get())
def Source(self) -> str: def Source(self) -> str:
return self.ce__source.get() return str(self.ce__source.get())
def EventID(self) -> str: def EventID(self) -> str:
return self.ce__id.get() return str(self.ce__id.get())
def EventTime(self) -> str: def EventTime(self) -> typing.Optional[str]:
return self.ce__time.get() result = self.ce__time.get()
if result is None:
return None
return str(result)
def Subject(self) -> str: def Subject(self) -> typing.Optional[str]:
return self.ce__subject.get() result = self.ce__subject.get()
if result is None:
return None
return str(result)
def SchemaURL(self) -> str: def SchemaURL(self) -> typing.Optional[str]:
return self.ce__schemaurl.get() result = self.ce__schemaurl.get()
if result is None:
return None
return str(result)
def Data(self) -> object: def Data(self) -> typing.Optional[object]:
return self.ce__data.get() return self.ce__data.get()
def Extensions(self) -> dict: def Extensions(self) -> dict:
return self.ce__extensions.get() result = self.ce__extensions.get()
if result is None:
return {}
return dict(result)
def ContentType(self) -> str: def ContentType(self) -> typing.Optional[str]:
return self.ce__datacontenttype.get() result = self.ce__datacontenttype.get()
if result is None:
return None
return str(result)
def ContentEncoding(self) -> str: def ContentEncoding(self) -> typing.Optional[str]:
return self.ce__datacontentencoding.get() result = self.ce__datacontentencoding.get()
if result is None:
return None
return str(result)
def SetEventType(self, eventType: str) -> base.BaseEvent: def SetEventType(self, eventType: str) -> base.BaseEvent:
self.Set("type", eventType) self.Set("type", eventType)
@ -85,54 +104,56 @@ class Event(base.BaseEvent):
self.Set("id", eventID) self.Set("id", eventID)
return self return self
def SetEventTime(self, eventTime: str) -> base.BaseEvent: def SetEventTime(self, eventTime: typing.Optional[str]) -> base.BaseEvent:
self.Set("time", eventTime) self.Set("time", eventTime)
return self return self
def SetSubject(self, subject: str) -> base.BaseEvent: def SetSubject(self, subject: typing.Optional[str]) -> base.BaseEvent:
self.Set("subject", subject) self.Set("subject", subject)
return self return self
def SetSchemaURL(self, schemaURL: str) -> base.BaseEvent: def SetSchemaURL(self, schemaURL: typing.Optional[str]) -> base.BaseEvent:
self.Set("schemaurl", schemaURL) self.Set("schemaurl", schemaURL)
return self return self
def SetData(self, data: object) -> base.BaseEvent: def SetData(self, data: typing.Optional[object]) -> base.BaseEvent:
self.Set("data", data) self.Set("data", data)
return self return self
def SetExtensions(self, extensions: dict) -> base.BaseEvent: def SetExtensions(self, extensions: typing.Optional[dict]) -> base.BaseEvent:
self.Set("extensions", extensions) self.Set("extensions", extensions)
return self return self
def SetContentType(self, contentType: str) -> base.BaseEvent: def SetContentType(self, contentType: typing.Optional[str]) -> base.BaseEvent:
self.Set("datacontenttype", contentType) self.Set("datacontenttype", contentType)
return self return self
def SetContentEncoding(self, contentEncoding: str) -> base.BaseEvent: def SetContentEncoding(
self, contentEncoding: typing.Optional[str]
) -> base.BaseEvent:
self.Set("datacontentencoding", contentEncoding) self.Set("datacontentencoding", contentEncoding)
return self return self
@property @property
def datacontentencoding(self): def datacontentencoding(self) -> typing.Optional[str]:
return self.ContentEncoding() return self.ContentEncoding()
@datacontentencoding.setter @datacontentencoding.setter
def datacontentencoding(self, value: str): def datacontentencoding(self, value: typing.Optional[str]) -> None:
self.SetContentEncoding(value) self.SetContentEncoding(value)
@property @property
def subject(self) -> str: def subject(self) -> typing.Optional[str]:
return self.Subject() return self.Subject()
@subject.setter @subject.setter
def subject(self, value: str): def subject(self, value: typing.Optional[str]) -> None:
self.SetSubject(value) self.SetSubject(value)
@property @property
def schema_url(self) -> str: def schema_url(self) -> typing.Optional[str]:
return self.SchemaURL() return self.SchemaURL()
@schema_url.setter @schema_url.setter
def schema_url(self, value: str): def schema_url(self, value: typing.Optional[str]) -> None:
self.SetSchemaURL(value) self.SetSchemaURL(value)

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing
from cloudevents.sdk.event import base, opt from cloudevents.sdk.event import base, opt
@ -34,34 +35,49 @@ class Event(base.BaseEvent):
self.ce__extensions = opt.Option("extensions", dict(), False) self.ce__extensions = opt.Option("extensions", dict(), False)
def CloudEventVersion(self) -> str: def CloudEventVersion(self) -> str:
return self.ce__specversion.get() return str(self.ce__specversion.get())
def EventType(self) -> str: def EventType(self) -> str:
return self.ce__type.get() return str(self.ce__type.get())
def Source(self) -> str: def Source(self) -> str:
return self.ce__source.get() return str(self.ce__source.get())
def EventID(self) -> str: def EventID(self) -> str:
return self.ce__id.get() return str(self.ce__id.get())
def EventTime(self) -> str: def EventTime(self) -> typing.Optional[str]:
return self.ce__time.get() result = self.ce__time.get()
if result is None:
return None
return str(result)
def Subject(self) -> str: def Subject(self) -> typing.Optional[str]:
return self.ce__subject.get() result = self.ce__subject.get()
if result is None:
return None
return str(result)
def Schema(self) -> str: def Schema(self) -> typing.Optional[str]:
return self.ce__dataschema.get() result = self.ce__dataschema.get()
if result is None:
return None
return str(result)
def ContentType(self) -> str: def ContentType(self) -> typing.Optional[str]:
return self.ce__datacontenttype.get() result = self.ce__datacontenttype.get()
if result is None:
return None
return str(result)
def Data(self) -> object: def Data(self) -> typing.Optional[object]:
return self.ce__data.get() return self.ce__data.get()
def Extensions(self) -> dict: def Extensions(self) -> dict:
return self.ce__extensions.get() result = self.ce__extensions.get()
if result is None:
return {}
return dict(result)
def SetEventType(self, eventType: str) -> base.BaseEvent: def SetEventType(self, eventType: str) -> base.BaseEvent:
self.Set("type", eventType) self.Set("type", eventType)
@ -75,42 +91,42 @@ class Event(base.BaseEvent):
self.Set("id", eventID) self.Set("id", eventID)
return self return self
def SetEventTime(self, eventTime: str) -> base.BaseEvent: def SetEventTime(self, eventTime: typing.Optional[str]) -> base.BaseEvent:
self.Set("time", eventTime) self.Set("time", eventTime)
return self return self
def SetSubject(self, subject: str) -> base.BaseEvent: def SetSubject(self, subject: typing.Optional[str]) -> base.BaseEvent:
self.Set("subject", subject) self.Set("subject", subject)
return self return self
def SetSchema(self, schema: str) -> base.BaseEvent: def SetSchema(self, schema: typing.Optional[str]) -> base.BaseEvent:
self.Set("dataschema", schema) self.Set("dataschema", schema)
return self return self
def SetContentType(self, contentType: str) -> base.BaseEvent: def SetContentType(self, contentType: typing.Optional[str]) -> base.BaseEvent:
self.Set("datacontenttype", contentType) self.Set("datacontenttype", contentType)
return self return self
def SetData(self, data: object) -> base.BaseEvent: def SetData(self, data: typing.Optional[object]) -> base.BaseEvent:
self.Set("data", data) self.Set("data", data)
return self return self
def SetExtensions(self, extensions: dict) -> base.BaseEvent: def SetExtensions(self, extensions: typing.Optional[dict]) -> base.BaseEvent:
self.Set("extensions", extensions) self.Set("extensions", extensions)
return self return self
@property @property
def schema(self) -> str: def schema(self) -> typing.Optional[str]:
return self.Schema() return self.Schema()
@schema.setter @schema.setter
def schema(self, value: str): def schema(self, value: typing.Optional[str]) -> None:
self.SetSchema(value) self.SetSchema(value)
@property @property
def subject(self) -> str: def subject(self) -> typing.Optional[str]:
return self.Subject() return self.Subject()
@subject.setter @subject.setter
def subject(self, value: str): def subject(self, value: typing.Optional[str]) -> None:
self.SetSubject(value) self.SetSubject(value)

View File

@ -26,36 +26,34 @@ class HTTPMarshaller(object):
API of this class designed to work with CloudEvent (upstream and v0.1) API of this class designed to work with CloudEvent (upstream and v0.1)
""" """
def __init__(self, converters: typing.List[base.Converter]): def __init__(self, converters: typing.Sequence[base.Converter]):
""" """
CloudEvent HTTP marshaller constructor CloudEvent HTTP marshaller constructor
:param converters: a list of HTTP-to-CloudEvent-to-HTTP constructors :param converters: a list of HTTP-to-CloudEvent-to-HTTP constructors
:type converters: typing.List[base.Converter]
""" """
self.http_converters = [c for c in converters] self.http_converters: typing.List[base.Converter] = [c for c in converters]
self.http_converters_by_type = {c.TYPE: c for c in converters} self.http_converters_by_type: typing.Dict[str, base.Converter] = {
c.TYPE: c for c in converters
}
def FromRequest( def FromRequest(
self, self,
event: event_base.BaseEvent, event: event_base.BaseEvent,
headers: dict, headers: typing.Mapping[str, str],
body: typing.Union[str, bytes], body: typing.Union[str, bytes],
data_unmarshaller: types.UnmarshallerType = json.loads, data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> event_base.BaseEvent: ) -> event_base.BaseEvent:
""" """
Reads a CloudEvent from an HTTP headers and request body Reads a CloudEvent from an HTTP headers and request body
:param event: CloudEvent placeholder :param event: CloudEvent placeholder
:type event: cloudevents.sdk.event.base.BaseEvent
:param headers: a dict-like HTTP headers :param headers: a dict-like HTTP headers
:type headers: dict
:param body: an HTTP request body as a string or bytes :param body: an HTTP request body as a string or bytes
:type body: typing.Union[str, bytes] :param data_unmarshaller: a callable-like unmarshaller the CloudEvent data
:param data_unmarshaller: a callable-like
unmarshaller the CloudEvent data
:return: a CloudEvent :return: a CloudEvent
:rtype: event_base.BaseEvent
""" """
if not isinstance(data_unmarshaller, typing.Callable): if not data_unmarshaller:
data_unmarshaller = json.loads
if not callable(data_unmarshaller):
raise exceptions.InvalidDataUnmarshaller() raise exceptions.InvalidDataUnmarshaller()
# Lower all header keys # Lower all header keys
@ -77,23 +75,17 @@ class HTTPMarshaller(object):
def ToRequest( def ToRequest(
self, self,
event: event_base.BaseEvent, event: event_base.BaseEvent,
converter_type: str = None, converter_type: typing.Optional[str] = None,
data_marshaller: types.MarshallerType = None, data_marshaller: typing.Optional[types.MarshallerType] = None,
) -> (dict, bytes): ) -> typing.Tuple[typing.Dict[str, str], bytes]:
""" """
Writes a CloudEvent into a HTTP-ready form of headers and request body Writes a CloudEvent into a HTTP-ready form of headers and request body
:param event: CloudEvent :param event: CloudEvent
:type event: event_base.BaseEvent
:param converter_type: a type of CloudEvent-to-HTTP converter :param converter_type: a type of CloudEvent-to-HTTP converter
:type converter_type: str
:param data_marshaller: a callable-like marshaller CloudEvent data :param data_marshaller: a callable-like marshaller CloudEvent data
:type data_marshaller: typing.Callable
:return: dict of HTTP headers and stream of HTTP request body :return: dict of HTTP headers and stream of HTTP request body
:rtype: tuple
""" """
if data_marshaller is not None and not isinstance( if data_marshaller is not None and not callable(data_marshaller):
data_marshaller, typing.Callable
):
raise exceptions.InvalidDataMarshaller() raise exceptions.InvalidDataMarshaller()
if converter_type is None: if converter_type is None:
@ -108,10 +100,9 @@ class HTTPMarshaller(object):
def NewDefaultHTTPMarshaller() -> HTTPMarshaller: def NewDefaultHTTPMarshaller() -> HTTPMarshaller:
""" """
Creates the default HTTP marshaller with both structured Creates the default HTTP marshaller with both structured and binary converters.
and binary converters
:return: an instance of HTTP marshaller :return: an instance of HTTP marshaller
:rtype: cloudevents.sdk.marshaller.HTTPMarshaller
""" """
return HTTPMarshaller( return HTTPMarshaller(
[ [
@ -122,14 +113,13 @@ def NewDefaultHTTPMarshaller() -> HTTPMarshaller:
def NewHTTPMarshaller( def NewHTTPMarshaller(
converters: typing.List[base.Converter], converters: typing.Sequence[base.Converter],
) -> HTTPMarshaller: ) -> HTTPMarshaller:
""" """
Creates the default HTTP marshaller with both Creates the default HTTP marshaller with both structured and binary converters.
structured and binary converters
:param converters: a list of CloudEvent-to-HTTP-to-CloudEvent converters :param converters: a list of CloudEvent-to-HTTP-to-CloudEvent converters
:type converters: typing.List[base.Converter]
:return: an instance of HTTP marshaller :return: an instance of HTTP marshaller
:rtype: cloudevents.sdk.marshaller.HTTPMarshaller
""" """
return HTTPMarshaller(converters) return HTTPMarshaller(converters)

View File

@ -17,9 +17,6 @@ import typing
# Use consistent types for marshal and unmarshal functions across # Use consistent types for marshal and unmarshal functions across
# both JSON and Binary format. # both JSON and Binary format.
MarshallerType = typing.Optional[ MarshallerType = typing.Callable[[typing.Any], typing.AnyStr]
typing.Callable[[typing.Any], typing.Union[bytes, str]]
] UnmarshallerType = typing.Callable[[typing.AnyStr], typing.Any]
UnmarshallerType = typing.Optional[
typing.Callable[[typing.Union[bytes, str]], typing.Any]
]

View File

@ -49,7 +49,9 @@ def structured_data():
def test_from_request_wrong_unmarshaller(): def test_from_request_wrong_unmarshaller():
with pytest.raises(exceptions.InvalidDataUnmarshaller): with pytest.raises(exceptions.InvalidDataUnmarshaller):
m = marshaller.NewDefaultHTTPMarshaller() m = marshaller.NewDefaultHTTPMarshaller()
_ = m.FromRequest(v1.Event(), {}, "", None) _ = m.FromRequest(
event=v1.Event(), headers={}, body="", data_unmarshaller=object()
)
def test_to_request_wrong_marshaller(): def test_to_request_wrong_marshaller():

16
mypy.ini Normal file
View File

@ -0,0 +1,16 @@
[mypy]
plugins = pydantic.mypy
python_version = 3.7
pretty = True
show_error_context = True
follow_imports_for_stubs = True
# subset of mypy --strict
# https://mypy.readthedocs.io/en/stable/config_file.html
check_untyped_defs = True
disallow_incomplete_defs = True
warn_return_any = True
strict_equality = True
[mypy-deprecation.*]
ignore_missing_imports = True

View File

@ -46,9 +46,11 @@ long_description = (here / "README.md").read_text(encoding="utf-8")
if __name__ == "__main__": if __name__ == "__main__":
setup( setup(
name=pypi_config["package_name"], name=pypi_config["package_name"],
summary="CloudEvents SDK Python", summary="CloudEvents Python SDK",
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
long_description=long_description, long_description=long_description,
description="CloudEvents Python SDK",
url="https://github.com/cloudevents/sdk-python",
author="The Cloud Events Contributors", author="The Cloud Events Contributors",
author_email="cncfcloudevents@gmail.com", author_email="cncfcloudevents@gmail.com",
home_page="https://cloudevents.io", home_page="https://cloudevents.io",
@ -58,15 +60,24 @@ if __name__ == "__main__":
"Intended Audience :: Developers", "Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Development Status :: 5 - Production/Stable", "Development Status :: 5 - Production/Stable",
"Operating System :: POSIX :: Linux", "Operating System :: OS Independent",
"Natural Language :: English",
"Programming Language :: Python",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.10",
"Typing :: Typed",
], ],
keywords="CloudEvents Eventing Serverless",
license="https://www.apache.org/licenses/LICENSE-2.0",
license_file="LICENSE",
packages=find_packages(exclude=["cloudevents.tests"]), packages=find_packages(exclude=["cloudevents.tests"]),
include_package_data=True,
version=pypi_config["version_target"], version=pypi_config["version_target"],
install_requires=["deprecation>=2.0,<3.0"], install_requires=["deprecation>=2.0,<3.0"],
extras_require={"pydantic": "pydantic>=1.0.0,<2.0"}, extras_require={"pydantic": "pydantic>=1.0.0,<2.0"},
zip_safe=True,
) )

View File

@ -8,7 +8,7 @@ deps =
-r{toxinidir}/requirements/test.txt -r{toxinidir}/requirements/test.txt
-r{toxinidir}/requirements/publish.txt -r{toxinidir}/requirements/publish.txt
setenv = setenv =
PYTESTARGS = -v -s --tb=long --cov=cloudevents --cov-report term-missing --cov-fail-under=100 PYTESTARGS = -v -s --tb=long --cov=cloudevents --cov-report term-missing --cov-fail-under=95
commands = pytest {env:PYTESTARGS} {posargs} commands = pytest {env:PYTESTARGS} {posargs}
[testenv:reformat] [testenv:reformat]