diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs index 599058c..4bde348 100644 --- a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs +++ b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs @@ -6,6 +6,7 @@ using CloudNative.CloudEvents.Extensions; using Confluent.Kafka; using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net.Mime; using System.Text; @@ -14,10 +15,15 @@ namespace CloudNative.CloudEvents.Kafka { public static class KafkaClientExtensions { + private const string KafkaHeaderPrefix = "ce_"; + + private const string KafkaContentTypeAttributeName = "content-type"; + private const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion"; + // TODO: Avoid all the byte[] -> string conversions? If we didn't care about case-sensitivity, we could prepare byte arrays to perform comparisons with. public static bool IsCloudEvent(this Message message) => - GetHeaderValue(message, KafkaCloudEventMessage.SpecVersionKafkaHeader) is object || + GetHeaderValue(message, SpecVersionKafkaHeader) is object || (ExtractContentType(message)?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true); public static CloudEvent ToCloudEvent(this Message message, @@ -44,7 +50,7 @@ namespace CloudNative.CloudEvents.Kafka else { // Binary mode - if (!(GetHeaderValue(message, KafkaCloudEventMessage.SpecVersionKafkaHeader) is byte[] versionIdBytes)) + if (!(GetHeaderValue(message, SpecVersionKafkaHeader) is byte[] versionIdBytes)) { throw new ArgumentException("Request is not a CloudEvent"); } @@ -60,9 +66,9 @@ namespace CloudNative.CloudEvents.Kafka Data = message.Value, DataContentType = contentType }; - foreach (var header in message.Headers.Where(h => h.Key.StartsWith(KafkaCloudEventMessage.KafkaHeaderPrefix))) + foreach (var header in message.Headers.Where(h => h.Key.StartsWith(KafkaHeaderPrefix))) { - var attributeName = header.Key.Substring(KafkaCloudEventMessage.KafkaHeaderPrefix.Length).ToLowerInvariant(); + var attributeName = header.Key.Substring(KafkaHeaderPrefix.Length).ToLowerInvariant(); if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name) { continue; @@ -85,7 +91,7 @@ namespace CloudNative.CloudEvents.Kafka private static string ExtractContentType(Message message) { - var headerValue = GetHeaderValue(message, KafkaCloudEventMessage.KafkaContentTypeAttributeName); + var headerValue = GetHeaderValue(message, KafkaContentTypeAttributeName); return headerValue is null ? null : Encoding.UTF8.GetString(headerValue); } @@ -100,5 +106,84 @@ namespace CloudNative.CloudEvents.Kafka private static byte[] GetHeaderValue(MessageMetadata message, string headerName) => message.Headers.FirstOrDefault(x => string.Equals(x.Key, headerName, StringComparison.InvariantCultureIgnoreCase)) ?.GetValueBytes(); + + public static Message ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) + { + // TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka? + if (cloudEvent.Data == null) + { + throw new ArgumentNullException(nameof(cloudEvent.Data)); + } + var headers = MapHeaders(cloudEvent, formatter); + string key = (string) cloudEvent[Partitioning.PartitionKeyAttribute]; + byte[] value; + string contentTypeHeaderValue = null; + + if (contentMode == ContentMode.Structured) + { + value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType); + // TODO: What about the non-media type parts? + contentTypeHeaderValue = contentType.MediaType; + } + else + { + if (cloudEvent.Data is byte[] byteData) + { + value = byteData; + } + else if (cloudEvent.Data is Stream dataStream) + { + // TODO: Extract this common code somewhere, or use shared source to access BinaryDataUtilities. + if (dataStream is MemoryStream dataMemoryStream) + { + value = dataMemoryStream.ToArray(); + } + else + { + var buffer = new MemoryStream(); + dataStream.CopyTo(buffer); + value = buffer.ToArray(); + } + } + else + { + throw new InvalidOperationException($"{cloudEvent.Data.GetType()} type is not supported for Cloud Event's Value."); + } + if (cloudEvent.DataContentType is string dataContentType) + { + contentTypeHeaderValue = dataContentType; + } + } + if (contentTypeHeaderValue is object) + { + headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue)); + } + return new Message + { + Headers = headers, + Value = value, + Key = key + }; + } + + private static Headers MapHeaders(CloudEvent cloudEvent, CloudEventFormatter formatter) + { + var headers = new Headers + { + { SpecVersionKafkaHeader, Encoding.UTF8.GetBytes(cloudEvent.SpecVersion.VersionId) } + }; + foreach (var pair in cloudEvent.GetPopulatedAttributes()) + { + var attribute = pair.Key; + if (attribute == cloudEvent.SpecVersion.DataContentTypeAttribute || + attribute.Name == Partitioning.PartitionKeyAttribute.Name) + { + continue; + } + var value = attribute.Format(pair.Value); + headers.Add(KafkaHeaderPrefix + attribute.Name, Encoding.UTF8.GetBytes(value)); + } + return headers; + } } } \ No newline at end of file diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs b/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs deleted file mode 100644 index abaffff..0000000 --- a/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright (c) Cloud Native Foundation. -// Licensed under the Apache 2.0 license. -// See LICENSE file in the project root for full license information. - -using CloudNative.CloudEvents.Extensions; -using Confluent.Kafka; -using System; -using System.IO; -using System.Text; - -namespace CloudNative.CloudEvents.Kafka -{ - // TODO: avoid the inheritance here? Constructors are somewhat constricting... - public class KafkaCloudEventMessage : Message - { - internal const string KafkaHeaderPrefix = "ce_"; - - internal const string KafkaContentTypeAttributeName = "content-type"; - internal const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion"; - - public KafkaCloudEventMessage(CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) - { - // TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka? - if (cloudEvent.Data == null) - { - throw new ArgumentNullException(nameof(cloudEvent.Data)); - } - - Headers = new Headers - { - { SpecVersionKafkaHeader, Encoding.UTF8.GetBytes(cloudEvent.SpecVersion.VersionId) } - }; - Key = (string) cloudEvent[Partitioning.PartitionKeyAttribute]; - - if (contentMode == ContentMode.Structured) - { - Value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType); - Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentType.MediaType)); - } - else - { - if (cloudEvent.Data is byte[] byteData) - { - Value = byteData; - } - else if (cloudEvent.Data is Stream dataStream) - { - // TODO: Extract this common code somewhere - if (dataStream is MemoryStream dataMemoryStream) - { - Value = dataMemoryStream.ToArray(); - } - else - { - var buffer = new MemoryStream(); - dataStream.CopyTo(buffer); - Value = buffer.ToArray(); - } - } - else - { - throw new InvalidOperationException($"{cloudEvent.Data.GetType()} type is not supported for Cloud Event's Value."); - } - if (cloudEvent.DataContentType is string dataContentType) - { - Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(dataContentType)); - } - } - - MapHeaders(cloudEvent, formatter); - } - - private void MapHeaders(CloudEvent cloudEvent, CloudEventFormatter formatter) - { - - foreach (var pair in cloudEvent.GetPopulatedAttributes()) - { - var attribute = pair.Key; - if (attribute == cloudEvent.SpecVersion.DataContentTypeAttribute || - attribute.Name == Partitioning.PartitionKeyAttribute.Name) - { - continue; - } - var value = attribute.Format(pair.Value); - Headers.Add(KafkaHeaderPrefix + attribute.Name, Encoding.UTF8.GetBytes(value)); - } - } - - } -} \ No newline at end of file diff --git a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs index 66e2c07..df71ba0 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs @@ -38,7 +38,7 @@ namespace CloudNative.CloudEvents.Kafka.UnitTests ["comexampleextension1"] = "value" }; - var message = new KafkaCloudEventMessage(cloudEvent, ContentMode.Structured, new JsonEventFormatter()); + var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, new JsonEventFormatter()); Assert.True(message.IsCloudEvent()); @@ -82,7 +82,7 @@ namespace CloudNative.CloudEvents.Kafka.UnitTests [Partitioning.PartitionKeyAttribute] = "hello much wow" }; - var message = new KafkaCloudEventMessage(cloudEvent, ContentMode.Binary, new JsonEventFormatter()); + var message = cloudEvent.ToKafkaMessage(ContentMode.Binary, new JsonEventFormatter()); Assert.True(message.IsCloudEvent()); // using serialization to create fully independent copy thus simulating message transport