feat: Make the Kafka protocol bindings follow conventions

Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
Jon Skeet 2021-03-12 15:10:57 +00:00 committed by Jon Skeet
parent 450ba23e96
commit 17d943302a
1 changed files with 52 additions and 22 deletions

View File

@ -6,13 +6,15 @@ using CloudNative.CloudEvents.Extensions;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Text;
namespace CloudNative.CloudEvents.Kafka
{
/// <summary>
/// Extension methods to convert between CloudEvents and Kafka messages.
/// </summary>
public static class KafkaClientExtensions
{
private const string KafkaHeaderPrefix = "ce_";
@ -26,13 +28,31 @@ namespace CloudNative.CloudEvents.Kafka
GetHeaderValue(message, SpecVersionKafkaHeader) is object ||
(ExtractContentType(message)?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true);
public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
CloudEventFormatter eventFormatter, params CloudEventAttribute[] extensionAttributes) =>
ToCloudEvent(message, eventFormatter, (IEnumerable<CloudEventAttribute>) extensionAttributes);
/// <summary>
/// Converts this Kafka message into a CloudEvent object.
/// </summary>
/// <param name="message">The Kafka message to convert. Must not be null.</param>
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
CloudEventFormatter eventFormatter, IEnumerable<CloudEventAttribute> extensionAttributes)
CloudEventFormatter formatter, params CloudEventAttribute[] extensionAttributes) =>
ToCloudEvent(message, formatter, (IEnumerable<CloudEventAttribute>) extensionAttributes);
/// <summary>
/// Converts this Kafka message into a CloudEvent object.
/// </summary>
/// <param name="message">The Kafka message to convert. Must not be null.</param>
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute> extensionAttributes)
{
message = message ?? throw new ArgumentNullException(nameof(message));
formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
if (!IsCloudEvent(message))
{
throw new InvalidOperationException();
@ -45,7 +65,7 @@ namespace CloudNative.CloudEvents.Kafka
// Structured mode
if (contentType?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true)
{
cloudEvent = eventFormatter.DecodeStructuredModeMessage(message.Value, new ContentType(contentType), extensionAttributes);
cloudEvent = formatter.DecodeStructuredModeMessage(message.Value, new ContentType(contentType), extensionAttributes);
}
else
{
@ -55,11 +75,8 @@ namespace CloudNative.CloudEvents.Kafka
throw new ArgumentException("Request is not a CloudEvent");
}
string versionId = Encoding.UTF8.GetString(versionIdBytes);
CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionId);
if (version is null)
{
throw new ArgumentException($"Unsupported CloudEvents spec version '{versionId}'");
}
CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionId)
?? throw new ArgumentException($"Unknown CloudEvents spec version '{versionId}'", nameof(message));
cloudEvent = new CloudEvent(version, extensionAttributes)
{
@ -83,11 +100,11 @@ namespace CloudNative.CloudEvents.Kafka
cloudEvent.SetAttributeFromString(attributeName, attributeValue);
}
eventFormatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
formatter.DecodeBinaryModeEventData(message.Value, cloudEvent);
}
InitPartitioningKey(message, cloudEvent);
return cloudEvent;
return cloudEvent.ValidateForConversion(nameof(message));
}
private static string ExtractContentType(Message<string, byte[]> message)
@ -108,8 +125,18 @@ namespace CloudNative.CloudEvents.Kafka
message.Headers.FirstOrDefault(x => string.Equals(x.Key, headerName, StringComparison.InvariantCultureIgnoreCase))
?.GetValueBytes();
/// <summary>
/// Converts a CloudEvent to <see cref="Message"/>.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message<string, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
cloudEvent = cloudEvent ?? throw new ArgumentNullException(nameof(cloudEvent));
cloudEvent.ValidateForConversion(nameof(cloudEvent));
formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
// TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka?
if (cloudEvent.Data == null)
{
@ -120,16 +147,19 @@ namespace CloudNative.CloudEvents.Kafka
byte[] value;
string contentTypeHeaderValue;
if (contentMode == ContentMode.Structured)
switch (contentMode)
{
value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
// TODO: What about the non-media type parts?
contentTypeHeaderValue = contentType.MediaType;
}
else
{
value = formatter.EncodeBinaryModeEventData(cloudEvent);
contentTypeHeaderValue = cloudEvent.DataContentType;
case ContentMode.Structured:
value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
// TODO: What about the non-media type parts?
contentTypeHeaderValue = contentType.MediaType;
break;
case ContentMode.Binary:
value = formatter.EncodeBinaryModeEventData(cloudEvent);
contentTypeHeaderValue = cloudEvent.DataContentType;
break;
default:
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
}
if (contentTypeHeaderValue is object)
{