More DaprClient logic

This commit is contained in:
LM 2020-03-06 00:31:01 -08:00
parent 642ae4108a
commit 6e6aadf779
4 changed files with 365 additions and 22 deletions

View File

@ -9,6 +9,7 @@ namespace Dapr.Client
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client.Autogen.Grpc;
/// <summary>
/// Defines methods for clients interacting with the Dapr endpoints.
@ -31,17 +32,62 @@ namespace Dapr.Client
/// <param name="topicName">The name of the topic the request should be published to.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" />.</returns>
public abstract Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default);
public abstract Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default);
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <param name="name"></param>
/// <param name="content"></param>
/// <param name="metadata"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public abstract Task InvokeBindingAsync<TRequest>(
string name,
TRequest content,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Invokes a method on another dapr app.
/// </summary>
/// <typeparam name="TRequest">The type of data to send.</typeparam>
/// <typeparam name="TResponse">The type of the object in the response.</typeparam>
/// <param name="serviceName">The dapr app to invoke a method on.</param>
/// <param name="methodName">The method to invoke.</param>
/// <param name="data">Data to pass to the method</param>
/// <param name="metadata">Metadata</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns></returns>
public abstract Task<TResponse> InvokeMethodAsync<TRequest, TResponse>(
string serviceName,
string methodName,
TRequest data,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Gets the current value associated with the <paramref name="key" /> from the Dapr state store.
/// </summary>
/// <param name="storeName">The state store name.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode: Strong or Eventual.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <typeparam name="TValue">The data type.</typeparam>
/// <returns>A <see cref="ValueTask" /> that will return the value when the operation has completed.</returns>
public abstract ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, CancellationToken cancellationToken = default);
public abstract ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default);
/// <summary>
/// Gets the current value associated with the <paramref name="key" /> from the Dapr state store and an ETag.
/// </summary>
/// <typeparam name="TValue">The data type.</typeparam>
/// <param name="storeName">The state store name.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode: Strong or Eventual.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns></returns>
public abstract ValueTask<StateAndETag<TValue>> GetStateAndETagAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default);
/// <summary>
/// Gets a <see cref="StateEntry{T}" /> for the current value associated with the <paramref name="key" /> from
@ -49,10 +95,11 @@ namespace Dapr.Client
/// </summary>
/// <param name="storeName">The state store name.</param>
/// <param name="key">The state key.</param>
/// <param name="consistencyMode">The consistency mode: Strong or Eventual.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <typeparam name="TValue">The data type.</typeparam>
/// <returns>A <see cref="ValueTask" /> that will return the <see cref="StateEntry{T}" /> when the operation has completed.</returns>
public async ValueTask<StateEntry<TValue>> GetStateEntryAsync<TValue>(string storeName, string key, CancellationToken cancellationToken = default)
public async ValueTask<StateEntry<TValue>> GetStateEntryAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(storeName))
{
@ -64,8 +111,8 @@ namespace Dapr.Client
throw new ArgumentException("The value cannot be null or empty.", nameof(key));
}
var value = await this.GetStateAsync<TValue>(storeName, key, cancellationToken);
return new StateEntry<TValue>(this, storeName, key, value);
var stateAndETag = await this.GetStateAndETagAsync<TValue>(storeName, key, consistencyMode, cancellationToken);
return new StateEntry<TValue>(this, storeName, key, stateAndETag.Data, stateAndETag.ETag);
}
/// <summary>
@ -75,19 +122,30 @@ namespace Dapr.Client
/// <param name="storeName">The state store name.</param>
/// <param name="key">The state key.</param>
/// <param name="value">The value to save.</param>
/// <param name="etag">An ETag.</param>
/// <param name="metadata">An ETag.</param>
/// <param name="stateRequestOptions">A <see cref="StateRequestOptions" />.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <typeparam name="TValue">The data type.</typeparam>
/// <returns>A <see cref="ValueTask" /> that will complete when the operation has completed.</returns>
public abstract ValueTask SaveStateAsync<TValue>(string storeName, string key, TValue value, CancellationToken cancellationToken = default);
public abstract ValueTask SaveStateAsync<TValue>(
string storeName,
string key,
TValue value,
string etag,
IReadOnlyDictionary<string, string> metadata,
StateRequestOptions stateRequestOptions,
CancellationToken cancellationToken = default);
/// <summary>
/// Deletes the value associated with the provided <paramref name="key" /> in the Dapr state store.
/// </summary>
/// <param name="storeName">The state store name.</param>
/// <param name="key">The state key.</param>
/// <param name="etag">An ETag.</param>
/// <param name="stateOptions">A <see cref="StateOptions" />.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask" /> that will complete when the operation has completed.</returns>
public abstract ValueTask DeleteStateAsync(string storeName, string key, CancellationToken cancellationToken = default);
public abstract ValueTask DeleteStateAsync(string storeName, string key, string etag, StateOptions stateOptions = default, CancellationToken cancellationToken = default);
}
}

View File

@ -8,6 +8,7 @@ namespace Dapr.Client
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
@ -78,7 +79,7 @@ namespace Dapr.Client
{
using var stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, publishContent, this.jsonSerializerOptions, cancellationToken);
await stream.FlushAsync();
await stream.FlushAsync(cancellationToken);
// set the position to beginning of stream.
stream.Seek(0, SeekOrigin.Begin);
@ -88,15 +89,126 @@ namespace Dapr.Client
Value = await ByteString.FromStreamAsync(stream)
};
eventToPublish.Data = data;
eventToPublish.Data = data;
}
var callOptions = new CallOptions(cancellationToken: cancellationToken);
await client.PublishEventAsync(eventToPublish, callOptions);
}
public override async Task InvokeBindingAsync<TRequest>(
string name,
TRequest content,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
var envelope = new Autogenerated.InvokeBindingEnvelope()
{
Name = name,
};
if (content != null)
{
using var stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, content, this.jsonSerializerOptions, cancellationToken);
await stream.FlushAsync(cancellationToken);
// set the position to beginning of stream.
stream.Seek(0, SeekOrigin.Begin);
var data = new Any
{
Value = await ByteString.FromStreamAsync(stream)
};
envelope.Data = data;
}
if (metadata != null)
{
var d = metadata.ToDictionary(k => k.Key, k => k.Value);
envelope.Metadata.Add(d);
}
var callOptions = new CallOptions(cancellationToken: cancellationToken);
await client.InvokeBindingAsync(envelope, callOptions);
}
//public override async Task InvokeMethodAsync(
// string serviceName,
// string methodName,
// IReadOnlyDictionary<string, string> metadata = default,
// CancellationToken cancellationToken = default)
//{
//}
//public override async Task InvokeMethodAsync<TRequest>(
// string serviceName,
// string methodName,
// TRequest data,
// IReadOnlyDictionary<string, string> metadata = default,
// CancellationToken cancellationToken = default)
//{
//}
//public override async Task<TResponse> InvokeMethodAsync<TResponse>(
// string serviceName,
// string methodName,
// IReadOnlyDictionary<string, string> metadata = default,
// CancellationToken cancellationToken = default)
//{
//}
public override async Task<TResponse> InvokeMethodAsync<TRequest, TResponse>(
string serviceName,
string methodName,
TRequest data,
IReadOnlyDictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
var envelope = new Autogenerated.InvokeServiceEnvelope()
{
Id = serviceName,
Method = methodName
};
if (data != null)
{
using var stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, data, this.jsonSerializerOptions, cancellationToken);
await stream.FlushAsync(cancellationToken);
// set the position to beginning of stream.
stream.Seek(0, SeekOrigin.Begin);
var t = new Any
{
Value = await ByteString.FromStreamAsync(stream)
};
envelope.Data = t;
}
if (metadata != null)
{
var d = metadata.ToDictionary(k => k.Key, k => k.Value);
envelope.Metadata.Add(d);
}
var callOptions = new CallOptions(cancellationToken: cancellationToken);
var response = await client.InvokeServiceAsync(envelope, callOptions);
if (response.Data.Value.IsEmpty)
{
return default;
}
var responseData = response.Data.Value.ToStringUtf8();
return JsonSerializer.Deserialize<TResponse>(responseData, this.jsonSerializerOptions);
}
/// <inheritdoc/>
public override async ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, CancellationToken cancellationToken = default)
public override async ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default)
{
var getStateEnvelope = new GetStateEnvelope()
{
@ -104,6 +216,11 @@ namespace Dapr.Client
Key = key,
};
if (consistencyMode != null)
{
getStateEnvelope.Consistency = consistencyMode.ToString();
}
var callOptions = new CallOptions(cancellationToken: cancellationToken);
var response = await client.GetStateAsync(getStateEnvelope, callOptions);
@ -117,7 +234,41 @@ namespace Dapr.Client
}
/// <inheritdoc/>
public override async ValueTask SaveStateAsync<TValue>(string storeName, string key, TValue value, CancellationToken cancellationToken = default)
public override async ValueTask<StateAndETag<TValue>> GetStateAndETagAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default)
{
var getStateEnvelope = new GetStateEnvelope()
{
StoreName = storeName,
Key = key,
};
if (consistencyMode != null)
{
getStateEnvelope.Consistency = consistencyMode.ToString();
}
var callOptions = new CallOptions(cancellationToken: cancellationToken);
var response = await client.GetStateAsync(getStateEnvelope, callOptions);
if (response.Data.Value.IsEmpty)
{
return new StateAndETag<TValue>(default(TValue), response.Etag);
}
var responseData = response.Data.Value.ToStringUtf8();
var deserialized = JsonSerializer.Deserialize<TValue>(responseData, this.jsonSerializerOptions);
return new StateAndETag<TValue>(deserialized, response.Etag);
}
/// <inheritdoc/>
public override async ValueTask SaveStateAsync<TValue>(
string storeName,
string key,
TValue value,
string etag,
IReadOnlyDictionary<string, string> metadata,
StateRequestOptions stateRequestOptions,
CancellationToken cancellationToken = default)
{
// Create PublishEventEnvelope
var saveStateEnvelope = new Autogenerated.SaveStateEnvelope()
@ -129,12 +280,28 @@ namespace Dapr.Client
{
Key = key,
};
if (!string.IsNullOrWhiteSpace(etag))
{
stateRequest.Etag = etag;
}
if (metadata != null)
{
var d = metadata.ToDictionary(k => k.Key, k => k.Value);
stateRequest.Metadata.Add(d);
}
if (stateRequestOptions != null)
{
stateRequest.Options = stateRequestOptions;
}
if (value != null)
{
using var stream = new MemoryStream();
await JsonSerializer.SerializeAsync(stream, value, this.jsonSerializerOptions, cancellationToken);
await stream.FlushAsync();
await stream.FlushAsync(cancellationToken);
// set the position to beginning of stream.
stream.Seek(0, SeekOrigin.Begin);
@ -153,7 +320,7 @@ namespace Dapr.Client
}
/// <inheritdoc/>
public override async ValueTask DeleteStateAsync(string storeName, string key, CancellationToken cancellationToken = default)
public override async ValueTask DeleteStateAsync(string storeName, string key, string etag, StateOptions stateOptions = default, CancellationToken cancellationToken = default)
{
var deleteStateEnvelope = new DeleteStateEnvelope()
{
@ -161,6 +328,16 @@ namespace Dapr.Client
Key = key,
};
if (!string.IsNullOrWhiteSpace(etag))
{
deleteStateEnvelope.Etag = etag;
}
if (stateOptions != null)
{
deleteStateEnvelope.Options = stateOptions;
}
var callOptions = new CallOptions(cancellationToken: cancellationToken);
await client.DeleteStateAsync(deleteStateEnvelope, callOptions);
}

View File

@ -0,0 +1,63 @@
using System;
using System.Collections.Generic;
using System.Net.Http.Headers;
using System.Text;
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
namespace Dapr.Client
{
// temp file to hold state related enums etc
class StateClasses
{
}
// TODO needs better name?
public sealed class StateAndETag<TValue>
{
public StateAndETag(TValue data, string etag)
{
this.Data = data;
this.ETag = etag;
}
public TValue Data { get; }
public string ETag { get; }
}
/// <summary>
///
/// </summary>
///
public sealed class RetryOptions
{
// ZZZ must be converted on use
public TimeSpan? RetryInterval { get; set; }
public RetryMode? RetryMode { get; set; }
public int? RetryThreshold
{
get; set;
}
}
public enum ConcurrencyMode
{
FirstWrite,
LastWrite,
}
public enum ConsistencyMode
{
Strong,
Eventual,
}
public enum RetryMode
{
Linear,
Exponential,
}
}
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member

View File

@ -6,9 +6,11 @@
namespace Dapr
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Client.Autogen.Grpc;
/// <summary>
/// Represents a value in the Dapr state store.
@ -25,12 +27,12 @@ namespace Dapr
/// <param name="storeName">The state store name.</param>
/// <param name="key">The state key.</param>
/// <param name="value">The value.</param>
/// <param name="etag">The etag.</param>
/// <remarks>
/// Application code should not need to create instances of <see cref="StateEntry{T}" />. Use
/// <see cref="Dapr.Client.DaprClient.GetStateEntryAsync{TValue}(string, string, CancellationToken)" /> to access
/// Application code should not need to create instances of <see cref="StateEntry{T}" />. Use
/// state entries.
/// </remarks>
public StateEntry(DaprClient client, string storeName, string key, TValue value)
public StateEntry(DaprClient client, string storeName, string key, TValue value, string etag)
{
if (client is null)
{
@ -51,6 +53,8 @@ namespace Dapr
this.Key = key;
this.Value = value;
this.client = client;
this.ETag = etag;
}
/// <summary>
@ -68,24 +72,65 @@ namespace Dapr
/// </summary>
public TValue Value { get; set; }
/// <summary>
/// The ETag.
/// </summary>
public string ETag { get; }
/// <summary>
/// Deletes the entry associated with <see cref="Key" /> in the state store.
/// </summary>
/// <param name="stateOptions">A <see cref="StateOptions"/> object.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask" /> that will complete when the operation has completed.</returns>
public ValueTask DeleteAsync(CancellationToken cancellationToken = default)
public ValueTask DeleteAsync(StateOptions stateOptions = default, CancellationToken cancellationToken = default)
{
return this.client.DeleteStateAsync(this.StoreName, this.Key, cancellationToken);
// ETag is intentionally not specified
return this.client.DeleteStateAsync(this.StoreName, this.Key, null, stateOptions, cancellationToken);
}
/// <summary>
/// Saves the current value of <see cref="Value" /> to the state store.
/// </summary>
/// <param name="metadata">Additional metadata.</param>
/// <param name="stateRequestOptions">A <see cref="StateRequestOptions" />.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask" /> that will complete when the operation has completed.</returns>
public ValueTask SaveAsync(CancellationToken cancellationToken = default)
public ValueTask SaveAsync(IReadOnlyDictionary<string, string> metadata, StateRequestOptions stateRequestOptions, CancellationToken cancellationToken = default)
{
return this.client.SaveStateAsync(this.StoreName, this.Key, this.Value, cancellationToken);
// ETag is intentionally not specified
return this.client.SaveStateAsync(this.StoreName, this.Key, this.Value, null, metadata, stateRequestOptions, cancellationToken);
}
/// <summary>
/// Saves the current value of <see cref="Value" /> to the state store.
/// </summary>
/// <param name="metadata">Additional metadata.</param>
/// <param name="stateRequestOptions">A <see cref="StateRequestOptions" />.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask" /> that will complete when the operation has completed.</returns>
public async ValueTask<bool> TrySaveAsync(IReadOnlyDictionary<string, string> metadata, StateRequestOptions stateRequestOptions, CancellationToken cancellationToken = default)
{
try
{
await this.client.SaveStateAsync(
this.StoreName,
this.Key,
this.Value,
this.ETag,
metadata,
stateRequestOptions,
cancellationToken);
return true;
}
catch (Exception)
{
// do not throw, return false
// ? TODO: what type of exception is this?
}
return false;
}
}
}