dotnet-sdk/src/Dapr.Client/Crypto/DecryptionStreamProcessor.cs

154 lines
5.5 KiB
C#

// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
namespace Dapr.Client.Crypto;
/// <summary>
/// Provides the implementation to decrypt a stream of plaintext data with the Dapr runtime.
/// </summary>
internal sealed class DecryptionStreamProcessor : IDisposable
{
private bool disposed;
private readonly Channel<ReadOnlyMemory<byte>> outputChannel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();
/// <summary>
/// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams.
/// </summary>
internal event EventHandler<Exception>? OnException;
/// <summary>
/// Sends the provided bytes in chunks to the sidecar for the encryption operation.
/// </summary>
/// <param name="inputStream">The stream containing the bytes to decrypt.</param>
/// <param name="call">The call to make to the sidecar to process the encryption operation.</param>
/// <param name="streamingBlockSizeInBytes">The size, in bytes, of the streaming blocks.</param>
/// <param name="options">The decryption options.</param>
/// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
public async Task ProcessStreamAsync(
Stream inputStream,
AsyncDuplexStreamingCall<Autogenerated.DecryptRequest, Autogenerated.DecryptResponse> call,
int streamingBlockSizeInBytes,
Autogenerated.DecryptRequestOptions options,
CancellationToken cancellationToken)
{
//Read from the input stream and write to the gRPC call
_ = Task.Run(async () =>
{
try
{
await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes);
var buffer = new byte[streamingBlockSizeInBytes];
int bytesRead;
ulong sequenceNumber = 0;
while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0)
{
var request = new Autogenerated.DecryptRequest
{
Payload = new Autogenerated.StreamPayload
{
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
}
};
//Only include the options in the first message
if (sequenceNumber == 0)
{
request.Options = options;
}
await call.RequestStream.WriteAsync(request, cancellationToken);
//Increment the sequence number
sequenceNumber++;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected cancellation exception
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
await call.RequestStream.CompleteAsync();
}
}, cancellationToken);
//Start reading from the gRPC call and writing to the output channel
_ = Task.Run(async () =>
{
try
{
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
{
await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken);
}
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
outputChannel.Writer.Complete();
}
}, cancellationToken);
}
/// <summary>
/// Retrieves the processed bytes from the operation from the sidecar and
/// returns as an enumerable stream.
/// </summary>
public async IAsyncEnumerable<ReadOnlyMemory<byte>> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
{
yield return data;
}
}
public void Dispose()
{
Dispose(true);
}
private void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
outputChannel.Writer.TryComplete();
}
disposed = true;
}
}
}