diff --git a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj index 6009427..776918c 100644 --- a/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj +++ b/src/CloudNative.CloudEvents.Kafka/CloudNative.CloudEvents.Kafka.csproj @@ -4,7 +4,7 @@ netstandard2.0;netstandard2.1;net8.0 Kafka extensions for CloudNative.CloudEvents cncf;cloudnative;cloudevents;events;kafka - 9.0 + 10.0 enable diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs index cfa4008..6517ac9 100644 --- a/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs +++ b/src/CloudNative.CloudEvents.Kafka/KafkaExtensions.cs @@ -25,6 +25,7 @@ namespace CloudNative.CloudEvents.Kafka internal const string KafkaContentTypeAttributeName = "content-type"; private const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion"; + /// /// Indicates whether this message holds a single CloudEvent. /// @@ -33,6 +34,17 @@ namespace CloudNative.CloudEvents.Kafka /// /// The message to check for the presence of a CloudEvent. Must not be null. /// true, if the request is a CloudEvent + public static bool IsCloudEvent(this Message message) => IsCloudEvent(message); + + /// + /// Indicates whether this message holds a single CloudEvent. + /// + /// + /// This method returns false for batch requests, as they need to be parsed differently. + /// + /// The message to check for the presence of a CloudEvent. Must not be null. + /// The type of key of the Kafka message. + /// true, if the request is a CloudEvent public static bool IsCloudEvent(this Message message) => GetHeaderValue(message, SpecVersionKafkaHeader) is object || MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, KafkaContentTypeAttributeName)); diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs index 434b1d9..a402651 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/BinaryGuidPartitionKeyAdapter.cs @@ -1,36 +1,39 @@ +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + using System; -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Partion Key Adapter that converts to and from Guids in binary representation. +/// +public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter { - /// - /// Partion Key Adapter that converts to and from Guids in binary representation. - /// - public class BinaryGuidPartitionKeyAdapter : IPartitionKeyAdapter + /// + public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue) { - /// - public bool ConvertKeyToPartitionKeyAttributeValue(byte[]? keyValue, out string? attributeValue) + if (keyValue == null) { - if (keyValue == null) - { - attributeValue = null; - return false; - } - - attributeValue = new Guid(keyValue).ToString(); - return true; + attributeValue = null; + return false; } - /// - public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue) - { - if (string.IsNullOrEmpty(attributeValue)) - { - keyValue = default; - return false; - } + attributeValue = new Guid(keyValue).ToString(); + return true; + } - keyValue = Guid.Parse(attributeValue).ToByteArray(); - return true; + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out byte[]? keyValue) + { + if (string.IsNullOrEmpty(attributeValue)) + { + keyValue = default; + return false; } + + keyValue = Guid.Parse(attributeValue).ToByteArray(); + return true; } } diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs index 81b8ac1..bfa6ecc 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/IPartitionKeyAdapter.cs @@ -1,26 +1,29 @@ -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Defines the methods of the adapters responsible for transforming from cloud event +/// PartitionKey Attribute to Kafka Message Key. +/// +/// The type of Kafka Message Key. +public interface IPartitionKeyAdapter { /// - /// Defines the methods of the adapters responsible for transforming from cloud event - /// PartitionKey Attribute to Kafka Message Key. + /// Converts a Message Key to PartionKey Attribute Value. /// - /// - public interface IPartitionKeyAdapter - { - /// - /// Converts a Message Key to PartionKey Attribute Value. - /// - /// The key value to transform. - /// The transformed attribute value (output). - /// Whether the attribute should be set. - bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue); + /// The key value to transform. + /// The transformed attribute value (output). + /// Whether the attribute should be set. + bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue); - /// - /// Converts a PartitionKey Attribute value to a Message Key. - /// - /// The attribute value to transform. - /// The transformed key value (output) - /// Whether the key should be set. - bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue); - } + /// + /// Converts a PartitionKey Attribute value to a Message Key. + /// + /// The attribute value to transform. + /// The transformed key value (output) + /// Whether the key should be set. + bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue); } diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs index 50d88b8..1aca2fb 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/NullPartitionKeyAdapter.cs @@ -1,23 +1,26 @@ -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters -{ - /// - /// Partion Key Adapter that skips handling the key. - /// - /// The type of Kafka Message Key - public class NullPartitionKeyAdapter : IPartitionKeyAdapter - { - /// - public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue) - { - attributeValue = null; - return false; - } +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. - /// - public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue) - { - keyValue = default; - return false; - } +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Partion Key Adapter that skips handling the key. +/// +/// The type of Kafka Message Key. +public class NullPartitionKeyAdapter : IPartitionKeyAdapter +{ + /// + public bool ConvertKeyToPartitionKeyAttributeValue(TKey keyValue, out string? attributeValue) + { + attributeValue = null; + return false; + } + + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out TKey? keyValue) + { + keyValue = default; + return false; } } diff --git a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs index bcd0f9c..5d82124 100644 --- a/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs +++ b/src/CloudNative.CloudEvents.Kafka/PartitionKeyAdapters/StringPartitionKeyAdapter.cs @@ -1,22 +1,25 @@ -namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters -{ - /// - /// Partion Key Adapter that skips handling the key. - /// - public class StringPartitionKeyAdapter : IPartitionKeyAdapter - { - /// - public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue) - { - attributeValue = keyValue; - return true; - } +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. - /// - public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue) - { - keyValue = attributeValue; - return true; - } +namespace CloudNative.CloudEvents.Kafka.PartitionKeyAdapters; + +/// +/// Partion Key Adapter that skips handling the key. +/// +public class StringPartitionKeyAdapter : IPartitionKeyAdapter +{ + /// + public bool ConvertKeyToPartitionKeyAttributeValue(string? keyValue, out string? attributeValue) + { + attributeValue = keyValue; + return true; + } + + /// + public bool ConvertPartitionKeyAttributeValueToKey(string? attributeValue, out string? keyValue) + { + keyValue = attributeValue; + return true; } } diff --git a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs index 413e511..1debc00 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Kafka/KafkaTest.cs @@ -140,7 +140,6 @@ namespace CloudNative.CloudEvents.Kafka.UnitTests public void KafkaNullKeyedStructuredMessageTest() { // It will test the serialization using Confluent's Confluent.Kafka.Null type for the key. - // As the default behavior without adapter is to skip the key it will work properly. var partitionKeyAdapter = new PartitionKeyAdapters.NullPartitionKeyAdapter(); var jsonEventFormatter = new JsonEventFormatter(); var cloudEvent = CreateTestCloudEvent();