dotnet-sdk/src/Dapr.Client/DaprClientGrpc.cs

1149 lines
43 KiB
C#

// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr.Client
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Google.Protobuf;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
using System.Net.Http.Json;
using Google.Protobuf.WellKnownTypes;
using Grpc.Net.Client;
/// <summary>
/// A client for interacting with the Dapr endpoints.
/// </summary>
internal class DaprClientGrpc : DaprClient
{
// Keys under which the target app id and method name are stored in
// HttpRequestMessage.Properties when an invocation request is created, so that
// failures can report which app/method was being called.
private const string AppIdKey = "appId";
private const string MethodNameKey = "methodName";
// Base address used to build service invocation URIs and to validate outgoing requests.
private readonly Uri httpEndpoint;
// HTTP client used to send service invocation requests; disposed with this instance.
private readonly HttpClient httpClient;
// JSON options applied when serializing request payloads and deserializing responses.
private readonly JsonSerializerOptions jsonSerializerOptions;
// gRPC channel; disposed with this instance.
private readonly GrpcChannel channel;
// Generated gRPC client used for publish, binding, state, and secret calls.
private readonly Autogenerated.Dapr.DaprClient client;
// Optional header (name/value) added to outgoing HTTP and gRPC requests when set.
private readonly KeyValuePair<string, string>? apiTokenHeader;
// property exposed for testing purposes
internal Autogenerated.Dapr.DaprClient Client => client;
// Exposes the JSON options this client was constructed with.
public override JsonSerializerOptions JsonSerializerOptions => jsonSerializerOptions;
/// <summary>
/// Initializes a new <see cref="DaprClientGrpc"/> from already-constructed transport objects.
/// </summary>
/// <param name="channel">The gRPC channel; it is disposed when this client is disposed.</param>
/// <param name="inner">The generated gRPC client used to issue the gRPC calls.</param>
/// <param name="httpClient">The HTTP client used for service invocation; it is disposed when this client is disposed.</param>
/// <param name="httpEndpoint">The base address used to build and validate service invocation URIs.</param>
/// <param name="jsonSerializerOptions">The JSON options used for payload (de)serialization.</param>
/// <param name="apiTokenHeader">An optional header name/value pair attached to outgoing requests when present.</param>
internal DaprClientGrpc(
    GrpcChannel channel,
    Autogenerated.Dapr.DaprClient inner,
    HttpClient httpClient,
    Uri httpEndpoint,
    JsonSerializerOptions jsonSerializerOptions,
    KeyValuePair<string, string>? apiTokenHeader)
{
    // gRPC side.
    this.client = inner;
    this.channel = channel;

    // HTTP side.
    this.httpEndpoint = httpEndpoint;
    this.httpClient = httpClient;

    // Shared configuration.
    this.apiTokenHeader = apiTokenHeader;
    this.jsonSerializerOptions = jsonSerializerOptions;
}
#region Publish Apis
/// <inheritdoc/>
public override Task PublishEventAsync<TData>(
    string pubsubName,
    string topicName,
    TData data,
    CancellationToken cancellationToken = default)
{
    // This method is not async, so argument checks throw directly at the call site.
    ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
    ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
    ArgumentVerifier.ThrowIfNull(data, nameof(data));

    // The payload is sent as JSON bytes; no metadata accompanies this overload.
    ByteString payload = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions);
    return MakePublishRequest(pubsubName, topicName, payload, metadata: null, cancellationToken);
}
/// <inheritdoc/>
public override Task PublishEventAsync<TData>(
    string pubsubName,
    string topicName,
    TData data,
    Dictionary<string, string> metadata,
    CancellationToken cancellationToken = default)
{
    // This method is not async, so argument checks throw directly at the call site.
    ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
    ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
    ArgumentVerifier.ThrowIfNull(data, nameof(data));
    ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata));

    // The payload is sent as JSON bytes together with the caller's metadata.
    ByteString payload = TypeConverters.ToJsonByteString(data, this.JsonSerializerOptions);
    return MakePublishRequest(pubsubName, topicName, payload, metadata, cancellationToken);
}
/// <inheritdoc/>
public override Task PublishEventAsync(
    string pubsubName,
    string topicName,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
    ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));

    // An event with neither payload nor metadata.
    return MakePublishRequest(pubsubName, topicName, content: null, metadata: null, cancellationToken);
}
/// <inheritdoc/>
public override Task PublishEventAsync(
    string pubsubName,
    string topicName,
    Dictionary<string, string> metadata,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
    ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
    ArgumentVerifier.ThrowIfNull(metadata, nameof(metadata));

    // An event carrying metadata only, with no payload.
    return MakePublishRequest(pubsubName, topicName, content: null, metadata, cancellationToken);
}
/// <summary>
/// Builds a publish request from the supplied values and sends it through the gRPC client.
/// </summary>
/// <param name="pubsubName">Value assigned to the request's pubsub name.</param>
/// <param name="topicName">Value assigned to the request's topic.</param>
/// <param name="content">Optional payload; when present it is sent with the JSON content type.</param>
/// <param name="metadata">Optional metadata entries copied onto the request.</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
private async Task MakePublishRequest(
    string pubsubName,
    string topicName,
    ByteString content,
    Dictionary<string, string> metadata,
    CancellationToken cancellationToken)
{
    var request = new Autogenerated.PublishEventRequest();
    request.PubsubName = pubsubName;
    request.Topic = topicName;

    // Data and its content type are only set when there is a payload to send.
    if (content is not null)
    {
        request.Data = content;
        request.DataContentType = Constants.ContentTypeApplicationJson;
    }

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            request.Metadata.Add(entry.Key, entry.Value);
        }
    }

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        await client.PublishEventAsync(request, callOptions);
    }
    catch (RpcException ex)
    {
        throw new DaprException("Publish operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }
}
#endregion
#region InvokeBinding Apis
/// <inheritdoc/>
public override async Task InvokeBindingAsync<TRequest>(
    string bindingName,
    string operation,
    TRequest data,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(bindingName, nameof(bindingName));
    ArgumentVerifier.ThrowIfNullOrEmpty(operation, nameof(operation));

    ByteString payload = TypeConverters.ToJsonByteString<TRequest>(data, this.jsonSerializerOptions);

    // This overload does not surface the binding's response to the caller.
    await MakeInvokeBindingRequestAsync(bindingName, operation, payload, metadata, cancellationToken);
}
/// <inheritdoc/>
public override async Task<TResponse> InvokeBindingAsync<TRequest, TResponse>(
    string bindingName,
    string operation,
    TRequest data,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(bindingName, nameof(bindingName));
    ArgumentVerifier.ThrowIfNullOrEmpty(operation, nameof(operation));

    ByteString payload = TypeConverters.ToJsonByteString<TRequest>(data, this.jsonSerializerOptions);
    var bindingResponse = await MakeInvokeBindingRequestAsync(bindingName, operation, payload, metadata, cancellationToken);

    // Transport failures are translated by the helper above; only payload
    // deserialization problems are translated here.
    try
    {
        return TypeConverters.FromJsonByteString<TResponse>(bindingResponse.Data, this.JsonSerializerOptions);
    }
    catch (JsonException ex)
    {
        throw new DaprException("Binding operation failed: the response payload could not be deserialized. See InnerException for details.", ex);
    }
}
/// <inheritdoc/>
public override async Task<BindingResponse> InvokeBindingAsync(BindingRequest request, CancellationToken cancellationToken = default)
{
    // Validate up front through the shared verifier instead of failing with a
    // NullReferenceException when the request's members are read below.
    ArgumentVerifier.ThrowIfNull(request, nameof(request));

    // The request payload is copied into a ByteString for the gRPC envelope.
    var bytes = ByteString.CopyFrom(request.Data.Span);
    var response = await this.MakeInvokeBindingRequestAsync(request.BindingName, request.Operation, bytes, request.Metadata, cancellationToken);

    // The response pairs the originating request with the returned data and metadata.
    return new BindingResponse(request, response.Data.Memory, response.Metadata);
}
/// <summary>
/// Builds a binding invocation request and sends it through the gRPC client.
/// </summary>
/// <param name="name">Value assigned to the request's binding name.</param>
/// <param name="operation">Value assigned to the request's operation.</param>
/// <param name="data">Optional payload; left unset on the request when null.</param>
/// <param name="metadata">Optional metadata entries copied onto the request.</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <returns>The response returned by the gRPC client.</returns>
/// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
private async Task<Autogenerated.InvokeBindingResponse> MakeInvokeBindingRequestAsync(
    string name,
    string operation,
    ByteString data,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    var request = new Autogenerated.InvokeBindingRequest();
    request.Name = name;
    request.Operation = operation;

    if (data is not null)
    {
        request.Data = data;
    }

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            request.Metadata.Add(entry.Key, entry.Value);
        }
    }

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        return await client.InvokeBindingAsync(request, callOptions);
    }
    catch (RpcException ex)
    {
        throw new DaprException("Binding operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }
}
#endregion
#region InvokeMethod Apis
/// <inheritdoc/>
public override HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName)
{
    ArgumentVerifier.ThrowIfNull(httpMethod, nameof(httpMethod));
    ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
    ArgumentVerifier.ThrowIfNull(methodName, nameof(methodName));

    // The path is assembled by plain string formatting, so navigation segments such
    // as `../..` in the inputs are passed through untouched (garbage in, garbage out).
    // Doing it this way sidesteps some common pitfalls that lead to undesired encoding.
    var path = $"/v1.0/invoke/{appId}/method/{methodName.TrimStart('/')}";
    var target = new Uri(this.httpEndpoint, path);
    var request = new HttpRequestMessage(httpMethod, target);

    // Remember the (untrimmed) inputs on the request so failures can report them later.
    request.Properties.Add(AppIdKey, appId);
    request.Properties.Add(MethodNameKey, methodName);

    if (this.apiTokenHeader.HasValue)
    {
        var tokenHeader = this.apiTokenHeader.Value;
        request.Headers.TryAddWithoutValidation(tokenHeader.Key, tokenHeader.Value);
    }

    return request;
}
/// <inheritdoc/>
public override HttpRequestMessage CreateInvokeMethodRequest<TRequest>(HttpMethod httpMethod, string appId, string methodName, TRequest data)
{
    ArgumentVerifier.ThrowIfNull(httpMethod, nameof(httpMethod));
    ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
    ArgumentVerifier.ThrowIfNull(methodName, nameof(methodName));

    // Start from the body-less request, then attach the payload as JSON content.
    var message = CreateInvokeMethodRequest(httpMethod, appId, methodName);
    var body = JsonContent.Create<TRequest>(data, options: this.JsonSerializerOptions);
    message.Content = body;
    return message;
}
/// <inheritdoc/>
public override async Task<HttpResponseMessage> InvokeMethodWithResponseAsync(HttpRequestMessage request, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNull(request, nameof(request));

    // Only requests that target the configured HTTP endpoint are accepted.
    var targetsDapr = this.httpEndpoint.IsBaseOf(request.RequestUri);
    if (!targetsDapr)
    {
        throw new InvalidOperationException("The provided request URI is not a Dapr service invocation URI.");
    }

    // The status code is deliberately NOT checked here: this method lets callers
    // 'invoke' without getting an exception for non-2xx responses.
    try
    {
        return await this.httpClient.SendAsync(request, cancellationToken);
    }
    catch (HttpRequestException ex)
    {
        // Requests created by this client carry these properties; requests built
        // elsewhere may not, and that must not cause a secondary failure.
        request.Properties.TryGetValue(AppIdKey, out var appIdValue);
        request.Properties.TryGetValue(MethodNameKey, out var methodNameValue);

        throw new InvocationException(
            appId: appIdValue as string,
            methodName: methodNameValue as string,
            innerException: ex,
            response: null);
    }
}
/// <inheritdoc/>
public async override Task InvokeMethodAsync(HttpRequestMessage request, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNull(request, nameof(request));

    var httpResponse = await InvokeMethodWithResponseAsync(request, cancellationToken);

    // Unlike InvokeMethodWithResponseAsync, a non-success status is an error here.
    try
    {
        httpResponse.EnsureSuccessStatusCode();
    }
    catch (HttpRequestException ex)
    {
        // Requests created by this client carry these properties; requests built
        // elsewhere may not, and that must not cause a secondary failure.
        request.Properties.TryGetValue(AppIdKey, out var appIdValue);
        request.Properties.TryGetValue(MethodNameKey, out var methodNameValue);

        throw new InvocationException(
            appId: appIdValue as string,
            methodName: methodNameValue as string,
            innerException: ex,
            response: httpResponse);
    }
}
/// <inheritdoc/>
public async override Task<TResponse> InvokeMethodAsync<TResponse>(HttpRequestMessage request, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNull(request, nameof(request));

    var httpResponse = await InvokeMethodWithResponseAsync(request, cancellationToken);

    // Wraps a failure together with the app id / method name recorded on the request.
    // Requests created by this client carry these properties; requests built
    // elsewhere may not, and that must not cause a secondary failure.
    InvocationException Wrap(Exception cause)
    {
        request.Properties.TryGetValue(AppIdKey, out var appIdValue);
        request.Properties.TryGetValue(MethodNameKey, out var methodNameValue);

        return new InvocationException(
            appId: appIdValue as string,
            methodName: methodNameValue as string,
            innerException: cause,
            response: httpResponse);
    }

    // Step 1: a non-success status is an error.
    try
    {
        httpResponse.EnsureSuccessStatusCode();
    }
    catch (HttpRequestException ex)
    {
        throw Wrap(ex);
    }

    // Step 2: read and deserialize the body; both I/O and JSON failures are wrapped.
    try
    {
        return await httpResponse.Content.ReadFromJsonAsync<TResponse>(this.jsonSerializerOptions, cancellationToken);
    }
    catch (HttpRequestException ex)
    {
        throw Wrap(ex);
    }
    catch (JsonException ex)
    {
        throw Wrap(ex);
    }
}
/// <inheritdoc/>
public override async Task InvokeMethodGrpcAsync(string appId, string methodName, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
    ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

    // No payload: only the target app and method are set.
    var envelope = new Autogenerated.InvokeServiceRequest();
    envelope.Id = appId;
    envelope.Message = new Autogenerated.InvokeRequest { Method = methodName };

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        // The response is not surfaced by this overload.
        await this.Client.InvokeServiceAsync(envelope, callOptions);
    }
    catch (RpcException ex)
    {
        throw new InvocationException(appId, methodName, ex);
    }
}
/// <inheritdoc/>
public override async Task InvokeMethodGrpcAsync<TRequest>(string appId, string methodName, TRequest data, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
    ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

    var envelope = new Autogenerated.InvokeServiceRequest();
    envelope.Id = appId;

    // The request message is packed into an Any and flagged with the gRPC content type.
    envelope.Message = new Autogenerated.InvokeRequest
    {
        Method = methodName,
        ContentType = Constants.ContentTypeApplicationGrpc,
        Data = Any.Pack(data),
    };

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        // The response is not surfaced by this overload.
        await this.Client.InvokeServiceAsync(envelope, callOptions);
    }
    catch (RpcException ex)
    {
        throw new InvocationException(appId, methodName, ex);
    }
}
/// <inheritdoc/>
public override async Task<TResponse> InvokeMethodGrpcAsync<TResponse>(string appId, string methodName, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
    ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

    // No payload: only the target app and method are set.
    var envelope = new Autogenerated.InvokeServiceRequest();
    envelope.Id = appId;
    envelope.Message = new Autogenerated.InvokeRequest { Method = methodName };

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        var reply = await this.Client.InvokeServiceAsync(envelope, callOptions);

        // The reply's Any payload is unpacked into the requested message type.
        return reply.Data.Unpack<TResponse>();
    }
    catch (RpcException ex)
    {
        throw new InvocationException(appId, methodName, ex);
    }
}
/// <inheritdoc/>
public override async Task<TResponse> InvokeMethodGrpcAsync<TRequest, TResponse>(string appId, string methodName, TRequest data, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(appId, nameof(appId));
    ArgumentVerifier.ThrowIfNullOrEmpty(methodName, nameof(methodName));

    var envelope = new Autogenerated.InvokeServiceRequest();
    envelope.Id = appId;

    // The request message is packed into an Any and flagged with the gRPC content type.
    envelope.Message = new Autogenerated.InvokeRequest
    {
        Method = methodName,
        ContentType = Constants.ContentTypeApplicationGrpc,
        Data = Any.Pack(data),
    };

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        var reply = await this.Client.InvokeServiceAsync(envelope, callOptions);

        // The reply's Any payload is unpacked into the requested message type.
        return reply.Data.Unpack<TResponse>();
    }
    catch (RpcException ex)
    {
        throw new InvocationException(appId, methodName, ex);
    }
}
#endregion
#region State Apis
/// <inheritdoc/>
public override async Task<IReadOnlyList<BulkStateItem>> GetBulkStateAsync(string storeName, IReadOnlyList<string> keys, int? parallelism, IReadOnlyDictionary<string, string> metadata = default, CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

    // Validate through the shared verifier instead of failing with a
    // NullReferenceException on the Count access below.
    ArgumentVerifier.ThrowIfNull(keys, nameof(keys));
    if (keys.Count == 0)
    {
        throw new ArgumentException("keys do not contain any elements");
    }

    var envelope = new Autogenerated.GetBulkStateRequest()
    {
        StoreName = storeName,
        // An unspecified parallelism is sent as the int default (0).
        Parallelism = parallelism ?? default
    };

    if (metadata != null)
    {
        foreach (var kvp in metadata)
        {
            envelope.Metadata.Add(kvp.Key, kvp.Value);
        }
    }

    envelope.Keys.AddRange(keys);

    var options = CreateCallOptions(headers: null, cancellationToken);
    Autogenerated.GetBulkStateResponse response;

    try
    {
        response = await client.GetBulkStateAsync(envelope, options);
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }

    // Each returned item's data is exposed as a UTF-8 string alongside its key and etag.
    var bulkResponse = new List<BulkStateItem>(response.Items.Count);
    foreach (var item in response.Items)
    {
        bulkResponse.Add(new BulkStateItem(item.Key, item.Data.ToStringUtf8(), item.Etag));
    }

    return bulkResponse;
}
/// <inheritdoc/>
public override async Task<TValue> GetStateAsync<TValue>(
    string storeName,
    string key,
    ConsistencyMode? consistencyMode = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    var request = new Autogenerated.GetStateRequest();
    request.StoreName = storeName;
    request.Key = key;

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            request.Metadata.Add(entry.Key, entry.Value);
        }
    }

    // Consistency is only set on the request when the caller asked for one.
    if (consistencyMode.HasValue)
    {
        request.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value);
    }

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    // Transport failures and payload failures are reported with distinct messages,
    // so the call and the deserialization are guarded separately.
    Autogenerated.GetStateResponse stateResponse;
    try
    {
        stateResponse = await client.GetStateAsync(request, callOptions);
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }

    try
    {
        return TypeConverters.FromJsonByteString<TValue>(stateResponse.Data, this.JsonSerializerOptions);
    }
    catch (JsonException ex)
    {
        throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex);
    }
}
/// <inheritdoc />
public override async Task DeleteBulkStateAsync(string storeName, IReadOnlyList<BulkDeleteStateItem> items, CancellationToken cancellationToken = default)
{
    // Validate like the other state operations rather than failing later with a
    // less descriptive exception while building the envelope.
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNull(items, nameof(items));

    var envelope = new Autogenerated.DeleteBulkStateRequest()
    {
        StoreName = storeName,
    };

    foreach (var item in items)
    {
        var stateItem = new Autogenerated.StateItem()
        {
            Key = item.Key,
        };

        // ETag, metadata and options are optional per item and only set when supplied.
        if (item.ETag != null)
        {
            stateItem.Etag = new Autogenerated.Etag() { Value = item.ETag };
        }

        if (item.Metadata != null)
        {
            foreach (var kvp in item.Metadata)
            {
                stateItem.Metadata.Add(kvp.Key, kvp.Value);
            }
        }

        if (item.StateOptions != null)
        {
            stateItem.Options = ToAutoGeneratedStateOptions(item.StateOptions);
        }

        envelope.States.Add(stateItem);
    }

    // Build the call options through the shared helper so that the API token header
    // (when configured) is attached, as it is for every other gRPC call in this class.
    var options = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        await this.Client.DeleteBulkStateAsync(envelope, options);
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }
}
/// <inheritdoc/>
public override async Task<(TValue value, string etag)> GetStateAndETagAsync<TValue>(
    string storeName,
    string key,
    ConsistencyMode? consistencyMode = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    var request = new Autogenerated.GetStateRequest();
    request.StoreName = storeName;
    request.Key = key;

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            request.Metadata.Add(entry.Key, entry.Value);
        }
    }

    // Consistency is only set on the request when the caller asked for one.
    if (consistencyMode.HasValue)
    {
        request.Consistency = GetStateConsistencyForConsistencyMode(consistencyMode.Value);
    }

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    // Transport failures and payload failures are reported with distinct messages,
    // so the call and the deserialization are guarded separately.
    Autogenerated.GetStateResponse stateResponse;
    try
    {
        stateResponse = await client.GetStateAsync(request, callOptions);
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }

    try
    {
        var value = TypeConverters.FromJsonByteString<TValue>(stateResponse.Data, this.JsonSerializerOptions);
        return (value, stateResponse.Etag);
    }
    catch (JsonException ex)
    {
        throw new DaprException("State operation failed: the state payload could not be deserialized. See InnerException for details.", ex);
    }
}
/// <inheritdoc/>
public override async Task SaveStateAsync<TValue>(
    string storeName,
    string key,
    TValue value,
    StateOptions stateOptions = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    // Saving without an ETag; the helper's boolean result is not needed here.
    await this.MakeSaveStateCallAsync(storeName, key, value, etag: null, stateOptions, metadata, cancellationToken);
}
/// <inheritdoc/>
public override async Task<bool> TrySaveStateAsync<TValue>(
    string storeName,
    string key,
    TValue value,
    string etag,
    StateOptions stateOptions = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    // Only null is rejected: some state stores accept an empty etag, so an empty
    // value is passed through and any resulting error comes from the Dapr runtime.
    ArgumentVerifier.ThrowIfNull(etag, nameof(etag));

    var saved = await this.MakeSaveStateCallAsync(storeName, key, value, etag, stateOptions, metadata, cancellationToken);
    return saved;
}
/// <summary>
/// Builds a single-item save request and sends it through the gRPC client.
/// </summary>
/// <param name="storeName">Value assigned to the request's store name.</param>
/// <param name="key">Key of the state item.</param>
/// <param name="value">Value to store; serialized to JSON when not null, otherwise left unset.</param>
/// <param name="etag">Optional ETag; when supplied, an <c>Aborted</c> status is reported as <c>false</c>.</param>
/// <param name="stateOptions">Optional state options converted onto the item.</param>
/// <param name="metadata">Optional metadata entries copied onto the item.</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <returns><c>true</c> when the call succeeds; <c>false</c> when an ETag was supplied and the call was aborted.</returns>
/// <exception cref="DaprException">Thrown for any other <see cref="RpcException"/>.</exception>
private async Task<bool> MakeSaveStateCallAsync<TValue>(
    string storeName,
    string key,
    TValue value,
    string etag = default,
    StateOptions stateOptions = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    var item = new Autogenerated.StateItem();
    item.Key = key;

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            item.Metadata.Add(entry.Key, entry.Value);
        }
    }

    if (etag is not null)
    {
        item.Etag = new Autogenerated.Etag { Value = etag };
    }

    if (stateOptions is not null)
    {
        item.Options = ToAutoGeneratedStateOptions(stateOptions);
    }

    if (value is not null)
    {
        item.Value = TypeConverters.ToJsonByteString(value, this.jsonSerializerOptions);
    }

    var request = new Autogenerated.SaveStateRequest();
    request.StoreName = storeName;
    request.States.Add(item);

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        await client.SaveStateAsync(request, callOptions);
        return true;
    }
    catch (RpcException rpc) when (etag is not null && rpc.StatusCode == StatusCode.Aborted)
    {
        // An Aborted status while an ETag is in play indicates an ETag mismatch.
        // Per the status code docs, Aborted fits this use-case even if it looks odd
        // at first. With an ETag the outcome is surfaced through the Try... pattern.
        return false;
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }
}
/// <inheritdoc/>
public override async Task ExecuteStateTransactionAsync(
    string storeName,
    IReadOnlyList<StateTransactionRequest> operations,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNull(operations, nameof(operations));

    // A transaction with no operations is rejected before any call is made.
    var hasOperations = operations.Count != 0;
    if (!hasOperations)
    {
        throw new ArgumentException($"{nameof(operations)} does not contain any elements");
    }

    await this.MakeExecuteStateTransactionCallAsync(storeName, operations, metadata, cancellationToken);
}
/// <summary>
/// Builds a state transaction request from the supplied operations and sends it through the gRPC client.
/// </summary>
/// <param name="storeName">Value assigned to the request's store name.</param>
/// <param name="states">The operations to include, in order.</param>
/// <param name="metadata">Optional metadata entries copied onto the request (applies to all operations).</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <exception cref="DaprException">Thrown when the gRPC call fails with an <see cref="RpcException"/>.</exception>
private async Task MakeExecuteStateTransactionCallAsync(
    string storeName,
    IReadOnlyList<StateTransactionRequest> states,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    var envelope = new Autogenerated.ExecuteStateTransactionRequest()
    {
        StoreName = storeName,
    };

    foreach (var state in states)
    {
        var stateOperation = new Autogenerated.TransactionalStateOperation
        {
            // The operation type goes on the wire as a lower-case token, so it is
            // lower-cased with the invariant culture: the result must not depend
            // on the current thread culture.
            OperationType = state.OperationType.ToString().ToLowerInvariant(),
            Request = ToAutogeneratedStateItem(state)
        };

        envelope.Operations.Add(stateOperation);
    }

    // Add metadata that applies to all operations if specified
    if (metadata != null)
    {
        foreach (var kvp in metadata)
        {
            envelope.Metadata.Add(kvp.Key, kvp.Value);
        }
    }

    var options = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        await client.ExecuteStateTransactionAsync(envelope, options);
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }
}
/// <summary>
/// Converts a <see cref="StateTransactionRequest"/> into the generated gRPC state item.
/// </summary>
/// <param name="state">The transaction request to convert.</param>
/// <returns>A state item with the key set, plus value, ETag, metadata and options when supplied.</returns>
private Autogenerated.StateItem ToAutogeneratedStateItem(StateTransactionRequest state)
{
    var item = new Autogenerated.StateItem();
    item.Key = state.Key;

    // Everything other than the key is optional and only copied when present.
    if (state.Value is not null)
    {
        item.Value = ByteString.CopyFrom(state.Value);
    }

    if (state.ETag is not null)
    {
        item.Etag = new Autogenerated.Etag { Value = state.ETag };
    }

    if (state.Metadata is not null)
    {
        foreach (var entry in state.Metadata)
        {
            item.Metadata.Add(entry.Key, entry.Value);
        }
    }

    if (state.Options is not null)
    {
        item.Options = ToAutoGeneratedStateOptions(state.Options);
    }

    return item;
}
/// <inheritdoc/>
public override async Task DeleteStateAsync(
    string storeName,
    string key,
    StateOptions stateOptions = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    // Deleting without an ETag; the helper's boolean result is not needed here.
    await this.MakeDeleteStateCallAsync(storeName, key, etag: null, stateOptions, metadata, cancellationToken);
}
/// <inheritdoc/>
public override async Task<bool> TryDeleteStateAsync(
    string storeName,
    string key,
    string etag,
    StateOptions stateOptions = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    // Only null is rejected: some state stores accept an empty etag, so an empty
    // value is passed through and any resulting error comes from the Dapr runtime.
    ArgumentVerifier.ThrowIfNull(etag, nameof(etag));

    var deleted = await this.MakeDeleteStateCallAsync(storeName, key, etag, stateOptions, metadata, cancellationToken);
    return deleted;
}
/// <summary>
/// Builds a delete request for a single key and sends it through the gRPC client.
/// </summary>
/// <param name="storeName">Value assigned to the request's store name.</param>
/// <param name="key">Key of the state item to delete.</param>
/// <param name="etag">Optional ETag; when supplied, an <c>Aborted</c> status is reported as <c>false</c>.</param>
/// <param name="stateOptions">Optional state options converted onto the request.</param>
/// <param name="metadata">Optional metadata entries copied onto the request.</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <returns><c>true</c> when the call succeeds; <c>false</c> when an ETag was supplied and the call was aborted.</returns>
/// <exception cref="DaprException">Thrown for any other <see cref="RpcException"/>.</exception>
private async Task<bool> MakeDeleteStateCallAsync(
    string storeName,
    string key,
    string etag = default,
    StateOptions stateOptions = default,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    var request = new Autogenerated.DeleteStateRequest();
    request.StoreName = storeName;
    request.Key = key;

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            request.Metadata.Add(entry.Key, entry.Value);
        }
    }

    if (etag is not null)
    {
        request.Etag = new Autogenerated.Etag { Value = etag };
    }

    if (stateOptions is not null)
    {
        request.Options = ToAutoGeneratedStateOptions(stateOptions);
    }

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    try
    {
        await client.DeleteStateAsync(request, callOptions);
        return true;
    }
    catch (RpcException rpc) when (etag is not null && rpc.StatusCode == StatusCode.Aborted)
    {
        // An Aborted status while an ETag is in play indicates an ETag mismatch.
        // Per the status code docs, Aborted fits this use-case even if it looks odd
        // at first. With an ETag the outcome is surfaced through the Try... pattern.
        return false;
    }
    catch (RpcException ex)
    {
        throw new DaprException("State operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }
}
#endregion
#region Secret Apis
/// <inheritdoc/>
public async override Task<Dictionary<string, string>> GetSecretAsync(
    string storeName,
    string key,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));
    ArgumentVerifier.ThrowIfNullOrEmpty(key, nameof(key));

    var request = new Autogenerated.GetSecretRequest();
    request.StoreName = storeName;
    request.Key = key;

    if (metadata is not null)
    {
        foreach (var entry in metadata)
        {
            request.Metadata.Add(entry.Key, entry.Value);
        }
    }

    var callOptions = CreateCallOptions(headers: null, cancellationToken);

    Autogenerated.GetSecretResponse secretResponse;
    try
    {
        secretResponse = await client.GetSecretAsync(request, callOptions);
    }
    catch (RpcException ex)
    {
        throw new DaprException("Secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }

    // Copy the response map into a plain dictionary for the caller.
    return secretResponse.Data.ToDictionary(pair => pair.Key, pair => pair.Value);
}
/// <inheritdoc/>
public async override Task<Dictionary<string, Dictionary<string, string>>> GetBulkSecretAsync(
    string storeName,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    // Validate the store name up front, matching GetSecretAsync, instead of
    // sending a request with a missing store name.
    ArgumentVerifier.ThrowIfNullOrEmpty(storeName, nameof(storeName));

    var envelope = new Autogenerated.GetBulkSecretRequest()
    {
        StoreName = storeName
    };

    if (metadata != null)
    {
        foreach (var kvp in metadata)
        {
            envelope.Metadata.Add(kvp.Key, kvp.Value);
        }
    }

    var options = CreateCallOptions(headers: null, cancellationToken);
    Autogenerated.GetBulkSecretResponse response;

    try
    {
        response = await client.GetBulkSecretAsync(envelope, options);
    }
    catch (RpcException ex)
    {
        throw new DaprException("Bulk secret operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
    }

    // Copy the nested response maps into plain dictionaries: outer key -> (secret key -> value).
    return response.Data.ToDictionary(r => r.Key, r => r.Value.Secrets.ToDictionary(s => s.Key, s => s.Value));
}
#endregion
/// <summary>
/// Disposes the gRPC channel and the HTTP client held by this instance.
/// </summary>
/// <param name="disposing">When <c>false</c>, nothing is disposed.</param>
protected override void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    this.channel.Dispose();
    this.httpClient.Dispose();
}
#region Helper Methods
/// <summary>
/// Creates the gRPC call options for a request, adding the API token header when one is configured.
/// </summary>
/// <param name="headers">Headers to send; a new empty collection is used when null. The token header is added to this same collection.</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <returns>Call options carrying the headers and the cancellation token.</returns>
private CallOptions CreateCallOptions(Metadata headers, CancellationToken cancellationToken)
{
    var requestHeaders = headers ?? new Metadata();

    // add token for dapr api token based authentication
    if (this.apiTokenHeader.HasValue)
    {
        var tokenHeader = this.apiTokenHeader.Value;
        requestHeaders.Add(tokenHeader.Key, tokenHeader.Value);
    }

    return new CallOptions(headers: requestHeaders, cancellationToken: cancellationToken);
}
/// <summary>
/// Invokes a unary gRPC call with call options built from the cancellation token
/// (plus the API token header when one is configured) and awaits its result.
/// </summary>
/// <remarks>
/// Intended as the single place for common gRPC exception handling. At present no
/// exception is caught or translated here; failures from the call propagate unchanged.
/// </remarks>
/// <typeparam name="TResponse">The response type of the unary call.</typeparam>
/// <param name="callFunc">Delegate that starts the call using the supplied call options.</param>
/// <param name="cancellationToken">Token used to cancel the call.</param>
/// <returns>The response produced by the call.</returns>
private async Task<TResponse> MakeGrpcCallHandleError<TResponse>(Func<CallOptions, AsyncUnaryCall<TResponse>> callFunc, CancellationToken cancellationToken = default)
{
    var pendingCall = callFunc(CreateCallOptions(headers: null, cancellationToken));
    return await pendingCall;
}
/// <summary>
/// Converts <see cref="StateOptions"/> into the generated gRPC state options.
/// </summary>
/// <param name="stateOptions">The options to convert; must not be null.</param>
/// <returns>Generated options with consistency and concurrency set only when the source specifies them.</returns>
private Autogenerated.StateOptions ToAutoGeneratedStateOptions(StateOptions stateOptions)
{
    var converted = new Autogenerated.StateOptions();

    if (stateOptions.Consistency.HasValue)
    {
        converted.Consistency = GetStateConsistencyForConsistencyMode(stateOptions.Consistency.Value);
    }

    if (stateOptions.Concurrency.HasValue)
    {
        converted.Concurrency = GetStateConcurrencyForConcurrencyMode(stateOptions.Concurrency.Value);
    }

    return converted;
}
/// <summary>
/// Maps a <see cref="ConsistencyMode"/> to the corresponding generated gRPC consistency value.
/// </summary>
/// <param name="consistencyMode">The mode to map.</param>
/// <returns>The generated consistency value for <paramref name="consistencyMode"/>.</returns>
/// <exception cref="ArgumentException">Thrown when the mode is neither Eventual nor Strong.</exception>
private static Autogenerated.StateOptions.Types.StateConsistency GetStateConsistencyForConsistencyMode(ConsistencyMode consistencyMode)
{
    switch (consistencyMode)
    {
        case ConsistencyMode.Eventual:
            return Autogenerated.StateOptions.Types.StateConsistency.ConsistencyEventual;

        case ConsistencyMode.Strong:
            return Autogenerated.StateOptions.Types.StateConsistency.ConsistencyStrong;

        default:
            throw new ArgumentException($"{consistencyMode} Consistency Mode is not supported.");
    }
}
/// <summary>
/// Maps a <see cref="ConcurrencyMode"/> to the corresponding generated gRPC concurrency value.
/// </summary>
/// <param name="concurrencyMode">The mode to map.</param>
/// <returns>The generated concurrency value for <paramref name="concurrencyMode"/>.</returns>
/// <exception cref="ArgumentException">Thrown when the mode is neither FirstWrite nor LastWrite.</exception>
private static Autogenerated.StateOptions.Types.StateConcurrency GetStateConcurrencyForConcurrencyMode(ConcurrencyMode concurrencyMode)
{
    switch (concurrencyMode)
    {
        case ConcurrencyMode.FirstWrite:
            return Autogenerated.StateOptions.Types.StateConcurrency.ConcurrencyFirstWrite;

        case ConcurrencyMode.LastWrite:
            return Autogenerated.StateOptions.Types.StateConcurrency.ConcurrencyLastWrite;

        default:
            throw new ArgumentException($"{concurrencyMode} Concurrency Mode is not supported.");
    }
}
#endregion Helper Methods
}
}