Pydantic v2 native implementation (#219)

* Create stub pydantic v2 implementation and parametrize tests for both implementations

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Add default values to optional fields

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Adapt pydantic v1 serializer/deserializer logic

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Extract CloudEvent fields non functional data in separate module

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Fix lint

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Add missing Copyright

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Add missing docstring

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Remove test leftover

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Remove dependency on HTTP CloudEvent implementation

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Remove failing test for unsupported scenario

Fix typo

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Use SDK json serialization logic

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* No need to filter base64_data

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Use SDK json deserialization logic

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Fix imports

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Move docs after field declarations

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Add test for model_validate_json method

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Use fully qualified imports

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

* Ignore typing error

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>

---------

Signed-off-by: Federico Busetti <729029+febus982@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Federico Busetti 2023-09-20 20:59:13 +01:00 committed by GitHub
parent e5f76ed14c
commit 5a1063e50d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 790 additions and 240 deletions

View File

@ -11,7 +11,28 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudevents.pydantic.conversion import from_dict, from_http, from_json
from cloudevents.pydantic.event import CloudEvent
from cloudevents.exceptions import PydanticFeatureNotInstalled
try:
from pydantic import VERSION as PYDANTIC_VERSION
pydantic_major_version = PYDANTIC_VERSION.split(".")[0]
if pydantic_major_version == "1":
from cloudevents.pydantic.v1 import CloudEvent, from_dict, from_http, from_json
else:
from cloudevents.pydantic.v2 import ( # type: ignore
CloudEvent,
from_dict,
from_http,
from_json,
)
except ImportError: # pragma: no cover # hard to test
raise PydanticFeatureNotInstalled(
"CloudEvents pydantic feature is not installed. "
"Install it using pip install cloudevents[pydantic]"
)
__all__ = ["CloudEvent", "from_json", "from_dict", "from_http"]

View File

@ -0,0 +1,142 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudevents.sdk.event import attribute
FIELD_DESCRIPTIONS = {
"data": {
"title": "Event Data",
"description": (
"CloudEvents MAY include domain-specific information about the occurrence."
" When present, this information will be encapsulated within data.It is"
" encoded into a media format which is specified by the datacontenttype"
" attribute (e.g. application/json), and adheres to the dataschema format"
" when those respective attributes are present."
),
},
"source": {
"title": "Event Source",
"description": (
"Identifies the context in which an event happened. Often this will include"
" information such as the type of the event source, the organization"
" publishing the event or the process that produced the event. The exact"
" syntax and semantics behind the data encoded in the URI is defined by the"
" event producer.\n"
"\n"
"Producers MUST ensure that source + id is unique for"
" each distinct event.\n"
"\n"
"An application MAY assign a unique source to each"
" distinct producer, which makes it easy to produce unique IDs since no"
" other producer will have the same source. The application MAY use UUIDs,"
" URNs, DNS authorities or an application-specific scheme to create unique"
" source identifiers.\n"
"\n"
"A source MAY include more than one producer. In"
" that case the producers MUST collaborate to ensure that source + id is"
" unique for each distinct event."
),
"example": "https://github.com/cloudevents",
},
"id": {
"title": "Event ID",
"description": (
"Identifies the event. Producers MUST ensure that source + id is unique for"
" each distinct event. If a duplicate event is re-sent (e.g. due to a"
" network error) it MAY have the same id. Consumers MAY assume that Events"
" with identical source and id are duplicates. MUST be unique within the"
" scope of the producer"
),
"example": "A234-1234-1234",
},
"type": {
"title": "Event Type",
"description": (
"This attribute contains a value describing the type of event related to"
" the originating occurrence. Often this attribute is used for routing,"
" observability, policy enforcement, etc. The format of this is producer"
" defined and might include information such as the version of the type"
),
"example": "com.github.pull_request.opened",
},
"specversion": {
"title": "Specification Version",
"description": (
"The version of the CloudEvents specification which the event uses. This"
" enables the interpretation of the context.\n"
"\n"
"Currently, this attribute will only have the 'major'"
" and 'minor' version numbers included in it. This allows for 'patch'"
" changes to the specification to be made without changing this property's"
" value in the serialization."
),
"example": attribute.DEFAULT_SPECVERSION,
},
"time": {
"title": "Occurrence Time",
"description": (
" Timestamp of when the occurrence happened. If the time of the occurrence"
" cannot be determined then this attribute MAY be set to some other time"
" (such as the current time) by the CloudEvents producer, however all"
" producers for the same source MUST be consistent in this respect. In"
" other words, either they all use the actual time of the occurrence or"
" they all use the same algorithm to determine the value used."
),
"example": "2018-04-05T17:31:00Z",
},
"subject": {
"title": "Event Subject",
"description": (
"This describes the subject of the event in the context of the event"
" producer (identified by source). In publish-subscribe scenarios, a"
" subscriber will typically subscribe to events emitted by a source, but"
" the source identifier alone might not be sufficient as a qualifier for"
" any specific event if the source context has internal"
" sub-structure.\n"
"\n"
"Identifying the subject of the event in context"
" metadata (opposed to only in the data payload) is particularly helpful in"
" generic subscription filtering scenarios where middleware is unable to"
" interpret the data content. In the above example, the subscriber might"
" only be interested in blobs with names ending with '.jpg' or '.jpeg' and"
" the subject attribute allows for constructing a simple and efficient"
" string-suffix filter for that subset of events."
),
"example": "123",
},
"datacontenttype": {
"title": "Event Data Content Type",
"description": (
"Content type of data value. This attribute enables data to carry any type"
" of content, whereby format and encoding might differ from that of the"
" chosen event format."
),
"example": "text/xml",
},
"dataschema": {
"title": "Event Data Schema",
"description": (
"Identifies the schema that data adheres to. "
"Incompatible changes to the schema SHOULD be reflected by a different URI"
),
},
}
"""
The dictionary above contains title, description, example and other
NON-FUNCTIONAL data for pydantic fields. It could be potentially.
used across all the SDK.
Functional field configurations (e.g. defaults) are still defined
in the pydantic model classes.
"""

View File

@ -0,0 +1,18 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudevents.pydantic.v1.conversion import from_dict, from_http, from_json
from cloudevents.pydantic.v1.event import CloudEvent
__all__ = ["CloudEvent", "from_json", "from_dict", "from_http"]

View File

@ -16,7 +16,7 @@ import typing
from cloudevents.conversion import from_dict as _abstract_from_dict
from cloudevents.conversion import from_http as _abstract_from_http
from cloudevents.conversion import from_json as _abstract_from_json
from cloudevents.pydantic.event import CloudEvent
from cloudevents.pydantic.v1.event import CloudEvent
from cloudevents.sdk import types

View File

@ -16,6 +16,7 @@ import json
import typing
from cloudevents.exceptions import PydanticFeatureNotInstalled
from cloudevents.pydantic.fields_docs import FIELD_DESCRIPTIONS
try:
from pydantic import VERSION as PYDANTIC_VERSION
@ -72,7 +73,7 @@ def _ce_json_dumps( # type: ignore[no-untyped-def]
def _ce_json_loads( # type: ignore[no-untyped-def]
data: typing.AnyStr, *args, **kwargs # noqa
) -> typing.Dict[typing.Any, typing.Any]:
"""Perforns Pydantic-specific deserialization of the event.
"""Performs Pydantic-specific deserialization of the event.
Needed by the pydantic base-model to de-serialize the event correctly from json.
Without this function the data will be incorrectly de-serialized.
@ -104,125 +105,52 @@ class CloudEvent(abstract.CloudEvent, BaseModel): # type: ignore
return cls(attributes, data)
data: typing.Optional[typing.Any] = Field(
title="Event Data",
description=(
"CloudEvents MAY include domain-specific information about the occurrence."
" When present, this information will be encapsulated within data.It is"
" encoded into a media format which is specified by the datacontenttype"
" attribute (e.g. application/json), and adheres to the dataschema format"
" when those respective attributes are present."
),
title=FIELD_DESCRIPTIONS["data"].get("title"),
description=FIELD_DESCRIPTIONS["data"].get("description"),
example=FIELD_DESCRIPTIONS["data"].get("example"),
)
source: str = Field(
title="Event Source",
description=(
"Identifies the context in which an event happened. Often this will include"
" information such as the type of the event source, the organization"
" publishing the event or the process that produced the event. The exact"
" syntax and semantics behind the data encoded in the URI is defined by the"
" event producer.\n"
"\n"
"Producers MUST ensure that source + id is unique for"
" each distinct event.\n"
"\n"
"An application MAY assign a unique source to each"
" distinct producer, which makes it easy to produce unique IDs since no"
" other producer will have the same source. The application MAY use UUIDs,"
" URNs, DNS authorities or an application-specific scheme to create unique"
" source identifiers.\n"
"\n"
"A source MAY include more than one producer. In"
" that case the producers MUST collaborate to ensure that source + id is"
" unique for each distinct event."
),
example="https://github.com/cloudevents",
title=FIELD_DESCRIPTIONS["source"].get("title"),
description=FIELD_DESCRIPTIONS["source"].get("description"),
example=FIELD_DESCRIPTIONS["source"].get("example"),
)
id: str = Field(
title=FIELD_DESCRIPTIONS["id"].get("title"),
description=FIELD_DESCRIPTIONS["id"].get("description"),
example=FIELD_DESCRIPTIONS["id"].get("example"),
default_factory=attribute.default_id_selection_algorithm,
title="Event ID",
description=(
"Identifies the event. Producers MUST ensure that source + id is unique for"
" each distinct event. If a duplicate event is re-sent (e.g. due to a"
" network error) it MAY have the same id. Consumers MAY assume that Events"
" with identical source and id are duplicates. MUST be unique within the"
" scope of the producer"
),
example="A234-1234-1234",
)
type: str = Field(
title="Event Type",
description=(
"This attribute contains a value describing the type of event related to"
" the originating occurrence. Often this attribute is used for routing,"
" observability, policy enforcement, etc. The format of this is producer"
" defined and might include information such as the version of the type"
),
example="com.github.pull_request.opened",
title=FIELD_DESCRIPTIONS["type"].get("title"),
description=FIELD_DESCRIPTIONS["type"].get("description"),
example=FIELD_DESCRIPTIONS["type"].get("example"),
)
specversion: attribute.SpecVersion = Field(
title=FIELD_DESCRIPTIONS["specversion"].get("title"),
description=FIELD_DESCRIPTIONS["specversion"].get("description"),
example=FIELD_DESCRIPTIONS["specversion"].get("example"),
default=attribute.DEFAULT_SPECVERSION,
title="Specification Version",
description=(
"The version of the CloudEvents specification which the event uses. This"
" enables the interpretation of the context.\n"
"\n"
"Currently, this attribute will only have the 'major'"
" and 'minor' version numbers included in it. This allows for 'patch'"
" changes to the specification to be made without changing this property's"
" value in the serialization."
),
example=attribute.DEFAULT_SPECVERSION,
)
time: typing.Optional[datetime.datetime] = Field(
title=FIELD_DESCRIPTIONS["time"].get("title"),
description=FIELD_DESCRIPTIONS["time"].get("description"),
example=FIELD_DESCRIPTIONS["time"].get("example"),
default_factory=attribute.default_time_selection_algorithm,
title="Occurrence Time",
description=(
" Timestamp of when the occurrence happened. If the time of the occurrence"
" cannot be determined then this attribute MAY be set to some other time"
" (such as the current time) by the CloudEvents producer, however all"
" producers for the same source MUST be consistent in this respect. In"
" other words, either they all use the actual time of the occurrence or"
" they all use the same algorithm to determine the value used."
),
example="2018-04-05T17:31:00Z",
)
subject: typing.Optional[str] = Field(
title="Event Subject",
description=(
"This describes the subject of the event in the context of the event"
" producer (identified by source). In publish-subscribe scenarios, a"
" subscriber will typically subscribe to events emitted by a source, but"
" the source identifier alone might not be sufficient as a qualifier for"
" any specific event if the source context has internal"
" sub-structure.\n"
"\n"
"Identifying the subject of the event in context"
" metadata (opposed to only in the data payload) is particularly helpful in"
" generic subscription filtering scenarios where middleware is unable to"
" interpret the data content. In the above example, the subscriber might"
" only be interested in blobs with names ending with '.jpg' or '.jpeg' and"
" the subject attribute allows for constructing a simple and efficient"
" string-suffix filter for that subset of events."
),
example="123",
title=FIELD_DESCRIPTIONS["subject"].get("title"),
description=FIELD_DESCRIPTIONS["subject"].get("description"),
example=FIELD_DESCRIPTIONS["subject"].get("example"),
)
datacontenttype: typing.Optional[str] = Field(
title="Event Data Content Type",
description=(
"Content type of data value. This attribute enables data to carry any type"
" of content, whereby format and encoding might differ from that of the"
" chosen event format."
),
example="text/xml",
title=FIELD_DESCRIPTIONS["datacontenttype"].get("title"),
description=FIELD_DESCRIPTIONS["datacontenttype"].get("description"),
example=FIELD_DESCRIPTIONS["datacontenttype"].get("example"),
)
dataschema: typing.Optional[str] = Field(
title="Event Data Schema",
description=(
"Identifies the schema that data adheres to. "
"Incompatible changes to the schema SHOULD be reflected by a different URI"
),
title=FIELD_DESCRIPTIONS["dataschema"].get("title"),
description=FIELD_DESCRIPTIONS["dataschema"].get("description"),
example=FIELD_DESCRIPTIONS["dataschema"].get("example"),
)
def __init__( # type: ignore[no-untyped-def]

View File

@ -0,0 +1,18 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudevents.pydantic.v2.conversion import from_dict, from_http, from_json
from cloudevents.pydantic.v2.event import CloudEvent
__all__ = ["CloudEvent", "from_json", "from_dict", "from_http"]

View File

@ -0,0 +1,75 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import typing
from cloudevents.conversion import from_dict as _abstract_from_dict
from cloudevents.conversion import from_http as _abstract_from_http
from cloudevents.conversion import from_json as _abstract_from_json
from cloudevents.pydantic.v2.event import CloudEvent
from cloudevents.sdk import types
def from_http(
    headers: typing.Dict[str, str],
    data: typing.Optional[typing.AnyStr],
    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent:
    """Build a pydantic-v2 CloudEvent from an HTTP request.

    Both the binary and the structured CloudEvents HTTP representations
    are accepted; the shared SDK conversion layer does the actual parsing.

    :param headers: The HTTP request headers.
    :param data: The HTTP request body. Passing None, "" or b'' produces an
        event whose `data` attribute is None.
    :param data_unmarshaller: Optional callable that maps the raw body to a
        python object, e.g. ``lambda x: x`` or ``json.loads``.
    :returns: The CloudEvent parsed from the HTTP parameters.
    """
    return _abstract_from_http(
        event_type=CloudEvent,
        headers=headers,
        data=data,
        data_unmarshaller=data_unmarshaller,
    )
def from_json(
    data: typing.AnyStr,
    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
) -> CloudEvent:
    """Deserialize a JSON document into a pydantic-v2 CloudEvent.

    :param data: JSON string (or bytes) representation of a CloudEvent.
    :param data_unmarshaller: Optional callable that casts the event's
        `data` payload to a python object.
    :returns: The CloudEvent parsed from the JSON representation.
    """
    return _abstract_from_json(
        event_type=CloudEvent,
        data=data,
        data_unmarshaller=data_unmarshaller,
    )
def from_dict(
    event: typing.Dict[str, typing.Any],
) -> CloudEvent:
    """Build a pydantic-v2 CloudEvent from a dict representation.

    :param event: The event expressed as a dictionary.
    :returns: The CloudEvent parsed from the given dict.
    """
    parsed = _abstract_from_dict(CloudEvent, event)
    return parsed

View File

@ -0,0 +1,244 @@
# Copyright 2018-Present The CloudEvents Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import json
import typing
from typing import Any
from pydantic.deprecated import parse as _deprecated_parse
from cloudevents.exceptions import PydanticFeatureNotInstalled
from cloudevents.pydantic.fields_docs import FIELD_DESCRIPTIONS
try:
from pydantic import BaseModel, ConfigDict, Field, model_serializer
except ImportError: # pragma: no cover # hard to test
raise PydanticFeatureNotInstalled(
"CloudEvents pydantic feature is not installed. "
"Install it using pip install cloudevents[pydantic]"
)
from cloudevents import abstract, conversion
from cloudevents.exceptions import IncompatibleArgumentsError
from cloudevents.sdk.event import attribute
class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
    """
    A Python-friendly CloudEvent representation backed by Pydantic-modeled fields.

    Supports both binary and structured modes of the CloudEvents v1 specification.
    """

    @classmethod
    def create(
        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
    ) -> "CloudEvent":
        # Factory used by the SDK's shared conversion logic to build events.
        return cls(attributes, data)

    # Field declarations. Titles/descriptions/examples are shared,
    # non-functional metadata pulled from FIELD_DESCRIPTIONS; functional
    # configuration (defaults, default factories) is defined here.
    data: typing.Optional[typing.Any] = Field(
        title=FIELD_DESCRIPTIONS["data"].get("title"),
        description=FIELD_DESCRIPTIONS["data"].get("description"),
        example=FIELD_DESCRIPTIONS["data"].get("example"),
        default=None,
    )
    source: str = Field(
        title=FIELD_DESCRIPTIONS["source"].get("title"),
        description=FIELD_DESCRIPTIONS["source"].get("description"),
        example=FIELD_DESCRIPTIONS["source"].get("example"),
    )
    id: str = Field(
        title=FIELD_DESCRIPTIONS["id"].get("title"),
        description=FIELD_DESCRIPTIONS["id"].get("description"),
        example=FIELD_DESCRIPTIONS["id"].get("example"),
        # Auto-generate an id when the caller does not supply one.
        default_factory=attribute.default_id_selection_algorithm,
    )
    type: str = Field(
        title=FIELD_DESCRIPTIONS["type"].get("title"),
        description=FIELD_DESCRIPTIONS["type"].get("description"),
        example=FIELD_DESCRIPTIONS["type"].get("example"),
    )
    specversion: attribute.SpecVersion = Field(
        title=FIELD_DESCRIPTIONS["specversion"].get("title"),
        description=FIELD_DESCRIPTIONS["specversion"].get("description"),
        example=FIELD_DESCRIPTIONS["specversion"].get("example"),
        default=attribute.DEFAULT_SPECVERSION,
    )
    time: typing.Optional[datetime.datetime] = Field(
        title=FIELD_DESCRIPTIONS["time"].get("title"),
        description=FIELD_DESCRIPTIONS["time"].get("description"),
        example=FIELD_DESCRIPTIONS["time"].get("example"),
        # Auto-populate the timestamp when the caller does not supply one.
        default_factory=attribute.default_time_selection_algorithm,
    )
    subject: typing.Optional[str] = Field(
        title=FIELD_DESCRIPTIONS["subject"].get("title"),
        description=FIELD_DESCRIPTIONS["subject"].get("description"),
        example=FIELD_DESCRIPTIONS["subject"].get("example"),
        default=None,
    )
    datacontenttype: typing.Optional[str] = Field(
        title=FIELD_DESCRIPTIONS["datacontenttype"].get("title"),
        description=FIELD_DESCRIPTIONS["datacontenttype"].get("description"),
        example=FIELD_DESCRIPTIONS["datacontenttype"].get("example"),
        default=None,
    )
    dataschema: typing.Optional[str] = Field(
        title=FIELD_DESCRIPTIONS["dataschema"].get("title"),
        description=FIELD_DESCRIPTIONS["dataschema"].get("description"),
        example=FIELD_DESCRIPTIONS["dataschema"].get("example"),
        default=None,
    )

    def __init__(  # type: ignore[no-untyped-def]
        self,
        attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
        data: typing.Optional[typing.Any] = None,
        **kwargs,
    ):
        """
        :param attributes: A dict with CloudEvent attributes.
            Minimally expects the attributes 'type' and 'source'. If not given the
            attributes 'specversion', 'id' or 'time', this will create
            those attributes with default values.

            If no attribute is given the class MUST use the kwargs as the attributes.

            Example Attributes:
            {
                "specversion": "1.0",
                "type": "com.github.pull_request.opened",
                "source": "https://github.com/cloudevents/spec/pull",
                "id": "A234-1234-1234",
                "time": "2018-04-05T17:31:00Z",
            }

        :param data: Domain-specific information about the occurrence.

        :raises IncompatibleArgumentsError: if both an attributes dict and
            attribute kwargs are supplied.
        """
        if attributes:
            if len(kwargs) != 0:
                # To prevent API complexity and confusion.
                raise IncompatibleArgumentsError(
                    "Attributes dict and kwargs are incompatible."
                )
            # CloudEvents attribute names are case-insensitive on input;
            # normalize to lowercase before handing them to pydantic.
            attributes = {k.lower(): v for k, v in attributes.items()}
            kwargs.update(attributes)
        super(CloudEvent, self).__init__(data=data, **kwargs)

    model_config = ConfigDict(
        extra="allow",  # this is the way we implement extensions
        json_schema_extra={
            "example": {
                "specversion": "1.0",
                "type": "com.github.pull_request.opened",
                "source": "https://github.com/cloudevents/spec/pull",
                "subject": "123",
                "id": "A234-1234-1234",
                "time": "2018-04-05T17:31:00Z",
                "comexampleextension1": "value",
                "comexampleothervalue": 5,
                "datacontenttype": "text/xml",
                "data": '<much wow="xml"/>',
            }
        },
    )

    """
    We should use a @model_validator decorator to handle JSON deserialisation,
    however it's not possible to completely bypass the internal pydantic logic
    and still use the CloudEvents shared conversion logic.

    Same issue applies to the multiple from/to JSON conversion logic in the
    @model_serializer implemented after

    To remove the need for the multiple from/to JSON transformation we need
    major refactor in the SDK conversion logic.
    """

    @classmethod
    def model_validate_json(
        cls,
        json_data: typing.Union[str, bytes, bytearray],
        *,
        strict: typing.Optional[bool] = None,
        context: typing.Optional[typing.Dict[str, Any]] = None,
    ) -> "CloudEvent":
        # Overrides pydantic's JSON validation to route through the SDK's
        # shared conversion logic. `strict` and `context` are accepted only
        # for signature compatibility with pydantic and are not used.
        return conversion.from_json(cls, json_data)

    @classmethod
    def parse_raw(
        cls,
        b: typing.Union[str, bytes],
        *,
        content_type: typing.Optional[str] = None,
        encoding: str = "utf8",
        proto: typing.Optional[_deprecated_parse.Protocol] = None,
        allow_pickle: bool = False,
    ) -> "CloudEvent":
        # Overrides pydantic's deprecated v1-style API; all keyword options
        # are accepted for signature compatibility but ignored — the input
        # is always treated as CloudEvents JSON.
        return conversion.from_json(cls, b)

    @model_serializer(when_used="json")
    def _ce_json_dumps(self) -> typing.Dict[str, typing.Any]:
        """Performs Pydantic-specific serialization of the event when
        serializing the model using `.model_dump_json()` method.

        Needed by the pydantic base-model to serialize the event correctly to json.
        Without this function the data will be incorrectly serialized.

        :param self: CloudEvent.

        :return: Event serialized as a standard CloudEvent dict with user specific
        parameters.
        """
        # Here mypy complains about json.loads returning Any
        # which is incompatible with this method return type
        # but we know it's always a dictionary in this case
        return json.loads(conversion.to_json(self))  # type: ignore

    def _get_attributes(self) -> typing.Dict[str, typing.Any]:
        # Context attributes are everything on the model except `data`;
        # values are encoded on a best-effort basis for transport.
        return {
            key: conversion.best_effort_encode_attribute_value(value)
            for key, value in self.__dict__.items()
            if key not in ["data"]
        }

    def get_data(self) -> typing.Optional[typing.Any]:
        # Accessor required by the abstract.CloudEvent interface.
        return self.data

    def __setitem__(self, key: str, value: typing.Any) -> None:
        """
        Set event attribute value

        MUST NOT set event data with this method, use `.data` member instead

        Method SHOULD mimic `cloudevents.http.event.CloudEvent` interface

        :param key: Event attribute name
        :param value: New event attribute value
        """
        if key != "data":  # to mirror the behaviour of the http event
            setattr(self, key, value)
        else:
            pass  # It is de-facto ignored by the http event

    def __delitem__(self, key: str) -> None:
        """
        SHOULD raise `KeyError` if no event attribute for the given key exists.

        Method SHOULD mimic `cloudevents.http.event.CloudEvent` interface
        :param key: The event attribute name.
        """
        if key == "data":
            raise KeyError(key)  # to mirror the behaviour of the http event
        delattr(self, key)

View File

@ -15,19 +15,15 @@ import datetime
from json import loads
import pytest
from pydantic import VERSION as PYDANTIC_VERSION
from pydantic import ValidationError as PydanticV2ValidationError
from pydantic.v1 import ValidationError as PydanticV1ValidationError
from cloudevents.conversion import _json_or_string
from cloudevents.exceptions import IncompatibleArgumentsError
from cloudevents.pydantic import CloudEvent
from cloudevents.pydantic.v1.event import CloudEvent as PydanticV1CloudEvent
from cloudevents.pydantic.v2.event import CloudEvent as PydanticV2CloudEvent
from cloudevents.sdk.event.attribute import SpecVersion
pydantic_major_version = PYDANTIC_VERSION.split(".")[0]
if pydantic_major_version == "2":
from pydantic.v1 import ValidationError
else:
from pydantic import ValidationError
_DUMMY_SOURCE = "dummy:source"
_DUMMY_TYPE = "tests.cloudevents.override"
_DUMMY_TIME = "2022-07-16T11:20:34.284130+00:00"
@ -39,6 +35,25 @@ def specversion(request):
return request.param
_pydantic_implementation = {
"v1": {
"event": PydanticV1CloudEvent,
"validation_error": PydanticV1ValidationError,
"pydantic_version": "v1",
},
"v2": {
"event": PydanticV2CloudEvent,
"validation_error": PydanticV2ValidationError,
"pydantic_version": "v2",
},
}
@pytest.fixture(params=["v1", "v2"])
def cloudevents_implementation(request):
return _pydantic_implementation[request.param]
@pytest.fixture()
def dummy_attributes(specversion):
return {
@ -64,8 +79,10 @@ def your_dummy_data():
@pytest.fixture()
def dummy_event(dummy_attributes, my_dummy_data):
return CloudEvent(attributes=dummy_attributes, data=my_dummy_data)
def dummy_event(dummy_attributes, my_dummy_data, cloudevents_implementation):
return cloudevents_implementation["event"](
attributes=dummy_attributes, data=my_dummy_data
)
@pytest.fixture()
@ -75,10 +92,12 @@ def non_exiting_attribute_name(dummy_event):
return result
def test_pydantic_cloudevent_equality(dummy_attributes, my_dummy_data, your_dummy_data):
def test_pydantic_cloudevent_equality(
dummy_attributes, my_dummy_data, your_dummy_data, cloudevents_implementation
):
data = my_dummy_data
event1 = CloudEvent(dummy_attributes, data)
event2 = CloudEvent(dummy_attributes, data)
event1 = cloudevents_implementation["event"](dummy_attributes, data)
event2 = cloudevents_implementation["event"](dummy_attributes, data)
assert event1 == event2
# Test different attributes
for key in dummy_attributes:
@ -86,15 +105,15 @@ def test_pydantic_cloudevent_equality(dummy_attributes, my_dummy_data, your_dumm
continue
else:
dummy_attributes[key] = f"noise-{key}"
event3 = CloudEvent(dummy_attributes, data)
event2 = CloudEvent(dummy_attributes, data)
event3 = cloudevents_implementation["event"](dummy_attributes, data)
event2 = cloudevents_implementation["event"](dummy_attributes, data)
assert event2 == event3
assert event1 != event2 and event3 != event1
# Test different data
data = your_dummy_data
event3 = CloudEvent(dummy_attributes, data)
event2 = CloudEvent(dummy_attributes, data)
event3 = cloudevents_implementation["event"](dummy_attributes, data)
event2 = cloudevents_implementation["event"](dummy_attributes, data)
assert event2 == event3
assert event1 != event2 and event3 != event1
@ -115,12 +134,12 @@ def test_http_cloudevent_must_not_equal_to_non_cloudevent_value(
def test_http_cloudevent_mutates_equality(
dummy_attributes, my_dummy_data, your_dummy_data
dummy_attributes, my_dummy_data, your_dummy_data, cloudevents_implementation
):
data = my_dummy_data
event1 = CloudEvent(dummy_attributes, data)
event2 = CloudEvent(dummy_attributes, data)
event3 = CloudEvent(dummy_attributes, data)
event1 = cloudevents_implementation["event"](dummy_attributes, data)
event2 = cloudevents_implementation["event"](dummy_attributes, data)
event3 = cloudevents_implementation["event"](dummy_attributes, data)
assert event1 == event2
# Test different attributes
@ -140,29 +159,40 @@ def test_http_cloudevent_mutates_equality(
assert event1 != event2 and event3 != event1
def test_cloudevent_missing_specversion(cloudevents_implementation):
    """An unsupported specversion must raise the implementation's
    ValidationError with a version-appropriate message."""
    # pydantic v1 and v2 word their enum-validation errors differently.
    errors = {
        "v1": "value is not a valid enumeration member; permitted: '0.3', '1.0'",
        "v2": "Input should be '0.3' or '1.0'",
    }
    attributes = {"specversion": "0.2", "source": "s", "type": "t"}
    with pytest.raises(cloudevents_implementation["validation_error"]) as e:
        _ = cloudevents_implementation["event"](attributes, None)
    assert errors[cloudevents_implementation["pydantic_version"]] in str(e.value)
def test_cloudevent_missing_minimal_required_fields(cloudevents_implementation):
    """Omitting either required attribute (source, type) must raise a
    ValidationError naming the missing field."""
    attributes = {"type": "t"}
    # pydantic v1 says "field required", v2 capitalizes "Field required".
    errors = {
        "v1": "\nsource\n field required ",
        "v2": "\nsource\n Field required ",
    }
    with pytest.raises(cloudevents_implementation["validation_error"]) as e:
        _ = cloudevents_implementation["event"](attributes, None)
    assert errors[cloudevents_implementation["pydantic_version"]] in str(e.value)

    attributes = {"source": "s"}
    errors = {
        "v1": "\ntype\n field required ",
        "v2": "\ntype\n Field required ",
    }
    with pytest.raises(cloudevents_implementation["validation_error"]) as e:
        _ = cloudevents_implementation["event"](attributes, None)
    assert errors[cloudevents_implementation["pydantic_version"]] in str(e.value)
def test_cloudevent_general_overrides():
event = CloudEvent(
def test_cloudevent_general_overrides(cloudevents_implementation):
event = cloudevents_implementation["event"](
{
"source": "my-source",
"type": "com.test.overrides",
@ -223,9 +253,9 @@ def test_get_operation_on_non_existing_attribute_should_not_copy_default_value(
@pytest.mark.xfail()  # https://github.com/cloudevents/sdk-python/issues/185
def test_json_data_serialization_without_explicit_type(cloudevents_implementation):
    """JSON-string data should round-trip as a JSON object even when no
    datacontenttype is set (known issue #185, hence xfail)."""
    assert loads(
        cloudevents_implementation["event"](
            source=_DUMMY_SOURCE, type=_DUMMY_TYPE, data='{"hello": "world"}'
        ).json()
    )["data"] == {"hello": "world"}
@ -242,17 +272,15 @@ def test_json_data_serialization_without_explicit_type():
],
)
def test_json_data_serialization_with_explicit_json_content_type(
    dummy_attributes, json_content_type, cloudevents_implementation
):
    """With an explicit JSON content type, string data must serialize to a
    real JSON object under the "data" key."""
    dummy_attributes["datacontenttype"] = json_content_type
    assert loads(
        cloudevents_implementation["event"](
            dummy_attributes,
            data='{"hello": "world"}',
        ).json()
    )["data"] == {"hello": "world"}
_NON_JSON_CONTENT_TYPES = [
@ -275,10 +303,10 @@ _NON_JSON_CONTENT_TYPES = [
@pytest.mark.parametrize("datacontenttype", _NON_JSON_CONTENT_TYPES)
def test_json_data_serialization_with_explicit_non_json_content_type(
dummy_attributes, datacontenttype
dummy_attributes, datacontenttype, cloudevents_implementation
):
dummy_attributes["datacontenttype"] = datacontenttype
event = CloudEvent(
event = cloudevents_implementation["event"](
dummy_attributes,
data='{"hello": "world"}',
).json()
@ -286,18 +314,20 @@ def test_json_data_serialization_with_explicit_non_json_content_type(
@pytest.mark.parametrize("datacontenttype", _NON_JSON_CONTENT_TYPES)
def test_binary_data_serialization(
    dummy_attributes, datacontenttype, cloudevents_implementation
):
    """Binary payloads with a non-JSON content type must serialize into
    "data_base64" and leave no plain "data" key behind."""
    dummy_attributes["datacontenttype"] = datacontenttype
    event = cloudevents_implementation["event"](
        dummy_attributes,
        data=b"\x00\x00\x11Hello World",
    ).json()
    result_json = loads(event)
    assert result_json["data_base64"] == "AAARSGVsbG8gV29ybGQ="
    # Fixed key name (was the typo "daata" before this change).
    assert "data" not in result_json
def test_binary_data_deserialization():
def test_binary_data_deserialization(cloudevents_implementation):
given = (
b'{"source": "dummy:source", "id": "11775cb2-fd00-4487-a18b-30c3600eaa5f",'
b' "type": "dummy.type", "specversion": "1.0", "time":'
@ -318,7 +348,12 @@ def test_binary_data_deserialization():
),
"type": "dummy.type",
}
assert CloudEvent.parse_raw(given).dict() == expected
assert cloudevents_implementation["event"].parse_raw(given).dict() == expected
if cloudevents_implementation["pydantic_version"] == "v2":
assert (
cloudevents_implementation["event"].model_validate_json(given).dict()
== expected
)
def test_access_data_event_attribute_should_raise_key_error(dummy_event):
@ -355,6 +390,6 @@ def test_data_must_never_exist_as_an_attribute_name(dummy_event):
assert "data" not in dummy_event
def test_attributes_and_kwards_are_incompatible(cloudevents_implementation):
    """Passing both an attributes dict and attribute kwargs is rejected."""
    with pytest.raises(IncompatibleArgumentsError):
        cloudevents_implementation["event"]({"a": "b"}, other="hello world")

View File

@ -17,9 +17,16 @@ import datetime
import json
import pytest
from pydantic import ValidationError as PydanticV2ValidationError
from pydantic.v1 import ValidationError as PydanticV1ValidationError
from cloudevents.conversion import to_json
from cloudevents.pydantic import CloudEvent, from_dict, from_json
from cloudevents.pydantic.v1.conversion import from_dict as pydantic_v1_from_dict
from cloudevents.pydantic.v1.conversion import from_json as pydantic_v1_from_json
from cloudevents.pydantic.v1.event import CloudEvent as PydanticV1CloudEvent
from cloudevents.pydantic.v2.conversion import from_dict as pydantic_v2_from_dict
from cloudevents.pydantic.v2.conversion import from_json as pydantic_v2_from_json
from cloudevents.pydantic.v2.event import CloudEvent as PydanticV2CloudEvent
from cloudevents.sdk.event.attribute import SpecVersion
test_data = json.dumps({"data-key": "val"})
@ -29,9 +36,32 @@ test_attributes = {
}
# Bundles everything the tests need per pydantic major version: the CloudEvent
# class, the matching ValidationError type, the conversion helpers, and a
# version tag used to select version-specific expectations.
_pydantic_implementation = {
    "v1": {
        "event": PydanticV1CloudEvent,
        "validation_error": PydanticV1ValidationError,
        "from_dict": pydantic_v1_from_dict,
        "from_json": pydantic_v1_from_json,
        "pydantic_version": "v1",
    },
    "v2": {
        "event": PydanticV2CloudEvent,
        "validation_error": PydanticV2ValidationError,
        "from_dict": pydantic_v2_from_dict,
        "from_json": pydantic_v2_from_json,
        "pydantic_version": "v2",
    },
}
@pytest.fixture(params=["v1", "v2"])
def cloudevents_implementation(request):
    # Runs every dependent test once per pydantic implementation.
    return _pydantic_implementation[request.param]
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_to_json(specversion):
event = CloudEvent(test_attributes, test_data)
def test_to_json(specversion, cloudevents_implementation):
event = cloudevents_implementation["event"](test_attributes, test_data)
event_json = to_json(event)
event_dict = json.loads(event_json)
@ -42,10 +72,10 @@ def test_to_json(specversion):
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_to_json_base64(specversion):
def test_to_json_base64(specversion, cloudevents_implementation):
data = b"test123"
event = CloudEvent(test_attributes, data)
event = cloudevents_implementation["event"](test_attributes, data)
event_json = to_json(event)
event_dict = json.loads(event_json)
@ -60,7 +90,7 @@ def test_to_json_base64(specversion):
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_from_json(specversion):
def test_from_json(specversion, cloudevents_implementation):
payload = {
"type": "com.example.string",
"source": "https://example.com/event-producer",
@ -68,7 +98,7 @@ def test_from_json(specversion):
"specversion": specversion,
"data": {"data-key": "val"},
}
event = from_json(json.dumps(payload))
event = cloudevents_implementation["from_json"](json.dumps(payload))
for key, val in payload.items():
if key == "data":
@ -78,7 +108,7 @@ def test_from_json(specversion):
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_from_json_base64(specversion):
def test_from_json_base64(specversion, cloudevents_implementation):
# Create base64 encoded data
raw_data = {"data-key": "val"}
data = json.dumps(raw_data).encode()
@ -95,7 +125,7 @@ def test_from_json_base64(specversion):
payload_json = json.dumps(payload)
# Create event
event = from_json(payload_json)
event = cloudevents_implementation["from_json"](payload_json)
# Test fields were marshalled properly
for key, val in payload.items():
@ -107,11 +137,11 @@ def test_from_json_base64(specversion):
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_json_can_talk_to_itself(specversion):
event = CloudEvent(test_attributes, test_data)
def test_json_can_talk_to_itself(specversion, cloudevents_implementation):
event = cloudevents_implementation["event"](test_attributes, test_data)
event_json = to_json(event)
event = from_json(event_json)
event = cloudevents_implementation["from_json"](event_json)
for key, val in test_attributes.items():
assert event[key] == val
@ -119,20 +149,20 @@ def test_json_can_talk_to_itself(specversion):
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_json_can_talk_to_itself_base64(specversion, cloudevents_implementation):
    """to_json -> from_json round-trip must preserve attributes and restore
    binary data byte-for-byte."""
    data = b"test123"
    event = cloudevents_implementation["event"](test_attributes, data)
    event_json = to_json(event)
    event = cloudevents_implementation["from_json"](event_json)

    for key, val in test_attributes.items():
        assert event[key] == val
    assert event.data == data
def test_from_dict():
def test_from_dict(cloudevents_implementation):
given = {
"data": b"\x00\x00\x11Hello World",
"datacontenttype": "application/octet-stream",
@ -146,12 +176,4 @@ def test_from_dict():
),
"type": "dummy.type",
}
assert from_dict(given).dict() == given
@pytest.mark.parametrize("specversion", ["0.3", "1.0"])
def test_pydantic_json_function_parameters_must_affect_output(specversion):
event = CloudEvent(test_attributes, test_data)
v1 = event.json(indent=2, sort_keys=True)
v2 = event.json(indent=4, sort_keys=True)
assert v1 != v2
assert cloudevents_implementation["from_dict"](given).dict() == given

View File

@ -18,11 +18,16 @@ import json
import typing
import pytest
from pydantic import ValidationError as PydanticV2ValidationError
from pydantic.v1 import ValidationError as PydanticV1ValidationError
from sanic import Sanic, response
import cloudevents.exceptions as cloud_exceptions
from cloudevents.conversion import to_binary, to_structured
from cloudevents.pydantic import CloudEvent, from_http
from cloudevents.pydantic.v1.conversion import from_http as pydantic_v1_from_http
from cloudevents.pydantic.v1.event import CloudEvent as PydanticV1CloudEvent
from cloudevents.pydantic.v2.conversion import from_http as pydantic_v2_from_http
from cloudevents.pydantic.v2.event import CloudEvent as PydanticV2CloudEvent
from cloudevents.sdk import converters
from cloudevents.sdk.converters.binary import is_binary
from cloudevents.sdk.converters.structured import is_structured
@ -65,13 +70,35 @@ test_data = {"payload-content": "Hello World!"}
app = Sanic("test_pydantic_http_events")
# Per-pydantic-version bundle for the HTTP tests: CloudEvent class, matching
# ValidationError, the from_http converter, and a version tag used both for
# assertions and to build the /event/<pydantic_version> route path.
_pydantic_implementation = {
    "v1": {
        "event": PydanticV1CloudEvent,
        "validation_error": PydanticV1ValidationError,
        "from_http": pydantic_v1_from_http,
        "pydantic_version": "v1",
    },
    "v2": {
        "event": PydanticV2CloudEvent,
        "validation_error": PydanticV2ValidationError,
        "from_http": pydantic_v2_from_http,
        "pydantic_version": "v2",
    },
}
@app.route("/event", ["POST"])
async def echo(request):
@pytest.fixture(params=["v1", "v2"])
def cloudevents_implementation(request):
    # Parametrizes each HTTP test over both pydantic implementations.
    return _pydantic_implementation[request.param]
@app.route("/event/<pydantic_version>", ["POST"])
async def echo(request, pydantic_version):
decoder = None
if "binary-payload" in request.headers:
decoder = lambda x: x
event = from_http(dict(request.headers), request.body, data_unmarshaller=decoder)
event = _pydantic_implementation[pydantic_version]["from_http"](
dict(request.headers), request.body, data_unmarshaller=decoder
)
data = (
event.data
if isinstance(event.data, (bytes, bytearray, memoryview))
@ -81,28 +108,28 @@ async def echo(request):
@pytest.mark.parametrize("body", invalid_cloudevent_request_body)
def test_missing_required_fields_structured(body, cloudevents_implementation):
    """Structured requests missing required attributes must raise
    MissingRequiredFields."""
    with pytest.raises(cloud_exceptions.MissingRequiredFields):
        _ = cloudevents_implementation["from_http"](
            {"Content-Type": "application/cloudevents+json"}, json.dumps(body)
        )
@pytest.mark.parametrize("headers", invalid_test_headers)
def test_missing_required_fields_binary(headers, cloudevents_implementation):
    """Binary requests with invalid ce- headers must raise
    MissingRequiredFields."""
    with pytest.raises(cloud_exceptions.MissingRequiredFields):
        _ = cloudevents_implementation["from_http"](headers, json.dumps(test_data))
@pytest.mark.parametrize("headers", invalid_test_headers)
def test_missing_required_fields_empty_data_binary(headers, cloudevents_implementation):
    """Invalid headers must fail even with an empty body (issue #115)."""
    with pytest.raises(cloud_exceptions.MissingRequiredFields):
        _ = cloudevents_implementation["from_http"](headers, None)
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_emit_binary_event(specversion):
def test_emit_binary_event(specversion, cloudevents_implementation):
headers = {
"ce-id": "my-id",
"ce-source": "<event-source>",
@ -111,7 +138,11 @@ def test_emit_binary_event(specversion):
"Content-Type": "text/plain",
}
data = json.dumps(test_data)
_, r = app.test_client.post("/event", headers=headers, data=data)
_, r = app.test_client.post(
f"/event/{cloudevents_implementation['pydantic_version']}",
headers=headers,
data=data,
)
# Convert byte array to dict
# e.g. r.body = b'{"payload-content": "Hello World!"}'
@ -128,7 +159,7 @@ def test_emit_binary_event(specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_emit_structured_event(specversion):
def test_emit_structured_event(specversion, cloudevents_implementation):
headers = {"Content-Type": "application/cloudevents+json"}
body = {
"id": "my-id",
@ -137,7 +168,11 @@ def test_emit_structured_event(specversion):
"specversion": specversion,
"data": test_data,
}
_, r = app.test_client.post("/event", headers=headers, data=json.dumps(body))
_, r = app.test_client.post(
f"/event/{cloudevents_implementation['pydantic_version']}",
headers=headers,
data=json.dumps(body),
)
# Convert byte array to dict
# e.g. r.body = b'{"payload-content": "Hello World!"}'
@ -153,7 +188,7 @@ def test_emit_structured_event(specversion):
"converter", [converters.TypeBinary, converters.TypeStructured]
)
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_roundtrip_non_json_event(converter, specversion):
def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementation):
input_data = io.BytesIO()
for _ in range(100):
for j in range(20):
@ -161,7 +196,7 @@ def test_roundtrip_non_json_event(converter, specversion):
compressed_data = bz2.compress(input_data.getvalue())
attrs = {"source": "test", "type": "t"}
event = CloudEvent(attrs, compressed_data)
event = cloudevents_implementation["event"](attrs, compressed_data)
if converter == converters.TypeStructured:
headers, data = to_structured(event, data_marshaller=lambda x: x)
@ -169,7 +204,11 @@ def test_roundtrip_non_json_event(converter, specversion):
headers, data = to_binary(event, data_marshaller=lambda x: x)
headers["binary-payload"] = "true" # Decoding hint for server
_, r = app.test_client.post("/event", headers=headers, data=data)
_, r = app.test_client.post(
f"/event/{cloudevents_implementation['pydantic_version']}",
headers=headers,
data=data,
)
assert r.status_code == 200
for key in attrs:
@ -178,7 +217,7 @@ def test_roundtrip_non_json_event(converter, specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_missing_ce_prefix_binary_event(specversion):
def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation):
prefixed_headers = {}
headers = {
"ce-id": "my-id",
@ -195,11 +234,13 @@ def test_missing_ce_prefix_binary_event(specversion):
# and NotImplementedError because structured calls aren't
# implemented. In this instance one of the required keys should have
# prefix e-id instead of ce-id therefore it should throw
_ = from_http(prefixed_headers, json.dumps(test_data))
_ = cloudevents_implementation["from_http"](
prefixed_headers, json.dumps(test_data)
)
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_valid_binary_events(specversion):
def test_valid_binary_events(specversion, cloudevents_implementation):
# Test creating multiple cloud events
events_queue = []
headers = {}
@ -212,7 +253,9 @@ def test_valid_binary_events(specversion):
"ce-specversion": specversion,
}
data = {"payload": f"payload-{i}"}
events_queue.append(from_http(headers, json.dumps(data)))
events_queue.append(
cloudevents_implementation["from_http"](headers, json.dumps(data))
)
for i, event in enumerate(events_queue):
data = event.data
@ -223,7 +266,7 @@ def test_valid_binary_events(specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_structured_to_request(specversion):
def test_structured_to_request(specversion, cloudevents_implementation):
attributes = {
"specversion": specversion,
"type": "word.found.name",
@ -232,7 +275,7 @@ def test_structured_to_request(specversion):
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)
event = cloudevents_implementation["event"](attributes, data)
headers, body_bytes = to_structured(event)
assert isinstance(body_bytes, bytes)
body = json.loads(body_bytes)
@ -244,7 +287,7 @@ def test_structured_to_request(specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_attributes_view_accessor(specversion: str):
def test_attributes_view_accessor(specversion: str, cloudevents_implementation):
attributes: dict[str, typing.Any] = {
"specversion": specversion,
"type": "word.found.name",
@ -253,7 +296,9 @@ def test_attributes_view_accessor(specversion: str):
}
data = {"message": "Hello World!"}
event: CloudEvent = CloudEvent(attributes, data)
event: cloudevents_implementation["event"] = cloudevents_implementation["event"](
attributes, data
)
event_attributes: typing.Mapping[str, typing.Any] = event.get_attributes()
assert event_attributes["specversion"] == attributes["specversion"]
assert event_attributes["type"] == attributes["type"]
@ -263,7 +308,7 @@ def test_attributes_view_accessor(specversion: str):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_binary_to_request(specversion):
def test_binary_to_request(specversion, cloudevents_implementation):
attributes = {
"specversion": specversion,
"type": "word.found.name",
@ -271,7 +316,7 @@ def test_binary_to_request(specversion):
"source": "pytest",
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)
event = cloudevents_implementation["event"](attributes, data)
headers, body_bytes = to_binary(event)
body = json.loads(body_bytes)
@ -282,7 +327,7 @@ def test_binary_to_request(specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_empty_data_structured_event(specversion):
def test_empty_data_structured_event(specversion, cloudevents_implementation):
# Testing if cloudevent breaks when no structured data field present
attributes = {
"specversion": specversion,
@ -293,21 +338,21 @@ def test_empty_data_structured_event(specversion):
"source": "<source-url>",
}
event = from_http(
event = cloudevents_implementation["from_http"](
{"content-type": "application/cloudevents+json"}, json.dumps(attributes)
)
assert event.data is None
attributes["data"] = ""
# Data of empty string will be marshalled into None
event = from_http(
event = cloudevents_implementation["from_http"](
{"content-type": "application/cloudevents+json"}, json.dumps(attributes)
)
assert event.data is None
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_empty_data_binary_event(specversion):
def test_empty_data_binary_event(specversion, cloudevents_implementation):
# Testing if cloudevent breaks when no structured data field present
headers = {
"Content-Type": "application/octet-stream",
@ -317,17 +362,17 @@ def test_empty_data_binary_event(specversion):
"ce-time": "2018-10-23T12:28:22.4579346Z",
"ce-source": "<source-url>",
}
event = from_http(headers, None)
event = cloudevents_implementation["from_http"](headers, None)
assert event.data is None
data = ""
# Data of empty string will be marshalled into None
event = from_http(headers, data)
event = cloudevents_implementation["from_http"](headers, data)
assert event.data is None
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_valid_structured_events(specversion):
def test_valid_structured_events(specversion, cloudevents_implementation):
# Test creating multiple cloud events
events_queue = []
num_cloudevents = 30
@ -340,7 +385,7 @@ def test_valid_structured_events(specversion):
"data": {"payload": f"payload-{i}"},
}
events_queue.append(
from_http(
cloudevents_implementation["from_http"](
{"content-type": "application/cloudevents+json"},
json.dumps(event),
)
@ -354,7 +399,7 @@ def test_valid_structured_events(specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_structured_no_content_type(specversion):
def test_structured_no_content_type(specversion, cloudevents_implementation):
# Test creating multiple cloud events
data = {
"id": "id",
@ -363,7 +408,7 @@ def test_structured_no_content_type(specversion):
"specversion": specversion,
"data": test_data,
}
event = from_http({}, json.dumps(data))
event = cloudevents_implementation["from_http"]({}, json.dumps(data))
assert event["id"] == "id"
assert event["source"] == "source.com.test"
@ -392,7 +437,7 @@ def test_is_binary():
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_cloudevent_repr(specversion):
def test_cloudevent_repr(specversion, cloudevents_implementation):
headers = {
"Content-Type": "application/octet-stream",
"ce-specversion": specversion,
@ -401,7 +446,7 @@ def test_cloudevent_repr(specversion):
"ce-time": "2018-10-23T12:28:22.4579346Z",
"ce-source": "<source-url>",
}
event = from_http(headers, "")
event = cloudevents_implementation["from_http"](headers, "")
# Testing to make sure event is printable. I could run event. __repr__() but
# we had issues in the past where event.__repr__() could run but
# print(event) would fail.
@ -409,8 +454,8 @@ def test_cloudevent_repr(specversion):
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
def test_none_data_cloudevent(specversion):
event = CloudEvent(
def test_none_data_cloudevent(specversion, cloudevents_implementation):
event = cloudevents_implementation["event"](
{
"source": "<my-url>",
"type": "issue.example",
@ -421,7 +466,7 @@ def test_none_data_cloudevent(specversion):
to_structured(event)
def test_wrong_specversion():
def test_wrong_specversion(cloudevents_implementation):
headers = {"Content-Type": "application/cloudevents+json"}
data = json.dumps(
{
@ -432,20 +477,20 @@ def test_wrong_specversion():
}
)
with pytest.raises(cloud_exceptions.InvalidRequiredFields) as e:
from_http(headers, data)
cloudevents_implementation["from_http"](headers, data)
assert "Found invalid specversion 0.2" in str(e.value)
def test_invalid_data_format_structured_from_http(cloudevents_implementation):
    """Non-string/bytes payloads must raise InvalidStructuredJSON with a
    descriptive message."""
    headers = {"Content-Type": "application/cloudevents+json"}
    data = 20  # deliberately not str/bytes/bytearray
    with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e:
        cloudevents_implementation["from_http"](headers, data)
    assert "Expected json of type (str, bytes, bytearray)" in str(e.value)
def test_wrong_specversion_to_request():
event = CloudEvent({"source": "s", "type": "t"}, None)
def test_wrong_specversion_to_request(cloudevents_implementation):
event = cloudevents_implementation["event"]({"source": "s", "type": "t"}, None)
with pytest.raises(cloud_exceptions.InvalidRequiredFields) as e:
event["specversion"] = "0.2"
to_binary(event)
@ -468,22 +513,22 @@ def test_is_structured():
assert not is_structured(headers)
def test_empty_json_structured(cloudevents_implementation):
    """An empty structured body must raise MissingRequiredFields because no
    specversion can be read from headers or data."""
    headers = {"Content-Type": "application/cloudevents+json"}
    data = ""
    with pytest.raises(cloud_exceptions.MissingRequiredFields) as e:
        cloudevents_implementation["from_http"](headers, data)
    assert "Failed to read specversion from both headers and data" in str(e.value)
def test_uppercase_headers_with_none_data_binary():
def test_uppercase_headers_with_none_data_binary(cloudevents_implementation):
headers = {
"Ce-Id": "my-id",
"Ce-Source": "<event-source>",
"Ce-Type": "cloudevent.event.type",
"Ce-Specversion": "1.0",
}
event = from_http(headers, None)
event = cloudevents_implementation["from_http"](headers, None)
for key in headers:
assert event[key.lower()[3:]] == headers[key]
@ -493,7 +538,7 @@ def test_uppercase_headers_with_none_data_binary():
assert new_data is None
def test_generic_exception():
def test_generic_exception(cloudevents_implementation):
headers = {"Content-Type": "application/cloudevents+json"}
data = json.dumps(
{
@ -505,28 +550,30 @@ def test_generic_exception():
}
)
with pytest.raises(cloud_exceptions.GenericException) as e:
from_http({}, None)
cloudevents_implementation["from_http"]({}, None)
e.errisinstance(cloud_exceptions.MissingRequiredFields)
with pytest.raises(cloud_exceptions.GenericException) as e:
from_http({}, 123)
cloudevents_implementation["from_http"]({}, 123)
e.errisinstance(cloud_exceptions.InvalidStructuredJSON)
with pytest.raises(cloud_exceptions.GenericException) as e:
from_http(headers, data, data_unmarshaller=lambda x: 1 / 0)
cloudevents_implementation["from_http"](
headers, data, data_unmarshaller=lambda x: 1 / 0
)
e.errisinstance(cloud_exceptions.DataUnmarshallerError)
with pytest.raises(cloud_exceptions.GenericException) as e:
event = from_http(headers, data)
event = cloudevents_implementation["from_http"](headers, data)
to_binary(event, data_marshaller=lambda x: 1 / 0)
e.errisinstance(cloud_exceptions.DataMarshallerError)
def test_non_dict_data_no_headers_bug(cloudevents_implementation):
    """Regression test for issue #116: a non-dict JSON body with no ce-
    headers must fail with a helpful MissingRequiredFields message."""
    headers = {"Content-Type": "application/cloudevents+json"}
    data = "123"
    with pytest.raises(cloud_exceptions.MissingRequiredFields) as e:
        cloudevents_implementation["from_http"](headers, data)
    assert "Failed to read specversion from both headers and data" in str(e.value)
    assert "The following deserialized data has no 'get' method" in str(e.value)

View File

@ -10,4 +10,4 @@ aiohttp
Pillow
requests
flask
pydantic>=2.0.0,<3.0