feat: Add a Protobuf event formatter

Further features may be added later, e.g. convenience methods.

Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
Jon Skeet 2022-01-28 14:38:27 +00:00 committed by Jon Skeet
parent bf27ac4be3
commit 0349eeb274
10 changed files with 2682 additions and 1 deletions

View File

@ -33,7 +33,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.Avr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.NewtonsoftJson", "src\CloudNative.CloudEvents.NewtonsoftJson\CloudNative.CloudEvents.NewtonsoftJson.csproj", "{9DC17081-21D8-4123-8650-D97C2153DB8C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CloudNative.CloudEvents.SystemTextJson", "src\CloudNative.CloudEvents.SystemTextJson\CloudNative.CloudEvents.SystemTextJson.csproj", "{FACB3EF2-F078-479A-A91C-719894CB66BF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.SystemTextJson", "src\CloudNative.CloudEvents.SystemTextJson\CloudNative.CloudEvents.SystemTextJson.csproj", "{FACB3EF2-F078-479A-A91C-719894CB66BF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CloudNative.CloudEvents.Protobuf", "src\CloudNative.CloudEvents.Protobuf\CloudNative.CloudEvents.Protobuf.csproj", "{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -189,6 +191,18 @@ Global
{FACB3EF2-F078-479A-A91C-719894CB66BF}.Release|x64.Build.0 = Release|Any CPU
{FACB3EF2-F078-479A-A91C-719894CB66BF}.Release|x86.ActiveCfg = Release|Any CPU
{FACB3EF2-F078-479A-A91C-719894CB66BF}.Release|x86.Build.0 = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Debug|x64.ActiveCfg = Debug|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Debug|x64.Build.0 = Debug|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Debug|x86.ActiveCfg = Debug|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Debug|x86.Build.0 = Debug|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|Any CPU.Build.0 = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x64.ActiveCfg = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x64.Build.0 = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x86.ActiveCfg = Release|Any CPU
{9D82AC2B-0075-4161-AE0E-4A6629C9FF2A}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

67
generate_protos.sh Normal file
View File

@ -0,0 +1,67 @@
#!/bin/bash
# Copyright 2021 Cloud Native Foundation.
# Licensed under the Apache 2.0 license.
# See LICENSE file in the project root for full license information.
set -e
PROTOBUF_VERSION=3.19.3
# Generates the classes for the protobuf event format
case "$OSTYPE" in
linux*)
PROTOBUF_PLATFORM=linux-x86_64
PROTOC=tmp/bin/protoc
;;
win* | msys* | cygwin*)
PROTOBUF_PLATFORM=win64
PROTOC=tmp/bin/protoc.exe
;;
darwin*)
PROTOBUF_PLATFORM=osx-x86_64
PROTOC=tmp/bin/protoc
;;
*)
echo "Unknown OSTYPE: $OSTYPE"
exit 1
esac
# Clean up previous generation results
rm -f src/CloudNative.CloudEvents.Protobuf/*.g.cs
rm -f test/CloudNative.CloudEvents.UnitTests/Protobuf/*.g.cs
rm -rf tmp
mkdir tmp
cd tmp
echo "- Downloading protobuf@$PROTOBUF_VERSION"
curl -sSL \
https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOBUF_VERSION/protoc-$PROTOBUF_VERSION-$PROTOBUF_PLATFORM.zip \
--output protobuf.zip
unzip -q protobuf.zip
echo "- Downloading schema"
# TODO: Use the 1.0.2 branch when it exists.
mkdir cloudevents
curl -sSL https://raw.githubusercontent.com/cloudevents/spec/main/cloudevents/formats/cloudevents.proto -o cloudevents/ProtoSchema.proto
cd ..
# Schema proto
$PROTOC \
-I tmp/include \
-I tmp/cloudevents \
--csharp_out=src/CloudNative.CloudEvents.Protobuf \
--csharp_opt=file_extension=.g.cs \
tmp/cloudevents/ProtoSchema.proto
# Test protos
$PROTOC \
-I tmp/include \
-I test/CloudNative.CloudEvents.UnitTests/Protobuf \
--csharp_out=test/CloudNative.CloudEvents.UnitTests/Protobuf \
--csharp_opt=file_extension=.g.cs \
test/CloudNative.CloudEvents.UnitTests/Protobuf/*.proto
echo "Generated code."
rm -rf tmp

View File

@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
<Description>Support for the Protobuf event format in for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;protobuf</PackageTags>
<LangVersion>8.0</LangVersion>
<Nullable>enable</Nullable>
<Version>2.0.0-local.1</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.19.3" />
<!-- Be explicit about not including these files in the package. -->
<None Include="README.md" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>
</Project>

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,400 @@
// Copyright 2021 Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.Core;
using CloudNative.CloudEvents.V1;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Mime;
using System.Text;
using System.Threading.Tasks;
using static CloudNative.CloudEvents.V1.CloudEvent;
using static CloudNative.CloudEvents.V1.CloudEvent.Types;
using static CloudNative.CloudEvents.V1.CloudEvent.Types.CloudEventAttributeValue;
namespace CloudNative.CloudEvents.Protobuf
{
// TODO: Derived type which expects to only receive protobuf message data with a particular message type,
// so is able to unpack it.
/// <summary>
/// Formatter that implements the Protobuf Event Format, using the Google.Protobuf library for serialization.
/// </summary>
/// <remarks>
/// <para>
/// When encoding CloudEvents in structured mode, three kinds of data are supported, as indicated in the
/// event format. Text is stored in the <see cref="V1.CloudEvent.TextData"/> field; binary data is stored
/// in the <see cref="V1.CloudEvent.BinaryData"/> field; protobuf messages are stored in the
/// <see cref="V1.CloudEvent.ProtoData"/> field. In the last case, the message is packed in an
/// <see cref="Any"/> message, to preserve information about which message is encoded, unless the message
/// is already an <see cref="Any"/> in which case it is stored directly. (This prevents "double-encoding"
/// when a CloudEvent is decoded and then re-encoded.) Attempts to serialize CloudEvents with any other data type
/// will fail. Derived classes can specialize all of this behavior by overriding
/// <see cref="EncodeStructuredModeData(CloudEvent, V1.CloudEvent)"/>.
/// </para>
/// <para>
/// When decoding CloudEvents in structured mode, text and binary data payloads are represented as strings and byte
/// arrays respectively. Protobuf message payloads are represented using the <see cref="Any"/> wrapper, without
/// attempting to "unpack" the message. This avoids any requirement for the underlying message type to be
/// known by the application consuming the CloudEvent. (The data may be stored for later processing by another
/// application with more awareness, for example.) Derived classes can specialize all of this behavior by
/// overriding <see cref="DecodeStructuredModeData(V1.CloudEvent, CloudEvent)"/>.
/// </para>
/// <para>
/// When encoding CloudEvent data in binary mode, this implementation only supports plain binary and text data.
/// (Even text data is only supported when the <see cref="CloudEvent.DataContentType"/> begins with "text/".)
/// While it might be expected that protobuf messages would be serialized into the binary mode data, there is
/// no clear standard as to whether they should be directly serialized, or packed into an <see cref="Any"/>
/// message first, and no standardized content type to use to distinguish these options. Users are encouraged
/// to either use structured mode where possible, or explicitly encode the data as a byte array first. Derived
/// classes can specialize this behavior by overriding <see cref="EncodeBinaryModeEventData(CloudEvent)"/>.
/// </para>
/// <para>
/// When decoding CloudEvent data in binary mode, if the data content type begins with "text/" it is decoded as
/// a string, otherwise it is left as a byte array. Derived classes can specialize this behavior by overriding
/// <see cref="DecodeBinaryModeEventData(ReadOnlyMemory{byte}, CloudEvent)"/>.
/// </para>
/// </remarks>
public class ProtobufEventFormatter : CloudEventFormatter
{
/// <summary>
/// The default value for <see cref="TypeUrlPrefix"/>. This is the value used by Protobuf libraries
/// when no prefix is specifically provided.
/// </summary>
public const string DefaultTypeUrlPrefix = "type.googleapis.com";
private const string MediaTypeSuffix = "+protobuf";
private static readonly string StructuredMediaType = MimeUtilities.MediaType + MediaTypeSuffix;
private static readonly string BatchMediaType = MimeUtilities.BatchMediaType + MediaTypeSuffix;
/// <summary>
/// The type URL prefix this event formatter uses when packing messages into <see cref="Any"/>.
/// The value is never null. Note: the type URL prefix is not used when the data within a CloudEvent
/// is already an Any message, as the message is propagated directly.
/// </summary>
public string TypeUrlPrefix { get; }
private static readonly Dictionary<AttrOneofCase, CloudEventAttributeType> protoToCloudEventAttributeType =
new Dictionary<AttrOneofCase, CloudEventAttributeType>
{
{ AttrOneofCase.CeBoolean, CloudEventAttributeType.Boolean },
{ AttrOneofCase.CeBytes, CloudEventAttributeType.Binary },
{ AttrOneofCase.CeInteger, CloudEventAttributeType.Integer },
{ AttrOneofCase.CeString, CloudEventAttributeType.String },
{ AttrOneofCase.CeTimestamp, CloudEventAttributeType.Timestamp },
{ AttrOneofCase.CeUri, CloudEventAttributeType.Uri },
{ AttrOneofCase.CeUriRef, CloudEventAttributeType.UriReference }
};
/// <summary>
/// Constructs an instance of the formatter, using a type URL prefix of
/// "type.googleapis.com" (the default for <see cref="Any.Pack(IMessage)"/>).
/// </summary>
public ProtobufEventFormatter() : this(DefaultTypeUrlPrefix)
{
}
/// <summary>
/// Constructs an instance of the formatter, using the specified type URL prefix
/// when packing messages.
/// </summary>
/// <param name="typeUrlPrefix">The type URL prefix to use when packing messages
/// into <see cref="Any"/>. Must not be null.</param>
public ProtobufEventFormatter(string typeUrlPrefix)
{
TypeUrlPrefix = Validation.CheckNotNull(typeUrlPrefix, nameof(typeUrlPrefix));
}
/// <inheritdoc />
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(ReadOnlyMemory<byte> body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes) =>
DecodeBatchModeMessage(BinaryDataUtilities.AsStream(body), contentType, extensionAttributes);
/// <inheritdoc />
public override void DecodeBinaryModeEventData(ReadOnlyMemory<byte> body, CloudEvent cloudEvent)
{
Validation.CheckNotNull(cloudEvent, nameof(cloudEvent));
if (cloudEvent.DataContentType is string dataContentType && dataContentType.StartsWith("text/"))
{
Encoding encoding = MimeUtilities.GetEncoding(new ContentType(dataContentType));
cloudEvent.Data = BinaryDataUtilities.GetString(body, encoding);
}
else
{
cloudEvent.Data = body.ToArray();
}
}
/// <inheritdoc />
public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory<byte> body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes) =>
DecodeStructuredModeMessage(BinaryDataUtilities.AsStream(body), contentType, extensionAttributes);
/// <inheritdoc />
public override ReadOnlyMemory<byte> EncodeBatchModeMessage(IEnumerable<CloudEvent> cloudEvents, out ContentType contentType)
{
Validation.CheckNotNull(cloudEvents, nameof(cloudEvents));
contentType = new ContentType(BatchMediaType)
{
CharSet = Encoding.UTF8.WebName
};
var batch = new CloudEventBatch
{
Events = { cloudEvents.Select(cloudEvent => ConvertToProto(cloudEvent, nameof(cloudEvents))) }
};
return batch.ToByteArray();
}
// TODO: Put the boiler-plate code here into CloudEventFormatter
/// <inheritdoc />
public override ReadOnlyMemory<byte> EncodeBinaryModeEventData(CloudEvent cloudEvent)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
if (cloudEvent.Data is null)
{
return Array.Empty<byte>();
}
if (cloudEvent.DataContentType is string dataContentType && dataContentType.StartsWith("text/") && cloudEvent.Data is string text)
{
ContentType contentType = new ContentType(dataContentType);
return MimeUtilities.GetEncoding(contentType).GetBytes(text);
}
if (cloudEvent.Data is byte[] bytes)
{
return bytes;
}
throw new ArgumentException($"{nameof(ProtobufEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type '{cloudEvent.DataContentType}'");
}
/// <inheritdoc />
public override ReadOnlyMemory<byte> EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
{
var proto = ConvertToProto(cloudEvent, nameof(cloudEvent));
contentType = new ContentType(StructuredMediaType)
{
CharSet = Encoding.UTF8.WebName
};
return proto.ToByteArray();
}
/// <inheritdoc />
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
Validation.CheckNotNull(body, nameof(body));
var batchProto = CloudEventBatch.Parser.ParseFrom(body);
return batchProto.Events.Select(proto => ConvertFromProto(proto, extensionAttributes, nameof(body))).ToList();
}
/// <inheritdoc />
public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
Validation.CheckNotNull(messageBody, nameof(messageBody));
return ConvertFromProto(V1.CloudEvent.Parser.ParseFrom(messageBody), extensionAttributes, nameof(messageBody));
}
/// <summary>
/// Converts the given protobuf representation of a CloudEvent into an SDK representation.
/// </summary>
/// <param name="proto">The protobuf representation of a CloudEvent. Must not be null.</param>
/// <param name="extensionAttributes">The extension attributes to use when populating the CloudEvent. May be null.</param>
/// <returns>The SDK representation of the CloudEvent.</returns>
public CloudEvent ConvertFromProto(V1.CloudEvent proto, IEnumerable<CloudEventAttribute>? extensionAttributes) =>
ConvertFromProto(Validation.CheckNotNull(proto, nameof(proto)), extensionAttributes, nameof(proto));
private CloudEvent ConvertFromProto(V1.CloudEvent proto, IEnumerable<CloudEventAttribute>? extensionAttributes, string paramName)
{
var specVersion = CloudEventsSpecVersion.FromVersionId(proto.SpecVersion)
?? throw new ArgumentException($"Unsupported CloudEvents spec version '{proto.SpecVersion}'", paramName);
var cloudEvent = new CloudEvent(specVersion, extensionAttributes)
{
Id = proto.Id,
Source = (Uri) specVersion.SourceAttribute.Parse(proto.Source),
Type = proto.Type
};
foreach (var pair in proto.Attributes)
{
if (!protoToCloudEventAttributeType.TryGetValue(pair.Value.AttrCase, out var attrTypeFromProto))
{
// Note: impossible to cover in tests
throw new ArgumentException($"Unhandled protobuf attribute case: {pair.Value.AttrCase}", paramName);
}
// If we've already got an extension attribute specified for this name,
// we validate against it and require the value in the proto to have the right
// type. Otherwise, we create a new extension attribute of the correct type.
var attr = cloudEvent.GetAttribute(pair.Key);
if (attr is null)
{
attr = CloudEventAttribute.CreateExtension(pair.Key, attrTypeFromProto);
}
// Note: if CloudEvents spec version 2.0 contains different required attributes, we may want to
// change exactly how this is specified. For the moment, this is the simplest way of implementing the requirement.
else if (attr.IsRequired)
{
// The required attributes are all specified as proto fields.
// They can't appear in the Attributes repeated field as well.
throw new ArgumentException(
$"Attribute '{attr.Name}' is a required attribute, and must only be specified via the top-level proto field.");
}
else if (attr.Type != attrTypeFromProto)
{
// This prevents any type changes, even those which might validate correctly
// otherwise (e.g. between Uri and UriRef).
throw new ArgumentException(
$"Attribute '{attr.Name}' was specified with type '{attr.Type}', but has type '{attrTypeFromProto}' in the protobuf representation.");
}
// Note: the indexer performs validation.
cloudEvent[attr] = pair.Value.AttrCase switch
{
AttrOneofCase.CeBoolean => pair.Value.CeBoolean,
AttrOneofCase.CeBytes => pair.Value.CeBytes.ToByteArray(),
AttrOneofCase.CeInteger => pair.Value.CeInteger,
AttrOneofCase.CeString => pair.Value.CeString,
AttrOneofCase.CeTimestamp => pair.Value.CeTimestamp.ToDateTimeOffset(),
AttrOneofCase.CeUri => CloudEventAttributeType.Uri.Parse(pair.Value.CeUri),
AttrOneofCase.CeUriRef => CloudEventAttributeType.UriReference.Parse(pair.Value.CeUriRef),
_ => throw new ArgumentException($"Unhandled protobuf attribute case: {pair.Value.AttrCase}")
};
}
DecodeStructuredModeData(proto, cloudEvent);
return Validation.CheckCloudEventArgument(cloudEvent, paramName);
}
/// <summary>
/// Decodes the "data" property provided within a structured-mode message,
/// populating the <see cref="CloudEvents.CloudEvent.Data"/> property accordingly.
/// </summary>
/// <remarks>
/// <para>
/// This implementation simply converts binary data to a byte array, leaves proto data
/// as an <see cref="Google.Protobuf.WellKnownTypes.Any"/>, and converts text data to a string.
/// </para>
/// <para>
/// Override this method to provide more specialized conversions, such as to use <see cref="ByteString"/>
/// instead of a byte array, or to "unwrap" the proto data to generated code.
/// </para>
/// </remarks>
/// <param name="proto">The protobuf representation of the CloudEvent. Will not be null.</param>
/// <param name="cloudEvent">The event being decoded. This should not be modified except to
/// populate the <see cref="CloudEvents.CloudEvent.Data"/> property, but may be used to provide extra
/// information such as the data content type. Will not be null.</param>
/// <returns>The data to populate in the <see cref="CloudEvents.CloudEvent.Data"/> property.</returns>
protected virtual void DecodeStructuredModeData(V1.CloudEvent proto, CloudEvent cloudEvent) =>
cloudEvent.Data = proto.DataCase switch
{
DataOneofCase.BinaryData => proto.BinaryData.ToByteArray(),
DataOneofCase.ProtoData => proto.ProtoData,
DataOneofCase.TextData => proto.TextData,
DataOneofCase.None => null,
// Note: impossible to cover in tests
_ => throw new ArgumentException($"Unhandled protobuf data case: {proto.DataCase}")
};
/// <summary>
/// Encodes structured (or batch) mode data within a CloudEvent, storing it in the specified <see cref="CloudEvents.CloudEvent"/>.
/// </summary>
/// <param name="cloudEvent">The CloudEvent being encoded, which will have a non-null value for
/// its <see cref="CloudEvents.CloudEvent.Data"/> property.</param>
/// <param name="proto">The protobuf representation of the CloudEvent, which will be non-null.</param>
protected virtual void EncodeStructuredModeData(CloudEvent cloudEvent, V1.CloudEvent proto)
{
switch (cloudEvent.Data)
{
case IMessage message:
proto.ProtoData = message is Any any ? any : Any.Pack(message, TypeUrlPrefix);
break;
case string text:
proto.TextData = text;
break;
case byte[] binary:
proto.BinaryData = ByteString.CopyFrom(binary);
break;
default:
throw new ArgumentException($"{nameof(ProtobufEventFormatter)} cannot serialize data of type {cloudEvent.Data!.GetType()}");
}
}
/// <summary>
/// Converts the given SDK representation of a CloudEvent to a protobuf representation.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <returns>The protobuf representation of the CloudEvent.</returns>
public V1.CloudEvent ConvertToProto(CloudEvent cloudEvent) => ConvertToProto(cloudEvent, nameof(cloudEvent));
private V1.CloudEvent ConvertToProto(CloudEvent cloudEvent, string paramName)
{
Validation.CheckCloudEventArgument(cloudEvent, paramName);
var specVersion = cloudEvent.SpecVersion;
var proto = new V1.CloudEvent
{
Id = cloudEvent.Id,
// Source is a required attribute, and we've validated the CloudEvent,
// so it really should be non-null.
Source = specVersion.SourceAttribute.Format(cloudEvent.Source!),
Type = cloudEvent.Type,
SpecVersion = cloudEvent.SpecVersion.VersionId
};
foreach (var pair in cloudEvent.GetPopulatedAttributes())
{
var attr = pair.Key;
// Skip attributes already handled above.
if (attr == specVersion.IdAttribute ||
attr == specVersion.SourceAttribute ||
attr == specVersion.TypeAttribute)
{
continue;
}
var value = new CloudEventAttributeValue();
switch (CloudEventAttributeTypes.GetOrdinal(attr.Type))
{
case CloudEventAttributeTypeOrdinal.Binary:
value.CeBytes = ByteString.CopyFrom((byte[]) pair.Value);
break;
case CloudEventAttributeTypeOrdinal.Boolean:
value.CeBoolean = (bool) pair.Value;
break;
case CloudEventAttributeTypeOrdinal.Integer:
value.CeInteger = (int) pair.Value;
break;
case CloudEventAttributeTypeOrdinal.String:
value.CeString = (string) pair.Value;
break;
case CloudEventAttributeTypeOrdinal.Timestamp:
value.CeTimestamp = Timestamp.FromDateTimeOffset((DateTimeOffset) pair.Value);
break;
case CloudEventAttributeTypeOrdinal.Uri:
value.CeUri = attr.Format(pair.Value);
break;
case CloudEventAttributeTypeOrdinal.UriReference:
value.CeUriRef = attr.Format(pair.Value);
break;
default:
// Note: impossible to cover in tests
throw new ArgumentException($"Unhandled attribute type: {attr.Type}");
}
proto.Attributes.Add(attr.Name, value);
}
if (cloudEvent.Data is object)
{
EncodeStructuredModeData(cloudEvent, proto);
}
return proto;
}
}
}

View File

@ -0,0 +1,6 @@
# CloudNative.CloudEvents.Protobuf
This implements the Protobuf Event Format for CloudEvents.
The [../../generate_protos.sh]() script is used to generate the C# code from the
spec, as well as any messages used for testing.

View File

@ -24,6 +24,7 @@
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Kafka\CloudNative.CloudEvents.Kafka.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Mqtt\CloudNative.CloudEvents.Mqtt.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.NewtonsoftJson\CloudNative.CloudEvents.NewtonsoftJson.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Protobuf\CloudNative.CloudEvents.Protobuf.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.SystemTextJson\CloudNative.CloudEvents.SystemTextJson.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>

View File

@ -0,0 +1,713 @@
// Copyright 2022 Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using Google.Protobuf;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using System;
using System.IO;
using System.Net.Mime;
using System.Text;
using System.Threading.Tasks;
using Xunit;
using static CloudNative.CloudEvents.UnitTests.TestHelpers;
using static CloudNative.CloudEvents.V1.CloudEvent.Types;
namespace CloudNative.CloudEvents.Protobuf.UnitTests
{
/// <summary>
/// Tests for ProtobufEventFormatter. Note that most tests for encoding/decoding of
/// structured mode events (and batches) are performed via the public methods that
/// perform proto/CloudEvent conversions - the regular event formatter methods are
/// only wrappers around those, covered by minimal tests here.
/// </summary>
public class ProtobufEventFormatterTest
{
private static readonly ContentType s_protobufCloudEventContentType = new ContentType("application/cloudevents+protobuf");
private static readonly ContentType s_protobufCloudEventBatchContentType = new ContentType("application/cloudevents-batch+protobuf");
[Fact]
public void EncodeStructuredModeMessage_Minimal()
{
var cloudEvent = new CloudEvent(CloudEventsSpecVersion.V1_0)
{
Id = "event-id",
Source = new Uri("https://event-source"),
Type = "event-type",
};
var encoded = new ProtobufEventFormatter().EncodeStructuredModeMessage(cloudEvent, out var contentType);
Assert.Equal("application/cloudevents+protobuf; charset=utf-8", contentType.ToString());
var actualProto = V1.CloudEvent.Parser.ParseFrom(encoded.ToArray());
var expectedProto = new V1.CloudEvent
{
SpecVersion = "1.0",
Id = "event-id",
Source = "https://event-source",
Type = "event-type"
};
Assert.Equal(expectedProto, actualProto);
}
/// <summary>
/// A simple test that populates all known v1.0 attributes, so we don't need to test that
/// aspect in the future.
/// </summary>
[Fact]
public void ConvertToProto_V1Attributes()
{
var cloudEvent = new CloudEvent(CloudEventsSpecVersion.V1_0)
{
Data = "text",
DataContentType = "text/plain",
DataSchema = new Uri("https://data-schema"),
Id = "event-id",
Source = new Uri("https://event-source"),
Subject = "event-subject",
Time = new DateTimeOffset(2021, 2, 19, 12, 34, 56, 789, TimeSpan.FromHours(1)),
Type = "event-type"
};
var actualProto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
var expectedProto = new V1.CloudEvent
{
SpecVersion = "1.0",
Id = "event-id",
Source = "https://event-source",
Type = "event-type",
Attributes =
{
{ "datacontenttype", StringAttribute("text/plain") },
{ "dataschema", UriAttribute("https://data-schema") },
{ "subject", StringAttribute("event-subject") },
// Deliberately not reusing cloudEvent.Time: this demonstrates that only the instant in time
// is relevant, not the UTC offset.
{ "time", TimestampAttribute(new DateTimeOffset(2021, 2, 19, 11, 34, 56, 789, TimeSpan.Zero)) }
},
TextData = "text"
};
Assert.Equal(expectedProto, actualProto);
}
[Fact]
public void ConvertToProto_AllAttributeTypes()
{
var cloudEvent = new CloudEvent(AllTypesExtensions)
{
["binary"] = SampleBinaryData,
["boolean"] = true,
["integer"] = 10,
["string"] = "text",
["timestamp"] = SampleTimestamp,
["uri"] = SampleUri,
["urireference"] = SampleUriReference
};
// We're not going to check these.
cloudEvent.PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
var expectedAttributes = new MapField<string, CloudEventAttributeValue>
{
{ "binary", BinaryAttribute(SampleBinaryData) },
{ "boolean", BooleanAttribute(true) },
{ "integer", IntegerAttribute(10) },
{ "string", StringAttribute("text") },
{ "timestamp", TimestampAttribute(SampleTimestamp) },
{ "uri", UriAttribute(SampleUriText) },
{ "urireference", UriRefAttribute(SampleUriReferenceText) }
};
Assert.Equal(proto.Attributes, expectedAttributes);
}
[Fact]
public void ConvertToProto_NoData()
{
var cloudEvent = new CloudEvent().PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
Assert.Equal(V1.CloudEvent.DataOneofCase.None, proto.DataCase);
}
[Fact]
public void ConvertToProto_TextData()
{
var cloudEvent = new CloudEvent { Data = "text" }.PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
Assert.Equal("text", proto.TextData);
}
[Fact]
public void ConvertToProto_BinaryData()
{
var cloudEvent = new CloudEvent { Data = SampleBinaryData }.PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
Assert.Equal(SampleBinaryData, proto.BinaryData.ToByteArray());
}
[Fact]
public void ConvertToProto_MessageData()
{
var data = new PayloadData1 { Name = "test" };
var cloudEvent = new CloudEvent { Data = data }.PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
Assert.Equal(Any.Pack(data), proto.ProtoData);
}
[Fact]
public void ConvertToProto_MessageData_AlreadyPacked()
{
var data = new PayloadData1 { Name = "test" };
var packedData = Any.Pack(data);
var cloudEvent = new CloudEvent { Data = packedData }.PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter().ConvertToProto(cloudEvent);
// This verifies that the formatter doesn't "double-encode".
Assert.Equal(packedData, proto.ProtoData);
}
[Fact]
public void ConvertToProto_MessageData_CustomTypeUrlPrefix()
{
string typeUrlPrefix = "cloudevents.io/xyz";
var data = new PayloadData1 { Name = "test" };
var cloudEvent = new CloudEvent { Data = data }.PopulateRequiredAttributes();
var proto = new ProtobufEventFormatter(typeUrlPrefix).ConvertToProto(cloudEvent);
Assert.Equal(Any.Pack(data, typeUrlPrefix), proto.ProtoData);
}
[Fact]
public void ConvertToProto_InvalidData()
{
var cloudEvent = new CloudEvent { Data = new object() }.PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
Assert.Throws<ArgumentException>(() => formatter.ConvertToProto(cloudEvent));
}
[Fact]
public void EncodeBinaryModeData_Bytes()
{
var cloudEvent = new CloudEvent
{
Data = SampleBinaryData
}.PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
var result = formatter.EncodeBinaryModeEventData(cloudEvent);
Assert.Equal(SampleBinaryData, result.ToArray());
}
[Theory]
[InlineData("utf-8")]
[InlineData("iso-8859-1")]
[InlineData(null)]
public void EncodeBinaryModeData_String_TextContentType(string charset)
{
string text = "caf\u00e9"; // Valid in both UTF-8 and ISO-8859-1, but with different representations
var encoding = charset is null ? Encoding.UTF8 : Encoding.GetEncoding(charset);
string contentType = charset is null ? "text/plain" : $"text/plain; charset={charset}";
var cloudEvent = new CloudEvent
{
Data = text,
DataContentType = contentType
}.PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
var result = formatter.EncodeBinaryModeEventData(cloudEvent);
Assert.Equal(encoding.GetBytes(text), result.ToArray());
}
[Fact]
public void EncodeBinaryModeData_String_NonTextContentType()
{
var cloudEvent = new CloudEvent
{
Data = "text",
DataContentType = "application/json"
}.PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
Assert.Throws<ArgumentException>(() => formatter.EncodeBinaryModeEventData(cloudEvent));
}
[Fact]
public void EncodeBinaryModeData_ProtoMessage()
{
var cloudEvent = new CloudEvent
{
Data = new PayloadData1 { Name = "fail" },
DataContentType = "application/protobuf"
}.PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
// See summary documentation for ProtobufEventFormatter for the reasoning for this
Assert.Throws<ArgumentException>(() => formatter.EncodeBinaryModeEventData(cloudEvent));
}
[Fact]
public void EncodeBinaryModeData_ArbitraryObject()
{
var cloudEvent = new CloudEvent
{
Data = new object(),
DataContentType = "application/octet-stream"
}.PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
// See summary documentation for ProtobufEventFormatter for the reasoning for this
Assert.Throws<ArgumentException>(() => formatter.EncodeBinaryModeEventData(cloudEvent));
}
[Fact]
public void EncodeBinaryModeData_NoData()
{
var cloudEvent = new CloudEvent().PopulateRequiredAttributes();
var formatter = new ProtobufEventFormatter();
Assert.Empty(formatter.EncodeBinaryModeEventData(cloudEvent).ToArray());
}
// Just a single test for the code that parses asynchronously... the guts are all the same.
[Fact]
public async Task DecodeStructuredModeMessageAsync_Minimal()
{
var proto = CreateMinimalCloudEventProto();
byte[] bytes = proto.ToByteArray();
var stream = new MemoryStream(bytes);
var formatter = new ProtobufEventFormatter();
var cloudEvent = await formatter.DecodeStructuredModeMessageAsync(stream, s_protobufCloudEventContentType, null);
Assert.Equal("test-type", cloudEvent.Type);
Assert.Equal("test-id", cloudEvent.Id);
Assert.Equal(SampleUri, cloudEvent.Source);
}
[Fact]
public void DecodeStructuredModeMessage_Minimal()
{
var proto = CreateMinimalCloudEventProto();
byte[] bytes = proto.ToByteArray();
var stream = new MemoryStream(bytes);
var formatter = new ProtobufEventFormatter();
var cloudEvent = formatter.DecodeStructuredModeMessage(stream, s_protobufCloudEventContentType, null);
Assert.Equal("test-type", cloudEvent.Type);
Assert.Equal("test-id", cloudEvent.Id);
Assert.Equal(SampleUri, cloudEvent.Source);
}
[Fact]
public void EncodeBatchModeMessage_Empty()
{
var formatter = new ProtobufEventFormatter();
var bytes = formatter.EncodeBatchModeMessage(new CloudEvent[0], out var contentType);
Assert.Equal("application/cloudevents-batch+protobuf; charset=utf-8", contentType.ToString());
var batch = V1.CloudEventBatch.Parser.ParseFrom(bytes.ToArray());
Assert.Empty(batch.Events);
}
[Fact]
public void EncodeBatchModeMessage_TwoEvents()
{
var event1 = new CloudEvent
{
Id = "event1",
Type = "type1",
Source = new Uri("//event-source1", UriKind.RelativeOrAbsolute),
Data = "simple text",
DataContentType = "text/plain"
};
var event2 = new CloudEvent
{
Id = "event2",
Type = "type2",
Source = new Uri("//event-source2", UriKind.RelativeOrAbsolute),
};
var cloudEvents = new[] { event1, event2 };
var formatter = new ProtobufEventFormatter();
var bytes = formatter.EncodeBatchModeMessage(cloudEvents, out var contentType);
Assert.Equal("application/cloudevents-batch+protobuf; charset=utf-8", contentType.ToString());
var actualBatch = V1.CloudEventBatch.Parser.ParseFrom(bytes.ToArray());
var expectedBatch = new V1.CloudEventBatch
{
Events =
{
new V1.CloudEvent
{
SpecVersion = "1.0",
Type = "type1",
Id = "event1",
Source = "//event-source1",
TextData = "simple text",
Attributes = { { "datacontenttype", StringAttribute("text/plain") } }
},
new V1.CloudEvent
{
SpecVersion = "1.0",
Type = "type2",
Id = "event2",
Source = "//event-source2"
}
}
};
Assert.Equal(expectedBatch, actualBatch);
}
[Fact]
public void EncodeBatchModeMessage_Invalid()
{
var formatter = new ProtobufEventFormatter();
// Invalid CloudEvent
Assert.Throws<ArgumentException>(() => formatter.EncodeBatchModeMessage(new[] { new CloudEvent() }, out _));
// Null argument
Assert.Throws<ArgumentNullException>(() => formatter.EncodeBatchModeMessage(null!, out _));
// Null value within the argument. Arguably this should throw ArgumentException instead of
// ArgumentNullException, but it's unlikely to cause confusion.
Assert.Throws<ArgumentNullException>(() => formatter.EncodeBatchModeMessage(new CloudEvent[1], out _));
}
[Fact]
public void ConvertFromProto_V1Attributes()
{
var proto = new V1.CloudEvent
{
SpecVersion = "1.0",
Type = "test-type",
Id = "test-id",
TextData = "text",
Source = "//event-source",
Attributes =
{
{ "datacontenttype", StringAttribute("text/plain") },
{ "dataschema", UriAttribute("https://data-schema") },
{ "subject", StringAttribute("event-subject") },
{ "time", TimestampAttribute(SampleTimestamp) }
}
};
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
Assert.Equal("test-type", cloudEvent.Type);
Assert.Equal("test-id", cloudEvent.Id);
Assert.Equal("text/plain", cloudEvent.DataContentType);
Assert.Equal(new Uri("https://data-schema"), cloudEvent.DataSchema);
Assert.Equal("event-subject", cloudEvent.Subject);
Assert.Equal(new Uri("//event-source", UriKind.RelativeOrAbsolute), cloudEvent.Source);
// The protobuf timestamp loses the offset information, but is still the correct instant.
AssertTimestampsEqual(SampleTimestamp.ToUniversalTime(), cloudEvent.Time);
}
[Theory]
// These are required, so have to be specified in the dedicated protobuf field
[InlineData("id")]
[InlineData("type")]
[InlineData("source")]
// These are generally invalid attribute names
[InlineData("specversion")]
[InlineData("a b c")]
[InlineData("ABC")]
public void ConvertFromProto_InvalidAttributeNames(string attributeName)
{
var proto = CreateMinimalCloudEventProto();
proto.Attributes.Add(attributeName, StringAttribute("value"));
var formatter = new ProtobufEventFormatter();
Assert.Throws<ArgumentException>(() => formatter.ConvertFromProto(proto, null));
}
[Fact]
public void ConvertFromProto_AllAttributeTypes()
{
var proto = CreateMinimalCloudEventProto();
proto.Attributes.Add("binary", BinaryAttribute(SampleBinaryData));
proto.Attributes.Add("boolean", BooleanAttribute(true));
proto.Attributes.Add("integer", IntegerAttribute(10));
proto.Attributes.Add("string", StringAttribute("text"));
proto.Attributes.Add("timestamp", TimestampAttribute(SampleTimestamp));
proto.Attributes.Add("uri", UriAttribute(SampleUriText));
proto.Attributes.Add("urireference", UriRefAttribute(SampleUriReferenceText));
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
Assert.Equal(SampleBinaryData, cloudEvent["binary"]);
Assert.True((bool) cloudEvent["boolean"]!);
Assert.Equal(10, cloudEvent["integer"]);
Assert.Equal("text", cloudEvent["string"]);
// The protobuf timestamp loses the offset information, but is still the correct instant.
AssertTimestampsEqual(SampleTimestamp.ToUniversalTime(), (DateTimeOffset) cloudEvent["timestamp"]!);
Assert.Equal(SampleUri, cloudEvent["uri"]);
Assert.Equal(SampleUriReference, cloudEvent["urireference"]);
}
[Fact]
public void ConvertFromProto_NoData()
{
var proto = CreateMinimalCloudEventProto();
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
Assert.Null(cloudEvent.Data);
}
[Fact]
public void ConvertFromProto_TextData()
{
var proto = CreateMinimalCloudEventProto();
proto.TextData = "text";
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
Assert.Equal("text", cloudEvent.Data);
}
[Fact]
public void ConvertFromProto_BinaryData()
{
var proto = CreateMinimalCloudEventProto();
proto.BinaryData = ByteString.CopyFrom(SampleBinaryData);
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
Assert.Equal(SampleBinaryData, cloudEvent.Data);
}
[Fact]
public void ConvertFromProto_MessageData()
{
var message = new PayloadData1 { Name = "testing" };
var proto = CreateMinimalCloudEventProto();
proto.ProtoData = Any.Pack(message);
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
// Note: this isn't unpacked automatically.
Assert.Equal(Any.Pack(message), cloudEvent.Data);
}
[Fact]
public void ConvertFromProto_UnspecifiedExtensionAttributes()
{
var proto = CreateMinimalCloudEventProto();
proto.Attributes.Add("xyz", StringAttribute("abc"));
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, null);
Assert.Equal("abc", cloudEvent["xyz"]);
}
[Fact]
public void ConvertFromProto_SpecifiedExtensionAttributes_Valid()
{
var attribute = CloudEventAttribute.CreateExtension("xyz", CloudEventAttributeType.String);
var proto = CreateMinimalCloudEventProto();
proto.Attributes.Add(attribute.Name, StringAttribute("abc"));
var cloudEvent = new ProtobufEventFormatter().ConvertFromProto(proto, new[] { attribute });
Assert.Equal("abc", cloudEvent[attribute]);
}
[Fact]
public void ConvertFromProto_SpecifiedExtensionAttributes_UnexpectedType()
{
var attribute = CloudEventAttribute.CreateExtension("xyz", CloudEventAttributeType.UriReference);
var proto = CreateMinimalCloudEventProto();
proto.Attributes.Add(attribute.Name, UriAttribute("https://xyz"));
// Even though the value would be valid as a URI reference, we fail because
// the type in the proto message is not the same as the type we've specified in the method argument.
Assert.Throws<ArgumentException>(() => new ProtobufEventFormatter().ConvertFromProto(proto, new[] { attribute }));
}
[Fact]
public void ConvertFromProto_SpecifiedExtensionAttributes_InvalidValue()
{
var attribute = CloudEventAttribute.CreateExtension("xyz", CloudEventAttributeType.Integer, ValidateValue);
var proto = CreateMinimalCloudEventProto();
proto.Attributes.Add(attribute.Name, IntegerAttribute(1000));
var exception = Assert.Throws<ArgumentException>(() => new ProtobufEventFormatter().ConvertFromProto(proto, new[] { attribute }));
Assert.Equal("Boom!", exception!.InnerException!.Message);
void ValidateValue(object value)
{
if ((int) value > 100)
{
throw new Exception("Boom!");
}
}
}
[Fact]
public void ConvertFromProto_Invalid_NoSpecVersion()
{
var proto = CreateMinimalCloudEventProto();
proto.SpecVersion = "";
var exception = Assert.Throws<ArgumentException>(() => new ProtobufEventFormatter().ConvertFromProto(proto, null));
}
[Fact]
public void ConvertFromProto_Invalid_NoType()
{
var proto = CreateMinimalCloudEventProto();
proto.SpecVersion = "";
var exception = Assert.Throws<ArgumentException>(() => new ProtobufEventFormatter().ConvertFromProto(proto, null));
}
[Fact]
public void ConvertFromProto_Invalid_NoId()
{
var proto = CreateMinimalCloudEventProto();
proto.Id = "";
var exception = Assert.Throws<ArgumentException>(() => new ProtobufEventFormatter().ConvertFromProto(proto, null));
}
[Fact]
public void ConvertFromProto_Invalid_NoSource()
{
var proto = CreateMinimalCloudEventProto();
proto.Source = "";
var exception = Assert.Throws<ArgumentException>(() => new ProtobufEventFormatter().ConvertFromProto(proto, null));
}
[Theory]
[InlineData("utf-8")]
[InlineData("iso-8859-1")]
[InlineData(null)]
public void DecodeBinaryModeEventData_Text(string charset)
{
string text = "caf\u00e9"; // Valid in both UTF-8 and ISO-8859-1, but with different representations
var encoding = charset is null ? Encoding.UTF8 : Encoding.GetEncoding(charset);
var bytes = encoding.GetBytes(text);
string contentType = charset is null ? "text/plain" : $"text/plain; charset={charset}";
var data = DecodeBinaryModeEventData(bytes, contentType);
string actualText = Assert.IsType<string>(data);
Assert.Equal(text, actualText);
}
[Theory]
[InlineData("application/json")]
[InlineData(null)]
public void DecodeBinaryModeData_NonTextContentType(string contentType)
{
var bytes = Encoding.UTF8.GetBytes("{}");
var data = DecodeBinaryModeEventData(bytes, contentType);
byte[] actualBytes = Assert.IsType<byte[]>(data);
Assert.Equal(bytes, actualBytes);
}
[Fact]
public void DecodeBatchMode_Minimal()
{
var batchProto = new V1.CloudEventBatch
{
Events = { CreateMinimalCloudEventProto() }
};
byte[] bytes = batchProto.ToByteArray();
var stream = new MemoryStream(bytes);
var formatter = new ProtobufEventFormatter();
var cloudEvents = formatter.DecodeBatchModeMessage(stream, s_protobufCloudEventBatchContentType, null);
var cloudEvent = Assert.Single(cloudEvents);
Assert.Equal("test-type", cloudEvent.Type);
Assert.Equal("test-id", cloudEvent.Id);
Assert.Equal(SampleUri, cloudEvent.Source);
}
// Just a single test for the code that parses asynchronously... the guts are all the same.
[Fact]
public async Task DecodeBatchModeMessageAsync_Minimal()
{
var batchProto = new V1.CloudEventBatch
{
Events = { CreateMinimalCloudEventProto() }
};
byte[] bytes = batchProto.ToByteArray();
var stream = new MemoryStream(bytes);
var formatter = new ProtobufEventFormatter();
var cloudEvents = await formatter.DecodeBatchModeMessageAsync(stream, s_protobufCloudEventBatchContentType, null);
var cloudEvent = Assert.Single(cloudEvents);
Assert.Equal("test-type", cloudEvent.Type);
Assert.Equal("test-id", cloudEvent.Id);
Assert.Equal(SampleUri, cloudEvent.Source);
}
[Fact]
public void DecodeBatchMode_Empty()
{
var batchProto = new V1.CloudEventBatch();
byte[] bytes = batchProto.ToByteArray();
var stream = new MemoryStream(bytes);
var formatter = new ProtobufEventFormatter();
var cloudEvents = formatter.DecodeBatchModeMessage(stream, s_protobufCloudEventBatchContentType, null);
Assert.Empty(cloudEvents);
}
[Fact]
public void DecodeBatchMode_Multiple()
{
var batchProto = new V1.CloudEventBatch
{
Events =
{
new V1.CloudEvent
{
SpecVersion = "1.0",
Type = "type1",
Id = "event1",
Source = "//event-source1",
TextData = "simple text",
Attributes = { { "datacontenttype", StringAttribute("text/plain") } }
},
new V1.CloudEvent
{
SpecVersion = "1.0",
Type = "type2",
Id = "event2",
Source = "//event-source2"
}
}
};
byte[] bytes = batchProto.ToByteArray();
var stream = new MemoryStream(bytes);
var formatter = new ProtobufEventFormatter();
var cloudEvents = formatter.DecodeBatchModeMessage(stream, s_protobufCloudEventBatchContentType, null);
Assert.Equal(2, cloudEvents.Count);
var event1 = cloudEvents[0];
Assert.Equal("type1", event1.Type);
Assert.Equal("event1", event1.Id);
Assert.Equal(new Uri("//event-source1", UriKind.RelativeOrAbsolute), event1.Source);
Assert.Equal("simple text", event1.Data);
Assert.Equal("text/plain", event1.DataContentType);
var event2 = cloudEvents[1];
Assert.Equal("type2", event2.Type);
Assert.Equal("event2", event2.Id);
Assert.Equal(new Uri("//event-source2", UriKind.RelativeOrAbsolute), event2.Source);
Assert.Null(event2.Data);
Assert.Null(event2.DataContentType);
}
// Utility methods
private static object? DecodeBinaryModeEventData(byte[] bytes, string contentType)
{
var cloudEvent = new CloudEvent().PopulateRequiredAttributes();
cloudEvent.DataContentType = contentType;
new ProtobufEventFormatter().DecodeBinaryModeEventData(bytes, cloudEvent);
return cloudEvent.Data;
}
private static CloudEventAttributeValue StringAttribute(string value) =>
new CloudEventAttributeValue { CeString = value };
private static CloudEventAttributeValue BinaryAttribute(byte[] value) =>
new CloudEventAttributeValue { CeBytes = ByteString.CopyFrom(value) };
private static CloudEventAttributeValue BooleanAttribute(bool value) =>
new CloudEventAttributeValue { CeBoolean = value };
private static CloudEventAttributeValue IntegerAttribute(int value) =>
new CloudEventAttributeValue { CeInteger = value };
private static CloudEventAttributeValue UriAttribute(string value) =>
new CloudEventAttributeValue { CeUri = value };
private static CloudEventAttributeValue UriRefAttribute(string value) =>
new CloudEventAttributeValue { CeUriRef = value };
private static CloudEventAttributeValue TimestampAttribute(Timestamp value) =>
new CloudEventAttributeValue { CeTimestamp = value };
private static CloudEventAttributeValue TimestampAttribute(DateTimeOffset value) =>
TimestampAttribute(Timestamp.FromDateTimeOffset(value));
private static V1.CloudEvent CreateMinimalCloudEventProto() => new V1.CloudEvent
{
SpecVersion = "1.0",
Type = "test-type",
Id = "test-id",
Source = SampleUriText
};
}
}

View File

@ -0,0 +1,233 @@
// <auto-generated>
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: test_messages.proto
// </auto-generated>
#pragma warning disable 1591, 0612, 3021
#region Designer generated code
using pb = global::Google.Protobuf;
using pbc = global::Google.Protobuf.Collections;
using pbr = global::Google.Protobuf.Reflection;
using scg = global::System.Collections.Generic;
namespace CloudNative.CloudEvents.Protobuf.UnitTests {
/// <summary>Holder for reflection information generated from test_messages.proto</summary>
public static partial class TestMessagesReflection {
#region Descriptor
/// <summary>File descriptor for test_messages.proto</summary>
public static pbr::FileDescriptor Descriptor {
get { return descriptor; }
}
private static pbr::FileDescriptor descriptor;
static TestMessagesReflection() {
byte[] descriptorData = global::System.Convert.FromBase64String(
string.Concat(
"ChN0ZXN0X21lc3NhZ2VzLnByb3RvEhdpby5jbG91ZGV2ZW50cy52MS50ZXN0",
"cyIcCgxQYXlsb2FkRGF0YTESDAoEbmFtZRgBIAEoCUItqgIqQ2xvdWROYXRp",
"dmUuQ2xvdWRFdmVudHMuUHJvdG9idWYuVW5pdFRlc3RzYgZwcm90bzM="));
descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData,
new pbr::FileDescriptor[] { },
new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] {
new pbr::GeneratedClrTypeInfo(typeof(global::CloudNative.CloudEvents.Protobuf.UnitTests.PayloadData1), global::CloudNative.CloudEvents.Protobuf.UnitTests.PayloadData1.Parser, new[]{ "Name" }, null, null, null, null)
}));
}
#endregion
}
#region Messages
public sealed partial class PayloadData1 : pb::IMessage<PayloadData1>
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
, pb::IBufferMessage
#endif
{
private static readonly pb::MessageParser<PayloadData1> _parser = new pb::MessageParser<PayloadData1>(() => new PayloadData1());
private pb::UnknownFieldSet _unknownFields;
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public static pb::MessageParser<PayloadData1> Parser { get { return _parser; } }
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public static pbr::MessageDescriptor Descriptor {
get { return global::CloudNative.CloudEvents.Protobuf.UnitTests.TestMessagesReflection.Descriptor.MessageTypes[0]; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
pbr::MessageDescriptor pb::IMessage.Descriptor {
get { return Descriptor; }
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public PayloadData1() {
OnConstruction();
}
partial void OnConstruction();
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public PayloadData1(PayloadData1 other) : this() {
name_ = other.name_;
_unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public PayloadData1 Clone() {
return new PayloadData1(this);
}
/// <summary>Field number for the "name" field.</summary>
public const int NameFieldNumber = 1;
private string name_ = "";
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public string Name {
get { return name_; }
set {
name_ = pb::ProtoPreconditions.CheckNotNull(value, "value");
}
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override bool Equals(object other) {
return Equals(other as PayloadData1);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public bool Equals(PayloadData1 other) {
if (ReferenceEquals(other, null)) {
return false;
}
if (ReferenceEquals(other, this)) {
return true;
}
if (Name != other.Name) return false;
return Equals(_unknownFields, other._unknownFields);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override int GetHashCode() {
int hash = 1;
if (Name.Length != 0) hash ^= Name.GetHashCode();
if (_unknownFields != null) {
hash ^= _unknownFields.GetHashCode();
}
return hash;
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public override string ToString() {
return pb::JsonFormatter.ToDiagnosticString(this);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void WriteTo(pb::CodedOutputStream output) {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
output.WriteRawMessage(this);
#else
if (Name.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Name);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(output);
}
#endif
}
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
void pb::IBufferMessage.InternalWriteTo(ref pb::WriteContext output) {
if (Name.Length != 0) {
output.WriteRawTag(10);
output.WriteString(Name);
}
if (_unknownFields != null) {
_unknownFields.WriteTo(ref output);
}
}
#endif
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public int CalculateSize() {
int size = 0;
if (Name.Length != 0) {
size += 1 + pb::CodedOutputStream.ComputeStringSize(Name);
}
if (_unknownFields != null) {
size += _unknownFields.CalculateSize();
}
return size;
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void MergeFrom(PayloadData1 other) {
if (other == null) {
return;
}
if (other.Name.Length != 0) {
Name = other.Name;
}
_unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields);
}
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
public void MergeFrom(pb::CodedInputStream input) {
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
input.ReadRawMessage(this);
#else
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, input);
break;
case 10: {
Name = input.ReadString();
break;
}
}
}
#endif
}
#if !GOOGLE_PROTOBUF_REFSTRUCT_COMPATIBILITY_MODE
[global::System.Diagnostics.DebuggerNonUserCodeAttribute]
[global::System.CodeDom.Compiler.GeneratedCode("protoc", null)]
void pb::IBufferMessage.InternalMergeFrom(ref pb::ParseContext input) {
uint tag;
while ((tag = input.ReadTag()) != 0) {
switch(tag) {
default:
_unknownFields = pb::UnknownFieldSet.MergeFieldFrom(_unknownFields, ref input);
break;
case 10: {
Name = input.ReadString();
break;
}
}
}
}
#endif
}
#endregion
}
#endregion Designer generated code

View File

@ -0,0 +1,9 @@
syntax = "proto3";
package io.cloudevents.v1.tests;
option csharp_namespace = "CloudNative.CloudEvents.Protobuf.UnitTests";
message PayloadData1 {
string name = 1;
}