diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
index c2c1d70..1b97b75 100644
--- a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
+++ b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
@@ -6,13 +6,15 @@ using CloudNative.CloudEvents.Extensions;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
-using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Text;
namespace CloudNative.CloudEvents.Kafka
{
+ ///
+ /// Extension methods to convert between CloudEvents and Kafka messages.
+ ///
public static class KafkaClientExtensions
{
private const string KafkaHeaderPrefix = "ce_";
@@ -26,13 +28,31 @@ namespace CloudNative.CloudEvents.Kafka
GetHeaderValue(message, SpecVersionKafkaHeader) is object ||
(ExtractContentType(message)?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true);
- public static CloudEvent ToCloudEvent(this Message message,
- CloudEventFormatter eventFormatter, params CloudEventAttribute[] extensionAttributes) =>
- ToCloudEvent(message, eventFormatter, (IEnumerable) extensionAttributes);
+ ///
+ /// Converts this Kafka message into a CloudEvent object.
+ ///
+ /// The Kafka message to convert. Must not be null.
+ /// The event formatter to use to parse the CloudEvent. Must not be null.
+ /// The extension attributes to use when parsing the CloudEvent. May be null.
+ /// A reference to a validated CloudEvent instance.
public static CloudEvent ToCloudEvent(this Message message,
- CloudEventFormatter eventFormatter, IEnumerable extensionAttributes)
+ CloudEventFormatter formatter, params CloudEventAttribute[] extensionAttributes) =>
+ ToCloudEvent(message, formatter, (IEnumerable) extensionAttributes);
+
+ ///
+ /// Converts this Kafka message into a CloudEvent object.
+ ///
+ /// The Kafka message to convert. Must not be null.
+ /// The event formatter to use to parse the CloudEvent. Must not be null.
+ /// The extension attributes to use when parsing the CloudEvent. May be null.
+ /// A reference to a validated CloudEvent instance.
+ public static CloudEvent ToCloudEvent(this Message message,
+ CloudEventFormatter formatter, IEnumerable extensionAttributes)
{
+ message = message ?? throw new ArgumentNullException(nameof(message));
+ formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
+
if (!IsCloudEvent(message))
{
throw new InvalidOperationException();
@@ -45,7 +65,7 @@ namespace CloudNative.CloudEvents.Kafka
// Structured mode
if (contentType?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true)
{
- cloudEvent = eventFormatter.DecodeStructuredModeMessage(message.Value, new ContentType(contentType), extensionAttributes);
+ cloudEvent = formatter.DecodeStructuredModeMessage(message.Value, new ContentType(contentType), extensionAttributes);
}
else
{
@@ -55,11 +75,8 @@ namespace CloudNative.CloudEvents.Kafka
throw new ArgumentException("Request is not a CloudEvent");
}
string versionId = Encoding.UTF8.GetString(versionIdBytes);
- CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionId);
- if (version is null)
- {
- throw new ArgumentException($"Unsupported CloudEvents spec version '{versionId}'");
- }
+ CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionId)
+ ?? throw new ArgumentException($"Unknown CloudEvents spec version '{versionId}'", nameof(message));
cloudEvent = new CloudEvent(version, extensionAttributes)
{
@@ -83,11 +100,11 @@ namespace CloudNative.CloudEvents.Kafka
cloudEvent.SetAttributeFromString(attributeName, attributeValue);
}
- eventFormatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
+ formatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
}
InitPartitioningKey(message, cloudEvent);
- return cloudEvent;
+ return cloudEvent.ValidateForConversion(nameof(message));
}
private static string ExtractContentType(Message message)
@@ -108,8 +125,18 @@ namespace CloudNative.CloudEvents.Kafka
message.Headers.FirstOrDefault(x => string.Equals(x.Key, headerName, StringComparison.InvariantCultureIgnoreCase))
?.GetValueBytes();
+ ///
+ /// Converts a CloudEvent to .
+ ///
+ /// The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.
+ /// Content mode. Structured or binary.
+ /// The formatter to use within the conversion. Must not be null.
public static Message ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
+ cloudEvent = cloudEvent ?? throw new ArgumentNullException(nameof(cloudEvent));
+ cloudEvent.ValidateForConversion(nameof(cloudEvent));
+ formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
+
// TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka?
if (cloudEvent.Data == null)
{
@@ -120,16 +147,19 @@ namespace CloudNative.CloudEvents.Kafka
byte[] value;
string contentTypeHeaderValue;
- if (contentMode == ContentMode.Structured)
+ switch (contentMode)
{
- value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
- // TODO: What about the non-media type parts?
- contentTypeHeaderValue = contentType.MediaType;
- }
- else
- {
- value = formatter.EncodeBinaryModeEventData(cloudEvent);
- contentTypeHeaderValue = cloudEvent.DataContentType;
+ case ContentMode.Structured:
+ value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
+ // TODO: What about the non-media type parts?
+ contentTypeHeaderValue = contentType.MediaType;
+ break;
+ case ContentMode.Binary:
+ value = formatter.EncodeBinaryModeEventData(cloudEvent);
+ contentTypeHeaderValue = cloudEvent.DataContentType;
+ break;
+ default:
+ throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
}
if (contentTypeHeaderValue is object)
{