diff --git a/CloudEvents.sln b/CloudEvents.sln
index 9e299b1..1d9d4fc 100644
--- a/CloudEvents.sln
+++ b/CloudEvents.sln
@@ -21,6 +21,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.Mqt
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.Amqp", "src\CloudNative.CloudEvents.Amqp\CloudNative.CloudEvents.Amqp.csproj", "{39EF4DB0-9890-4CAD-A36E-F7E25D2E72EF}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.Kafka", "src\CloudNative.CloudEvents.Kafka\CloudNative.CloudEvents.Kafka.csproj", "{193D6D9D-C1A0-459E-86CF-F207CDF0FC73}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -91,6 +93,18 @@ Global
{39EF4DB0-9890-4CAD-A36E-F7E25D2E72EF}.Release|x64.Build.0 = Release|Any CPU
{39EF4DB0-9890-4CAD-A36E-F7E25D2E72EF}.Release|x86.ActiveCfg = Release|Any CPU
{39EF4DB0-9890-4CAD-A36E-F7E25D2E72EF}.Release|x86.Build.0 = Release|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Debug|x64.Build.0 = Debug|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Debug|x86.Build.0 = Debug|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Release|Any CPU.Build.0 = Release|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Release|x64.ActiveCfg = Release|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Release|x64.Build.0 = Release|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Release|x86.ActiveCfg = Release|Any CPU
+ {193D6D9D-C1A0-459E-86CF-F207CDF0FC73}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs b/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs
index 6c10821..011d881 100644
--- a/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs
+++ b/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs
@@ -63,8 +63,7 @@ namespace CloudNative.CloudEvents.Amqp
}
else
{
- var cloudEvent = new CloudEvent(
- message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader1)
+ var specVersion = message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader1)
? CloudEventsSpecVersion.V0_1
: message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader2)
? (message.ApplicationProperties.Map[SpecVersionAmqpHeader2] as string == "0.2"
@@ -72,7 +71,9 @@ namespace CloudNative.CloudEvents.Amqp
(message.ApplicationProperties.Map[SpecVersionAmqpHeader2] as string == "0.3"
? CloudEventsSpecVersion.V0_3
: CloudEventsSpecVersion.Default))
- : CloudEventsSpecVersion.Default, extensions);
+ : CloudEventsSpecVersion.Default;
+
+ var cloudEvent = new CloudEvent(specVersion , extensions);
var attributes = cloudEvent.GetAttributes();
foreach (var prop in message.ApplicationProperties.Map)
{
diff --git a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj
new file mode 100644
index 0000000..f715e12
--- /dev/null
+++ b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj
@@ -0,0 +1,20 @@
+
+
+
+ netstandard2.0
+ 0.1
+ Kafka extensions for CloudNative.CloudEvents
+ Copyright Cloud Native Foundation
+ https://github.com/cloudevents/sdk-csharp
+ https://cloudevents.io
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
new file mode 100644
index 0000000..dd831c6
--- /dev/null
+++ b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
@@ -0,0 +1,159 @@
+// Copyright (c) Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+
+namespace CloudNative.CloudEvents.Kafka
+{
+ using CloudNative.CloudEvents.Extensions;
+ using Confluent.Kafka;
+ using System;
+ using System.Linq;
+ using System.Net.Mime;
+ using System.Text;
+
+ public static class KafkaClientExtensions
+ {
+ private static string StructuredContentTypePrefix = "application/cloudevents";
+ private const string SpecVersionKafkaHeader1 = KafkaCloudEventMessage.KafkaHeaderPerfix + "cloudEventsVersion";
+
+ private const string SpecVersionKafkaHeader2 = KafkaCloudEventMessage.KafkaHeaderPerfix + "specversion";
+
+ private static JsonEventFormatter _jsonFormatter = new JsonEventFormatter();
+
+ public static bool IsCloudEvent(this Message message)
+ {
+ return message.Headers.Any(x =>
+ string.Equals(x.Key, SpecVersionKafkaHeader1, StringComparison.InvariantCultureIgnoreCase)
+ || string.Equals(x.Key, SpecVersionKafkaHeader2, StringComparison.InvariantCultureIgnoreCase)
+ || (string.Equals(x.Key, KafkaCloudEventMessage.KafkaContentTypeAttributeName, StringComparison.InvariantCultureIgnoreCase)
+ && Encoding.UTF8.GetString(x.GetValueBytes() ?? Array.Empty()).StartsWith(StructuredContentTypePrefix)));
+ }
+
+ public static CloudEvent ToCloudEvent(this Message message,
+ ICloudEventFormatter eventFormatter = null, params ICloudEventExtension[] extensions)
+ {
+ if (!IsCloudEvent(message))
+ {
+ throw new InvalidOperationException();
+ }
+
+ var contentType = ExtractContentType(message);
+
+ CloudEvent cloudEvent;
+
+ if (!string.IsNullOrEmpty(contentType)
+ && contentType.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase))
+ {
+ // structured mode
+ if (eventFormatter == null)
+ {
+ if (contentType.EndsWith(JsonEventFormatter.MediaTypeSuffix, StringComparison.InvariantCultureIgnoreCase))
+ {
+ eventFormatter = _jsonFormatter;
+ }
+ else
+ {
+ throw new InvalidOperationException("Not supported CloudEvents media formatter.");
+ }
+ }
+
+ cloudEvent = _jsonFormatter.DecodeStructuredEvent(message.Value, extensions);
+ }
+ else
+ {
+ // binary mode
+ var specVersion = ExtractVersion(message);
+
+ cloudEvent = new CloudEvent(specVersion, extensions);
+ var attributes = cloudEvent.GetAttributes();
+ var cloudEventHeaders = message.Headers.Where(h => h.Key.StartsWith(KafkaCloudEventMessage.KafkaHeaderPerfix));
+
+ foreach (var header in cloudEventHeaders)
+ {
+ if (string.Equals(header.Key, SpecVersionKafkaHeader1, StringComparison.InvariantCultureIgnoreCase)
+ || string.Equals(header.Key, SpecVersionKafkaHeader2, StringComparison.InvariantCultureIgnoreCase))
+ {
+ continue;
+ }
+
+ var attributeName = header.Key.Substring(KafkaCloudEventMessage.KafkaHeaderPerfix.Length);
+ attributes.Add(attributeName,
+ eventFormatter.DecodeAttribute(specVersion, attributeName, header.GetValueBytes(), extensions));
+ }
+
+ cloudEvent.DataContentType = contentType != null ? new ContentType(contentType) : null;
+ cloudEvent.Data = message.Value;
+ }
+
+ InitPartitioningKey(message, cloudEvent);
+
+ return cloudEvent;
+ }
+
+ private static string ExtractContentType(Message message)
+ {
+ var contentTypeHeader = message.Headers.FirstOrDefault(x => string.Equals(x.Key, KafkaCloudEventMessage.KafkaContentTypeAttributeName,
+ StringComparison.InvariantCultureIgnoreCase));
+ string contentType = null;
+ if (contentTypeHeader != null)
+ {
+ var bytes = contentTypeHeader.GetValueBytes();
+ contentType = Encoding.UTF8.GetString(bytes ?? Array.Empty());
+ }
+
+ return contentType;
+ }
+
+ private static void InitPartitioningKey(Message message, CloudEvent cloudEvent)
+ {
+ if (!string.IsNullOrEmpty(message.Key))
+ {
+ var extension = cloudEvent.Extension();
+ extension.PartitioningKeyValue = message.Key;
+ }
+ }
+
+ private static CloudEventsSpecVersion ExtractVersion(Message message)
+ {
+ var specVersionHeaders = message.Headers.Where(x => string.Equals(x.Key, SpecVersionKafkaHeader1, StringComparison.InvariantCultureIgnoreCase)
+ || string.Equals(x.Key, SpecVersionKafkaHeader2, StringComparison.InvariantCultureIgnoreCase))
+ .ToDictionary(x => x.Key, x => x, StringComparer.InvariantCultureIgnoreCase);
+
+ var specVersion = CloudEventsSpecVersion.Default;
+ if (specVersionHeaders.ContainsKey(SpecVersionKafkaHeader1))
+ {
+ specVersion = CloudEventsSpecVersion.V0_1;
+ }
+ else if (specVersionHeaders.ContainsKey(SpecVersionKafkaHeader2))
+ {
+ var specVersionValue = Encoding.UTF8.GetString(specVersionHeaders[SpecVersionKafkaHeader2].GetValueBytes() ?? Array.Empty());
+ if (specVersionValue == "0.2")
+ {
+ specVersion = CloudEventsSpecVersion.V0_2;
+ }
+ else if (specVersionValue == "0.3")
+ {
+ specVersion = CloudEventsSpecVersion.V0_3;
+ }
+ }
+
+ return specVersion;
+ }
+
+ private static (bool isBinaryMode, string contentType) IsBinaryMode(Message message)
+ {
+ var contentTypeHeader = message.Headers.FirstOrDefault(x => string.Equals(x.Key, KafkaCloudEventMessage.KafkaContentTypeAttributeName));
+ if (contentTypeHeader != null)
+ {
+ var value = Encoding.UTF8.GetString(contentTypeHeader.GetValueBytes());
+ if (!string.IsNullOrEmpty( value) && value.StartsWith(StructuredContentTypePrefix, StringComparison.InvariantCultureIgnoreCase))
+ {
+ return (true, value);
+ }
+ }
+
+ return (false, null);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs b/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs
new file mode 100644
index 0000000..0ad5dc9
--- /dev/null
+++ b/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs
@@ -0,0 +1,88 @@
+// Copyright (c) Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+namespace CloudNative.CloudEvents.Kafka
+{
+ using CloudNative.CloudEvents.Extensions;
+ using Confluent.Kafka;
+ using System;
+ using System.IO;
+ using System.Text;
+
+ public class KafkaCloudEventMessage : Message
+ {
+ public const string KafkaHeaderPerfix = "ce_";
+
+ public const string KafkaContentTypeAttributeName = "content-type";
+
+ public KafkaCloudEventMessage(CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter)
+ {
+ if (cloudEvent.Data == null)
+ {
+ throw new ArgumentNullException(nameof(cloudEvent.Data));
+ }
+
+ Headers = new Headers();
+
+ Key = ExtractPartitionKey(cloudEvent);
+
+ if (contentMode == ContentMode.Structured)
+ {
+ Value = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
+ Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentType.MediaType));
+ }
+ else
+ {
+ if (cloudEvent.Data is byte[] byteData)
+ {
+ Value = byteData;
+ }
+ else if (cloudEvent.Data is Stream dataStream)
+ {
+ if (dataStream is MemoryStream dataMemoryStream)
+ {
+ Value = dataMemoryStream.ToArray();
+ }
+ else
+ {
+ var buffer = new MemoryStream();
+ dataStream.CopyTo(buffer);
+ Value = buffer.ToArray();
+ }
+ }
+ else
+ {
+ throw new InvalidOperationException($"{cloudEvent.Data.GetType()} type is not supported for Cloud Event's Value.");
+ }
+
+ Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(cloudEvent.DataContentType?.MediaType));
+ }
+
+ MapHeaders(cloudEvent, formatter);
+ }
+
+ private void MapHeaders(CloudEvent cloudEvent, ICloudEventFormatter formatter)
+ {
+ foreach (var attr in cloudEvent.GetAttributes())
+ {
+ if (string.Equals(attr.Key, CloudEventAttributes.DataAttributeName(cloudEvent.SpecVersion))
+ || string.Equals(attr.Key, CloudEventAttributes.DataContentTypeAttributeName(cloudEvent.SpecVersion))
+ || string.Equals(attr.Key, PartitioningExtension.PartitioningKeyAttributeName))
+ {
+ continue;
+ }
+
+ Headers.Add(KafkaHeaderPerfix + attr.Key,
+ formatter.EncodeAttribute(cloudEvent.SpecVersion, attr.Key, attr.Value, cloudEvent.Extensions.Values));
+ }
+ }
+
+ protected string ExtractPartitionKey(CloudEvent cloudEvent)
+ {
+ var extension = cloudEvent.Extension();
+
+ return extension?.PartitioningKeyValue;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents/Attributes.cs b/src/CloudNative.CloudEvents/Attributes.cs
index eef52a3..eae940b 100644
--- a/src/CloudNative.CloudEvents/Attributes.cs
+++ b/src/CloudNative.CloudEvents/Attributes.cs
@@ -5,4 +5,5 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("CloudNative.CloudEvents.Amqp")]
-[assembly: InternalsVisibleTo("CloudNative.CloudEvents.Mqtt")]
\ No newline at end of file
+[assembly: InternalsVisibleTo("CloudNative.CloudEvents.Mqtt")]
+[assembly: InternalsVisibleTo("CloudNative.CloudEvents.Kafka")]
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents/CloudEvent.cs b/src/CloudNative.CloudEvents/CloudEvent.cs
index e8f211b..27be03b 100644
--- a/src/CloudNative.CloudEvents/CloudEvent.cs
+++ b/src/CloudNative.CloudEvents/CloudEvent.cs
@@ -214,7 +214,13 @@ namespace CloudNative.CloudEvents
/// Extension instance if registered
public T Extension()
{
- return (T)Extensions[typeof(T)];
+ var key = typeof(T);
+ if (Extensions.TryGetValue(key, out var extension))
+ {
+ return (T)extension;
+ }
+
+ return default(T);
}
///
diff --git a/src/CloudNative.CloudEvents/Extensions/PartitioningExtension.cs b/src/CloudNative.CloudEvents/Extensions/PartitioningExtension.cs
new file mode 100644
index 0000000..4e8511e
--- /dev/null
+++ b/src/CloudNative.CloudEvents/Extensions/PartitioningExtension.cs
@@ -0,0 +1,66 @@
+// Copyright (c) Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+namespace CloudNative.CloudEvents.Extensions
+{
+ using System;
+ using System.Collections.Generic;
+
+ public class PartitioningExtension : ICloudEventExtension
+ {
+ public const string PartitioningKeyAttributeName = "partitionkey";
+
+ IDictionary _attributes = new Dictionary();
+
+ public string PartitioningKeyValue
+ {
+ get => _attributes[PartitioningKeyAttributeName] as string;
+ set => _attributes[PartitioningKeyAttributeName] = value;
+ }
+
+ public PartitioningExtension(string partitioningKeyValue = null)
+ {
+ PartitioningKeyValue = partitioningKeyValue;
+ }
+
+ void ICloudEventExtension.Attach(CloudEvent cloudEvent)
+ {
+ var eventAttributes = cloudEvent.GetAttributes();
+ if (_attributes == eventAttributes)
+ {
+ // already done
+ return;
+ }
+
+ foreach (var attr in _attributes)
+ {
+ if (attr.Value != null)
+ {
+ eventAttributes[attr.Key] = attr.Value;
+ }
+ }
+ _attributes = eventAttributes;
+ }
+
+ bool ICloudEventExtension.ValidateAndNormalize(string key, ref dynamic value)
+ {
+ if (string.Equals(key, PartitioningKeyAttributeName))
+ {
+ if (value is string)
+ {
+ return true;
+ }
+
+ throw new InvalidOperationException(Strings.ErrorPartitioningKeyValueIsaNotAString);
+ }
+
+ return false;
+ }
+
+ public Type GetAttributeType(string name)
+ {
+ return string.Equals(name, PartitioningKeyAttributeName) ? typeof(string) : null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents/Strings.Designer.cs b/src/CloudNative.CloudEvents/Strings.Designer.cs
index c954819..6b04078 100644
--- a/src/CloudNative.CloudEvents/Strings.Designer.cs
+++ b/src/CloudNative.CloudEvents/Strings.Designer.cs
@@ -19,7 +19,7 @@ namespace CloudNative.CloudEvents {
// class via a tool like ResGen or Visual Studio.
// To add or remove a member, edit your .ResX file then rerun ResGen
// with the /str option, or rebuild your VS project.
- [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "16.0.0.0")]
+ [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "15.0.0.0")]
[global::System.Diagnostics.DebuggerNonUserCodeAttribute()]
[global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()]
internal class Strings {
@@ -87,6 +87,15 @@ namespace CloudNative.CloudEvents {
}
}
+ ///
+ /// Looks up a localized string similar to The 'key' attribute value must be a string.
+ ///
+ internal static string ErrorPartitioningKeyValueIsaNotAString {
+ get {
+ return ResourceManager.GetString("ErrorPartitioningKeyValueIsaNotAString", resourceCulture);
+ }
+ }
+
///
/// Looks up a localized string similar to The 'sampledrate' attribute value must be an integer.
///
diff --git a/src/CloudNative.CloudEvents/Strings.resx b/src/CloudNative.CloudEvents/Strings.resx
index 2049d83..f462f33 100644
--- a/src/CloudNative.CloudEvents/Strings.resx
+++ b/src/CloudNative.CloudEvents/Strings.resx
@@ -126,6 +126,9 @@
The 'id' attribute value must be a string
+
+ The 'key' attribute value must be a string
+
The 'sampledrate' attribute value must be an integer
diff --git a/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj b/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj
index eb68075..8a7202c 100644
--- a/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj
+++ b/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj
@@ -20,6 +20,7 @@
+
diff --git a/test/CloudNative.CloudEvents.UnitTests/ExtensionsTest.cs b/test/CloudNative.CloudEvents.UnitTests/ExtensionsTest.cs
index 355c6d3..80e40d7 100644
--- a/test/CloudNative.CloudEvents.UnitTests/ExtensionsTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/ExtensionsTest.cs
@@ -50,6 +50,19 @@ namespace CloudNative.CloudEvents.UnitTests
" \"data\" : \"test\"\n" +
"}";
+
+ const string jsonPartitioningKey =
+ "{\n" +
+ " \"specversion\" : \"0.3\",\n" +
+ " \"type\" : \"com.github.pull.create\",\n" +
+ " \"source\" : \"https://github.com/cloudevents/spec/pull/123\",\n" +
+ " \"id\" : \"A234-1234-1234\",\n" +
+ " \"time\" : \"2018-04-05T17:31:00Z\",\n" +
+ " \"partitionkey\" : \"1\",\n" +
+ " \"datacontenttype\" : \"text/plain\",\n" +
+ " \"data\" : \"test\"\n" +
+ "}";
+
[Fact]
public void DistTraceParse()
{
@@ -185,5 +198,25 @@ namespace CloudNative.CloudEvents.UnitTests
Assert.Equal(1, cloudEvent.Extension().SampledRate.Value);
}
+
+ [Fact]
+ public void PartitioningParse()
+ {
+ var jsonFormatter = new JsonEventFormatter();
+ var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonPartitioningKey), new PartitioningExtension());
+
+ Assert.Equal("1", cloudEvent.Extension().PartitioningKeyValue);
+ }
+
+ [Fact]
+ public void PartitioningJsonTranscode()
+ {
+ var jsonFormatter = new JsonEventFormatter();
+ var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonPartitioningKey));
+ var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out var contentType);
+ var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, new PartitioningExtension());
+
+ Assert.Equal("1", cloudEvent.Extension().PartitioningKeyValue);
+ }
}
}
\ No newline at end of file
diff --git a/test/CloudNative.CloudEvents.UnitTests/KafkaTest.cs b/test/CloudNative.CloudEvents.UnitTests/KafkaTest.cs
new file mode 100644
index 0000000..c1bb748
--- /dev/null
+++ b/test/CloudNative.CloudEvents.UnitTests/KafkaTest.cs
@@ -0,0 +1,180 @@
+// Copyright (c) Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+namespace CloudNative.CloudEvents.UnitTests
+{
+ using System;
+ using System.Net.Mime;
+ using CloudNative.CloudEvents.Amqp;
+ using CloudNative.CloudEvents.Kafka;
+ using Newtonsoft.Json;
+ using Xunit;
+ using Confluent.Kafka;
+ using Newtonsoft.Json.Linq;
+ using System.Collections.Generic;
+ using System.Text;
+ using CloudNative.CloudEvents.Extensions;
+
+ public class KafkaTest
+ {
+ [Fact]
+ public void KafkaStructuredMessageTest()
+ {
+ // Kafka doesn't provide any way to get to the message transport level to do the test properly
+ // and it doesn't have an embedded version of a server for .Net so the lowest we can get is
+ // the `Message`
+
+ var jsonEventFormatter = new JsonEventFormatter();
+
+ var cloudEvent = new CloudEvent(CloudEventsSpecVersion.V0_3,
+ "com.github.pull.create",
+ source: new Uri("https://github.com/cloudevents/spec/pull"),
+ subject: "123")
+ {
+ Id = "A234-1234-1234",
+ Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc),
+ DataContentType = new ContentType(MediaTypeNames.Text.Xml),
+ Data = ""
+ };
+
+ var attrs = cloudEvent.GetAttributes();
+ attrs["comexampleextension1"] = "value";
+ attrs["comexampleextension2"] = new { othervalue = 5 };
+
+ var message = new KafkaCloudEventMessage(cloudEvent, ContentMode.Structured, new JsonEventFormatter());
+
+ Assert.True(message.IsCloudEvent());
+
+ // using serialization to create fully independent copy thus simulating message transport
+ // real transport will work in a similar way
+ var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
+ var messageCopy = JsonConvert.DeserializeObject>(serialized, new HeadersConverter(), new HeaderConverter());
+
+ Assert.True(messageCopy.IsCloudEvent());
+ var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter);
+
+ Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
+ Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
+ Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull"), receivedCloudEvent.Source);
+ Assert.Equal("123", receivedCloudEvent.Subject);
+ Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
+ Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
+ receivedCloudEvent.Time.Value.ToUniversalTime());
+ Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.DataContentType);
+ Assert.Equal("", receivedCloudEvent.Data);
+
+ var attr = receivedCloudEvent.GetAttributes();
+ Assert.Equal("value", (string)attr["comexampleextension1"]);
+ Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue);
+ }
+
+ [Fact]
+ public void KafkaBinaryMessageTest()
+ {
+ // Kafka doesn't provide any way to get to the message transport level to do the test properly
+ // and it doesn't have an embedded version of a server for .Net so the lowest we can get is
+ // the `Message`
+
+ var jsonEventFormatter = new JsonEventFormatter();
+ var cloudEvent = new CloudEvent("com.github.pull.create",
+ new Uri("https://github.com/cloudevents/spec/pull/123"),
+ extensions: new PartitioningExtension())
+ {
+ Id = "A234-1234-1234",
+ Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc),
+ DataContentType = new ContentType(MediaTypeNames.Text.Xml),
+ Data = Encoding.UTF8.GetBytes("")
+ };
+
+ var attrs = cloudEvent.GetAttributes();
+ attrs["comexampleextension1"] = "value";
+ attrs["comexampleextension2"] = new { othervalue = 5 };
+ cloudEvent.Extension().PartitioningKeyValue = "hello much wow";
+
+ var message = new KafkaCloudEventMessage(cloudEvent, ContentMode.Binary, new JsonEventFormatter());
+ Assert.True(message.IsCloudEvent());
+
+ // using serialization to create fully independent copy thus simulating message transport
+ // real transport will work in a similar way
+ var serialized = JsonConvert.SerializeObject(message, new HeaderConverter());
+ var messageCopy = JsonConvert.DeserializeObject>(serialized, new HeadersConverter(), new HeaderConverter());
+
+ Assert.True(messageCopy.IsCloudEvent());
+ var receivedCloudEvent = messageCopy.ToCloudEvent(jsonEventFormatter, new PartitioningExtension());
+
+ Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
+ Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
+ Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
+ Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
+ Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
+ receivedCloudEvent.Time.Value.ToUniversalTime());
+ Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.DataContentType);
+ Assert.Equal(Encoding.UTF8.GetBytes(""), receivedCloudEvent.Data);
+ Assert.Equal("hello much wow", receivedCloudEvent.Extension().PartitioningKeyValue);
+
+ var attr = receivedCloudEvent.GetAttributes();
+ Assert.Equal("value", (string)attr["comexampleextension1"]);
+ Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue);
+ }
+
+ private class HeadersConverter : JsonConverter
+ {
+ public override bool CanConvert(Type objectType)
+ {
+ return objectType == typeof(Headers);
+ }
+
+ public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
+ {
+ if (reader.TokenType == JsonToken.Null)
+ {
+ return null;
+ }
+ else
+ {
+ var surrogate = serializer.Deserialize>(reader);
+ var headers = new Headers();
+
+ foreach(var header in surrogate)
+ {
+ headers.Add(header.Key, header.GetValueBytes());
+ }
+ return headers;
+ }
+ }
+
+ public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
+ {
+ throw new NotImplementedException();
+ }
+ }
+
+ private class HeaderConverter : JsonConverter
+ {
+ private class HeaderContainer
+ {
+ public string Key { get; set; }
+ public byte[] Value { get; set; }
+ }
+
+ public override bool CanConvert(Type objectType)
+ {
+ return objectType == typeof(Header) || objectType == typeof(IHeader);
+ }
+
+ public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
+ {
+ var headerContainer = serializer.Deserialize(reader);
+ return new Header(headerContainer.Key, headerContainer.Value);
+ }
+
+ public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
+ {
+ var header = (IHeader)value;
+ var container = new HeaderContainer { Key = header.Key, Value = header.GetValueBytes() };
+ serializer.Serialize(writer, container);
+ }
+ }
+ }
+}
\ No newline at end of file