Allow event formatters to only deal with byte arrays, and do so synchronously
Everything else builds on this, but can be overridden for more efficient parsing where appropriate. Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
parent
bdcfa830f0
commit
d6d6479e83
|
@ -9,7 +9,6 @@ using System;
|
|||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net.Mime;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CloudNative.CloudEvents
|
||||
{
|
||||
|
@ -36,13 +35,9 @@ namespace CloudNative.CloudEvents
|
|||
}
|
||||
public const string MediaTypeSuffix = "+avro";
|
||||
|
||||
// FIXME: We shouldn't use synchronous stream methods...
|
||||
public override Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
Task.FromResult(DecodeStructuredEvent(data, extensionAttributes));
|
||||
|
||||
public override CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var decoder = new Avro.IO.BinaryDecoder(data);
|
||||
var decoder = new BinaryDecoder(data);
|
||||
var rawEvent = avroReader.Read<GenericRecord>(null, decoder);
|
||||
return DecodeGenericRecord(rawEvent, extensionAttributes);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
// Copyright 2021 Cloud Native Foundation.
|
||||
// Licensed under the Apache 2.0 license.
|
||||
// See LICENSE file in the project root for full license information.
|
||||
|
||||
using System.IO;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CloudNative.CloudEvents
|
||||
{
|
||||
/// <summary>
|
||||
/// Utilities methods for dealing with binary data, converting between
|
||||
/// streams, arrays, Memory{T} etc.
|
||||
/// </summary>
|
||||
internal static class BinaryDataUtilities
|
||||
{
|
||||
internal async static Task<byte[]> ToByteArrayAsync(Stream stream)
|
||||
{
|
||||
// TODO: Optimize if it's already a MemoryStream?
|
||||
var memory = new MemoryStream();
|
||||
await stream.CopyToAsync(memory).ConfigureAwait(false);
|
||||
return memory.ToArray();
|
||||
}
|
||||
|
||||
internal static byte[] ToByteArray(Stream stream)
|
||||
{
|
||||
var memory = new MemoryStream();
|
||||
stream.CopyTo(memory);
|
||||
return memory.ToArray();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,22 +16,32 @@ namespace CloudNative.CloudEvents
|
|||
public abstract class CloudEventFormatter
|
||||
{
|
||||
/// <summary>
|
||||
/// Decode a structured event from a stream
|
||||
/// Decode a structured event from a stream. The default implementation copies the
|
||||
/// content of the stream into a byte array before passing it to <see cref="DecodeStructuredEvent(byte[], IEnumerable{CloudEventAttribute})"/>,
|
||||
/// but this can be overridden by event formatters that can decode a stream more efficiently.
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
/// <param name="extensions"></param>
|
||||
/// <returns></returns>
|
||||
public virtual CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
throw new NotImplementedException();
|
||||
public virtual CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var bytes = BinaryDataUtilities.ToByteArray(data);
|
||||
return DecodeStructuredEvent(bytes, extensionAttributes);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decode a structured event from a stream asynchonously
|
||||
/// Decode a structured event from a stream. The default implementation asynchronously copies the
|
||||
/// content of the stream into a byte array before passing it to <see cref="DecodeStructuredEvent(byte[], IEnumerable{CloudEventAttribute})"/>,
|
||||
/// but this can be overridden by event formatters that can decode a stream more efficiently.
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
/// <param name="extensions"></param>
|
||||
/// <returns></returns>
|
||||
public virtual Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
throw new NotImplementedException();
|
||||
public virtual async Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var bytes = await BinaryDataUtilities.ToByteArrayAsync(data).ConfigureAwait(false);
|
||||
return DecodeStructuredEvent(bytes, extensionAttributes);
|
||||
}
|
||||
|
||||
// TODO: Remove either this one or the stream one? It seems unnecessary to have both.
|
||||
|
||||
|
|
Loading…
Reference in New Issue