chore: Improve exceptions handling. Have exceptions grouped by attribute name and typed exceptions
Signed-off-by: Tudor Plugaru <plugaru.tudor@protonmail.com>
This commit is contained in:
parent
6e13f723aa
commit
e78a70b69d
|
@ -13,10 +13,17 @@
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
from collections import defaultdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any, Final, Optional
|
from typing import Any, Final, Optional
|
||||||
|
|
||||||
from cloudevents.core.v1.exceptions import CloudEventValidationError
|
from cloudevents.core.v1.exceptions import (
|
||||||
|
BaseCloudEventException,
|
||||||
|
CloudEventValidationError,
|
||||||
|
CustomExtensionAttributeError,
|
||||||
|
InvalidAttributeTypeError,
|
||||||
|
MissingRequiredAttributeError,
|
||||||
|
)
|
||||||
|
|
||||||
REQUIRED_ATTRIBUTES: Final[list[str]] = ["id", "source", "type", "specversion"]
|
REQUIRED_ATTRIBUTES: Final[list[str]] = ["id", "source", "type", "specversion"]
|
||||||
OPTIONAL_ATTRIBUTES: Final[list[str]] = [
|
OPTIONAL_ATTRIBUTES: Final[list[str]] = [
|
||||||
|
@ -57,108 +64,133 @@ class CloudEvent:
|
||||||
|
|
||||||
See https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#required-attributes
|
See https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#required-attributes
|
||||||
"""
|
"""
|
||||||
errors = {}
|
errors: dict[str, list] = defaultdict(list)
|
||||||
errors.update(CloudEvent._validate_required_attributes(attributes))
|
errors.update(CloudEvent._validate_required_attributes(attributes))
|
||||||
errors.update(CloudEvent._validate_attribute_types(attributes))
|
|
||||||
errors.update(CloudEvent._validate_optional_attributes(attributes))
|
errors.update(CloudEvent._validate_optional_attributes(attributes))
|
||||||
errors.update(CloudEvent._validate_extension_attributes(attributes))
|
errors.update(CloudEvent._validate_extension_attributes(attributes))
|
||||||
if errors:
|
if errors:
|
||||||
raise CloudEventValidationError(errors)
|
raise CloudEventValidationError(dict(errors))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _validate_required_attributes(
|
def _validate_required_attributes(
|
||||||
attributes: dict[str, Any],
|
attributes: dict[str, Any],
|
||||||
) -> dict[str, list[str]]:
|
) -> dict[str, list[BaseCloudEventException]]:
|
||||||
"""
|
|
||||||
Validates that all required attributes are present.
|
|
||||||
|
|
||||||
:param attributes: The attributes of the CloudEvent instance.
|
|
||||||
:return: A dictionary of validation error messages.
|
|
||||||
"""
|
|
||||||
errors = {}
|
|
||||||
missing_attributes = [
|
|
||||||
attr for attr in REQUIRED_ATTRIBUTES if attr not in attributes
|
|
||||||
]
|
|
||||||
if missing_attributes:
|
|
||||||
errors["required"] = [
|
|
||||||
f"Missing required attribute(s): {', '.join(missing_attributes)}"
|
|
||||||
]
|
|
||||||
return errors
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _validate_attribute_types(attributes: dict[str, Any]) -> dict[str, list[str]]:
|
|
||||||
"""
|
"""
|
||||||
Validates the types of the required attributes.
|
Validates the types of the required attributes.
|
||||||
|
|
||||||
:param attributes: The attributes of the CloudEvent instance.
|
:param attributes: The attributes of the CloudEvent instance.
|
||||||
:return: A dictionary of validation error messages.
|
:return: A dictionary of validation error messages.
|
||||||
"""
|
"""
|
||||||
errors = {}
|
errors = defaultdict(list)
|
||||||
type_errors = []
|
|
||||||
|
if "id" not in attributes:
|
||||||
|
errors["id"].append(MissingRequiredAttributeError(missing="id"))
|
||||||
if attributes.get("id") is None:
|
if attributes.get("id") is None:
|
||||||
type_errors.append("Attribute 'id' must not be None")
|
errors["id"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'id' must not be None")
|
||||||
|
)
|
||||||
if not isinstance(attributes.get("id"), str):
|
if not isinstance(attributes.get("id"), str):
|
||||||
type_errors.append("Attribute 'id' must be a string")
|
errors["id"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'id' must be a string")
|
||||||
|
)
|
||||||
|
|
||||||
|
if "source" not in attributes:
|
||||||
|
errors["source"].append(MissingRequiredAttributeError(missing="source"))
|
||||||
if not isinstance(attributes.get("source"), str):
|
if not isinstance(attributes.get("source"), str):
|
||||||
type_errors.append("Attribute 'source' must be a string")
|
errors["source"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'source' must be a string")
|
||||||
|
)
|
||||||
|
|
||||||
|
if "type" not in attributes:
|
||||||
|
errors["type"].append(MissingRequiredAttributeError(missing="type"))
|
||||||
if not isinstance(attributes.get("type"), str):
|
if not isinstance(attributes.get("type"), str):
|
||||||
type_errors.append("Attribute 'type' must be a string")
|
errors["type"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'type' must be a string")
|
||||||
|
)
|
||||||
|
|
||||||
|
if "specversion" not in attributes:
|
||||||
|
errors["specversion"].append(
|
||||||
|
MissingRequiredAttributeError(missing="specversion")
|
||||||
|
)
|
||||||
if not isinstance(attributes.get("specversion"), str):
|
if not isinstance(attributes.get("specversion"), str):
|
||||||
type_errors.append("Attribute 'specversion' must be a string")
|
errors["specversion"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'specversion' must be a string")
|
||||||
|
)
|
||||||
if attributes.get("specversion") != "1.0":
|
if attributes.get("specversion") != "1.0":
|
||||||
type_errors.append("Attribute 'specversion' must be '1.0'")
|
errors["specversion"].append(
|
||||||
if type_errors:
|
InvalidAttributeTypeError("Attribute 'specversion' must be '1.0'")
|
||||||
errors["type"] = type_errors
|
)
|
||||||
return errors
|
return errors
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _validate_optional_attributes(
|
def _validate_optional_attributes(
|
||||||
attributes: dict[str, Any],
|
attributes: dict[str, Any],
|
||||||
) -> dict[str, list[str]]:
|
) -> dict[str, list[BaseCloudEventException]]:
|
||||||
"""
|
"""
|
||||||
Validates the types and values of the optional attributes.
|
Validates the types and values of the optional attributes.
|
||||||
|
|
||||||
:param attributes: The attributes of the CloudEvent instance.
|
:param attributes: The attributes of the CloudEvent instance.
|
||||||
:return: A dictionary of validation error messages.
|
:return: A dictionary of validation error messages.
|
||||||
"""
|
"""
|
||||||
errors = {}
|
errors = defaultdict(list)
|
||||||
optional_errors = []
|
|
||||||
if "time" in attributes:
|
if "time" in attributes:
|
||||||
if not isinstance(attributes["time"], datetime):
|
if not isinstance(attributes["time"], datetime):
|
||||||
optional_errors.append("Attribute 'time' must be a datetime object")
|
errors["time"].append(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'time' must be a datetime object"
|
||||||
|
)
|
||||||
|
)
|
||||||
if hasattr(attributes["time"], "tzinfo") and not attributes["time"].tzinfo:
|
if hasattr(attributes["time"], "tzinfo") and not attributes["time"].tzinfo:
|
||||||
optional_errors.append("Attribute 'time' must be timezone aware")
|
errors["time"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'time' must be timezone aware")
|
||||||
|
)
|
||||||
if "subject" in attributes:
|
if "subject" in attributes:
|
||||||
if not isinstance(attributes["subject"], str):
|
if not isinstance(attributes["subject"], str):
|
||||||
optional_errors.append("Attribute 'subject' must be a string")
|
errors["subject"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'subject' must be a string")
|
||||||
|
)
|
||||||
if not attributes["subject"]:
|
if not attributes["subject"]:
|
||||||
optional_errors.append("Attribute 'subject' must not be empty")
|
errors["subject"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'subject' must not be empty")
|
||||||
|
)
|
||||||
if "datacontenttype" in attributes:
|
if "datacontenttype" in attributes:
|
||||||
if not isinstance(attributes["datacontenttype"], str):
|
if not isinstance(attributes["datacontenttype"], str):
|
||||||
optional_errors.append("Attribute 'datacontenttype' must be a string")
|
errors["datacontenttype"].append(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'datacontenttype' must be a string"
|
||||||
|
)
|
||||||
|
)
|
||||||
if not attributes["datacontenttype"]:
|
if not attributes["datacontenttype"]:
|
||||||
optional_errors.append("Attribute 'datacontenttype' must not be empty")
|
errors["datacontenttype"].append(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'datacontenttype' must not be empty"
|
||||||
|
)
|
||||||
|
)
|
||||||
if "dataschema" in attributes:
|
if "dataschema" in attributes:
|
||||||
if not isinstance(attributes["dataschema"], str):
|
if not isinstance(attributes["dataschema"], str):
|
||||||
optional_errors.append("Attribute 'dataschema' must be a string")
|
errors["dataschema"].append(
|
||||||
|
InvalidAttributeTypeError("Attribute 'dataschema' must be a string")
|
||||||
|
)
|
||||||
if not attributes["dataschema"]:
|
if not attributes["dataschema"]:
|
||||||
optional_errors.append("Attribute 'dataschema' must not be empty")
|
errors["dataschema"].append(
|
||||||
if optional_errors:
|
InvalidAttributeTypeError(
|
||||||
errors["optional"] = optional_errors
|
"Attribute 'dataschema' must not be empty"
|
||||||
|
)
|
||||||
|
)
|
||||||
return errors
|
return errors
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _validate_extension_attributes(
|
def _validate_extension_attributes(
|
||||||
attributes: dict[str, Any],
|
attributes: dict[str, Any],
|
||||||
) -> dict[str, list[str]]:
|
) -> dict[str, list[BaseCloudEventException]]:
|
||||||
"""
|
"""
|
||||||
Validates the extension attributes.
|
Validates the extension attributes.
|
||||||
|
|
||||||
:param attributes: The attributes of the CloudEvent instance.
|
:param attributes: The attributes of the CloudEvent instance.
|
||||||
:return: A dictionary of validation error messages.
|
:return: A dictionary of validation error messages.
|
||||||
"""
|
"""
|
||||||
errors = {}
|
errors = defaultdict(list)
|
||||||
extension_errors = []
|
|
||||||
extension_attributes = [
|
extension_attributes = [
|
||||||
key
|
key
|
||||||
for key in attributes.keys()
|
for key in attributes.keys()
|
||||||
|
@ -166,19 +198,23 @@ class CloudEvent:
|
||||||
]
|
]
|
||||||
for extension_attribute in extension_attributes:
|
for extension_attribute in extension_attributes:
|
||||||
if extension_attribute == "data":
|
if extension_attribute == "data":
|
||||||
extension_errors.append(
|
errors[extension_attribute].append(
|
||||||
"Extension attribute 'data' is reserved and must not be used"
|
CustomExtensionAttributeError(
|
||||||
|
"Extension attribute 'data' is reserved and must not be used"
|
||||||
|
)
|
||||||
)
|
)
|
||||||
if not (1 <= len(extension_attribute) <= 20):
|
if not (1 <= len(extension_attribute) <= 20):
|
||||||
extension_errors.append(
|
errors[extension_attribute].append(
|
||||||
f"Extension attribute '{extension_attribute}' should be between 1 and 20 characters long"
|
CustomExtensionAttributeError(
|
||||||
|
f"Extension attribute '{extension_attribute}' should be between 1 and 20 characters long"
|
||||||
|
)
|
||||||
)
|
)
|
||||||
if not re.match(r"^[a-z0-9]+$", extension_attribute):
|
if not re.match(r"^[a-z0-9]+$", extension_attribute):
|
||||||
extension_errors.append(
|
errors[extension_attribute].append(
|
||||||
f"Extension attribute '{extension_attribute}' should only contain lowercase letters and numbers"
|
CustomExtensionAttributeError(
|
||||||
|
f"Extension attribute '{extension_attribute}' should only contain lowercase letters and numbers"
|
||||||
|
)
|
||||||
)
|
)
|
||||||
if extension_errors:
|
|
||||||
errors["extensions"] = extension_errors
|
|
||||||
return errors
|
return errors
|
||||||
|
|
||||||
def get_id(self) -> str:
|
def get_id(self) -> str:
|
||||||
|
|
|
@ -11,17 +11,46 @@
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
class CloudEventValidationError(Exception):
|
class BaseCloudEventException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CloudEventValidationError(BaseCloudEventException):
|
||||||
"""
|
"""
|
||||||
Custom exception for validation errors.
|
Holds validation errors aggregated during a CloudEvent creation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, errors: dict[str, list[str]]) -> None:
|
def __init__(self, errors: dict[str, list[BaseCloudEventException]]) -> None:
|
||||||
|
"""
|
||||||
|
:param errors: The errors gathered during the CloudEvent creation where key
|
||||||
|
is the name of the attribute and value is a list of errors related to that attribute.
|
||||||
|
"""
|
||||||
super().__init__("Validation errors occurred")
|
super().__init__("Validation errors occurred")
|
||||||
self.errors: dict[str, list[str]] = errors
|
self.errors: dict[str, list[BaseCloudEventException]] = errors
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
error_messages = [
|
error_messages = [
|
||||||
f"{key}: {', '.join(value)}" for key, value in self.errors.items()
|
f"{key}: {', '.join(str(value))}" for key, value in self.errors.items()
|
||||||
]
|
]
|
||||||
return f"{super().__str__()}: {', '.join(error_messages)}"
|
return f"{super().__str__()}: {', '.join(error_messages)}"
|
||||||
|
|
||||||
|
|
||||||
|
class MissingRequiredAttributeError(BaseCloudEventException):
|
||||||
|
"""
|
||||||
|
Exception for missing required attribute.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, missing: str) -> None:
|
||||||
|
super().__init__(f"Missing required attribute: '{missing}'")
|
||||||
|
|
||||||
|
|
||||||
|
class CustomExtensionAttributeError(BaseCloudEventException):
|
||||||
|
"""
|
||||||
|
Exception for invalid custom extension names.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidAttributeTypeError(BaseCloudEventException):
|
||||||
|
"""
|
||||||
|
Exception for invalid attribute type.
|
||||||
|
"""
|
||||||
|
|
|
@ -18,41 +18,87 @@ from typing import Any
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from cloudevents.core.v1.event import CloudEvent
|
from cloudevents.core.v1.event import CloudEvent
|
||||||
from cloudevents.core.v1.exceptions import CloudEventValidationError
|
from cloudevents.core.v1.exceptions import (
|
||||||
|
CloudEventValidationError,
|
||||||
|
CustomExtensionAttributeError,
|
||||||
|
InvalidAttributeTypeError,
|
||||||
|
MissingRequiredAttributeError,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_missing_required_attributes() -> None:
|
def test_missing_required_attributes() -> None:
|
||||||
with pytest.raises(CloudEventValidationError) as e:
|
with pytest.raises(CloudEventValidationError) as e:
|
||||||
CloudEvent({})
|
CloudEvent({})
|
||||||
|
|
||||||
assert e.value.errors == {
|
expected_errors = {
|
||||||
"required": ["Missing required attribute(s): id, source, type, specversion"],
|
"id": [
|
||||||
|
str(MissingRequiredAttributeError("id")),
|
||||||
|
str(InvalidAttributeTypeError("Attribute 'id' must not be None")),
|
||||||
|
str(InvalidAttributeTypeError("Attribute 'id' must be a string")),
|
||||||
|
],
|
||||||
|
"source": [
|
||||||
|
str(MissingRequiredAttributeError("source")),
|
||||||
|
str(InvalidAttributeTypeError("Attribute 'source' must be a string")),
|
||||||
|
],
|
||||||
"type": [
|
"type": [
|
||||||
"Attribute 'id' must not be None",
|
str(MissingRequiredAttributeError("type")),
|
||||||
"Attribute 'id' must be a string",
|
str(InvalidAttributeTypeError("Attribute 'type' must be a string")),
|
||||||
"Attribute 'source' must be a string",
|
],
|
||||||
"Attribute 'type' must be a string",
|
"specversion": [
|
||||||
"Attribute 'specversion' must be a string",
|
str(MissingRequiredAttributeError("specversion")),
|
||||||
"Attribute 'specversion' must be '1.0'",
|
str(InvalidAttributeTypeError("Attribute 'specversion' must be a string")),
|
||||||
|
str(InvalidAttributeTypeError("Attribute 'specversion' must be '1.0'")),
|
||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
actual_errors = {
|
||||||
|
key: [str(e) for e in value] for key, value in e.value.errors.items()
|
||||||
|
}
|
||||||
|
assert actual_errors == expected_errors
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"time,error",
|
"time,expected_error",
|
||||||
[
|
[
|
||||||
(
|
(
|
||||||
"2023-10-25T17:09:19.736166Z",
|
"2023-10-25T17:09:19.736166Z",
|
||||||
{"optional": ["Attribute 'time' must be a datetime object"]},
|
{
|
||||||
|
"time": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'time' must be a datetime object"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
datetime(2023, 10, 25, 17, 9, 19, 736166),
|
datetime(2023, 10, 25, 17, 9, 19, 736166),
|
||||||
{"optional": ["Attribute 'time' must be timezone aware"]},
|
{
|
||||||
|
"time": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'time' must be timezone aware"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
|
),
|
||||||
|
(
|
||||||
|
1,
|
||||||
|
{
|
||||||
|
"time": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'time' must be a datetime object"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
),
|
),
|
||||||
(1, {"optional": ["Attribute 'time' must be a datetime object"]}),
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_time_validation(time: Any, error: dict) -> None:
|
def test_time_validation(time: Any, expected_error: dict) -> None:
|
||||||
with pytest.raises(CloudEventValidationError) as e:
|
with pytest.raises(CloudEventValidationError) as e:
|
||||||
CloudEvent(
|
CloudEvent(
|
||||||
{
|
{
|
||||||
|
@ -63,21 +109,42 @@ def test_time_validation(time: Any, error: dict) -> None:
|
||||||
"time": time,
|
"time": time,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
actual_errors = {
|
||||||
assert e.value.errors == error
|
key: [str(e) for e in value] for key, value in e.value.errors.items()
|
||||||
|
}
|
||||||
|
assert actual_errors == expected_error
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"subject,error",
|
"subject,expected_error",
|
||||||
[
|
[
|
||||||
(1234, {"optional": ["Attribute 'subject' must be a string"]}),
|
(
|
||||||
|
1234,
|
||||||
|
{
|
||||||
|
"subject": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'subject' must be a string"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"",
|
"",
|
||||||
{"optional": ["Attribute 'subject' must not be empty"]},
|
{
|
||||||
|
"subject": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'subject' must not be empty"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_subject_validation(subject: Any, error: dict) -> None:
|
def test_subject_validation(subject: Any, expected_error: dict) -> None:
|
||||||
with pytest.raises(CloudEventValidationError) as e:
|
with pytest.raises(CloudEventValidationError) as e:
|
||||||
CloudEvent(
|
CloudEvent(
|
||||||
{
|
{
|
||||||
|
@ -89,20 +156,42 @@ def test_subject_validation(subject: Any, error: dict) -> None:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
assert e.value.errors == error
|
actual_errors = {
|
||||||
|
key: [str(e) for e in value] for key, value in e.value.errors.items()
|
||||||
|
}
|
||||||
|
assert actual_errors == expected_error
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"datacontenttype,error",
|
"datacontenttype,expected_error",
|
||||||
[
|
[
|
||||||
(1234, {"optional": ["Attribute 'datacontenttype' must be a string"]}),
|
(
|
||||||
|
1234,
|
||||||
|
{
|
||||||
|
"datacontenttype": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'datacontenttype' must be a string"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"",
|
"",
|
||||||
{"optional": ["Attribute 'datacontenttype' must not be empty"]},
|
{
|
||||||
|
"datacontenttype": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'datacontenttype' must not be empty"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_datacontenttype_validation(datacontenttype: Any, error: dict) -> None:
|
def test_datacontenttype_validation(datacontenttype: Any, expected_error: dict) -> None:
|
||||||
with pytest.raises(CloudEventValidationError) as e:
|
with pytest.raises(CloudEventValidationError) as e:
|
||||||
CloudEvent(
|
CloudEvent(
|
||||||
{
|
{
|
||||||
|
@ -114,20 +203,42 @@ def test_datacontenttype_validation(datacontenttype: Any, error: dict) -> None:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
assert e.value.errors == error
|
actual_errors = {
|
||||||
|
key: [str(e) for e in value] for key, value in e.value.errors.items()
|
||||||
|
}
|
||||||
|
assert actual_errors == expected_error
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"dataschema,error",
|
"dataschema,expected_error",
|
||||||
[
|
[
|
||||||
(1234, {"optional": ["Attribute 'dataschema' must be a string"]}),
|
(
|
||||||
|
1234,
|
||||||
|
{
|
||||||
|
"dataschema": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'dataschema' must be a string"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"",
|
"",
|
||||||
{"optional": ["Attribute 'dataschema' must not be empty"]},
|
{
|
||||||
|
"dataschema": [
|
||||||
|
str(
|
||||||
|
InvalidAttributeTypeError(
|
||||||
|
"Attribute 'dataschema' must not be empty"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
]
|
||||||
|
},
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_dataschema_validation(dataschema: Any, error: str) -> None:
|
def test_dataschema_validation(dataschema: Any, expected_error: dict) -> None:
|
||||||
with pytest.raises(CloudEventValidationError) as e:
|
with pytest.raises(CloudEventValidationError) as e:
|
||||||
CloudEvent(
|
CloudEvent(
|
||||||
{
|
{
|
||||||
|
@ -139,40 +250,59 @@ def test_dataschema_validation(dataschema: Any, error: str) -> None:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
assert e.value.errors == error
|
actual_errors = {
|
||||||
|
key: [str(e) for e in value] for key, value in e.value.errors.items()
|
||||||
|
}
|
||||||
|
assert actual_errors == expected_error
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"extension_name,error",
|
"extension_name,expected_error",
|
||||||
[
|
[
|
||||||
(
|
(
|
||||||
"",
|
"",
|
||||||
{
|
{
|
||||||
"extensions": [
|
"": [
|
||||||
"Extension attribute '' should be between 1 and 20 characters long",
|
str(
|
||||||
"Extension attribute '' should only contain lowercase letters and numbers",
|
CustomExtensionAttributeError(
|
||||||
|
"Extension attribute '' should be between 1 and 20 characters long"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
str(
|
||||||
|
CustomExtensionAttributeError(
|
||||||
|
"Extension attribute '' should only contain lowercase letters and numbers"
|
||||||
|
)
|
||||||
|
),
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"thisisaverylongextension",
|
"thisisaverylongextension",
|
||||||
{
|
{
|
||||||
"extensions": [
|
"thisisaverylongextension": [
|
||||||
"Extension attribute 'thisisaverylongextension' should be between 1 and 20 characters long"
|
str(
|
||||||
|
CustomExtensionAttributeError(
|
||||||
|
"Extension attribute 'thisisaverylongextension' should be between 1 and 20 characters long"
|
||||||
|
)
|
||||||
|
)
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
(
|
(
|
||||||
"data",
|
"data",
|
||||||
{
|
{
|
||||||
"extensions": [
|
"data": [
|
||||||
"Extension attribute 'data' is reserved and must not be used"
|
str(
|
||||||
|
CustomExtensionAttributeError(
|
||||||
|
"Extension attribute 'data' is reserved and must not be used"
|
||||||
|
)
|
||||||
|
)
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
def test_custom_extension(extension_name: str, error: dict) -> None:
|
def test_custom_extension(extension_name: str, expected_error: dict) -> None:
|
||||||
with pytest.raises(CloudEventValidationError) as e:
|
with pytest.raises(CloudEventValidationError) as e:
|
||||||
CloudEvent(
|
CloudEvent(
|
||||||
{
|
{
|
||||||
|
@ -184,7 +314,10 @@ def test_custom_extension(extension_name: str, error: dict) -> None:
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
assert e.value.errors == error
|
actual_errors = {
|
||||||
|
key: [str(e) for e in value] for key, value in e.value.errors.items()
|
||||||
|
}
|
||||||
|
assert actual_errors == expected_error
|
||||||
|
|
||||||
|
|
||||||
def test_cloud_event_constructor() -> None:
|
def test_cloud_event_constructor() -> None:
|
||||||
|
|
Loading…
Reference in New Issue