fix review comments -2

This commit is contained in:
vinayada1 2020-10-28 23:35:47 -07:00
parent f6cc602286
commit 0acec97ade
5 changed files with 146 additions and 44 deletions

View File

@ -143,7 +143,7 @@ namespace Dapr.Client
/// <param name="httpExtension">Additional fields that may be needed if the receiving app is listening on HTTP.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{InvokeResponse}" /> that will return the value when the operation has completed.</returns>
public abstract Task<InvocationResponse<TRequest, TResponse>> InvokeMethodWithResponseHeadersAsync<TRequest, TResponse>(
public abstract Task<InvocationResponse<TRequest, TResponse>> InvokeMethodWithResponseAsync<TRequest, TResponse>(
string appId,
string methodName,
TRequest data,
@ -153,14 +153,13 @@ namespace Dapr.Client
/// <summary>
/// Invokes a method on a Dapr app.
/// </summary>
/// <typeparam name="TResponse">The type of the object in the response.</typeparam>
/// <param name="appId">The Dapr application id to invoke the method on.</param>
/// <param name="methodName">The name of the method to invoke.</param>
/// <param name="data">Byte array to pass to the method</param>
/// <param name="httpExtension">Additional fields that may be needed if the receiving app is listening on HTTP.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="ValueTask{T}" /> that will return the value when the operation has completed.</returns>
public abstract Task<InvocationResponse<byte[], TResponse>> InvokeMethodRawAsync<TResponse>(
public abstract Task<InvocationResponse<byte[], byte[]>> InvokeMethodRawAsync(
string appId,
string methodName,
byte[] data,

View File

@ -8,6 +8,7 @@ namespace Dapr.Client
using System;
using System.Text.Json;
using Grpc.Net.Client;
using Autogenerated = Autogen.Grpc.v1;
/// <summary>
/// Builder for building <see cref="DaprClient"/>
@ -80,7 +81,8 @@ namespace Dapr.Client
}
var channel = GrpcChannel.ForAddress(this.daprEndpoint, this.gRPCChannelOptions ?? new GrpcChannelOptions());
return new DaprClientGrpc(channel, this.jsonSerializerOptions);
var client = new Autogenerated.Dapr.DaprClient(channel);
return new DaprClientGrpc(client, this.jsonSerializerOptions);
}
/// <summary>

View File

@ -43,14 +43,8 @@ namespace Dapr.Client
/// <summary>
/// Initializes a new instance of the <see cref="DaprClientGrpc"/> class.
/// </summary>
/// <param name="channel">gRPC channel to create gRPC clients.</param>
/// <param name="inner">Autogenerated.Dapr.DaprClient.</param>
/// <param name="jsonSerializerOptions">Json serialization options.</param>
internal DaprClientGrpc(GrpcChannel channel, JsonSerializerOptions jsonSerializerOptions = null)
{
this.jsonSerializerOptions = jsonSerializerOptions;
this.client = new Autogenerated.Dapr.DaprClient(channel);
}
internal DaprClientGrpc(Autogenerated.Dapr.DaprClient inner, JsonSerializerOptions jsonSerializerOptions)
{
this.client = inner;
@ -221,13 +215,14 @@ namespace Dapr.Client
AppId = appId,
MethodName = methodName,
Body = data,
HttpExtension = httpExtension,
};
var invokeResponse = await this.MakeInvokeRequestAsyncWithResponse<TRequest, TResponse>(request, httpExtension, cancellationToken);
var invokeResponse = await this.MakeInvokeRequestAsyncWithResponse<TRequest, TResponse>(request, cancellationToken);
return invokeResponse.Body;
}
public override async Task<InvocationResponse<TRequest, TResponse>> InvokeMethodWithResponseHeadersAsync<TRequest, TResponse>(
public override async Task<InvocationResponse<TRequest, TResponse>> InvokeMethodWithResponseAsync<TRequest, TResponse>(
string appId,
string methodName,
TRequest data,
@ -242,14 +237,15 @@ namespace Dapr.Client
AppId = appId,
MethodName = methodName,
Body = data,
HttpExtension = httpExtension,
};
var invokeResponse = await this.MakeInvokeRequestAsyncWithResponse<TRequest, TResponse>(request, request.HttpExtension, cancellationToken);
var invokeResponse = await this.MakeInvokeRequestAsyncWithResponse<TRequest, TResponse>(request, cancellationToken);
return invokeResponse;
}
public override async Task<InvocationResponse<byte[], TResponse>> InvokeMethodRawAsync<TResponse>(
public override async Task<InvocationResponse<byte[], byte[]>> InvokeMethodRawAsync(
string appId,
string methodName,
byte[] data,
@ -264,9 +260,10 @@ namespace Dapr.Client
AppId = appId,
MethodName = methodName,
Body = data,
HttpExtension = httpExtension,
};
var invokeResponse = await this.MakeInvokeRequestAsyncWithResponse<byte[], TResponse>(request, httpExtension, cancellationToken);
var invokeResponse = await this.MakeInvokeRequestAsyncWithResponse<byte[], byte[]>(request, cancellationToken);
return invokeResponse;
}
@ -368,7 +365,6 @@ namespace Dapr.Client
private async Task<InvocationResponse<TRequest, TResponse>> MakeInvokeRequestAsyncWithResponse<TRequest, TResponse>(
InvocationRequest<TRequest> request,
HTTPExtension httpExtension,
CancellationToken cancellationToken = default)
{
@ -376,12 +372,20 @@ namespace Dapr.Client
Any serializedData = null;
if (request.Body != null)
{
serializedData = TypeConverters.ToAny(request.Body, this.jsonSerializerOptions);
if (typeof(TResponse) == typeof(byte[]))
{
var requestBytes = (byte[])Convert.ChangeType(request.Body, typeof(byte[]));
serializedData = new Any { Value = ByteString.CopyFrom(requestBytes), TypeUrl = typeof(byte[]).FullName };
}
else
{
serializedData = TypeConverters.ToAny(request.Body, this.jsonSerializerOptions);
}
}
try
{
var grpcCall = MakeInvokeRequestAsync(request.AppId, request.MethodName, serializedData, httpExtension, cancellationToken);
var grpcCall = MakeInvokeRequestAsync(request.AppId, request.MethodName, serializedData, request.HttpExtension, cancellationToken);
var response = await grpcCall.ResponseAsync;
var responseHeaders = await grpcCall.ResponseHeadersAsync;
@ -390,7 +394,66 @@ namespace Dapr.Client
var headers = grpcCall.ResponseHeadersAsync.Result.ToDictionary(kv => kv.Key, kv => kv.ValueBytes);
invokeResponse.Body = response.Data.Value.IsEmpty ? default : TypeConverters.FromAny<TResponse>(response.Data, this.jsonSerializerOptions);
if (typeof(TResponse) == typeof(byte[]))
{
var responseBytes = new byte[response.Data.Value.Length];
response.Data.Value?.CopyTo(responseBytes, 0);
invokeResponse.Body = (TResponse)(response.Data.Value.IsEmpty ? default : Convert.ChangeType(responseBytes, typeof(TResponse)));
}
else
{
invokeResponse.Body = response.Data.Value.IsEmpty ? default : TypeConverters.FromAny<TResponse>(response.Data, this.jsonSerializerOptions);
}
invokeResponse.Headers = headers;
invokeResponse.Trailers = grpcCall.GetTrailers().ToDictionary(kv => kv.Key, kv => kv.ValueBytes);
if (headers.TryGetValue(DaprHttpStatusHeader, out var httpStatus))
{
invokeResponse.HttpStatusCode = (HttpStatusCode)System.Enum.Parse(typeof(HttpStatusCode), Encoding.UTF8.GetString(httpStatus, 0, httpStatus.Length));
invokeResponse.ContentType = Constants.ContentTypeApplicationJson;
}
else
{
// Response is grpc
invokeResponse.GrpcStatusInfo = new GrpcStatusInfo(grpcStatus.StatusCode, grpcStatus.Detail);
invokeResponse.ContentType = Constants.ContentTypeApplicationGrpc;
}
}
catch (RpcException ex)
{
throw ProcessRpcException(ex, invokeResponse);
}
return invokeResponse;
}
private async Task<InvocationResponse<byte[], byte[]>> MakeRawInvokeRequestAsyncWithResponse(
InvocationRequest<byte[]> request,
CancellationToken cancellationToken = default)
{
var invokeResponse = new InvocationResponse<byte[], byte[]>(request);
Any serializedData = null;
if (request.Body != null)
{
serializedData = new Any { Value = ByteString.CopyFrom(request.Body), TypeUrl = typeof(byte[]).FullName };
}
try
{
var grpcCall = MakeInvokeRequestAsync(request.AppId, request.MethodName, serializedData, request.HttpExtension, cancellationToken);
var response = await grpcCall.ResponseAsync;
var responseHeaders = await grpcCall.ResponseHeadersAsync;
var trailers = grpcCall.GetTrailers();
var grpcStatus = grpcCall.GetStatus();
var headers = grpcCall.ResponseHeadersAsync.Result.ToDictionary(kv => kv.Key, kv => kv.ValueBytes);
var responseBytes = new byte[response.Data.Value.Length];
response.Data.Value?.CopyTo(responseBytes, 0);
invokeResponse.Body = response.Data.Value.IsEmpty ? default : responseBytes;
;
invokeResponse.Headers = headers;
invokeResponse.Trailers = grpcCall.GetTrailers().ToDictionary(kv => kv.Key, kv => kv.ValueBytes);

View File

@ -11,8 +11,8 @@ namespace Dapr
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;
using Autogenerated = Dapr.Client.Autogen.Grpc;
using Grpc.Net.Client;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
internal class StateTestClient : DaprClientGrpc
{
@ -22,9 +22,9 @@ namespace Dapr
/// <summary>
/// Initializes a new instance of the <see cref="DaprClientGrpc"/> class.
/// </summary>
internal StateTestClient()
: base(channel)
{ }
internal StateTestClient() : base(new Autogenerated.Dapr.DaprClient(channel), null)
{
}
public override ValueTask<TValue> GetStateAsync<TValue>(string storeName, string key, ConsistencyMode? consistencyMode = default, Dictionary<string, string> metadata = default, CancellationToken cancellationToken = default)
{

View File

@ -21,6 +21,9 @@ namespace Dapr.Client.Test
using Xunit;
using System.Linq;
using Moq;
using Google.Protobuf.WellKnownTypes;
using Google.Protobuf;
// using Moq.SetupAsync;
public class InvokeApiTest
@ -428,7 +431,6 @@ namespace Dapr.Client.Test
var response =
client.Call<InvokeResponse>()
.SetResponse(invokeResponse)
.AddHeader("some-header", "some-value")
.Build();
@ -437,7 +439,7 @@ namespace Dapr.Client.Test
.Returns(response);
var body = new Request() { RequestParameter = "Hello " };
var task = client.DaprClient.InvokeMethodWithResponseHeadersAsync<Request, Response>("test", "testMethod", body);
var task = client.DaprClient.InvokeMethodWithResponseAsync<Request, Response>("test", "testMethod", body);
// Validate Response
var invokedResponse = await task;
@ -467,7 +469,7 @@ namespace Dapr.Client.Test
.Returns(response);
var body = new Request() { RequestParameter = "Hello " };
var task = client.DaprClient.InvokeMethodWithResponseHeadersAsync<Request, Response>("test", "testMethod", body);
var task = client.DaprClient.InvokeMethodWithResponseAsync<Request, Response>("test", "testMethod", body);
// Validate Response
var invokedResponse = await task;
@ -478,7 +480,7 @@ namespace Dapr.Client.Test
}
[Fact]
public async Task InvokeMethodAsync_CanInvokeMethodWithResponseHeaders_ServerThrowsRpcException()
public async Task InvokeMethodAsync_CanInvokeMethodWithResponseHeaders_ServerReturnsNonSuccessResponse()
{
var client = new MockClient();
var data = new Response() { Name = "Look, I was invoked!" };
@ -488,7 +490,6 @@ namespace Dapr.Client.Test
var response =
client.Call<InvokeResponse>()
.SetResponse(invokeResponse)
.AddHeader("some-header", "some-value")
.Build();
@ -532,7 +533,7 @@ namespace Dapr.Client.Test
try
{
var body = new Request() { RequestParameter = "Hello " };
await client.DaprClient.InvokeMethodWithResponseHeadersAsync<Request, Response>("test", "testMethod", body);
await client.DaprClient.InvokeMethodWithResponseAsync<Request, Response>("test", "testMethod", body);
Assert.False(true);
}
catch(ServiceInvocationException<Request, Response> ex)
@ -554,29 +555,49 @@ namespace Dapr.Client.Test
{
var client = new MockClient();
var data = new Response() { Name = "Look, I was invoked!" };
// var dataBytes = new byte[]{1,2,3};
var dataBytes = JsonSerializer.SerializeToUtf8Bytes(data);
var invokeResponse = new InvokeResponse();
invokeResponse.Data = TypeConverters.ToAny(data);
invokeResponse.Data = new Any { Value = ByteString.CopyFrom(dataBytes), TypeUrl = typeof(byte[]).FullName };
var response =
client.Call<InvokeResponse>()
.SetResponse(invokeResponse)
.Build();
var body = new Request() { RequestParameter = "Hello " };
var requestBytes = JsonSerializer.SerializeToUtf8Bytes(body);
client.Mock
.Setup(m => m.InvokeServiceAsync(It.IsAny<Autogen.Grpc.v1.InvokeServiceRequest>(), It.IsAny<CallOptions>()))
.Returns(response);
var body = new Request() { RequestParameter = "Hello " };
var bytes = JsonSerializer.SerializeToUtf8Bytes(body);
var task = client.DaprClient.InvokeMethodRawAsync<Response>("test", "testMethod", bytes);
var task = client.DaprClient.InvokeMethodRawAsync("test", "testMethod", requestBytes);
// Validate Response
var invokedResponse = await task;
invokedResponse.Body.Name.Should().Be("Look, I was invoked!");
var responseBody = JsonSerializer.Deserialize<Response>(invokedResponse.Body);
responseBody.Name.Should().Be("Look, I was invoked!");
invokedResponse.HttpStatusCode.Should().BeNull();
invokedResponse.GrpcStatusInfo.Should().NotBeNull();
invokedResponse.GrpcStatusInfo.GrpcStatusCode.Should().Be(Grpc.Core.StatusCode.OK);
var expectedRequest = new Autogen.Grpc.v1.InvokeServiceRequest
{
Id = "test",
Message = new InvokeRequest
{
Method = "testMethod",
Data = new Any { Value = ByteString.CopyFrom(requestBytes), TypeUrl = typeof(byte[]).FullName },
HttpExtension = new Autogen.Grpc.v1.HTTPExtension
{
Verb = Autogen.Grpc.v1.HTTPExtension.Types.Verb.Post,
},
ContentType = "application/json",
},
};
client.Mock.Verify(m => m.InvokeServiceAsync(It.Is<Autogen.Grpc.v1.InvokeServiceRequest>(request => request.Equals(expectedRequest)), It.IsAny<CallOptions>()));
}
[Fact]
@ -584,8 +605,9 @@ namespace Dapr.Client.Test
{
var client = new MockClient();
var data = new Response() { Name = "Look, I was invoked!" };
var dataBytes = JsonSerializer.SerializeToUtf8Bytes(data);
var invokeResponse = new InvokeResponse();
invokeResponse.Data = TypeConverters.ToAny(data);
invokeResponse.Data = new Any { Value = ByteString.CopyFrom(dataBytes), TypeUrl = typeof(byte[]).FullName };
var response =
client.Call<InvokeResponse>()
@ -593,25 +615,42 @@ namespace Dapr.Client.Test
.AddHeader("dapr-http-status", "200")
.Build();
var body = new Request() { RequestParameter = "Hello " };
var requestBytes = JsonSerializer.SerializeToUtf8Bytes(body);
client.Mock
.Setup(m => m.InvokeServiceAsync(It.IsAny<Autogen.Grpc.v1.InvokeServiceRequest>(), It.IsAny<CallOptions>()))
.Returns(response);
var body = new Request() { RequestParameter = "Hello " };
var bytes = JsonSerializer.SerializeToUtf8Bytes(body);
var task = client.DaprClient.InvokeMethodRawAsync<Response>("test", "testMethod", bytes);
var task = client.DaprClient.InvokeMethodRawAsync("test", "testMethod", requestBytes);
// Validate Response
var invokedResponse = await task;
invokedResponse.Body.Name.Should().Be("Look, I was invoked!");
var responseBody = JsonSerializer.Deserialize<Response>(invokedResponse.Body);
responseBody.Name.Should().Be("Look, I was invoked!");
invokedResponse.GrpcStatusInfo.Should().BeNull();
invokedResponse.HttpStatusCode.Should().NotBeNull();
invokedResponse.HttpStatusCode.Should().Be(HttpStatusCode.OK);
var expectedRequest = new Autogen.Grpc.v1.InvokeServiceRequest
{
Id = "test",
Message = new InvokeRequest
{
Method = "testMethod",
Data = new Any { Value = ByteString.CopyFrom(requestBytes), TypeUrl = typeof(byte[]).FullName },
HttpExtension = new Autogen.Grpc.v1.HTTPExtension
{
Verb = Autogen.Grpc.v1.HTTPExtension.Types.Verb.Post,
},
ContentType = "application/json",
},
};
client.Mock.Verify(m => m.InvokeServiceAsync(It.Is<Autogen.Grpc.v1.InvokeServiceRequest>(request => request.Equals(expectedRequest)), It.IsAny<CallOptions>()));
}
[Fact]
public async Task InvokeMethodAsync_CanInvokeRawMethodWithResponse_ServerThrowsRpcException()
public async Task InvokeMethodAsync_CanInvokeRawMethodWithResponse_ServerReturnsNonSuccessResponse()
{
var client = new MockClient();
var data = new Response() { Name = "Look, I was invoked!" };
@ -621,7 +660,6 @@ namespace Dapr.Client.Test
var response =
client.Call<InvokeResponse>()
.SetResponse(invokeResponse)
.AddHeader("some-header", "some-value")
.Build();
var trailers = new Metadata();
@ -666,10 +704,10 @@ namespace Dapr.Client.Test
{
var body = new Request() { RequestParameter = "Hello " };
var bytes = JsonSerializer.SerializeToUtf8Bytes(body);
await client.DaprClient.InvokeMethodRawAsync<Response>("test", "testMethod", bytes);
await client.DaprClient.InvokeMethodRawAsync("test", "testMethod", bytes);
Assert.False(true);
}
catch(ServiceInvocationException<byte[], Response> ex)
catch(ServiceInvocationException<byte[], byte[]> ex)
{
ex.Message.Should().Be("Exception while invoking testMethod on appId:test");
ex.InnerException.Message.Should().Be(rpcExceptionMessage);