Merge branch 'release-1.0.0' into master

This commit is contained in:
vinayada1 2021-02-03 15:20:37 -08:00
commit a98f0e66d7
14 changed files with 104 additions and 35 deletions

View File

@ -20,21 +20,21 @@ namespace Samples.Client
{
using var client = new DaprClientBuilder().Build();
Console.WriteLine("Invoking grpc balance");
var request = new GetAccountRequest() { Id = "17", };
var account = await client.InvokeMethodGrpcAsync<GetAccountRequest, Account>("grpcsample", "getaccount", request, cancellationToken);
Console.WriteLine($"Received grpc balance {account.Balance}");
Console.WriteLine("Invoking grpc deposit");
var data = new GrpcServiceSample.Generated.Transaction() { Id = "17", Amount = 99 };
account = await client.InvokeMethodGrpcAsync<Transaction, Account>("grpcsample", "deposit", data, cancellationToken);
var deposit = new GrpcServiceSample.Generated.Transaction() { Id = "17", Amount = 99 };
var account = await client.InvokeMethodGrpcAsync<GrpcServiceSample.Generated.Transaction, Account>("grpcsample", "deposit", deposit, cancellationToken);
Console.WriteLine("Returned: id:{0} | Balance:{1}", account.Id, account.Balance);
Console.WriteLine("Completed grpc deposit");
Console.WriteLine("Invoking grpc withdraw");
var withdraw = new Transaction() { Id = "17", Amount = 10, };
await client.InvokeMethodGrpcAsync("grpcsample", "withdraw", data, cancellationToken);
var withdraw = new GrpcServiceSample.Generated.Transaction() { Id = "17", Amount = 10, };
await client.InvokeMethodGrpcAsync("grpcsample", "withdraw", withdraw, cancellationToken);
Console.WriteLine("Completed grpc withdraw");
Console.WriteLine("Invoking grpc balance");
var request = new GetAccountRequest() { Id = "17", };
account = await client.InvokeMethodGrpcAsync<GetAccountRequest, Account>("grpcsample", "getaccount", request, cancellationToken);
Console.WriteLine($"Received grpc balance {account.Balance}");
}
}
}

View File

@ -207,6 +207,8 @@ namespace Dapr.Actors.Runtime
TimeSpan dueTime,
TimeSpan period)
{
EnsureInteractorInitialized();
var reminderInfo = new ReminderInfo(state, dueTime, period);
var reminder = new ActorReminder(this.Id, reminderName, reminderInfo);
var serializedReminderInfo = await reminderInfo.SerializeAsync();
@ -223,6 +225,7 @@ namespace Dapr.Actors.Runtime
/// </returns>
protected Task UnregisterReminderAsync(IActorReminder reminder)
{
EnsureInteractorInitialized();
return this.Host.DaprInteractor.UnregisterReminderAsync(this.actorTypeName, this.Id.ToString(), reminder.Name);
}
@ -235,6 +238,7 @@ namespace Dapr.Actors.Runtime
/// </returns>
protected Task UnregisterReminderAsync(string reminderName)
{
EnsureInteractorInitialized();
return this.Host.DaprInteractor.UnregisterReminderAsync(this.actorTypeName, this.Id.ToString(), reminderName);
}
@ -263,6 +267,8 @@ namespace Dapr.Actors.Runtime
TimeSpan dueTime,
TimeSpan period)
{
EnsureInteractorInitialized();
// Validate that the timer callback specified meets all the required criteria for a valid callback method
this.ValidateTimerCallback(this.Host, callback);
@ -287,6 +293,7 @@ namespace Dapr.Actors.Runtime
/// <returns>Task representing the Unregister timer operation.</returns>
protected async Task UnregisterTimerAsync(ActorTimer timer)
{
EnsureInteractorInitialized();
await this.Host.DaprInteractor.UnregisterTimerAsync(this.actorTypeName, this.Id.ToString(), timer.Name);
}
@ -297,6 +304,7 @@ namespace Dapr.Actors.Runtime
/// <returns>Task representing the Unregister timer operation.</returns>
protected async Task UnregisterTimerAsync(string timerName)
{
EnsureInteractorInitialized();
await this.Host.DaprInteractor.UnregisterTimerAsync(this.actorTypeName, this.Id.ToString(), timerName);
}
@ -340,5 +348,15 @@ namespace Dapr.Actors.Runtime
throw new ArgumentException("Timer callback can only return type Task");
}
}
private void EnsureInteractorInitialized()
{
if (this.Host.DaprInteractor == null)
{
throw new InvalidOperationException(
"The actor was initialized without an HTTP client, and so cannot interact with timers or reminders. " +
"This is likely to happen inside a unit test.");
}
}
}
}

View File

@ -22,21 +22,17 @@ namespace Dapr.Actors.Runtime
/// <param name="jsonSerializerOptions">The <see cref="JsonSerializerOptions"/> to use for actor state persistence and message deserialization.</param>
/// <param name="loggerFactory">The logger factory.</param>
/// <param name="proxyFactory">The <see cref="ActorProxyFactory" />.</param>
/// <param name="daprInteractor">The <see cref="IDaprInteractor" />.</param>
internal ActorHost(
public ActorHost(
ActorTypeInformation actorTypeInfo,
ActorId id,
JsonSerializerOptions jsonSerializerOptions,
ILoggerFactory loggerFactory,
IActorProxyFactory proxyFactory,
IDaprInteractor daprInteractor)
IActorProxyFactory proxyFactory)
{
this.ActorTypeInfo = actorTypeInfo;
this.Id = id;
this.LoggerFactory = loggerFactory;
this.ProxyFactory = proxyFactory;
this.DaprInteractor = daprInteractor;
this.StateProvider = new DaprStateProvider(this.DaprInteractor, jsonSerializerOptions);
}
/// <summary>
@ -64,7 +60,7 @@ namespace Dapr.Actors.Runtime
/// </summary>
public IActorProxyFactory ProxyFactory { get; }
internal DaprStateProvider StateProvider { get; }
internal DaprStateProvider StateProvider { get; set; }
internal IDaprInteractor DaprInteractor { get; set; }
}

View File

@ -263,7 +263,11 @@ namespace Dapr.Actors.Runtime
private async Task<ActorActivatorState> CreateActorAsync(ActorId actorId)
{
this.logger.LogDebug("Creating Actor of type {ActorType} with ActorId {ActorId}", this.ActorTypeInfo.ImplementationType, actorId);
var host = new ActorHost(this.ActorTypeInfo, actorId, this.jsonSerializerOptions, this.loggerFactory, this.proxyFactory, this.daprInteractor);
var host = new ActorHost(this.ActorTypeInfo, actorId, this.jsonSerializerOptions, this.loggerFactory, this.proxyFactory)
{
DaprInteractor = this.daprInteractor,
StateProvider = new DaprStateProvider(this.daprInteractor, this.jsonSerializerOptions),
};
var state = await this.activator.CreateAsync(host);
this.logger.LogDebug("Finished creating Actor of type {ActorType} with ActorId {ActorId}", this.ActorTypeInfo.ImplementationType, actorId);
return state;

View File

@ -27,6 +27,8 @@ namespace Dapr.Actors.Runtime
public async Task AddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
if (!(await this.TryAddStateAsync(stateName, value, cancellationToken)))
{
throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, SR.ActorStateAlreadyExists, stateName));
@ -37,6 +39,8 @@ namespace Dapr.Actors.Runtime
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));
EnsureStateProviderInitialized();
if (this.stateChangeTracker.ContainsKey(stateName))
{
var stateMetadata = this.stateChangeTracker[stateName];
@ -62,6 +66,8 @@ namespace Dapr.Actors.Runtime
public async Task<T> GetStateAsync<T>(string stateName, CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
var condRes = await this.TryGetStateAsync<T>(stateName, cancellationToken);
if (condRes.HasValue)
@ -75,6 +81,9 @@ namespace Dapr.Actors.Runtime
public async Task<ConditionalValue<T>> TryGetStateAsync<T>(string stateName, CancellationToken cancellationToken)
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));
EnsureStateProviderInitialized();
if (this.stateChangeTracker.ContainsKey(stateName))
{
var stateMetadata = this.stateChangeTracker[stateName];
@ -101,6 +110,8 @@ namespace Dapr.Actors.Runtime
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));
EnsureStateProviderInitialized();
if (this.stateChangeTracker.ContainsKey(stateName))
{
var stateMetadata = this.stateChangeTracker[stateName];
@ -124,6 +135,8 @@ namespace Dapr.Actors.Runtime
public async Task RemoveStateAsync(string stateName, CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
if (!(await this.TryRemoveStateAsync(stateName, cancellationToken)))
{
throw new KeyNotFoundException(string.Format(CultureInfo.CurrentCulture, SR.ErrorNamedActorStateNotFound, stateName));
@ -134,6 +147,8 @@ namespace Dapr.Actors.Runtime
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));
EnsureStateProviderInitialized();
if (this.stateChangeTracker.ContainsKey(stateName))
{
var stateMetadata = this.stateChangeTracker[stateName];
@ -164,6 +179,8 @@ namespace Dapr.Actors.Runtime
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));
EnsureStateProviderInitialized();
if (this.stateChangeTracker.ContainsKey(stateName))
{
var stateMetadata = this.stateChangeTracker[stateName];
@ -182,6 +199,8 @@ namespace Dapr.Actors.Runtime
public async Task<T> GetOrAddStateAsync<T>(string stateName, T value, CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
var condRes = await this.TryGetStateAsync<T>(stateName, cancellationToken);
if (condRes.HasValue)
@ -203,6 +222,8 @@ namespace Dapr.Actors.Runtime
{
ArgumentVerifier.ThrowIfNull(stateName, nameof(stateName));
EnsureStateProviderInitialized();
if (this.stateChangeTracker.ContainsKey(stateName))
{
var stateMetadata = this.stateChangeTracker[stateName];
@ -240,12 +261,16 @@ namespace Dapr.Actors.Runtime
public Task ClearCacheAsync(CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
this.stateChangeTracker.Clear();
return Task.CompletedTask;
}
public async Task SaveStateAsync(CancellationToken cancellationToken = default)
{
EnsureStateProviderInitialized();
if (this.stateChangeTracker.Count > 0)
{
var stateChangeList = new List<ActorStateChange>();
@ -296,9 +321,20 @@ namespace Dapr.Actors.Runtime
private Task<ConditionalValue<T>> TryGetStateFromStateProviderAsync<T>(string stateName, CancellationToken cancellationToken)
{
EnsureStateProviderInitialized();
return this.actor.Host.StateProvider.TryLoadStateAsync<T>(this.actorTypeName, this.actor.Id.ToString(), stateName, cancellationToken);
}
private void EnsureStateProviderInitialized()
{
if (this.actor.Host.StateProvider == null)
{
throw new InvalidOperationException(
"The actor was initialized without a state provider, and so cannot interact with state. " +
"If this is inside a unit test, replace Actor.StateProvider with a mock.");
}
}
private sealed class StateMetadata
{
private StateMetadata(object value, Type type, StateChangeKind changeKind)

View File

@ -129,6 +129,7 @@ namespace Dapr.Client
if (content != null)
{
envelope.Data = content;
envelope.DataContentType = "application/json";
}
if (metadata != null)

View File

@ -33,6 +33,9 @@ service Dapr {
// Deletes the state for a specific key.
rpc DeleteState(DeleteStateRequest) returns (google.protobuf.Empty) {}
// Deletes a bulk of state items for a list of keys
rpc DeleteBulkState(DeleteBulkStateRequest) returns (google.protobuf.Empty) {}
// Executes transactions for a specified store
rpc ExecuteStateTransaction(ExecuteStateTransactionRequest) returns (google.protobuf.Empty) {}
@ -174,6 +177,15 @@ message DeleteStateRequest {
map<string,string> metadata = 5;
}
// DeleteBulkStateRequest is the message to delete a list of key-value states from specific state store.
message DeleteBulkStateRequest {
// The name of state store.
string store_name = 1;
// The array of the state key values.
repeated common.v1.StateItem states = 2;
}
// SaveStateRequest is the message to save multiple states into state store.
message SaveStateRequest {
// The name of state store.

View File

@ -32,7 +32,7 @@ namespace Dapr.Actors.Runtime
{
var activator = CreateActivator(typeof(TestActor));
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state = await activator.CreateAsync(host);
var actor = Assert.IsType<TestActor>(state.Actor);
@ -45,11 +45,11 @@ namespace Dapr.Actors.Runtime
{
var activator = CreateActivator(typeof(TestActor));
var host1 = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host1 = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state1 = await activator.CreateAsync(host1);
var actor1 = Assert.IsType<TestActor>(state1.Actor);
var host2 = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host2 = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state2 = await activator.CreateAsync(host2);
var actor2 = Assert.IsType<TestActor>(state2.Actor);
@ -62,7 +62,7 @@ namespace Dapr.Actors.Runtime
{
var activator = CreateActivator(typeof(TestActor));
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state = await activator.CreateAsync(host);
var actor = Assert.IsType<TestActor>(state.Actor);
@ -78,7 +78,7 @@ namespace Dapr.Actors.Runtime
{
var activator = CreateActivator(typeof(DisposableActor));
var host = new ActorHost(ActorTypeInformation.Get(typeof(DisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(DisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state = await activator.CreateAsync(host);
var actor = Assert.IsType<DisposableActor>(state.Actor);
@ -92,7 +92,7 @@ namespace Dapr.Actors.Runtime
{
var activator = CreateActivator(typeof(AsyncDisposableActor));
var host = new ActorHost(ActorTypeInformation.Get(typeof(AsyncDisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(AsyncDisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state = await activator.CreateAsync(host);
var actor = Assert.IsType<AsyncDisposableActor>(state.Actor);

View File

@ -16,7 +16,7 @@ namespace Dapr.Actors.Test
{
public class ApiTokenTests
{
[Fact]
[Fact(Skip = "Failing due to #573")]
public void CreateProxyWithRemoting_WithApiToken()
{
var actorId = new ActorId("abc");
@ -34,7 +34,7 @@ namespace Dapr.Actors.Test
headerValues.Should().Contain("test_token");
}
[Fact]
[Fact(Skip = "Failing due to #573")]
public void CreateProxyWithRemoting_WithNoApiToken()
{
var actorId = new ActorId("abc");
@ -48,7 +48,7 @@ namespace Dapr.Actors.Test
action.Should().Throw<InvalidOperationException>();
}
[Fact]
[Fact(Skip = "Failing due to #573")]
public void CreateProxyWithNoRemoting_WithApiToken()
{
var actorId = new ActorId("abc");
@ -66,7 +66,7 @@ namespace Dapr.Actors.Test
headerValues.Should().Contain("test_token");
}
[Fact]
[Fact(Skip = "Failing due to #573")]
public void CreateProxyWithNoRemoting_WithNoApiToken()
{
var actorId = new ActorId("abc");

View File

@ -61,7 +61,7 @@ namespace Dapr.Actors.Test
}
}
[Fact]
[Fact(Skip = "Failing due to #573")]
public void GetState_ValidateRequest()
{
var handler = new TestHttpClientHandler();

View File

@ -20,7 +20,7 @@ namespace Dapr.Actors.Test.Runtime
{
var actorType = typeof(TestActor);
var actorTypeInformation = ActorTypeInformation.Get(actorType);
var host = new ActorHost(actorTypeInformation, ActorId.CreateRandom(), JsonSerializerDefaults.Web, new LoggerFactory(), ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(actorTypeInformation, ActorId.CreateRandom(), JsonSerializerDefaults.Web, new LoggerFactory(), ActorProxy.DefaultProxyFactory);
var actor = new TestActor(host);
var activator = Mock.Of<ActorActivator>();

View File

@ -116,7 +116,7 @@ namespace Dapr.Actors.Test.Runtime
{
var actorTypeInformation = ActorTypeInformation.Get(typeof(TestActor));
var loggerFactory = new LoggerFactory();
var host = new ActorHost(actorTypeInformation, ActorId.CreateRandom(), JsonSerializerDefaults.Web, loggerFactory, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(actorTypeInformation, ActorId.CreateRandom(), JsonSerializerDefaults.Web, loggerFactory, ActorProxy.DefaultProxyFactory);
var testActor = new TestActor(host, actorStateManager);
return testActor;
}

View File

@ -18,7 +18,7 @@ namespace Dapr.Actors.Runtime
{
var activator = new DefaultActorActivator();
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var state = await activator.CreateAsync(host);
Assert.IsType<TestActor>(state.Actor);
}
@ -28,7 +28,7 @@ namespace Dapr.Actors.Runtime
{
var activator = new DefaultActorActivator();
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(TestActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var actor = new TestActor(host);
var state = new ActorActivatorState(actor);
@ -40,7 +40,7 @@ namespace Dapr.Actors.Runtime
{
var activator = new DefaultActorActivator();
var host = new ActorHost(ActorTypeInformation.Get(typeof(DisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(DisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var actor = new DisposableActor(host);
var state = new ActorActivatorState(actor);
@ -54,7 +54,7 @@ namespace Dapr.Actors.Runtime
{
var activator = new DefaultActorActivator();
var host = new ActorHost(ActorTypeInformation.Get(typeof(AsyncDisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory, new DaprHttpInteractor());
var host = new ActorHost(ActorTypeInformation.Get(typeof(AsyncDisposableActor)), ActorId.CreateRandom(), JsonSerializerDefaults.Web, NullLoggerFactory.Instance, ActorProxy.DefaultProxyFactory);
var actor = new AsyncDisposableActor(host);
var state = new ActorActivatorState(actor);

View File

@ -36,6 +36,7 @@ namespace Dapr.Client.Test
var request = await GrpcUtils.GetRequestFromRequestMessageAsync<PublishEventRequest>(entry.Request);
var jsonFromRequest = request.Data.ToStringUtf8();
request.DataContentType.Should().Be("application/json");
request.PubsubName.Should().Be(TestPubsubName);
request.Topic.Should().Be("test");
jsonFromRequest.Should().Be(JsonSerializer.Serialize(publishData, daprClient.JsonSerializerOptions));
@ -63,6 +64,7 @@ namespace Dapr.Client.Test
var request = await GrpcUtils.GetRequestFromRequestMessageAsync<PublishEventRequest>(entry.Request);
var jsonFromRequest = request.Data.ToStringUtf8();
request.DataContentType.Should().Be("application/json");
request.PubsubName.Should().Be(TestPubsubName);
request.Topic.Should().Be("test");
jsonFromRequest.Should().Be(JsonSerializer.Serialize(publishData, daprClient.JsonSerializerOptions));