Adding timer support

This commit is contained in:
Aman Bhardwaj 2019-08-24 17:58:23 -07:00
parent 3792c626d9
commit 985db9966a
12 changed files with 367 additions and 29 deletions

View File

@ -271,6 +271,44 @@ namespace Microsoft.Actions.Actors
return this.SendAsync(RequestFunc, relativeUrl, requestId, cancellationToken);
}
public Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default(CancellationToken))
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.ActorTimerRelativeUrlFormat, actorType, actorId, timerName);
var requestId = Guid.NewGuid().ToString();
HttpRequestMessage RequestFunc()
{
var request = new HttpRequestMessage()
{
Method = HttpMethod.Put,
Content = new StringContent(data, Encoding.UTF8),
};
request.Content.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json; charset=utf-8");
return request;
}
return this.SendAsync(RequestFunc, relativeUrl, requestId, cancellationToken);
}
public Task UnregisterTimerAsync(string actorType, string actorId, string timerName, CancellationToken cancellationToken = default(CancellationToken))
{
var relativeUrl = string.Format(CultureInfo.InvariantCulture, Constants.Timers, actorType, actorId, timerName);
var requestId = Guid.NewGuid().ToString();
HttpRequestMessage RequestFunc()
{
var request = new HttpRequestMessage()
{
Method = HttpMethod.Delete,
};
return request;
}
return this.SendAsync(RequestFunc, relativeUrl, requestId, cancellationToken);
}
/// <summary>
/// Sends an HTTP get request to Actions.
/// </summary>

View File

@ -24,6 +24,7 @@ namespace Microsoft.Actions.Actors.AspNetCore
actorRouteBuilder.AddActorDeactivationRoute();
actorRouteBuilder.AddActorMethodRoute();
actorRouteBuilder.AddReminderRoute();
actorRouteBuilder.AddTimerRoute();
app.UseRouter(actorRouteBuilder.Build());
next(app);

View File

@ -89,5 +89,18 @@ namespace Microsoft.Actions.Actors.AspNetCore
return ActorRuntime.FireReminderAsync(actorTypeName, actorId, reminderName, request.Body);
});
}
public static void AddTimerRoute(this IRouteBuilder routeBuilder)
{
routeBuilder.MapPut("actors/{actorTypeName}/{actorId}/method/timer/{timerName}", (request, response, routeData) =>
{
var actorTypeName = (string)routeData.Values["actorTypeName"];
var actorId = (string)routeData.Values["actorId"];
var timerName = (string)routeData.Values["timerName"];
// read dueTime, period and data from Request Body.
return ActorRuntime.FireTimerAsync(actorTypeName, actorId, timerName);
});
}
}
}

View File

@ -24,6 +24,7 @@ namespace Microsoft.Actions.Actors
public const string ActionsVersion = "v1.0";
public const string Method = "method";
public const string Reminders = "reminders";
public const string Timers = "timers";
/// <summary>
/// Gets string format for Actors state management relative url.
@ -44,5 +45,10 @@ namespace Microsoft.Actions.Actors
/// Gets string format for Actors reminder registration relative url..
/// </summary>
public static string ActorReminderRelativeUrlFormat => $"{ActionsVersion}/{Actors}/{{0}}/{{1}}/{Reminders}/{{2}}";
/// <summary>
/// Gets string format for Actors timer registration relative url..
/// </summary>
public static string ActorTimerRelativeUrlFormat => $"{ActionsVersion}/{Actors}/{{0}}/{{1}}/{Timers}/{{2}}";
}
}

View File

@ -79,7 +79,7 @@ namespace Microsoft.Actions.Actors
Task<IActorResponseMessage> InvokeActorMethodWithRemotingAsync(IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// Invokes Actor method.
/// Register a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
@ -90,7 +90,7 @@ namespace Microsoft.Actions.Actors
Task RegisterReminderAsync(string actorType, string actorId, string reminderName, string data, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// Invokes Actor method.
/// Unregisters a reminder.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
@ -98,5 +98,26 @@ namespace Microsoft.Actions.Actors
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task UnregisterReminderAsync(string actorType, string actorId, string reminderName, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// Registers a timer.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="timerName">Name of timer to register.</param>
/// <param name="data">Json reminder data as per the actions spec.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task RegisterTimerAsync(string actorType, string actorId, string timerName, string data, CancellationToken cancellationToken = default(CancellationToken));
/// <summary>
/// Unegisters a timer.
/// </summary>
/// <param name="actorType">Type of actor.</param>
/// <param name="actorId">ActorId.</param>
/// <param name="timerName">Name of timer to register.</param>
/// <param name="cancellationToken">Cancels the operation.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
Task UnregisterTimerAsync(string actorType, string actorId, string timerName, CancellationToken cancellationToken = default(CancellationToken));
}
}

View File

@ -6,6 +6,7 @@
namespace Microsoft.Actions.Actors.Runtime
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
/// <summary>
@ -22,6 +23,11 @@ namespace Microsoft.Actions.Actors.Runtime
private readonly string traceId;
private readonly string actorImplementaionTypeName;
/// <summary>
/// Contains timers to be invoked.
/// </summary>
private Dictionary<string, IActorTimer> timers = new Dictionary<string, IActorTimer>();
/// <summary>
/// Initializes a new instance of the <see cref="Actor"/> class.
/// </summary>
@ -97,6 +103,12 @@ namespace Microsoft.Actions.Actors.Runtime
return this.StateManager.ClearCacheAsync();
}
internal Task FireTimerAsync(string timerName)
{
var timer = this.timers[timerName];
return timer.AsyncCallback.Invoke(timer.State);
}
/// <summary>
/// Saves all the state changes (add/update/remove) that were made since last call to
/// <see cref="Actor.SaveStateAsync"/>,
@ -201,9 +213,9 @@ namespace Microsoft.Actions.Actors.Runtime
TimeSpan dueTime,
TimeSpan period)
{
var reminderData = new ReminderData(state, dueTime, period);
var reminder = new ActorReminder(this.Id, reminderName, reminderData);
await ActorRuntime.ActionsInteractor.RegisterReminderAsync(this.actorImplementaionTypeName, this.Id.ToString(), reminderName, reminderData.SerializeToJson());
var reminderInfo = new ReminderInfo(state, dueTime, period);
var reminder = new ActorReminder(this.Id, reminderName, reminderInfo);
await ActorRuntime.ActionsInteractor.RegisterReminderAsync(this.actorImplementaionTypeName, this.Id.ToString(), reminderName, reminderInfo.SerializeToJson());
return reminder;
}
@ -230,5 +242,97 @@ namespace Microsoft.Actions.Actors.Runtime
{
return ActorRuntime.ActionsInteractor.UnregisterReminderAsync(this.actorImplementaionTypeName, this.Id.ToString(), reminderName);
}
/// <summary>
/// Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it.
/// </summary>
/// <param name="asyncCallback">
/// A delegate that specifies a method to be called when the timer fires.
/// It has one parameter: the state object passed to RegisterTimer.
/// It returns a <see cref="System.Threading.Tasks.Task"/> representing the asynchronous operation.
/// </param>
/// <param name="state">An object containing information to be used by the callback method, or null.</param>
/// <param name="dueTime">The amount of time to delay before the async callback is first invoked.
/// Specify negative one (-1) milliseconds to prevent the timer from starting.
/// Specify zero (0) to start the timer immediately.
/// </param>
/// <param name="period">
/// The time interval between invocations of the async callback.
/// Specify negative one (-1) milliseconds to disable periodic signaling.</param>
/// <returns>Returns IActorTimer object.</returns>
protected Task<IActorTimer> RegisterTimerAsync(
Func<object, Task> asyncCallback,
object state,
TimeSpan dueTime,
TimeSpan period)
{
return this.RegisterTimerAsync(null, asyncCallback, state, dueTime, period);
}
/// <summary>
/// Registers a Timer for the actor. If a timer name is not provided, a timer is autogenerated.
/// </summary>
/// <param name="timerName">Timer Name. If a timer name is not provided, a timer is autogenerated.</param>
/// <param name="asyncCallback">
/// A delegate that specifies a method to be called when the timer fires.
/// It has one parameter: the state object passed to RegisterTimer.
/// It returns a <see cref="System.Threading.Tasks.Task"/> representing the asynchronous operation.
/// </param>
/// <param name="state">An object containing information to be used by the callback method, or null.</param>
/// <param name="dueTime">The amount of time to delay before the async callback is first invoked.
/// Specify negative one (-1) milliseconds to prevent the timer from starting.
/// Specify zero (0) to start the timer immediately.
/// </param>
/// <param name="period">
/// The time interval between invocations of the async callback.
/// Specify negative one (-1) milliseconds to disable periodic signaling.</param>
/// <returns>Returns IActorTimer object.</returns>
protected async Task<IActorTimer> RegisterTimerAsync(
string timerName,
Func<object, Task> asyncCallback,
object state,
TimeSpan dueTime,
TimeSpan period)
{
// create a timer name to register with Actions runtime.
if (string.IsNullOrEmpty(timerName))
{
timerName = $"{this.Id.ToString()}_Timer_{this.timers.Count + 1}";
}
var actorTimer = new ActorTimer(this, timerName, asyncCallback, state, dueTime, period);
await ActorRuntime.ActionsInteractor.RegisterTimerAsync(this.actorImplementaionTypeName, this.Id.ToString(), timerName, actorTimer.SerializeToJson());
this.timers[timerName] = actorTimer;
return actorTimer;
}
/// <summary>
/// Unregisters a Timer previously set on this actor.
/// </summary>
/// <param name="timer">An IActorTimer representing timer that needs to be unregistered.</param>
/// <returns>Task representing the Unregister timer operation.</returns>
protected async Task UnregisterTimerAsync(IActorTimer timer)
{
await ActorRuntime.ActionsInteractor.UnregisterTimerAsync(this.actorImplementaionTypeName, this.Id.ToString(), timer.Name);
if (this.timers.ContainsKey(timer.Name))
{
this.timers.Remove(timer.Name);
}
}
/// <summary>
/// Unregisters a Timer previously set on this actor.
/// </summary>
/// <param name="timerName">Name of timer to unregister.</param>
/// <returns>Task representing the Unregister timer operation.</returns>
protected async Task UnregisterTimerAsync(string timerName)
{
await ActorRuntime.ActionsInteractor.UnregisterTimerAsync(this.actorImplementaionTypeName, this.Id.ToString(), timerName);
if (this.timers.ContainsKey(timerName))
{
this.timers.Remove(timerName);
}
}
}
}

View File

@ -24,9 +24,11 @@ namespace Microsoft.Actions.Actors.Runtime
{
private const string TraceType = "ActorManager";
private const string ReceiveReminderMethodName = "ReceiveReminderAsync";
private const string TimerMethodName = "FireTimerAsync";
private readonly ActorService actorService;
private readonly ConcurrentDictionary<ActorId, Actor> activeActors;
private readonly ActorMethodContext reminderMethodContext;
private readonly ActorMethodContext timerMethodContext;
private readonly ActorMessageSerializersManager serializersManager;
private IActorMessageBodyFactory messageBodyFactory;
@ -47,6 +49,7 @@ namespace Microsoft.Actions.Actors.Runtime
this.actorMethodInfoMap = new ActorMethodInfoMap(this.actorService.ActorTypeInfo.InterfaceTypes);
this.activeActors = new ConcurrentDictionary<ActorId, Actor>();
this.reminderMethodContext = ActorMethodContext.CreateForReminder(ReceiveReminderMethodName);
this.timerMethodContext = ActorMethodContext.CreateForReminder(TimerMethodName);
this.serializersManager = IntializeSerializationManager(null);
this.messageBodyFactory = new DataContractMessageFactory();
}
@ -133,11 +136,18 @@ namespace Microsoft.Actions.Actors.Runtime
await awaitable;
// Write Response back if method's return type is other than Task.
// Serialize result if it has result (return type was not just Task.)
var resultProperty = awaitable.GetType().GetProperty("Result");
// already await, Getting result will be non blocking.
return resultProperty == null ? default(object) : awaitable.GetAwaiter().GetResult();
if (methodInfo.ReturnType.Name != typeof(Task).Name)
{
// already await, Getting result will be non blocking.
var x = awaitable.GetAwaiter().GetResult();
return x;
}
else
{
return default(object);
}
}
var result = await this.DispatchInternalAsync(actorId, actorMethodContext, RequestFunc, cancellationToken);
@ -156,7 +166,7 @@ namespace Microsoft.Actions.Actors.Runtime
// Only FireReminder if its IRemindable, else ignore it.
if (this.ActorTypeInfo.IsRemindable)
{
var reminderdata = ReminderData.Deserialize(requestBodyStream);
var reminderdata = ReminderInfo.Deserialize(requestBodyStream);
// Create a Func to be invoked by common method.
async Task<byte[]> RequestFunc(Actor actor, CancellationToken ct)
@ -177,6 +187,20 @@ namespace Microsoft.Actions.Actors.Runtime
return Task.CompletedTask;
}
internal Task FireTimerAsync(ActorId actorId, string timerName, CancellationToken cancellationToken = default(CancellationToken))
{
// Create a Func to be invoked by common method.
async Task<byte[]> RequestFunc(Actor actor, CancellationToken ct)
{
await
actor.FireTimerAsync(timerName);
return null;
}
return this.DispatchInternalAsync(actorId, this.timerMethodContext, RequestFunc, cancellationToken);
}
internal async Task ActivateActor(ActorId actorId)
{
// An actor is activated by "actions" runtime when a call is to be made for an actor.

View File

@ -15,31 +15,31 @@ namespace Microsoft.Actions.Actors.Runtime
public ActorReminder(
ActorId actorId,
string reminderName,
ReminderData reminderData)
ReminderInfo reminderInfo)
{
this.OwnerActorId = actorId;
this.Name = reminderName;
this.ReminderData = reminderData;
this.ReminderInfo = reminderInfo;
}
public string Name { get; }
public byte[] State
{
get { return this.ReminderData.Data; }
get { return this.ReminderInfo.Data; }
}
public TimeSpan DueTime
{
get { return this.ReminderData.DueTime; }
get { return this.ReminderInfo.DueTime; }
}
public TimeSpan Period
{
get { return this.ReminderData.Period; }
get { return this.ReminderInfo.Period; }
}
internal ReminderData ReminderData { get; }
internal ReminderInfo ReminderInfo { get; }
internal ActorId OwnerActorId { get; }
}

View File

@ -129,6 +129,19 @@ namespace Microsoft.Actions.Actors.Runtime
return GetActorManager(actorTypeName).FireReminderAsync(new ActorId(actorId), reminderName, requestBodyStream, cancellationToken);
}
/// <summary>
/// Fires a timer for the Actor.
/// </summary>
/// <param name="actorTypeName">Actor type name to invokde the method for.</param>
/// <param name="actorId">Actor id for the actor for which method will be invoked.</param>
/// <param name="timerName">The name of timer provided during registration.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
internal static Task FireTimerAsync(string actorTypeName, string actorId, string timerName, CancellationToken cancellationToken = default(CancellationToken))
{
return GetActorManager(actorTypeName).FireTimerAsync(new ActorId(actorId), timerName, cancellationToken);
}
private static ActorManager GetActorManager(string actorTypeName)
{
if (!actorManagers.TryGetValue(actorTypeName, out var actorManager))

View File

@ -0,0 +1,72 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
namespace Microsoft.Actions.Actors.Runtime
{
using System;
using System.IO;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
internal class ActorTimer : IActorTimer
{
private readonly Actor owner;
public ActorTimer(
Actor owner,
string timerName,
Func<object, Task> asyncCallback,
object state,
TimeSpan dueTime,
TimeSpan period)
{
this.owner = owner;
this.Name = timerName;
this.AsyncCallback = asyncCallback;
this.State = state;
this.Period = period;
this.DueTime = dueTime;
}
public string Name { get; }
public TimeSpan DueTime { get; }
public TimeSpan Period { get; }
public object State { get; }
public Func<object, Task> AsyncCallback { get; }
internal string SerializeToJson()
{
string content;
using (var sw = new StringWriter())
{
using (var writer = new JsonTextWriter(sw))
{
writer.WriteStartObject();
if (this.DueTime != null)
{
writer.WriteProperty((TimeSpan?)this.DueTime, "dueTime", JsonWriterExtensions.WriteTimeSpanValueActionsFormat);
}
if (this.Period != null)
{
writer.WriteProperty((TimeSpan?)this.Period, "period", JsonWriterExtensions.WriteTimeSpanValueActionsFormat);
}
// Do not serialize state and call back, it will be kept with actor instance.
writer.WriteEndObject();
content = sw.ToString();
}
}
return content;
}
}
}

View File

@ -0,0 +1,46 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
namespace Microsoft.Actions.Actors.Runtime
{
using System;
using System.Threading.Tasks;
/// <summary>
/// Represents the timer set on an Actor.
/// </summary>
public interface IActorTimer
{
/// <summary>
/// Gets the time when timer is first due.
/// </summary>
/// <value>Time as <see cref="System.TimeSpan"/> when timer is first due.</value>
TimeSpan DueTime { get; }
/// <summary>
/// Gets the periodic time when timer will be invoked.
/// </summary>
/// <value>Periodic time as <see cref="System.TimeSpan"/> when timer will be invoked.</value>
TimeSpan Period { get; }
/// <summary>
/// Gets the name of the Timer. The name is unique per actor.
/// </summary>
/// <value>The name of the timer.</value>
string Name { get; }
/// <summary>
/// Gets a delegate that specifies a method to be called when the timer fires.
/// It has one parameter: the state object passed to RegisterTimer.
/// It returns a <see cref="System.Threading.Tasks.Task"/> representing the asynchronous operation.
/// </summary>
Func<object, Task> AsyncCallback { get; }
/// <summary>
/// Gets state containing information to be used by the callback method, or null.
/// </summary>
object State { get; }
}
}

View File

@ -15,20 +15,20 @@ namespace Microsoft.Actions.Actors.Runtime
using Microsoft.Actions.Actors.Resources;
using Newtonsoft.Json;
internal class ReminderData
internal class ReminderInfo
{
private readonly TimeSpan minTimePeriod = Timeout.InfiniteTimeSpan;
public ReminderData(
byte[] reminderState,
TimeSpan reminderDueTime,
TimeSpan reminderPeriod)
public ReminderInfo(
byte[] state,
TimeSpan dueTime,
TimeSpan period)
{
this.ValidateDueTime("DueTime", reminderDueTime);
this.ValidatePeriod("Period", reminderPeriod);
this.Data = reminderState;
this.DueTime = reminderDueTime;
this.Period = reminderPeriod;
this.ValidateDueTime("DueTime", dueTime);
this.ValidatePeriod("Period", period);
this.Data = state;
this.DueTime = dueTime;
this.Period = period;
}
public TimeSpan DueTime { get; private set; }
@ -37,7 +37,7 @@ namespace Microsoft.Actions.Actors.Runtime
public byte[] Data { get; private set; }
internal static ReminderData Deserialize(Stream stream)
internal static ReminderInfo Deserialize(Stream stream)
{
// Deserialize using JsonReader as we know the property names in response. Deserializing using JsonReader is most performant.
using (var streamReader = new StreamReader(stream))
@ -81,7 +81,7 @@ namespace Microsoft.Actions.Actors.Runtime
return content;
}
private static ReminderData GetFromJsonProperties(JsonReader reader)
private static ReminderInfo GetFromJsonProperties(JsonReader reader)
{
var dueTime = default(TimeSpan);
var period = default(TimeSpan);
@ -110,7 +110,7 @@ namespace Microsoft.Actions.Actors.Runtime
}
while (reader.TokenType != JsonToken.EndObject);
return new ReminderData(data, dueTime, period);
return new ReminderInfo(data, dueTime, period);
}
private void ValidateDueTime(string argName, TimeSpan value)