diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 1b8df644..7f4a4a66 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -9,6 +9,7 @@ namespace Dapr.Client using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; + using Dapr.Client.Autogen.Grpc; /// /// Defines methods for clients interacting with the Dapr endpoints. @@ -31,17 +32,62 @@ namespace Dapr.Client /// The name of the topic the request should be published to. /// A that can be used to cancel the operation. /// A . - public abstract Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default); + public abstract Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default); + + /// + /// + /// + /// + /// + /// + /// + /// + /// + public abstract Task InvokeBindingAsync( + string name, + TRequest content, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default); + + /// + /// Invokes a method on another dapr app. + /// + /// The type of data to send. + /// The type of the object in the response. + /// The dapr app to invoke a method on. + /// The method to invoke. + /// Data to pass to the method + /// Metadata + /// A cancellation token. + /// + public abstract Task InvokeMethodAsync( + string serviceName, + string methodName, + TRequest data, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default); /// /// Gets the current value associated with the from the Dapr state store. /// /// The state store name. /// The state key. + /// The consistency mode: Strong or Eventual. /// A that can be used to cancel the operation. /// The data type. /// A that will return the value when the operation has completed. - public abstract ValueTask GetStateAsync(string storeName, string key, CancellationToken cancellationToken = default); + public abstract ValueTask GetStateAsync(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default); + + /// + /// Gets the current value associated with the from the Dapr state store and an ETag. + /// + /// The data type. + /// The state store name. + /// The state key. + /// The consistency mode: Strong or Eventual. + /// A that can be used to cancel the operation. + /// + public abstract ValueTask> GetStateAndETagAsync(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default); /// /// Gets a for the current value associated with the from @@ -49,10 +95,11 @@ namespace Dapr.Client /// /// The state store name. /// The state key. + /// The consistency mode: Strong or Eventual. /// A that can be used to cancel the operation. /// The data type. /// A that will return the when the operation has completed. - public async ValueTask> GetStateEntryAsync(string storeName, string key, CancellationToken cancellationToken = default) + public async ValueTask> GetStateEntryAsync(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default) { if (string.IsNullOrEmpty(storeName)) { @@ -64,8 +111,8 @@ namespace Dapr.Client throw new ArgumentException("The value cannot be null or empty.", nameof(key)); } - var value = await this.GetStateAsync(storeName, key, cancellationToken); - return new StateEntry(this, storeName, key, value); + var stateAndETag = await this.GetStateAndETagAsync(storeName, key, consistencyMode, cancellationToken); + return new StateEntry(this, storeName, key, stateAndETag.Data, stateAndETag.ETag); } /// @@ -75,19 +122,30 @@ namespace Dapr.Client /// The state store name. /// The state key. /// The value to save. + /// An ETag. + /// An ETag. + /// A . /// A that can be used to cancel the operation. /// The data type. /// A that will complete when the operation has completed. - public abstract ValueTask SaveStateAsync(string storeName, string key, TValue value, CancellationToken cancellationToken = default); + public abstract ValueTask SaveStateAsync( + string storeName, + string key, + TValue value, + string etag, + IReadOnlyDictionary metadata, + StateRequestOptions stateRequestOptions, + CancellationToken cancellationToken = default); /// /// Deletes the value associated with the provided in the Dapr state store. /// /// The state store name. /// The state key. + /// An ETag. + /// A . /// A that can be used to cancel the operation. /// A that will complete when the operation has completed. - public abstract ValueTask DeleteStateAsync(string storeName, string key, CancellationToken cancellationToken = default); - + public abstract ValueTask DeleteStateAsync(string storeName, string key, string etag, StateOptions stateOptions = default, CancellationToken cancellationToken = default); } } diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index ff15dad5..c1a5c01a 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -8,6 +8,7 @@ namespace Dapr.Client using System; using System.Collections.Generic; using System.IO; + using System.Linq; using System.Text; using System.Text.Json; using System.Text.Json.Serialization; @@ -78,7 +79,7 @@ namespace Dapr.Client { using var stream = new MemoryStream(); await JsonSerializer.SerializeAsync(stream, publishContent, this.jsonSerializerOptions, cancellationToken); - await stream.FlushAsync(); + await stream.FlushAsync(cancellationToken); // set the position to beginning of stream. stream.Seek(0, SeekOrigin.Begin); @@ -88,15 +89,126 @@ namespace Dapr.Client Value = await ByteString.FromStreamAsync(stream) }; - eventToPublish.Data = data; + eventToPublish.Data = data; } var callOptions = new CallOptions(cancellationToken: cancellationToken); await client.PublishEventAsync(eventToPublish, callOptions); } + public override async Task InvokeBindingAsync( + string name, + TRequest content, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + var envelope = new Autogenerated.InvokeBindingEnvelope() + { + Name = name, + }; + + if (content != null) + { + using var stream = new MemoryStream(); + await JsonSerializer.SerializeAsync(stream, content, this.jsonSerializerOptions, cancellationToken); + await stream.FlushAsync(cancellationToken); + + // set the position to beginning of stream. + stream.Seek(0, SeekOrigin.Begin); + + var data = new Any + { + Value = await ByteString.FromStreamAsync(stream) + }; + + envelope.Data = data; + } + + if (metadata != null) + { + var d = metadata.ToDictionary(k => k.Key, k => k.Value); + envelope.Metadata.Add(d); + } + + var callOptions = new CallOptions(cancellationToken: cancellationToken); + await client.InvokeBindingAsync(envelope, callOptions); + } + + //public override async Task InvokeMethodAsync( + // string serviceName, + // string methodName, + // IReadOnlyDictionary metadata = default, + // CancellationToken cancellationToken = default) + //{ + //} + + //public override async Task InvokeMethodAsync( + // string serviceName, + // string methodName, + // TRequest data, + // IReadOnlyDictionary metadata = default, + // CancellationToken cancellationToken = default) + //{ + //} + + //public override async Task InvokeMethodAsync( + // string serviceName, + // string methodName, + // IReadOnlyDictionary metadata = default, + // CancellationToken cancellationToken = default) + //{ + //} + + public override async Task InvokeMethodAsync( + string serviceName, + string methodName, + TRequest data, + IReadOnlyDictionary metadata = default, + CancellationToken cancellationToken = default) + { + var envelope = new Autogenerated.InvokeServiceEnvelope() + { + Id = serviceName, + Method = methodName + }; + + if (data != null) + { + using var stream = new MemoryStream(); + await JsonSerializer.SerializeAsync(stream, data, this.jsonSerializerOptions, cancellationToken); + await stream.FlushAsync(cancellationToken); + + // set the position to beginning of stream. + stream.Seek(0, SeekOrigin.Begin); + + var t = new Any + { + Value = await ByteString.FromStreamAsync(stream) + }; + + envelope.Data = t; + } + + if (metadata != null) + { + var d = metadata.ToDictionary(k => k.Key, k => k.Value); + envelope.Metadata.Add(d); + } + + var callOptions = new CallOptions(cancellationToken: cancellationToken); + var response = await client.InvokeServiceAsync(envelope, callOptions); + + if (response.Data.Value.IsEmpty) + { + return default; + } + + var responseData = response.Data.Value.ToStringUtf8(); + return JsonSerializer.Deserialize(responseData, this.jsonSerializerOptions); + } + /// - public override async ValueTask GetStateAsync(string storeName, string key, CancellationToken cancellationToken = default) + public override async ValueTask GetStateAsync(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default) { var getStateEnvelope = new GetStateEnvelope() { @@ -104,6 +216,11 @@ namespace Dapr.Client Key = key, }; + if (consistencyMode != null) + { + getStateEnvelope.Consistency = consistencyMode.ToString(); + } + var callOptions = new CallOptions(cancellationToken: cancellationToken); var response = await client.GetStateAsync(getStateEnvelope, callOptions); @@ -117,7 +234,41 @@ namespace Dapr.Client } /// - public override async ValueTask SaveStateAsync(string storeName, string key, TValue value, CancellationToken cancellationToken = default) + public override async ValueTask> GetStateAndETagAsync(string storeName, string key, ConsistencyMode? consistencyMode = default, CancellationToken cancellationToken = default) + { + var getStateEnvelope = new GetStateEnvelope() + { + StoreName = storeName, + Key = key, + }; + + if (consistencyMode != null) + { + getStateEnvelope.Consistency = consistencyMode.ToString(); + } + + var callOptions = new CallOptions(cancellationToken: cancellationToken); + var response = await client.GetStateAsync(getStateEnvelope, callOptions); + + if (response.Data.Value.IsEmpty) + { + return new StateAndETag(default(TValue), response.Etag); + } + + var responseData = response.Data.Value.ToStringUtf8(); + var deserialized = JsonSerializer.Deserialize(responseData, this.jsonSerializerOptions); + return new StateAndETag(deserialized, response.Etag); + } + + /// + public override async ValueTask SaveStateAsync( + string storeName, + string key, + TValue value, + string etag, + IReadOnlyDictionary metadata, + StateRequestOptions stateRequestOptions, + CancellationToken cancellationToken = default) { // Create PublishEventEnvelope var saveStateEnvelope = new Autogenerated.SaveStateEnvelope() @@ -129,12 +280,28 @@ namespace Dapr.Client { Key = key, }; - + + if (!string.IsNullOrWhiteSpace(etag)) + { + stateRequest.Etag = etag; + } + + if (metadata != null) + { + var d = metadata.ToDictionary(k => k.Key, k => k.Value); + stateRequest.Metadata.Add(d); + } + + if (stateRequestOptions != null) + { + stateRequest.Options = stateRequestOptions; + } + if (value != null) { using var stream = new MemoryStream(); await JsonSerializer.SerializeAsync(stream, value, this.jsonSerializerOptions, cancellationToken); - await stream.FlushAsync(); + await stream.FlushAsync(cancellationToken); // set the position to beginning of stream. stream.Seek(0, SeekOrigin.Begin); @@ -153,7 +320,7 @@ namespace Dapr.Client } /// - public override async ValueTask DeleteStateAsync(string storeName, string key, CancellationToken cancellationToken = default) + public override async ValueTask DeleteStateAsync(string storeName, string key, string etag, StateOptions stateOptions = default, CancellationToken cancellationToken = default) { var deleteStateEnvelope = new DeleteStateEnvelope() { @@ -161,6 +328,16 @@ namespace Dapr.Client Key = key, }; + if (!string.IsNullOrWhiteSpace(etag)) + { + deleteStateEnvelope.Etag = etag; + } + + if (stateOptions != null) + { + deleteStateEnvelope.Options = stateOptions; + } + var callOptions = new CallOptions(cancellationToken: cancellationToken); await client.DeleteStateAsync(deleteStateEnvelope, callOptions); } diff --git a/src/Dapr.Client/StateClasses.cs b/src/Dapr.Client/StateClasses.cs new file mode 100644 index 00000000..0b0dd83f --- /dev/null +++ b/src/Dapr.Client/StateClasses.cs @@ -0,0 +1,63 @@ +using System; +using System.Collections.Generic; +using System.Net.Http.Headers; +using System.Text; + +#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member +namespace Dapr.Client +{ + // temp file to hold state related enums etc + class StateClasses + { + } + + // TODO needs better name? + public sealed class StateAndETag + { + public StateAndETag(TValue data, string etag) + { + this.Data = data; + this.ETag = etag; + } + + public TValue Data { get; } + + public string ETag { get; } + } + /// + /// + /// + /// + + public sealed class RetryOptions + { + // ZZZ must be converted on use + public TimeSpan? RetryInterval { get; set; } + public RetryMode? RetryMode { get; set; } + public int? RetryThreshold + { + get; set; + } + } + + + public enum ConcurrencyMode + { + FirstWrite, + LastWrite, + } + + public enum ConsistencyMode + { + Strong, + Eventual, + } + + public enum RetryMode + { + Linear, + Exponential, + } +} +#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member + diff --git a/src/Dapr.Client/StateEntry.cs b/src/Dapr.Client/StateEntry.cs index bcbd41b1..abe2e132 100644 --- a/src/Dapr.Client/StateEntry.cs +++ b/src/Dapr.Client/StateEntry.cs @@ -6,9 +6,11 @@ namespace Dapr { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Dapr.Client; + using Dapr.Client.Autogen.Grpc; /// /// Represents a value in the Dapr state store. @@ -25,12 +27,12 @@ namespace Dapr /// The state store name. /// The state key. /// The value. + /// The etag. /// - /// Application code should not need to create instances of . Use - /// to access + /// Application code should not need to create instances of . Use /// state entries. /// - public StateEntry(DaprClient client, string storeName, string key, TValue value) + public StateEntry(DaprClient client, string storeName, string key, TValue value, string etag) { if (client is null) { @@ -51,6 +53,8 @@ namespace Dapr this.Key = key; this.Value = value; this.client = client; + + this.ETag = etag; } /// @@ -68,24 +72,65 @@ namespace Dapr /// public TValue Value { get; set; } + /// + /// The ETag. + /// + public string ETag { get; } + /// /// Deletes the entry associated with in the state store. /// + /// A object. /// A that can be used to cancel the operation. /// A that will complete when the operation has completed. - public ValueTask DeleteAsync(CancellationToken cancellationToken = default) + public ValueTask DeleteAsync(StateOptions stateOptions = default, CancellationToken cancellationToken = default) { - return this.client.DeleteStateAsync(this.StoreName, this.Key, cancellationToken); + // ETag is intentionally not specified + return this.client.DeleteStateAsync(this.StoreName, this.Key, null, stateOptions, cancellationToken); } /// /// Saves the current value of to the state store. /// + /// Additional metadata. + /// A . /// A that can be used to cancel the operation. /// A that will complete when the operation has completed. - public ValueTask SaveAsync(CancellationToken cancellationToken = default) + public ValueTask SaveAsync(IReadOnlyDictionary metadata, StateRequestOptions stateRequestOptions, CancellationToken cancellationToken = default) { - return this.client.SaveStateAsync(this.StoreName, this.Key, this.Value, cancellationToken); + // ETag is intentionally not specified + return this.client.SaveStateAsync(this.StoreName, this.Key, this.Value, null, metadata, stateRequestOptions, cancellationToken); + } + + /// + /// Saves the current value of to the state store. + /// + /// Additional metadata. + /// A . + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public async ValueTask TrySaveAsync(IReadOnlyDictionary metadata, StateRequestOptions stateRequestOptions, CancellationToken cancellationToken = default) + { + try + { + await this.client.SaveStateAsync( + this.StoreName, + this.Key, + this.Value, + this.ETag, + metadata, + stateRequestOptions, + cancellationToken); + + return true; + } + catch (Exception) + { + // do not throw, return false + // ? TODO: what type of exception is this? + } + + return false; } } }