mirror of https://github.com/dapr/dotnet-sdk.git
Making IActionsInteractor per ProxyFactory and share it in all proxies.
This commit is contained in:
parent
cd710e2246
commit
52c93edcae
|
|
@ -29,15 +29,13 @@ namespace Microsoft.Actions.Actors
|
|||
private readonly string actionsPort = Constants.ActionsDefaultPort;
|
||||
private readonly HttpClientHandler innerHandler;
|
||||
private readonly IReadOnlyList<DelegatingHandler> delegateHandlers;
|
||||
private readonly HttpClientSettings clientSettings;
|
||||
private readonly ActorMessageSerializersManager serializersManager;
|
||||
private readonly ClientSettings clientSettings;
|
||||
private HttpClient httpClient = null;
|
||||
private bool disposed = false;
|
||||
|
||||
public ActionsHttpInteractor(
|
||||
ActorMessageSerializersManager serializersManager = null,
|
||||
HttpClientHandler innerHandler = null,
|
||||
HttpClientSettings clientSettings = null,
|
||||
ClientSettings clientSettings = null,
|
||||
params DelegatingHandler[] delegateHandlers)
|
||||
{
|
||||
// Get Actions port from Environment Variable if it has been overridden.
|
||||
|
|
@ -51,16 +49,7 @@ namespace Microsoft.Actions.Actors
|
|||
this.delegateHandlers = delegateHandlers;
|
||||
this.clientSettings = clientSettings;
|
||||
|
||||
this.httpClient = this.CreateHttpClient();
|
||||
|
||||
if (serializersManager != null)
|
||||
{
|
||||
this.serializersManager = serializersManager;
|
||||
}
|
||||
else
|
||||
{
|
||||
this.serializersManager = IntializeSerializationManager(null);
|
||||
}
|
||||
this.httpClient = this.CreateHttpClient();
|
||||
}
|
||||
|
||||
public async Task<byte[]> GetStateAsync(string actorType, string actorId, string keyName, CancellationToken cancellationToken = default(CancellationToken))
|
||||
|
|
@ -138,7 +127,7 @@ namespace Microsoft.Actions.Actors
|
|||
return this.SendAsync(RequestFunc, relativeUrl, requestId, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task<IActorResponseMessage> InvokeActorMethodWithRemotingAsync(IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default(CancellationToken))
|
||||
public async Task<IActorResponseMessage> InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
var requestMessageHeader = remotingRequestRequestMessage.GetHeader();
|
||||
|
||||
|
|
@ -147,10 +136,10 @@ namespace Microsoft.Actions.Actors
|
|||
var actorType = requestMessageHeader.ActorType;
|
||||
var interfaceId = requestMessageHeader.InterfaceId;
|
||||
|
||||
var serializedHeader = this.serializersManager.GetHeaderSerializer()
|
||||
var serializedHeader = serializersManager.GetHeaderSerializer()
|
||||
.SerializeRequestHeader(remotingRequestRequestMessage.GetHeader());
|
||||
|
||||
var msgBodySeriaizer = this.serializersManager.GetRequestMessageBodySerializer(interfaceId);
|
||||
var msgBodySeriaizer = serializersManager.GetRequestMessageBodySerializer(interfaceId);
|
||||
var serializedMsgBody = msgBodySeriaizer.Serialize(remotingRequestRequestMessage.GetBody());
|
||||
|
||||
// Send Request
|
||||
|
|
@ -187,7 +176,7 @@ namespace Microsoft.Actions.Actors
|
|||
|
||||
// DeSerialize Actor Response Message Header
|
||||
actorResponseMessageHeader =
|
||||
this.serializersManager.GetHeaderSerializer()
|
||||
serializersManager.GetHeaderSerializer()
|
||||
.DeserializeResponseHeaders(
|
||||
new MemoryStream(Encoding.ASCII.GetBytes(header)));
|
||||
}
|
||||
|
|
@ -200,7 +189,7 @@ namespace Microsoft.Actions.Actors
|
|||
var responseMessageBody = await retval.Content.ReadAsStreamAsync();
|
||||
|
||||
// Deserialize Actor Response Message Body
|
||||
var responseBodySerializer = this.serializersManager.GetResponseMessageBodySerializer(interfaceId);
|
||||
var responseBodySerializer = serializersManager.GetResponseMessageBodySerializer(interfaceId);
|
||||
|
||||
actorResponseMessageBody = responseBodySerializer.Deserialize(responseMessageBody);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,28 +20,16 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// </summary>
|
||||
public class ActorProxy : IActorProxy
|
||||
{
|
||||
internal static readonly ActorProxyFactory DefaultProxyFactory = new ActorProxyFactory();
|
||||
private static ActionsHttpInteractor actionsHttpInteractor = new ActionsHttpInteractor();
|
||||
private ActorCommunicationClient actorCommunicationClient;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="ActorProxy"/> class. Thsi is used for making non-remoting calls.
|
||||
/// </summary>
|
||||
/// <param name="actorId">The actor ID of the proxy actor object. Methods called on this proxy will result in requests
|
||||
/// being sent to the actor with this ID.</param>
|
||||
/// <param name="actorType">
|
||||
/// Type of actor implementation.
|
||||
/// </param>
|
||||
internal ActorProxy(ActorId actorId, string actorType)
|
||||
{
|
||||
this.ActorType = actorType;
|
||||
this.ActorId = actorId;
|
||||
}
|
||||
internal static readonly ActorProxyFactory DefaultProxyFactory = new ActorProxyFactory();
|
||||
private ActorRemotingClient actorRemotingClient;
|
||||
private ActorNonRemotingClient actorNonRemotingClient;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="ActorProxy"/> class.
|
||||
/// This constructor is protected so that it can be used by generated class which derives from ActorProxy when making Remoting calls.
|
||||
/// This constructor is also marked as internal so that it can be called by ActorProxyFactory when making non-remoting calls.
|
||||
/// </summary>
|
||||
protected ActorProxy()
|
||||
protected internal ActorProxy()
|
||||
{
|
||||
}
|
||||
|
||||
|
|
@ -51,15 +39,6 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// <inheritdoc/>
|
||||
public string ActorType { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the <see cref="IActorCommunicationClient"/> interface that this proxy is using to communicate with the actor.
|
||||
/// </summary>
|
||||
/// <value><see cref="ActorCommunicationClient"/> that this proxy is using to communicate with the actor.</value>
|
||||
internal IActorCommunicationClient ActorCommunicationClient
|
||||
{
|
||||
get { return this.actorCommunicationClient; }
|
||||
}
|
||||
|
||||
internal IActorMessageBodyFactory ActorMessageBodyFactory { get; set; }
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -105,7 +84,7 @@ namespace Microsoft.Actions.Actors.Client
|
|||
// TODO: Allow users to provide a custom Serializer.
|
||||
var serializer = new JsonSerializer();
|
||||
var jsonPayload = JsonConvert.SerializeObject(data);
|
||||
var response = await actionsHttpInteractor.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, jsonPayload, cancellationToken);
|
||||
var response = await this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, jsonPayload, cancellationToken);
|
||||
|
||||
using (var streamReader = new StreamReader(response))
|
||||
{
|
||||
|
|
@ -125,10 +104,8 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// <returns>Response form server.</returns>
|
||||
public Task InvokeAsync(string method, object data, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
// TODO: Allow users to provide a custom Serializer.
|
||||
var jsonPayload = JsonConvert.SerializeObject(data);
|
||||
|
||||
return actionsHttpInteractor.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, jsonPayload, cancellationToken);
|
||||
return this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, jsonPayload, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -140,7 +117,7 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// <returns>Response form server.</returns>
|
||||
public async Task<T> InvokeAsync<T>(string method, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
var response = await actionsHttpInteractor.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, null, cancellationToken);
|
||||
var response = await this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, null, cancellationToken);
|
||||
var serializer = new JsonSerializer();
|
||||
|
||||
using (var streamReader = new StreamReader(response))
|
||||
|
|
@ -158,19 +135,36 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// <param name="method">Actor method name.</param>
|
||||
/// <param name="cancellationToken">Cancellation Token.</param>
|
||||
/// <returns>Response form server.</returns>
|
||||
public async Task InvokeAsync(string method, CancellationToken cancellationToken = default(CancellationToken))
|
||||
public Task InvokeAsync(string method, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
await actionsHttpInteractor.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, null, cancellationToken);
|
||||
return this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, null, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initialize whencACtorProxy is created for Remoting.
|
||||
/// </summary>
|
||||
internal void Initialize(
|
||||
ActorCommunicationClient client,
|
||||
IActorMessageBodyFactory actorMessageBodyFactory)
|
||||
ActorRemotingClient client,
|
||||
ActorId actorId,
|
||||
string actorType)
|
||||
{
|
||||
this.actorCommunicationClient = client;
|
||||
this.ActorId = this.actorCommunicationClient.ActorId;
|
||||
this.ActorType = this.actorCommunicationClient.ActorType;
|
||||
this.ActorMessageBodyFactory = actorMessageBodyFactory;
|
||||
this.actorRemotingClient = client;
|
||||
this.ActorId = actorId;
|
||||
this.ActorType = actorType;
|
||||
this.ActorMessageBodyFactory = client.GetRemotingMessageBodyFactory();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initialize whenc ActorProxy is created for non-Remoting calls.
|
||||
/// </summary>
|
||||
internal void Initialize(
|
||||
ActorNonRemotingClient client,
|
||||
ActorId actorId,
|
||||
string actorType)
|
||||
{
|
||||
this.actorNonRemotingClient = client;
|
||||
this.ActorId = actorId;
|
||||
this.ActorType = actorType;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -192,14 +186,14 @@ namespace Microsoft.Actions.Actors.Client
|
|||
var headers = new ActorRequestMessageHeader
|
||||
{
|
||||
ActorId = this.ActorId,
|
||||
ActorType = this.actorCommunicationClient.ActorType,
|
||||
ActorType = this.ActorType,
|
||||
InterfaceId = interfaceId,
|
||||
MethodId = methodId,
|
||||
CallContext = Actors.Helper.GetCallContext(),
|
||||
MethodName = methodName,
|
||||
};
|
||||
|
||||
var responseMsg = await this.actorCommunicationClient.InvokeAsync(
|
||||
var responseMsg = await this.actorRemotingClient.InvokeAsync(
|
||||
new ActorRequestMessage(
|
||||
headers,
|
||||
requestMsgBodyValue),
|
||||
|
|
|
|||
|
|
@ -16,9 +16,7 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// </summary>
|
||||
internal class ActorProxyFactory : IActorProxyFactory
|
||||
{
|
||||
private readonly object thisLock;
|
||||
|
||||
private volatile IActorCommunicationClientFactory actorCommunicationClientFactory;
|
||||
private readonly IActionsInteractor actionsInteractor;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="ActorProxyFactory"/> class.
|
||||
|
|
@ -26,9 +24,8 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// </summary>
|
||||
public ActorProxyFactory()
|
||||
{
|
||||
this.thisLock = new object();
|
||||
|
||||
this.actorCommunicationClientFactory = null;
|
||||
// TODO: Allow configuration of serialization and client settings.
|
||||
this.actionsInteractor = new ActionsHttpInteractor();
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
|
|
@ -41,7 +38,11 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// <inheritdoc/>
|
||||
public ActorProxy Create(ActorId actorId, string actorType)
|
||||
{
|
||||
return new ActorProxy(actorId, actorType);
|
||||
var actorProxy = new ActorProxy();
|
||||
var nonRemotingClient = new ActorNonRemotingClient(this.actionsInteractor);
|
||||
actorProxy.Initialize(nonRemotingClient, actorId, actorType);
|
||||
|
||||
return actorProxy;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
@ -53,48 +54,12 @@ namespace Microsoft.Actions.Actors.Client
|
|||
/// <returns>Returns Actor Proxy.</returns>
|
||||
internal object CreateActorProxy(ActorId actorId, Type actorInterfaceType, string actorType)
|
||||
{
|
||||
var factory = this.GetOrCreateActorCommunicationClientFactory();
|
||||
|
||||
// TODO factory level settings or method level parameter, default http
|
||||
var actorCommunicationClient = new ActorCommunicationClient(
|
||||
factory,
|
||||
actorId,
|
||||
actorType);
|
||||
|
||||
var remotingClient = new ActorRemotingClient(this.actionsInteractor);
|
||||
var proxyGenerator = ActorCodeBuilder.GetOrCreateProxyGenerator(actorInterfaceType);
|
||||
var actorProxy = proxyGenerator.CreateActorProxy();
|
||||
actorProxy.Initialize(actorCommunicationClient, factory.GetRemotingMessageBodyFactory());
|
||||
actorProxy.Initialize(remotingClient, actorId, actorType);
|
||||
|
||||
return actorProxy;
|
||||
}
|
||||
|
||||
private IActorCommunicationClientFactory GetOrCreateActorCommunicationClientFactory()
|
||||
{
|
||||
if (this.actorCommunicationClientFactory != null)
|
||||
{
|
||||
return this.actorCommunicationClientFactory;
|
||||
}
|
||||
|
||||
lock (this.thisLock)
|
||||
{
|
||||
if (this.actorCommunicationClientFactory == null)
|
||||
{
|
||||
this.actorCommunicationClientFactory = this.CreateActorCommunicationClientFactory();
|
||||
}
|
||||
}
|
||||
|
||||
return this.actorCommunicationClientFactory;
|
||||
}
|
||||
|
||||
private IActorCommunicationClientFactory CreateActorCommunicationClientFactory()
|
||||
{
|
||||
// TODO factory settings
|
||||
var factory = new ActorCommunicationClientFactory();
|
||||
if (factory == null)
|
||||
{
|
||||
throw new NotSupportedException("ClientFactory can't be null");
|
||||
}
|
||||
|
||||
return factory;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,15 +8,15 @@ namespace Microsoft.Actions.Actors
|
|||
using System;
|
||||
|
||||
/// <summary>
|
||||
/// Represents connection settings for Http Client to interact with Actions runtime.
|
||||
/// Represents connection settings for Http/gRPC Client to interact with Actions runtime.
|
||||
/// </summary>
|
||||
internal class HttpClientSettings
|
||||
internal class ClientSettings
|
||||
{
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="HttpClientSettings"/> class.
|
||||
/// Initializes a new instance of the <see cref="ClientSettings"/> class.
|
||||
/// </summary>
|
||||
/// <param name="clientTimeout">Timespan to wait before the request times out for the client.</param>
|
||||
public HttpClientSettings(TimeSpan? clientTimeout = null)
|
||||
public ClientSettings(TimeSpan? clientTimeout = null)
|
||||
{
|
||||
this.ClientTimeout = clientTimeout;
|
||||
}
|
||||
|
|
@ -1,78 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Communication.Client
|
||||
{
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
internal class ActorCommunicationClient : IActorCommunicationClient
|
||||
{
|
||||
private readonly SemaphoreSlim communicationClientLock;
|
||||
private readonly IActorCommunicationClientFactory communicationClientFactory;
|
||||
private readonly IActorMessageBodyFactory messageBodyFactory;
|
||||
private IActionsInteractor actionsInteractor;
|
||||
|
||||
public ActorCommunicationClient(
|
||||
IActorCommunicationClientFactory remotingClientFactory,
|
||||
ActorId actorId,
|
||||
string actorType)
|
||||
{
|
||||
this.ActorId = actorId;
|
||||
this.ActorType = actorType;
|
||||
this.communicationClientFactory = remotingClientFactory;
|
||||
this.communicationClientLock = new SemaphoreSlim(1);
|
||||
this.messageBodyFactory = remotingClientFactory.GetRemotingMessageBodyFactory();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the Actor id.
|
||||
/// </summary>
|
||||
/// <value>actor id.</value>
|
||||
public ActorId ActorId { get; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the Actor implementation type name for the actor.
|
||||
/// belongs to.
|
||||
/// </summary>
|
||||
/// <value>Actor implementation type name.</value>
|
||||
public string ActorType { get; }
|
||||
|
||||
public async Task<IActorResponseMessage> InvokeAsync(
|
||||
IActorRequestMessage remotingRequestMessage,
|
||||
string methodName,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var client = await this.GetCommunicationClientAsync(cancellationToken);
|
||||
return await client.InvokeActorMethodWithRemotingAsync(remotingRequestMessage);
|
||||
}
|
||||
|
||||
private async Task<IActionsInteractor> GetCommunicationClientAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
IActionsInteractor client;
|
||||
await this.communicationClientLock.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
if (this.actionsInteractor == null)
|
||||
{
|
||||
this.actionsInteractor = await this.communicationClientFactory.GetClientAsync();
|
||||
}
|
||||
|
||||
client = this.actionsInteractor;
|
||||
}
|
||||
finally
|
||||
{
|
||||
// Release the lock incase of exceptions from the GetClientAsync method, which can
|
||||
// happen if there are non retriable exceptions in that method. Eg: There can be
|
||||
// ServiceNotFoundException if the GetClientAsync client is called before the
|
||||
// service creation completes.
|
||||
this.communicationClientLock.Release();
|
||||
}
|
||||
|
||||
return client;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,89 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Communication.Client
|
||||
{
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Actions.Actors.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// An <see cref="IActorCommunicationClientFactory"/> that uses
|
||||
/// http protocol to create <see cref="IActorCommunicationClient"/> that communicate with actors.
|
||||
/// </summary>
|
||||
internal class ActorCommunicationClientFactory : IActorCommunicationClientFactory
|
||||
{
|
||||
private readonly ActorMessageSerializersManager serializersManager;
|
||||
private readonly IActorMessageBodyFactory remotingMessageBodyFactory = null;
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="ActorCommunicationClientFactory"/> class.
|
||||
/// Constructs a fabric transport based service remoting client factory.
|
||||
/// </summary>
|
||||
/// <param name="serializationProvider">IActorCommunicationMessageSerializationProvider provider.</param>
|
||||
public ActorCommunicationClientFactory(
|
||||
IActorMessageBodySerializationProvider serializationProvider = null)
|
||||
{
|
||||
// TODO Add settings, exception handlers, serialization provider
|
||||
this.serializersManager = IntializeSerializationManager(serializationProvider);
|
||||
this.remotingMessageBodyFactory = this.serializersManager.GetSerializationProvider().CreateMessageBodyFactory();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a client to communicate.
|
||||
/// </summary>
|
||||
/// <returns>
|
||||
/// A <see cref="System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
|
||||
/// the CommunicationClient(<see cref="IActorCommunicationClient" />) object.
|
||||
/// </returns>
|
||||
public async Task<IActionsInteractor> GetClientAsync()
|
||||
{
|
||||
return await this.CreateClientAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets a factory for creating the remoting message bodies.
|
||||
/// </summary>
|
||||
/// <returns>A factory for creating the remoting message bodies.</returns>
|
||||
public IActorMessageBodyFactory GetRemotingMessageBodyFactory()
|
||||
{
|
||||
return this.remotingMessageBodyFactory;
|
||||
}
|
||||
|
||||
private static ActorMessageSerializersManager IntializeSerializationManager(
|
||||
IActorMessageBodySerializationProvider serializationProvider)
|
||||
{
|
||||
// TODO serializer settings
|
||||
return new ActorMessageSerializersManager(
|
||||
serializationProvider,
|
||||
new ActorMessageHeaderSerializer());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a communication client for the given endpoint address.
|
||||
/// </summary>
|
||||
/// <returns>The communication client that was created.</returns>
|
||||
private Task<IActionsInteractor> CreateClientAsync()
|
||||
{
|
||||
try
|
||||
{
|
||||
// TODO add retries and error handling - add CreateClientWithRetriesAsync version
|
||||
var client = new ActionsHttpInteractor(
|
||||
this.serializersManager);
|
||||
return Task.FromResult((IActionsInteractor)client);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// TODO specific error handling
|
||||
throw new Exception(
|
||||
string.Format(
|
||||
CultureInfo.CurrentCulture,
|
||||
ex.ToString()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Communication.Client
|
||||
{
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
internal class ActorNonRemotingClient
|
||||
{
|
||||
private readonly IActionsInteractor actionsInteractor;
|
||||
|
||||
public ActorNonRemotingClient(IActionsInteractor actionsInteractor)
|
||||
{
|
||||
this.actionsInteractor = actionsInteractor;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Invokes an Actor method on Actions without remoting.
|
||||
/// </summary>
|
||||
/// <param name="actorType">Type of actor.</param>
|
||||
/// <param name="actorId">ActorId.</param>
|
||||
/// <param name="methodName">Method name to invoke.</param>
|
||||
/// <param name="jsonPayload">Serialized body.</param>
|
||||
/// <param name="cancellationToken">Cancels the operation.</param>
|
||||
/// <returns>A task that represents the asynchronous operation.</returns>
|
||||
public Task<Stream> InvokeActorMethodWithoutRemotingAsync(string actorType, string actorId, string methodName, string jsonPayload, CancellationToken cancellationToken = default(CancellationToken))
|
||||
{
|
||||
return this.actionsInteractor.InvokeActorMethodWithoutRemotingAsync(actorType, actorId, methodName, jsonPayload, cancellationToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Communication.Client
|
||||
{
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
internal class ActorRemotingClient
|
||||
{
|
||||
private readonly ActorMessageSerializersManager serializersManager;
|
||||
private readonly IActorMessageBodyFactory remotingMessageBodyFactory = null;
|
||||
private readonly IActionsInteractor actionsInteractor;
|
||||
|
||||
public ActorRemotingClient(
|
||||
IActionsInteractor actionsInteractor,
|
||||
IActorMessageBodySerializationProvider serializationProvider = null)
|
||||
{
|
||||
this.actionsInteractor = actionsInteractor;
|
||||
this.serializersManager = IntializeSerializationManager(serializationProvider);
|
||||
this.remotingMessageBodyFactory = this.serializersManager.GetSerializationProvider().CreateMessageBodyFactory();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets a factory for creating the remoting message bodies.
|
||||
/// </summary>
|
||||
/// <returns>A factory for creating the remoting message bodies.</returns>
|
||||
public IActorMessageBodyFactory GetRemotingMessageBodyFactory()
|
||||
{
|
||||
return this.remotingMessageBodyFactory;
|
||||
}
|
||||
|
||||
public async Task<IActorResponseMessage> InvokeAsync(
|
||||
IActorRequestMessage remotingRequestMessage,
|
||||
string methodName,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
return await this.actionsInteractor.InvokeActorMethodWithRemotingAsync(this.serializersManager, remotingRequestMessage, cancellationToken);
|
||||
}
|
||||
|
||||
private static ActorMessageSerializersManager IntializeSerializationManager(
|
||||
IActorMessageBodySerializationProvider serializationProvider)
|
||||
{
|
||||
// TODO serializer settings
|
||||
return new ActorMessageSerializersManager(
|
||||
serializationProvider,
|
||||
new ActorMessageHeaderSerializer());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,20 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Communication.Client
|
||||
{
|
||||
using System.Threading.Tasks;
|
||||
|
||||
/// <summary>
|
||||
/// Defines the interface for the client that communicate with an actor.
|
||||
/// </summary>
|
||||
internal interface IActorCommunicationClient
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets the id of the actor this client communicates with.
|
||||
/// </summary>
|
||||
ActorId ActorId { get; }
|
||||
}
|
||||
}
|
||||
|
|
@ -1,32 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Communication.Client
|
||||
{
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Microsoft.Actions.Actors.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// A factory for creating <see cref="IActorCommunicationClient">actions communication clients.</see>.
|
||||
/// </summary>
|
||||
internal interface IActorCommunicationClientFactory
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets a factory for creating the remoting message bodies.
|
||||
/// </summary>
|
||||
/// <returns>A factory for creating the remoting message bodies.</returns>
|
||||
IActorMessageBodyFactory GetRemotingMessageBodyFactory();
|
||||
|
||||
/// <summary>
|
||||
/// Get a communication client.
|
||||
/// </summary>
|
||||
/// <returns>
|
||||
/// A <see cref="System.Threading.Tasks.Task">Task</see> that represents outstanding operation. The result of the Task is
|
||||
/// the CommunicationClient(<see cref="IActorCommunicationClient" />) object.
|
||||
/// </returns>
|
||||
Task<IActionsInteractor> GetClientAsync();
|
||||
}
|
||||
}
|
||||
|
|
@ -73,10 +73,11 @@ namespace Microsoft.Actions.Actors
|
|||
/// <summary>
|
||||
/// Invokes Actor method.
|
||||
/// </summary>
|
||||
/// <param name="serializersManager">Serializers manager for remoting calls.</param>
|
||||
/// <param name="remotingRequestRequestMessage">Actor Request Message.</param>
|
||||
/// <param name="cancellationToken">Cancels the operation.</param>
|
||||
/// <returns>A <see cref="Task"/> representing the result of the asynchronous operation.</returns>
|
||||
Task<IActorResponseMessage> InvokeActorMethodWithRemotingAsync(IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default(CancellationToken));
|
||||
Task<IActorResponseMessage> InvokeActorMethodWithRemotingAsync(ActorMessageSerializersManager serializersManager, IActorRequestMessage remotingRequestRequestMessage, CancellationToken cancellationToken = default(CancellationToken));
|
||||
|
||||
/// <summary>
|
||||
/// Register a reminder.
|
||||
|
|
|
|||
|
|
@ -1,31 +0,0 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
namespace Microsoft.Actions.Actors.Test
|
||||
{
|
||||
using System.Threading;
|
||||
using Microsoft.VisualStudio.TestTools.UnitTesting;
|
||||
using Microsoft.Actions.Actors.Client;
|
||||
using Microsoft.Actions.Actors.Communication.Client;
|
||||
|
||||
[TestClass]
|
||||
public class ActorCommunicationTests
|
||||
{
|
||||
[TestMethod]
|
||||
public void TestCreateActorCommunicationClientFactory()
|
||||
{
|
||||
var factory = new ActorCommunicationClientFactory();
|
||||
}
|
||||
|
||||
[TestMethod]
|
||||
public void TestInvokingMethodOnActorInterface()
|
||||
{
|
||||
var actorId = new ActorId("Test");
|
||||
var proxy = ActorProxy.Create<ITestActor>(actorId, "TestActor");
|
||||
// TODO the following call is expected to fail as http send request will fail till we have mocked it.
|
||||
proxy.SetCountAsync(5, CancellationToken.None).GetAwaiter().GetResult();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue