Created CloudEvent class (#36)

CloudEvents is a more pythonic interface for using cloud events.
It is powered by internal marshallers and cloud event base classes.
It performs basic validation on fields, and cloud event type checking.

Signed-off-by: Curtis Mason <cumason@google.com>
Signed-off-by: Dustin Ingram <di@users.noreply.github.com>
This commit is contained in:
Curtis Mason 2020-06-23 15:31:37 -04:00 committed by Dustin Ingram
parent d551dba58a
commit 0b8f56de92
No known key found for this signature in database
GPG Key ID: 93D2B8D4930A5E39
7 changed files with 379 additions and 4 deletions

View File

@ -16,6 +16,21 @@ import io
import json
import typing
_ce_required_fields = {
'id',
'source',
'type',
'specversion'
}
_ce_optional_fields = {
'datacontenttype',
'schema',
'subject',
'time'
}
# TODO(slinkydeveloper) is this really needed?
class EventGetterSetter(object):
@ -117,6 +132,7 @@ class BaseEvent(EventGetterSetter):
def UnmarshalJSON(self, b: typing.IO, data_unmarshaller: typing.Callable):
raw_ce = json.load(b)
for name, value in raw_ce.items():
if name == "data":
value = data_unmarshaller(value)
@ -134,7 +150,6 @@ class BaseEvent(EventGetterSetter):
self.SetContentType(value)
elif header.startswith("ce-"):
self.Set(header[3:], value)
self.Set("data", data_unmarshaller(body))
def MarshalBinary(

View File

@ -0,0 +1,136 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import typing
from cloudevents.sdk import marshaller
from cloudevents.sdk.event import base
from cloudevents.sdk.event import v03, v1
class CloudEvent(base.BaseEvent):
"""
Python-friendly cloudevent class supporting v1 events
Currently only supports binary content mode CloudEvents
"""
def __init__(
self,
headers: dict,
data: dict,
data_unmarshaller: typing.Callable = lambda x: x
):
"""
Event HTTP Constructor
:param headers: a dict with HTTP headers
e.g. {
"content-type": "application/cloudevents+json",
"ce-id": "16fb5f0b-211e-1102-3dfe-ea6e2806f124",
"ce-source": "<event-source>",
"ce-type": "cloudevent.event.type",
"ce-specversion": "0.2"
}
:type headers: dict
:param data: a dict to be stored inside Event
:type data: dict
:param binary: a bool indicating binary events
:type binary: bool
:param data_unmarshaller: callable function for reading/extracting data
:type data_unmarshaller: typing.Callable
"""
headers = {key.lower(): value for key, value in headers.items()}
data = {key.lower(): value for key, value in data.items()}
event_version = CloudEvent.detect_event_version(headers, data)
if CloudEvent.is_binary_cloud_event(headers):
# Headers validation for binary events
for field in base._ce_required_fields:
ce_prefixed_field = f"ce-{field}"
# Verify field exists else throw TypeError
if ce_prefixed_field not in headers:
raise TypeError(
"parameter headers has no required attribute {0}"
.format(
ce_prefixed_field
))
if not isinstance(headers[ce_prefixed_field], str):
raise TypeError(
"in parameter headers attribute "
"{0} expected type str but found type {1}".format(
ce_prefixed_field, type(headers[ce_prefixed_field])
))
for field in base._ce_optional_fields:
ce_prefixed_field = f"ce-{field}"
if ce_prefixed_field in headers and not \
isinstance(headers[ce_prefixed_field], str):
raise TypeError(
"in parameter headers attribute "
"{0} expected type str but found type {1}".format(
ce_prefixed_field, type(headers[ce_prefixed_field])
))
else:
# TODO: Support structured CloudEvents
raise NotImplementedError
self.headers = copy.deepcopy(headers)
self.data = copy.deepcopy(data)
self.marshall = marshaller.NewDefaultHTTPMarshaller()
self.event_handler = event_version()
self.marshall.FromRequest(
self.event_handler,
self.headers,
self.data,
data_unmarshaller
)
@staticmethod
def is_binary_cloud_event(headers):
for field in base._ce_required_fields:
if f"ce-{field}" not in headers:
return False
return True
@staticmethod
def detect_event_version(headers, data):
"""
Returns event handler depending on specversion within
headers for binary cloudevents or within data for structured
cloud events
"""
specversion = headers.get('ce-specversion', data.get('specversion'))
if specversion == '1.0':
return v1.Event
elif specversion == '0.3':
return v03.Event
else:
raise TypeError(f"specversion {specversion} "
"currently unsupported")
def __repr__(self):
return json.dumps(
{
'Event': {
'headers': self.headers,
'data': self.data
}
},
indent=4
)

View File

@ -23,7 +23,7 @@ body = '{"name":"john"}'
headers = {
v03.Event: {
"ce-specversion": "0.3",
"ce-specversion": "1.0",
"ce-type": ce_type,
"ce-id": ce_id,
"ce-time": eventTime,
@ -42,7 +42,7 @@ headers = {
json_ce = {
v03.Event: {
"specversion": "0.3",
"specversion": "1.0",
"type": ce_type,
"id": ce_id,
"time": eventTime,

View File

@ -0,0 +1,146 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import copy
from cloudevents.sdk.http_events import CloudEvent
from sanic import response
from sanic import Sanic
import pytest
invalid_test_headers = [
{
"ce-source": "<event-source>",
"ce-type": "cloudevent.event.type",
"ce-specversion": "1.0"
}, {
"ce-id": "my-id",
"ce-type": "cloudevent.event.type",
"ce-specversion": "1.0"
}, {
"ce-id": "my-id",
"ce-source": "<event-source>",
"ce-specversion": "1.0"
}, {
"ce-id": "my-id",
"ce-source": "<event-source>",
"ce-type": "cloudevent.event.type",
}
]
test_data = {
"payload-content": "Hello World!"
}
app = Sanic(__name__)
def post(url, headers, json):
return app.test_client.post(url, headers=headers, data=json)
@app.route("/event", ["POST"])
async def echo(request):
assert isinstance(request.json, dict)
event = CloudEvent(dict(request.headers), request.json)
return response.text(json.dumps(event.data), headers=event.headers)
@pytest.mark.parametrize("headers", invalid_test_headers)
def test_invalid_binary_headers(headers):
with pytest.raises((TypeError, NotImplementedError)):
# CloudEvent constructor throws TypeError if missing required field
# and NotImplementedError because structured calls aren't
# implemented. In this instance one of the required keys should have
# prefix e-id instead of ce-id therefore it should throw
_ = CloudEvent(headers, test_data)
@pytest.mark.parametrize("specversion", ['1.0', '0.3'])
def test_emit_binary_event(specversion):
headers = {
"ce-id": "my-id",
"ce-source": "<event-source>",
"ce-type": "cloudevent.event.type",
"ce-specversion": specversion,
"Content-Type": "application/json"
}
event = CloudEvent(headers, test_data)
_, r = app.test_client.post(
"/event",
headers=event.headers,
data=json.dumps(event.data)
)
# Convert byte array to dict
# e.g. r.body = b'{"payload-content": "Hello World!"}'
body = json.loads(r.body.decode('utf-8'))
# Check response fields
for key in test_data:
assert body[key] == test_data[key]
for key in headers:
assert r.headers[key] == headers[key]
assert r.status_code == 200
@pytest.mark.parametrize("specversion", ['1.0', '0.3'])
def test_missing_ce_prefix_binary_event(specversion):
headers = {
"ce-id": "my-id",
"ce-source": "<event-source>",
"ce-type": "cloudevent.event.type",
"ce-specversion": specversion
}
for key in headers:
val = headers.pop(key)
# breaking prefix e.g. e-id instead of ce-id
headers[key[1:]] = val
with pytest.raises((TypeError, NotImplementedError)):
# CloudEvent constructor throws TypeError if missing required field
# and NotImplementedError because structured calls aren't
# implemented. In this instance one of the required keys should have
# prefix e-id instead of ce-id therefore it should throw
_ = CloudEvent(headers, test_data)
@pytest.mark.parametrize("specversion", ['1.0', '0.3'])
def test_valid_cloud_events(specversion):
# Test creating multiple cloud events
events_queue = []
headers = {}
num_cloudevents = 30
for i in range(num_cloudevents):
headers = {
"ce-id": f"id{i}",
"ce-source": f"source{i}.com.test",
"ce-type": f"cloudevent.test.type",
"ce-specversion": specversion
}
data = {'payload': f"payload-{i}"}
events_queue.append(CloudEvent(headers, data))
for i, event in enumerate(events_queue):
headers = event.headers
data = event.data
assert headers['ce-id'] == f"id{i}"
assert headers['ce-source'] == f"source{i}.com.test"
assert headers['ce-specversion'] == specversion
assert data['payload'] == f"payload-{i}"

View File

@ -7,4 +7,4 @@ pytest==4.0.0
pytest-cov==2.4.0
# web app tests
sanic
aiohttp
aiohttp

View File

@ -0,0 +1,44 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import io
from cloudevents.sdk.http_events import CloudEvent
import requests
if __name__ == "__main__":
# expects a url from command line. e.g.
# python3 sample-server.py http://localhost:3000/event
if len(sys.argv) < 2:
sys.exit("Usage: python with_requests.py "
"<CloudEvents controller URL>")
url = sys.argv[1]
# CloudEvent headers and data
headers = {
"ce-id": "my-id",
"ce-source": "<event-source>",
"ce-type": "cloudevent.event.type",
"ce-specversion": "1.0"
}
data = {"payload-content": "Hello World!"}
# Create a CloudEvent
event = CloudEvent(headers=headers, data=data)
# Print the created CloudEvent then send it to some url we got from
# command line
print(f"Sent {event}")
requests.post(url, headers=event.headers, json=event.data)

View File

@ -0,0 +1,34 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudevents.sdk.http_events import CloudEvent
from flask import Flask, request
app = Flask(__name__)
# Create an endpoint at http://localhost:/3000/event
@app.route('/event', methods=['POST'])
def hello():
# Convert headers to dict
headers = dict(request.headers)
# Create a CloudEvent
event = CloudEvent(headers=headers, data=request.json)
# Print the received CloudEvent
print(f"Received {event}")
return '', 204
if __name__ == '__main__':
app.run(port=3000)