From 7eacfcc88f8d6b1cd6333a7c5695908a563a5956 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 27 Jan 2021 15:46:35 -0800 Subject: [PATCH 1/3] Fix error handling in http invoke. ITs to validate invoke errors on http and grpc services. Handle serialization of proto objects. --- .../actors/client/DaprGrpcClientTest.java | 2 - sdk-tests/pom.xml | 73 ++++++++ sdk-tests/proto/methodinvokeservice.proto | 42 +++++ .../src/test/java/io/dapr/it/BaseIT.java | 81 ++++++++- .../src/test/java/io/dapr/it/DaprRun.java | 40 +++-- .../it/methodinvoke/grpc/MethodInvokeIT.java | 145 ++++++++++++++++ .../grpc/MethodInvokeService.java | 162 ++++++++++++++++++ .../http/MethodInvokeController.java | 3 + .../it/methodinvoke/http/MethodInvokeIT.java | 16 ++ .../main/java/io/dapr/client/DaprHttp.java | 27 ++- .../java/io/dapr/client/ObjectSerializer.java | 22 ++- .../java/io/dapr/exceptions/DaprError.java | 12 ++ .../io/dapr/client/DaprClientHttpTest.java | 80 +++++++-- .../DefaultObjectSerializerTest.java | 14 ++ 14 files changed, 672 insertions(+), 47 deletions(-) create mode 100644 sdk-tests/proto/methodinvokeservice.proto create mode 100644 sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index 9869bc756..3bd79a2fe 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -22,8 +22,6 @@ import static org.mockito.Mockito.*; public class DaprGrpcClientTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String ACTOR_TYPE = "MyActorType"; private static final String ACTOR_ID = "1234567890"; diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 3198e2461..6cfbb8d5d 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -26,9 +26,43 @@ 11 true 1.0.0-SNAPSHOT + ${project.build.directory}/generated-sources + ${project.basedir}/proto + 1.33.1 + 3.13.0 + + commons-cli + commons-cli + 1.4 + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-api + ${grpc.version} + + + com.google.protobuf + protobuf-java-util + ${protobuf.version} + + + com.github.os72 + protoc-jar-maven-plugin + 3.10.1 + io.dapr dapr-sdk @@ -90,10 +124,49 @@ 3.9 test + + jakarta.annotation + jakarta.annotation-api + 1.3.5 + compile + + + com.github.os72 + protoc-jar-maven-plugin + 3.10.1 + + + generate-sources + + run + + + ${protobuf.version} + inputs + direct + true + + ${protobuf.input.directory} + + + + java + ${protobuf.output.directory} + + + grpc-java + ${protobuf.output.directory} + io.grpc:protoc-gen-grpc-java:${grpc.version} + + + + + + org.apache.maven.plugins maven-jar-plugin diff --git a/sdk-tests/proto/methodinvokeservice.proto b/sdk-tests/proto/methodinvokeservice.proto new file mode 100644 index 000000000..b16d0526a --- /dev/null +++ b/sdk-tests/proto/methodinvokeservice.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +package daprtests; + +option java_outer_classname = "MethodInvokeServiceProtos"; +option java_package = "io.dapr.it"; + +service MethodInvokeService { + rpc PostMessage (PostMessageRequest) returns (PostMessageResponse) {} + rpc DeleteMessage (DeleteMessageRequest) returns (DeleteMessageResponse) {} + rpc GetMessages (GetMessagesRequest) returns (GetMessagesResponse) {} + rpc Sleep (SleepRequest) returns (SleepResponse) {} +} + +message PostMessageRequest { + int32 id = 1; + string message = 2; +} + +message PostMessageResponse { +} + +message DeleteMessageRequest { + int32 id = 1; +} + +message DeleteMessageResponse { +} + +message GetMessagesRequest { +} + +message GetMessagesResponse { + map messages = 1; +} + +message SleepRequest { + int32 seconds = 1; +} + +message SleepResponse { +} \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index 5ce0c6ce9..daf8cc0eb 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -15,6 +15,9 @@ import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import static io.dapr.client.DaprApiProtocol.GRPC; +import static io.dapr.client.DaprApiProtocol.HTTP; + public abstract class BaseIT { protected static final String STATE_STORE_NAME = "statestore"; @@ -31,7 +34,16 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC); + return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, GRPC); + } + + protected static DaprRun startDaprApp( + String testName, + String successMessage, + Class serviceClass, + DaprApiProtocol appProtocol, + int maxWaitMilliseconds) throws Exception { + return startDaprApp(testName, successMessage, serviceClass, true, maxWaitMilliseconds, GRPC, appProtocol); } protected static DaprRun startDaprApp( @@ -41,14 +53,48 @@ public abstract class BaseIT { Boolean useAppPort, int maxWaitMilliseconds, DaprApiProtocol protocol) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, protocol); + return startDaprApp( + testName, + successMessage, + serviceClass, + useAppPort, + true, + maxWaitMilliseconds, + protocol, + HTTP); + } + + protected static DaprRun startDaprApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds, + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) throws Exception { + return startDaprApp( + testName, + successMessage, + serviceClass, + useAppPort, + true, + maxWaitMilliseconds, + protocol, + appProtocol); } protected static DaprRun startDaprApp( String testName, int maxWaitMilliseconds) throws Exception { return startDaprApp( - testName, "You're up and running!", null, false, true, maxWaitMilliseconds, DaprApiProtocol.GRPC); + testName, + "You're up and running!", + null, + false, + true, + maxWaitMilliseconds, + GRPC, + HTTP); } protected static DaprRun startDaprApp( @@ -58,13 +104,15 @@ public abstract class BaseIT { Boolean useAppPort, Boolean useDaprPorts, int maxWaitMilliseconds, - DaprApiProtocol protocol) throws Exception { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), successMessage, maxWaitMilliseconds, - protocol).withServiceClass(serviceClass); + protocol, + appProtocol).withServiceClass(serviceClass); DaprRun run = builder.build(); TO_BE_STOPPED.add(run); DAPR_RUN_BUILDERS.put(run.getAppName(), builder); @@ -83,19 +131,38 @@ public abstract class BaseIT { testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC); } + protected static ImmutablePair startSplitDaprAndApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds, + DaprApiProtocol protocol) throws Exception { + return startSplitDaprAndApp( + testName, + successMessage, + serviceClass, + useAppPort, + maxWaitMilliseconds, + protocol, + HTTP); + } + protected static ImmutablePair startSplitDaprAndApp( String testName, String successMessage, Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds, - DaprApiProtocol protocol) throws Exception { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, true, true), successMessage, maxWaitMilliseconds, - protocol).withServiceClass(serviceClass); + protocol, + appProtocol).withServiceClass(serviceClass); ImmutablePair runs = builder.splitBuild(); TO_BE_STOPPED.add(runs.left); TO_BE_STOPPED.add(runs.right); diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 8939785aa..a4bd7cd3d 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -20,7 +20,7 @@ public class DaprRun implements Stoppable { private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!"; - private static final String DAPR_RUN = "dapr run --app-id %s --components-path ./components"; + private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s --components-path ./components"; // the arg in -Dexec.args is the app's port private static final String DAPR_COMMAND = @@ -45,11 +45,12 @@ public class DaprRun implements Stoppable { String successMessage, Class serviceClass, int maxWaitMilliseconds, - DaprApiProtocol protocol) { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName()); this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol)); + new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol, appProtocol)); this.listCommand = new Command( this.appName, "dapr list"); @@ -170,16 +171,17 @@ public class DaprRun implements Stoppable { } private static String buildDaprCommand( - String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol) { - StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName)) - .append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "") - .append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "") - .append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "") - .append(serviceClass == null ? "" : - String.format(DAPR_COMMAND, serviceClass.getCanonicalName(), - ports.getAppPort() != null ? ports.getAppPort().toString() : "", - Properties.API_PROTOCOL.getName(), protocol, - Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol)); + String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol, DaprApiProtocol appProtocol) { + StringBuilder stringBuilder = + new StringBuilder(String.format(DAPR_RUN, appName, appProtocol.toString().toLowerCase())) + .append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "") + .append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "") + .append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "") + .append(serviceClass == null ? "" : + String.format(DAPR_COMMAND, serviceClass.getCanonicalName(), + ports.getAppPort() != null ? ports.getAppPort().toString() : "", + Properties.API_PROTOCOL.getName(), protocol, + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol)); return stringBuilder.toString(); } @@ -210,17 +212,21 @@ public class DaprRun implements Stoppable { private DaprApiProtocol protocol; + private DaprApiProtocol appProtocol; + Builder( String testName, Supplier portsSupplier, String successMessage, int maxWaitMilliseconds, - DaprApiProtocol protocol) { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) { this.testName = testName; this.portsSupplier = portsSupplier; this.successMessage = successMessage; this.maxWaitMilliseconds = maxWaitMilliseconds; this.protocol = protocol; + this.appProtocol = appProtocol; } public Builder withServiceClass(Class serviceClass) { @@ -235,7 +241,8 @@ public class DaprRun implements Stoppable { this.successMessage, this.serviceClass, this.maxWaitMilliseconds, - this.protocol); + this.protocol, + this.appProtocol); } /** @@ -257,7 +264,8 @@ public class DaprRun implements Stoppable { DAPR_SUCCESS_MESSAGE, null, this.maxWaitMilliseconds, - this.protocol); + this.protocol, + this.appProtocol); return new ImmutablePair<>(appRun, daprRun); } diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java new file mode 100644 index 000000000..d827c7227 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java @@ -0,0 +1,145 @@ +package io.dapr.it.methodinvoke.grpc; + +import io.dapr.client.DaprApiProtocol; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.HttpExtension; +import io.dapr.exceptions.DaprException; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse; +import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.runners.Parameterized.Parameter; +import static org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class MethodInvokeIT extends BaseIT { + + //Number of messages to be sent: 10 + private static final int NUM_MESSAGES = 10; + + /** + * Parameters for this test. + * Param #1: useGrpc. + * @return Collection of parameter tuples. + */ + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { { false }, { true } }); + } + + /** + * Run of a Dapr application. + */ + private DaprRun daprRun = null; + + @Parameter + public boolean useGrpc; + + @Before + public void init() throws Exception { + daprRun = startDaprApp( + MethodInvokeIT.class.getSimpleName(), + MethodInvokeService.SUCCESS_MESSAGE, + MethodInvokeService.class, + DaprApiProtocol.GRPC, // appProtocol + 60000); + + if (this.useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } + + // Wait since service might be ready even after port is available. + Thread.sleep(2000); + } + + @Test + public void testInvoke() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d", i); + + PostMessageRequest req = PostMessageRequest.newBuilder().setId(i).setMessage(message).build(); + client.invokeMethod(daprRun.getAppName(), "postMessage", req, HttpExtension.POST).block(); + System.out.println("Invoke method messages : " + message); + } + + Map messages = client.invokeMethod( + daprRun.getAppName(), + "getMessages", + GetMessagesRequest.newBuilder().build(), + HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap(); + assertEquals(10, messages.size()); + + // Delete one message. + client.invokeMethod( + daprRun.getAppName(), + "deleteMessage", + DeleteMessageRequest.newBuilder().setId(1).build(), + HttpExtension.POST).block(); + messages = client.invokeMethod( + daprRun.getAppName(), + "getMessages", + GetMessagesRequest.newBuilder().build(), + HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap(); + assertEquals(9, messages.size()); + + // Now update one message. + client.invokeMethod( + daprRun.getAppName(), + "postMessage", + PostMessageRequest.newBuilder().setId(2).setMessage("updated message").build(), + HttpExtension.POST).block(); + messages = client.invokeMethod( + daprRun.getAppName(), + "getMessages", + GetMessagesRequest.newBuilder().build(), + HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap(); + assertEquals("updated message", messages.get(2)); + } + } + + @Test + public void testInvokeTimeout() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + long started = System.currentTimeMillis(); + SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build(); + String message = assertThrows(IllegalStateException.class, () -> + client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST) + .block(Duration.ofMillis(10))).getMessage(); + long delay = System.currentTimeMillis() - started; + assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out. + assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); + } + } + + @Test + public void testInvokeException() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + SleepRequest req = SleepRequest.newBuilder().setSeconds(-9).build(); + DaprException exception = assertThrows(DaprException.class, () -> + client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block()); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java new file mode 100644 index 000000000..0a3eca1bc --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.methodinvoke.grpc; + +import com.google.protobuf.Any; +import io.dapr.v1.AppCallbackGrpc; +import io.dapr.v1.CommonProtos; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageResponse; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse; +import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.PostMessageResponse; +import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest; +import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse; + +public class MethodInvokeService { + + public static final String SUCCESS_MESSAGE = "application discovered on port "; + + /** + * Server mode: class that encapsulates all server-side logic for Grpc. + */ + private static class MyDaprService extends AppCallbackGrpc.AppCallbackImplBase { + + private final Map messages = Collections.synchronizedMap(new HashMap<>()); + + /** + * Server mode: Grpc server. + */ + private Server server; + + /** + * Server mode: starts listening on given port. + * + * @param port Port to listen on. + * @throws IOException Errors while trying to start service. + */ + private void start(int port) throws IOException { + this.server = ServerBuilder + .forPort(port) + .addService(this) + .build() + .start(); + System.out.printf("Server: started listening on port %d\n", port); + + // Now we handle ctrl+c (or any other JVM shutdown) + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("Server: shutting down gracefully ..."); + MyDaprService.this.server.shutdown(); + System.out.println("Server: Bye."); + })); + } + + /** + * Server mode: waits for shutdown trigger. + * + * @throws InterruptedException Propagated interrupted exception. + */ + private void awaitTermination() throws InterruptedException { + if (this.server != null) { + this.server.awaitTermination(); + } + } + + /** + * Server mode: this is the Dapr method to receive Invoke operations via Grpc. + * + * @param request Dapr envelope request, + * @param responseObserver Dapr envelope response. + */ + @Override + public void onInvoke(CommonProtos.InvokeRequest request, + StreamObserver responseObserver) { + try { + if ("postMessage".equals(request.getMethod())) { + PostMessageRequest req = PostMessageRequest.parseFrom(request.getData().getValue().toByteArray()); + + this.messages.put(req.getId(), req.getMessage()); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(PostMessageResponse.newBuilder().build())); + responseObserver.onNext(responseBuilder.build()); + } + if ("deleteMessage".equals(request.getMethod())) { + DeleteMessageRequest req = DeleteMessageRequest.parseFrom(request.getData().getValue().toByteArray()); + + this.messages.remove(req.getId()); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(DeleteMessageResponse.newBuilder().build())); + responseObserver.onNext(responseBuilder.build()); + } + if ("getMessages".equals(request.getMethod())) { + GetMessagesRequest.parseFrom(request.getData().getValue().toByteArray()); + + GetMessagesResponse res = GetMessagesResponse.newBuilder().putAllMessages(this.messages).build(); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(res)); + responseObserver.onNext(responseBuilder.build()); + } + if ("sleep".equals(request.getMethod())) { + SleepRequest req = SleepRequest.parseFrom(request.getData().getValue().toByteArray()); + + SleepResponse res = this.sleep(req); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(res)); + responseObserver.onNext(responseBuilder.build()); + } + } catch (Exception e) { + responseObserver.onError(e); + } finally { + responseObserver.onCompleted(); + } + } + + public SleepResponse sleep(SleepRequest request) { + if (request.getSeconds() < 0) { + throw new IllegalArgumentException("Sleep time cannot be negative."); + } + + try { + Thread.sleep(request.getSeconds() * 1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + new RuntimeException(e); + } + + // Now respond with current timestamp. + return SleepResponse.newBuilder().build(); + } + } + + /** + * This is the main method of this app. + * @param args The port to listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + + System.out.printf("Service starting on port %d ...\n", port); + + final MyDaprService service = new MyDaprService(); + service.start(port); + service.awaitTermination(); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java index cbc5319f8..71c3e7253 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java @@ -82,6 +82,9 @@ public class MethodInvokeController { @PostMapping(path = "/sleep") public void sleep(@RequestBody int seconds) throws InterruptedException { + if (seconds < 0) { + throw new IllegalArgumentException("Sleep time cannot be negative."); + } Thread.sleep(seconds * 1000); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java index df273c2ad..6d1242bf0 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java @@ -3,8 +3,10 @@ package io.dapr.it.methodinvoke.http; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.HttpExtension; +import io.dapr.exceptions.DaprException; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; +import io.dapr.it.MethodInvokeServiceProtos; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -19,6 +21,7 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.runners.Parameterized.Parameter; @@ -143,4 +146,17 @@ public class MethodInvokeIT extends BaseIT { assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); } } + + @Test + public void testInvokeException() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + MethodInvokeServiceProtos.SleepRequest req = MethodInvokeServiceProtos.SleepRequest.newBuilder().setSeconds(-9).build(); + DaprException exception = assertThrows(DaprException.class, () -> + client.invokeMethod(daprRun.getAppName(), "sleep", -9, HttpExtension.POST).block()); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertNotNull(exception.getMessage()); + assertTrue(exception.getMessage().contains("Internal Server Error")); + } + } } diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index a5f532a5c..29bcedccc 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -5,7 +5,6 @@ package io.dapr.client; -import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.client.domain.Metadata; import io.dapr.config.Properties; @@ -308,14 +307,14 @@ public class DaprHttp implements AutoCloseable { * @param json Response body from Dapr. * @return DaprError or null if could not parse. */ - private static DaprError parseDaprError(byte[] json) throws IOException { + private static DaprError parseDaprError(byte[] json) { if ((json == null) || (json.length == 0)) { return null; } try { return OBJECT_MAPPER.readValue(json, DaprError.class); - } catch (JsonParseException e) { + } catch (IOException e) { throw new DaprException("UNKNOWN", new String(json, StandardCharsets.UTF_8)); } } @@ -347,14 +346,24 @@ public class DaprHttp implements AutoCloseable { @Override public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException { if (!response.isSuccessful()) { - DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response)); - if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) { - future.completeExceptionally(new DaprException(error)); + try { + DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response)); + if ((error != null) && (error.getErrorCode() != null)) { + if (error.getMessage() != null) { + future.completeExceptionally(new DaprException(error)); + } else { + future.completeExceptionally( + new DaprException(error.getErrorCode(), "HTTP status code: " + response.code())); + } + return; + } + + future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code())); + return; + } catch (DaprException e) { + future.completeExceptionally(e); return; } - - future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code())); - return; } Map mapHeaders = new HashMap<>(); diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index db3a17a21..48939e40f 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -10,10 +10,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.MessageLite; import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; import java.io.IOException; +import java.lang.reflect.Method; /** * Serializes and deserializes an internal object. @@ -54,6 +56,11 @@ public class ObjectSerializer { return (byte[]) state; } + // Proto buffer class is serialized directly. + if (state instanceof MessageLite) { + return ((MessageLite) state).toByteArray(); + } + // Not string, not primitive, so it is a complex type: we use JSON for that. return OBJECT_MAPPER.writeValueAsBytes(state); } @@ -94,7 +101,7 @@ public class ObjectSerializer { } if (content == null) { - return (T) null; + return null; } // Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[]. @@ -103,13 +110,24 @@ public class ObjectSerializer { } if (content.length == 0) { - return (T) null; + return null; } if (javaType.hasRawClass(CloudEvent.class)) { return (T) CloudEvent.deserialize(content); } + if (javaType.isTypeOrSubTypeOf(MessageLite.class)) { + try { + Method method = javaType.getRawClass().getDeclaredMethod("parseFrom", byte[].class); + if (method != null) { + return (T) method.invoke(null, content); + } + } catch (Exception e) { + throw new IOException(e); + } + } + return OBJECT_MAPPER.readValue(content, javaType); } diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprError.java b/sdk/src/main/java/io/dapr/exceptions/DaprError.java index 225552797..490cc8f84 100644 --- a/sdk/src/main/java/io/dapr/exceptions/DaprError.java +++ b/sdk/src/main/java/io/dapr/exceptions/DaprError.java @@ -5,9 +5,13 @@ package io.dapr.exceptions; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import io.grpc.Status; + /** * Represents an error message from Dapr. */ +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) public class DaprError { /** @@ -20,12 +24,20 @@ public class DaprError { */ private String message; + /** + * Error code from gRPC. + */ + private Integer code; + /** * Gets the error code. * * @return Error code. */ public String getErrorCode() { + if ((errorCode == null) && (code != null)) { + return Status.fromCodeValue(code).getCode().name(); + } return errorCode; } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 146846b45..981c61288 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -14,6 +14,7 @@ import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; import io.dapr.utils.TypeRef; import okhttp3.OkHttpClient; import okhttp3.ResponseBody; @@ -189,13 +190,80 @@ public class DaprClientHttpTest { }); } + @Test + public void invokeServiceDaprError() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("MYCODE", exception.getErrorCode()); + assertEquals("MYCODE: My Message", exception.getMessage()); + } + + @Test + public void invokeServiceDaprErrorFromGRPC() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "{ \"code\": 7 }", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("PERMISSION_DENIED", exception.getErrorCode()); + assertEquals("PERMISSION_DENIED: HTTP status code: 500", exception.getMessage()); + } + + @Test + public void invokeServiceDaprErrorUnknownJSON() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "{ \"anything\": 7 }", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertEquals("UNKNOWN: { \"anything\": 7 }", exception.getMessage()); + } + + @Test + public void invokeServiceDaprErrorEmptyString() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); + } + @Test public void invokeServiceMethodNull() { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/publish/A") .respond(EXPECTED_RESULT); - String event = "{ \"message\": \"This is a test\" }"; assertThrows(IllegalArgumentException.class, () -> daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block()); @@ -337,7 +405,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseNull() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(new byte[0]); @@ -348,7 +415,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseObject() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"OK\""); @@ -370,7 +436,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseFloat() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1.5"); @@ -381,7 +446,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseChar() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"a\""); @@ -392,7 +456,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseByte() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"2\""); @@ -403,7 +466,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseLong() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1"); @@ -414,7 +476,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseInt() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1"); @@ -425,7 +486,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingNullName() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); @@ -436,7 +496,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingNullOpName() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); @@ -447,7 +506,6 @@ public class DaprClientHttpTest { @Test public void bindingNoHotMono() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); diff --git a/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java index 7cdd9ecc6..dcfaa1176 100644 --- a/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java +++ b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java @@ -8,6 +8,7 @@ package io.dapr.serializer; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; +import io.dapr.v1.CommonProtos; import org.junit.Test; import java.io.IOException; @@ -393,6 +394,19 @@ public class DefaultObjectSerializerTest { } } + @Test + public void serializeProtoTest() throws Exception { + CommonProtos.Etag valueToSerialize = CommonProtos.Etag.newBuilder().setValue("myValue").build(); + String expectedSerializedBase64Value = "CgdteVZhbHVl"; + + byte[] serializedValue = SERIALIZER.serialize(valueToSerialize); + assertEquals(expectedSerializedBase64Value, Base64.getEncoder().encodeToString(serializedValue)); + assertNotNull(serializedValue); + CommonProtos.Etag deserializedValue = SERIALIZER.deserialize(serializedValue, CommonProtos.Etag.class); + assertEquals(valueToSerialize.getValue(), deserializedValue.getValue()); + assertEquals(valueToSerialize, deserializedValue); + } + @Test public void deserializeObjectTest() { String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}"; From 5d20a217a7a1f2c5b863f006d8126ca48350ac98 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 27 Jan 2021 23:19:27 -0800 Subject: [PATCH 2/3] Fix IT to switch to GRPC service invocation. --- sdk-tests/src/test/java/io/dapr/it/DaprRun.java | 10 ++++++++++ .../io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java | 6 +++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index a4bd7cd3d..6222160be 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -140,18 +140,28 @@ public class DaprRun implements Stoppable { System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort())); } System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); + System.getProperties().setProperty( + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), + DaprApiProtocol.GRPC.name()); } public void switchToGRPC() { System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); + System.getProperties().setProperty( + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), + DaprApiProtocol.GRPC.name()); } public void switchToHTTP() { System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name()); + System.getProperties().setProperty( + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), + DaprApiProtocol.HTTP.name()); } public void switchToProtocol(DaprApiProtocol protocol) { System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), protocol.name()); + System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name()); } public int getGrpcPort() { diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java index d827c7227..f0f280082 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java @@ -139,7 +139,11 @@ public class MethodInvokeIT extends BaseIT { client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block()); assertEquals("UNKNOWN", exception.getErrorCode()); - assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); + if (this.useGrpc) { + assertEquals("UNKNOWN: ", exception.getMessage()); + } else { + assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); + } } } } From d7cc7974c79522a595d9c72c270c82034f3fafcb Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 27 Jan 2021 23:39:13 -0800 Subject: [PATCH 3/3] handle method not found when proto class does not have parseFrom() --- .../java/io/dapr/client/ObjectSerializer.java | 2 + .../DefaultObjectSerializerTest.java | 79 +++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index 48939e40f..8a184885a 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -123,6 +123,8 @@ public class ObjectSerializer { if (method != null) { return (T) method.invoke(null, content); } + } catch (NoSuchMethodException e) { + // It was a best effort. Skip this try. } catch (Exception e) { throw new IOException(e); } diff --git a/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java index dcfaa1176..812e65bbe 100644 --- a/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java +++ b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java @@ -5,13 +5,19 @@ package io.dapr.serializer; +import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; import io.dapr.v1.CommonProtos; import org.junit.Test; import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.lang.reflect.Type; import java.util.ArrayList; @@ -24,6 +30,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -407,6 +414,19 @@ public class DefaultObjectSerializerTest { assertEquals(valueToSerialize, deserializedValue); } + @Test + public void serializeFakeProtoTest() throws Exception { + FakeProtoClass valueToSerialize = new FakeProtoClass(); + String expectedSerializedBase64Value = "AQ=="; + + byte[] serializedValue = SERIALIZER.serialize(valueToSerialize); + assertEquals(expectedSerializedBase64Value, Base64.getEncoder().encodeToString(serializedValue)); + assertNotNull(serializedValue); + + // Tries to parse as JSON since FakeProtoClass does not have `parseFrom()` static method. + assertThrows(JsonParseException.class, () -> SERIALIZER.deserialize(serializedValue, FakeProtoClass.class)); + } + @Test public void deserializeObjectTest() { String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}"; @@ -868,4 +888,63 @@ public class DefaultObjectSerializerTest { return "\"" + content + "\""; } + + /** + * Class that simulates a proto class implementing MessageLite but does not have `parseFrom()` static method. + */ + public static final class FakeProtoClass implements MessageLite { + @Override + public void writeTo(CodedOutputStream codedOutputStream) throws IOException { + } + + @Override + public int getSerializedSize() { + return 0; + } + + @Override + public Parser getParserForType() { + return null; + } + + @Override + public ByteString toByteString() { + return null; + } + + @Override + public byte[] toByteArray() { + return new byte[]{0x1}; + } + + @Override + public void writeTo(OutputStream outputStream) throws IOException { + + } + + @Override + public void writeDelimitedTo(OutputStream outputStream) throws IOException { + + } + + @Override + public Builder newBuilderForType() { + return null; + } + + @Override + public Builder toBuilder() { + return null; + } + + @Override + public MessageLite getDefaultInstanceForType() { + return null; + } + + @Override + public boolean isInitialized() { + return false; + } + } }