Signed-off-by: Evan Anderson <argent@google.com>
This commit is contained in:
Evan Anderson 2019-01-12 05:08:50 -08:00 committed by Evan Anderson
parent d8cec175a9
commit d90a4861f9
7 changed files with 62 additions and 34 deletions

View File

@ -29,8 +29,7 @@ class BinaryHTTPCloudEventConverter(base.Converter):
return True
def event_supported(self, event):
if type(event) not in self.SUPPORTED_VERSIONS:
raise exceptions.UnsupportedEvent(type(event))
return type(event) in self.SUPPORTED_VERSIONS
def read(self,
event: event_base.BaseEvent,

View File

@ -21,13 +21,14 @@ from cloudevents.sdk.event import base as event_base
class JSONHTTPCloudEventConverter(base.Converter):
TYPE = "structured"
MIME_TYPE = "application/cloudevents+json"
def can_read(self, content_type):
return content_type == "application/cloudevents+json"
return content_type and content_type.startswith(self.MIME_TYPE)
def event_supported(self, event):
# structured format supported by both spec 0.1 and 0.2
pass
return True
def read(self, event: event_base.BaseEvent,
headers: dict,
@ -39,7 +40,8 @@ class JSONHTTPCloudEventConverter(base.Converter):
def write(self,
event: event_base.BaseEvent,
data_marshaller: typing.Callable) -> (dict, typing.IO):
return {}, event.MarshalJSON(data_marshaller)
http_headers = {'content-type': self.MIME_TYPE}
return http_headers, event.MarshalJSON(data_marshaller)
def NewJSONHTTPCloudEventConverter() -> JSONHTTPCloudEventConverter:

View File

@ -129,32 +129,34 @@ class BaseEvent(EventGetterSetter):
def UnmarshalBinary(self, headers: dict, body: typing.IO,
data_unmarshaller: typing.Callable):
props = self.Properties(with_nullable=True)
exts = props.get("extensions")
for key in props:
formatted_key = "ce-{0}".format(key)
if key != "extensions":
self.Set(key, headers.get("ce-{0}".format(key)))
if formatted_key in headers:
del headers[formatted_key]
BINARY_MAPPING = {
'content-type': 'contenttype',
# TODO(someone): add Distributed Tracing. It's not clear
# if this is one extension or two.
# https://github.com/cloudevents/spec/blob/master/extensions/distributed-tracing.md
}
for header, value in headers.items():
header = header.lower()
if header in BINARY_MAPPING:
self.Set(BINARY_MAPPING[header], value)
elif header.startswith("ce-"):
self.Set(header[3:], value)
# rest of headers suppose to an extension?
exts.update(**headers)
self.Set("extensions", exts)
self.Set("data", data_unmarshaller(body))
def MarshalBinary(
self, data_marshaller: typing.Callable) -> (dict, object):
headers = {}
if self.ContentType():
headers["content-type"] = self.ContentType()
props = self.Properties()
for key, value in props.items():
if key not in ["data", "extensions"]:
if key not in ["data", "extensions", "contenttype"]:
if value is not None:
headers["ce-{0}".format(key)] = value
exts = props.get("extensions")
if len(exts) > 0:
headers.update(**exts)
for key, value in props.get("extensions"):
headers["ce-{0}".format(key)] = value
data, _ = self.Get("data")
return headers, io.BytesIO(

View File

@ -35,7 +35,8 @@ class HTTPMarshaller(object):
:param converters: a list of HTTP-to-CloudEvent-to-HTTP constructors
:type converters: typing.List[base.Converter]
"""
self.__converters = {c.TYPE: c for c in converters}
self.__converters = (c for c in converters)
self.__converters_by_type = {c.TYPE: c for c in converters}
def FromRequest(self, event: event_base.BaseEvent,
headers: dict,
@ -61,12 +62,13 @@ class HTTPMarshaller(object):
content_type = headers.get(
"content-type", headers.get("Content-Type"))
for _, cnvrtr in self.__converters.items():
if cnvrtr.can_read(content_type):
cnvrtr.event_supported(event)
for cnvrtr in self.__converters:
if cnvrtr.can_read(content_type) and cnvrtr.event_supported(event):
return cnvrtr.read(event, headers, body, data_unmarshaller)
raise exceptions.UnsupportedEventConverter(content_type)
raise exceptions.UnsupportedEventConverter(
"No registered marshaller for {0} in {1}".format(
content_type, self.__converters))
def ToRequest(self, event: event_base.BaseEvent,
converter_type: str,
@ -85,8 +87,8 @@ class HTTPMarshaller(object):
if not isinstance(data_marshaller, typing.Callable):
raise exceptions.InvalidDataMarshaller()
if converter_type in self.__converters:
cnvrtr = self.__converters.get(converter_type)
if converter_type in self.__converters_by_type:
cnvrtr = self.__converters_by_type[converter_type]
return cnvrtr.write(event, data_marshaller)
raise exceptions.NoSuchConverter(converter_type)

View File

@ -66,7 +66,7 @@ def test_binary_converter_v01():
)
pytest.raises(
exceptions.UnsupportedEvent,
exceptions.UnsupportedEventConverter,
m.FromRequest,
v01.Event, {}, None, lambda x: x)
@ -102,7 +102,7 @@ def test_structured_converter_v01():
assert event.Get("id") == (data.ce_id, True)
def test_default_http_marshaller():
def test_default_http_marshaller_with_structured():
m = marshaller.NewDefaultHTTPMarshaller()
event = m.FromRequest(
@ -116,6 +116,21 @@ def test_default_http_marshaller():
assert event.Get("id") == (data.ce_id, True)
def test_default_http_marshaller_with_binary():
m = marshaller.NewDefaultHTTPMarshaller()
event = m.FromRequest(
v02.Event(),
data.headers,
io.StringIO(json.dumps(data.body)),
json.load
)
assert event is not None
assert event.Get("type") == (data.ce_type, True)
assert event.Get("data") == (data.body, True)
assert event.Get("id") == (data.ce_id, True)
def test_unsupported_event_configuration():
m = marshaller.NewHTTPMarshaller(
[
@ -123,7 +138,7 @@ def test_unsupported_event_configuration():
]
)
pytest.raises(
exceptions.UnsupportedEvent,
exceptions.UnsupportedEventConverter,
m.FromRequest,
v01.Event(),
{"Content-Type": "application/cloudevents+json"},

View File

@ -43,7 +43,7 @@ def test_event_pipeline_upstream():
assert "ce-source" in new_headers
assert "ce-id" in new_headers
assert "ce-time" in new_headers
assert "ce-contenttype" in new_headers
assert "content-type" in new_headers
assert isinstance(body, io.BytesIO)
assert data.body == body.read().decode("utf-8")
@ -66,7 +66,7 @@ def test_event_pipeline_v01():
_, body = m.ToRequest(event, converters.TypeStructured, lambda x: x)
assert isinstance(body, io.BytesIO)
new_headers = json.load(body)
new_headers = json.load(io.TextIOWrapper(body, encoding='utf-8'))
assert new_headers is not None
assert "cloudEventsVersion" in new_headers
assert "eventType" in new_headers

View File

@ -48,9 +48,10 @@ def test_binary_event_to_request_upstream():
def test_structured_event_to_request_upstream():
copy_of_ce = copy.deepcopy(data.ce)
m = marshaller.NewDefaultHTTPMarshaller()
http_headers = {"content-type": "application/cloudevents+json"}
event = m.FromRequest(
v02.Event(),
{"Content-Type": "application/cloudevents+json"},
http_headers,
io.StringIO(json.dumps(data.ce)),
lambda x: x.read()
)
@ -60,6 +61,9 @@ def test_structured_event_to_request_upstream():
new_headers, _ = m.ToRequest(event, converters.TypeStructured, lambda x: x)
for key in new_headers:
if key == "content-type":
assert new_headers[key] == http_headers[key]
continue
assert key in copy_of_ce
@ -70,9 +74,10 @@ def test_structured_event_to_request_v01():
structured.NewJSONHTTPCloudEventConverter()
]
)
http_headers = {"content-type": "application/cloudevents+json"}
event = m.FromRequest(
v01.Event(),
{"Content-Type": "application/cloudevents+json"},
http_headers,
io.StringIO(json.dumps(data.ce)),
lambda x: x.read()
)
@ -82,4 +87,7 @@ def test_structured_event_to_request_v01():
new_headers, _ = m.ToRequest(event, converters.TypeStructured, lambda x: x)
for key in new_headers:
if key == "content-type":
assert new_headers[key] == http_headers[key]
continue
assert key in copy_of_ce