commit 22fcfaf366
Hal Blackburn, 2025-05-21 14:47:34 +00:00 (committed by GitHub)
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
25 changed files with 267 additions and 88 deletions


@@ -17,12 +17,12 @@ jobs:
       - name: Install dev dependencies
         run: python -m pip install -r requirements/dev.txt
       - name: Run linting
-        run: python -m tox -e lint
+        run: python -m tox -e lint,mypy,mypy-samples-image,mypy-samples-json
   test:
     strategy:
       matrix:
-        python: ['3.8', '3.9', '3.10', '3.11']
+        python: ['3.9', '3.10', '3.11', '3.12', '3.13']
         os: [ubuntu-latest, windows-latest, macos-latest]
     runs-on: ${{ matrix.os }}
     steps:


@@ -32,7 +32,7 @@ class CloudEvent:
     @classmethod
     def create(
         cls: typing.Type[AnyCloudEvent],
-        attributes: typing.Dict[str, typing.Any],
+        attributes: typing.Mapping[str, typing.Any],
         data: typing.Optional[typing.Any],
     ) -> AnyCloudEvent:
         """


@@ -91,7 +91,9 @@ def from_json(
 def from_http(
     event_type: typing.Type[AnyCloudEvent],
-    headers: typing.Mapping[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.Union[str, bytes]],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> AnyCloudEvent:
@@ -260,7 +262,7 @@ def best_effort_encode_attribute_value(value: typing.Any) -> typing.Any:
 def from_dict(
     event_type: typing.Type[AnyCloudEvent],
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> AnyCloudEvent:
     """
     Constructs an Event object of a given `event_type` from


@@ -37,7 +37,9 @@ def from_json(
 def from_http(
-    headers: typing.Dict[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.Union[str, bytes]],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> CloudEvent:
@@ -58,7 +60,7 @@ def from_http(
 def from_dict(
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> CloudEvent:
     """
     Constructs a CloudEvent from a dict `event` representation.
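
Note: the widened `headers` parameter means callers no longer need to copy a multi-valued header container into a plain dict before calling `from_http`. A minimal, hypothetical sketch of the kind of call that now type-checks; the `RawHeaders` class below is invented for illustration and is not part of this change:

from cloudevents.http import from_http


class RawHeaders:
    """Hypothetical header container whose items() may repeat keys."""

    def __init__(self, pairs):
        self._pairs = pairs

    def items(self):
        return list(self._pairs)


headers = RawHeaders(
    [
        ("ce-specversion", "1.0"),
        ("ce-type", "com.example.created"),
        ("ce-source", "example/source"),
        ("ce-id", "1234"),
    ]
)
# Binary-mode event with no body; accepted without converting headers to a dict first.
event = from_http(headers, None)
print(event["type"])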


@@ -34,11 +34,13 @@ class CloudEvent(abstract.CloudEvent):
     @classmethod
     def create(
-        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
+        cls,
+        attributes: typing.Mapping[str, typing.Any],
+        data: typing.Optional[typing.Any],
     ) -> "CloudEvent":
         return cls(attributes, data)

-    def __init__(self, attributes: typing.Dict[str, str], data: typing.Any = None):
+    def __init__(self, attributes: typing.Mapping[str, str], data: typing.Any = None):
         """
         Event Constructor
         :param attributes: a dict with cloudevent attributes. Minimally
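
Note: accepting `Mapping` instead of `Dict` lets callers hand in read-only, dict-like attribute sets. A small sketch of a call that now type-checks (the attribute values are illustrative only):

from types import MappingProxyType

from cloudevents.http import CloudEvent

# A read-only mapping is accepted where a mutable dict used to be required.
attrs = MappingProxyType({"type": "com.example.created", "source": "example/source"})
event = CloudEvent(attrs, {"message": "hi"})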


@@ -111,11 +111,29 @@ def to_binary(
     return KafkaMessage(headers, message_key, data)


+@typing.overload
+def from_binary(
+    message: KafkaMessage,
+    event_type: None = None,
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> http.CloudEvent:
+    pass
+
+
+@typing.overload
+def from_binary(
+    message: KafkaMessage,
+    event_type: typing.Type[AnyCloudEvent],
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> AnyCloudEvent:
+    pass
+
+
 def from_binary(
     message: KafkaMessage,
     event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
-) -> AnyCloudEvent:
+) -> typing.Union[http.CloudEvent, AnyCloudEvent]:
     """
     Returns a CloudEvent from a KafkaMessage in binary format.
@@ -144,10 +162,11 @@ def from_binary(
         raise cloud_exceptions.DataUnmarshallerError(
             f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
         )
+    result: typing.Union[http.CloudEvent, AnyCloudEvent]
     if event_type:
         result = event_type.create(attributes, data)
     else:
-        result = http.CloudEvent.create(attributes, data)  # type: ignore
+        result = http.CloudEvent.create(attributes, data)
     return result
@@ -210,12 +229,32 @@ def to_structured(
     return KafkaMessage(headers, message_key, value)


+@typing.overload
+def from_structured(
+    message: KafkaMessage,
+    event_type: None = None,
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+    envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> http.CloudEvent:
+    pass
+
+
+@typing.overload
+def from_structured(
+    message: KafkaMessage,
+    event_type: typing.Type[AnyCloudEvent],
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+    envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> AnyCloudEvent:
+    pass
+
+
 def from_structured(
     message: KafkaMessage,
     event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
     envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
-) -> AnyCloudEvent:
+) -> typing.Union[http.CloudEvent, AnyCloudEvent]:
     """
     Returns a CloudEvent from a KafkaMessage in structured format.
@@ -264,8 +303,9 @@ def from_structured(
             attributes["datacontenttype"] = val.decode()
         else:
             attributes[header.lower()] = val.decode()
+    result: typing.Union[AnyCloudEvent, http.CloudEvent]
     if event_type:
         result = event_type.create(attributes, data)
     else:
-        result = http.CloudEvent.create(attributes, data)  # type: ignore
+        result = http.CloudEvent.create(attributes, data)
     return result
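
Note: the overloads exist so that the static return type follows the `event_type` argument: calling without it is typed as `http.CloudEvent`, while passing a specific event class is typed as that class. A rough usage sketch, with illustrative header values:

from cloudevents.http import CloudEvent
from cloudevents.kafka.conversion import KafkaMessage, from_binary

# KafkaMessage fields are (headers, key, value); headers carry ce_-prefixed
# attribute values as bytes in the binary content mode.
message = KafkaMessage(
    {
        "ce_specversion": b"1.0",
        "ce_type": b"com.example.created",
        "ce_source": b"example/source",
        "ce_id": b"1234",
    },
    "partition-key",
    b"{}",
)

default_event = from_binary(message)            # inferred as http.CloudEvent
typed_event = from_binary(message, CloudEvent)  # inferred as the given event class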


@@ -21,7 +21,9 @@ from cloudevents.sdk import types
 def from_http(
-    headers: typing.Dict[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.AnyStr],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> CloudEvent:
@@ -63,7 +65,7 @@ def from_json(
 def from_dict(
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> CloudEvent:
     """
     Construct an CloudEvent from a dict `event` representation.


@@ -100,7 +100,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
     @classmethod
     def create(
-        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
+        cls,
+        attributes: typing.Mapping[str, typing.Any],
+        data: typing.Optional[typing.Any],
     ) -> "CloudEvent":
         return cls(attributes, data)
@@ -155,7 +157,7 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
     def __init__(  # type: ignore[no-untyped-def]
         self,
-        attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
+        attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None,
         data: typing.Optional[typing.Any] = None,
         **kwargs,
     ):


@@ -22,7 +22,9 @@ from cloudevents.sdk import types
 def from_http(
-    headers: typing.Dict[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.AnyStr],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> CloudEvent:
@@ -64,7 +66,7 @@ def from_json(
 def from_dict(
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> CloudEvent:
     """
     Construct an CloudEvent from a dict `event` representation.


@@ -44,7 +44,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
     @classmethod
     def create(
-        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
+        cls,
+        attributes: typing.Mapping[str, typing.Any],
+        data: typing.Optional[typing.Any],
     ) -> "CloudEvent":
         return cls(attributes, data)
@@ -103,7 +105,7 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
     def __init__(  # type: ignore[no-untyped-def]
         self,
-        attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
+        attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None,
         data: typing.Optional[typing.Any] = None,
         **kwargs,
     ):
@@ -173,6 +175,8 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
         *,
         strict: typing.Optional[bool] = None,
         context: typing.Optional[typing.Dict[str, Any]] = None,
+        by_alias: typing.Optional[bool] = None,
+        by_name: typing.Optional[bool] = None,
     ) -> "CloudEvent":
         return conversion.from_json(cls, json_data)


@@ -11,10 +11,15 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
 import typing

 from cloudevents.sdk.event import base, opt

+if typing.TYPE_CHECKING:
+    from typing_extensions import Self
+

 class Event(base.BaseEvent):
     _ce_required_fields = {"id", "source", "type", "specversion"}
@@ -79,39 +84,39 @@ class Event(base.BaseEvent):
             return {}
         return dict(result)

-    def SetEventType(self, eventType: str) -> base.BaseEvent:
+    def SetEventType(self, eventType: str) -> Self:
         self.Set("type", eventType)
         return self

-    def SetSource(self, source: str) -> base.BaseEvent:
+    def SetSource(self, source: str) -> Self:
         self.Set("source", source)
         return self

-    def SetEventID(self, eventID: str) -> base.BaseEvent:
+    def SetEventID(self, eventID: str) -> Self:
         self.Set("id", eventID)
         return self

-    def SetEventTime(self, eventTime: typing.Optional[str]) -> base.BaseEvent:
+    def SetEventTime(self, eventTime: typing.Optional[str]) -> Self:
         self.Set("time", eventTime)
         return self

-    def SetSubject(self, subject: typing.Optional[str]) -> base.BaseEvent:
+    def SetSubject(self, subject: typing.Optional[str]) -> Self:
         self.Set("subject", subject)
         return self

-    def SetSchema(self, schema: typing.Optional[str]) -> base.BaseEvent:
+    def SetSchema(self, schema: typing.Optional[str]) -> Self:
         self.Set("dataschema", schema)
         return self

-    def SetContentType(self, contentType: typing.Optional[str]) -> base.BaseEvent:
+    def SetContentType(self, contentType: typing.Optional[str]) -> Self:
         self.Set("datacontenttype", contentType)
         return self

-    def SetData(self, data: typing.Optional[object]) -> base.BaseEvent:
+    def SetData(self, data: typing.Optional[object]) -> Self:
         self.Set("data", data)
         return self

-    def SetExtensions(self, extensions: typing.Optional[dict]) -> base.BaseEvent:
+    def SetExtensions(self, extensions: typing.Optional[dict]) -> Self:
         self.Set("extensions", extensions)
         return self
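
Note: returning `Self` instead of `base.BaseEvent` keeps the concrete event class through chained setter calls, so mypy no longer widens the result of a fluent chain. A short sketch:

from cloudevents.sdk.event import v1

event = (
    v1.Event()
    .SetEventType("com.example.created")
    .SetSource("example/source")
    .SetEventID("1234")
)
# mypy now infers `event` as v1.Event rather than base.BaseEvent.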


@@ -14,9 +14,25 @@

 import typing

+_K_co = typing.TypeVar("_K_co", covariant=True)
+_V_co = typing.TypeVar("_V_co", covariant=True)
+
 # Use consistent types for marshal and unmarshal functions across
 # both JSON and Binary format.
 MarshallerType = typing.Callable[[typing.Any], typing.AnyStr]
 UnmarshallerType = typing.Callable[[typing.AnyStr], typing.Any]
+
+
+class SupportsDuplicateItems(typing.Protocol[_K_co, _V_co]):
+    """
+    Dict-like objects with an items() method that may produce duplicate keys.
+    """
+
+    # This is wider than _typeshed.SupportsItems, which expects items() to
+    # return type an AbstractSet. werkzeug's Headers class satisfies this type,
+    # but not _typeshed.SupportsItems.
+    def items(self) -> typing.Iterable[typing.Tuple[_K_co, _V_co]]:
+        pass
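
Note: because `SupportsDuplicateItems` is a `Protocol`, any object with a compatible `items()` method satisfies it structurally; nothing has to inherit from it. A sketch of annotating a helper with the new type (the helper itself is made up):

import typing

from cloudevents.sdk import types


def lowercase_keys(
    headers: types.SupportsDuplicateItems[str, str],
) -> typing.List[typing.Tuple[str, str]]:
    # Works for plain dicts as well as multi-valued header containers,
    # because both expose items() yielding (key, value) pairs.
    return [(key.lower(), value) for key, value in headers.items()]


lowercase_keys({"Ce-Id": "1234", "Ce-Source": "example/source"})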


@@ -21,7 +21,7 @@ from cloudevents.sdk.converters import base, binary
 def test_binary_converter_raise_unsupported():
     with pytest.raises(exceptions.UnsupportedEvent):
         cnvtr = binary.BinaryHTTPCloudEventConverter()
-        cnvtr.read(None, {}, None, None)
+        cnvtr.read(None, {}, None, None)  # type: ignore[arg-type] # intentionally wrong type # noqa: E501


 def test_base_converters_raise_exceptions():
@@ -35,8 +35,8 @@ def test_base_converters_raise_exceptions():
     with pytest.raises(Exception):
         cnvtr = base.Converter()
-        cnvtr.write(None, None)
+        cnvtr.write(None, None)  # type: ignore[arg-type] # intentionally wrong type

     with pytest.raises(Exception):
         cnvtr = base.Converter()
-        cnvtr.read(None, None, None, None)
+        cnvtr.read(None, None, None, None)  # type: ignore[arg-type] # intentionally wrong type # noqa: E501
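
Note: the new `# type: ignore[arg-type]` comments mark calls that deliberately pass the wrong type to exercise error paths; scoping the ignore to one error code keeps mypy checking everything else. A tiny illustration of the pattern, not taken from the repository:

def parse_port(value: str) -> int:
    return int(value)


# A test that intentionally passes the wrong type silences only this error code:
parse_port(None)  # type: ignore[arg-type]  # intentionally wrong type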


@@ -25,7 +25,7 @@ from cloudevents.tests import data
 @pytest.mark.parametrize("event_class", [v03.Event, v1.Event])
 def test_binary_converter_upstream(event_class):
     m = marshaller.NewHTTPMarshaller([binary.NewBinaryHTTPCloudEventConverter()])
-    event = m.FromRequest(event_class(), data.headers[event_class], None, lambda x: x)
+    event = m.FromRequest(event_class(), data.headers[event_class], b"", lambda x: x)
     assert event is not None
     assert event.EventType() == data.ce_type
     assert event.EventID() == data.ce_id


@@ -77,7 +77,7 @@ def test_object_event_v1():
     _, structured_body = m.ToRequest(event)
     assert isinstance(structured_body, bytes)
     structured_obj = json.loads(structured_body)
-    error_msg = f"Body was {structured_body}, obj is {structured_obj}"
+    error_msg = f"Body was {structured_body!r}, obj is {structured_obj}"
     assert isinstance(structured_obj, dict), error_msg
     assert isinstance(structured_obj["data"], dict), error_msg
     assert len(structured_obj["data"]) == 1, error_msg


@@ -11,6 +11,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations

 import bz2
 import io
@@ -241,11 +242,11 @@ def test_structured_to_request(specversion):
     assert headers["content-type"] == "application/cloudevents+json"
     for key in attributes:
         assert body[key] == attributes[key]
-    assert body["data"] == data, f"|{body_bytes}|| {body}"
+    assert body["data"] == data, f"|{body_bytes!r}|| {body}"


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_attributes_view_accessor(specversion: str):
+def test_attributes_view_accessor(specversion: str) -> None:
     attributes: dict[str, typing.Any] = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -333,7 +334,7 @@ def test_valid_structured_events(specversion):
     events_queue = []
     num_cloudevents = 30
     for i in range(num_cloudevents):
-        event = {
+        raw_event = {
             "id": f"id{i}",
             "source": f"source{i}.com.test",
             "type": "cloudevent.test.type",
@@ -343,7 +344,7 @@ def test_valid_structured_events(specversion):
         events_queue.append(
             from_http(
                 {"content-type": "application/cloudevents+json"},
-                json.dumps(event),
+                json.dumps(raw_event),
             )
         )
@@ -454,7 +455,7 @@ def test_invalid_data_format_structured_from_http():
     headers = {"Content-Type": "application/cloudevents+json"}
     data = 20
     with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e:
-        from_http(headers, data)
+        from_http(headers, data)  # type: ignore[arg-type] # intentionally wrong type
     assert "Expected json of type (str, bytes, bytearray)" in str(e.value)
@@ -526,7 +527,7 @@ def test_generic_exception():
     e.errisinstance(cloud_exceptions.MissingRequiredFields)

     with pytest.raises(cloud_exceptions.GenericException) as e:
-        from_http({}, 123)
+        from_http({}, 123)  # type: ignore[arg-type] # intentionally wrong type
     e.errisinstance(cloud_exceptions.InvalidStructuredJSON)

     with pytest.raises(cloud_exceptions.GenericException) as e:


@@ -19,6 +19,7 @@ import json
 import pytest

 from cloudevents import exceptions as cloud_exceptions
+from cloudevents.abstract.event import AnyCloudEvent
 from cloudevents.http import CloudEvent
 from cloudevents.kafka.conversion import (
     KafkaMessage,
@@ -36,7 +37,9 @@ def simple_serialize(data: dict) -> bytes:

 def simple_deserialize(data: bytes) -> dict:
-    return json.loads(data.decode())
+    value = json.loads(data.decode())
+    assert isinstance(value, dict)
+    return value


 def failing_func(*args):
@@ -47,7 +50,7 @@ class KafkaConversionTestBase:
     expected_data = {"name": "test", "amount": 1}
     expected_custom_mapped_key = "custom-key"

-    def custom_key_mapper(self, _) -> str:
+    def custom_key_mapper(self, _: AnyCloudEvent) -> str:
         return self.expected_custom_mapped_key

     @pytest.fixture
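
Note: the reworked `simple_deserialize` shows the usual way of narrowing `json.loads`, which is typed as returning `Any`, down to the declared return type. A generic version of the same idea, outside the test suite:

import json
import typing


def load_json_dict(raw: bytes) -> typing.Dict[str, typing.Any]:
    value = json.loads(raw.decode())
    # json.loads returns Any; the assertion narrows it to dict for mypy
    # and fails fast on unexpected payloads at runtime.
    assert isinstance(value, dict)
    return value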


@@ -50,14 +50,14 @@ def test_from_request_wrong_unmarshaller():
     with pytest.raises(exceptions.InvalidDataUnmarshaller):
         m = marshaller.NewDefaultHTTPMarshaller()
         _ = m.FromRequest(
-            event=v1.Event(), headers={}, body="", data_unmarshaller=object()
+            event=v1.Event(), headers={}, body="", data_unmarshaller=object()  # type: ignore[arg-type] # intentionally wrong type # noqa: E501
         )


 def test_to_request_wrong_marshaller():
     with pytest.raises(exceptions.InvalidDataMarshaller):
         m = marshaller.NewDefaultHTTPMarshaller()
-        _ = m.ToRequest(v1.Event(), data_marshaller="")
+        _ = m.ToRequest(v1.Event(), data_marshaller="")  # type: ignore[arg-type] # intentionally wrong type # noqa: E501


 def test_from_request_cannot_read(binary_headers):


@@ -11,6 +11,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations

 import bz2
 import io
@@ -28,10 +29,13 @@ from cloudevents.pydantic.v1.conversion import from_http as pydantic_v1_from_http
 from cloudevents.pydantic.v1.event import CloudEvent as PydanticV1CloudEvent
 from cloudevents.pydantic.v2.conversion import from_http as pydantic_v2_from_http
 from cloudevents.pydantic.v2.event import CloudEvent as PydanticV2CloudEvent
-from cloudevents.sdk import converters
+from cloudevents.sdk import converters, types
 from cloudevents.sdk.converters.binary import is_binary
 from cloudevents.sdk.converters.structured import is_structured

+if typing.TYPE_CHECKING:
+    from typing_extensions import TypeAlias
+
 invalid_test_headers = [
     {
         "ce-source": "<event-source>",
@@ -70,7 +74,30 @@ test_data = {"payload-content": "Hello World!"}

 app = Sanic("test_pydantic_http_events")

-_pydantic_implementation = {
+
+AnyPydanticCloudEvent: TypeAlias = typing.Union[
+    PydanticV1CloudEvent, PydanticV2CloudEvent
+]
+
+
+class FromHttpFn(typing.Protocol):
+    def __call__(
+        self,
+        headers: typing.Dict[str, str],
+        data: typing.Optional[typing.AnyStr],
+        data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+    ) -> AnyPydanticCloudEvent:
+        pass
+
+
+class PydanticImplementation(typing.TypedDict):
+    event: typing.Type[AnyPydanticCloudEvent]
+    validation_error: typing.Type[Exception]
+    from_http: FromHttpFn
+    pydantic_version: typing.Literal["v1", "v2"]
+
+
+_pydantic_implementation: typing.Mapping[str, PydanticImplementation] = {
     "v1": {
         "event": PydanticV1CloudEvent,
         "validation_error": PydanticV1ValidationError,
@@ -87,7 +114,9 @@ _pydantic_implementation = {

 @pytest.fixture(params=["v1", "v2"])
-def cloudevents_implementation(request):
+def cloudevents_implementation(
+    request: pytest.FixtureRequest,
+) -> PydanticImplementation:
     return _pydantic_implementation[request.param]
@@ -108,7 +137,9 @@ async def echo(request, pydantic_version):

 @pytest.mark.parametrize("body", invalid_cloudevent_request_body)
-def test_missing_required_fields_structured(body, cloudevents_implementation):
+def test_missing_required_fields_structured(
+    body: dict, cloudevents_implementation: PydanticImplementation
+) -> None:
     with pytest.raises(cloud_exceptions.MissingRequiredFields):
         _ = cloudevents_implementation["from_http"](
             {"Content-Type": "application/cloudevents+json"}, json.dumps(body)
@@ -116,20 +147,26 @@ def test_missing_required_fields_structured(body, cloudevents_implementation):

 @pytest.mark.parametrize("headers", invalid_test_headers)
-def test_missing_required_fields_binary(headers, cloudevents_implementation):
+def test_missing_required_fields_binary(
+    headers: dict, cloudevents_implementation: PydanticImplementation
+) -> None:
     with pytest.raises(cloud_exceptions.MissingRequiredFields):
         _ = cloudevents_implementation["from_http"](headers, json.dumps(test_data))


 @pytest.mark.parametrize("headers", invalid_test_headers)
-def test_missing_required_fields_empty_data_binary(headers, cloudevents_implementation):
+def test_missing_required_fields_empty_data_binary(
+    headers: dict, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test for issue #115
     with pytest.raises(cloud_exceptions.MissingRequiredFields):
         _ = cloudevents_implementation["from_http"](headers, None)


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_emit_binary_event(specversion, cloudevents_implementation):
+def test_emit_binary_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     headers = {
         "ce-id": "my-id",
         "ce-source": "<event-source>",
@@ -159,7 +196,9 @@ def test_emit_binary_event(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_emit_structured_event(specversion, cloudevents_implementation):
+def test_emit_structured_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     body = {
         "id": "my-id",
@@ -188,7 +227,11 @@ def test_emit_structured_event(specversion, cloudevents_implementation):
     "converter", [converters.TypeBinary, converters.TypeStructured]
 )
 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementation):
+def test_roundtrip_non_json_event(
+    converter: str,
+    specversion: str,
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     input_data = io.BytesIO()
     for _ in range(100):
         for j in range(20):
@@ -217,7 +260,9 @@ def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation):
+def test_missing_ce_prefix_binary_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     prefixed_headers = {}
     headers = {
         "ce-id": "my-id",
@@ -240,9 +285,11 @@ def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_valid_binary_events(specversion, cloudevents_implementation):
+def test_valid_binary_events(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test creating multiple cloud events
-    events_queue = []
+    events_queue: list[AnyPydanticCloudEvent] = []
     headers = {}
     num_cloudevents = 30
     for i in range(num_cloudevents):
@@ -258,7 +305,7 @@ def test_valid_binary_events(specversion, cloudevents_implementation):
         )

     for i, event in enumerate(events_queue):
-        data = event.data
+        assert isinstance(event.data, dict)
         assert event["id"] == f"id{i}"
         assert event["source"] == f"source{i}.com.test"
         assert event["specversion"] == specversion
@@ -266,7 +313,9 @@ def test_valid_binary_events(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_structured_to_request(specversion, cloudevents_implementation):
+def test_structured_to_request(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     attributes = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -283,11 +332,13 @@ def test_structured_to_request(specversion, cloudevents_implementation):
     assert headers["content-type"] == "application/cloudevents+json"
     for key in attributes:
         assert body[key] == attributes[key]
-    assert body["data"] == data, f"|{body_bytes}|| {body}"
+    assert body["data"] == data, f"|{body_bytes!r}|| {body}"


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_attributes_view_accessor(specversion: str, cloudevents_implementation):
+def test_attributes_view_accessor(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     attributes: dict[str, typing.Any] = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -296,9 +347,7 @@ def test_attributes_view_accessor(specversion: str, cloudevents_implementation):
     }
     data = {"message": "Hello World!"}

-    event: cloudevents_implementation["event"] = cloudevents_implementation["event"](
-        attributes, data
-    )
+    event = cloudevents_implementation["event"](attributes, data)
     event_attributes: typing.Mapping[str, typing.Any] = event.get_attributes()
     assert event_attributes["specversion"] == attributes["specversion"]
     assert event_attributes["type"] == attributes["type"]
@@ -308,7 +357,9 @@ def test_attributes_view_accessor(specversion: str, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_binary_to_request(specversion, cloudevents_implementation):
+def test_binary_to_request(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     attributes = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -327,7 +378,9 @@ def test_binary_to_request(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_empty_data_structured_event(specversion, cloudevents_implementation):
+def test_empty_data_structured_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Testing if cloudevent breaks when no structured data field present
     attributes = {
         "specversion": specversion,
@@ -352,7 +405,9 @@ def test_empty_data_structured_event(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_empty_data_binary_event(specversion, cloudevents_implementation):
+def test_empty_data_binary_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Testing if cloudevent breaks when no structured data field present
     headers = {
         "Content-Type": "application/octet-stream",
@@ -372,12 +427,14 @@ def test_empty_data_binary_event(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_valid_structured_events(specversion, cloudevents_implementation):
+def test_valid_structured_events(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test creating multiple cloud events
-    events_queue = []
+    events_queue: list[AnyPydanticCloudEvent] = []
     num_cloudevents = 30
     for i in range(num_cloudevents):
-        event = {
+        raw_event = {
             "id": f"id{i}",
             "source": f"source{i}.com.test",
             "type": "cloudevent.test.type",
@@ -387,11 +444,12 @@ def test_valid_structured_events(specversion, cloudevents_implementation):
         events_queue.append(
             cloudevents_implementation["from_http"](
                 {"content-type": "application/cloudevents+json"},
-                json.dumps(event),
+                json.dumps(raw_event),
             )
         )

     for i, event in enumerate(events_queue):
+        assert isinstance(event.data, dict)
         assert event["id"] == f"id{i}"
         assert event["source"] == f"source{i}.com.test"
         assert event["specversion"] == specversion
@@ -399,7 +457,9 @@ def test_valid_structured_events(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_structured_no_content_type(specversion, cloudevents_implementation):
+def test_structured_no_content_type(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test creating multiple cloud events
     data = {
         "id": "id",
@@ -410,6 +470,7 @@ def test_structured_no_content_type(specversion, cloudevents_implementation):
     }
     event = cloudevents_implementation["from_http"]({}, json.dumps(data))

+    assert isinstance(event.data, dict)
     assert event["id"] == "id"
     assert event["source"] == "source.com.test"
     assert event["specversion"] == specversion
@@ -437,7 +498,9 @@ def test_is_binary():

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_cloudevent_repr(specversion, cloudevents_implementation):
+def test_cloudevent_repr(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     headers = {
         "Content-Type": "application/octet-stream",
         "ce-specversion": specversion,
@@ -454,7 +517,9 @@ def test_cloudevent_repr(specversion, cloudevents_implementation):

 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_none_data_cloudevent(specversion, cloudevents_implementation):
+def test_none_data_cloudevent(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     event = cloudevents_implementation["event"](
         {
             "source": "<my-url>",
@@ -466,7 +531,7 @@ def test_none_data_cloudevent(specversion, cloudevents_implementation):
     to_structured(event)


-def test_wrong_specversion(cloudevents_implementation):
+def test_wrong_specversion(cloudevents_implementation: PydanticImplementation) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     data = json.dumps(
         {
@@ -481,15 +546,19 @@ def test_wrong_specversion(cloudevents_implementation):
     assert "Found invalid specversion 0.2" in str(e.value)


-def test_invalid_data_format_structured_from_http(cloudevents_implementation):
+def test_invalid_data_format_structured_from_http(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     data = 20
     with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e:
-        cloudevents_implementation["from_http"](headers, data)
+        cloudevents_implementation["from_http"](headers, data)  # type: ignore[type-var] # intentionally wrong type # noqa: E501
     assert "Expected json of type (str, bytes, bytearray)" in str(e.value)


-def test_wrong_specversion_to_request(cloudevents_implementation):
+def test_wrong_specversion_to_request(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     event = cloudevents_implementation["event"]({"source": "s", "type": "t"}, None)
     with pytest.raises(cloud_exceptions.InvalidRequiredFields) as e:
         event["specversion"] = "0.2"
@@ -513,7 +582,9 @@ def test_is_structured():
     assert not is_structured(headers)


-def test_empty_json_structured(cloudevents_implementation):
+def test_empty_json_structured(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     data = ""
     with pytest.raises(cloud_exceptions.MissingRequiredFields) as e:
@@ -521,7 +592,9 @@ def test_empty_json_structured(cloudevents_implementation):
     assert "Failed to read specversion from both headers and data" in str(e.value)


-def test_uppercase_headers_with_none_data_binary(cloudevents_implementation):
+def test_uppercase_headers_with_none_data_binary(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     headers = {
         "Ce-Id": "my-id",
         "Ce-Source": "<event-source>",
@@ -538,7 +611,7 @@ def test_uppercase_headers_with_none_data_binary(cloudevents_implementation):
     assert new_data is None


-def test_generic_exception(cloudevents_implementation):
+def test_generic_exception(cloudevents_implementation: PydanticImplementation) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     data = json.dumps(
         {
@@ -554,7 +627,7 @@ def test_generic_exception(cloudevents_implementation):
     e.errisinstance(cloud_exceptions.MissingRequiredFields)

     with pytest.raises(cloud_exceptions.GenericException) as e:
-        cloudevents_implementation["from_http"]({}, 123)
+        cloudevents_implementation["from_http"]({}, 123)  # type: ignore[type-var] # intentionally wrong type # noqa: E501
     e.errisinstance(cloud_exceptions.InvalidStructuredJSON)

     with pytest.raises(cloud_exceptions.GenericException) as e:
@@ -569,7 +642,9 @@ def test_generic_exception(cloudevents_implementation):
     e.errisinstance(cloud_exceptions.DataMarshallerError)


-def test_non_dict_data_no_headers_bug(cloudevents_implementation):
+def test_non_dict_data_no_headers_bug(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     # Test for issue #116
     headers = {"Content-Type": "application/cloudevents+json"}
     data = "123"


@@ -1,6 +1,6 @@
 [mypy]
 plugins = pydantic.mypy
-python_version = 3.8
+python_version = 3.9
 pretty = True
 show_error_context = True


@@ -5,3 +5,4 @@ pep8-naming
 flake8-print
 tox
 pre-commit
+mypy

requirements/mypy.txt (new file)

@@ -0,0 +1,5 @@
+mypy
+# mypy has the pydantic plugin enabled
+pydantic>=2.0.0,<3.0
+types-requests
+deprecation>=2.0,<3.0


@@ -25,7 +25,7 @@ resp = requests.get(
 image_bytes = resp.content


-def send_binary_cloud_event(url: str):
+def send_binary_cloud_event(url: str) -> None:
     # Create cloudevent
     attributes = {
         "type": "com.example.string",
@@ -42,7 +42,7 @@ def send_binary_cloud_event(url: str):
     print(f"Sent {event['id']} of type {event['type']}")


-def send_structured_cloud_event(url: str):
+def send_structured_cloud_event(url: str) -> None:
     # Create cloudevent
     attributes = {
         "type": "com.example.base64",


@@ -65,7 +65,6 @@ if __name__ == "__main__":
             "Programming Language :: Python",
             "Programming Language :: Python :: 3",
             "Programming Language :: Python :: 3 :: Only",
-            "Programming Language :: Python :: 3.8",
             "Programming Language :: Python :: 3.9",
             "Programming Language :: Python :: 3.10",
             "Programming Language :: Python :: 3.11",

tox.ini

@@ -1,5 +1,5 @@
 [tox]
-envlist = py{38,39,310,311,312},lint
+envlist = py{38,39,310,311,312},lint,mypy,mypy-samples-{image,json}
 skipsdist = True

 [testenv]
@@ -30,3 +30,21 @@ commands =
     black --check .
     isort -c cloudevents samples
     flake8 cloudevents samples --ignore W503,E731 --extend-ignore E203 --max-line-length 88
+
+[testenv:mypy]
+basepython = python3.11
+deps =
+    -r{toxinidir}/requirements/mypy.txt
+    # mypy needs test dependencies to check test modules
+    -r{toxinidir}/requirements/test.txt
+commands = mypy cloudevents
+
+[testenv:mypy-samples-{image,json}]
+basepython = python3.11
+setenv =
+    mypy-samples-image: SAMPLE_DIR={toxinidir}/samples/http-image-cloudevents
+    mypy-samples-json: SAMPLE_DIR={toxinidir}/samples/http-json-cloudevents
+deps =
+    -r{toxinidir}/requirements/mypy.txt
+    -r{env:SAMPLE_DIR}/requirements.txt
+commands = mypy {env:SAMPLE_DIR}