adding get actor reminder API (#1103)

* get actor reminder API

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

* handling serialization better

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>

---------

Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
This commit is contained in:
Shivam Kumar 2023-06-09 21:50:51 +05:30
parent edb09a08b7
commit ece9fbe0d4
15 changed files with 200 additions and 2 deletions

View File

@ -96,6 +96,10 @@ namespace ActorClient
receivedData = await proxy.GetData();
Console.WriteLine($"Received data is {receivedData}.");
Console.WriteLine("Getting details of the registered reminder");
var reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Deregistering timer. Timers would any way stop if the actor is deactivated as part of Dapr garbage collection.");
await proxy.UnregisterTimer();
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
@ -105,14 +109,23 @@ namespace ActorClient
await proxy.RegisterReminderWithRepetitions(3);
Console.WriteLine("Waiting so the reminder can be triggered");
await Task.Delay(5000);
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Registering reminder with ttl and repetitions, i.e. reminder stops when either condition is met - The reminder will repeat 2 times.");
await proxy.RegisterReminderWithTtlAndRepetitions(TimeSpan.FromSeconds(5), 2);
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
await proxy.UnregisterReminder();
Console.WriteLine("Registering reminder and Timer with TTL - The reminder will self delete after 10 seconds.");
await proxy.RegisterReminderWithTtl(TimeSpan.FromSeconds(10));
await proxy.RegisterTimerWithTtl(TimeSpan.FromSeconds(10));
Console.WriteLine("Getting details of the registered reminder");
reminder = await proxy.GetReminder();
Console.WriteLine($"Received reminder is {reminder}.");
// Track the reminder.
var timer = new Timer(async state => Console.WriteLine($"Received data: {await proxy.GetData()}"), null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));

View File

@ -85,6 +85,11 @@ namespace DaprDemoActor
await this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(1), repetitions, ttl);
}
public async Task<IActorReminder> GetReminder()
{
return await this.GetReminderAsync("TestReminder");
}
public Task UnregisterReminder()
{
return this.UnregisterReminderAsync("TestReminder");

View File

@ -16,6 +16,7 @@ namespace IDemoActorInterface
using System;
using System.Threading.Tasks;
using Dapr.Actors;
using Dapr.Actors.Runtime;
/// <summary>
/// Interface for Actor method.
@ -94,6 +95,13 @@ namespace IDemoActorInterface
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions);
/// <summary>
/// Gets the registered reminder.
/// </summary>
/// <param name="reminderName">The name of the reminder.</param>
/// <returns>A task that returns the reminder after completion.</returns>
Task<IActorReminder> GetReminder();
/// <summary>
/// Unregisters the registered timer.
/// </summary>

View File

@ -254,6 +254,23 @@ namespace Dapr.Actors
return this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
}
public async Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName);
HttpRequestMessage RequestFunc()
{
var request = new HttpRequestMessage()
{
Method = HttpMethod.Get,
};
return request;
}
var response = await this.SendAsync(RequestFunc, relativeUrl, cancellationToken);
return await response.Content.ReadAsStreamAsync();
}
public Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default)
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorReminderRelativeUrlFormat, actorType, actorId, reminderName);

View File

@ -74,6 +74,16 @@ namespace Dapr.Actors
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default);
/// <summary>
/// Gets a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="reminderName">Name of reminder to unregister.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default);
/// <summary>
/// Unregisters a reminder.
/// </summary>

View File

@ -360,6 +360,18 @@ namespace Dapr.Actors.Runtime
return reminder;
}
/// <summary>
/// Gets a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
/// <param name="reminderName">The name of the reminder to get.</param>
/// <returns>
/// Returns a task that represents the asynchronous get operation. The result of the task contains the reminder if it exists, otherwise null.
/// </returns>
protected async Task<IActorReminder> GetReminderAsync(string reminderName)
{
return await this.Host.TimerManager.GetReminderAsync(new ActorReminderToken(this.actorTypeName, this.Id, reminderName));
}
/// <summary>
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>

View File

@ -91,6 +91,11 @@ namespace Dapr.Actors.Runtime
throw new NotImplementedException(Message);
}
public override Task<IActorReminder> GetReminderAsync(ActorReminderToken reminder)
{
throw new NotImplementedException(Message);
}
public override Task UnregisterReminderAsync(ActorReminderToken reminder)
{
throw new NotImplementedException(Message);

View File

@ -27,6 +27,13 @@ namespace Dapr.Actors.Runtime
/// <returns>A task which will complete when the operation completes.</returns>
public abstract Task RegisterReminderAsync(ActorReminder reminder);
/// <summary>
/// Gets a reminder previously registered using
/// </summary>
/// <param name="reminder">The <see cref="ActorReminderToken" /> to unregister.</param>
/// <returns>A task which will complete when the operation completes.</returns>
public abstract Task<IActorReminder> GetReminderAsync(ActorReminderToken reminder);
/// <summary>
/// Unregisters the provided reminder with the runtime.
/// </summary>

View File

@ -14,6 +14,8 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using System.IO;
using System.Text;
namespace Dapr.Actors.Runtime
{
@ -37,6 +39,18 @@ namespace Dapr.Actors.Runtime
await this.interactor.RegisterReminderAsync(reminder.ActorType, reminder.ActorId.ToString(), reminder.Name, serialized);
}
public override async Task<IActorReminder> GetReminderAsync(ActorReminderToken token)
{
if (token == null)
{
throw new ArgumentNullException(nameof(token));
}
var responseStream = await this.interactor.GetReminderAsync(token.ActorType, token.ActorId.ToString(), token.Name);
var reminder = await DeserializeReminderAsync(responseStream, token);
return reminder;
}
public override async Task UnregisterReminderAsync(ActorReminderToken reminder)
{
if (reminder == null)
@ -77,5 +91,21 @@ namespace Dapr.Actors.Runtime
reminder.Ttl);
return await info.SerializeAsync();
}
private async ValueTask<ActorReminder> DeserializeReminderAsync(Stream stream, ActorReminderToken token)
{
if (stream == null)
{
throw new ArgumentNullException(nameof(stream));
}
var info = await ReminderInfo.DeserializeAsync(stream);
if(info == null)
{
return null;
}
var reminder = new ActorReminder(token.ActorType, token.ActorId, token.Name, info.Data, info.DueTime,
info.Period);
return reminder;
}
}
}

View File

@ -20,7 +20,7 @@ namespace Dapr.Actors.Runtime
using System.Threading.Tasks;
// represents the wire format used by Dapr to store reminder info with the runtime
internal struct ReminderInfo
internal class ReminderInfo
{
public ReminderInfo(
byte[] data,
@ -49,13 +49,16 @@ namespace Dapr.Actors.Runtime
internal static async Task<ReminderInfo> DeserializeAsync(Stream stream)
{
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
if(json.ValueKind == JsonValueKind.Null)
{
return null;
}
var dueTime = default(TimeSpan);
var period = default(TimeSpan);
var data = default(byte[]);
int? repetition = null;
TimeSpan? ttl = null;
if (json.TryGetProperty("dueTime", out var dueTimeProperty))
{
var dueTimeString = dueTimeProperty.GetString();

View File

@ -74,6 +74,7 @@ namespace Dapr.Actors
public async Task CanTestStartingAndStoppinReminder()
{
var reminders = new List<ActorReminder>();
IActorReminder getReminder = null;
var timerManager = new Mock<ActorTimerManager>(MockBehavior.Strict);
timerManager
@ -84,6 +85,9 @@ namespace Dapr.Actors
.Setup(tm => tm.UnregisterReminderAsync(It.IsAny<ActorReminderToken>()))
.Callback<ActorReminderToken>(reminder => reminders.RemoveAll(t => t.Name == reminder.Name))
.Returns(Task.CompletedTask);
timerManager
.Setup(tm => tm.GetReminderAsync(It.IsAny<ActorReminderToken>()))
.Returns(() => Task.FromResult(getReminder));
var host = ActorHost.CreateForTest<CoolTestActor>(new ActorTestOptions(){ TimerManager = timerManager.Object, });
var actor = new CoolTestActor(host);
@ -109,6 +113,10 @@ namespace Dapr.Actors
await actor.ReceiveReminderAsync(reminder.Name, reminder.State, reminder.DueTime, reminder.Period);
}
getReminder = reminder;
var reminderFromGet = await actor.GetReminderAsync();
Assert.Equal(reminder, reminderFromGet);
// Stop the reminder
await actor.StopReminderAsync();
Assert.Empty(reminders);
@ -148,6 +156,11 @@ namespace Dapr.Actors
await this.RegisterReminderAsync("record", bytes, dueTime: TimeSpan.Zero, period: TimeSpan.FromSeconds(5));
}
public async Task<IActorReminder> GetReminderAsync()
{
return await this.GetReminderAsync("record");
}
public async Task StopReminderAsync()
{
await this.UnregisterReminderAsync("record");

View File

@ -99,6 +99,20 @@ namespace Dapr.Actors
throw new System.NotImplementedException();
}
/// <summary>
/// Gets a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="reminderName">Name of reminder to unregister.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
public Task<Stream> GetReminderAsync(string actorType, string actorId, string reminderName,
CancellationToken cancellationToken = default)
{
throw new System.NotImplementedException();
}
/// <summary>
/// Unregisters a reminder.
/// </summary>

View File

@ -28,5 +28,7 @@ namespace Dapr.E2E.Test.Actors.Reminders
Task StartReminderWithTtlAndRepetitions(TimeSpan ttl, int repetitions);
Task<State> GetState();
Task<String> GetReminder();
}
}

View File

@ -44,6 +44,12 @@ namespace Dapr.E2E.Test.Actors.Reminders
await this.StateManager.SetStateAsync<State>("reminder-state", new State(){ IsReminderRunning = true, });
}
public async Task<String> GetReminder(){
var reminder = await this.GetReminderAsync("test-reminder");
var reminderString = JsonSerializer.Serialize(reminder, this.Host.JsonSerializerOptions);
return reminderString;
}
public async Task StartReminderWithTtl(TimeSpan ttl)
{
var options = new StartReminderOptions()

View File

@ -13,6 +13,7 @@
namespace Dapr.E2E.Test
{
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors;
@ -49,6 +50,58 @@ namespace Dapr.E2E.Test
Assert.Equal(10, state.Count);
}
[Fact]
public async Task ActorCanStartAndStopAndGetReminder()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var proxy = this.ProxyFactory.CreateActorProxy<IReminderActor>(ActorId.CreateRandom(), "ReminderActor");
await WaitForActorRuntimeAsync(proxy, cts.Token);
// Get reminder before starting it, should return null.
var reminder = await proxy.GetReminder();
Assert.Equal("null", reminder);
// Start reminder, to count up to 10
await proxy.StartReminder(new StartReminderOptions(){ Total = 10, });
State state = new State();
var countGetReminder = 0;
while (true)
{
cts.Token.ThrowIfCancellationRequested();
reminder = await proxy.GetReminder();
Assert.NotNull(reminder);
// If reminder is null then it means the reminder has been stopped.
if (reminder != "null")
{
countGetReminder++;
var reminderJson = JsonSerializer.Deserialize<JsonElement>(reminder);
var name = reminderJson.GetProperty("name").ToString();
var period = reminderJson.GetProperty("period").ToString();
var dueTime = reminderJson.GetProperty("dueTime").ToString();
Assert.Equal("test-reminder", name);
Assert.Equal(TimeSpan.FromMilliseconds(50).ToString(), period);
Assert.Equal(TimeSpan.Zero.ToString(), dueTime);
}
state = await proxy.GetState();
this.Output.WriteLine($"Got Count: {state.Count} IsReminderRunning: {state.IsReminderRunning}");
if (!state.IsReminderRunning)
{
break;
}
}
// Should count up to exactly 10
Assert.Equal(10, state.Count);
// Should be able to Get Reminder at least once.
Assert.True(countGetReminder > 0);
}
[Fact]
public async Task ActorCanStartReminderWithRepetitions()
{