Merge branch 'master' into dapr-data-operator

This commit is contained in:
Whit Waldo 2025-05-28 03:52:31 -05:00 committed by GitHub
commit 0810022ca4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 631 additions and 221 deletions

View File

@ -9,12 +9,12 @@
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
<PackageVersion Include="GitHubActionsTestLogger" Version="1.1.2" />
<PackageVersion Include="Google.Api.CommonProtos" Version="2.2.0" />
<PackageVersion Include="Google.Protobuf" Version="3.28.2" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.66.0" />
<PackageVersion Include="Google.Protobuf" Version="3.30.2" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.71.0" />
<PackageVersion Include="Grpc.Core.Testing" Version="2.46.6" />
<PackageVersion Include="Grpc.Net.Client" Version="2.66.0" />
<PackageVersion Include="Grpc.Net.ClientFactory" Version="2.66.0" />
<PackageVersion Include="Grpc.Tools" Version="2.67.0" />
<PackageVersion Include="Grpc.Net.Client" Version="2.71.0" />
<PackageVersion Include="Grpc.Net.ClientFactory" Version="2.71.0" />
<PackageVersion Include="Grpc.Tools" Version="2.71.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.35" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="6.0.35" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
@ -23,8 +23,8 @@
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing.XUnit" Version="1.1.2" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.8.0" />
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.5.0" />
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.5.0" />
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.10.0" />
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.10.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />

View File

@ -37,9 +37,8 @@ namespace Cryptography.Examples
await using var encryptFs = new FileStream(fileName, FileMode.Open);
var bufferedEncryptedBytes = new ArrayBufferWriter<byte>();
await foreach (var bytes in (await client.EncryptAsync(componentName, encryptFs, keyName,
new EncryptionOptions(KeyWrapAlgorithm.Rsa), cancellationToken))
.WithCancellation(cancellationToken))
await foreach (var bytes in (client.EncryptAsync(componentName, encryptFs, keyName,
new EncryptionOptions(KeyWrapAlgorithm.Rsa), cancellationToken)))
{
bufferedEncryptedBytes.Write(bytes.Span);
}
@ -53,8 +52,8 @@ namespace Cryptography.Examples
//We'll stream the decrypted bytes from a MemoryStream into the above temporary file
await using var encryptedMs = new MemoryStream(bufferedEncryptedBytes.WrittenMemory.ToArray());
await foreach (var result in (await client.DecryptAsync(componentName, encryptedMs, keyName,
cancellationToken)).WithCancellation(cancellationToken))
await foreach (var result in (client.DecryptAsync(componentName, encryptedMs, keyName,
cancellationToken)))
{
decryptFs.Write(result.Span);
}

View File

@ -68,13 +68,12 @@ internal static class DurationExtensions
if (period.StartsWith(MonthlyPrefixPeriod))
{
var dateTime = DateTime.UtcNow;
return dateTime.AddMonths(1) - dateTime;
return TimeSpan.FromDays(30);
}
if (period.StartsWith(MidnightPrefixPeriod))
{
return new TimeSpan();
return TimeSpan.Zero;
}
if (period.StartsWith(WeeklyPrefixPeriod))

View File

@ -223,7 +223,7 @@ namespace Dapr.Actors.Runtime
internal async Task FireTimerAsync(ActorId actorId, Stream requestBodyStream, CancellationToken cancellationToken = default)
{
#pragma warning disable 0618
var timerData = await JsonSerializer.DeserializeAsync<TimerInfo>(requestBodyStream);
var timerData = await DeserializeAsync(requestBodyStream);
#pragma warning restore 0618
// Create a Func to be invoked by common method.
@ -243,6 +243,62 @@ namespace Dapr.Actors.Runtime
await this.DispatchInternalAsync(actorId, this.timerMethodContext, RequestFunc, cancellationToken);
}
#pragma warning disable 0618
internal static async Task<TimerInfo> DeserializeAsync(Stream stream)
{
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
if (json.ValueKind == JsonValueKind.Null)
{
return null;
}
var setAnyProperties = false; // Used to determine if anything was actually deserialized
var dueTime = TimeSpan.Zero;
var callback = "";
var period = TimeSpan.Zero;
var data = Array.Empty<byte>();
TimeSpan? ttl = null;
if (json.TryGetProperty("callback", out var callbackProperty))
{
setAnyProperties = true;
callback = callbackProperty.GetString();
}
if (json.TryGetProperty("dueTime", out var dueTimeProperty))
{
setAnyProperties = true;
var dueTimeString = dueTimeProperty.GetString();
dueTime = ConverterUtils.ConvertTimeSpanFromDaprFormat(dueTimeString);
}
if (json.TryGetProperty("period", out var periodProperty))
{
setAnyProperties = true;
var periodString = periodProperty.GetString();
(period, _) = ConverterUtils.ConvertTimeSpanValueFromISO8601Format(periodString);
}
if (json.TryGetProperty("data", out var dataProperty) && dataProperty.ValueKind != JsonValueKind.Null)
{
setAnyProperties = true;
data = dataProperty.GetBytesFromBase64();
}
if (json.TryGetProperty("ttl", out var ttlProperty))
{
setAnyProperties = true;
var ttlString = ttlProperty.GetString();
ttl = ConverterUtils.ConvertTimeSpanFromDaprFormat(ttlString);
}
if (!setAnyProperties)
{
return null; //No properties were ever deserialized, so return null instead of default values
}
return new TimerInfo(callback, data, dueTime, period, ttl);
}
#pragma warning restore 0618
internal async Task ActivateActorAsync(ActorId actorId)
{
// An actor is activated by "Dapr" runtime when a call is to be made for an actor.

View File

@ -49,7 +49,7 @@ internal static class ConverterUtils
int msIndex = spanOfValue.IndexOf("ms");
// handle days from hours.
var hoursSpan = spanOfValue.Slice(0, hIndex);
var hoursSpan = spanOfValue[..hIndex];
var hours = int.Parse(hoursSpan);
var days = hours / 24;
hours %= 24;
@ -103,7 +103,7 @@ internal static class ConverterUtils
builder.Append($"{value.Days}D");
}
builder.Append("T");
builder.Append('T');
if(value.Hours > 0)
{

View File

@ -0,0 +1,153 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
namespace Dapr.Client.Crypto;
/// <summary>
/// Provides the implementation to decrypt a stream of plaintext data with the Dapr runtime.
/// </summary>
internal sealed class DecryptionStreamProcessor : IDisposable
{
private bool disposed;
private readonly Channel<ReadOnlyMemory<byte>> outputChannel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();
/// <summary>
/// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams.
/// </summary>
internal event EventHandler<Exception>? OnException;
/// <summary>
/// Sends the provided bytes in chunks to the sidecar for the encryption operation.
/// </summary>
/// <param name="inputStream">The stream containing the bytes to decrypt.</param>
/// <param name="call">The call to make to the sidecar to process the encryption operation.</param>
/// <param name="streamingBlockSizeInBytes">The size, in bytes, of the streaming blocks.</param>
/// <param name="options">The decryption options.</param>
/// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
public async Task ProcessStreamAsync(
Stream inputStream,
AsyncDuplexStreamingCall<Autogenerated.DecryptRequest, Autogenerated.DecryptResponse> call,
int streamingBlockSizeInBytes,
Autogenerated.DecryptRequestOptions options,
CancellationToken cancellationToken)
{
//Read from the input stream and write to the gRPC call
_ = Task.Run(async () =>
{
try
{
await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes);
var buffer = new byte[streamingBlockSizeInBytes];
int bytesRead;
ulong sequenceNumber = 0;
while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0)
{
var request = new Autogenerated.DecryptRequest
{
Payload = new Autogenerated.StreamPayload
{
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
}
};
//Only include the options in the first message
if (sequenceNumber == 0)
{
request.Options = options;
}
await call.RequestStream.WriteAsync(request, cancellationToken);
//Increment the sequence number
sequenceNumber++;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected cancellation exception
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
await call.RequestStream.CompleteAsync();
}
}, cancellationToken);
//Start reading from the gRPC call and writing to the output channel
_ = Task.Run(async () =>
{
try
{
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
{
await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken);
}
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
outputChannel.Writer.Complete();
}
}, cancellationToken);
}
/// <summary>
/// Retrieves the processed bytes from the operation from the sidecar and
/// returns as an enumerable stream.
/// </summary>
public async IAsyncEnumerable<ReadOnlyMemory<byte>> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
{
yield return data;
}
}
public void Dispose()
{
Dispose(true);
}
private void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
outputChannel.Writer.TryComplete();
}
disposed = true;
}
}
}

View File

@ -0,0 +1,154 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
namespace Dapr.Client.Crypto;
/// <summary>
/// Provides the implementation to encrypt a stream of plaintext data with the Dapr runtime.
/// </summary>
internal sealed class EncryptionStreamProcessor : IDisposable
{
private bool disposed;
private readonly Channel<ReadOnlyMemory<byte>> outputChannel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();
/// <summary>
/// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams.
/// </summary>
internal event EventHandler<Exception>? OnException;
/// <summary>
/// Sends the provided bytes in chunks to the sidecar for the encryption operation.
/// </summary>
/// <param name="inputStream">The stream containing the bytes to encrypt.</param>
/// <param name="call">The call to make to the sidecar to process the encryption operation.</param>
/// <param name="options">The encryption options.</param>
/// <param name="streamingBlockSizeInBytes">The size, in bytes, of the streaming blocks.</param>
/// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
public async Task ProcessStreamAsync(
Stream inputStream,
AsyncDuplexStreamingCall<Autogenerated.EncryptRequest, Autogenerated.EncryptResponse> call,
Autogenerated.EncryptRequestOptions options,
int streamingBlockSizeInBytes,
CancellationToken cancellationToken)
{
//Read from the input stream and write to the gRPC call
_ = Task.Run(async () =>
{
try
{
await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes);
var buffer = new byte[streamingBlockSizeInBytes];
int bytesRead;
ulong sequenceNumber = 0;
while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0)
{
var request = new Autogenerated.EncryptRequest
{
Payload = new Autogenerated.StreamPayload
{
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
}
};
//Only include the options in the first message
if (sequenceNumber == 0)
{
request.Options = options;
}
await call.RequestStream.WriteAsync(request, cancellationToken);
//Increment the sequence number
sequenceNumber++;
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected cancellation exception
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
await call.RequestStream.CompleteAsync();
}
}, cancellationToken);
//Start reading from the gRPC call and writing to the output channel
_ = Task.Run(async () =>
{
try
{
await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
{
await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken);
}
}
catch (Exception ex)
{
OnException?.Invoke(this, ex);
}
finally
{
outputChannel.Writer.Complete();
}
}, cancellationToken);
}
/// <summary>
/// Retrieves the processed bytes from the operation from the sidecar and
/// returns as an enumerable stream.
/// </summary>
public async IAsyncEnumerable<ReadOnlyMemory<byte>> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
{
yield return data;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
outputChannel.Writer.TryComplete();
}
disposed = true;
}
}
}

View File

@ -1105,7 +1105,7 @@ namespace Dapr.Client
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the operation.</param>
/// <returns>An array of encrypted bytes.</returns>
[Obsolete("The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> EncryptAsync(string vaultResourceName, Stream plaintextStream, string keyName,
public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName, Stream plaintextStream, string keyName,
EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default);
/// <summary>
@ -1144,7 +1144,7 @@ namespace Dapr.Client
/// <returns>An asynchronously enumerable array of decrypted bytes.</returns>
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
string keyName, DecryptionOptions options, CancellationToken cancellationToken = default);
/// <summary>
@ -1157,7 +1157,7 @@ namespace Dapr.Client
/// <returns>An asynchronously enumerable array of decrypted bytes.</returns>
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName, Stream ciphertextStream,
string keyName, CancellationToken cancellationToken = default);
#endregion

View File

@ -11,10 +11,10 @@
// limitations under the License.
// ------------------------------------------------------------------------
using Dapr.Common.Extensions;
namespace Dapr.Client;
using Crypto;
using System;
using System.Buffers;
using System.Collections.Generic;
@ -23,7 +23,6 @@ using System.Linq;
using System.Net.Http;
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
@ -1665,18 +1664,17 @@ internal class DaprClientGrpc : DaprClient
/// <inheritdoc />
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
"As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")]
public override async Task<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName,
ReadOnlyMemory<byte> plaintextBytes, string keyName, EncryptionOptions encryptionOptions,
CancellationToken cancellationToken = default)
{
using var memoryStream = plaintextBytes.CreateMemoryStream(true);
var encryptionResult =
await EncryptAsync(vaultResourceName, memoryStream, keyName, encryptionOptions, cancellationToken);
var encryptionResult = EncryptAsync(vaultResourceName, memoryStream, keyName, encryptionOptions, cancellationToken);
var bufferedResult = new ArrayBufferWriter<byte>();
await foreach (var item in encryptionResult.WithCancellation(cancellationToken))
await foreach (var item in encryptionResult)
{
bufferedResult.Write(item.Span);
}
@ -1686,15 +1684,18 @@ internal class DaprClientGrpc : DaprClient
/// <inheritdoc />
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public override async Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> EncryptAsync(string vaultResourceName,
"As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")]
public override async IAsyncEnumerable<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName,
Stream plaintextStream,
string keyName, EncryptionOptions encryptionOptions, CancellationToken cancellationToken = default)
string keyName, EncryptionOptions encryptionOptions,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));
ArgumentVerifier.ThrowIfNull(plaintextStream, nameof(plaintextStream));
ArgumentVerifier.ThrowIfNull(encryptionOptions, nameof(encryptionOptions));
EventHandler<Exception> exceptionHandler = (_, ex) => throw ex;
var shouldOmitDecryptionKeyName =
string.IsNullOrWhiteSpace(encryptionOptions
@ -1717,185 +1718,91 @@ internal class DaprClientGrpc : DaprClient
}
var options = CreateCallOptions(headers: null, cancellationToken);
var duplexStream = client.EncryptAlpha1(options);
var duplexStream = Client.EncryptAlpha1(options);
//Run both operations at the same time, but return the output of the streaming values coming from the operation
var receiveResult = Task.FromResult(RetrieveEncryptedStreamAsync(duplexStream, cancellationToken));
return await Task.WhenAll(
//Stream the plaintext data to the sidecar in chunks
SendPlaintextStreamAsync(plaintextStream, encryptionOptions.StreamingBlockSizeInBytes,
duplexStream, encryptRequestOptions, cancellationToken),
//At the same time, retrieve the encrypted response from the sidecar
receiveResult).ContinueWith(_ => receiveResult.Result, cancellationToken);
}
/// <summary>
/// Sends the plaintext bytes in chunks to the sidecar to be encrypted.
/// </summary>
private async Task SendPlaintextStreamAsync(Stream plaintextStream,
int streamingBlockSizeInBytes,
AsyncDuplexStreamingCall<Autogenerated.EncryptRequest, Autogenerated.EncryptResponse> duplexStream,
Autogenerated.EncryptRequestOptions encryptRequestOptions,
CancellationToken cancellationToken)
{
//Start with passing the metadata about the encryption request itself in the first message
await duplexStream.RequestStream.WriteAsync(
new Autogenerated.EncryptRequest { Options = encryptRequestOptions }, cancellationToken);
//Send the plaintext bytes in blocks in subsequent messages
await using (var bufferedStream = new BufferedStream(plaintextStream, streamingBlockSizeInBytes))
using var streamProcessor = new EncryptionStreamProcessor();
try
{
var buffer = new byte[streamingBlockSizeInBytes];
int bytesRead;
ulong sequenceNumber = 0;
streamProcessor.OnException += exceptionHandler;
await streamProcessor.ProcessStreamAsync(plaintextStream, duplexStream, encryptRequestOptions,
encryptionOptions.StreamingBlockSizeInBytes,
cancellationToken);
while ((bytesRead =
await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes),
cancellationToken)) !=
0)
await foreach (var value in streamProcessor.GetProcessedDataAsync(cancellationToken))
{
await duplexStream.RequestStream.WriteAsync(
new Autogenerated.EncryptRequest
{
Payload = new Autogenerated.StreamPayload
{
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
}
}, cancellationToken);
//Increment the sequence number
sequenceNumber++;
yield return value;
}
}
//Send the completion message
await duplexStream.RequestStream.CompleteAsync();
}
/// <summary>
/// Retrieves the encrypted bytes from the encryption operation on the sidecar and returns as an enumerable stream.
/// </summary>
private async IAsyncEnumerable<ReadOnlyMemory<byte>> RetrieveEncryptedStreamAsync(
AsyncDuplexStreamingCall<Autogenerated.EncryptRequest, Autogenerated.EncryptResponse> duplexStream,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var encryptResponse in duplexStream.ResponseStream.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
finally
{
yield return encryptResponse.Payload.Data.Memory;
streamProcessor.OnException -= exceptionHandler;
}
}
/// <inheritdoc />
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public override async Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string vaultResourceName,
"As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")]
public override async IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
Stream ciphertextStream, string keyName,
DecryptionOptions decryptionOptions, CancellationToken cancellationToken = default)
DecryptionOptions decryptionOptions,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));
ArgumentVerifier.ThrowIfNull(ciphertextStream, nameof(ciphertextStream));
ArgumentVerifier.ThrowIfNull(decryptionOptions, nameof(decryptionOptions));
decryptionOptions ??= new DecryptionOptions();
EventHandler<Exception> exceptionHandler = (_, ex) => throw ex;
var decryptRequestOptions = new Autogenerated.DecryptRequestOptions
{
ComponentName = vaultResourceName, KeyName = keyName
ComponentName = vaultResourceName,
KeyName = keyName
};
var options = CreateCallOptions(headers: null, cancellationToken);
var duplexStream = client.DecryptAlpha1(options);
//Run both operations at the same time, but return the output of the streaming values coming from the operation
var receiveResult = Task.FromResult(RetrieveDecryptedStreamAsync(duplexStream, cancellationToken));
return await Task.WhenAll(
//Stream the ciphertext data to the sidecar in chunks
SendCiphertextStreamAsync(ciphertextStream, decryptionOptions.StreamingBlockSizeInBytes,
duplexStream, decryptRequestOptions, cancellationToken),
//At the same time, retrieve the decrypted response from the sidecar
receiveResult)
//Return only the result of the `RetrieveEncryptedStreamAsync` method
.ContinueWith(t => receiveResult.Result, cancellationToken);
using var streamProcessor = new DecryptionStreamProcessor();
try
{
streamProcessor.OnException += exceptionHandler;
await streamProcessor.ProcessStreamAsync(ciphertextStream, duplexStream, decryptionOptions.StreamingBlockSizeInBytes,
decryptRequestOptions,
cancellationToken);
await foreach (var value in streamProcessor.GetProcessedDataAsync(cancellationToken))
{
yield return value;
}
}
finally
{
streamProcessor.OnException -= exceptionHandler;
}
}
/// <inheritdoc />
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public override Task<IAsyncEnumerable<ReadOnlyMemory<byte>>> DecryptAsync(string vaultResourceName,
"As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")]
public override IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
Stream ciphertextStream, string keyName, CancellationToken cancellationToken = default) =>
DecryptAsync(vaultResourceName, ciphertextStream, keyName, new DecryptionOptions(),
cancellationToken);
/// <summary>
/// Sends the ciphertext bytes in chunks to the sidecar to be decrypted.
/// </summary>
private async Task SendCiphertextStreamAsync(Stream ciphertextStream,
int streamingBlockSizeInBytes,
AsyncDuplexStreamingCall<Autogenerated.DecryptRequest, Autogenerated.DecryptResponse> duplexStream,
Autogenerated.DecryptRequestOptions decryptRequestOptions,
CancellationToken cancellationToken)
{
//Start with passing the metadata about the decryption request itself in the first message
await duplexStream.RequestStream.WriteAsync(
new Autogenerated.DecryptRequest { Options = decryptRequestOptions }, cancellationToken);
//Send the ciphertext bytes in blocks in subsequent messages
await using (var bufferedStream = new BufferedStream(ciphertextStream, streamingBlockSizeInBytes))
{
var buffer = new byte[streamingBlockSizeInBytes];
int bytesRead;
ulong sequenceNumber = 0;
while ((bytesRead =
await bufferedStream.ReadAsync(buffer.AsMemory(0, streamingBlockSizeInBytes),
cancellationToken)) != 0)
{
await duplexStream.RequestStream.WriteAsync(
new Autogenerated.DecryptRequest
{
Payload = new Autogenerated.StreamPayload
{
Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
}
}, cancellationToken);
//Increment the sequence number
sequenceNumber++;
}
}
//Send the completion message
await duplexStream.RequestStream.CompleteAsync();
}
/// <summary>
/// Retrieves the decrypted bytes from the decryption operation on the sidecar and returns as an enumerable stream.
/// </summary>
private async IAsyncEnumerable<ReadOnlyMemory<byte>> RetrieveDecryptedStreamAsync(
AsyncDuplexStreamingCall<Autogenerated.DecryptRequest, Autogenerated.DecryptResponse> duplexStream,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var decryptResponse in duplexStream.ResponseStream.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
yield return decryptResponse.Payload.Data.Memory;
}
}
/// <inheritdoc />
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
"As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")]
public override async Task<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
ReadOnlyMemory<byte> ciphertextBytes, string keyName, DecryptionOptions decryptionOptions,
CancellationToken cancellationToken = default)
{
using var memoryStream = ciphertextBytes.CreateMemoryStream(true);
var decryptionResult =
await DecryptAsync(vaultResourceName, memoryStream, keyName, decryptionOptions, cancellationToken);
var decryptionResult = DecryptAsync(vaultResourceName, memoryStream, keyName, decryptionOptions, cancellationToken);
var bufferedResult = new ArrayBufferWriter<byte>();
await foreach (var item in decryptionResult.WithCancellation(cancellationToken))
await foreach (var item in decryptionResult)
{
bufferedResult.Write(item.Span);
}
@ -1905,7 +1812,7 @@ internal class DaprClientGrpc : DaprClient
/// <inheritdoc />
[Obsolete(
"The API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
"As of Dapr v1.17, this method will be removed and should be used via the Dapr.Cryptography package on NuGet")]
public override async Task<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
ReadOnlyMemory<byte> ciphertextBytes, string keyName, CancellationToken cancellationToken = default) =>
await DecryptAsync(vaultResourceName, ciphertextBytes, keyName,

View File

@ -163,17 +163,7 @@ internal sealed class DaprJobsGrpcClient : DaprJobsClient
var envelope = new Autogenerated.GetJobRequest { Name = jobName };
var grpcCallOptions = DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprJobsClient).Assembly, this.DaprApiToken, cancellationToken);
var response = await Client.GetJobAlpha1Async(envelope, grpcCallOptions);
var schedule = DateTime.TryParse(response.Job.DueTime, out var dueTime)
? DaprJobSchedule.FromDateTime(dueTime)
: new DaprJobSchedule(response.Job.Schedule);
return new DaprJobDetails(schedule)
{
DueTime = !string.IsNullOrWhiteSpace(response.Job.DueTime) ? DateTime.Parse(response.Job.DueTime) : null,
Ttl = !string.IsNullOrWhiteSpace(response.Job.Ttl) ? DateTime.Parse(response.Job.Ttl) : null,
RepeatCount = (int?)response.Job.Repeats,
Payload = response.Job.Data.ToByteArray()
};
return DeserializeJobResponse(response);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
@ -192,6 +182,29 @@ internal sealed class DaprJobsGrpcClient : DaprJobsClient
throw new DaprException("Get job operation failed: the Dapr endpoint did not return the expected value.");
}
/// <summary>
/// Testable method for performing job response deserialization.
/// </summary>
/// <remarks>
/// This is exposed strictly for testing purposes.
/// </remarks>
/// <param name="response">The job response to deserialize.</param>
/// <returns>The deserialized job response.</returns>
internal static DaprJobDetails DeserializeJobResponse(Autogenerated.GetJobResponse response)
{
var schedule = DateTime.TryParse(response.Job.DueTime, out var dueTime)
? DaprJobSchedule.FromDateTime(dueTime)
: new DaprJobSchedule(response.Job.Schedule);
return new DaprJobDetails(schedule)
{
DueTime = !string.IsNullOrWhiteSpace(response.Job.DueTime) ? DateTime.Parse(response.Job.DueTime) : null,
Ttl = !string.IsNullOrWhiteSpace(response.Job.Ttl) ? DateTime.Parse(response.Job.Ttl) : null,
RepeatCount = (int?)response.Job.Repeats ?? 0,
Payload = response.Job.Data?.ToByteArray() ?? null
};
}
/// <summary>
/// Deletes the specified job.
/// </summary>

View File

@ -12,6 +12,8 @@
// ------------------------------------------------------------------------
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors.Client;
@ -175,6 +177,160 @@ namespace Dapr.Actors.Runtime
Assert.Equal(1, activator.DeleteCallCount);
}
[Fact]
public async Task DeserializeTimer_Period_Iso8601_Time()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"0h0m7s10ms\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromSeconds(7).Add(TimeSpan.FromMilliseconds(10)), result.Period);
}
[Fact]
public async Task DeserializeTimer_Period_DaprFormat_Every()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@every 15s\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromSeconds(15), result.Period);
}
[Fact]
public async Task DeserializeTimer_Period_DaprFormat_Every2()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@every 3h2m15s\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromHours(3).Add(TimeSpan.FromMinutes(2)).Add(TimeSpan.FromSeconds(15)), result.Period);
}
[Fact]
public async Task DeserializeTimer_Period_DaprFormat_Monthly()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@monthly\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromDays(30), result.Period);
}
[Fact]
public async Task DeserializeTimer_Period_DaprFormat_Weekly()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@weekly\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromDays(7), result.Period);
}
[Fact]
public async Task DeserializeTimer_Period_DaprFormat_Daily()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@daily\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromDays(1), result.Period);
}
[Fact]
public async Task DeserializeTimer_Period_DaprFormat_Hourly()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"period\": \"@hourly\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.FromHours(1), result.Period);
}
[Fact]
public async Task DeserializeTimer_DueTime_DaprFormat_Hourly()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"dueTime\": \"@hourly\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.FromHours(1), result.DueTime);
Assert.Equal(TimeSpan.Zero, result.Period);
}
[Fact]
public async Task DeserializeTimer_DueTime_Iso8601Times()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"dueTime\": \"0h0m7s10ms\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Null(result.Ttl);
Assert.Equal(TimeSpan.Zero, result.Period);
Assert.Equal(TimeSpan.FromSeconds(7).Add(TimeSpan.FromMilliseconds(10)), result.DueTime);
}
[Fact]
public async Task DeserializeTimer_Ttl_DaprFormat_Hourly()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"ttl\": \"@hourly\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.Zero, result.Period);
Assert.Equal(TimeSpan.FromHours(1), result.Ttl);
}
[Fact]
public async Task DeserializeTimer_Ttl_Iso8601Times()
{
const string timerJson = "{\"callback\": \"TimerCallback\", \"ttl\": \"0h0m7s10ms\"}";
await using var stream = new MemoryStream(Encoding.UTF8.GetBytes(timerJson));
var result = await ActorManager.DeserializeAsync(stream);
Assert.Equal("TimerCallback", result.Callback);
Assert.Equal(Array.Empty<byte>(), result.Data);
Assert.Equal(TimeSpan.Zero, result.DueTime);
Assert.Equal(TimeSpan.Zero, result.Period);
Assert.Equal(TimeSpan.FromSeconds(7).Add(TimeSpan.FromMilliseconds(10)), result.Ttl);
}
private interface ITestActor : IActor { }
private class TestActor : Actor, ITestActor, IDisposable

View File

@ -1,5 +1,4 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
@ -30,28 +29,6 @@ namespace Dapr.Client.Test
(ReadOnlyMemory<byte>) Array.Empty<byte>(), keyName, new EncryptionOptions(KeyWrapAlgorithm.Rsa), CancellationToken.None));
}
[Fact]
public async Task EncryptAsync_Stream_VaultResourceName_ArgumentVerifierException()
{
var client = new DaprClientBuilder().Build();
const string vaultResourceName = "";
//Get response and validate
await Assert.ThrowsAsync<ArgumentException>(async () => await client.EncryptAsync(vaultResourceName,
new MemoryStream(), "MyKey", new EncryptionOptions(KeyWrapAlgorithm.Rsa),
CancellationToken.None));
}
[Fact]
public async Task EncryptAsync_Stream_KeyName_ArgumentVerifierException()
{
var client = new DaprClientBuilder().Build();
const string keyName = "";
//Get response and validate
await Assert.ThrowsAsync<ArgumentException>(async () => await client.EncryptAsync("myVault",
(Stream) new MemoryStream(), keyName, new EncryptionOptions(KeyWrapAlgorithm.Rsa),
CancellationToken.None));
}
[Fact]
public async Task DecryptAsync_ByteArray_VaultResourceName_ArgumentVerifierException()
{
@ -71,25 +48,5 @@ namespace Dapr.Client.Test
await Assert.ThrowsAsync<ArgumentException>(async () => await client.DecryptAsync("myVault",
Array.Empty<byte>(), keyName, new DecryptionOptions(), CancellationToken.None));
}
[Fact]
public async Task DecryptAsync_Stream_VaultResourceName_ArgumentVerifierException()
{
var client = new DaprClientBuilder().Build();
const string vaultResourceName = "";
//Get response and validate
await Assert.ThrowsAsync<ArgumentException>(async () => await client.DecryptAsync(vaultResourceName,
new MemoryStream(), "MyKey", new DecryptionOptions(), CancellationToken.None));
}
[Fact]
public async Task DecryptAsync_Stream_KeyName_ArgumentVerifierException()
{
var client = new DaprClientBuilder().Build();
const string keyName = "";
//Get response and validate
await Assert.ThrowsAsync<ArgumentException>(async () => await client.DecryptAsync("myVault",
new MemoryStream(), keyName, new DecryptionOptions(), CancellationToken.None));
}
}
}

View File

@ -13,6 +13,7 @@
using System;
using System.Net.Http;
using Dapr.Client.Autogen.Grpc.v1;
using Dapr.Jobs.Models;
using Moq;
using Xunit;
@ -167,6 +168,21 @@ public sealed class DaprJobsGrpcClientTests
});
#pragma warning restore CS0618 // Type or member is obsolete
}
[Fact]
public void ShouldDeserialize_EveryExpression()
{
const string scheduleText = "@every 1m";
var response = new GetJobResponse { Job = new Job { Name = "test", Schedule = scheduleText } };
var schedule = DaprJobSchedule.FromExpression(scheduleText);
var jobDetails = DaprJobsGrpcClient.DeserializeJobResponse(response);
Assert.Null(jobDetails.Payload);
Assert.Equal(0, jobDetails.RepeatCount);
Assert.Null(jobDetails.Ttl);
Assert.Null(jobDetails.DueTime);
Assert.Equal(jobDetails.Schedule.ExpressionValue, schedule.ExpressionValue);
}
private sealed record TestPayload(string Name, string Color);
}