From 4f4987ece10f6ba011dfa78c457634ca8159b585 Mon Sep 17 00:00:00 2001 From: Kirner- Date: Sat, 8 Jul 2023 22:14:41 +0100 Subject: [PATCH] Add optional IGenericRecordSerializer to Avro formatter Signed-off-by: Kirner- --- .../AvroEventFormatter.cs | 57 ++++++++++++------- .../BasicGenericRecordSerializer.cs | 47 +++++++++++++++ .../Interfaces/IGenericRecordSerializer.cs | 39 +++++++++++++ .../Avro/AvroEventFormatterTest.cs | 39 +++++++++++++ .../Helpers/FakeGenericRecordSerializer.cs | 47 +++++++++++++++ .../CloudNative.CloudEvents.UnitTests.csproj | 3 + 6 files changed, 210 insertions(+), 22 deletions(-) create mode 100644 src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs create mode 100644 src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs create mode 100644 test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs diff --git a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs index 4553932..47ded29 100644 --- a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs +++ b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs @@ -4,7 +4,7 @@ using Avro; using Avro.Generic; -using Avro.IO; +using CloudNative.CloudEvents.Avro.Interfaces; using CloudNative.CloudEvents.Core; using System; using System.Collections.Generic; @@ -41,30 +41,34 @@ namespace CloudNative.CloudEvents.Avro private const string DataName = "data"; private static readonly string CloudEventAvroMediaType = MimeUtilities.MediaType + MediaTypeSuffix; - private static readonly RecordSchema avroSchema; - private static readonly DefaultReader avroReader; - private static readonly DefaultWriter avroWriter; - - static AvroEventFormatter() + private readonly IGenericRecordSerializer serializer; + + /// + /// Creates an AvroEventFormatter using the default serializer. + /// + public AvroEventFormatter() : this(new BasicGenericRecordSerializer()) { } + + /// + /// Creates an AvroEventFormatter that uses a custom . + /// + /// + /// It is recommended to use the default serializer before defining your own wherever possible. + /// + public AvroEventFormatter(IGenericRecordSerializer genericRecordSerializer) { - // We're going to confidently assume that the embedded schema works. If not, type initialization - // will fail and that's okay since the type is useless without the proper schema. - using (var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json"))) - { - avroSchema = (RecordSchema) Schema.Parse(sr.ReadToEnd()); - } - avroReader = new DefaultReader(avroSchema, avroSchema); - avroWriter = new DefaultWriter(avroSchema); + serializer = genericRecordSerializer; } + /// + /// Avro schema used to serialize and deserialize the CloudEvent. + /// + public static RecordSchema AvroSchema { get; } = ParseEmbeddedSchema(); + /// public override CloudEvent DecodeStructuredModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) { Validation.CheckNotNull(body, nameof(body)); - - var decoder = new BinaryDecoder(body); - // The reuse parameter *is* allowed to be null... - var rawEvent = avroReader.Read(reuse: null!, decoder); + var rawEvent = serializer.Deserialize(body); return DecodeGenericRecord(rawEvent, extensionAttributes); } @@ -146,7 +150,7 @@ namespace CloudNative.CloudEvents.Avro contentType = new ContentType(CloudEventAvroMediaType); // We expect the Avro encoded to detect data types that can't be represented in the schema. - GenericRecord record = new GenericRecord(avroSchema); + GenericRecord record = new GenericRecord(AvroSchema); record.Add(DataName, cloudEvent.Data); var recordAttributes = new Dictionary(); recordAttributes[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId; @@ -162,9 +166,7 @@ namespace CloudNative.CloudEvents.Avro recordAttributes[attribute.Name] = avroValue; } record.Add(AttributeName, recordAttributes); - MemoryStream memStream = new MemoryStream(); - BinaryEncoder encoder = new BinaryEncoder(memStream); - avroWriter.Write(record, encoder); + var memStream = serializer.Serialize(record); return memStream.ToArray(); } @@ -176,5 +178,16 @@ namespace CloudNative.CloudEvents.Avro /// public override void DecodeBinaryModeEventData(ReadOnlyMemory body, CloudEvent cloudEvent) => throw new NotSupportedException("The Avro event formatter does not support binary content mode"); + + private static RecordSchema ParseEmbeddedSchema() + { + // We're going to confidently assume that the embedded schema works. If not, type initialization + // will fail and that's okay since the type is useless without the proper schema. + using var sr = new StreamReader(typeof(AvroEventFormatter) + .Assembly + .GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")); + + return (RecordSchema) Schema.Parse(sr.ReadToEnd()); + } } } \ No newline at end of file diff --git a/src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs b/src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs new file mode 100644 index 0000000..580f9d4 --- /dev/null +++ b/src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs @@ -0,0 +1,47 @@ +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +using Avro.Generic; +using Avro.IO; +using CloudNative.CloudEvents.Avro.Interfaces; +using System; +using System.IO; + +namespace CloudNative.CloudEvents.Avro; + +/// +/// The default implementation of the . +/// +/// +/// Makes use of the Avro and +/// together with the embedded Avro schema. +/// +internal sealed class BasicGenericRecordSerializer : IGenericRecordSerializer +{ + private readonly DefaultReader avroReader; + private readonly DefaultWriter avroWriter; + + public BasicGenericRecordSerializer() + { + avroReader = new DefaultReader(AvroEventFormatter.AvroSchema, AvroEventFormatter.AvroSchema); + avroWriter = new DefaultWriter(AvroEventFormatter.AvroSchema); + } + + /// + public ReadOnlyMemory Serialize(GenericRecord record) + { + var memStream = new MemoryStream(); + var encoder = new BinaryEncoder(memStream); + avroWriter.Write(record, encoder); + return memStream.ToArray(); + } + + /// + public GenericRecord Deserialize(Stream rawMessagebody) + { + var decoder = new BinaryDecoder(rawMessagebody); + // The reuse parameter *is* allowed to be null... + return avroReader.Read(reuse: null!, decoder); + } +} diff --git a/src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs b/src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs new file mode 100644 index 0000000..23fd3bd --- /dev/null +++ b/src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs @@ -0,0 +1,39 @@ +using Avro.Generic; +using System; +using System.IO; + +namespace CloudNative.CloudEvents.Avro.Interfaces; + +/// +/// Used to serialize and deserialize an Avro +/// matching the +/// CloudEvent Avro schema. +/// +/// +/// +/// An implementation of this interface can optionally be supplied to the in cases +/// where a custom Avro serialiser is required for integration with pre-existing tools/infrastructure. +/// +/// +/// It is recommended to use the default serializer before defining your own wherever possible. +/// +/// +public interface IGenericRecordSerializer +{ + /// + /// Serialize an Avro . + /// + /// + /// The record is guaranteed to match the + /// + /// CloudEvent Avro schema. + /// + ReadOnlyMemory Serialize(GenericRecord value); + + /// + /// Deserialize a matching the + /// + /// CloudEvent Avro schema, represented as a stream. + /// + GenericRecord Deserialize(Stream messageBody); +} diff --git a/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs b/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs index e83e790..037cc0b 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs @@ -3,6 +3,7 @@ // See LICENSE file in the project root for full license information. using CloudNative.CloudEvents.NewtonsoftJson; +using CloudNative.CloudEvents.UnitTests.Avro.Helpers; using System; using System.Net.Mime; using Xunit; @@ -83,5 +84,43 @@ namespace CloudNative.CloudEvents.Avro.UnitTests Assert.Equal("value", cloudEvent[extensionAttribute]); } + + [Fact] + public void StructuredParseSerializationWithCustomSerializer() + { + var serializer = new FakeGenericRecordSerializer(); + var jsonFormatter = new JsonEventFormatter(); + var avroFormatter = new AvroEventFormatter(serializer); + + var expectedSerializedData = new byte[] { 0x1, 0x2, 0x3, }; + serializer.SetSerializeResponse(expectedSerializedData); + + var inputCloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10); + var avroData = avroFormatter + .EncodeStructuredModeMessage(inputCloudEvent, out var _) + .ToArray(); + + Assert.Equal(1, serializer.SerializeCalls); + Assert.Equal(expectedSerializedData, avroData); + } + + [Fact] + public void StructuredParseDeserializationWithCustomSerializer() + { + var serializer = new FakeGenericRecordSerializer(); + var avroFormatter = new AvroEventFormatter(serializer); + var expectedCloudEventId = "4321"; + var expectedCloudEventType = "MyBrilliantEvent"; + var expectedCloudEventSource = "https://cloudevents.io.test/test-event"; + serializer.SetDeserializeResponseAttributes( + expectedCloudEventId, expectedCloudEventType, expectedCloudEventSource); + + var actualCloudEvent = avroFormatter.DecodeStructuredModeMessage(Array.Empty(), null, null); + + Assert.Equal(1, serializer.DeserializeCalls); + Assert.Equal(expectedCloudEventId, actualCloudEvent.Id); + Assert.Equal(expectedCloudEventType, actualCloudEvent.Type); + Assert.Equal(expectedCloudEventSource, actualCloudEvent.Source!.ToString()); + } } } diff --git a/test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs b/test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs new file mode 100644 index 0000000..9bbf26e --- /dev/null +++ b/test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs @@ -0,0 +1,47 @@ +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +using Avro.Generic; +using CloudNative.CloudEvents.Avro.Interfaces; +using System; +using System.Collections.Generic; +using System.IO; + +namespace CloudNative.CloudEvents.UnitTests.Avro.Helpers; + +internal class FakeGenericRecordSerializer : IGenericRecordSerializer +{ + public byte[]? SerializeResponse { get; private set; } + public GenericRecord DeserializeResponse { get; private set; } + public int DeserializeCalls { get; private set; } = 0; + public int SerializeCalls { get; private set; } = 0; + + public FakeGenericRecordSerializer() + { + DeserializeResponse = new GenericRecord(CloudEvents.Avro.AvroEventFormatter.AvroSchema); + } + + public GenericRecord Deserialize(Stream messageBody) + { + DeserializeCalls++; + return DeserializeResponse; + } + + public ReadOnlyMemory Serialize(GenericRecord value) + { + SerializeCalls++; + return SerializeResponse; + } + + public void SetSerializeResponse(byte[] response) => SerializeResponse = response; + + public void SetDeserializeResponseAttributes(string id, string type, string source) => + DeserializeResponse.Add("attribute", new Dictionary() + { + { CloudEventsSpecVersion.SpecVersionAttribute.Name, CloudEventsSpecVersion.Default.VersionId }, + { CloudEventsSpecVersion.Default.IdAttribute.Name, id}, + { CloudEventsSpecVersion.Default.TypeAttribute.Name, type}, + { CloudEventsSpecVersion.Default.SourceAttribute.Name, source} + }); +} diff --git a/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj b/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj index ccf97a6..0fc8ce6 100644 --- a/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj +++ b/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj @@ -28,4 +28,7 @@ + + +