First approach of an implementation of a Pulsar protocol binding

This commit is contained in:
Alberto Leon 2024-11-27 21:33:05 +01:00
parent b4632f426d
commit c470aa4c7d
6 changed files with 223 additions and 24 deletions

View File

@ -12,6 +12,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.gitignore = .gitignore
appveyor.yml = appveyor.yml
LICENSE = LICENSE
nuget.config = nuget.config
README.md = README.md
EndProjectSection
EndProject
@ -72,6 +73,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "xml", "xml", "{4012C753-68D
conformance\format\xml\valid-events.xml = conformance\format\xml\valid-events.xml
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cloudnative.CloudEvents.Pulsar", "Cloudnative.CloudEvents.Pulsar\Cloudnative.CloudEvents.Pulsar.csproj", "{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -238,6 +241,18 @@ Global
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x64.Build.0 = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x86.ActiveCfg = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x86.Build.0 = Release|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Debug|Any CPU.Build.0 = Debug|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Debug|x64.ActiveCfg = Debug|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Debug|x64.Build.0 = Debug|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Debug|x86.ActiveCfg = Debug|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Debug|x86.Build.0 = Debug|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Release|Any CPU.ActiveCfg = Release|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Release|Any CPU.Build.0 = Release|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Release|x64.ActiveCfg = Release|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Release|x64.Build.0 = Release|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Release|x86.ActiveCfg = Release|Any CPU
{41FAAAF0-7E77-4D61-8291-F8CB5F5D2D00}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="DotPulsar" Version="3.5.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\src\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,160 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.Core;
using CloudNative.CloudEvents.Extensions;
using DotPulsar;
using DotPulsar.Abstractions;
using DotPulsar.Internal;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Net.Mime;
using System.Reflection.PortableExecutable;
using System.Text;
namespace CloudNative.CloudEvents.Pulsar
{
/// <summary>
/// Extension methods to convert between CloudEvents and Pulsar messages.
/// </summary>
public static class PulsarExtensions
{
private const string PulsarHeaderPrefix = "ce-";
// Visible for testing
internal const string PulsarContentTypeAttributeName = "content-type";
private const string SpecVersionPulsarHeader = PulsarHeaderPrefix + "specversion";
/// <summary>
/// Indicates whether this message holds a single CloudEvent.
/// </summary>
/// <remarks>
/// This method returns false for batch requests, as they need to be parsed differently.
/// </remarks>
/// <param name="message">The message to check for the presence of a CloudEvent. Must not be null.</param>
/// <returns>true, if the request is a CloudEvent</returns>
public static bool IsCloudEvent(this IMessage<ReadOnlySequence<byte>> message) =>
GetHeaderValue(message, SpecVersionPulsarHeader) is object ||
MimeUtilities.IsCloudEventsContentType(GetHeaderValue(message, PulsarContentTypeAttributeName));
/// <summary>
/// Converts this Pulsar message into a CloudEvent object.
/// </summary>
/// <param name="message">The Pulsar message to convert. Must not be null.</param>
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent(this IMessage<ReadOnlySequence<byte>> message,
CloudEventFormatter formatter, params CloudEventAttribute[]? extensionAttributes) =>
ToCloudEvent(message, formatter, (IEnumerable<CloudEventAttribute>?) extensionAttributes);
/// <summary>
/// Converts this Pulsar message into a CloudEvent object.
/// </summary>
/// <param name="message">The Pulsar message to convert. Must not be null.</param>
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
/// <returns>A reference to a validated CloudEvent instance.</returns>
public static CloudEvent ToCloudEvent(this IMessage<ReadOnlySequence<byte>> message,
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
Validation.CheckNotNull(message, nameof(message));
Validation.CheckNotNull(formatter, nameof(formatter));
if (!IsCloudEvent(message))
{
throw new InvalidOperationException();
}
var contentType = GetHeaderValue(message, PulsarContentTypeAttributeName);
CloudEvent cloudEvent;
// Structured mode
if (MimeUtilities.IsCloudEventsContentType(contentType))
{
cloudEvent = formatter.DecodeStructuredModeMessage(message.Value().ToArray(), new ContentType(contentType), extensionAttributes);
}
else
{
// Binary mode
if (!(GetHeaderValue(message, SpecVersionPulsarHeader) is string versionId))
{
throw new ArgumentException("Request is not a CloudEvent");
}
CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionId)
?? throw new ArgumentException($"Unknown CloudEvents spec version '{versionId}'", nameof(message));
cloudEvent = new CloudEvent(version, extensionAttributes)
{
DataContentType = contentType
};
foreach (var header in message.Properties.Where(h => h.Key.StartsWith(PulsarHeaderPrefix)))
{
var attributeName = header.Key.Substring(PulsarHeaderPrefix.Length).ToLowerInvariant();
if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name)
{
continue;
}
// TODO: Is this feasible?
var headerValue = header.Value;
if (headerValue is null)
{
continue;
}
cloudEvent.SetAttributeFromString(attributeName, headerValue);
}
formatter.DecodeBinaryModeEventData(message.Value().ToArray(), cloudEvent);
}
InitPartitioningKey(message, cloudEvent);
return Validation.CheckCloudEventArgument(cloudEvent, nameof(message));
}
private static void InitPartitioningKey(IMessage<ReadOnlySequence<byte>> message, CloudEvent cloudEvent)
{
if (!string.IsNullOrEmpty(message.Key))
{
cloudEvent[Partitioning.PartitionKeyAttribute] = message.Key;
}
}
/// <summary>
/// Returns the last header value with the given name, or null if there is no such header.
/// </summary>
private static string? GetHeaderValue(IMessage<ReadOnlySequence<byte>> message, string headerName) =>
Validation.CheckNotNull(message.Properties, nameof(message.Properties)) is null
? null
: message.Properties.TryGetValue(headerName, out var headerValue) ? headerValue : null;
public static MessageMetadata ToPulsarMessageMetadata(this CloudEvent cloudEvent)
{
var metadata = new MessageMetadata();
metadata[SpecVersionPulsarHeader] = cloudEvent.SpecVersion.VersionId;
foreach (var attribute in cloudEvent.GetPopulatedAttributes())
{
metadata[PulsarHeaderPrefix + attribute.Key.Name]=attribute.Value.ToString();
}
return metadata;
}
public static byte[] ToPulsarMessageBody(this CloudEvent cloudEvent, CloudEventFormatter formatter, ContentMode contentMode)
{
switch (contentMode)
{
case ContentMode.Structured:
return formatter.EncodeStructuredModeMessage(cloudEvent, out _).ToArray();
case ContentMode.Binary:
return formatter.EncodeBinaryModeEventData(cloudEvent).ToArray();
default:
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
}
}
}
}

View File

@ -1,24 +1,24 @@
<Project>
<ItemGroup>
<PackageVersion Include="AMQPNetLite" Version="2.4.11" />
<PackageVersion Include="AMQPNetLite.Serialization" Version="2.4.11" />
<PackageVersion Include="Apache.Avro" Version="1.11.3" />
<PackageVersion Include="Confluent.Kafka" Version="1.9.3" />
<PackageVersion Include="Google.Protobuf" Version="3.27.3" />
<PackageVersion Include="McMaster.Extensions.CommandLineUtils" Version="4.1.1" />
<PackageVersion Include="Microsoft.AspNetCore.Http" Version="2.1.34" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.32" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<PackageVersion Include="MQTTnet" Version="4.3.6.1152" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="Nullable" Version="1.3.1" />
<PackageVersion Include="System.Memory" Version="4.5.5" />
<PackageVersion Include="System.Text.Encodings.Web" Version="8.0.0" />
<PackageVersion Include="System.Text.Json" Version="8.0.4" />
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.runner.console" Version="2.9.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
</Project>
<ItemGroup>
<PackageVersion Include="AMQPNetLite" Version="2.4.11" />
<PackageVersion Include="AMQPNetLite.Serialization" Version="2.4.11" />
<PackageVersion Include="Apache.Avro" Version="1.11.3" />
<PackageVersion Include="Confluent.Kafka" Version="1.9.3" />
<PackageVersion Include="Google.Protobuf" Version="3.27.3" />
<PackageVersion Include="McMaster.Extensions.CommandLineUtils" Version="4.1.1" />
<PackageVersion Include="Microsoft.AspNetCore.Http" Version="2.1.34" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.32" />
<PackageVersion Include="Microsoft.CSharp" Version="4.7.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<PackageVersion Include="MQTTnet" Version="4.3.6.1152" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="Nullable" Version="1.3.1" />
<PackageVersion Include="System.Memory" Version="4.5.5" />
<PackageVersion Include="System.Text.Encodings.Web" Version="8.0.0" />
<PackageVersion Include="System.Text.Json" Version="8.0.5" />
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.runner.console" Version="2.9.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
</ItemGroup>
</Project>

@ -1 +1 @@
Subproject commit c5bcce28fa078aeef41e5b1998a7c01270d62d35
Subproject commit 7a8ee0ac0e782bba1ba30e58c62d24d2e6c337e5

7
nuget.config Normal file
View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<packageSources>
<clear />
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
</packageSources>
</configuration>