Remove unnecessary inheritance for Kafka messages

Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
Jon Skeet 2021-02-26 08:24:16 +00:00 committed by Jon Skeet
parent b934614324
commit 72a5dc31f4
3 changed files with 92 additions and 97 deletions

View File

@ -6,6 +6,7 @@ using CloudNative.CloudEvents.Extensions;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Text;
@ -14,10 +15,15 @@ namespace CloudNative.CloudEvents.Kafka
{
public static class KafkaClientExtensions
{
private const string KafkaHeaderPrefix = "ce_";
private const string KafkaContentTypeAttributeName = "content-type";
private const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion";
// TODO: Avoid all the byte[] -> string conversions? If we didn't care about case-sensitivity, we could prepare byte arrays to perform comparisons with.
public static bool IsCloudEvent(this Message<string, byte[]> message) =>
GetHeaderValue(message, KafkaCloudEventMessage.SpecVersionKafkaHeader) is object ||
GetHeaderValue(message, SpecVersionKafkaHeader) is object ||
(ExtractContentType(message)?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true);
public static CloudEvent ToCloudEvent(this Message<string, byte[]> message,
@ -44,7 +50,7 @@ namespace CloudNative.CloudEvents.Kafka
else
{
// Binary mode
if (!(GetHeaderValue(message, KafkaCloudEventMessage.SpecVersionKafkaHeader) is byte[] versionIdBytes))
if (!(GetHeaderValue(message, SpecVersionKafkaHeader) is byte[] versionIdBytes))
{
throw new ArgumentException("Request is not a CloudEvent");
}
@ -60,9 +66,9 @@ namespace CloudNative.CloudEvents.Kafka
Data = message.Value,
DataContentType = contentType
};
foreach (var header in message.Headers.Where(h => h.Key.StartsWith(KafkaCloudEventMessage.KafkaHeaderPrefix)))
foreach (var header in message.Headers.Where(h => h.Key.StartsWith(KafkaHeaderPrefix)))
{
var attributeName = header.Key.Substring(KafkaCloudEventMessage.KafkaHeaderPrefix.Length).ToLowerInvariant();
var attributeName = header.Key.Substring(KafkaHeaderPrefix.Length).ToLowerInvariant();
if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name)
{
continue;
@ -85,7 +91,7 @@ namespace CloudNative.CloudEvents.Kafka
private static string ExtractContentType(Message<string, byte[]> message)
{
var headerValue = GetHeaderValue(message, KafkaCloudEventMessage.KafkaContentTypeAttributeName);
var headerValue = GetHeaderValue(message, KafkaContentTypeAttributeName);
return headerValue is null ? null : Encoding.UTF8.GetString(headerValue);
}
@ -100,5 +106,84 @@ namespace CloudNative.CloudEvents.Kafka
private static byte[] GetHeaderValue(MessageMetadata message, string headerName) =>
message.Headers.FirstOrDefault(x => string.Equals(x.Key, headerName, StringComparison.InvariantCultureIgnoreCase))
?.GetValueBytes();
public static Message<string, byte[]> ToKafkaMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
// TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka?
if (cloudEvent.Data == null)
{
throw new ArgumentNullException(nameof(cloudEvent.Data));
}
var headers = MapHeaders(cloudEvent, formatter);
string key = (string) cloudEvent[Partitioning.PartitionKeyAttribute];
byte[] value;
string contentTypeHeaderValue = null;
if (contentMode == ContentMode.Structured)
{
value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
// TODO: What about the non-media type parts?
contentTypeHeaderValue = contentType.MediaType;
}
else
{
if (cloudEvent.Data is byte[] byteData)
{
value = byteData;
}
else if (cloudEvent.Data is Stream dataStream)
{
// TODO: Extract this common code somewhere, or use shared source to access BinaryDataUtilities.
if (dataStream is MemoryStream dataMemoryStream)
{
value = dataMemoryStream.ToArray();
}
else
{
var buffer = new MemoryStream();
dataStream.CopyTo(buffer);
value = buffer.ToArray();
}
}
else
{
throw new InvalidOperationException($"{cloudEvent.Data.GetType()} type is not supported for Cloud Event's Value.");
}
if (cloudEvent.DataContentType is string dataContentType)
{
contentTypeHeaderValue = dataContentType;
}
}
if (contentTypeHeaderValue is object)
{
headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentTypeHeaderValue));
}
return new Message<string, byte[]>
{
Headers = headers,
Value = value,
Key = key
};
}
private static Headers MapHeaders(CloudEvent cloudEvent, CloudEventFormatter formatter)
{
var headers = new Headers
{
{ SpecVersionKafkaHeader, Encoding.UTF8.GetBytes(cloudEvent.SpecVersion.VersionId) }
};
foreach (var pair in cloudEvent.GetPopulatedAttributes())
{
var attribute = pair.Key;
if (attribute == cloudEvent.SpecVersion.DataContentTypeAttribute ||
attribute.Name == Partitioning.PartitionKeyAttribute.Name)
{
continue;
}
var value = attribute.Format(pair.Value);
headers.Add(KafkaHeaderPrefix + attribute.Name, Encoding.UTF8.GetBytes(value));
}
return headers;
}
}
}

View File

@ -1,90 +0,0 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.Extensions;
using Confluent.Kafka;
using System;
using System.IO;
using System.Text;
namespace CloudNative.CloudEvents.Kafka
{
// TODO: avoid the inheritance here? Constructors are somewhat constricting...
public class KafkaCloudEventMessage : Message<string, byte[]>
{
internal const string KafkaHeaderPrefix = "ce_";
internal const string KafkaContentTypeAttributeName = "content-type";
internal const string SpecVersionKafkaHeader = KafkaHeaderPrefix + "specversion";
public KafkaCloudEventMessage(CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
// TODO: Is this appropriate? Why can't we transport a CloudEvent without data in Kafka?
if (cloudEvent.Data == null)
{
throw new ArgumentNullException(nameof(cloudEvent.Data));
}
Headers = new Headers
{
{ SpecVersionKafkaHeader, Encoding.UTF8.GetBytes(cloudEvent.SpecVersion.VersionId) }
};
Key = (string) cloudEvent[Partitioning.PartitionKeyAttribute];
if (contentMode == ContentMode.Structured)
{
Value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentType.MediaType));
}
else
{
if (cloudEvent.Data is byte[] byteData)
{
Value = byteData;
}
else if (cloudEvent.Data is Stream dataStream)
{
// TODO: Extract this common code somewhere
if (dataStream is MemoryStream dataMemoryStream)
{
Value = dataMemoryStream.ToArray();
}
else
{
var buffer = new MemoryStream();
dataStream.CopyTo(buffer);
Value = buffer.ToArray();
}
}
else
{
throw new InvalidOperationException($"{cloudEvent.Data.GetType()} type is not supported for Cloud Event's Value.");
}
if (cloudEvent.DataContentType is string dataContentType)
{
Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(dataContentType));
}
}
MapHeaders(cloudEvent, formatter);
}
private void MapHeaders(CloudEvent cloudEvent, CloudEventFormatter formatter)
{
foreach (var pair in cloudEvent.GetPopulatedAttributes())
{
var attribute = pair.Key;
if (attribute == cloudEvent.SpecVersion.DataContentTypeAttribute ||
attribute.Name == Partitioning.PartitionKeyAttribute.Name)
{
continue;
}
var value = attribute.Format(pair.Value);
Headers.Add(KafkaHeaderPrefix + attribute.Name, Encoding.UTF8.GetBytes(value));
}
}
}
}

View File

@ -38,7 +38,7 @@ namespace CloudNative.CloudEvents.Kafka.UnitTests
["comexampleextension1"] = "value"
};
var message = new KafkaCloudEventMessage(cloudEvent, ContentMode.Structured, new JsonEventFormatter());
var message = cloudEvent.ToKafkaMessage(ContentMode.Structured, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());
@ -82,7 +82,7 @@ namespace CloudNative.CloudEvents.Kafka.UnitTests
[Partitioning.PartitionKeyAttribute] = "hello much wow"
};
var message = new KafkaCloudEventMessage(cloudEvent, ContentMode.Binary, new JsonEventFormatter());
var message = cloudEvent.ToKafkaMessage(ContentMode.Binary, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());
// using serialization to create fully independent copy thus simulating message transport