diff --git a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs index 01b5c2d..dd0c8d9 100644 --- a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs +++ b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs @@ -9,7 +9,6 @@ using System; using System.Collections.Generic; using System.IO; using System.Net.Mime; -using System.Threading.Tasks; namespace CloudNative.CloudEvents { @@ -36,13 +35,9 @@ namespace CloudNative.CloudEvents } public const string MediaTypeSuffix = "+avro"; - // FIXME: We shouldn't use synchronous stream methods... - public override Task DecodeStructuredEventAsync(Stream data, IEnumerable extensionAttributes) => - Task.FromResult(DecodeStructuredEvent(data, extensionAttributes)); - public override CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensionAttributes) { - var decoder = new Avro.IO.BinaryDecoder(data); + var decoder = new BinaryDecoder(data); var rawEvent = avroReader.Read(null, decoder); return DecodeGenericRecord(rawEvent, extensionAttributes); } diff --git a/src/CloudNative.CloudEvents/BinaryDataUtilities.cs b/src/CloudNative.CloudEvents/BinaryDataUtilities.cs new file mode 100644 index 0000000..70017df --- /dev/null +++ b/src/CloudNative.CloudEvents/BinaryDataUtilities.cs @@ -0,0 +1,31 @@ +// Copyright 2021 Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +using System.IO; +using System.Threading.Tasks; + +namespace CloudNative.CloudEvents +{ + /// + /// Utilities methods for dealing with binary data, converting between + /// streams, arrays, Memory{T} etc. + /// + internal static class BinaryDataUtilities + { + internal async static Task ToByteArrayAsync(Stream stream) + { + // TODO: Optimize if it's already a MemoryStream? + var memory = new MemoryStream(); + await stream.CopyToAsync(memory).ConfigureAwait(false); + return memory.ToArray(); + } + + internal static byte[] ToByteArray(Stream stream) + { + var memory = new MemoryStream(); + stream.CopyTo(memory); + return memory.ToArray(); + } + } +} diff --git a/src/CloudNative.CloudEvents/CloudEventFormatter.cs b/src/CloudNative.CloudEvents/CloudEventFormatter.cs index a98c32c..3c27b2e 100644 --- a/src/CloudNative.CloudEvents/CloudEventFormatter.cs +++ b/src/CloudNative.CloudEvents/CloudEventFormatter.cs @@ -16,22 +16,32 @@ namespace CloudNative.CloudEvents public abstract class CloudEventFormatter { /// - /// Decode a structured event from a stream + /// Decode a structured event from a stream. The default implementation copies the + /// content of the stream into a byte array before passing it to , + /// but this can be overridden by event formatters that can decode a stream more efficiently. /// /// /// /// - public virtual CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensionAttributes) => - throw new NotImplementedException(); + public virtual CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensionAttributes) + { + var bytes = BinaryDataUtilities.ToByteArray(data); + return DecodeStructuredEvent(bytes, extensionAttributes); + } /// - /// Decode a structured event from a stream asynchonously + /// Decode a structured event from a stream. The default implementation asynchronously copies the + /// content of the stream into a byte array before passing it to , + /// but this can be overridden by event formatters that can decode a stream more efficiently. /// /// /// /// - public virtual Task DecodeStructuredEventAsync(Stream data, IEnumerable extensionAttributes) => - throw new NotImplementedException(); + public virtual async Task DecodeStructuredEventAsync(Stream data, IEnumerable extensionAttributes) + { + var bytes = await BinaryDataUtilities.ToByteArrayAsync(data).ConfigureAwait(false); + return DecodeStructuredEvent(bytes, extensionAttributes); + } // TODO: Remove either this one or the stream one? It seems unnecessary to have both.