Add Actor Reminder/Timer TTL support (#912)

Add Actor Reminder/Timer TTL support

This commit adds the TTL field to Actor reminders/timers. This allows
reminders and timers to expire after a given TimeSpan instead of
having to be manually deleted.

https://github.com/dapr/dotnet-sdk/issues/788

Signed-off-by: Hal Spang <halspang@microsoft.com>
This commit is contained in:
halspang 2022-08-03 11:56:53 -07:00 committed by GitHub
parent e58b1de56a
commit 8bc7e90194
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 577 additions and 37 deletions

View File

@ -14,6 +14,7 @@
namespace ActorClient
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors;
using Dapr.Actors.Client;
@ -100,6 +101,14 @@ namespace ActorClient
Console.WriteLine("Deregistering reminder. Reminders are durable and would not stop until an explicit deregistration or the actor is deleted.");
await proxy.UnregisterReminder();
Console.WriteLine("Registering reminder and Timer with TTL - The reminder will self delete after 10 seconds.");
await proxy.RegisterReminderWithTtl(TimeSpan.FromSeconds(10));
await proxy.RegisterTimerWithTtl(TimeSpan.FromSeconds(10));
// Track the reminder.
var timer = new Timer(async state => Console.WriteLine($"Received data: {await proxy.GetData()}"), null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromSeconds(21));
await timer.DisposeAsync();
Console.WriteLine("Creating a Bank Actor");
var bank = ActorProxy.Create<IBankActor>(ActorId.CreateRandom(), "DemoActor");

View File

@ -70,6 +70,11 @@ namespace DaprDemoActor
await this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5));
}
public async Task RegisterReminderWithTtl(TimeSpan ttl)
{
await this.RegisterReminderAsync("TestReminder", null, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), ttl);
}
public Task UnregisterReminder()
{
return this.UnregisterReminderAsync("TestReminder");
@ -102,6 +107,18 @@ namespace DaprDemoActor
return this.RegisterTimerAsync("TestTimer", nameof(this.TimerCallback), serializedTimerParams, TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));
}
public Task RegisterTimerWithTtl(TimeSpan ttl)
{
var timerParams = new TimerParams
{
IntParam = 100,
StringParam = "timer test",
};
var serializedTimerParams = JsonSerializer.SerializeToUtf8Bytes(timerParams);
return this.RegisterTimerAsync("TestTimer", nameof(this.TimerCallback), serializedTimerParams, TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), ttl);
}
public Task UnregisterTimer()
{
return this.UnregisterTimerAsync("TestTimer");

View File

@ -0,0 +1,10 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"
}

View File

@ -13,6 +13,7 @@
namespace IDemoActorInterface
{
using System;
using System.Threading.Tasks;
using Dapr.Actors;
@ -52,6 +53,13 @@ namespace IDemoActorInterface
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterReminder();
/// <summary>
/// Registers a reminder.
/// </summary>
/// <param name="ttl">TimeSpan that dictates when the reminder expires.</param>
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterReminderWithTtl(TimeSpan ttl);
/// <summary>
/// Unregisters the registered reminder.
/// </summary>
@ -64,6 +72,13 @@ namespace IDemoActorInterface
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterTimer();
/// <summary>
/// Registers a timer.
/// </summary>
/// <param name="ttl">Optional TimeSpan that dictates when the timer expires.</param>
/// <returns>A task that represents the asynchronous save operation.</returns>
Task RegisterTimerWithTtl(TimeSpan ttl);
/// <summary>
/// Unregisters the registered timer.
/// </summary>

View File

@ -215,13 +215,77 @@ namespace Dapr.Actors.Runtime
TimeSpan dueTime,
TimeSpan period)
{
var reminder = new ActorReminder(this.actorTypeName, this.Id, reminderName, state, dueTime, period);
return await RegisterReminderAsync(new ActorReminderOptions
{
ActorTypeName = this.actorTypeName,
Id = this.Id,
ReminderName = reminderName,
State = state,
DueTime = dueTime,
Period = period,
Ttl = null
});
}
/// <summary>
/// Registers a reminder with the actor.
/// </summary>
/// <param name="reminderName">The name of the reminder to register. The name must be unique per actor.</param>
/// <param name="state">User state passed to the reminder invocation.</param>
/// <param name="dueTime">The amount of time to delay before invoking the reminder for the first time. Specify negative one (-1) milliseconds to disable invocation. Specify zero (0) to invoke the reminder immediately after registration.
/// </param>
/// <param name="period">
/// The time interval between reminder invocations after the first invocation. Specify negative one (-1) milliseconds to disable periodic invocation.
/// </param>
/// <param name="ttl">The time interval after which the reminder will expire.</param>
/// <returns>
/// A task that represents the asynchronous registration operation. The result of the task provides information about the registered reminder and is used to unregister the reminder using UnregisterReminderAsync />.
/// </returns>
/// <remarks>
/// <para>
/// The class deriving from <see cref="Dapr.Actors.Runtime.Actor" /> must implement <see cref="Dapr.Actors.Runtime.IRemindable" /> to consume reminder invocations. Multiple reminders can be registered at any time, uniquely identified by <paramref name="reminderName" />. Existing reminders can also be updated by calling this method again. Reminder invocations are synchronized both with other reminders and other actor method callbacks.
/// </para>
/// </remarks>
protected async Task<IActorReminder> RegisterReminderAsync(
string reminderName,
byte[] state,
TimeSpan dueTime,
TimeSpan period,
TimeSpan ttl)
{
return await RegisterReminderAsync(new ActorReminderOptions
{
ActorTypeName = this.actorTypeName,
Id = this.Id,
ReminderName = reminderName,
State = state,
DueTime = dueTime,
Period = period,
Ttl = ttl
});
}
/// <summary>
/// Registers a reminder with the actor.
/// </summary>
/// <param name="options">A <see cref="ActorReminderOptions" /> containing the various settings for an <see cref="ActorReminder"/>.</param>
/// <returns>
/// A task that represents the asynchronous registration operation. The result of the task provides information about the registered reminder and is used to unregister the reminder using UnregisterReminderAsync />.
/// </returns>
/// <remarks>
/// <para>
/// The class deriving from <see cref="Dapr.Actors.Runtime.Actor" /> must implement <see cref="Dapr.Actors.Runtime.IRemindable" /> to consume reminder invocations. Multiple reminders can be registered at any time, uniquely identified by <paramref name="options.ReminderName" />. Existing reminders can also be updated by calling this method again. Reminder invocations are synchronized both with other reminders and other actor method callbacks.
/// </para>
/// </remarks>
internal async Task<IActorReminder> RegisterReminderAsync(ActorReminderOptions options)
{
var reminder = new ActorReminder(options);
await this.Host.TimerManager.RegisterReminderAsync(reminder);
return reminder;
}
/// <summary>
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync" />.
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
/// <param name="reminder">The actor reminder to unregister.</param>
/// <returns>
@ -234,7 +298,7 @@ namespace Dapr.Actors.Runtime
}
/// <summary>
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync" />.
/// Unregisters a reminder previously registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
/// <param name="reminderName">The actor reminder name to unregister.</param>
/// <returns>
@ -270,17 +334,72 @@ namespace Dapr.Actors.Runtime
byte[] callbackParams,
TimeSpan dueTime,
TimeSpan period)
{
return await RegisterTimerAsync(new ActorTimerOptions
{
ActorTypeName = this.actorTypeName,
Id = this.Id,
TimerName = timerName,
TimerCallback = callback,
Data = callbackParams,
DueTime = dueTime,
Period = period,
Ttl = null
});
}
/// <summary>
/// Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it.
/// </summary>
/// <param name="timerName">Timer Name. If a timer name is not provided, a timer is autogenerated.</param>
/// <param name="callback">
/// The name of the method to be called when the timer fires.
/// It has one parameter: the state object passed to RegisterTimer.
/// It returns a <see cref="System.Threading.Tasks.Task"/> representing the asynchronous operation.
/// </param>
/// <param name="callbackParams">An object containing information to be used by the callback method, or null.</param>
/// <param name="dueTime">The amount of time to delay before the async callback is first invoked.
/// Specify negative one (-1) milliseconds to prevent the timer from starting.
/// Specify zero (0) to start the timer immediately.
/// </param>
/// <param name="period">
/// The time interval between invocations of the async callback.
/// Specify negative one (-1) milliseconds to disable periodic signaling.</param>
/// <param name="ttl">The time interval after which a Timer will expire.</param>
/// <returns>Returns IActorTimer object.</returns>
public async Task<ActorTimer> RegisterTimerAsync(
string timerName,
string callback,
byte[] callbackParams,
TimeSpan dueTime,
TimeSpan period,
TimeSpan ttl)
{
return await RegisterTimerAsync(new ActorTimerOptions
{
ActorTypeName = this.actorTypeName,
Id = this.Id,
TimerName = timerName,
TimerCallback = callback,
Data = callbackParams,
DueTime = dueTime,
Period = period,
Ttl = ttl
});
}
internal async Task<ActorTimer> RegisterTimerAsync(ActorTimerOptions options)
{
// Validate that the timer callback specified meets all the required criteria for a valid callback method
this.ValidateTimerCallback(this.Host, callback);
this.ValidateTimerCallback(this.Host, options.TimerCallback);
// create a timer name to register with Dapr runtime.
if (string.IsNullOrEmpty(timerName))
if (string.IsNullOrEmpty(options.TimerName))
{
timerName = $"{this.Id}_Timer_{Guid.NewGuid()}";
options.TimerName = $"{this.Id}_Timer_{Guid.NewGuid()}";
}
var actorTimer = new ActorTimer(this.actorTypeName, this.Id, timerName, callback, callbackParams, dueTime, period);
var actorTimer = new ActorTimer(options);
await this.Host.TimerManager.RegisterTimerAsync(actorTimer);
return actorTimer;
}

View File

@ -41,29 +41,88 @@ namespace Dapr.Actors.Runtime
byte[] state,
TimeSpan dueTime,
TimeSpan period)
: base(actorType, actorId, name)
{
if (dueTime < TimeSpan.Zero)
: this(new ActorReminderOptions
{
throw new ArgumentOutOfRangeException(nameof(dueTime), string.Format(
ActorTypeName = actorType,
Id = actorId,
ReminderName = name,
State = state,
DueTime = dueTime,
Period = period,
Ttl = null
})
{
}
/// <summary>
/// Initializes a new instance of <see cref="ActorReminder" />.
/// </summary>
/// <param name="actorType">The actor type.</param>
/// <param name="actorId">The actor id.</param>
/// <param name="name">The reminder name.</param>
/// <param name="state">The state associated with the reminder.</param>
/// <param name="dueTime">The reminder due time.</param>
/// <param name="period">The reminder period.</param>
/// <param name="ttl">The reminder ttl.</param>
public ActorReminder(
string actorType,
ActorId actorId,
string name,
byte[] state,
TimeSpan dueTime,
TimeSpan period,
TimeSpan ttl)
: this(new ActorReminderOptions
{
ActorTypeName = actorType,
Id = actorId,
ReminderName = name,
State = state,
DueTime = dueTime,
Period = period,
Ttl = ttl
})
{
}
/// <summary>
/// Initializes a new instance of <see cref="ActorReminder" />.
/// </summary>
/// <param name="options">A <see cref="ActorReminderOptions" /> containing the various settings for an <see cref="ActorReminder"/>.</param>
internal ActorReminder(ActorReminderOptions options)
: base(options.ActorTypeName, options.Id, options.ReminderName)
{
if (options.DueTime < TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(options.DueTime), string.Format(
CultureInfo.CurrentCulture,
SR.TimerArgumentOutOfRange,
TimeSpan.Zero.TotalMilliseconds,
TimeSpan.MaxValue.TotalMilliseconds));
}
if (period < MiniumPeriod)
if (options.Period < MiniumPeriod)
{
throw new ArgumentOutOfRangeException(nameof(period), string.Format(
throw new ArgumentOutOfRangeException(nameof(options.Period), string.Format(
CultureInfo.CurrentCulture,
SR.TimerArgumentOutOfRange,
MiniumPeriod.TotalMilliseconds,
TimeSpan.MaxValue.TotalMilliseconds));
}
this.State = state;
this.DueTime = dueTime;
this.Period = period;
if (options.Ttl != null && (options.Ttl < options.DueTime || options.Ttl < TimeSpan.Zero))
{
throw new ArgumentOutOfRangeException(nameof(options.Ttl), string.Format(
CultureInfo.CurrentCulture,
SR.TimerArgumentOutOfRange,
options.DueTime,
TimeSpan.MaxValue.TotalMilliseconds));
}
this.State = options.State;
this.DueTime = options.DueTime;
this.Period = options.Period;
this.Ttl = options.Ttl;
}
/// <summary>
@ -80,5 +139,10 @@ namespace Dapr.Actors.Runtime
/// Gets the reminder period.
/// </summary>
public TimeSpan Period { get; }
/// <summary>
/// The optional <see cref="TimeSpan"/> that states when the reminder will expire.
/// </summary>
public TimeSpan? Ttl { get; }
}
}

View File

@ -0,0 +1,44 @@
using System;
namespace Dapr.Actors.Runtime
{
/// <summary>
/// A collection of options used to create an <see cref="ActorReminder"/>.
/// </summary>
internal class ActorReminderOptions
{
/// <summary>
/// The name of the type of the Actor that the reminder will fire for.
/// </summary>
public string ActorTypeName { get; set; }
/// <summary>
/// The <see cref="ActorId"/> that the reminder will fire for.
/// </summary>
public ActorId Id { get; set; }
/// <summary>
/// The name of the reminder.
/// </summary>
public string ReminderName { get; set; }
/// <summary>
/// State that is passed to the Actor when the reminder fires.
/// </summary>
public byte[] State { get; set; }
/// <summary>
/// <see cref="TimeSpan"/> that determines when the reminder will first fire.
/// </summary>
public TimeSpan DueTime { get; set; }
/// <summary>
/// <see cref="TimeSpan"/> that determines how much time there is between the reminder firing. Starts after the <see cref="DueTime"/>.
/// </summary>
public TimeSpan Period { get; set; }
/// <summary>
/// An optional <see cref="TimeSpan"/> that determines when the reminder will expire.
/// </summary>
public TimeSpan? Ttl { get; set; }
}
}

View File

@ -43,30 +43,92 @@ namespace Dapr.Actors.Runtime
byte[] data,
TimeSpan dueTime,
TimeSpan period)
: base(actorType, actorId, name)
{
if (dueTime < TimeSpan.Zero)
: this(new ActorTimerOptions
{
throw new ArgumentOutOfRangeException(nameof(dueTime), string.Format(
ActorTypeName = actorType,
Id = actorId,
TimerName = name,
TimerCallback = timerCallback,
Data = data,
DueTime = dueTime,
Period = period,
Ttl = null
})
{
}
/// <summary>
/// Initializes a new instance of <see cref="ActorTimer" />.
/// </summary>
/// <param name="actorType">The actor type.</param>
/// <param name="actorId">The actor id.</param>
/// <param name="name">The timer name.</param>
/// <param name="timerCallback">The name of the callback associated with the timer.</param>
/// <param name="data">The state associated with the timer.</param>
/// <param name="dueTime">The timer due time.</param>
/// <param name="period">The timer period.</param>
/// <param name="ttl">The timer ttl.</param>
public ActorTimer(
string actorType,
ActorId actorId,
string name,
string timerCallback,
byte[] data,
TimeSpan dueTime,
TimeSpan period,
TimeSpan ttl)
: this(new ActorTimerOptions
{
ActorTypeName = actorType,
Id = actorId,
TimerName = name,
TimerCallback = timerCallback,
Data = data,
DueTime = dueTime,
Period = period,
Ttl = ttl
})
{
}
/// <summary>
/// Initializes a new instance of <see cref="ActorTimer"/>.
/// </summary>
/// <param name="options">An <see cref="ActorTimerOptions"/> containing the various settings for an <see cref="ActorTimer"/>.</param>
internal ActorTimer(ActorTimerOptions options) : base(options.ActorTypeName, options.Id, options.TimerName)
{
if (options.DueTime < TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(nameof(options.DueTime), string.Format(
CultureInfo.CurrentCulture,
SR.TimerArgumentOutOfRange,
TimeSpan.Zero.TotalMilliseconds,
TimeSpan.MaxValue.TotalMilliseconds));
}
if (period < MiniumPeriod)
if (options.Period < MiniumPeriod)
{
throw new ArgumentOutOfRangeException(nameof(period), string.Format(
throw new ArgumentOutOfRangeException(nameof(options.Period), string.Format(
CultureInfo.CurrentCulture,
SR.TimerArgumentOutOfRange,
MiniumPeriod.TotalMilliseconds,
TimeSpan.MaxValue.TotalMilliseconds));
}
this.TimerCallback = timerCallback;
this.Data = data;
this.DueTime = dueTime;
this.Period = period;
if (options.Ttl != null && (options.Ttl < options.DueTime || options.Ttl < TimeSpan.Zero))
{
throw new ArgumentOutOfRangeException(nameof(options.Ttl), string.Format(
CultureInfo.CurrentCulture,
SR.TimerArgumentOutOfRange,
options.DueTime,
TimeSpan.MaxValue.TotalMilliseconds));
}
this.TimerCallback = options.TimerCallback;
this.Data = options.Data;
this.DueTime = options.DueTime;
this.Period = options.Period;
this.Ttl = options.Ttl;
}
/// <summary>
@ -108,5 +170,10 @@ namespace Dapr.Actors.Runtime
/// Gets the time interval at which the timer is invoked periodically.
/// </summary>
public TimeSpan Period { get; }
/// <summary>
/// The optional <see cref="TimeSpan"/> that states when the reminder will expire.
/// </summary>
public TimeSpan? Ttl { get; }
}
}

View File

@ -0,0 +1,49 @@
using System;
namespace Dapr.Actors.Runtime
{
/// <summary>
/// Collection of all the options used for creating a <see cref="ActorTimer"/>.
/// </summary>
internal struct ActorTimerOptions
{
/// <summary>
/// The name of the type of the Actor that the timer will fire for.
/// </summary>
public string ActorTypeName { get; set; }
/// <summary>
/// The <see cref="ActorId"/> that the timer will fire for.
/// </summary>
public ActorId Id { get; set; }
/// <summary>
/// The name of the timer.
/// </summary>
public string TimerName { get; set; }
/// <summary>
/// The name of the callback for the timer.
/// </summary>
public string TimerCallback { get; set; }
/// <summary>
/// State that is passed to the Actor when the timer fires.
/// </summary>
public byte[] Data { get; set; }
/// <summary>
/// <see cref="TimeSpan"/> that determines when the timer will first fire.
/// </summary>
public TimeSpan DueTime { get; set; }
/// <summary>
/// <see cref="TimeSpan"/> that determines how much time there is between the timer firing. Starts after the <see cref="DueTime"/>.
/// </summary>
public TimeSpan Period { get; set; }
/// <summary>
/// An optional <see cref="TimeSpan"/> that determines when the timer will expire.
/// </summary>
public TimeSpan? Ttl { get; set; }
}
}

View File

@ -55,7 +55,7 @@ namespace Dapr.Actors.Runtime
}
#pragma warning disable 0618
var timerInfo = new TimerInfo(timer.TimerCallback, timer.Data, timer.DueTime, timer.Period);
var timerInfo = new TimerInfo(timer.TimerCallback, timer.Data, timer.DueTime, timer.Period, timer.Ttl);
#pragma warning restore 0618
var data = JsonSerializer.Serialize(timerInfo);
await this.interactor.RegisterTimerAsync(timer.ActorType, timer.ActorId.ToString(), timer.Name, data);
@ -73,7 +73,7 @@ namespace Dapr.Actors.Runtime
private async ValueTask<string> SerializeReminderAsync(ActorReminder reminder)
{
var info = new ReminderInfo(reminder.State, reminder.DueTime, reminder.Period);
var info = new ReminderInfo(reminder.State, reminder.DueTime, reminder.Period, reminder.Ttl);
return await info.SerializeAsync();
}
}

View File

@ -16,7 +16,7 @@ namespace Dapr.Actors.Runtime
using System;
/// <summary>
/// Represents a reminder registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync" />.
/// Represents a reminder registered using <see cref="Dapr.Actors.Runtime.Actor.RegisterReminderAsync(ActorReminderOptions)" />.
/// </summary>
public interface IActorReminder
{

View File

@ -25,11 +25,13 @@ namespace Dapr.Actors.Runtime
public ReminderInfo(
byte[] data,
TimeSpan dueTime,
TimeSpan period)
TimeSpan period,
TimeSpan? ttl = null)
{
this.Data = data;
this.DueTime = dueTime;
this.Period = period;
this.Ttl = ttl;
}
public TimeSpan DueTime { get; private set; }
@ -38,6 +40,8 @@ namespace Dapr.Actors.Runtime
public byte[] Data { get; private set; }
public TimeSpan? Ttl { get; private set; }
internal static async Task<ReminderInfo> DeserializeAsync(Stream stream)
{
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
@ -45,6 +49,7 @@ namespace Dapr.Actors.Runtime
var dueTime = default(TimeSpan);
var period = default(TimeSpan);
var data = default(byte[]);
TimeSpan? ttl = null;
if (json.TryGetProperty("dueTime", out var dueTimeProperty))
{
@ -63,7 +68,13 @@ namespace Dapr.Actors.Runtime
data = dataProperty.GetBytesFromBase64();
}
return new ReminderInfo(data, dueTime, period);
if (json.TryGetProperty("ttl", out var ttlProperty))
{
var ttlString = ttlProperty.GetString();
ttl = ConverterUtils.ConvertTimeSpanFromDaprFormat(ttlString);
}
return new ReminderInfo(data, dueTime, period, ttl);
}
internal async ValueTask<string> SerializeAsync()
@ -75,6 +86,12 @@ namespace Dapr.Actors.Runtime
writer.WriteString("dueTime", ConverterUtils.ConvertTimeSpanValueInDaprFormat(this.DueTime));
writer.WriteString("period", ConverterUtils.ConvertTimeSpanValueInDaprFormat(this.Period));
writer.WriteBase64String("data", this.Data);
if (Ttl != null)
{
writer.WriteString("ttl", ConverterUtils.ConvertTimeSpanValueInDaprFormat(Ttl));
}
writer.WriteEndObject();
await writer.FlushAsync();
return Encoding.UTF8.GetString(stream.ToArray());

View File

@ -34,7 +34,8 @@ namespace Dapr.Actors.Runtime
string callback,
byte[] state,
TimeSpan dueTime,
TimeSpan period)
TimeSpan period,
TimeSpan? ttl = null)
{
this.ValidateDueTime("DueTime", dueTime);
this.ValidatePeriod("Period", period);
@ -42,6 +43,7 @@ namespace Dapr.Actors.Runtime
this.Data = state;
this.DueTime = dueTime;
this.Period = period;
this.Ttl = ttl;
}
internal string Callback { get; private set; }
@ -52,6 +54,8 @@ namespace Dapr.Actors.Runtime
internal byte[] Data { get; private set; }
internal TimeSpan? Ttl { get; private set; }
private void ValidateDueTime(string argName, TimeSpan value)
{
if (value < TimeSpan.Zero)
@ -92,6 +96,7 @@ namespace Dapr.Actors.Runtime
var period = default(TimeSpan);
var data = default(byte[]);
string callback = null;
TimeSpan? ttl = null;
using (JsonDocument document = JsonDocument.ParseValue(ref reader))
{
@ -119,7 +124,13 @@ namespace Dapr.Actors.Runtime
callback = callbackProperty.GetString();
}
return new TimerInfo(callback, data, dueTime, period);
if (json.TryGetProperty("ttl", out var ttlProperty))
{
var ttlString = ttlProperty.GetString();
ttl = ConverterUtils.ConvertTimeSpanFromDaprFormat(ttlString);
}
return new TimerInfo(callback, data, dueTime, period, ttl);
}
}
@ -151,6 +162,11 @@ namespace Dapr.Actors.Runtime
writer.WriteString("callback", value.Callback);
}
if (value.Ttl != null)
{
writer.WriteString("ttl", ConverterUtils.ConvertTimeSpanValueInDaprFormat(value.Ttl));
}
writer.WriteEndObject();
await writer.FlushAsync();
}

View File

@ -0,0 +1,34 @@
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Xunit;
namespace Dapr.Actors.Runtime
{
public class ActorReminderInfoTests
{
[Fact]
public async Task TestActorReminderInfo_SerializeExcludesNullTtl()
{
var info = new ReminderInfo(new byte[] { }, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10));
var serialized = await info.SerializeAsync();
Assert.DoesNotContain("ttl", serialized);
var info2 = await ReminderInfo.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(serialized)));
Assert.Null(info2.Ttl);
}
[Fact]
public async Task TestActorReminderInfo_SerializeIncludesTtlWhenSet()
{
var info = new ReminderInfo(new byte[] { }, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1));
var serialized = await info.SerializeAsync();
Assert.Contains("ttl", serialized);
var info2 = await ReminderInfo.DeserializeAsync(new MemoryStream(Encoding.UTF8.GetBytes(serialized)));
Assert.NotNull(info2.Ttl);
Assert.Equal(TimeSpan.FromSeconds(1), info2.Ttl);
}
}
}

View File

@ -11,6 +11,7 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using Dapr.Actors;
@ -20,6 +21,8 @@ namespace Dapr.E2E.Test.Actors.Reminders
{
Task StartReminder(StartReminderOptions options);
Task StartReminderWithTtl(TimeSpan ttl);
Task<State> GetState();
}
}

View File

@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System;
namespace Dapr.E2E.Test.Actors.Reminders
{
public class State
@ -18,5 +20,7 @@ namespace Dapr.E2E.Test.Actors.Reminders
public int Count { get; set; }
public bool IsReminderRunning { get; set; }
public DateTime Timestamp { get; set; }
}
}

View File

@ -11,6 +11,7 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using Dapr.Actors;
@ -20,6 +21,8 @@ namespace Dapr.E2E.Test.Actors.Timers
{
Task StartTimer(StartTimerOptions options);
Task StartTimerWithTtl(TimeSpan ttl);
Task<State> GetState();
}
}

View File

@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System;
namespace Dapr.E2E.Test.Actors.Timers
{
public class State
@ -18,5 +20,7 @@ namespace Dapr.E2E.Test.Actors.Timers
public int Count { get; set; }
public bool IsTimerRunning { get; set; }
public DateTime Timestamp { get; set; }
}
}

View File

@ -44,13 +44,23 @@ namespace Dapr.E2E.Test.Actors.Reminders
await this.StateManager.SetStateAsync<State>("reminder-state", new State(){ IsReminderRunning = true, });
}
public async Task StartReminderWithTtl(TimeSpan ttl)
{
var options = new StartReminderOptions()
{
Total = 100,
};
var bytes = JsonSerializer.SerializeToUtf8Bytes(options, this.Host.JsonSerializerOptions);
await this.RegisterReminderAsync("test-reminder-ttl", bytes, dueTime: TimeSpan.Zero, period: TimeSpan.FromSeconds(1), ttl: ttl);
await this.StateManager.SetStateAsync<State>("reminder-state", new State() { IsReminderRunning = true, });
}
public async Task ReceiveReminderAsync(string reminderName, byte[] bytes, TimeSpan dueTime, TimeSpan period)
{
if (reminderName != "test-reminder")
if (!reminderName.StartsWith("test-reminder"))
{
return;
}
var options = JsonSerializer.Deserialize<StartReminderOptions>(bytes, this.Host.JsonSerializerOptions);
var state = await this.StateManager.GetStateAsync<State>("reminder-state");
@ -59,7 +69,7 @@ namespace Dapr.E2E.Test.Actors.Reminders
await this.UnregisterReminderAsync("test-reminder");
state.IsReminderRunning = false;
}
state.Timestamp = DateTime.Now;
await this.StateManager.SetStateAsync("reminder-state", state);
}
}

View File

@ -44,6 +44,17 @@ namespace Dapr.E2E.Test.Actors.Timers
await this.StateManager.SetStateAsync<State>("timer-state", new State(){ IsTimerRunning = true, });
}
public async Task StartTimerWithTtl(TimeSpan ttl)
{
var options = new StartTimerOptions()
{
Total = 100,
};
var bytes = JsonSerializer.SerializeToUtf8Bytes(options, this.Host.JsonSerializerOptions);
await this.RegisterTimerAsync("test-timer-ttl", nameof(Tick), bytes, TimeSpan.Zero, TimeSpan.FromSeconds(1), ttl);
await this.StateManager.SetStateAsync<State>("timer-state", new State() { IsTimerRunning = true, });
}
private async Task Tick(byte[] bytes)
{
var options = JsonSerializer.Deserialize<StartTimerOptions>(bytes, this.Host.JsonSerializerOptions);
@ -54,7 +65,7 @@ namespace Dapr.E2E.Test.Actors.Timers
await this.UnregisterTimerAsync("test-timer");
state.IsTimerRunning = false;
}
state.Timestamp = DateTime.Now;
await this.StateManager.SetStateAsync("timer-state", state);
}
}

View File

@ -48,5 +48,27 @@ namespace Dapr.E2E.Test
// Should count up to exactly 10
Assert.Equal(10, state.Count);
}
[Fact]
public async Task ActorCanStartReminderWithTtl()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var proxy = this.ProxyFactory.CreateActorProxy<IReminderActor>(ActorId.CreateRandom(), "ReminderActor");
await WaitForActorRuntimeAsync(proxy, cts.Token);
// Reminder that should fire 3 times (at 0, 1, and 2 seconds)
await proxy.StartReminderWithTtl(TimeSpan.FromSeconds(2));
// Record the start time and wait for longer than the reminder should exist for.
var start = DateTime.Now;
await Task.Delay(TimeSpan.FromSeconds(5));
var state = await proxy.GetState();
// Make sure the reminder has fired and that it didn't fire within the past second since it should have expired.
Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Reminder may not have triggered.");
Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Reminder triggered too recently. {DateTime.Now} - {state.Timestamp}");
}
}
}

View File

@ -48,5 +48,27 @@ namespace Dapr.E2E.Test
// Should count up to exactly 10
Assert.Equal(10, state.Count);
}
[Fact]
public async Task ActorCanStartTimerWithTtl()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
var proxy = this.ProxyFactory.CreateActorProxy<ITimerActor>(ActorId.CreateRandom(), "TimerActor");
await WaitForActorRuntimeAsync(proxy, cts.Token);
// Reminder that should fire 3 times (at 0, 1, and 2 seconds)
await proxy.StartTimerWithTtl(TimeSpan.FromSeconds(2));
// Record the start time and wait for longer than the reminder should exist for.
var start = DateTime.Now;
await Task.Delay(TimeSpan.FromSeconds(5));
var state = await proxy.GetState();
// Make sure the reminder has fired and that it didn't fire within the past second since it should have expired.
Assert.True(state.Timestamp.Subtract(start) > TimeSpan.Zero, "Timer may not have fired.");
Assert.True(DateTime.Now.Subtract(state.Timestamp) > TimeSpan.FromSeconds(1), $"Timer fired too recently. {DateTime.Now} - {state.Timestamp}");
}
}
}