From ece9fbe0d4378523a3490a61e1cb4a591672d327 Mon Sep 17 00:00:00 2001 From: Shivam Kumar Date: Fri, 9 Jun 2023 21:50:51 +0530 Subject: [PATCH] adding get actor reminder API (#1103) * get actor reminder API Signed-off-by: Shivam Kumar * handling serialization better Signed-off-by: Shivam Kumar --------- Signed-off-by: Shivam Kumar --- examples/Actor/ActorClient/Program.cs | 13 +++++ examples/Actor/DemoActor/DemoActor.cs | 5 ++ examples/Actor/IDemoActor/IDemoActor.cs | 8 +++ src/Dapr.Actors/DaprHttpInteractor.cs | 17 ++++++ src/Dapr.Actors/IDaprInteractor.cs | 10 ++++ src/Dapr.Actors/Runtime/Actor.cs | 12 +++++ src/Dapr.Actors/Runtime/ActorTestOptions.cs | 5 ++ src/Dapr.Actors/Runtime/ActorTimerManager.cs | 7 +++ .../Runtime/DefaultActorTimerManager.cs | 30 +++++++++++ src/Dapr.Actors/Runtime/ReminderInfo.cs | 7 ++- test/Dapr.Actors.Test/ActorUnitTestTests.cs | 13 +++++ test/Dapr.Actors.Test/TestDaprInteractor.cs | 14 +++++ .../Reminders/IReminderActor.cs | 2 + .../Dapr.E2E.Test.App/Actors/ReminderActor.cs | 6 +++ .../Actors/E2ETests.ReminderTests.cs | 53 +++++++++++++++++++ 15 files changed, 200 insertions(+), 2 deletions(-) diff --git a/examples/Actor/ActorClient/Program.cs b/examples/Actor/ActorClient/Program.cs index 103aed6b..aeee2838 100644 --- a/examples/Actor/ActorClient/Program.cs +++ b/examples/Actor/ActorClient/Program.cs @@ -96,6 +96,10 @@ namespace ActorClient receivedData = await proxy.GetData(); Console.WriteLine($"Received data is {receivedData}."); + Console.WriteLine("Getting details of the registered reminder"); + var reminder = await proxy.GetReminder(); + Console.WriteLine($"Received reminder is {reminder}."); + Console.WriteLine("Deregistering timer. Timers would any way stop if the actor is deactivated as part of Dapr garbage collection."); await proxy.UnregisterTimer(); Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted."); @@ -105,14 +109,23 @@ namespace ActorClient await proxy.RegisterReminderWithRepetitions(3); Console.WriteLine("Waiting so the reminder can be triggered"); await Task.Delay(5000); + Console.WriteLine("Getting details of the registered reminder"); + reminder = await proxy.GetReminder(); + Console.WriteLine($"Received reminder is {reminder}."); Console.WriteLine("Registering reminder with ttl and repetitions, i.e. reminder stops when either condition is met - The reminder will repeat 2 times."); await proxy.RegisterReminderWithTtlAndRepetitions(TimeSpan.FromSeconds(5), 2); + Console.WriteLine("Getting details of the registered reminder"); + reminder = await proxy.GetReminder(); + Console.WriteLine($"Received reminder is {reminder}."); Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted."); await proxy.UnregisterReminder(); Console.WriteLine("Registering reminder and Timer with TTL - The reminder will self delete after 10 seconds."); await proxy.RegisterReminderWithTtl(TimeSpan.FromSeconds(10)); await proxy.RegisterTimerWithTtl(TimeSpan.FromSeconds(10)); + Console.WriteLine("Getting details of the registered reminder"); + reminder = await proxy.GetReminder(); + Console.WriteLine($"Received reminder is {reminder}."); // Track the reminder. var timer = new Timer(async state => Console.WriteLine($"Received data: {await proxy.GetData()}"), null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5)); diff --git a/examples/Actor/DemoActor/DemoActor.cs b/examples/Actor/DemoActor/DemoActor.cs index 057b7df6..0ab633fc 100644 --- a/examples/Actor/DemoActor/DemoActor.cs +++ b/examples/Actor/DemoActor/DemoActor.cs @@ -85,6 +85,11 @@ namespace DaprDemoActor await this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1), repetitions, ttl); } + public async Task GetReminder() + { + return await this.GetReminderAsync("TestReminder"); + } + public Task UnregisterReminder() { return this.UnregisterReminderAsync("TestReminder"); diff --git a/examples/Actor/IDemoActor/IDemoActor.cs b/examples/Actor/IDemoActor/IDemoActor.cs index 3220dfdb..adec6df6 100644 --- a/examples/Actor/IDemoActor/IDemoActor.cs +++ b/examples/Actor/IDemoActor/IDemoActor.cs @@ -16,6 +16,7 @@ namespace IDemoActorInterface using System; using System.Threading.Tasks; using Dapr.Actors; + using Dapr.Actors.Runtime; /// /// Interface for Actor method. @@ -94,6 +95,13 @@ namespace IDemoActorInterface /// A task that represents the asynchronous save operation. Task RegisterReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions); + /// + /// Gets the registered reminder. + /// + /// The name of the reminder. + /// A task that returns the reminder after completion. + Task GetReminder(); + /// /// Unregisters the registered timer. /// diff --git a/src/Dapr.Actors/DaprHttpInteractor.cs b/src/Dapr.Actors/DaprHttpInteractor.cs index df5207f4..410925da 100644 --- a/src/Dapr.Actors/DaprHttpInteractor.cs +++ b/src/Dapr.Actors/DaprHttpInteractor.cs @@ -254,6 +254,23 @@ namespace Dapr.Actors return this.SendAsync(RequestFunc, relativeUrl, cancellationToken); } + public async Task GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default) + { + var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName); + + HttpRequestMessage RequestFunc() + { + var request = new HttpRequestMessage() + { + Method = HttpMethod.Get, + }; + return request; + } + + var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken); + return await response.Content.ReadAsStreamAsync(); + } + public Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default) { var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName); diff --git a/src/Dapr.Actors/IDaprInteractor.cs b/src/Dapr.Actors/IDaprInteractor.cs index 04eb66de..8f30aa18 100644 --- a/src/Dapr.Actors/IDaprInteractor.cs +++ b/src/Dapr.Actors/IDaprInteractor.cs @@ -74,6 +74,16 @@ namespace Dapr.Actors /// A representing the result of the asynchronous operation. Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default); + /// + /// Gets a reminder. + /// + /// Type of actor. + /// ActorId. + /// Name of reminder to unregister. + /// Cancels the operation. + /// A representing the result of the asynchronous operation. + Task GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default); + /// /// Unregisters a reminder. /// diff --git a/src/Dapr.Actors/Runtime/Actor.cs b/src/Dapr.Actors/Runtime/Actor.cs index 0f74513a..d7291cb5 100644 --- a/src/Dapr.Actors/Runtime/Actor.cs +++ b/src/Dapr.Actors/Runtime/Actor.cs @@ -360,6 +360,18 @@ namespace Dapr.Actors.Runtime return reminder; } + /// + /// Gets a reminder previously registered using . + /// + /// The name of the reminder to get. + /// + /// Returns a task that represents the asynchronous get operation. The result of the task contains the reminder if it exists, otherwise null. + /// + protected async Task GetReminderAsync(string reminderName) + { + return await this.Host.TimerManager.GetReminderAsync(new ActorReminderToken(this.actorTypeName, this.Id, reminderName)); + } + /// /// Unregisters a reminder previously registered using . /// diff --git a/src/Dapr.Actors/Runtime/ActorTestOptions.cs b/src/Dapr.Actors/Runtime/ActorTestOptions.cs index 1a1fa6f2..47e73ce3 100644 --- a/src/Dapr.Actors/Runtime/ActorTestOptions.cs +++ b/src/Dapr.Actors/Runtime/ActorTestOptions.cs @@ -91,6 +91,11 @@ namespace Dapr.Actors.Runtime throw new NotImplementedException(Message); } + public override Task GetReminderAsync(ActorReminderToken reminder) + { + throw new NotImplementedException(Message); + } + public override Task UnregisterReminderAsync(ActorReminderToken reminder) { throw new NotImplementedException(Message); diff --git a/src/Dapr.Actors/Runtime/ActorTimerManager.cs b/src/Dapr.Actors/Runtime/ActorTimerManager.cs index 1dc304fd..784cf418 100644 --- a/src/Dapr.Actors/Runtime/ActorTimerManager.cs +++ b/src/Dapr.Actors/Runtime/ActorTimerManager.cs @@ -27,6 +27,13 @@ namespace Dapr.Actors.Runtime /// A task which will complete when the operation completes. public abstract Task RegisterReminderAsync(ActorReminder reminder); + /// + /// Gets a reminder previously registered using + /// + /// The to unregister. + /// A task which will complete when the operation completes. + public abstract Task GetReminderAsync(ActorReminderToken reminder); + /// /// Unregisters the provided reminder with the runtime. /// diff --git a/src/Dapr.Actors/Runtime/DefaultActorTimerManager.cs b/src/Dapr.Actors/Runtime/DefaultActorTimerManager.cs index d3378c96..b42b432a 100644 --- a/src/Dapr.Actors/Runtime/DefaultActorTimerManager.cs +++ b/src/Dapr.Actors/Runtime/DefaultActorTimerManager.cs @@ -14,6 +14,8 @@ using System; using System.Text.Json; using System.Threading.Tasks; +using System.IO; +using System.Text; namespace Dapr.Actors.Runtime { @@ -37,6 +39,18 @@ namespace Dapr.Actors.Runtime await this.interactor.RegisterReminderAsync(reminder.ActorType, reminder.ActorId.ToString(), reminder.Name, serialized); } + public override async Task GetReminderAsync(ActorReminderToken token) + { + if (token == null) + { + throw new ArgumentNullException(nameof(token)); + } + + var responseStream = await this.interactor.GetReminderAsync(token.ActorType, token.ActorId.ToString(), token.Name); + var reminder = await DeserializeReminderAsync(responseStream, token); + return reminder; + } + public override async Task UnregisterReminderAsync(ActorReminderToken reminder) { if (reminder == null) @@ -77,5 +91,21 @@ namespace Dapr.Actors.Runtime reminder.Ttl); return await info.SerializeAsync(); } + + private async ValueTask DeserializeReminderAsync(Stream stream, ActorReminderToken token) + { + if (stream == null) + { + throw new ArgumentNullException(nameof(stream)); + } + var info = await ReminderInfo.DeserializeAsync(stream); + if(info == null) + { + return null; + } + var reminder = new ActorReminder(token.ActorType, token.ActorId, token.Name, info.Data, info.DueTime, + info.Period); + return reminder; + } } } diff --git a/src/Dapr.Actors/Runtime/ReminderInfo.cs b/src/Dapr.Actors/Runtime/ReminderInfo.cs index 84e56bbc..447cf607 100644 --- a/src/Dapr.Actors/Runtime/ReminderInfo.cs +++ b/src/Dapr.Actors/Runtime/ReminderInfo.cs @@ -20,7 +20,7 @@ namespace Dapr.Actors.Runtime using System.Threading.Tasks; // represents the wire format used by Dapr to store reminder info with the runtime - internal struct ReminderInfo + internal class ReminderInfo { public ReminderInfo( byte[] data, @@ -49,13 +49,16 @@ namespace Dapr.Actors.Runtime internal static async Task DeserializeAsync(Stream stream) { var json = await JsonSerializer.DeserializeAsync(stream); + if(json.ValueKind == JsonValueKind.Null) + { + return null; + } var dueTime = default(TimeSpan); var period = default(TimeSpan); var data = default(byte[]); int? repetition = null; TimeSpan? ttl = null; - if (json.TryGetProperty("dueTime", out var dueTimeProperty)) { var dueTimeString = dueTimeProperty.GetString(); diff --git a/test/Dapr.Actors.Test/ActorUnitTestTests.cs b/test/Dapr.Actors.Test/ActorUnitTestTests.cs index 318ede9f..baa52c56 100644 --- a/test/Dapr.Actors.Test/ActorUnitTestTests.cs +++ b/test/Dapr.Actors.Test/ActorUnitTestTests.cs @@ -74,6 +74,7 @@ namespace Dapr.Actors public async Task CanTestStartingAndStoppinReminder() { var reminders = new List(); + IActorReminder getReminder = null; var timerManager = new Mock(MockBehavior.Strict); timerManager @@ -84,6 +85,9 @@ namespace Dapr.Actors .Setup(tm => tm.UnregisterReminderAsync(It.IsAny())) .Callback(reminder => reminders.RemoveAll(t => t.Name == reminder.Name)) .Returns(Task.CompletedTask); + timerManager + .Setup(tm => tm.GetReminderAsync(It.IsAny())) + .Returns(() => Task.FromResult(getReminder)); var host = ActorHost.CreateForTest(new ActorTestOptions(){ TimerManager = timerManager.Object, }); var actor = new CoolTestActor(host); @@ -109,6 +113,10 @@ namespace Dapr.Actors await actor.ReceiveReminderAsync(reminder.Name, reminder.State, reminder.DueTime, reminder.Period); } + getReminder = reminder; + var reminderFromGet = await actor.GetReminderAsync(); + Assert.Equal(reminder, reminderFromGet); + // Stop the reminder await actor.StopReminderAsync(); Assert.Empty(reminders); @@ -148,6 +156,11 @@ namespace Dapr.Actors await this.RegisterReminderAsync("record", bytes, dueTime: TimeSpan.Zero, period: TimeSpan.FromSeconds(5)); } + public async Task GetReminderAsync() + { + return await this.GetReminderAsync("record"); + } + public async Task StopReminderAsync() { await this.UnregisterReminderAsync("record"); diff --git a/test/Dapr.Actors.Test/TestDaprInteractor.cs b/test/Dapr.Actors.Test/TestDaprInteractor.cs index 1b382208..92cfa709 100644 --- a/test/Dapr.Actors.Test/TestDaprInteractor.cs +++ b/test/Dapr.Actors.Test/TestDaprInteractor.cs @@ -99,6 +99,20 @@ namespace Dapr.Actors throw new System.NotImplementedException(); } + /// + /// Gets a reminder. + /// + /// Type of actor. + /// ActorId. + /// Name of reminder to unregister. + /// Cancels the operation. + /// A representing the result of the asynchronous operation. + public Task GetReminderAsync(string actorType, string actorId, string reminderName, + CancellationToken cancellationToken = default) + { + throw new System.NotImplementedException(); + } + /// /// Unregisters a reminder. /// diff --git a/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs b/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs index 33a5e0d0..0bf57f64 100644 --- a/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs +++ b/test/Dapr.E2E.Test.Actors/Reminders/IReminderActor.cs @@ -28,5 +28,7 @@ namespace Dapr.E2E.Test.Actors.Reminders Task StartReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions); Task GetState(); + + Task GetReminder(); } } diff --git a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs index f9b9d757..b08e483c 100644 --- a/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs +++ b/test/Dapr.E2E.Test.App/Actors/ReminderActor.cs @@ -44,6 +44,12 @@ namespace Dapr.E2E.Test.Actors.Reminders await this.StateManager.SetStateAsync("reminder-state", new State(){ IsReminderRunning = true, }); } + public async Task GetReminder(){ + var reminder = await this.GetReminderAsync("test-reminder"); + var reminderString = JsonSerializer.Serialize(reminder, this.Host.JsonSerializerOptions); + return reminderString; + } + public async Task StartReminderWithTtl(TimeSpan ttl) { var options = new StartReminderOptions() diff --git a/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs b/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs index 50cd8721..ff39cce8 100644 --- a/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs +++ b/test/Dapr.E2E.Test/Actors/E2ETests.ReminderTests.cs @@ -13,6 +13,7 @@ namespace Dapr.E2E.Test { using System; + using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Dapr.Actors; @@ -49,6 +50,58 @@ namespace Dapr.E2E.Test Assert.Equal(10, state.Count); } + [Fact] + public async Task ActorCanStartAndStopAndGetReminder() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + var proxy = this.ProxyFactory.CreateActorProxy(ActorId.CreateRandom(), "ReminderActor"); + + await WaitForActorRuntimeAsync(proxy, cts.Token); + + // Get reminder before starting it, should return null. + var reminder = await proxy.GetReminder(); + Assert.Equal("null", reminder); + + // Start reminder, to count up to 10 + await proxy.StartReminder(new StartReminderOptions(){ Total = 10, }); + + State state = new State(); + var countGetReminder = 0; + while (true) + { + cts.Token.ThrowIfCancellationRequested(); + + reminder = await proxy.GetReminder(); + Assert.NotNull(reminder); + + // If reminder is null then it means the reminder has been stopped. + if (reminder != "null") + { + countGetReminder++; + var reminderJson = JsonSerializer.Deserialize(reminder); + var name = reminderJson.GetProperty("name").ToString(); + var period = reminderJson.GetProperty("period").ToString(); + var dueTime = reminderJson.GetProperty("dueTime").ToString(); + + Assert.Equal("test-reminder", name); + Assert.Equal(TimeSpan.FromMilliseconds(50).ToString(), period); + Assert.Equal(TimeSpan.Zero.ToString(), dueTime); + } + + state = await proxy.GetState(); + this.Output.WriteLine($"Got Count: {state.Count} IsReminderRunning: {state.IsReminderRunning}"); + if (!state.IsReminderRunning) + { + break; + } + } + + // Should count up to exactly 10 + Assert.Equal(10, state.Count); + // Should be able to Get Reminder at least once. + Assert.True(countGetReminder > 0); + } + [Fact] public async Task ActorCanStartReminderWithRepetitions() {