Add optional IGenericRecordSerializer to Avro formatter

Signed-off-by: Kirner- <kirner10@gmail.com>
This commit is contained in:
Kirner- 2023-07-08 22:14:41 +01:00 committed by Peter Trevan
parent 88f7225313
commit 4f4987ece1
6 changed files with 210 additions and 22 deletions

View File

@ -4,7 +4,7 @@
using Avro;
using Avro.Generic;
using Avro.IO;
using CloudNative.CloudEvents.Avro.Interfaces;
using CloudNative.CloudEvents.Core;
using System;
using System.Collections.Generic;
@ -41,30 +41,34 @@ namespace CloudNative.CloudEvents.Avro
private const string DataName = "data";
private static readonly string CloudEventAvroMediaType = MimeUtilities.MediaType + MediaTypeSuffix;
private static readonly RecordSchema avroSchema;
private static readonly DefaultReader avroReader;
private static readonly DefaultWriter avroWriter;
static AvroEventFormatter()
private readonly IGenericRecordSerializer serializer;
/// <summary>
/// Creates an AvroEventFormatter using the default serializer.
/// </summary>
public AvroEventFormatter() : this(new BasicGenericRecordSerializer()) { }
/// <summary>
/// Creates an AvroEventFormatter that uses a custom <see cref="IGenericRecordSerializer"/>.
/// </summary>
/// <remarks>
/// It is recommended to use the default serializer before defining your own wherever possible.
/// </remarks>
public AvroEventFormatter(IGenericRecordSerializer genericRecordSerializer)
{
// We're going to confidently assume that the embedded schema works. If not, type initialization
// will fail and that's okay since the type is useless without the proper schema.
using (var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")))
{
avroSchema = (RecordSchema) Schema.Parse(sr.ReadToEnd());
}
avroReader = new DefaultReader(avroSchema, avroSchema);
avroWriter = new DefaultWriter(avroSchema);
serializer = genericRecordSerializer;
}
/// <summary>
/// Avro schema used to serialize and deserialize the CloudEvent.
/// </summary>
public static RecordSchema AvroSchema { get; } = ParseEmbeddedSchema();
/// <inheritdoc />
public override CloudEvent DecodeStructuredModeMessage(Stream body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
{
Validation.CheckNotNull(body, nameof(body));
var decoder = new BinaryDecoder(body);
// The reuse parameter *is* allowed to be null...
var rawEvent = avroReader.Read<GenericRecord>(reuse: null!, decoder);
var rawEvent = serializer.Deserialize(body);
return DecodeGenericRecord(rawEvent, extensionAttributes);
}
@ -146,7 +150,7 @@ namespace CloudNative.CloudEvents.Avro
contentType = new ContentType(CloudEventAvroMediaType);
// We expect the Avro encoded to detect data types that can't be represented in the schema.
GenericRecord record = new GenericRecord(avroSchema);
GenericRecord record = new GenericRecord(AvroSchema);
record.Add(DataName, cloudEvent.Data);
var recordAttributes = new Dictionary<string, object>();
recordAttributes[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
@ -162,9 +166,7 @@ namespace CloudNative.CloudEvents.Avro
recordAttributes[attribute.Name] = avroValue;
}
record.Add(AttributeName, recordAttributes);
MemoryStream memStream = new MemoryStream();
BinaryEncoder encoder = new BinaryEncoder(memStream);
avroWriter.Write(record, encoder);
var memStream = serializer.Serialize(record);
return memStream.ToArray();
}
@ -176,5 +178,16 @@ namespace CloudNative.CloudEvents.Avro
/// <inheritdoc />
public override void DecodeBinaryModeEventData(ReadOnlyMemory<byte> body, CloudEvent cloudEvent) =>
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
private static RecordSchema ParseEmbeddedSchema()
{
// We're going to confidently assume that the embedded schema works. If not, type initialization
// will fail and that's okay since the type is useless without the proper schema.
using var sr = new StreamReader(typeof(AvroEventFormatter)
.Assembly
.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json"));
return (RecordSchema) Schema.Parse(sr.ReadToEnd());
}
}
}

View File

@ -0,0 +1,47 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using Avro.Generic;
using Avro.IO;
using CloudNative.CloudEvents.Avro.Interfaces;
using System;
using System.IO;
namespace CloudNative.CloudEvents.Avro;
/// <summary>
/// The default implementation of the <see cref="IGenericRecordSerializer"/>.
/// </summary>
/// <remarks>
/// Makes use of the Avro <see cref="DefaultReader"/> and <see cref="DefaultWriter"/>
/// together with the embedded Avro schema.
/// </remarks>
internal sealed class BasicGenericRecordSerializer : IGenericRecordSerializer
{
private readonly DefaultReader avroReader;
private readonly DefaultWriter avroWriter;
public BasicGenericRecordSerializer()
{
avroReader = new DefaultReader(AvroEventFormatter.AvroSchema, AvroEventFormatter.AvroSchema);
avroWriter = new DefaultWriter(AvroEventFormatter.AvroSchema);
}
/// <inheritdoc />
public ReadOnlyMemory<byte> Serialize(GenericRecord record)
{
var memStream = new MemoryStream();
var encoder = new BinaryEncoder(memStream);
avroWriter.Write(record, encoder);
return memStream.ToArray();
}
/// <inheritdoc />
public GenericRecord Deserialize(Stream rawMessagebody)
{
var decoder = new BinaryDecoder(rawMessagebody);
// The reuse parameter *is* allowed to be null...
return avroReader.Read<GenericRecord>(reuse: null!, decoder);
}
}

View File

@ -0,0 +1,39 @@
using Avro.Generic;
using System;
using System.IO;
namespace CloudNative.CloudEvents.Avro.Interfaces;
/// <summary>
/// Used to serialize and deserialize an Avro <see cref="GenericRecord"/>
/// matching the <see href="https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.avsc">
/// CloudEvent Avro schema</see>.
/// </summary>
/// <remarks>
/// <para>
/// An implementation of this interface can optionally be supplied to the <see cref="AvroEventFormatter"/> in cases
/// where a custom Avro serialiser is required for integration with pre-existing tools/infrastructure.
/// </para>
/// <para>
/// It is recommended to use the default serializer before defining your own wherever possible.
/// </para>
/// </remarks>
public interface IGenericRecordSerializer
{
/// <summary>
/// Serialize an Avro <see cref="GenericRecord"/>.
/// </summary>
/// <remarks>
/// The record is guaranteed to match the
/// <see href="https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.avsc">
/// CloudEvent Avro schema</see>.
/// </remarks>
ReadOnlyMemory<byte> Serialize(GenericRecord value);
/// <summary>
/// Deserialize a <see cref="GenericRecord"/> matching the
/// <see href="https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.avsc">
/// CloudEvent Avro schema</see>, represented as a stream.
/// </summary>
GenericRecord Deserialize(Stream messageBody);
}

View File

@ -3,6 +3,7 @@
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.NewtonsoftJson;
using CloudNative.CloudEvents.UnitTests.Avro.Helpers;
using System;
using System.Net.Mime;
using Xunit;
@ -83,5 +84,43 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
Assert.Equal("value", cloudEvent[extensionAttribute]);
}
[Fact]
public void StructuredParseSerializationWithCustomSerializer()
{
var serializer = new FakeGenericRecordSerializer();
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter(serializer);
var expectedSerializedData = new byte[] { 0x1, 0x2, 0x3, };
serializer.SetSerializeResponse(expectedSerializedData);
var inputCloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
var avroData = avroFormatter
.EncodeStructuredModeMessage(inputCloudEvent, out var _)
.ToArray();
Assert.Equal(1, serializer.SerializeCalls);
Assert.Equal(expectedSerializedData, avroData);
}
[Fact]
public void StructuredParseDeserializationWithCustomSerializer()
{
var serializer = new FakeGenericRecordSerializer();
var avroFormatter = new AvroEventFormatter(serializer);
var expectedCloudEventId = "4321";
var expectedCloudEventType = "MyBrilliantEvent";
var expectedCloudEventSource = "https://cloudevents.io.test/test-event";
serializer.SetDeserializeResponseAttributes(
expectedCloudEventId, expectedCloudEventType, expectedCloudEventSource);
var actualCloudEvent = avroFormatter.DecodeStructuredModeMessage(Array.Empty<byte>(), null, null);
Assert.Equal(1, serializer.DeserializeCalls);
Assert.Equal(expectedCloudEventId, actualCloudEvent.Id);
Assert.Equal(expectedCloudEventType, actualCloudEvent.Type);
Assert.Equal(expectedCloudEventSource, actualCloudEvent.Source!.ToString());
}
}
}

View File

@ -0,0 +1,47 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using Avro.Generic;
using CloudNative.CloudEvents.Avro.Interfaces;
using System;
using System.Collections.Generic;
using System.IO;
namespace CloudNative.CloudEvents.UnitTests.Avro.Helpers;
internal class FakeGenericRecordSerializer : IGenericRecordSerializer
{
public byte[]? SerializeResponse { get; private set; }
public GenericRecord DeserializeResponse { get; private set; }
public int DeserializeCalls { get; private set; } = 0;
public int SerializeCalls { get; private set; } = 0;
public FakeGenericRecordSerializer()
{
DeserializeResponse = new GenericRecord(CloudEvents.Avro.AvroEventFormatter.AvroSchema);
}
public GenericRecord Deserialize(Stream messageBody)
{
DeserializeCalls++;
return DeserializeResponse;
}
public ReadOnlyMemory<byte> Serialize(GenericRecord value)
{
SerializeCalls++;
return SerializeResponse;
}
public void SetSerializeResponse(byte[] response) => SerializeResponse = response;
public void SetDeserializeResponseAttributes(string id, string type, string source) =>
DeserializeResponse.Add("attribute", new Dictionary<string, object>()
{
{ CloudEventsSpecVersion.SpecVersionAttribute.Name, CloudEventsSpecVersion.Default.VersionId },
{ CloudEventsSpecVersion.Default.IdAttribute.Name, id},
{ CloudEventsSpecVersion.Default.TypeAttribute.Name, type},
{ CloudEventsSpecVersion.Default.SourceAttribute.Name, source}
});
}

View File

@ -28,4 +28,7 @@
<ProjectReference Include="..\..\src\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="..\..\src\CloudNative.CloudEvents.Avro\AvroSchema.json" />
</ItemGroup>
</Project>