#906 -Added methods in status API supports for saving and reading binary data (#1116)

* Added methods in status API supports for direct storage and reading of byte arrays #906

Signed-off-by: Divya Perumal <divzi.perumal@gmail.com>
Signed-off-by: Divya Perumal <diperuma@microsoft.com>
This commit is contained in:
Divya Perumal 2024-12-12 02:11:14 +05:30 committed by GitHub
parent ccf2bfdce3
commit 3a930c26d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 1042 additions and 350 deletions

View File

@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -24,7 +24,8 @@ namespace Samples.Client
new StateStoreExample(),
new StateStoreTransactionsExample(),
new StateStoreETagsExample(),
new BulkStateExample()
new BulkStateExample(),
new StateStoreBinaryExample()
};
static async Task<int> Main(string[] args)

View File

@ -0,0 +1,47 @@
using System;
using System.Collections.Generic;
using System.Text;
using Dapr.Client;
using System.Threading.Tasks;
using System.Threading;
using Google.Protobuf;
namespace Samples.Client
{
public class StateStoreBinaryExample : Example
{
private static readonly string stateKeyName = "binarydata";
private static readonly string storeName = "statestore";
public override string DisplayName => "Using the State Store with binary data";
public override async Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();
var state = "Test Binary Data";
// convert variable in to byte array
var stateBytes = Encoding.UTF8.GetBytes(state);
await client.SaveByteStateAsync(storeName, stateKeyName, stateBytes.AsMemory(), cancellationToken: cancellationToken);
Console.WriteLine("Saved State!");
var responseBytes = await client.GetByteStateAsync(storeName, stateKeyName, cancellationToken: cancellationToken);
var savedState = Encoding.UTF8.GetString(ByteString.CopyFrom(responseBytes.Span).ToByteArray());
if (savedState == null)
{
Console.WriteLine("State not found in store");
}
else
{
Console.WriteLine($"Got State: {savedState}");
}
await client.DeleteStateAsync(storeName, stateKeyName, cancellationToken: cancellationToken);
Console.WriteLine("Deleted State!");
}
}
}

View File

@ -850,6 +850,80 @@ namespace Dapr.Client
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Saves the provided <paramref name="binaryValue" /> associated with the provided <paramref name="key" /> to the Dapr state
/// store
/// </summary>
/// <param name="storeName">The name of the state store.</param>
/// <param name="key">The state key.</param>
/// <param name="binaryValue">The binary data that will be stored in the state store.</param>
/// <param name="stateOptions">Options for performing save state operation.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task SaveByteStateAsync(
string storeName,
string key,
ReadOnlyMemory<byte> binaryValue,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
///Saves the provided <paramref name="binaryValue" /> associated with the provided <paramref name="key" /> using the
/// <paramref name="etag"/> to the Dapr state. State store implementation will allow the update only if the attached ETag matches with the latest ETag in the state store.
/// </summary>
/// <param name="storeName">The name of the state store.</param>
/// <param name="key">The state key.</param>
/// <param name="binaryValue">The binary data that will be stored in the state store.</param>
/// <param name="etag">An ETag.</param>
/// <param name="stateOptions">Options for performing save state operation.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task<bool> TrySaveByteStateAsync(
string storeName,
string key,
ReadOnlyMemory<byte> binaryValue,
string etag,
StateOptions stateOptions = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the current binary value associated with the <paramref name="key" /> from the Dapr state store.
/// </summary>
/// <param name="storeName">The name of state store to read from.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode <see cref="ConsistencyMode" />.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{T}" /> that will return the value when the operation has completed.</returns>
public abstract Task<ReadOnlyMemory<byte>> GetByteStateAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the current binary value associated with the <paramref name="key" /> from the Dapr state store and an ETag.
/// </summary>
/// <param name="storeName">The name of the state store.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode <see cref="ConsistencyMode" />.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the state store. The valid metadata keys and values are determined by the type of state store used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{T}" /> that will return the value when the operation has completed. This wraps the read value and an ETag.</returns>
public abstract Task<(ReadOnlyMemory<byte>, string etag)> GetByteStateAndETagAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = default,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Tries to save the state <paramref name="value" /> associated with the provided <paramref name="key" /> using the
/// <paramref name="etag"/> to the Dapr state. State store implementation will allow the update only if the attached ETag matches with the latest ETag in the state store.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,41 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
#nullable enable
using System;
using System.Reflection;
using System.Runtime.Serialization;
namespace Dapr.Client
{
internal static class EnumExtensions
{
/// <summary>
/// Reads the value of an enum out of the attached <see cref="EnumMemberAttribute"/> attribute.
/// </summary>
/// <typeparam name="T">The enum.</typeparam>
/// <param name="value">The value of the enum to pull the value for.</param>
/// <returns></returns>
public static string GetValueFromEnumMember<T>(this T value) where T : Enum
{
ArgumentNullException.ThrowIfNull(value, nameof(value));
var memberInfo = typeof(T).GetMember(value.ToString(), BindingFlags.Static | BindingFlags.Public | BindingFlags.DeclaredOnly);
if (memberInfo.Length <= 0)
return value.ToString();
var attributes = memberInfo[0].GetCustomAttributes(typeof(EnumMemberAttribute), false);
return (attributes.Length > 0 ? ((EnumMemberAttribute)attributes[0]).Value : value.ToString()) ?? value.ToString();
}
}
}

View File

@ -199,8 +199,8 @@ public abstract class DaprGenericClientBuilder<TClientBuilder> where TClientBuil
// Set correct switch to make secure gRPC service calls. This switch must be set before creating the GrpcChannel.
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
}
var httpEndpoint = new Uri(this.HttpEndpoint);
var httpEndpoint = new Uri(this.HttpEndpoint);
if (httpEndpoint.Scheme != "http" && httpEndpoint.Scheme != "https")
{
throw new InvalidOperationException("The HTTP endpoint must use http or https.");

View File

@ -47,7 +47,7 @@ internal sealed class DaprJobsGrpcClient : DaprJobsClient
/// Property exposed for testing purposes.
/// </remarks>
internal Autogenerated.Dapr.DaprClient Client { get; }
internal DaprJobsGrpcClient(
Autogenerated.Dapr.DaprClient innerClient,
HttpClient httpClient,

View File

@ -30,6 +30,7 @@ public static class DaprJobsServiceCollectionExtensions
/// <param name="lifetime">The lifetime of the registered services.</param>
/// <returns></returns>
public static IServiceCollection AddDaprJobsClient(this IServiceCollection serviceCollection, Action<IServiceProvider, DaprJobsClientBuilder>? configure = null, ServiceLifetime lifetime = ServiceLifetime.Singleton)
{
ArgumentNullException.ThrowIfNull(serviceCollection, nameof(serviceCollection));
@ -62,7 +63,7 @@ public static class DaprJobsServiceCollectionExtensions
serviceCollection.TryAddSingleton(registration);
break;
}
return serviceCollection;
}
}

View File

@ -1225,6 +1225,7 @@ message Job {
// Systemd timer style cron accepts 6 fields:
// seconds | minutes | hours | day of month | month | day of week
// 0-59 | 0-59 | 0-23 | 1-31 | 1-12/jan-dec | 0-6/sun-sat
//
// "0 30 * * * *" - every hour on the half hour
// "0 15 3 * * *" - every day at 03:15
@ -1344,4 +1345,4 @@ message ConversationResponse {
// An array of results.
repeated ConversationResult outputs = 2;
}
}

View File

@ -0,0 +1,38 @@
using System.Runtime.Serialization;
using Xunit;
namespace Dapr.Client.Test.Extensions
{
public class EnumExtensionTest
{
[Fact]
public void GetValueFromEnumMember_RedResolvesAsExpected()
{
var value = TestEnum.Red.GetValueFromEnumMember();
Assert.Equal("red", value);
}
[Fact]
public void GetValueFromEnumMember_YellowResolvesAsExpected()
{
var value = TestEnum.Yellow.GetValueFromEnumMember();
Assert.Equal("YELLOW", value);
}
[Fact]
public void GetValueFromEnumMember_BlueResolvesAsExpected()
{
var value = TestEnum.Blue.GetValueFromEnumMember();
Assert.Equal("Blue", value);
}
}
public enum TestEnum
{
[EnumMember(Value = "red")]
Red,
[EnumMember(Value = "YELLOW")]
Yellow,
Blue
}
}

View File

@ -11,24 +11,26 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Net;
using System.Text.Json;
using System.Threading.Tasks;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
using FluentAssertions;
using Google.Protobuf;
using Grpc.Core;
using Moq;
using StateConsistency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConsistency;
using StateConcurrency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConcurrency;
using Xunit;
using System.Threading;
using System.Net.Http;
using System.Text;
namespace Dapr.Client.Test
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Text.Json;
using System.Threading.Tasks;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
using FluentAssertions;
using Google.Protobuf;
using Grpc.Core;
using Moq;
using StateConsistency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConsistency;
using StateConcurrency = Dapr.Client.Autogen.Grpc.v1.StateOptions.Types.StateConcurrency;
using Xunit;
using System.Threading;
using System.Net.Http;
public class StateApiTest
{
[Fact]
@ -36,10 +38,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync<Widget>("testStore", "test"));
request.Dismiss();
@ -58,14 +57,11 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var key = "test";
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetBulkStateAsync("testStore", new List<string>() { key }, null);
});
const string key = "test";
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List<string>() { key }, null));
// Create Response & Respond
var data = "value";
const string data = "value";
var envelope = MakeGetBulkStateResponse(key, data);
var state = await request.CompleteWithMessageAsync(envelope);
@ -78,11 +74,8 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var key = "test";
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetBulkStateAsync<Widget>("testStore", new List<string>() {key}, null);
});
const string key = "test";
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync<Widget>("testStore", new List<string>() {key}, null));
// Create Response & Respond
const string size = "small";
@ -102,11 +95,8 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var key = "test";
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetBulkStateAsync("testStore", new List<string>() { key }, null);
});
const string key = "test";
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List<string>() { key }, null));
// Create Response & Respond
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
@ -121,15 +111,12 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var key = "test";
const string key = "test";
var metadata = new Dictionary<string, string>
{
{ "partitionKey", "mypartition" }
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetBulkStateAsync("testStore", new List<string>() { key }, null, metadata: metadata);
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetBulkStateAsync("testStore", new List<string>() { key }, null, metadata: metadata));
request.Dismiss();
@ -144,10 +131,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAndETagAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAndETagAsync<Widget>("testStore", "test"));
// Create Response & Respond
var data = new Widget() { Size = "small", Color = "yellow", };
@ -165,10 +149,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAndETagAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAndETagAsync<Widget>("testStore", "test"));
// Create Response & Respond
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
@ -183,10 +164,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAndETagAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAndETagAsync<Widget>("testStore", "test"));
// Create Response & Respond
var envelope = new Autogenerated.GetStateResponse()
@ -206,10 +184,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAsync<Widget>("testStore", "test", ConsistencyMode.Eventual);
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync<Widget>("testStore", "test", ConsistencyMode.Eventual));
// Create Response & Respond
var envelope = MakeGetStateResponse<Widget>(null);
@ -226,10 +201,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAsync<Widget>("testStore", "test", consistencyMode);
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync<Widget>("testStore", "test", consistencyMode));
// Get Request & Validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.GetStateRequest>();
@ -253,10 +225,7 @@ namespace Dapr.Client.Test
{
{ "partitionKey", "mypartition" }
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAsync<Widget>("testStore", "test", metadata: metadata);
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync<Widget>("testStore", "test", metadata: metadata));
// Get Request & Validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.GetStateRequest>();
@ -276,10 +245,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync<Widget>("testStore", "test"));
// Create Response & Respond
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
@ -294,10 +260,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateAsync<Widget>("testStore", "test"));
// Create Response & Respond
var stateResponse = new Autogenerated.GetStateResponse()
@ -467,10 +430,10 @@ namespace Dapr.Client.Test
};
var state1 = new StateTransactionRequest("stateKey1", JsonSerializer.SerializeToUtf8Bytes(stateValue1), StateOperationType.Upsert, "testEtag", metadata1, options1);
var stateValue2 = 100;
const int stateValue2 = 100;
var state2 = new StateTransactionRequest("stateKey2", JsonSerializer.SerializeToUtf8Bytes(stateValue2), StateOperationType.Delete);
var stateValue3 = "teststring";
const string stateValue3 = "teststring";
var state3 = new StateTransactionRequest("stateKey3", JsonSerializer.SerializeToUtf8Bytes(stateValue3), StateOperationType.Upsert);
var states = new List<StateTransactionRequest>
@ -619,10 +582,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateEntryAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync<Widget>("testStore", "test"));
// Create Response & Respond
var data = new Widget() { Size = "small", Color = "yellow", };
@ -639,10 +599,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateEntryAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync<Widget>("testStore", "test"));
// Create Response & Respond
var envelope = MakeGetStateResponse<Widget>(null);
@ -657,10 +614,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateEntryAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync<Widget>("testStore", "test"));
// Create Response & Respond
var data = new Widget() { Size = "small", Color = "yellow", };
@ -699,10 +653,7 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.GetStateEntryAsync<Widget>("testStore", "test");
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetStateEntryAsync<Widget>("testStore", "test"));
// Create Response & Respond
var data = new Widget() { Size = "small", Color = "yellow", };
@ -805,10 +756,7 @@ namespace Dapr.Client.Test
{ "key1", "value1" },
{ "key2", "value2" }
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.TrySaveStateAsync("testStore", "test", widget, "Test_Etag", stateOptions, metadata);
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.TrySaveStateAsync("testStore", "test", widget, "Test_Etag", stateOptions, metadata));
request.Dismiss();
@ -1021,10 +969,7 @@ namespace Dapr.Client.Test
Consistency = consistencyMode
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.TryDeleteStateAsync("testStore", "test", "Test_Etag", stateOptions);
});
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.TryDeleteStateAsync("testStore", "test", "Test_Etag", stateOptions));
request.Dismiss();
@ -1042,8 +987,8 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var key = "test";
var etag = "etag";
const string key = "test";
const string etag = "etag";
var metadata = new Dictionary<string, string>
{
{ "partitionKey", "mypartition" }
@ -1069,11 +1014,8 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}";
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.QueryStateAsync<Widget>("testStore", queryJson, new Dictionary<string, string>());
});
const string queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}";
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.QueryStateAsync<Widget>("testStore", queryJson, new Dictionary<string, string>()));
// Validate request.
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.QueryStateRequest>();
@ -1099,11 +1041,8 @@ namespace Dapr.Client.Test
{
await using var client = TestClient.CreateForDaprClient();
var queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}";
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.QueryStateAsync<Widget>("testStore", queryJson, new Dictionary<string, string>());
});
const string queryJson = "{'query':{'filter':{ 'EQ': {'value':'test'}}}}";
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.QueryStateAsync<Widget>("testStore", queryJson, new Dictionary<string, string>()));
// Validate request.
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.QueryStateRequest>();
@ -1175,14 +1114,371 @@ namespace Dapr.Client.Test
private Autogenerated.QueryStateItem MakeQueryStateItem<T>(string key, T data, string etag = default, string error = default)
{
var wireItem = new Autogenerated.QueryStateItem();
wireItem.Key = key;
wireItem.Data = ByteString.CopyFromUtf8(JsonSerializer.Serialize(data));
wireItem.Etag = etag ?? string.Empty;
wireItem.Error = error ?? string.Empty;
var wireItem = new Autogenerated.QueryStateItem
{
Key = key, Data = ByteString.CopyFromUtf8(JsonSerializer.Serialize(data)), Etag = etag ?? string.Empty,
Error = error ?? string.Empty
};
return wireItem;
}
[Theory]
[InlineData(ConsistencyMode.Eventual, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyFirstWrite)]
[InlineData(ConsistencyMode.Eventual, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyLastWrite)]
[InlineData(ConsistencyMode.Strong, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyFirstWrite)]
[InlineData(ConsistencyMode.Strong, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyLastWrite)]
public async Task SaveByteStateAsync_ValidateOptions(
ConsistencyMode consistencyMode,
ConcurrencyMode concurrencyMode,
StateConsistency expectedConsistency,
StateConcurrency expectedConcurrency)
{
await using var client = TestClient.CreateForDaprClient();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var stateOptions = new StateOptions
{
Concurrency = concurrencyMode,
Consistency = consistencyMode
};
var metadata = new Dictionary<string, string>
{
{ "key1", "value1" },
{ "key2", "value2" }
};
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.SaveByteStateAsync("testStore", "test", stateBytes.AsMemory(), stateOptions, metadata);
});
request.Dismiss();
// Get Request and validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.SaveStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.States.Count.Should().Be(1);
var state = envelope.States[0];
state.Key.Should().Be("test");
state.Metadata.Count.Should().Be(2);
state.Metadata.Keys.Contains("key1").Should().BeTrue();
state.Metadata.Keys.Contains("key2").Should().BeTrue();
state.Metadata["key1"].Should().Be("value1");
state.Metadata["key2"].Should().Be("value2");
state.Options.Concurrency.Should().Be(expectedConcurrency);
state.Options.Consistency.Should().Be(expectedConsistency);
var stateBinaryData = state.Value.ToStringUtf8();
stateBinaryData.Should().Be(data);
}
[Fact]
public async Task SaveByteStateAsync_CanSaveState()
{
await using var client = TestClient.CreateForDaprClient();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.SaveByteStateAsync("testStore", "test", stateBytes.AsMemory());
});
request.Dismiss();
// Get Request and validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.SaveStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.States.Count.Should().Be(1);
var state = envelope.States[0];
state.Key.Should().Be("test");
var stateBinaryData = state.Value.ToStringUtf8();
stateBinaryData.Should().Be(data);
}
[Fact]
public async Task SaveByteStateAsync_CanClearState()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.SaveByteStateAsync("testStore", "test", null);
});
request.Dismiss();
// Get Request and validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.SaveStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.States.Count.Should().Be(1);
var state = envelope.States[0];
state.Key.Should().Be("test");
state.Value.Should().Equal(ByteString.Empty);
}
[Fact]
public async Task SaveByteStateAsync_WithCancelledToken()
{
await using var client = TestClient.CreateForDaprClient();
var cts = new CancellationTokenSource();
cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
await client.InnerClient.SaveByteStateAsync("testStore", "test", null, cancellationToken: cts.Token);
});
}
[Theory]
[InlineData(ConsistencyMode.Eventual, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyFirstWrite)]
[InlineData(ConsistencyMode.Eventual, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyEventual, StateConcurrency.ConcurrencyLastWrite)]
[InlineData(ConsistencyMode.Strong, ConcurrencyMode.FirstWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyFirstWrite)]
[InlineData(ConsistencyMode.Strong, ConcurrencyMode.LastWrite, StateConsistency.ConsistencyStrong, StateConcurrency.ConcurrencyLastWrite)]
public async Task TrySaveByteStateAsync_ValidateOptions(
ConsistencyMode consistencyMode,
ConcurrencyMode concurrencyMode,
StateConsistency expectedConsistency,
StateConcurrency expectedConcurrency)
{
await using var client = TestClient.CreateForDaprClient();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var stateOptions = new StateOptions
{
Concurrency = concurrencyMode,
Consistency = consistencyMode
};
var metadata = new Dictionary<string, string>
{
{ "key1", "value1" },
{ "key2", "value2" }
};
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.TrySaveByteStateAsync("testStore", "test", stateBytes.AsMemory(), "Test_Etag", stateOptions, metadata));
request.Dismiss();
// Get Request and validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.SaveStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.States.Count.Should().Be(1);
var state = envelope.States[0];
state.Etag.Value.Should().Be("Test_Etag");
state.Metadata.Count.Should().Be(2);
state.Metadata.Keys.Contains("key1").Should().BeTrue();
state.Metadata.Keys.Contains("key2").Should().BeTrue();
state.Metadata["key1"].Should().Be("value1");
state.Metadata["key2"].Should().Be("value2");
state.Options.Concurrency.Should().Be(expectedConcurrency);
state.Options.Consistency.Should().Be(expectedConsistency);
var stateBinaryData = state.Value.ToStringUtf8();
stateBinaryData.Should().Be(data);
}
[Fact]
public async Task TrySaveByteStateAsync_ValidateNonETagErrorThrowsException()
{
var client = new MockClient();
var response = client.CallStateApi<string>()
.Build();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var rpcException = new RpcException(new Status(StatusCode.Internal, "Network Error"));
// Setup the mock client to throw an Rpc Exception with the expected details info
client.Mock
.Setup(m => m.SaveStateAsync(It.IsAny<Autogen.Grpc.v1.SaveStateRequest>(), It.IsAny<CallOptions>()))
.Throws(rpcException);
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
{
await client.DaprClient.TrySaveByteStateAsync("test", "test", stateBytes.AsMemory(), "someETag");
});
Assert.Same(rpcException, ex.InnerException);
}
[Fact]
public async Task TrySaveByteStateAsync_ValidateETagRelatedExceptionReturnsFalse()
{
var client = new MockClient();
var response = client.CallStateApi<string>()
.Build();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var rpcException = new RpcException(new Status(StatusCode.Aborted, $"failed saving state in state store testStore"));
// Setup the mock client to throw an Rpc Exception with the expected details info
client.Mock
.Setup(m => m.SaveStateAsync(It.IsAny<Autogen.Grpc.v1.SaveStateRequest>(), It.IsAny<CallOptions>()))
.Throws(rpcException);
var operationResult = await client.DaprClient.TrySaveByteStateAsync("testStore", "test", stateBytes.AsMemory(), "invalidETag");
Assert.False(operationResult);
}
[Fact]
public async Task TrySaveByteStateAsync_NullEtagThrowsArgumentException()
{
var client = new MockClient();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var response = client.CallStateApi<string>()
.Build();
await FluentActions.Awaiting(async () => await client.DaprClient.TrySaveByteStateAsync("test", "test", stateBytes.AsMemory(), null))
.Should().ThrowAsync<ArgumentException>();
}
[Fact]
public async Task TrySaveByteStateAsync_EmptyEtagDoesNotThrow()
{
var client = new MockClient();
const string data = "Test binary data";
var stateBytes = Encoding.UTF8.GetBytes(data);
var response = client.CallStateApi<Google.Protobuf.WellKnownTypes.Empty>()
.Build();
// Setup the mock client to return success
client.Mock
.Setup(m => m.SaveStateAsync(It.IsAny<Autogen.Grpc.v1.SaveStateRequest>(), It.IsAny<CallOptions>()))
.Returns(response);
var result = await client.DaprClient.TrySaveByteStateAsync("test", "test", stateBytes.AsMemory(), "");
Assert.True(result);
}
[Fact]
public async Task GetByteStateAsync_CanReadEmptyState_ReturnsDefault()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAsync("testStore", "test", ConsistencyMode.Eventual));
// Create Response & Respond to request
var envelope = MakeGetByteStateResponse(null);
var state = await request.CompleteWithMessageAsync(envelope);
// Get response and validate
state.ToArray().Should().BeNullOrEmpty();
}
[Theory]
[InlineData(ConsistencyMode.Eventual, StateConsistency.ConsistencyEventual)]
[InlineData(ConsistencyMode.Strong, StateConsistency.ConsistencyStrong)]
public async Task GetByteStateAsync_ValidateRequest(ConsistencyMode consistencyMode, StateConsistency expectedConsistencyMode)
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAsync("testStore", "test", consistencyMode));
// Get Request & Validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.GetStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.Key.Should().Be("test");
envelope.Consistency.Should().Be(expectedConsistencyMode);
var binaryData = Encoding.ASCII.GetBytes("test data");
// Create Response & Respond
var state = await request.CompleteWithMessageAsync(MakeGetByteStateResponse(binaryData.AsMemory()));
var stateStr = ByteString.CopyFrom(state.Span).ToByteArray();
// Get response and validate
stateStr.Should().BeEquivalentTo(binaryData);
}
[Fact]
public async Task GetByteStateAndEtagAsync_ValidateRequest()
{
await using var client = TestClient.CreateForDaprClient();
var metadata = new Dictionary<string, string>
{
{ "partitionKey", "mypartition" }
};
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAndETagAsync("testStore", "test", metadata: metadata));
// Get Request & Validate
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.GetStateRequest>();
envelope.StoreName.Should().Be("testStore");
envelope.Key.Should().Be("test");
envelope.Metadata.Should().BeEquivalentTo(metadata);
var binaryData = Encoding.ASCII.GetBytes("test data");
// Create Response & Respond
var (state, etag) = await request.CompleteWithMessageAsync((MakeGetByteStateResponse(binaryData.AsMemory())));
var stateStr = ByteString.CopyFrom(state.Span).ToByteArray();
// Get response and validate
stateStr.Should().BeEquivalentTo(binaryData);
}
[Fact]
public async Task GetByteStateAsync_WrapsRpcException()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAsync("testStore", "test"));
// Create Response & Respond
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
{
await request.CompleteAsync(new HttpResponseMessage(HttpStatusCode.NotAcceptable));
});
Assert.IsType<RpcException>(ex.InnerException);
}
[Fact]
public async Task GetByteStateAndEtagAsync_CanReadState()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAndETagAsync("testStore", "test"));
// Create Response & Respond
var binaryData = Encoding.ASCII.GetBytes("test data");
var envelope = MakeGetByteStateResponse(binaryData.AsMemory(), "Test_Etag");
var (state, etag) = await request.CompleteWithMessageAsync(envelope);
var stateStr = ByteString.CopyFrom(state.Span).ToByteArray();
// Get response and validate
stateStr.Should().BeEquivalentTo(binaryData);
etag.Should().Be("Test_Etag");
}
[Fact]
public async Task GetByteStateAndETagAsync_WrapsRpcException()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient => await daprClient.GetByteStateAndETagAsync("testStore", "test"));
// Create Response & Respond
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
{
await request.CompleteAsync(new HttpResponseMessage(HttpStatusCode.NotAcceptable));
});
Assert.IsType<RpcException>(ex.InnerException);
}
private Autogenerated.GetStateResponse MakeGetByteStateResponse(ReadOnlyMemory<byte> state, string etag = null)
{
var response = new Autogenerated.GetStateResponse();
// convert to byte string if state is not null
if (!state.Span.IsEmpty)
{
response.Data = ByteString.CopyFrom(state.Span);
}
if (etag != null)
{
response.Etag = etag;
}
return response;
}
private class Widget
{
public string Size { get; set; }

View File

@ -25,6 +25,7 @@ namespace Dapr.Jobs.Test.Extensions;
public class DaprJobsServiceCollectionExtensionsTest
{
[Fact]
public void AddDaprJobsClient_FromIConfiguration()
{
const string apiToken = "abc123";
@ -50,6 +51,7 @@ public class DaprJobsServiceCollectionExtensionsTest
{
var services = new ServiceCollection();
var clientBuilder = new Action<IServiceProvider, DaprJobsClientBuilder>((sp, builder) =>
{
builder.UseDaprApiToken("abc");