Improve public API type annotations & fix unit test type errors (#248)
* chore: improve typing of functions returning AnyCloudEvent
kafka.conversion.from_binary() and from_structured() return the
AnyCloudEvent type var bound by their event_type argument, but when
event_type is None, type checkers cannot infer the return type. We now
use an overload to declare that the return type is http.CloudEvent when
event_type is None.
Previously, users had to annotate the result type explicitly when
calling without event_type. That pattern occurs frequently in this
repo's test_kafka_conversions.py, so this change fixes quite a few type
errors like:
> error: Need type annotation for "result" [var-annotated]
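A sketch of how the overloads change inference at a call site (the
KafkaMessage contents below are illustrative, not taken from the test
suite):

    from cloudevents.kafka.conversion import KafkaMessage, from_binary

    message = KafkaMessage(
        headers={
            "ce_specversion": b"1.0",
            "ce_id": b"1",
            "ce_source": b"example",
            "ce_type": b"com.example.type",
        },
        key=None,
        value=b"{}",
    )

    # No event_type argument: type checkers now infer http.CloudEvent,
    # so no explicit annotation on `event` is needed.
    event = from_binary(message)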
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: type v1.Event chainable Set*() methods
The v1.Event chainable Set*() methods like SetData() were annotated as
returning BaseEvent, which doesn't declare the same Set*() methods. As
a result, chaining more than one Set*() call made the return type
unknown. This was causing type errors in test_event_pipeline.py.
The Set*() methods now return the Self type.
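For example, a chain like this now keeps the v1.Event type all the way
through (a minimal sketch):

    from cloudevents.sdk.event import v1

    event = (
        v1.Event()
        .SetEventType("com.example.type")  # now returns Self, not BaseEvent
        .SetSource("https://example.com")
        .SetData({"key": "value"})
    )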
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: fix type errors in tests
mypy was failing with many type errors in the test modules. I've not
annotated all fixtures; this mostly fixes existing type errors.
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: allow non-dict headers types in from_http()
The from_http() conversion function required its headers argument to be
a typing.Dict, which made it incompatible with the header types of HTTP
libraries, which support features like multiple values per key.
typing.Mapping and even _typeshed.SupportsItems do not cover these
types. For example,
samples/http-image-cloudevents/image_sample_server.py was failing to
type check where it calls `from_http(request.headers, ...)`.
To support these kinds of header types in from_http(), we now define
our own SupportsDuplicateItems protocol, which is broader than
_typeshed.SupportsItems.
I've only applied this to from_http(), as typing.Mapping is fine for
most other methods that accept dict-like objects, and using this more
lenient interface everywhere would impose restrictions on our
implementation, even though it might be more flexible for users.
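As an illustration, any object with a compatible items() method
satisfies the protocol, even when items() can yield the same key twice.
The class below is hypothetical; werkzeug's Headers is the kind of
real-world type this targets:

    import typing

    class MultiValueHeaders:
        # Hypothetical multi-value headers: items() may repeat keys, so
        # this is not a Mapping, but it satisfies SupportsDuplicateItems.
        def __init__(self, pairs: typing.List[typing.Tuple[str, str]]) -> None:
            self._pairs = pairs

        def items(self) -> typing.Iterable[typing.Tuple[str, str]]:
            return iter(self._pairs)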
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* build: run mypy via tox
Tox now runs mypy on cloudevents itself and on the samples.
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* build(ci): run mypy in CI alongside linting
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: fix minor mypy type complaint in samples
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* feat: use Mapping, not Dict for input arguments
Mapping imposes fewer restrictions on callers: it's read-only, and it
lets non-dict types be passed directly, so callers no longer have to
copy values with dict() or pass dict-like objects and ignore the
resulting type error.
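For instance, a read-only mapping can now be passed straight through (a
small sketch using the http CloudEvent):

    from types import MappingProxyType

    from cloudevents.http import CloudEvent

    attrs = MappingProxyType(
        {"type": "com.example.type", "source": "https://example.com"}
    )
    event = CloudEvent(attrs)  # accepted as a Mapping; no dict() copy needed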
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: fix tests on py3.8
Tests were failing because the sanic dependency dropped support for
py3.8 in its current release. sanic is now pinned, for py3.8 only, to
the last version that still supports it.
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* feat: support new model_validate_json() kwargs
Pydantic added by_alias and by_name keyword arguments to
BaseModel.model_validate_json in 2.11.1:
acb0f10fda
This caused mypy to report that the Pydantic v2 CloudEvent did not
override model_validate_json() correctly. Our override now accepts these
newly-added arguments. They have no effect, as the implementation does
not use Pydantic to validate the JSON; and since we don't use field
aliases, the only effect they could have in the superclass would be to
raise an error if both are False.
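A sketch of a call that exercises the widened override (the JSON
payload is illustrative):

    from cloudevents.pydantic.v2 import CloudEvent

    event = CloudEvent.model_validate_json(
        '{"specversion": "1.0", "id": "1", '
        '"source": "https://example.com", "type": "com.example.type"}',
        by_alias=True,  # accepted for signature compatibility; no effect here
    )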
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: accept Mapping as well as SupportsDuplicateItems
Although our types.SupportsDuplicateItems type is wider than Dict and
Mapping, it's not a familiar type to users, so explicitly accepting
Mapping in the from_http() functions should make it clearer that a
dict-like object is expected for the headers argument.
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
* chore: constrain deps to maintain py 3.8 support
Python 3.8 is unsupported upstream, and dependencies (such as pydantic)
are now shipping releases that fail to type check with mypy running in
3.8 compatibility mode. Because we run mypy in py3.8 compatibility mode,
the mypy tox environments must only use deps that support 3.8, and unit
tests run on py3.8 must likewise only use deps that support 3.8.
To constrain the deps for 3.8 support, we use two constraint files: one
for general environments, which only constrains the dependencies that
python 3.8 interpreters use, and another for mypy, which constrains the
dependencies that all interpreters use.
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
---------
Signed-off-by: Hal Blackburn <hwtb2@cam.ac.uk>
parent c5645d8fcf
commit 37ae369ced
@@ -17,7 +17,7 @@ jobs:
       - name: Install dev dependencies
         run: python -m pip install -r requirements/dev.txt
       - name: Run linting
-        run: python -m tox -e lint
+        run: python -m tox -e lint,mypy,mypy-samples-image,mypy-samples-json

   test:
     strategy:
@@ -32,7 +32,7 @@ class CloudEvent:
     @classmethod
     def create(
         cls: typing.Type[AnyCloudEvent],
-        attributes: typing.Dict[str, typing.Any],
+        attributes: typing.Mapping[str, typing.Any],
         data: typing.Optional[typing.Any],
     ) -> AnyCloudEvent:
         """
@@ -91,7 +91,9 @@ def from_json(

 def from_http(
     event_type: typing.Type[AnyCloudEvent],
-    headers: typing.Mapping[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.Union[str, bytes]],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> AnyCloudEvent:
@@ -260,7 +262,7 @@ def best_effort_encode_attribute_value(value: typing.Any) -> typing.Any:

 def from_dict(
     event_type: typing.Type[AnyCloudEvent],
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> AnyCloudEvent:
     """
     Constructs an Event object of a given `event_type` from
@@ -37,7 +37,9 @@ def from_json(


 def from_http(
-    headers: typing.Dict[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.Union[str, bytes]],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> CloudEvent:
@@ -58,7 +60,7 @@ def from_http(


 def from_dict(
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> CloudEvent:
     """
     Constructs a CloudEvent from a dict `event` representation.
@@ -34,11 +34,13 @@ class CloudEvent(abstract.CloudEvent):

     @classmethod
     def create(
-        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
+        cls,
+        attributes: typing.Mapping[str, typing.Any],
+        data: typing.Optional[typing.Any],
     ) -> "CloudEvent":
         return cls(attributes, data)

-    def __init__(self, attributes: typing.Dict[str, str], data: typing.Any = None):
+    def __init__(self, attributes: typing.Mapping[str, str], data: typing.Any = None):
         """
         Event Constructor
         :param attributes: a dict with cloudevent attributes. Minimally
@@ -111,11 +111,29 @@ def to_binary(
     return KafkaMessage(headers, message_key, data)


+@typing.overload
+def from_binary(
+    message: KafkaMessage,
+    event_type: None = None,
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> http.CloudEvent:
+    pass
+
+
+@typing.overload
+def from_binary(
+    message: KafkaMessage,
+    event_type: typing.Type[AnyCloudEvent],
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> AnyCloudEvent:
+    pass
+
+
 def from_binary(
     message: KafkaMessage,
     event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
-) -> AnyCloudEvent:
+) -> typing.Union[http.CloudEvent, AnyCloudEvent]:
     """
     Returns a CloudEvent from a KafkaMessage in binary format.

@@ -144,10 +162,11 @@ def from_binary(
         raise cloud_exceptions.DataUnmarshallerError(
             f"Failed to unmarshall data with error: {type(e).__name__}('{e}')"
         )
+    result: typing.Union[http.CloudEvent, AnyCloudEvent]
     if event_type:
         result = event_type.create(attributes, data)
     else:
-        result = http.CloudEvent.create(attributes, data)  # type: ignore
+        result = http.CloudEvent.create(attributes, data)
     return result

@@ -210,12 +229,32 @@ def to_structured(
     return KafkaMessage(headers, message_key, value)


+@typing.overload
+def from_structured(
+    message: KafkaMessage,
+    event_type: None = None,
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+    envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> http.CloudEvent:
+    pass
+
+
+@typing.overload
+def from_structured(
+    message: KafkaMessage,
+    event_type: typing.Type[AnyCloudEvent],
+    data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+    envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+) -> AnyCloudEvent:
+    pass
+
+
 def from_structured(
     message: KafkaMessage,
     event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None,
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
     envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
-) -> AnyCloudEvent:
+) -> typing.Union[http.CloudEvent, AnyCloudEvent]:
     """
     Returns a CloudEvent from a KafkaMessage in structured format.

@@ -264,8 +303,9 @@ def from_structured(
             attributes["datacontenttype"] = val.decode()
         else:
             attributes[header.lower()] = val.decode()
+    result: typing.Union[AnyCloudEvent, http.CloudEvent]
     if event_type:
         result = event_type.create(attributes, data)
     else:
-        result = http.CloudEvent.create(attributes, data)  # type: ignore
+        result = http.CloudEvent.create(attributes, data)
     return result

@@ -21,7 +21,9 @@ from cloudevents.sdk import types


 def from_http(
-    headers: typing.Dict[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.AnyStr],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> CloudEvent:
@@ -63,7 +65,7 @@ def from_json(


 def from_dict(
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> CloudEvent:
     """
     Construct an CloudEvent from a dict `event` representation.
@@ -100,7 +100,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore

     @classmethod
     def create(
-        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
+        cls,
+        attributes: typing.Mapping[str, typing.Any],
+        data: typing.Optional[typing.Any],
     ) -> "CloudEvent":
         return cls(attributes, data)

@@ -155,7 +157,7 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore

     def __init__(  # type: ignore[no-untyped-def]
         self,
-        attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
+        attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None,
         data: typing.Optional[typing.Any] = None,
         **kwargs,
     ):
@@ -22,7 +22,9 @@ from cloudevents.sdk import types


 def from_http(
-    headers: typing.Dict[str, str],
+    headers: typing.Union[
+        typing.Mapping[str, str], types.SupportsDuplicateItems[str, str]
+    ],
     data: typing.Optional[typing.AnyStr],
     data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
 ) -> CloudEvent:
@@ -64,7 +66,7 @@ def from_json(


 def from_dict(
-    event: typing.Dict[str, typing.Any],
+    event: typing.Mapping[str, typing.Any],
 ) -> CloudEvent:
     """
     Construct an CloudEvent from a dict `event` representation.
@@ -44,7 +44,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore

     @classmethod
     def create(
-        cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any]
+        cls,
+        attributes: typing.Mapping[str, typing.Any],
+        data: typing.Optional[typing.Any],
     ) -> "CloudEvent":
         return cls(attributes, data)

@@ -103,7 +105,7 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore

     def __init__(  # type: ignore[no-untyped-def]
         self,
-        attributes: typing.Optional[typing.Dict[str, typing.Any]] = None,
+        attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None,
         data: typing.Optional[typing.Any] = None,
         **kwargs,
     ):
@@ -173,6 +175,8 @@ class CloudEvent(abstract.CloudEvent, BaseModel):  # type: ignore
         *,
         strict: typing.Optional[bool] = None,
         context: typing.Optional[typing.Dict[str, Any]] = None,
+        by_alias: typing.Optional[bool] = None,
+        by_name: typing.Optional[bool] = None,
     ) -> "CloudEvent":
         return conversion.from_json(cls, json_data)

@@ -11,10 +11,15 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations
+
 import typing

 from cloudevents.sdk.event import base, opt

+if typing.TYPE_CHECKING:
+    from typing_extensions import Self
+

 class Event(base.BaseEvent):
     _ce_required_fields = {"id", "source", "type", "specversion"}
@@ -79,39 +84,39 @@ class Event(base.BaseEvent):
             return {}
         return dict(result)

-    def SetEventType(self, eventType: str) -> base.BaseEvent:
+    def SetEventType(self, eventType: str) -> Self:
         self.Set("type", eventType)
         return self

-    def SetSource(self, source: str) -> base.BaseEvent:
+    def SetSource(self, source: str) -> Self:
         self.Set("source", source)
         return self

-    def SetEventID(self, eventID: str) -> base.BaseEvent:
+    def SetEventID(self, eventID: str) -> Self:
         self.Set("id", eventID)
         return self

-    def SetEventTime(self, eventTime: typing.Optional[str]) -> base.BaseEvent:
+    def SetEventTime(self, eventTime: typing.Optional[str]) -> Self:
         self.Set("time", eventTime)
         return self

-    def SetSubject(self, subject: typing.Optional[str]) -> base.BaseEvent:
+    def SetSubject(self, subject: typing.Optional[str]) -> Self:
         self.Set("subject", subject)
         return self

-    def SetSchema(self, schema: typing.Optional[str]) -> base.BaseEvent:
+    def SetSchema(self, schema: typing.Optional[str]) -> Self:
         self.Set("dataschema", schema)
         return self

-    def SetContentType(self, contentType: typing.Optional[str]) -> base.BaseEvent:
+    def SetContentType(self, contentType: typing.Optional[str]) -> Self:
         self.Set("datacontenttype", contentType)
         return self

-    def SetData(self, data: typing.Optional[object]) -> base.BaseEvent:
+    def SetData(self, data: typing.Optional[object]) -> Self:
         self.Set("data", data)
         return self

-    def SetExtensions(self, extensions: typing.Optional[dict]) -> base.BaseEvent:
+    def SetExtensions(self, extensions: typing.Optional[dict]) -> Self:
         self.Set("extensions", extensions)
         return self

@@ -14,9 +14,25 @@

 import typing

+_K_co = typing.TypeVar("_K_co", covariant=True)
+_V_co = typing.TypeVar("_V_co", covariant=True)
+
 # Use consistent types for marshal and unmarshal functions across
 # both JSON and Binary format.

 MarshallerType = typing.Callable[[typing.Any], typing.AnyStr]

 UnmarshallerType = typing.Callable[[typing.AnyStr], typing.Any]
+
+
+class SupportsDuplicateItems(typing.Protocol[_K_co, _V_co]):
+    """
+    Dict-like objects with an items() method that may produce duplicate keys.
+    """
+
+    # This is wider than _typeshed.SupportsItems, which expects items() to
+    # return type an AbstractSet. werkzeug's Headers class satisfies this type,
+    # but not _typeshed.SupportsItems.
+    def items(self) -> typing.Iterable[typing.Tuple[_K_co, _V_co]]:
+        pass
@@ -21,7 +21,7 @@ from cloudevents.sdk.converters import base, binary
 def test_binary_converter_raise_unsupported():
     with pytest.raises(exceptions.UnsupportedEvent):
         cnvtr = binary.BinaryHTTPCloudEventConverter()
-        cnvtr.read(None, {}, None, None)
+        cnvtr.read(None, {}, None, None)  # type: ignore[arg-type] # intentionally wrong type # noqa: E501


 def test_base_converters_raise_exceptions():
@@ -35,8 +35,8 @@ def test_base_converters_raise_exceptions():

     with pytest.raises(Exception):
         cnvtr = base.Converter()
-        cnvtr.write(None, None)
+        cnvtr.write(None, None)  # type: ignore[arg-type] # intentionally wrong type

     with pytest.raises(Exception):
         cnvtr = base.Converter()
-        cnvtr.read(None, None, None, None)
+        cnvtr.read(None, None, None, None)  # type: ignore[arg-type] # intentionally wrong type # noqa: E501
@@ -25,7 +25,7 @@ from cloudevents.tests import data
 @pytest.mark.parametrize("event_class", [v03.Event, v1.Event])
 def test_binary_converter_upstream(event_class):
     m = marshaller.NewHTTPMarshaller([binary.NewBinaryHTTPCloudEventConverter()])
-    event = m.FromRequest(event_class(), data.headers[event_class], None, lambda x: x)
+    event = m.FromRequest(event_class(), data.headers[event_class], b"", lambda x: x)
     assert event is not None
     assert event.EventType() == data.ce_type
     assert event.EventID() == data.ce_id
@@ -77,7 +77,7 @@ def test_object_event_v1():
     _, structured_body = m.ToRequest(event)
     assert isinstance(structured_body, bytes)
     structured_obj = json.loads(structured_body)
-    error_msg = f"Body was {structured_body}, obj is {structured_obj}"
+    error_msg = f"Body was {structured_body!r}, obj is {structured_obj}"
     assert isinstance(structured_obj, dict), error_msg
     assert isinstance(structured_obj["data"], dict), error_msg
     assert len(structured_obj["data"]) == 1, error_msg
@@ -11,6 +11,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations

 import bz2
 import io
@@ -241,11 +242,11 @@ def test_structured_to_request(specversion):
     assert headers["content-type"] == "application/cloudevents+json"
     for key in attributes:
         assert body[key] == attributes[key]
-    assert body["data"] == data, f"|{body_bytes}|| {body}"
+    assert body["data"] == data, f"|{body_bytes!r}|| {body}"


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_attributes_view_accessor(specversion: str):
+def test_attributes_view_accessor(specversion: str) -> None:
     attributes: dict[str, typing.Any] = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -333,7 +334,7 @@ def test_valid_structured_events(specversion):
     events_queue = []
     num_cloudevents = 30
     for i in range(num_cloudevents):
-        event = {
+        raw_event = {
             "id": f"id{i}",
             "source": f"source{i}.com.test",
             "type": "cloudevent.test.type",
@@ -343,7 +344,7 @@ def test_valid_structured_events(specversion):
         events_queue.append(
             from_http(
                 {"content-type": "application/cloudevents+json"},
-                json.dumps(event),
+                json.dumps(raw_event),
             )
         )

@@ -454,7 +455,7 @@ def test_invalid_data_format_structured_from_http():
     headers = {"Content-Type": "application/cloudevents+json"}
     data = 20
     with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e:
-        from_http(headers, data)
+        from_http(headers, data)  # type: ignore[arg-type] # intentionally wrong type
     assert "Expected json of type (str, bytes, bytearray)" in str(e.value)

@@ -526,7 +527,7 @@ def test_generic_exception():
     e.errisinstance(cloud_exceptions.MissingRequiredFields)

     with pytest.raises(cloud_exceptions.GenericException) as e:
-        from_http({}, 123)
+        from_http({}, 123)  # type: ignore[arg-type] # intentionally wrong type
     e.errisinstance(cloud_exceptions.InvalidStructuredJSON)

     with pytest.raises(cloud_exceptions.GenericException) as e:
@@ -19,6 +19,7 @@ import json
 import pytest

 from cloudevents import exceptions as cloud_exceptions
+from cloudevents.abstract.event import AnyCloudEvent
 from cloudevents.http import CloudEvent
 from cloudevents.kafka.conversion import (
     KafkaMessage,
@@ -36,7 +37,9 @@ def simple_serialize(data: dict) -> bytes:


 def simple_deserialize(data: bytes) -> dict:
-    return json.loads(data.decode())
+    value = json.loads(data.decode())
+    assert isinstance(value, dict)
+    return value


 def failing_func(*args):
@@ -47,7 +50,7 @@ class KafkaConversionTestBase:
     expected_data = {"name": "test", "amount": 1}
     expected_custom_mapped_key = "custom-key"

-    def custom_key_mapper(self, _) -> str:
+    def custom_key_mapper(self, _: AnyCloudEvent) -> str:
         return self.expected_custom_mapped_key

     @pytest.fixture
@@ -50,14 +50,14 @@ def test_from_request_wrong_unmarshaller():
     with pytest.raises(exceptions.InvalidDataUnmarshaller):
         m = marshaller.NewDefaultHTTPMarshaller()
         _ = m.FromRequest(
-            event=v1.Event(), headers={}, body="", data_unmarshaller=object()
+            event=v1.Event(), headers={}, body="", data_unmarshaller=object()  # type: ignore[arg-type] # intentionally wrong type # noqa: E501
         )


 def test_to_request_wrong_marshaller():
     with pytest.raises(exceptions.InvalidDataMarshaller):
         m = marshaller.NewDefaultHTTPMarshaller()
-        _ = m.ToRequest(v1.Event(), data_marshaller="")
+        _ = m.ToRequest(v1.Event(), data_marshaller="")  # type: ignore[arg-type] # intentionally wrong type # noqa: E501


 def test_from_request_cannot_read(binary_headers):
@@ -11,6 +11,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+from __future__ import annotations

 import bz2
 import io
@@ -28,10 +29,13 @@ from cloudevents.pydantic.v1.conversion import from_http as pydantic_v1_from_htt
 from cloudevents.pydantic.v1.event import CloudEvent as PydanticV1CloudEvent
 from cloudevents.pydantic.v2.conversion import from_http as pydantic_v2_from_http
 from cloudevents.pydantic.v2.event import CloudEvent as PydanticV2CloudEvent
-from cloudevents.sdk import converters
+from cloudevents.sdk import converters, types
 from cloudevents.sdk.converters.binary import is_binary
 from cloudevents.sdk.converters.structured import is_structured

+if typing.TYPE_CHECKING:
+    from typing_extensions import TypeAlias
+
 invalid_test_headers = [
     {
         "ce-source": "<event-source>",
@@ -70,7 +74,30 @@ test_data = {"payload-content": "Hello World!"}

 app = Sanic("test_pydantic_http_events")

-_pydantic_implementation = {
+
+AnyPydanticCloudEvent: TypeAlias = typing.Union[
+    PydanticV1CloudEvent, PydanticV2CloudEvent
+]
+
+
+class FromHttpFn(typing.Protocol):
+    def __call__(
+        self,
+        headers: typing.Dict[str, str],
+        data: typing.Optional[typing.AnyStr],
+        data_unmarshaller: typing.Optional[types.UnmarshallerType] = None,
+    ) -> AnyPydanticCloudEvent:
+        pass
+
+
+class PydanticImplementation(typing.TypedDict):
+    event: typing.Type[AnyPydanticCloudEvent]
+    validation_error: typing.Type[Exception]
+    from_http: FromHttpFn
+    pydantic_version: typing.Literal["v1", "v2"]
+
+
+_pydantic_implementation: typing.Mapping[str, PydanticImplementation] = {
     "v1": {
         "event": PydanticV1CloudEvent,
         "validation_error": PydanticV1ValidationError,
@@ -87,7 +114,9 @@ _pydantic_implementation = {


 @pytest.fixture(params=["v1", "v2"])
-def cloudevents_implementation(request):
+def cloudevents_implementation(
+    request: pytest.FixtureRequest,
+) -> PydanticImplementation:
     return _pydantic_implementation[request.param]

@@ -108,7 +137,9 @@ async def echo(request, pydantic_version):


 @pytest.mark.parametrize("body", invalid_cloudevent_request_body)
-def test_missing_required_fields_structured(body, cloudevents_implementation):
+def test_missing_required_fields_structured(
+    body: dict, cloudevents_implementation: PydanticImplementation
+) -> None:
     with pytest.raises(cloud_exceptions.MissingRequiredFields):
         _ = cloudevents_implementation["from_http"](
             {"Content-Type": "application/cloudevents+json"}, json.dumps(body)
@@ -116,20 +147,26 @@ def test_missing_required_fields_structured(body, cloudevents_implementation):


 @pytest.mark.parametrize("headers", invalid_test_headers)
-def test_missing_required_fields_binary(headers, cloudevents_implementation):
+def test_missing_required_fields_binary(
+    headers: dict, cloudevents_implementation: PydanticImplementation
+) -> None:
     with pytest.raises(cloud_exceptions.MissingRequiredFields):
         _ = cloudevents_implementation["from_http"](headers, json.dumps(test_data))


 @pytest.mark.parametrize("headers", invalid_test_headers)
-def test_missing_required_fields_empty_data_binary(headers, cloudevents_implementation):
+def test_missing_required_fields_empty_data_binary(
+    headers: dict, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test for issue #115
     with pytest.raises(cloud_exceptions.MissingRequiredFields):
         _ = cloudevents_implementation["from_http"](headers, None)


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_emit_binary_event(specversion, cloudevents_implementation):
+def test_emit_binary_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     headers = {
         "ce-id": "my-id",
         "ce-source": "<event-source>",
@@ -159,7 +196,9 @@ def test_emit_binary_event(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_emit_structured_event(specversion, cloudevents_implementation):
+def test_emit_structured_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     body = {
         "id": "my-id",
@@ -188,7 +227,11 @@ def test_emit_structured_event(specversion, cloudevents_implementation):
     "converter", [converters.TypeBinary, converters.TypeStructured]
 )
 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementation):
+def test_roundtrip_non_json_event(
+    converter: str,
+    specversion: str,
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     input_data = io.BytesIO()
     for _ in range(100):
         for j in range(20):
@@ -217,7 +260,9 @@ def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementa


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation):
+def test_missing_ce_prefix_binary_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     prefixed_headers = {}
     headers = {
         "ce-id": "my-id",
@@ -240,9 +285,11 @@ def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation)


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_valid_binary_events(specversion, cloudevents_implementation):
+def test_valid_binary_events(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test creating multiple cloud events
-    events_queue = []
+    events_queue: list[AnyPydanticCloudEvent] = []
     headers = {}
     num_cloudevents = 30
     for i in range(num_cloudevents):
@@ -258,7 +305,7 @@ def test_valid_binary_events(specversion, cloudevents_implementation):
     )

     for i, event in enumerate(events_queue):
-        data = event.data
+        assert isinstance(event.data, dict)
         assert event["id"] == f"id{i}"
         assert event["source"] == f"source{i}.com.test"
         assert event["specversion"] == specversion
@@ -266,7 +313,9 @@ def test_valid_binary_events(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_structured_to_request(specversion, cloudevents_implementation):
+def test_structured_to_request(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     attributes = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -283,11 +332,13 @@ def test_structured_to_request(specversion, cloudevents_implementation):
     assert headers["content-type"] == "application/cloudevents+json"
     for key in attributes:
         assert body[key] == attributes[key]
-    assert body["data"] == data, f"|{body_bytes}|| {body}"
+    assert body["data"] == data, f"|{body_bytes!r}|| {body}"


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_attributes_view_accessor(specversion: str, cloudevents_implementation):
+def test_attributes_view_accessor(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     attributes: dict[str, typing.Any] = {
         "specversion": specversion,
         "type": "word.found.name",
@@ -296,9 +347,7 @@ def test_attributes_view_accessor(specversion: str, cloudevents_implementation):
     }
     data = {"message": "Hello World!"}

-    event: cloudevents_implementation["event"] = cloudevents_implementation["event"](
-        attributes, data
-    )
+    event = cloudevents_implementation["event"](attributes, data)
     event_attributes: typing.Mapping[str, typing.Any] = event.get_attributes()
     assert event_attributes["specversion"] == attributes["specversion"]
     assert event_attributes["type"] == attributes["type"]
|
|||
|
||||
|
||||
@pytest.mark.parametrize("specversion", ["1.0", "0.3"])
|
||||
def test_binary_to_request(specversion, cloudevents_implementation):
|
||||
def test_binary_to_request(
|
||||
specversion: str, cloudevents_implementation: PydanticImplementation
|
||||
) -> None:
|
||||
attributes = {
|
||||
"specversion": specversion,
|
||||
"type": "word.found.name",
|
||||
|
@@ -327,7 +378,9 @@ def test_binary_to_request(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_empty_data_structured_event(specversion, cloudevents_implementation):
+def test_empty_data_structured_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Testing if cloudevent breaks when no structured data field present
     attributes = {
         "specversion": specversion,
@@ -352,7 +405,9 @@ def test_empty_data_structured_event(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_empty_data_binary_event(specversion, cloudevents_implementation):
+def test_empty_data_binary_event(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Testing if cloudevent breaks when no structured data field present
     headers = {
         "Content-Type": "application/octet-stream",
@@ -372,12 +427,14 @@ def test_empty_data_binary_event(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_valid_structured_events(specversion, cloudevents_implementation):
+def test_valid_structured_events(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test creating multiple cloud events
-    events_queue = []
+    events_queue: list[AnyPydanticCloudEvent] = []
     num_cloudevents = 30
     for i in range(num_cloudevents):
-        event = {
+        raw_event = {
             "id": f"id{i}",
             "source": f"source{i}.com.test",
             "type": "cloudevent.test.type",
@@ -387,11 +444,12 @@ def test_valid_structured_events(specversion, cloudevents_implementation):
         events_queue.append(
             cloudevents_implementation["from_http"](
                 {"content-type": "application/cloudevents+json"},
-                json.dumps(event),
+                json.dumps(raw_event),
             )
         )

     for i, event in enumerate(events_queue):
+        assert isinstance(event.data, dict)
         assert event["id"] == f"id{i}"
         assert event["source"] == f"source{i}.com.test"
         assert event["specversion"] == specversion
@@ -399,7 +457,9 @@ def test_valid_structured_events(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_structured_no_content_type(specversion, cloudevents_implementation):
+def test_structured_no_content_type(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     # Test creating multiple cloud events
     data = {
         "id": "id",
@@ -410,6 +470,7 @@ def test_structured_no_content_type(specversion, cloudevents_implementation):
     }
     event = cloudevents_implementation["from_http"]({}, json.dumps(data))

+    assert isinstance(event.data, dict)
     assert event["id"] == "id"
     assert event["source"] == "source.com.test"
     assert event["specversion"] == specversion
@@ -437,7 +498,9 @@ def test_is_binary():


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_cloudevent_repr(specversion, cloudevents_implementation):
+def test_cloudevent_repr(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     headers = {
         "Content-Type": "application/octet-stream",
         "ce-specversion": specversion,
@@ -454,7 +517,9 @@ def test_cloudevent_repr(specversion, cloudevents_implementation):


 @pytest.mark.parametrize("specversion", ["1.0", "0.3"])
-def test_none_data_cloudevent(specversion, cloudevents_implementation):
+def test_none_data_cloudevent(
+    specversion: str, cloudevents_implementation: PydanticImplementation
+) -> None:
     event = cloudevents_implementation["event"](
         {
             "source": "<my-url>",
@@ -466,7 +531,7 @@ def test_none_data_cloudevent(specversion, cloudevents_implementation):
         to_structured(event)


-def test_wrong_specversion(cloudevents_implementation):
+def test_wrong_specversion(cloudevents_implementation: PydanticImplementation) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     data = json.dumps(
         {
@@ -481,15 +546,19 @@ def test_wrong_specversion(cloudevents_implementation):
     assert "Found invalid specversion 0.2" in str(e.value)


-def test_invalid_data_format_structured_from_http(cloudevents_implementation):
+def test_invalid_data_format_structured_from_http(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     headers = {"Content-Type": "application/cloudevents+json"}
     data = 20
     with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e:
-        cloudevents_implementation["from_http"](headers, data)
+        cloudevents_implementation["from_http"](headers, data)  # type: ignore[type-var] # intentionally wrong type # noqa: E501
     assert "Expected json of type (str, bytes, bytearray)" in str(e.value)


-def test_wrong_specversion_to_request(cloudevents_implementation):
+def test_wrong_specversion_to_request(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     event = cloudevents_implementation["event"]({"source": "s", "type": "t"}, None)
     with pytest.raises(cloud_exceptions.InvalidRequiredFields) as e:
         event["specversion"] = "0.2"
|
|||
assert not is_structured(headers)
|
||||
|
||||
|
||||
def test_empty_json_structured(cloudevents_implementation):
|
||||
def test_empty_json_structured(
|
||||
cloudevents_implementation: PydanticImplementation,
|
||||
) -> None:
|
||||
headers = {"Content-Type": "application/cloudevents+json"}
|
||||
data = ""
|
||||
with pytest.raises(cloud_exceptions.MissingRequiredFields) as e:
|
||||
|
@ -521,7 +592,9 @@ def test_empty_json_structured(cloudevents_implementation):
|
|||
assert "Failed to read specversion from both headers and data" in str(e.value)
|
||||
|
||||
|
||||
def test_uppercase_headers_with_none_data_binary(cloudevents_implementation):
|
||||
def test_uppercase_headers_with_none_data_binary(
|
||||
cloudevents_implementation: PydanticImplementation,
|
||||
) -> None:
|
||||
headers = {
|
||||
"Ce-Id": "my-id",
|
||||
"Ce-Source": "<event-source>",
|
||||
|
@ -538,7 +611,7 @@ def test_uppercase_headers_with_none_data_binary(cloudevents_implementation):
|
|||
assert new_data is None
|
||||
|
||||
|
||||
def test_generic_exception(cloudevents_implementation):
|
||||
def test_generic_exception(cloudevents_implementation: PydanticImplementation) -> None:
|
||||
headers = {"Content-Type": "application/cloudevents+json"}
|
||||
data = json.dumps(
|
||||
{
|
||||
|
@@ -554,7 +627,7 @@ def test_generic_exception(cloudevents_implementation):
     e.errisinstance(cloud_exceptions.MissingRequiredFields)

     with pytest.raises(cloud_exceptions.GenericException) as e:
-        cloudevents_implementation["from_http"]({}, 123)
+        cloudevents_implementation["from_http"]({}, 123)  # type: ignore[type-var] # intentionally wrong type # noqa: E501
     e.errisinstance(cloud_exceptions.InvalidStructuredJSON)

     with pytest.raises(cloud_exceptions.GenericException) as e:
@@ -569,7 +642,9 @@ def test_generic_exception(cloudevents_implementation):
     e.errisinstance(cloud_exceptions.DataMarshallerError)


-def test_non_dict_data_no_headers_bug(cloudevents_implementation):
+def test_non_dict_data_no_headers_bug(
+    cloudevents_implementation: PydanticImplementation,
+) -> None:
     # Test for issue #116
     headers = {"Content-Type": "application/cloudevents+json"}
     data = "123"
@@ -0,0 +1,10 @@
+# This is a requirements constraint file, see:
+# https://pip.pypa.io/en/stable/user_guide/#constraints-files
+
+# sanic stopped supporting 3.8 in 24.12:
+# https://sanic.dev/en/release-notes/changelog.html#version-24120-
+sanic<24.12.0 ; python_version == '3.8'
+
+# pydantic stopped supporting 3.8 in 2.11.0:
+# https://github.com/pydantic/pydantic/releases/tag/v2.11.0
+pydantic<2.11.0 ; python_version == '3.8'
@@ -5,3 +5,4 @@ pep8-naming
 flake8-print
 tox
 pre-commit
+mypy
@@ -0,0 +1,9 @@
+# This is a requirements constraint file, see:
+# https://pip.pypa.io/en/stable/user_guide/#constraints-files
+
+# Because we run mypy in python 3.8 compatibility mode, dependencies must be
+# versions that support 3.8.
+
+# pydantic stopped supporting 3.8 in 2.11.0:
+# https://github.com/pydantic/pydantic/releases/tag/v2.11.0
+pydantic<2.11.0
@@ -0,0 +1,5 @@
+mypy
+# mypy has the pydantic plugin enabled
+pydantic>=2.0.0,<3.0
+types-requests
+deprecation>=2.0,<3.0
@@ -25,7 +25,7 @@ resp = requests.get(
 image_bytes = resp.content


-def send_binary_cloud_event(url: str):
+def send_binary_cloud_event(url: str) -> None:
     # Create cloudevent
     attributes = {
         "type": "com.example.string",
@@ -42,7 +42,7 @@ def send_binary_cloud_event(url: str):
     print(f"Sent {event['id']} of type {event['type']}")


-def send_structured_cloud_event(url: str):
+def send_structured_cloud_event(url: str) -> None:
     # Create cloudevent
     attributes = {
         "type": "com.example.base64",
tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = py{38,39,310,311,312},lint
+envlist = py{38,39,310,311,312},lint,mypy,mypy-samples-{image,json}
 skipsdist = True

 [testenv]
@@ -8,6 +8,7 @@ deps =
     -r{toxinidir}/requirements/test.txt
    -r{toxinidir}/requirements/publish.txt
 setenv =
+    PIP_CONSTRAINT={toxinidir}/requirements/constraints.txt
     PYTESTARGS = -v -s --tb=long --cov=cloudevents --cov-report term-missing --cov-fail-under=95
 commands = pytest {env:PYTESTARGS} {posargs}

@@ -30,3 +31,24 @@ commands =
     black --check .
     isort -c cloudevents samples
     flake8 cloudevents samples --ignore W503,E731 --extend-ignore E203 --max-line-length 88
+
+[testenv:mypy]
+basepython = python3.11
+setenv =
+    PIP_CONSTRAINT={toxinidir}/requirements/mypy-constraints.txt
+deps =
+    -r{toxinidir}/requirements/mypy.txt
+    # mypy needs test dependencies to check test modules
+    -r{toxinidir}/requirements/test.txt
+commands = mypy cloudevents
+
+[testenv:mypy-samples-{image,json}]
+basepython = python3.11
+setenv =
+    PIP_CONSTRAINT={toxinidir}/requirements/mypy-constraints.txt
+    mypy-samples-image: SAMPLE_DIR={toxinidir}/samples/http-image-cloudevents
+    mypy-samples-json: SAMPLE_DIR={toxinidir}/samples/http-json-cloudevents
+deps =
+    -r{toxinidir}/requirements/mypy.txt
+    -r{env:SAMPLE_DIR}/requirements.txt
+commands = mypy {env:SAMPLE_DIR}