dotnet-sdk/src/Dapr.Client/DaprClientGrpc.cs

370 lines
13 KiB
C#

// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr.Client
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client.Autogen.Grpc;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Grpc.Net.Client;
using Autogenerated = Dapr.Client.Autogen.Grpc;
/// <summary>
/// A client for interacting with the Dapr endpoints.
/// </summary>
internal class DaprClientGrpc : DaprClient
{
private readonly Autogenerated.Dapr.DaprClient client;
private readonly JsonSerializerOptions jsonSerializerOptions;
/// <summary>
/// Initializes a new instance of the <see cref="DaprClientGrpc"/> class.
/// </summary>
/// <param name="channel">gRPC channel to create gRPC clients.</param>
/// <param name="jsonSerializerOptions">Json serialization options.</param>
internal DaprClientGrpc(GrpcChannel channel, JsonSerializerOptions jsonSerializerOptions = null)
{
    // Reject a null channel up front so the failure names the offending argument.
    if (channel is null)
    {
        throw new ArgumentNullException(nameof(channel));
    }

    // The options may legitimately be null; they are stored as-is and handed
    // to JsonSerializer on every (de)serialization call in this class.
    this.jsonSerializerOptions = jsonSerializerOptions;
    this.client = new Autogenerated.Dapr.DaprClient(channel);
}
/// <inheritdoc/>
public override Task PublishEventAsync<TRequest>(string topicName, TRequest publishContent, CancellationToken cancellationToken = default)
{
    // This method is not async, so argument problems are thrown directly to
    // the caller rather than being captured in the returned task.
    if (topicName == null || topicName.Length == 0)
    {
        throw new ArgumentException("The value cannot be null or empty", nameof(topicName));
    }

    if (publishContent == null)
    {
        throw new ArgumentNullException(nameof(publishContent));
    }

    Task publishTask = this.MakePublishRequest<TRequest>(topicName, publishContent, cancellationToken);
    return publishTask;
}
/// <inheritdoc/>
public override Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default)
{
    // Thrown synchronously: this method is not async.
    if (topicName == null || topicName.Length == 0)
    {
        throw new ArgumentException("The value cannot be null or empty", nameof(topicName));
    }

    // There is no caller-supplied payload, so an empty string is passed as the
    // content. It is non-null, which means MakePublishRequest still serializes it.
    string placeholderContent = string.Empty;
    return this.MakePublishRequest<string>(topicName, placeholderContent, cancellationToken);
}
/// <summary>
/// Builds a publish-event envelope for <paramref name="topicName"/> and sends it through the gRPC client.
/// </summary>
/// <typeparam name="TRequest">Type of the content that is serialized to JSON.</typeparam>
/// <param name="topicName">Topic assigned to the envelope.</param>
/// <param name="publishContent">Content to serialize into the envelope's data; skipped when null.</param>
/// <param name="cancellationToken">Token passed to serialization, the stream copy and the gRPC call.</param>
/// <returns>A task that completes once the publish call has returned.</returns>
private async Task MakePublishRequest<TRequest>(string topicName, TRequest publishContent, CancellationToken cancellationToken)
{
    var eventToPublish = new Autogenerated.PublishEventEnvelope()
    {
        Topic = topicName,
    };

    if (publishContent != null)
    {
        using var stream = new MemoryStream();
        await JsonSerializer.SerializeAsync(stream, publishContent, this.jsonSerializerOptions, cancellationToken);

        // Rewind so the JSON that was just written is read back from the start.
        // No flush is required first: a MemoryStream writes straight to its buffer.
        stream.Seek(0, SeekOrigin.Begin);

        eventToPublish.Data = new Any
        {
            // Flow the token so cancellation is honored during the copy as well.
            Value = await ByteString.FromStreamAsync(stream, cancellationToken),
        };
    }

    var callOptions = new CallOptions(cancellationToken: cancellationToken);
    await this.client.PublishEventAsync(eventToPublish, callOptions);
}
/// <inheritdoc/>
public override async Task InvokeBindingAsync<TRequest>(
    string name,
    TRequest content,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    // Validate the binding name the same way PublishEventAsync validates its
    // topic, so the error names the caller's argument. Because this method is
    // async, the exception surfaces through the returned task.
    if (name is null)
    {
        throw new ArgumentNullException(nameof(name));
    }

    if (name.Length == 0)
    {
        throw new ArgumentException("The value cannot be null or empty", nameof(name));
    }

    var envelope = new Autogenerated.InvokeBindingEnvelope()
    {
        Name = name,
    };

    if (content != null)
    {
        using var stream = new MemoryStream();
        await JsonSerializer.SerializeAsync(stream, content, this.jsonSerializerOptions, cancellationToken);

        // Rewind so the serialized JSON is read back from the start.
        // No flush is required first: a MemoryStream writes straight to its buffer.
        stream.Seek(0, SeekOrigin.Begin);

        envelope.Data = new Any
        {
            // Flow the token so cancellation is honored during the copy as well.
            Value = await ByteString.FromStreamAsync(stream, cancellationToken),
        };
    }

    if (metadata != null)
    {
        // Copy entries directly instead of materializing an intermediate dictionary.
        foreach (var entry in metadata)
        {
            envelope.Metadata.Add(entry.Key, entry.Value);
        }
    }

    var callOptions = new CallOptions(cancellationToken: cancellationToken);
    await this.client.InvokeBindingAsync(envelope, callOptions);
}
//public override async Task InvokeMethodAsync(
// string serviceName,
// string methodName,
// IReadOnlyDictionary<string, string> metadata = default,
// CancellationToken cancellationToken = default)
//{
//}
//public override async Task InvokeMethodAsync<TRequest>(
// string serviceName,
// string methodName,
// TRequest data,
// IReadOnlyDictionary<string, string> metadata = default,
// CancellationToken cancellationToken = default)
//{
//}
//public override async Task<TResponse> InvokeMethodAsync<TResponse>(
// string serviceName,
// string methodName,
// IReadOnlyDictionary<string, string> metadata = default,
// CancellationToken cancellationToken = default)
//{
//}
/// <inheritdoc/>
public override async Task<TResponse> InvokeMethodAsync<TRequest, TResponse>(
    string serviceName,
    string methodName,
    TRequest data,
    IReadOnlyDictionary<string, string> metadata = default,
    CancellationToken cancellationToken = default)
{
    // Validate the identifiers the same way PublishEventAsync validates its
    // topic, so the error names the caller's argument. Because this method is
    // async, the exceptions surface through the returned task.
    if (serviceName is null)
    {
        throw new ArgumentNullException(nameof(serviceName));
    }

    if (serviceName.Length == 0)
    {
        throw new ArgumentException("The value cannot be null or empty", nameof(serviceName));
    }

    if (methodName is null)
    {
        throw new ArgumentNullException(nameof(methodName));
    }

    if (methodName.Length == 0)
    {
        throw new ArgumentException("The value cannot be null or empty", nameof(methodName));
    }

    var envelope = new Autogenerated.InvokeServiceEnvelope()
    {
        Id = serviceName,
        Method = methodName,
    };

    if (data != null)
    {
        using var stream = new MemoryStream();
        await JsonSerializer.SerializeAsync(stream, data, this.jsonSerializerOptions, cancellationToken);

        // Rewind so the serialized JSON is read back from the start.
        // No flush is required first: a MemoryStream writes straight to its buffer.
        stream.Seek(0, SeekOrigin.Begin);

        envelope.Data = new Any
        {
            // Flow the token so cancellation is honored during the copy as well.
            Value = await ByteString.FromStreamAsync(stream, cancellationToken),
        };
    }

    if (metadata != null)
    {
        // Copy entries directly instead of materializing an intermediate dictionary.
        foreach (var entry in metadata)
        {
            envelope.Metadata.Add(entry.Key, entry.Value);
        }
    }

    var callOptions = new CallOptions(cancellationToken: cancellationToken);
    var response = await this.client.InvokeServiceAsync(envelope, callOptions);

    // NOTE(review): Data is assumed to be a protobuf message field (it exposes
    // Value like the Any built above); such fields can be null when the peer
    // leaves them unset, so guard before dereferencing.
    if (response.Data is null || response.Data.Value.IsEmpty)
    {
        return default;
    }

    var responseData = response.Data.Value.ToStringUtf8();
    return JsonSerializer.Deserialize<TResponse>(responseData, this.jsonSerializerOptions);
}
/// <inheritdoc/>
public override ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, CancellationToken cancellationToken = default)
{
    // No consistency preference was given: forward to the full overload with a null mode.
    ConsistencyMode? unspecifiedConsistency = null;
    return this.GetStateAsync<TValue>(storeName, key, unspecifiedConsistency, cancellationToken);
}
/// <inheritdoc/>
public override async ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default)
{
    var getStateEnvelope = new GetStateEnvelope()
    {
        StoreName = storeName,
        Key = key,
    };

    if (consistencyMode.HasValue)
    {
        // The Dapr state API spells consistency values in lower case
        // ("eventual", "strong"). The enum's ToString() yields the member name
        // verbatim (presumably PascalCase - the enum is declared elsewhere),
        // so normalize the casing before sending it.
        getStateEnvelope.Consistency = consistencyMode.Value.ToString().ToLowerInvariant();
    }

    var callOptions = new CallOptions(cancellationToken: cancellationToken);
    var response = await this.client.GetStateAsync(getStateEnvelope, callOptions);

    // NOTE(review): Data is assumed to be a protobuf message field, which can
    // be null when the sidecar leaves it unset (e.g. a missing key - confirm);
    // treat that the same as an empty payload.
    if (response.Data is null || response.Data.Value.IsEmpty)
    {
        return default;
    }

    var responseData = response.Data.Value.ToStringUtf8();
    return JsonSerializer.Deserialize<TValue>(responseData, this.jsonSerializerOptions);
}
/// <inheritdoc/>
public override ValueTask<StateAndETag<TValue>> GetStateAndETagAsync<TValue>(string storeName, string key, CancellationToken cancellationToken = default)
{
    // No consistency preference was given: forward to the full overload with a null mode.
    ConsistencyMode? unspecifiedConsistency = null;
    return this.GetStateAndETagAsync<TValue>(storeName, key, unspecifiedConsistency, cancellationToken);
}
/// <inheritdoc/>
public override async ValueTask<StateAndETag<TValue>> GetStateAndETagAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default)
{
    var getStateEnvelope = new GetStateEnvelope()
    {
        StoreName = storeName,
        Key = key,
    };

    if (consistencyMode.HasValue)
    {
        // The Dapr state API spells consistency values in lower case
        // ("eventual", "strong"). The enum's ToString() yields the member name
        // verbatim (presumably PascalCase - the enum is declared elsewhere),
        // so normalize the casing before sending it.
        getStateEnvelope.Consistency = consistencyMode.Value.ToString().ToLowerInvariant();
    }

    var callOptions = new CallOptions(cancellationToken: cancellationToken);
    var response = await this.client.GetStateAsync(getStateEnvelope, callOptions);

    // NOTE(review): Data is assumed to be a protobuf message field, which can
    // be null when the sidecar leaves it unset; treat that the same as an
    // empty payload and return the default value alongside the ETag.
    var state = default(TValue);
    if (response.Data != null && !response.Data.Value.IsEmpty)
    {
        var responseData = response.Data.Value.ToStringUtf8();
        state = JsonSerializer.Deserialize<TValue>(responseData, this.jsonSerializerOptions);
    }

    return new StateAndETag<TValue>(state, response.Etag);
}
/// <inheritdoc/>
public override ValueTask SaveStateAsync<TValue>(string storeName, string key, TValue value, CancellationToken cancellationToken = default)
{
    // Simple save: no ETag, no metadata and no request options are supplied.
    return this.SaveStateAsync<TValue>(
        storeName,
        key,
        value,
        etag: null,
        metadata: null,
        stateRequestOptions: null,
        cancellationToken: cancellationToken);
}
/// <inheritdoc/>
public override async ValueTask SaveStateAsync<TValue>(
    string storeName,
    string key,
    TValue value,
    string etag,
    IReadOnlyDictionary<string, string> metadata,
    StateRequestOptions stateRequestOptions,
    CancellationToken cancellationToken = default)
{
    // Build the save-state envelope; it carries a single state request for the key.
    var saveStateEnvelope = new Autogenerated.SaveStateEnvelope()
    {
        StoreName = storeName,
    };

    var stateRequest = new Autogenerated.StateRequest()
    {
        Key = key,
    };

    // A null, empty or whitespace-only ETag is not set on the request.
    if (!string.IsNullOrWhiteSpace(etag))
    {
        stateRequest.Etag = etag;
    }

    if (metadata != null)
    {
        // Copy entries directly instead of materializing an intermediate dictionary.
        foreach (var entry in metadata)
        {
            stateRequest.Metadata.Add(entry.Key, entry.Value);
        }
    }

    if (stateRequestOptions != null)
    {
        stateRequest.Options = stateRequestOptions;
    }

    if (value != null)
    {
        using var stream = new MemoryStream();
        await JsonSerializer.SerializeAsync(stream, value, this.jsonSerializerOptions, cancellationToken);

        // Rewind so the serialized JSON is read back from the start.
        // No flush is required first: a MemoryStream writes straight to its buffer.
        stream.Seek(0, SeekOrigin.Begin);

        stateRequest.Value = new Any
        {
            // Flow the token so cancellation is honored during the copy as well.
            Value = await ByteString.FromStreamAsync(stream, cancellationToken),
        };
    }

    saveStateEnvelope.Requests.Add(stateRequest);

    var callOptions = new CallOptions(cancellationToken: cancellationToken);
    await this.client.SaveStateAsync(saveStateEnvelope, callOptions);
}
/// <inheritdoc/>
public override ValueTask DeleteStateAsync(string storeName, string key, CancellationToken cancellationToken = default)
{
    // Unconditional delete: neither an ETag nor state options are supplied.
    return this.DeleteStateAsync(
        storeName,
        key,
        etag: null,
        stateOptions: null,
        cancellationToken: cancellationToken);
}
/// <inheritdoc/>
public override async ValueTask DeleteStateAsync(string storeName, string key, string etag, StateOptions stateOptions = default, CancellationToken cancellationToken = default)
{
    // Start with the mandatory identifiers, then attach the optional parts.
    var request = new DeleteStateEnvelope();
    request.StoreName = storeName;
    request.Key = key;

    // A null, empty or whitespace-only ETag is not set on the request.
    var hasEtag = !string.IsNullOrWhiteSpace(etag);
    if (hasEtag)
    {
        request.Etag = etag;
    }

    // Options are only assigned when the caller actually provided them.
    if (!(stateOptions is null))
    {
        request.Options = stateOptions;
    }

    var options = new CallOptions(cancellationToken: cancellationToken);
    await this.client.DeleteStateAsync(request, options);
}
}
}