Adding support for Avro encoding.

Signed-off-by: clemensv <clemensv@microsoft.com>
This commit is contained in:
clemensv 2020-01-15 16:31:33 +01:00
parent 91ff29512b
commit 8ed5dcfee4
6 changed files with 363 additions and 0 deletions

View File

@ -29,6 +29,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.Asp
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.IntegrationTests", "test\CloudNative.CloudEvents.IntegrationTests\CloudNative.CloudEvents.IntegrationTests.csproj", "{9639E4FD-0438-4901-B57F-EFF773B19D5A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CloudNative.CloudEvents.Avro", "src\CloudNative.CloudEvents.Avro\CloudNative.CloudEvents.Avro.csproj", "{E4BE54BF-F4D7-495F-9278-07E5A8C79935}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -147,6 +149,18 @@ Global
{9639E4FD-0438-4901-B57F-EFF773B19D5A}.Release|x64.Build.0 = Release|Any CPU
{9639E4FD-0438-4901-B57F-EFF773B19D5A}.Release|x86.ActiveCfg = Release|Any CPU
{9639E4FD-0438-4901-B57F-EFF773B19D5A}.Release|x86.Build.0 = Release|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Debug|x64.ActiveCfg = Debug|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Debug|x64.Build.0 = Debug|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Debug|x86.ActiveCfg = Debug|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Debug|x86.Build.0 = Debug|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Release|Any CPU.Build.0 = Release|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Release|x64.ActiveCfg = Release|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Release|x64.Build.0 = Release|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Release|x86.ActiveCfg = Release|Any CPU
{E4BE54BF-F4D7-495F-9278-07E5A8C79935}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -0,0 +1,173 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
namespace CloudNative.CloudEvents
{
using Avro;
using Avro.Generic;
using Avro.IO;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Mime;
using System.Threading.Tasks;
/// <summary>
/// Formatter that implements the JSON Event Format
/// </summary>
public class AvroEventFormatter : ICloudEventFormatter
{
static readonly RecordSchema avroSchema;
static readonly DefaultReader avroReader;
static readonly DefaultWriter avroWriter;
static AvroEventFormatter()
{
// we're going to confidently assume that the embedded schema works. If not, type initialization
// will fail and that's okay since the type is useless without the proper schema
using ( var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")))
{
avroSchema = (RecordSchema)RecordSchema.Parse(sr.ReadToEnd());
}
avroReader = new DefaultReader(avroSchema, avroSchema);
avroWriter = new DefaultWriter(avroSchema);
}
public const string MediaTypeSuffix = "+avro";
public CloudEvent DecodeStructuredEvent(Stream data, params ICloudEventExtension[] extensions)
{
return DecodeStructuredEvent(data, (IEnumerable<ICloudEventExtension>)extensions);
}
public async Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<ICloudEventExtension> extensions)
{
return DecodeStructuredEvent(data, extensions);
}
public CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<ICloudEventExtension> extensions = null)
{
var decoder = new Avro.IO.BinaryDecoder(data);
var rawEvent = avroReader.Read<GenericRecord>(null, decoder);
return DecodeGenericRecord(rawEvent, extensions);
}
public CloudEvent DecodeStructuredEvent(byte[] data, params ICloudEventExtension[] extensions)
{
return DecodeStructuredEvent(data, (IEnumerable<ICloudEventExtension>)extensions);
}
public CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<ICloudEventExtension> extensions = null)
{
return DecodeStructuredEvent(new MemoryStream(data), extensions);
}
public CloudEvent DecodeGenericRecord(GenericRecord record, IEnumerable<ICloudEventExtension> extensions = null)
{
if (!record.TryGetValue("attribute", out var attrObj))
{
return null;
}
IDictionary<string, object> recordAttributes = (IDictionary<string, object>)attrObj;
object data = null;
if (!record.TryGetValue("data", out data))
{
data = null;
}
CloudEventsSpecVersion specVersion = CloudEventsSpecVersion.Default;
var cloudEvent = new CloudEvent(specVersion, extensions);
cloudEvent.Data = data;
var attributes = cloudEvent.GetAttributes();
foreach (var keyValuePair in recordAttributes)
{
// skip the version since we set that above
if (keyValuePair.Key.Equals(CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V0_1), StringComparison.InvariantCultureIgnoreCase) ||
keyValuePair.Key.Equals(CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V0_2), StringComparison.InvariantCultureIgnoreCase) ||
keyValuePair.Key.Equals(CloudEventAttributes.SpecVersionAttributeName(CloudEventsSpecVersion.V1_0), StringComparison.InvariantCultureIgnoreCase))
{
continue;
}
if (keyValuePair.Key == CloudEventAttributes.SourceAttributeName() ||
keyValuePair.Key == CloudEventAttributes.DataSchemaAttributeName())
{
attributes[keyValuePair.Key] = new Uri((string)keyValuePair.Value);
}
else if (keyValuePair.Key == CloudEventAttributes.TimeAttributeName())
{
attributes[keyValuePair.Key] = DateTime.Parse((string)keyValuePair.Value);
}
else
{
attributes[keyValuePair.Key] = keyValuePair.Value;
}
}
return cloudEvent;
}
public byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
{
contentType = new ContentType(CloudEvent.MediaType+AvroEventFormatter.MediaTypeSuffix);
GenericRecord record = new GenericRecord(avroSchema);
var recordAttributes = new Dictionary<string, object>();
var attributes = cloudEvent.GetAttributes();
foreach (var keyValuePair in attributes)
{
if (keyValuePair.Value == null)
{
continue;
}
if (keyValuePair.Value is ContentType && !string.IsNullOrEmpty(((ContentType)keyValuePair.Value).MediaType))
{
recordAttributes[keyValuePair.Key] = ((ContentType)keyValuePair.Value).ToString();
}
else if (keyValuePair.Value is Uri)
{
recordAttributes[keyValuePair.Key] = ((Uri)keyValuePair.Value).ToString();
}
else if (keyValuePair.Value is DateTime)
{
recordAttributes[keyValuePair.Key] = ((DateTime)keyValuePair.Value).ToString("o");
}
else if (cloudEvent.SpecVersion == CloudEventsSpecVersion.V1_0 &&
keyValuePair.Key.Equals(CloudEventAttributes.DataAttributeName(cloudEvent.SpecVersion)))
{
if (keyValuePair.Value is Stream)
{
using (var sr = new BinaryReader((Stream)keyValuePair.Value))
{
record.Add("data", sr.ReadBytes((int)sr.BaseStream.Length));
}
}
else
{
record.Add("data", keyValuePair.Value);
}
}
else
{
recordAttributes[keyValuePair.Key] = keyValuePair.Value;
}
}
record.Add("attribute", recordAttributes);
MemoryStream memStream = new MemoryStream();
BinaryEncoder encoder = new BinaryEncoder(memStream);
avroWriter.Write(record, encoder);
return new Span<byte>(memStream.GetBuffer(), 0, (int)memStream.Length).ToArray();
}
public object DecodeAttribute(CloudEventsSpecVersion specVersion, string name, byte[] data, IEnumerable<ICloudEventExtension> extensions = null)
{
throw new NotSupportedException("Encoding invidual attributes is not supported for Apache Avro");
}
public byte[] EncodeAttribute(CloudEventsSpecVersion specVersion, string name, object value, IEnumerable<ICloudEventExtension> extensions = null)
{
throw new NotSupportedException("Encoding invidual attributes is not supported for Apache Avro");
}
}
}

View File

@ -0,0 +1,57 @@
{
"namespace": "io.cloudevents",
"type": "record",
"name": "CloudEvent",
"version": "1.0",
"doc": "Avro Event Format for CloudEvents",
"fields": [
{
"name": "attribute",
"type": {
"type": "map",
"values": ["null", "boolean", "int", "string", "bytes"]
}
},
{
"name": "data",
"type": [
"bytes",
"null",
"boolean",
{
"type": "map",
"values": [
"null",
"boolean",
{
"type": "record",
"name": "CloudEventData",
"doc": "Representation of a JSON Value",
"fields": [
{
"name": "value",
"type": {
"type": "map",
"values": [
"null",
"boolean",
{ "type": "map", "values": "CloudEventData" },
{ "type": "array", "items": "CloudEventData" },
"double",
"string"
]
}
}
]
},
"double",
"string"
]
},
{ "type": "array", "items": "CloudEventData" },
"double",
"string"
]
}
]
}

View File

@ -0,0 +1,29 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<PackageVersion>1.0</PackageVersion>
<Description>Avro extensions for CloudNative.CloudEvents</Description>
<Copyright>Copyright Cloud Native Foundation</Copyright>
<RepositoryUrl>https://github.com/cloudevents/sdk-csharp</RepositoryUrl>
<PackageProjectUrl>https://cloudevents.io</PackageProjectUrl>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Apache.Avro" Version="1.9.1" />
<PackageReference Include="System.Memory" Version="4.5.3" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>
<ItemGroup>
<Content Remove="AvroSchema.json" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="AvroSchema.json" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,89 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
namespace CloudNative.CloudEvents.UnitTests
{
using System;
using System.Net.Mime;
using System.Text;
using Xunit;
public class AvroTest
{
const string jsonv10 =
"{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"com.github.pull.create\",\n" +
" \"source\" : \"https://github.com/cloudevents/spec/pull/123\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"time\" : \"2018-04-05T17:31:00Z\",\n" +
" \"comexampleextension1\" : \"value\",\n" +
" \"datacontenttype\" : \"text/xml\",\n" +
" \"data\" : \"<much wow=\\\"xml\\\"/>\"\n" +
"}";
[Fact]
public void ReserializeTest()
{
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter();
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
var avroData = avroFormatter.EncodeStructuredEvent(cloudEvent, out var contentType);
var cloudEvent2 = avroFormatter.DecodeStructuredEvent(avroData);
Assert.Equal(cloudEvent2.SpecVersion, cloudEvent.SpecVersion);
Assert.Equal(cloudEvent2.Type, cloudEvent.Type);
Assert.Equal(cloudEvent2.Source, cloudEvent.Source);
Assert.Equal(cloudEvent2.Id, cloudEvent.Id);
Assert.Equal(cloudEvent2.Time.Value.ToUniversalTime(), cloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(cloudEvent2.DataContentType, cloudEvent.DataContentType);
Assert.Equal(cloudEvent2.Data, cloudEvent.Data);
}
[Fact]
public void StructuredParseSuccess10()
{
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter();
var cloudEventJ = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
var avroData = avroFormatter.EncodeStructuredEvent(cloudEventJ, out var contentType);
var cloudEvent = avroFormatter.DecodeStructuredEvent(avroData);
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", cloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), cloudEvent.Source);
Assert.Equal("A234-1234-1234", cloudEvent.Id);
Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
cloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), cloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", cloudEvent.Data);
var attr = cloudEvent.GetAttributes();
Assert.Equal("value", (string)attr["comexampleextension1"]);
}
[Fact]
public void StructuredParseWithExtensionsSuccess10()
{
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter();
var cloudEventJ = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10), new ComExampleExtension1Extension());
var avroData = avroFormatter.EncodeStructuredEvent(cloudEventJ, out var contentType);
var cloudEvent = avroFormatter.DecodeStructuredEvent(avroData, new ComExampleExtension1Extension());
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", cloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), cloudEvent.Source);
Assert.Equal("A234-1234-1234", cloudEvent.Id);
Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
cloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), cloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", cloudEvent.Data);
Assert.Equal("value", cloudEvent.Extension<ComExampleExtension1Extension>().ComExampleExtension1);
}
}
}

View File

@ -20,6 +20,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Amqp\CloudNative.CloudEvents.Amqp.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Avro\CloudNative.CloudEvents.Avro.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Kafka\CloudNative.CloudEvents.Kafka.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents.Mqtt\CloudNative.CloudEvents.Mqtt.csproj" />
<ProjectReference Include="..\..\src\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />