Revert "Changes to have only one Actions Interactor per ActorProxyFactory"

This reverts commit 2ba7c2f055.
This commit is contained in:
shalabhs 2019-08-27 16:57:05 -07:00
parent 2ba7c2f055
commit c9711869b5
5 changed files with 137 additions and 26 deletions

View File

@ -17,6 +17,7 @@ namespace Microsoft.Actions.Actors
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Actions.Actors.Communication;
using Microsoft.Actions.Actors.Runtime;
using Newtonsoft.Json;
/// <summary>

View File

@ -16,8 +16,9 @@ namespace Microsoft.Actions.Actors.Client
/// </summary>
internal class ActorProxyFactory : IActorProxyFactory
{
// Used only for Remoting based communication
private static readonly ActorCommunicationClientFactory DefaultActorCommunicationClientFactory = new ActorCommunicationClientFactory();
private readonly object thisLock;
private volatile IActorCommunicationClientFactory actorCommunicationClientFactory;
/// <summary>
/// Initializes a new instance of the <see cref="ActorProxyFactory"/> class.
@ -25,6 +26,9 @@ namespace Microsoft.Actions.Actors.Client
/// </summary>
public ActorProxyFactory()
{
this.thisLock = new object();
this.actorCommunicationClientFactory = null;
}
/// <inheritdoc/>
@ -35,7 +39,7 @@ namespace Microsoft.Actions.Actors.Client
}
/// <summary>
/// Create a proxy, this method is also used by ActorReference also to create proxy.
/// Create a proxy, this method is also sued by ACtorReference also to create proxy.
/// </summary>
/// <param name="actorId">Actor Id.</param>
/// <param name="actorInterfaceType">Actor Interface Type.</param>
@ -43,14 +47,49 @@ namespace Microsoft.Actions.Actors.Client
/// <returns>Returns Actor Proxy.</returns>
internal object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string actorType)
{
// TODO factory/client level settings
var actorCommunicationClient = DefaultActorCommunicationClientFactory.GetClient(actorId, actorType);
var factory = this.GetOrCreateActorCommunicationClientFactory();
// TODO factory level settings or method level parameter, default http
var actorCommunicationClient = new ActorCommunicationClient(
factory,
actorId,
actorType);
var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType);
return proxyGenerator.CreateActorProxy(
actorCommunicationClient,
DefaultActorCommunicationClientFactory.GetRemotingMessageBodyFactory());
factory.GetRemotingMessageBodyFactory());
}
private IActorCommunicationClientFactory GetOrCreateActorCommunicationClientFactory()
{
if (this.actorCommunicationClientFactory != null)
{
return this.actorCommunicationClientFactory;
}
lock (this.thisLock)
{
if (this.actorCommunicationClientFactory == null)
{
this.actorCommunicationClientFactory = this.CreateActorCommunicationClientFactory();
}
}
return this.actorCommunicationClientFactory;
}
private IActorCommunicationClientFactory CreateActorCommunicationClientFactory()
{
// TODO factory settings
var factory = new ActorCommunicationClientFactory();
if (factory == null)
{
throw new NotSupportedException("ClientFactory can't be null");
}
return factory;
}
}
}

View File

@ -11,16 +11,21 @@ namespace Microsoft.Actions.Actors.Communication.Client
internal class ActorCommunicationClient : IActorCommunicationClient
{
private readonly IActionsInteractor actionsInteractor;
private readonly SemaphoreSlim communicationClientLock;
private readonly IActorCommunicationClientFactory communicationClientFactory;
private readonly IActorMessageBodyFactory messageBodyFactory;
private IActionsInteractor actionsInteractor;
public ActorCommunicationClient(
IActionsInteractor actionsInteractor,
IActorCommunicationClientFactory remotingClientFactory,
ActorId actorId,
string actorType)
{
this.ActorId = actorId;
this.ActorType = actorType;
this.actionsInteractor = actionsInteractor;
this.communicationClientFactory = remotingClientFactory;
this.communicationClientLock = new SemaphoreSlim(1);
this.messageBodyFactory = remotingClientFactory.GetRemotingMessageBodyFactory();
}
/// <summary>
@ -41,7 +46,33 @@ namespace Microsoft.Actions.Actors.Communication.Client
string methodName,
CancellationToken cancellationToken)
{
return await this.actionsInteractor.InvokeActorMethodWithRemotingAsync(remotingRequestMessage);
var client = await this.GetCommunicationClientAsync(cancellationToken);
return await client.InvokeActorMethodWithRemotingAsync(remotingRequestMessage);
}
private async Task<IActionsInteractor> GetCommunicationClientAsync(CancellationToken cancellationToken)
{
IActionsInteractor client;
await this.communicationClientLock.WaitAsync(cancellationToken);
try
{
if (this.actionsInteractor == null)
{
this.actionsInteractor = await this.communicationClientFactory.GetClientAsync();
}
client = this.actionsInteractor;
}
finally
{
// Release the lock incase of exceptions from the GetClientAsync method, which can
// happen if there are non retriable exceptions in that method. Eg: There can be
// ServiceNotFoundException if the GetClientAsync client is called before the
// service creation completes.
this.communicationClientLock.Release();
}
return client;
}
}
}

View File

@ -5,19 +5,24 @@
namespace Microsoft.Actions.Actors.Communication.Client
{
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Actions.Actors.Runtime;
/// <summary>
/// An <see cref="IActorCommunicationClientFactory"/> that uses
/// http protocol to create <see cref="IActorCommunicationClient"/> that communicate with actors.
/// </summary>
internal class ActorCommunicationClientFactory : IActorCommunicationClientFactory
{
private static readonly IActionsInteractor ActionsInteractor = new ActionsHttpInteractor();
private readonly ActorMessageSerializersManager serializersManager;
private readonly IActorMessageBodyFactory actorMessageBodyFactory = null;
private readonly IActorMessageBodyFactory remotingMessageBodyFactory = null;
/// <summary>
/// Initializes a new instance of the <see cref="ActorCommunicationClientFactory"/> class.
/// Constructs actor remoting communication client factory.
/// Constructs a fabric transport based service remoting client factory.
/// </summary>
/// <param name="serializationProvider">IActorCommunicationMessageSerializationProvider provider.</param>
public ActorCommunicationClientFactory(
@ -25,7 +30,19 @@ namespace Microsoft.Actions.Actors.Communication.Client
{
// TODO Add settings, exception handlers, serialization provider
this.serializersManager = IntializeSerializationManager(serializationProvider);
this.actorMessageBodyFactory = this.serializersManager.GetSerializationProvider().CreateMessageBodyFactory();
this.remotingMessageBodyFactory = this.serializersManager.GetSerializationProvider().CreateMessageBodyFactory();
}
/// <summary>
/// Returns a client to communicate.
/// </summary>
/// <returns>
/// A <see cref="System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
/// the CommunicationClient(<see cref="IActorCommunicationClient" />) object.
/// </returns>
public async Task<IActionsInteractor> GetClientAsync()
{
return await this.CreateClientAsync();
}
/// <summary>
@ -34,12 +51,7 @@ namespace Microsoft.Actions.Actors.Communication.Client
/// <returns>A factory for creating the remoting message bodies.</returns>
public IActorMessageBodyFactory GetRemotingMessageBodyFactory()
{
return this.actorMessageBodyFactory;
}
public ActorCommunicationClient GetClient(ActorId actorId, string actorType)
{
return new ActorCommunicationClient(ActionsInteractor, actorId, actorType);
return this.remotingMessageBodyFactory;
}
private static ActorMessageSerializersManager IntializeSerializationManager(
@ -50,5 +62,28 @@ namespace Microsoft.Actions.Actors.Communication.Client
serializationProvider,
new ActorMessageHeaderSerializer());
}
/// <summary>
/// Creates a communication client for the given endpoint address.
/// </summary>
/// <returns>The communication client that was created.</returns>
private Task<IActionsInteractor> CreateClientAsync()
{
try
{
// TODO add retries and error handling - add CreateClientWithRetriesAsync version
var client = new ActionsHttpInteractor(
this.serializersManager);
return Task.FromResult((IActionsInteractor)client);
}
catch (Exception ex)
{
// TODO specific error handling
throw new Exception(
string.Format(
CultureInfo.CurrentCulture,
ex.ToString()));
}
}
}
}

View File

@ -5,6 +5,10 @@
namespace Microsoft.Actions.Actors.Communication.Client
{
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Actions.Actors.Runtime;
/// <summary>
/// A factory for creating <see cref="IActorCommunicationClient">actions communication clients.</see>.
/// </summary>
@ -17,11 +21,12 @@ namespace Microsoft.Actions.Actors.Communication.Client
IActorMessageBodyFactory GetRemotingMessageBodyFactory();
/// <summary>
/// Gets actor communication client.
/// Get a communication client.
/// </summary>
/// <param name="actorId"> Actor Id.</param>
/// <param name="actorType"> Actor Type.</param>
/// <returns>A factory for creating the remoting message bodies.</returns>
ActorCommunicationClient GetClient(ActorId actorId, string actorType);
/// <returns>
/// A <see cref="System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
/// the CommunicationClient(<see cref="IActorCommunicationClient" />) object.
/// </returns>
Task<IActionsInteractor> GetClientAsync();
}
}