diff --git a/cloudevents/sdk/event/base.py b/cloudevents/sdk/event/base.py index d392ae8..a8bb099 100644 --- a/cloudevents/sdk/event/base.py +++ b/cloudevents/sdk/event/base.py @@ -16,6 +16,21 @@ import io import json import typing +_ce_required_fields = { + 'id', + 'source', + 'type', + 'specversion' +} + + +_ce_optional_fields = { + 'datacontenttype', + 'schema', + 'subject', + 'time' +} + # TODO(slinkydeveloper) is this really needed? class EventGetterSetter(object): @@ -117,6 +132,7 @@ class BaseEvent(EventGetterSetter): def UnmarshalJSON(self, b: typing.IO, data_unmarshaller: typing.Callable): raw_ce = json.load(b) + for name, value in raw_ce.items(): if name == "data": value = data_unmarshaller(value) @@ -134,7 +150,6 @@ class BaseEvent(EventGetterSetter): self.SetContentType(value) elif header.startswith("ce-"): self.Set(header[3:], value) - self.Set("data", data_unmarshaller(body)) def MarshalBinary( diff --git a/cloudevents/sdk/http_events.py b/cloudevents/sdk/http_events.py new file mode 100644 index 0000000..4c5de1c --- /dev/null +++ b/cloudevents/sdk/http_events.py @@ -0,0 +1,136 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import copy + +import json +import typing + +from cloudevents.sdk import marshaller + +from cloudevents.sdk.event import base +from cloudevents.sdk.event import v03, v1 + + +class CloudEvent(base.BaseEvent): + """ + Python-friendly cloudevent class supporting v1 events + Currently only supports binary content mode CloudEvents + """ + + def __init__( + self, + headers: dict, + data: dict, + data_unmarshaller: typing.Callable = lambda x: x + ): + """ + Event HTTP Constructor + :param headers: a dict with HTTP headers + e.g. { + "content-type": "application/cloudevents+json", + "ce-id": "16fb5f0b-211e-1102-3dfe-ea6e2806f124", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "0.2" + } + :type headers: dict + :param data: a dict to be stored inside Event + :type data: dict + :param binary: a bool indicating binary events + :type binary: bool + :param data_unmarshaller: callable function for reading/extracting data + :type data_unmarshaller: typing.Callable + """ + headers = {key.lower(): value for key, value in headers.items()} + data = {key.lower(): value for key, value in data.items()} + event_version = CloudEvent.detect_event_version(headers, data) + if CloudEvent.is_binary_cloud_event(headers): + + # Headers validation for binary events + for field in base._ce_required_fields: + ce_prefixed_field = f"ce-{field}" + + # Verify field exists else throw TypeError + if ce_prefixed_field not in headers: + raise TypeError( + "parameter headers has no required attribute {0}" + .format( + ce_prefixed_field + )) + + if not isinstance(headers[ce_prefixed_field], str): + raise TypeError( + "in parameter headers attribute " + "{0} expected type str but found type {1}".format( + ce_prefixed_field, type(headers[ce_prefixed_field]) + )) + + for field in base._ce_optional_fields: + ce_prefixed_field = f"ce-{field}" + if ce_prefixed_field in headers and not \ + isinstance(headers[ce_prefixed_field], str): + raise TypeError( + "in parameter headers attribute " + "{0} expected type str but found type {1}".format( + ce_prefixed_field, type(headers[ce_prefixed_field]) + )) + + else: + # TODO: Support structured CloudEvents + raise NotImplementedError + + self.headers = copy.deepcopy(headers) + self.data = copy.deepcopy(data) + self.marshall = marshaller.NewDefaultHTTPMarshaller() + self.event_handler = event_version() + self.marshall.FromRequest( + self.event_handler, + self.headers, + self.data, + data_unmarshaller + ) + + @staticmethod + def is_binary_cloud_event(headers): + for field in base._ce_required_fields: + if f"ce-{field}" not in headers: + return False + return True + + @staticmethod + def detect_event_version(headers, data): + """ + Returns event handler depending on specversion within + headers for binary cloudevents or within data for structured + cloud events + """ + specversion = headers.get('ce-specversion', data.get('specversion')) + if specversion == '1.0': + return v1.Event + elif specversion == '0.3': + return v03.Event + else: + raise TypeError(f"specversion {specversion} " + "currently unsupported") + + def __repr__(self): + return json.dumps( + { + 'Event': { + 'headers': self.headers, + 'data': self.data + } + }, + indent=4 + ) diff --git a/cloudevents/tests/data.py b/cloudevents/tests/data.py index 6605c7f..ffe63ae 100644 --- a/cloudevents/tests/data.py +++ b/cloudevents/tests/data.py @@ -23,7 +23,7 @@ body = '{"name":"john"}' headers = { v03.Event: { - "ce-specversion": "0.3", + "ce-specversion": "1.0", "ce-type": ce_type, "ce-id": ce_id, "ce-time": eventTime, @@ -42,7 +42,7 @@ headers = { json_ce = { v03.Event: { - "specversion": "0.3", + "specversion": "1.0", "type": ce_type, "id": ce_id, "time": eventTime, diff --git a/cloudevents/tests/test_http_events.py b/cloudevents/tests/test_http_events.py new file mode 100644 index 0000000..943e219 --- /dev/null +++ b/cloudevents/tests/test_http_events.py @@ -0,0 +1,146 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import json + +import copy + +from cloudevents.sdk.http_events import CloudEvent + +from sanic import response +from sanic import Sanic + +import pytest + + +invalid_test_headers = [ + { + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0" + }, { + "ce-id": "my-id", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0" + }, { + "ce-id": "my-id", + "ce-source": "", + "ce-specversion": "1.0" + }, { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + } +] + +test_data = { + "payload-content": "Hello World!" +} + +app = Sanic(__name__) + + +def post(url, headers, json): + return app.test_client.post(url, headers=headers, data=json) + + +@app.route("/event", ["POST"]) +async def echo(request): + assert isinstance(request.json, dict) + event = CloudEvent(dict(request.headers), request.json) + return response.text(json.dumps(event.data), headers=event.headers) + + +@pytest.mark.parametrize("headers", invalid_test_headers) +def test_invalid_binary_headers(headers): + with pytest.raises((TypeError, NotImplementedError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = CloudEvent(headers, test_data) + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_emit_binary_event(specversion): + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": specversion, + "Content-Type": "application/json" + } + event = CloudEvent(headers, test_data) + _, r = app.test_client.post( + "/event", + headers=event.headers, + data=json.dumps(event.data) + ) + + # Convert byte array to dict + # e.g. r.body = b'{"payload-content": "Hello World!"}' + body = json.loads(r.body.decode('utf-8')) + + # Check response fields + for key in test_data: + assert body[key] == test_data[key] + for key in headers: + assert r.headers[key] == headers[key] + assert r.status_code == 200 + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_missing_ce_prefix_binary_event(specversion): + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": specversion + } + for key in headers: + val = headers.pop(key) + + # breaking prefix e.g. e-id instead of ce-id + headers[key[1:]] = val + with pytest.raises((TypeError, NotImplementedError)): + # CloudEvent constructor throws TypeError if missing required field + # and NotImplementedError because structured calls aren't + # implemented. In this instance one of the required keys should have + # prefix e-id instead of ce-id therefore it should throw + _ = CloudEvent(headers, test_data) + + +@pytest.mark.parametrize("specversion", ['1.0', '0.3']) +def test_valid_cloud_events(specversion): + # Test creating multiple cloud events + events_queue = [] + headers = {} + num_cloudevents = 30 + for i in range(num_cloudevents): + headers = { + "ce-id": f"id{i}", + "ce-source": f"source{i}.com.test", + "ce-type": f"cloudevent.test.type", + "ce-specversion": specversion + } + data = {'payload': f"payload-{i}"} + events_queue.append(CloudEvent(headers, data)) + + for i, event in enumerate(events_queue): + headers = event.headers + data = event.data + + assert headers['ce-id'] == f"id{i}" + assert headers['ce-source'] == f"source{i}.com.test" + assert headers['ce-specversion'] == specversion + assert data['payload'] == f"payload-{i}" diff --git a/requirements/test.txt b/requirements/test.txt index e9df186..1289408 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -7,4 +7,4 @@ pytest==4.0.0 pytest-cov==2.4.0 # web app tests sanic -aiohttp \ No newline at end of file +aiohttp diff --git a/samples/python-event-requests/cloudevent_to_request.py b/samples/python-event-requests/cloudevent_to_request.py new file mode 100644 index 0000000..4b9b567 --- /dev/null +++ b/samples/python-event-requests/cloudevent_to_request.py @@ -0,0 +1,44 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys +import io +from cloudevents.sdk.http_events import CloudEvent + +import requests + +if __name__ == "__main__": + # expects a url from command line. e.g. + # python3 sample-server.py http://localhost:3000/event + if len(sys.argv) < 2: + sys.exit("Usage: python with_requests.py " + "") + + url = sys.argv[1] + + # CloudEvent headers and data + headers = { + "ce-id": "my-id", + "ce-source": "", + "ce-type": "cloudevent.event.type", + "ce-specversion": "1.0" + } + data = {"payload-content": "Hello World!"} + + # Create a CloudEvent + event = CloudEvent(headers=headers, data=data) + + # Print the created CloudEvent then send it to some url we got from + # command line + print(f"Sent {event}") + requests.post(url, headers=event.headers, json=event.data) diff --git a/samples/python-event-requests/sample-server.py b/samples/python-event-requests/sample-server.py new file mode 100644 index 0000000..fd9f187 --- /dev/null +++ b/samples/python-event-requests/sample-server.py @@ -0,0 +1,34 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from cloudevents.sdk.http_events import CloudEvent +from flask import Flask, request +app = Flask(__name__) + + +# Create an endpoint at http://localhost:/3000/event +@app.route('/event', methods=['POST']) +def hello(): + # Convert headers to dict + headers = dict(request.headers) + + # Create a CloudEvent + event = CloudEvent(headers=headers, data=request.json) + + # Print the received CloudEvent + print(f"Received {event}") + return '', 204 + + +if __name__ == '__main__': + app.run(port=3000)