Getter/Setters with some tests

Signed-off-by: Denis Makogon <lildee1991@gmail.com>
This commit is contained in:
Denis Makogon 2018-11-16 12:16:52 +02:00
parent 5cfb98719f
commit 132fe61a8c
5 changed files with 278 additions and 24 deletions

View File

@ -17,24 +17,80 @@ import ujson
import typing
class BaseEvent(object):
class EventGetterSetter(object):
def Properties(self) -> dict:
def CloudEventVersion(self) -> str:
raise Exception("not implemented")
# CloudEvent attribute getters
def EventType(self) -> str:
raise Exception("not implemented")
def Source(self) -> str:
raise Exception("not implemented")
def EventID(self) -> str:
raise Exception("not implemented")
def EventTime(self) -> str:
raise Exception("not implemented")
def SchemaURL(self) -> str:
raise Exception("not implemented")
def Data(self) -> object:
raise Exception("not implemented")
def Extensions(self) -> dict:
raise Exception("not implemented")
def ContentType(self) -> str:
raise Exception("not implemented")
# CloudEvent attribute constructors
# Each setter return an instance of its class
# in order to build a pipeline of setter
def WithEventType(self, eventType: str) -> object:
raise Exception("not implemented")
def WithSource(self, source: str) -> object:
raise Exception("not implemented")
def WithEventID(self, eventID: str) -> object:
raise Exception("not implemented")
def WithEventTime(self, eventTime: str) -> object:
raise Exception("not implemented")
def WithSchemaURL(self, schemaURL: str) -> object:
raise Exception("not implemented")
def WithData(self, data: object) -> object:
raise Exception("not implemented")
def WithExtensions(self, extensions: dict) -> object:
raise Exception("not implemented")
def WithContentType(self, contentType: str) -> object:
raise Exception("not implemented")
class BaseEvent(EventGetterSetter):
def Properties(self, with_nullable=False) -> dict:
props = dict()
for name, value in self.__dict__.items():
if str(name).startswith("ce__"):
props.update(
{
str(name).replace("ce__", ""): value.get()
}
)
v = value.get()
if v is not None or with_nullable:
props.update(
{
str(name).replace("ce__", ""): value.get()
}
)
return props
def Extensions(self) -> dict:
props = self.Properties()
return props.get("extensions")
def Get(self, key: str) -> (object, bool):
formatted_key = "ce__{0}".format(key.lower())
ok = hasattr(self, formatted_key)
@ -67,7 +123,7 @@ class BaseEvent(object):
self.Set(name, value)
def UnmarshalBinary(self, headers: dict, body: typing.IO):
props = self.Properties()
props = self.Properties(with_nullable=True)
exts = props.get("extensions")
for key in props:
formatted_key = "ce-{0}".format(key)

View File

@ -31,3 +31,59 @@ class Event(base.BaseEvent):
def CloudEventVersion(self) -> str:
return self.ce__specversion.get()
def EventType(self) -> str:
return self.ce__type.get()
def Source(self) -> str:
return self.ce__source.get()
def EventID(self) -> str:
return self.ce__id.get()
def EventTime(self) -> str:
return self.ce__time.get()
def SchemaURL(self) -> str:
return self.ce__schemaurl.get()
def Data(self) -> object:
return self.ce__data.get()
def Extensions(self) -> dict:
return self.ce__extensions.get()
def ContentType(self) -> str:
return self.ce__contenttype.get()
def WithEventType(self, eventType: str) -> base.BaseEvent:
self.Set("type", eventType)
return self
def WithSource(self, source: str) -> base.BaseEvent:
self.Set("source", source)
return self
def WithEventID(self, eventID: str) -> base.BaseEvent:
self.Set("id", eventID)
return self
def WithEventTime(self, eventTime: str) -> base.BaseEvent:
self.Set("time", eventTime)
return self
def WithSchemaURL(self, schemaURL: str) -> base.BaseEvent:
self.Set("schemaurl", schemaURL)
return self
def WithData(self, data: object) -> base.BaseEvent:
self.Set("data", data)
return self
def WithExtensions(self, extensions: dict) -> base.BaseEvent:
self.Set("extension", extensions)
return self
def WithContentType(self, contentType: str) -> base.BaseEvent:
self.Set("contenttype", contentType)
return self

View File

@ -26,7 +26,7 @@ class Event(base.BaseEvent):
"eventTypeVersion", None, False)
self.ce__source = opt.Option("source", None, True)
self.ce__eventID = opt.Option("eventID", None, True)
self.ce__evenTime = opt.Option("eventTime", None, True)
self.ce__eventTime = opt.Option("eventTime", None, True)
self.ce__schemaURL = opt.Option("schemaURL", None, False)
self.ce__contentType = opt.Option("contentType", None, False)
self.ce__data = opt.Option("data", None, False)
@ -34,3 +34,67 @@ class Event(base.BaseEvent):
def CloudEventVersion(self) -> str:
return self.ce__cloudEventsVersion.get()
def EventType(self) -> str:
return self.ce__eventType.get()
def Source(self) -> str:
return self.ce__source.get()
def EventID(self) -> str:
return self.ce__eventID.get()
def EventTime(self) -> str:
return self.ce__eventTime.get()
def SchemaURL(self) -> str:
return self.ce__schemaURL.get()
def Data(self) -> object:
return self.ce__data.get()
def Extensions(self) -> dict:
return self.ce__extensions.get()
def ContentType(self) -> str:
return self.ce__contentType.get()
def WithEventType(self, eventType: str) -> base.BaseEvent:
self.Set("eventType", eventType)
return self
def WithSource(self, source: str) -> base.BaseEvent:
self.Set("source", source)
return self
def WithEventID(self, eventID: str) -> base.BaseEvent:
self.Set("eventID", eventID)
return self
def WithEventTime(self, eventTime: str) -> base.BaseEvent:
self.Set("eventTime", eventTime)
return self
def WithSchemaURL(self, schemaURL: str) -> base.BaseEvent:
self.Set("schemaURL", schemaURL)
return self
def WithData(self, data: object) -> base.BaseEvent:
self.Set("data", data)
return self
def WithExtensions(self, extensions: dict) -> base.BaseEvent:
self.Set("extension", extensions)
return self
def WithContentType(self, contentType: str) -> base.BaseEvent:
self.Set("contentType", contentType)
return self
# additional getter/setter
def EventTypeVersion(self) -> str:
return self.ce__eventTypeVersion.get()
def WithEventTypeVersion(self, eventTypeVersion: str) -> base.BaseEvent:
self.Set("eventTypeVersion", eventTypeVersion)
return self

View File

@ -12,22 +12,26 @@
# License for the specific language governing permissions and limitations
# under the License.
contentType = "application/json"
ce_type = "word.found.exclamation"
ce_id = "16fb5f0b-211e-1102-3dfe-ea6e2806f124"
source = "pytest"
specversion = "0.1"
eventTime = "2018-10-23T12:28:23.3464579Z"
body = '{"name":"john"}'
headers = {
"ce-specversion": "0.1",
"ce-specversion": specversion,
"ce-type": ce_type,
"ce-id": ce_id,
"ce-time": "2018-10-23T12:28:23.3464579Z",
"ce-source": "pytest",
"Content-Type": "application/json"
"ce-time": eventTime,
"ce-source": source,
"Content-Type": contentType
}
ce = {
"specversion": "0.1",
"type": "word.found.exclamation",
"id": "16fb5f0b-211e-1102-3dfe-ea6e2806f124",
"time": "2018-10-23T12:28:23.3464579Z",
"source": "pytest",
"contenttype": "application/json"
"specversion": specversion,
"type": ce_type,
"id": ce_id,
"time": eventTime,
"source": source,
"contenttype": contentType
}

View File

@ -0,0 +1,74 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ujson
from cloudevents.sdk.event import v01
from cloudevents.sdk.event import upstream
from cloudevents.sdk import converters
from cloudevents.sdk import marshaller
from cloudevents.sdk.converters import structured
from cloudevents.tests import data
def test_event_pipeline_upstream():
event = (
upstream.Event().
WithContentType(data.contentType).
WithData(data.body).
WithEventID(data.ce_id).
WithSource(data.source).
WithEventTime(data.eventTime).
WithEventType(data.ce_type)
)
m = marshaller.NewDefaultHTTPMarshaller(type(event))
new_headers, body = m.ToRequest(event, converters.TypeBinary, lambda x: x)
assert new_headers is not None
assert "ce-specversion" in new_headers
assert "ce-type" in new_headers
assert "ce-source" in new_headers
assert "ce-id" in new_headers
assert "ce-time" in new_headers
assert "ce-contenttype" in new_headers
assert data.body == body
def test_event_pipeline_v01():
event = (
v01.Event().
WithContentType(data.contentType).
WithData(data.body).
WithEventID(data.ce_id).
WithSource(data.source).
WithEventTime(data.eventTime).
WithEventType(data.ce_type)
)
m = marshaller.NewHTTPMarshaller(
[
structured.NewJSONHTTPCloudEventConverter(type(event))
]
)
_, body = m.ToRequest(event, converters.TypeStructured, lambda x: x)
new_headers = ujson.load(body)
assert new_headers is not None
assert "cloudEventsVersion" in new_headers
assert "eventType" in new_headers
assert "source" in new_headers
assert "eventID" in new_headers
assert "eventTime" in new_headers
assert "contentType" in new_headers
assert data.body == new_headers["data"]