Modified content-type to abide by attribute naming conventions for cloudevents (#232)
* fix: changed content-type to a valid attribute Signed-off-by: vivjd <vivjdeng@hotmail.com> * fix: changed headers back to content-type Signed-off-by: Vivian <118199397+vivjd@users.noreply.github.com> Signed-off-by: vivjd <vivjdeng@hotmail.com> * modified kafka test cases to match datacontenttype Signed-off-by: vivjd <vivjdeng@hotmail.com> * fix: updated kafka/conversion.py and test cases to check for valid attributes Signed-off-by: vivjd <vivjdeng@hotmail.com> --------- Signed-off-by: vivjd <vivjdeng@hotmail.com> Signed-off-by: Vivian <118199397+vivjd@users.noreply.github.com> Co-authored-by: Yurii Serhiichuk <xSAVIKx@users.noreply.github.com>
This commit is contained in:
parent
11520e35e1
commit
16441d79f4
|
@ -87,10 +87,10 @@ def to_binary(
|
|||
)
|
||||
|
||||
headers = {}
|
||||
if event["content-type"]:
|
||||
headers["content-type"] = event["content-type"].encode("utf-8")
|
||||
if event["datacontenttype"]:
|
||||
headers["content-type"] = event["datacontenttype"].encode("utf-8")
|
||||
for attr, value in event.get_attributes().items():
|
||||
if attr not in ["data", "partitionkey", "content-type"]:
|
||||
if attr not in ["data", "partitionkey", "datacontenttype"]:
|
||||
if value is not None:
|
||||
headers["ce_{0}".format(attr)] = value.encode("utf-8")
|
||||
|
||||
|
@ -126,7 +126,7 @@ def from_binary(
|
|||
for header, value in message.headers.items():
|
||||
header = header.lower()
|
||||
if header == "content-type":
|
||||
attributes["content-type"] = value.decode()
|
||||
attributes["datacontenttype"] = value.decode()
|
||||
elif header.startswith("ce_"):
|
||||
attributes[header[3:]] = value.decode()
|
||||
|
||||
|
@ -189,8 +189,8 @@ def to_structured(
|
|||
attrs["data"] = data
|
||||
|
||||
headers = {}
|
||||
if "content-type" in attrs:
|
||||
headers["content-type"] = attrs.pop("content-type").encode("utf-8")
|
||||
if "datacontenttype" in attrs:
|
||||
headers["content-type"] = attrs.pop("datacontenttype").encode("utf-8")
|
||||
|
||||
try:
|
||||
value = envelope_marshaller(attrs)
|
||||
|
@ -255,7 +255,10 @@ def from_structured(
|
|||
attributes[name] = decoded_value
|
||||
|
||||
for header, val in message.headers.items():
|
||||
attributes[header.lower()] = val.decode()
|
||||
if header.lower() == "content-type":
|
||||
attributes["datacontenttype"] = val.decode()
|
||||
else:
|
||||
attributes[header.lower()] = val.decode()
|
||||
if event_type:
|
||||
result = event_type.create(attributes, data)
|
||||
else:
|
||||
|
|
|
@ -59,7 +59,7 @@ class KafkaConversionTestBase:
|
|||
"source": "pytest",
|
||||
"type": "com.pytest.test",
|
||||
"time": datetime.datetime(2000, 1, 1, 6, 42, 33).isoformat(),
|
||||
"content-type": "foo",
|
||||
"datacontenttype": "foo",
|
||||
"partitionkey": "test_key_123",
|
||||
},
|
||||
data=self.expected_data,
|
||||
|
@ -123,7 +123,7 @@ class TestToBinary(KafkaConversionTestBase):
|
|||
assert result.headers["ce_source"] == source_event["source"].encode("utf-8")
|
||||
assert result.headers["ce_type"] == source_event["type"].encode("utf-8")
|
||||
assert result.headers["ce_time"] == source_event["time"].encode("utf-8")
|
||||
assert result.headers["content-type"] == source_event["content-type"].encode(
|
||||
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
|
||||
"utf-8"
|
||||
)
|
||||
assert "data" not in result.headers
|
||||
|
@ -163,7 +163,7 @@ class TestFromBinary(KafkaConversionTestBase):
|
|||
"ce_time": datetime.datetime(2000, 1, 1, 6, 42, 33)
|
||||
.isoformat()
|
||||
.encode("utf-8"),
|
||||
"content-type": "foo".encode("utf-8"),
|
||||
"datacontenttype": "foo".encode("utf-8"),
|
||||
},
|
||||
value=simple_serialize(self.expected_data),
|
||||
key="test_key_123",
|
||||
|
@ -205,7 +205,7 @@ class TestFromBinary(KafkaConversionTestBase):
|
|||
assert result["type"] == source_binary_json_message.headers["ce_type"].decode()
|
||||
assert result["time"] == source_binary_json_message.headers["ce_time"].decode()
|
||||
assert (
|
||||
result["content-type"]
|
||||
result["datacontenttype"]
|
||||
== source_binary_json_message.headers["content-type"].decode()
|
||||
)
|
||||
|
||||
|
@ -328,7 +328,7 @@ class TestToStructured(KafkaConversionTestBase):
|
|||
def test_sets_headers(self, source_event):
|
||||
result = to_structured(source_event)
|
||||
assert len(result.headers) == 1
|
||||
assert result.headers["content-type"] == source_event["content-type"].encode(
|
||||
assert result.headers["content-type"] == source_event["datacontenttype"].encode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
|
@ -474,7 +474,7 @@ class TestFromStructured(KafkaConversionTestBase):
|
|||
):
|
||||
result = from_structured(source_structured_json_message)
|
||||
assert (
|
||||
result["content-type"]
|
||||
result["datacontenttype"]
|
||||
== source_structured_json_message.headers["content-type"].decode()
|
||||
)
|
||||
|
||||
|
@ -487,7 +487,7 @@ class TestFromStructured(KafkaConversionTestBase):
|
|||
envelope_unmarshaller=custom_unmarshaller,
|
||||
)
|
||||
assert (
|
||||
result["content-type"]
|
||||
result["datacontenttype"]
|
||||
== source_structured_bytes_bytes_message.headers["content-type"].decode()
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue