diff --git a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs
index 4553932..47ded29 100644
--- a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs
+++ b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs
@@ -4,7 +4,7 @@
using Avro;
using Avro.Generic;
-using Avro.IO;
+using CloudNative.CloudEvents.Avro.Interfaces;
using CloudNative.CloudEvents.Core;
using System;
using System.Collections.Generic;
@@ -41,30 +41,34 @@ namespace CloudNative.CloudEvents.Avro
private const string DataName = "data";
private static readonly string CloudEventAvroMediaType = MimeUtilities.MediaType + MediaTypeSuffix;
- private static readonly RecordSchema avroSchema;
- private static readonly DefaultReader avroReader;
- private static readonly DefaultWriter avroWriter;
-
- static AvroEventFormatter()
+ private readonly IGenericRecordSerializer serializer;
+
+ ///
+ /// Creates an AvroEventFormatter using the default serializer.
+ ///
+ public AvroEventFormatter() : this(new BasicGenericRecordSerializer()) { }
+
+ ///
+ /// Creates an AvroEventFormatter that uses a custom .
+ ///
+ ///
+ /// It is recommended to use the default serializer before defining your own wherever possible.
+ ///
+ public AvroEventFormatter(IGenericRecordSerializer genericRecordSerializer)
{
- // We're going to confidently assume that the embedded schema works. If not, type initialization
- // will fail and that's okay since the type is useless without the proper schema.
- using (var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")))
- {
- avroSchema = (RecordSchema) Schema.Parse(sr.ReadToEnd());
- }
- avroReader = new DefaultReader(avroSchema, avroSchema);
- avroWriter = new DefaultWriter(avroSchema);
+ serializer = genericRecordSerializer;
}
+ ///
+ /// Avro schema used to serialize and deserialize the CloudEvent.
+ ///
+ public static RecordSchema AvroSchema { get; } = ParseEmbeddedSchema();
+
///
public override CloudEvent DecodeStructuredModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes)
{
Validation.CheckNotNull(body, nameof(body));
-
- var decoder = new BinaryDecoder(body);
- // The reuse parameter *is* allowed to be null...
- var rawEvent = avroReader.Read(reuse: null!, decoder);
+ var rawEvent = serializer.Deserialize(body);
return DecodeGenericRecord(rawEvent, extensionAttributes);
}
@@ -146,7 +150,7 @@ namespace CloudNative.CloudEvents.Avro
contentType = new ContentType(CloudEventAvroMediaType);
// We expect the Avro encoded to detect data types that can't be represented in the schema.
- GenericRecord record = new GenericRecord(avroSchema);
+ GenericRecord record = new GenericRecord(AvroSchema);
record.Add(DataName, cloudEvent.Data);
var recordAttributes = new Dictionary();
recordAttributes[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
@@ -162,9 +166,7 @@ namespace CloudNative.CloudEvents.Avro
recordAttributes[attribute.Name] = avroValue;
}
record.Add(AttributeName, recordAttributes);
- MemoryStream memStream = new MemoryStream();
- BinaryEncoder encoder = new BinaryEncoder(memStream);
- avroWriter.Write(record, encoder);
+ var memStream = serializer.Serialize(record);
return memStream.ToArray();
}
@@ -176,5 +178,16 @@ namespace CloudNative.CloudEvents.Avro
///
public override void DecodeBinaryModeEventData(ReadOnlyMemory body, CloudEvent cloudEvent) =>
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
+
+ private static RecordSchema ParseEmbeddedSchema()
+ {
+ // We're going to confidently assume that the embedded schema works. If not, type initialization
+ // will fail and that's okay since the type is useless without the proper schema.
+ using var sr = new StreamReader(typeof(AvroEventFormatter)
+ .Assembly
+ .GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json"));
+
+ return (RecordSchema) Schema.Parse(sr.ReadToEnd());
+ }
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs b/src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs
new file mode 100644
index 0000000..580f9d4
--- /dev/null
+++ b/src/CloudNative.CloudEvents.Avro/BasicGenericRecordSerializer.cs
@@ -0,0 +1,47 @@
+// Copyright (c) Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+using Avro.Generic;
+using Avro.IO;
+using CloudNative.CloudEvents.Avro.Interfaces;
+using System;
+using System.IO;
+
+namespace CloudNative.CloudEvents.Avro;
+
+///
+/// The default implementation of the .
+///
+///
+/// Makes use of the Avro and
+/// together with the embedded Avro schema.
+///
+internal sealed class BasicGenericRecordSerializer : IGenericRecordSerializer
+{
+ private readonly DefaultReader avroReader;
+ private readonly DefaultWriter avroWriter;
+
+ public BasicGenericRecordSerializer()
+ {
+ avroReader = new DefaultReader(AvroEventFormatter.AvroSchema, AvroEventFormatter.AvroSchema);
+ avroWriter = new DefaultWriter(AvroEventFormatter.AvroSchema);
+ }
+
+ ///
+ public ReadOnlyMemory Serialize(GenericRecord record)
+ {
+ var memStream = new MemoryStream();
+ var encoder = new BinaryEncoder(memStream);
+ avroWriter.Write(record, encoder);
+ return memStream.ToArray();
+ }
+
+ ///
+ public GenericRecord Deserialize(Stream rawMessagebody)
+ {
+ var decoder = new BinaryDecoder(rawMessagebody);
+ // The reuse parameter *is* allowed to be null...
+ return avroReader.Read(reuse: null!, decoder);
+ }
+}
diff --git a/src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs b/src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs
new file mode 100644
index 0000000..23fd3bd
--- /dev/null
+++ b/src/CloudNative.CloudEvents.Avro/Interfaces/IGenericRecordSerializer.cs
@@ -0,0 +1,39 @@
+using Avro.Generic;
+using System;
+using System.IO;
+
+namespace CloudNative.CloudEvents.Avro.Interfaces;
+
+///
+/// Used to serialize and deserialize an Avro
+/// matching the
+/// CloudEvent Avro schema.
+///
+///
+///
+/// An implementation of this interface can optionally be supplied to the in cases
+/// where a custom Avro serialiser is required for integration with pre-existing tools/infrastructure.
+///
+///
+/// It is recommended to use the default serializer before defining your own wherever possible.
+///
+///
+public interface IGenericRecordSerializer
+{
+ ///
+ /// Serialize an Avro .
+ ///
+ ///
+ /// The record is guaranteed to match the
+ ///
+ /// CloudEvent Avro schema.
+ ///
+ ReadOnlyMemory Serialize(GenericRecord value);
+
+ ///
+ /// Deserialize a matching the
+ ///
+ /// CloudEvent Avro schema, represented as a stream.
+ ///
+ GenericRecord Deserialize(Stream messageBody);
+}
diff --git a/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs b/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs
index e83e790..037cc0b 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs
@@ -3,6 +3,7 @@
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.NewtonsoftJson;
+using CloudNative.CloudEvents.UnitTests.Avro.Helpers;
using System;
using System.Net.Mime;
using Xunit;
@@ -83,5 +84,43 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
Assert.Equal("value", cloudEvent[extensionAttribute]);
}
+
+ [Fact]
+ public void StructuredParseSerializationWithCustomSerializer()
+ {
+ var serializer = new FakeGenericRecordSerializer();
+ var jsonFormatter = new JsonEventFormatter();
+ var avroFormatter = new AvroEventFormatter(serializer);
+
+ var expectedSerializedData = new byte[] { 0x1, 0x2, 0x3, };
+ serializer.SetSerializeResponse(expectedSerializedData);
+
+ var inputCloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
+ var avroData = avroFormatter
+ .EncodeStructuredModeMessage(inputCloudEvent, out var _)
+ .ToArray();
+
+ Assert.Equal(1, serializer.SerializeCalls);
+ Assert.Equal(expectedSerializedData, avroData);
+ }
+
+ [Fact]
+ public void StructuredParseDeserializationWithCustomSerializer()
+ {
+ var serializer = new FakeGenericRecordSerializer();
+ var avroFormatter = new AvroEventFormatter(serializer);
+ var expectedCloudEventId = "4321";
+ var expectedCloudEventType = "MyBrilliantEvent";
+ var expectedCloudEventSource = "https://cloudevents.io.test/test-event";
+ serializer.SetDeserializeResponseAttributes(
+ expectedCloudEventId, expectedCloudEventType, expectedCloudEventSource);
+
+ var actualCloudEvent = avroFormatter.DecodeStructuredModeMessage(Array.Empty(), null, null);
+
+ Assert.Equal(1, serializer.DeserializeCalls);
+ Assert.Equal(expectedCloudEventId, actualCloudEvent.Id);
+ Assert.Equal(expectedCloudEventType, actualCloudEvent.Type);
+ Assert.Equal(expectedCloudEventSource, actualCloudEvent.Source!.ToString());
+ }
}
}
diff --git a/test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs b/test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs
new file mode 100644
index 0000000..9bbf26e
--- /dev/null
+++ b/test/CloudNative.CloudEvents.UnitTests/Avro/Helpers/FakeGenericRecordSerializer.cs
@@ -0,0 +1,47 @@
+// Copyright (c) Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+using Avro.Generic;
+using CloudNative.CloudEvents.Avro.Interfaces;
+using System;
+using System.Collections.Generic;
+using System.IO;
+
+namespace CloudNative.CloudEvents.UnitTests.Avro.Helpers;
+
+internal class FakeGenericRecordSerializer : IGenericRecordSerializer
+{
+ public byte[]? SerializeResponse { get; private set; }
+ public GenericRecord DeserializeResponse { get; private set; }
+ public int DeserializeCalls { get; private set; } = 0;
+ public int SerializeCalls { get; private set; } = 0;
+
+ public FakeGenericRecordSerializer()
+ {
+ DeserializeResponse = new GenericRecord(CloudEvents.Avro.AvroEventFormatter.AvroSchema);
+ }
+
+ public GenericRecord Deserialize(Stream messageBody)
+ {
+ DeserializeCalls++;
+ return DeserializeResponse;
+ }
+
+ public ReadOnlyMemory Serialize(GenericRecord value)
+ {
+ SerializeCalls++;
+ return SerializeResponse;
+ }
+
+ public void SetSerializeResponse(byte[] response) => SerializeResponse = response;
+
+ public void SetDeserializeResponseAttributes(string id, string type, string source) =>
+ DeserializeResponse.Add("attribute", new Dictionary()
+ {
+ { CloudEventsSpecVersion.SpecVersionAttribute.Name, CloudEventsSpecVersion.Default.VersionId },
+ { CloudEventsSpecVersion.Default.IdAttribute.Name, id},
+ { CloudEventsSpecVersion.Default.TypeAttribute.Name, type},
+ { CloudEventsSpecVersion.Default.SourceAttribute.Name, source}
+ });
+}
diff --git a/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj b/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj
index ccf97a6..0fc8ce6 100644
--- a/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj
+++ b/test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj
@@ -28,4 +28,7 @@
+
+
+