mirror of https://github.com/dapr/java-sdk.git
Merge pull request #462 from artursouza/it_invoke_grpc
Fix error handling in http invoke.
This commit is contained in:
commit
2ca48be3ec
|
@ -22,8 +22,6 @@ import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
public class DaprGrpcClientTest {
|
public class DaprGrpcClientTest {
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
private static final String ACTOR_TYPE = "MyActorType";
|
private static final String ACTOR_TYPE = "MyActorType";
|
||||||
|
|
||||||
private static final String ACTOR_ID = "1234567890";
|
private static final String ACTOR_ID = "1234567890";
|
||||||
|
|
|
@ -26,9 +26,43 @@
|
||||||
<maven.compiler.target>11</maven.compiler.target>
|
<maven.compiler.target>11</maven.compiler.target>
|
||||||
<maven.deploy.skip>true</maven.deploy.skip>
|
<maven.deploy.skip>true</maven.deploy.skip>
|
||||||
<dapr.sdk.version>1.0.0-SNAPSHOT</dapr.sdk.version>
|
<dapr.sdk.version>1.0.0-SNAPSHOT</dapr.sdk.version>
|
||||||
|
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
|
||||||
|
<protobuf.input.directory>${project.basedir}/proto</protobuf.input.directory>
|
||||||
|
<grpc.version>1.33.1</grpc.version>
|
||||||
|
<protobuf.version>3.13.0</protobuf.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-cli</groupId>
|
||||||
|
<artifactId>commons-cli</artifactId>
|
||||||
|
<version>1.4</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.grpc</groupId>
|
||||||
|
<artifactId>grpc-protobuf</artifactId>
|
||||||
|
<version>${grpc.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.grpc</groupId>
|
||||||
|
<artifactId>grpc-stub</artifactId>
|
||||||
|
<version>${grpc.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.grpc</groupId>
|
||||||
|
<artifactId>grpc-api</artifactId>
|
||||||
|
<version>${grpc.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.google.protobuf</groupId>
|
||||||
|
<artifactId>protobuf-java-util</artifactId>
|
||||||
|
<version>${protobuf.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.github.os72</groupId>
|
||||||
|
<artifactId>protoc-jar-maven-plugin</artifactId>
|
||||||
|
<version>3.10.1</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.dapr</groupId>
|
<groupId>io.dapr</groupId>
|
||||||
<artifactId>dapr-sdk</artifactId>
|
<artifactId>dapr-sdk</artifactId>
|
||||||
|
@ -90,10 +124,49 @@
|
||||||
<version>3.9</version>
|
<version>3.9</version>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>jakarta.annotation</groupId>
|
||||||
|
<artifactId>jakarta.annotation-api</artifactId>
|
||||||
|
<version>1.3.5</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>com.github.os72</groupId>
|
||||||
|
<artifactId>protoc-jar-maven-plugin</artifactId>
|
||||||
|
<version>3.10.1</version>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<phase>generate-sources</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>run</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<protocVersion>${protobuf.version}</protocVersion>
|
||||||
|
<addProtoSources>inputs</addProtoSources>
|
||||||
|
<includeMavenTypes>direct</includeMavenTypes>
|
||||||
|
<includeStdTypes>true</includeStdTypes>
|
||||||
|
<inputDirectories>
|
||||||
|
<include>${protobuf.input.directory}</include>
|
||||||
|
</inputDirectories>
|
||||||
|
<outputTargets>
|
||||||
|
<outputTarget>
|
||||||
|
<type>java</type>
|
||||||
|
<outputDirectory>${protobuf.output.directory}</outputDirectory>
|
||||||
|
</outputTarget>
|
||||||
|
<outputTarget>
|
||||||
|
<type>grpc-java</type>
|
||||||
|
<outputDirectory>${protobuf.output.directory}</outputDirectory>
|
||||||
|
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}</pluginArtifact>
|
||||||
|
</outputTarget>
|
||||||
|
</outputTargets>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
<plugin>
|
<plugin>
|
||||||
<groupId>org.apache.maven.plugins</groupId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<artifactId>maven-jar-plugin</artifactId>
|
<artifactId>maven-jar-plugin</artifactId>
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
syntax = "proto3";
|
||||||
|
|
||||||
|
package daprtests;
|
||||||
|
|
||||||
|
option java_outer_classname = "MethodInvokeServiceProtos";
|
||||||
|
option java_package = "io.dapr.it";
|
||||||
|
|
||||||
|
service MethodInvokeService {
|
||||||
|
rpc PostMessage (PostMessageRequest) returns (PostMessageResponse) {}
|
||||||
|
rpc DeleteMessage (DeleteMessageRequest) returns (DeleteMessageResponse) {}
|
||||||
|
rpc GetMessages (GetMessagesRequest) returns (GetMessagesResponse) {}
|
||||||
|
rpc Sleep (SleepRequest) returns (SleepResponse) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
message PostMessageRequest {
|
||||||
|
int32 id = 1;
|
||||||
|
string message = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message PostMessageResponse {
|
||||||
|
}
|
||||||
|
|
||||||
|
message DeleteMessageRequest {
|
||||||
|
int32 id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message DeleteMessageResponse {
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetMessagesRequest {
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetMessagesResponse {
|
||||||
|
map<int32, string> messages = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SleepRequest {
|
||||||
|
int32 seconds = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message SleepResponse {
|
||||||
|
}
|
|
@ -15,6 +15,9 @@ import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
|
||||||
|
import static io.dapr.client.DaprApiProtocol.GRPC;
|
||||||
|
import static io.dapr.client.DaprApiProtocol.HTTP;
|
||||||
|
|
||||||
public abstract class BaseIT {
|
public abstract class BaseIT {
|
||||||
|
|
||||||
protected static final String STATE_STORE_NAME = "statestore";
|
protected static final String STATE_STORE_NAME = "statestore";
|
||||||
|
@ -31,7 +34,16 @@ public abstract class BaseIT {
|
||||||
Class serviceClass,
|
Class serviceClass,
|
||||||
Boolean useAppPort,
|
Boolean useAppPort,
|
||||||
int maxWaitMilliseconds) throws Exception {
|
int maxWaitMilliseconds) throws Exception {
|
||||||
return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC);
|
return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, GRPC);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static DaprRun startDaprApp(
|
||||||
|
String testName,
|
||||||
|
String successMessage,
|
||||||
|
Class serviceClass,
|
||||||
|
DaprApiProtocol appProtocol,
|
||||||
|
int maxWaitMilliseconds) throws Exception {
|
||||||
|
return startDaprApp(testName, successMessage, serviceClass, true, maxWaitMilliseconds, GRPC, appProtocol);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static DaprRun startDaprApp(
|
protected static DaprRun startDaprApp(
|
||||||
|
@ -41,14 +53,48 @@ public abstract class BaseIT {
|
||||||
Boolean useAppPort,
|
Boolean useAppPort,
|
||||||
int maxWaitMilliseconds,
|
int maxWaitMilliseconds,
|
||||||
DaprApiProtocol protocol) throws Exception {
|
DaprApiProtocol protocol) throws Exception {
|
||||||
return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, protocol);
|
return startDaprApp(
|
||||||
|
testName,
|
||||||
|
successMessage,
|
||||||
|
serviceClass,
|
||||||
|
useAppPort,
|
||||||
|
true,
|
||||||
|
maxWaitMilliseconds,
|
||||||
|
protocol,
|
||||||
|
HTTP);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static DaprRun startDaprApp(
|
||||||
|
String testName,
|
||||||
|
String successMessage,
|
||||||
|
Class serviceClass,
|
||||||
|
Boolean useAppPort,
|
||||||
|
int maxWaitMilliseconds,
|
||||||
|
DaprApiProtocol protocol,
|
||||||
|
DaprApiProtocol appProtocol) throws Exception {
|
||||||
|
return startDaprApp(
|
||||||
|
testName,
|
||||||
|
successMessage,
|
||||||
|
serviceClass,
|
||||||
|
useAppPort,
|
||||||
|
true,
|
||||||
|
maxWaitMilliseconds,
|
||||||
|
protocol,
|
||||||
|
appProtocol);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static DaprRun startDaprApp(
|
protected static DaprRun startDaprApp(
|
||||||
String testName,
|
String testName,
|
||||||
int maxWaitMilliseconds) throws Exception {
|
int maxWaitMilliseconds) throws Exception {
|
||||||
return startDaprApp(
|
return startDaprApp(
|
||||||
testName, "You're up and running!", null, false, true, maxWaitMilliseconds, DaprApiProtocol.GRPC);
|
testName,
|
||||||
|
"You're up and running!",
|
||||||
|
null,
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
maxWaitMilliseconds,
|
||||||
|
GRPC,
|
||||||
|
HTTP);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static DaprRun startDaprApp(
|
protected static DaprRun startDaprApp(
|
||||||
|
@ -58,13 +104,15 @@ public abstract class BaseIT {
|
||||||
Boolean useAppPort,
|
Boolean useAppPort,
|
||||||
Boolean useDaprPorts,
|
Boolean useDaprPorts,
|
||||||
int maxWaitMilliseconds,
|
int maxWaitMilliseconds,
|
||||||
DaprApiProtocol protocol) throws Exception {
|
DaprApiProtocol protocol,
|
||||||
|
DaprApiProtocol appProtocol) throws Exception {
|
||||||
DaprRun.Builder builder = new DaprRun.Builder(
|
DaprRun.Builder builder = new DaprRun.Builder(
|
||||||
testName,
|
testName,
|
||||||
() -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts),
|
() -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts),
|
||||||
successMessage,
|
successMessage,
|
||||||
maxWaitMilliseconds,
|
maxWaitMilliseconds,
|
||||||
protocol).withServiceClass(serviceClass);
|
protocol,
|
||||||
|
appProtocol).withServiceClass(serviceClass);
|
||||||
DaprRun run = builder.build();
|
DaprRun run = builder.build();
|
||||||
TO_BE_STOPPED.add(run);
|
TO_BE_STOPPED.add(run);
|
||||||
DAPR_RUN_BUILDERS.put(run.getAppName(), builder);
|
DAPR_RUN_BUILDERS.put(run.getAppName(), builder);
|
||||||
|
@ -83,19 +131,38 @@ public abstract class BaseIT {
|
||||||
testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC);
|
testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected static ImmutablePair<AppRun, DaprRun> startSplitDaprAndApp(
|
||||||
|
String testName,
|
||||||
|
String successMessage,
|
||||||
|
Class serviceClass,
|
||||||
|
Boolean useAppPort,
|
||||||
|
int maxWaitMilliseconds,
|
||||||
|
DaprApiProtocol protocol) throws Exception {
|
||||||
|
return startSplitDaprAndApp(
|
||||||
|
testName,
|
||||||
|
successMessage,
|
||||||
|
serviceClass,
|
||||||
|
useAppPort,
|
||||||
|
maxWaitMilliseconds,
|
||||||
|
protocol,
|
||||||
|
HTTP);
|
||||||
|
}
|
||||||
|
|
||||||
protected static ImmutablePair<AppRun, DaprRun> startSplitDaprAndApp(
|
protected static ImmutablePair<AppRun, DaprRun> startSplitDaprAndApp(
|
||||||
String testName,
|
String testName,
|
||||||
String successMessage,
|
String successMessage,
|
||||||
Class serviceClass,
|
Class serviceClass,
|
||||||
Boolean useAppPort,
|
Boolean useAppPort,
|
||||||
int maxWaitMilliseconds,
|
int maxWaitMilliseconds,
|
||||||
DaprApiProtocol protocol) throws Exception {
|
DaprApiProtocol protocol,
|
||||||
|
DaprApiProtocol appProtocol) throws Exception {
|
||||||
DaprRun.Builder builder = new DaprRun.Builder(
|
DaprRun.Builder builder = new DaprRun.Builder(
|
||||||
testName,
|
testName,
|
||||||
() -> DaprPorts.build(useAppPort, true, true),
|
() -> DaprPorts.build(useAppPort, true, true),
|
||||||
successMessage,
|
successMessage,
|
||||||
maxWaitMilliseconds,
|
maxWaitMilliseconds,
|
||||||
protocol).withServiceClass(serviceClass);
|
protocol,
|
||||||
|
appProtocol).withServiceClass(serviceClass);
|
||||||
ImmutablePair<AppRun, DaprRun> runs = builder.splitBuild();
|
ImmutablePair<AppRun, DaprRun> runs = builder.splitBuild();
|
||||||
TO_BE_STOPPED.add(runs.left);
|
TO_BE_STOPPED.add(runs.left);
|
||||||
TO_BE_STOPPED.add(runs.right);
|
TO_BE_STOPPED.add(runs.right);
|
||||||
|
|
|
@ -20,7 +20,7 @@ public class DaprRun implements Stoppable {
|
||||||
|
|
||||||
private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!";
|
private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!";
|
||||||
|
|
||||||
private static final String DAPR_RUN = "dapr run --app-id %s --components-path ./components";
|
private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s --components-path ./components";
|
||||||
|
|
||||||
// the arg in -Dexec.args is the app's port
|
// the arg in -Dexec.args is the app's port
|
||||||
private static final String DAPR_COMMAND =
|
private static final String DAPR_COMMAND =
|
||||||
|
@ -45,11 +45,12 @@ public class DaprRun implements Stoppable {
|
||||||
String successMessage,
|
String successMessage,
|
||||||
Class serviceClass,
|
Class serviceClass,
|
||||||
int maxWaitMilliseconds,
|
int maxWaitMilliseconds,
|
||||||
DaprApiProtocol protocol) {
|
DaprApiProtocol protocol,
|
||||||
|
DaprApiProtocol appProtocol) {
|
||||||
// The app name needs to be deterministic since we depend on it to kill previous runs.
|
// The app name needs to be deterministic since we depend on it to kill previous runs.
|
||||||
this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName());
|
this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName());
|
||||||
this.startCommand =
|
this.startCommand =
|
||||||
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol));
|
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol, appProtocol));
|
||||||
this.listCommand = new Command(
|
this.listCommand = new Command(
|
||||||
this.appName,
|
this.appName,
|
||||||
"dapr list");
|
"dapr list");
|
||||||
|
@ -139,18 +140,28 @@ public class DaprRun implements Stoppable {
|
||||||
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort()));
|
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort()));
|
||||||
}
|
}
|
||||||
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
|
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
|
||||||
|
System.getProperties().setProperty(
|
||||||
|
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
|
||||||
|
DaprApiProtocol.GRPC.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void switchToGRPC() {
|
public void switchToGRPC() {
|
||||||
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
|
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name());
|
||||||
|
System.getProperties().setProperty(
|
||||||
|
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
|
||||||
|
DaprApiProtocol.GRPC.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void switchToHTTP() {
|
public void switchToHTTP() {
|
||||||
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name());
|
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name());
|
||||||
|
System.getProperties().setProperty(
|
||||||
|
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
|
||||||
|
DaprApiProtocol.HTTP.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void switchToProtocol(DaprApiProtocol protocol) {
|
public void switchToProtocol(DaprApiProtocol protocol) {
|
||||||
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), protocol.name());
|
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), protocol.name());
|
||||||
|
System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getGrpcPort() {
|
public int getGrpcPort() {
|
||||||
|
@ -170,16 +181,17 @@ public class DaprRun implements Stoppable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String buildDaprCommand(
|
private static String buildDaprCommand(
|
||||||
String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol) {
|
String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol, DaprApiProtocol appProtocol) {
|
||||||
StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName))
|
StringBuilder stringBuilder =
|
||||||
.append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "")
|
new StringBuilder(String.format(DAPR_RUN, appName, appProtocol.toString().toLowerCase()))
|
||||||
.append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "")
|
.append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "")
|
||||||
.append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "")
|
.append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "")
|
||||||
.append(serviceClass == null ? "" :
|
.append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "")
|
||||||
String.format(DAPR_COMMAND, serviceClass.getCanonicalName(),
|
.append(serviceClass == null ? "" :
|
||||||
ports.getAppPort() != null ? ports.getAppPort().toString() : "",
|
String.format(DAPR_COMMAND, serviceClass.getCanonicalName(),
|
||||||
Properties.API_PROTOCOL.getName(), protocol,
|
ports.getAppPort() != null ? ports.getAppPort().toString() : "",
|
||||||
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol));
|
Properties.API_PROTOCOL.getName(), protocol,
|
||||||
|
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol));
|
||||||
return stringBuilder.toString();
|
return stringBuilder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,17 +222,21 @@ public class DaprRun implements Stoppable {
|
||||||
|
|
||||||
private DaprApiProtocol protocol;
|
private DaprApiProtocol protocol;
|
||||||
|
|
||||||
|
private DaprApiProtocol appProtocol;
|
||||||
|
|
||||||
Builder(
|
Builder(
|
||||||
String testName,
|
String testName,
|
||||||
Supplier<DaprPorts> portsSupplier,
|
Supplier<DaprPorts> portsSupplier,
|
||||||
String successMessage,
|
String successMessage,
|
||||||
int maxWaitMilliseconds,
|
int maxWaitMilliseconds,
|
||||||
DaprApiProtocol protocol) {
|
DaprApiProtocol protocol,
|
||||||
|
DaprApiProtocol appProtocol) {
|
||||||
this.testName = testName;
|
this.testName = testName;
|
||||||
this.portsSupplier = portsSupplier;
|
this.portsSupplier = portsSupplier;
|
||||||
this.successMessage = successMessage;
|
this.successMessage = successMessage;
|
||||||
this.maxWaitMilliseconds = maxWaitMilliseconds;
|
this.maxWaitMilliseconds = maxWaitMilliseconds;
|
||||||
this.protocol = protocol;
|
this.protocol = protocol;
|
||||||
|
this.appProtocol = appProtocol;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withServiceClass(Class serviceClass) {
|
public Builder withServiceClass(Class serviceClass) {
|
||||||
|
@ -235,7 +251,8 @@ public class DaprRun implements Stoppable {
|
||||||
this.successMessage,
|
this.successMessage,
|
||||||
this.serviceClass,
|
this.serviceClass,
|
||||||
this.maxWaitMilliseconds,
|
this.maxWaitMilliseconds,
|
||||||
this.protocol);
|
this.protocol,
|
||||||
|
this.appProtocol);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -257,7 +274,8 @@ public class DaprRun implements Stoppable {
|
||||||
DAPR_SUCCESS_MESSAGE,
|
DAPR_SUCCESS_MESSAGE,
|
||||||
null,
|
null,
|
||||||
this.maxWaitMilliseconds,
|
this.maxWaitMilliseconds,
|
||||||
this.protocol);
|
this.protocol,
|
||||||
|
this.appProtocol);
|
||||||
|
|
||||||
return new ImmutablePair<>(appRun, daprRun);
|
return new ImmutablePair<>(appRun, daprRun);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
package io.dapr.it.methodinvoke.grpc;
|
||||||
|
|
||||||
|
import io.dapr.client.DaprApiProtocol;
|
||||||
|
import io.dapr.client.DaprClient;
|
||||||
|
import io.dapr.client.DaprClientBuilder;
|
||||||
|
import io.dapr.client.domain.HttpExtension;
|
||||||
|
import io.dapr.exceptions.DaprException;
|
||||||
|
import io.dapr.it.BaseIT;
|
||||||
|
import io.dapr.it.DaprRun;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.runners.Parameterized.Parameter;
|
||||||
|
import static org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
public class MethodInvokeIT extends BaseIT {
|
||||||
|
|
||||||
|
//Number of messages to be sent: 10
|
||||||
|
private static final int NUM_MESSAGES = 10;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parameters for this test.
|
||||||
|
* Param #1: useGrpc.
|
||||||
|
* @return Collection of parameter tuples.
|
||||||
|
*/
|
||||||
|
@Parameters
|
||||||
|
public static Collection<Object[]> data() {
|
||||||
|
return Arrays.asList(new Object[][] { { false }, { true } });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run of a Dapr application.
|
||||||
|
*/
|
||||||
|
private DaprRun daprRun = null;
|
||||||
|
|
||||||
|
@Parameter
|
||||||
|
public boolean useGrpc;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws Exception {
|
||||||
|
daprRun = startDaprApp(
|
||||||
|
MethodInvokeIT.class.getSimpleName(),
|
||||||
|
MethodInvokeService.SUCCESS_MESSAGE,
|
||||||
|
MethodInvokeService.class,
|
||||||
|
DaprApiProtocol.GRPC, // appProtocol
|
||||||
|
60000);
|
||||||
|
|
||||||
|
if (this.useGrpc) {
|
||||||
|
daprRun.switchToGRPC();
|
||||||
|
} else {
|
||||||
|
daprRun.switchToHTTP();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait since service might be ready even after port is available.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvoke() throws Exception {
|
||||||
|
try (DaprClient client = new DaprClientBuilder().build()) {
|
||||||
|
for (int i = 0; i < NUM_MESSAGES; i++) {
|
||||||
|
String message = String.format("This is message #%d", i);
|
||||||
|
|
||||||
|
PostMessageRequest req = PostMessageRequest.newBuilder().setId(i).setMessage(message).build();
|
||||||
|
client.invokeMethod(daprRun.getAppName(), "postMessage", req, HttpExtension.POST).block();
|
||||||
|
System.out.println("Invoke method messages : " + message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<Integer, String> messages = client.invokeMethod(
|
||||||
|
daprRun.getAppName(),
|
||||||
|
"getMessages",
|
||||||
|
GetMessagesRequest.newBuilder().build(),
|
||||||
|
HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap();
|
||||||
|
assertEquals(10, messages.size());
|
||||||
|
|
||||||
|
// Delete one message.
|
||||||
|
client.invokeMethod(
|
||||||
|
daprRun.getAppName(),
|
||||||
|
"deleteMessage",
|
||||||
|
DeleteMessageRequest.newBuilder().setId(1).build(),
|
||||||
|
HttpExtension.POST).block();
|
||||||
|
messages = client.invokeMethod(
|
||||||
|
daprRun.getAppName(),
|
||||||
|
"getMessages",
|
||||||
|
GetMessagesRequest.newBuilder().build(),
|
||||||
|
HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap();
|
||||||
|
assertEquals(9, messages.size());
|
||||||
|
|
||||||
|
// Now update one message.
|
||||||
|
client.invokeMethod(
|
||||||
|
daprRun.getAppName(),
|
||||||
|
"postMessage",
|
||||||
|
PostMessageRequest.newBuilder().setId(2).setMessage("updated message").build(),
|
||||||
|
HttpExtension.POST).block();
|
||||||
|
messages = client.invokeMethod(
|
||||||
|
daprRun.getAppName(),
|
||||||
|
"getMessages",
|
||||||
|
GetMessagesRequest.newBuilder().build(),
|
||||||
|
HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap();
|
||||||
|
assertEquals("updated message", messages.get(2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvokeTimeout() throws Exception {
|
||||||
|
try (DaprClient client = new DaprClientBuilder().build()) {
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
|
||||||
|
String message = assertThrows(IllegalStateException.class, () ->
|
||||||
|
client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST)
|
||||||
|
.block(Duration.ofMillis(10))).getMessage();
|
||||||
|
long delay = System.currentTimeMillis() - started;
|
||||||
|
assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out.
|
||||||
|
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvokeException() throws Exception {
|
||||||
|
try (DaprClient client = new DaprClientBuilder().build()) {
|
||||||
|
SleepRequest req = SleepRequest.newBuilder().setSeconds(-9).build();
|
||||||
|
DaprException exception = assertThrows(DaprException.class, () ->
|
||||||
|
client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block());
|
||||||
|
|
||||||
|
assertEquals("UNKNOWN", exception.getErrorCode());
|
||||||
|
if (this.useGrpc) {
|
||||||
|
assertEquals("UNKNOWN: ", exception.getMessage());
|
||||||
|
} else {
|
||||||
|
assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,162 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) Microsoft Corporation.
|
||||||
|
* Licensed under the MIT License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.dapr.it.methodinvoke.grpc;
|
||||||
|
|
||||||
|
import com.google.protobuf.Any;
|
||||||
|
import io.dapr.v1.AppCallbackGrpc;
|
||||||
|
import io.dapr.v1.CommonProtos;
|
||||||
|
import io.grpc.Server;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageResponse;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.PostMessageResponse;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
|
||||||
|
import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse;
|
||||||
|
|
||||||
|
public class MethodInvokeService {
|
||||||
|
|
||||||
|
public static final String SUCCESS_MESSAGE = "application discovered on port ";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server mode: class that encapsulates all server-side logic for Grpc.
|
||||||
|
*/
|
||||||
|
private static class MyDaprService extends AppCallbackGrpc.AppCallbackImplBase {
|
||||||
|
|
||||||
|
private final Map<Integer, String> messages = Collections.synchronizedMap(new HashMap<>());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server mode: Grpc server.
|
||||||
|
*/
|
||||||
|
private Server server;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server mode: starts listening on given port.
|
||||||
|
*
|
||||||
|
* @param port Port to listen on.
|
||||||
|
* @throws IOException Errors while trying to start service.
|
||||||
|
*/
|
||||||
|
private void start(int port) throws IOException {
|
||||||
|
this.server = ServerBuilder
|
||||||
|
.forPort(port)
|
||||||
|
.addService(this)
|
||||||
|
.build()
|
||||||
|
.start();
|
||||||
|
System.out.printf("Server: started listening on port %d\n", port);
|
||||||
|
|
||||||
|
// Now we handle ctrl+c (or any other JVM shutdown)
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||||
|
System.out.println("Server: shutting down gracefully ...");
|
||||||
|
MyDaprService.this.server.shutdown();
|
||||||
|
System.out.println("Server: Bye.");
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server mode: waits for shutdown trigger.
|
||||||
|
*
|
||||||
|
* @throws InterruptedException Propagated interrupted exception.
|
||||||
|
*/
|
||||||
|
private void awaitTermination() throws InterruptedException {
|
||||||
|
if (this.server != null) {
|
||||||
|
this.server.awaitTermination();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Server mode: this is the Dapr method to receive Invoke operations via Grpc.
|
||||||
|
*
|
||||||
|
* @param request Dapr envelope request,
|
||||||
|
* @param responseObserver Dapr envelope response.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onInvoke(CommonProtos.InvokeRequest request,
|
||||||
|
StreamObserver<CommonProtos.InvokeResponse> responseObserver) {
|
||||||
|
try {
|
||||||
|
if ("postMessage".equals(request.getMethod())) {
|
||||||
|
PostMessageRequest req = PostMessageRequest.parseFrom(request.getData().getValue().toByteArray());
|
||||||
|
|
||||||
|
this.messages.put(req.getId(), req.getMessage());
|
||||||
|
|
||||||
|
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
|
||||||
|
responseBuilder.setData(Any.pack(PostMessageResponse.newBuilder().build()));
|
||||||
|
responseObserver.onNext(responseBuilder.build());
|
||||||
|
}
|
||||||
|
if ("deleteMessage".equals(request.getMethod())) {
|
||||||
|
DeleteMessageRequest req = DeleteMessageRequest.parseFrom(request.getData().getValue().toByteArray());
|
||||||
|
|
||||||
|
this.messages.remove(req.getId());
|
||||||
|
|
||||||
|
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
|
||||||
|
responseBuilder.setData(Any.pack(DeleteMessageResponse.newBuilder().build()));
|
||||||
|
responseObserver.onNext(responseBuilder.build());
|
||||||
|
}
|
||||||
|
if ("getMessages".equals(request.getMethod())) {
|
||||||
|
GetMessagesRequest.parseFrom(request.getData().getValue().toByteArray());
|
||||||
|
|
||||||
|
GetMessagesResponse res = GetMessagesResponse.newBuilder().putAllMessages(this.messages).build();
|
||||||
|
|
||||||
|
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
|
||||||
|
responseBuilder.setData(Any.pack(res));
|
||||||
|
responseObserver.onNext(responseBuilder.build());
|
||||||
|
}
|
||||||
|
if ("sleep".equals(request.getMethod())) {
|
||||||
|
SleepRequest req = SleepRequest.parseFrom(request.getData().getValue().toByteArray());
|
||||||
|
|
||||||
|
SleepResponse res = this.sleep(req);
|
||||||
|
|
||||||
|
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
|
||||||
|
responseBuilder.setData(Any.pack(res));
|
||||||
|
responseObserver.onNext(responseBuilder.build());
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
responseObserver.onError(e);
|
||||||
|
} finally {
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public SleepResponse sleep(SleepRequest request) {
|
||||||
|
if (request.getSeconds() < 0) {
|
||||||
|
throw new IllegalArgumentException("Sleep time cannot be negative.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(request.getSeconds() * 1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now respond with current timestamp.
|
||||||
|
return SleepResponse.newBuilder().build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is the main method of this app.
|
||||||
|
* @param args The port to listen on.
|
||||||
|
* @throws Exception An Exception.
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
int port = Integer.parseInt(args[0]);
|
||||||
|
|
||||||
|
System.out.printf("Service starting on port %d ...\n", port);
|
||||||
|
|
||||||
|
final MyDaprService service = new MyDaprService();
|
||||||
|
service.start(port);
|
||||||
|
service.awaitTermination();
|
||||||
|
}
|
||||||
|
}
|
|
@ -82,6 +82,9 @@ public class MethodInvokeController {
|
||||||
|
|
||||||
@PostMapping(path = "/sleep")
|
@PostMapping(path = "/sleep")
|
||||||
public void sleep(@RequestBody int seconds) throws InterruptedException {
|
public void sleep(@RequestBody int seconds) throws InterruptedException {
|
||||||
|
if (seconds < 0) {
|
||||||
|
throw new IllegalArgumentException("Sleep time cannot be negative.");
|
||||||
|
}
|
||||||
Thread.sleep(seconds * 1000);
|
Thread.sleep(seconds * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,10 @@ package io.dapr.it.methodinvoke.http;
|
||||||
import io.dapr.client.DaprClient;
|
import io.dapr.client.DaprClient;
|
||||||
import io.dapr.client.DaprClientBuilder;
|
import io.dapr.client.DaprClientBuilder;
|
||||||
import io.dapr.client.domain.HttpExtension;
|
import io.dapr.client.domain.HttpExtension;
|
||||||
|
import io.dapr.exceptions.DaprException;
|
||||||
import io.dapr.it.BaseIT;
|
import io.dapr.it.BaseIT;
|
||||||
import io.dapr.it.DaprRun;
|
import io.dapr.it.DaprRun;
|
||||||
|
import io.dapr.it.MethodInvokeServiceProtos;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
@ -19,6 +21,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertThrows;
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.runners.Parameterized.Parameter;
|
import static org.junit.runners.Parameterized.Parameter;
|
||||||
|
@ -143,4 +146,17 @@ public class MethodInvokeIT extends BaseIT {
|
||||||
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
|
assertEquals("Timeout on blocking read for 10 MILLISECONDS", message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInvokeException() throws Exception {
|
||||||
|
try (DaprClient client = new DaprClientBuilder().build()) {
|
||||||
|
MethodInvokeServiceProtos.SleepRequest req = MethodInvokeServiceProtos.SleepRequest.newBuilder().setSeconds(-9).build();
|
||||||
|
DaprException exception = assertThrows(DaprException.class, () ->
|
||||||
|
client.invokeMethod(daprRun.getAppName(), "sleep", -9, HttpExtension.POST).block());
|
||||||
|
|
||||||
|
assertEquals("UNKNOWN", exception.getErrorCode());
|
||||||
|
assertNotNull(exception.getMessage());
|
||||||
|
assertTrue(exception.getMessage().contains("Internal Server Error"));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@
|
||||||
|
|
||||||
package io.dapr.client;
|
package io.dapr.client;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonParseException;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import io.dapr.client.domain.Metadata;
|
import io.dapr.client.domain.Metadata;
|
||||||
import io.dapr.config.Properties;
|
import io.dapr.config.Properties;
|
||||||
|
@ -308,14 +307,14 @@ public class DaprHttp implements AutoCloseable {
|
||||||
* @param json Response body from Dapr.
|
* @param json Response body from Dapr.
|
||||||
* @return DaprError or null if could not parse.
|
* @return DaprError or null if could not parse.
|
||||||
*/
|
*/
|
||||||
private static DaprError parseDaprError(byte[] json) throws IOException {
|
private static DaprError parseDaprError(byte[] json) {
|
||||||
if ((json == null) || (json.length == 0)) {
|
if ((json == null) || (json.length == 0)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return OBJECT_MAPPER.readValue(json, DaprError.class);
|
return OBJECT_MAPPER.readValue(json, DaprError.class);
|
||||||
} catch (JsonParseException e) {
|
} catch (IOException e) {
|
||||||
throw new DaprException("UNKNOWN", new String(json, StandardCharsets.UTF_8));
|
throw new DaprException("UNKNOWN", new String(json, StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -347,14 +346,24 @@ public class DaprHttp implements AutoCloseable {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
|
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
|
||||||
if (!response.isSuccessful()) {
|
if (!response.isSuccessful()) {
|
||||||
DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response));
|
try {
|
||||||
if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) {
|
DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response));
|
||||||
future.completeExceptionally(new DaprException(error));
|
if ((error != null) && (error.getErrorCode() != null)) {
|
||||||
|
if (error.getMessage() != null) {
|
||||||
|
future.completeExceptionally(new DaprException(error));
|
||||||
|
} else {
|
||||||
|
future.completeExceptionally(
|
||||||
|
new DaprException(error.getErrorCode(), "HTTP status code: " + response.code()));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code()));
|
||||||
|
return;
|
||||||
|
} catch (DaprException e) {
|
||||||
|
future.completeExceptionally(e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code()));
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, String> mapHeaders = new HashMap<>();
|
Map<String, String> mapHeaders = new HashMap<>();
|
||||||
|
|
|
@ -10,10 +10,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.JavaType;
|
import com.fasterxml.jackson.databind.JavaType;
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
import io.dapr.client.domain.CloudEvent;
|
import io.dapr.client.domain.CloudEvent;
|
||||||
import io.dapr.utils.TypeRef;
|
import io.dapr.utils.TypeRef;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Serializes and deserializes an internal object.
|
* Serializes and deserializes an internal object.
|
||||||
|
@ -54,6 +56,11 @@ public class ObjectSerializer {
|
||||||
return (byte[]) state;
|
return (byte[]) state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Proto buffer class is serialized directly.
|
||||||
|
if (state instanceof MessageLite) {
|
||||||
|
return ((MessageLite) state).toByteArray();
|
||||||
|
}
|
||||||
|
|
||||||
// Not string, not primitive, so it is a complex type: we use JSON for that.
|
// Not string, not primitive, so it is a complex type: we use JSON for that.
|
||||||
return OBJECT_MAPPER.writeValueAsBytes(state);
|
return OBJECT_MAPPER.writeValueAsBytes(state);
|
||||||
}
|
}
|
||||||
|
@ -94,7 +101,7 @@ public class ObjectSerializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (content == null) {
|
if (content == null) {
|
||||||
return (T) null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[].
|
// Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[].
|
||||||
|
@ -103,13 +110,26 @@ public class ObjectSerializer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (content.length == 0) {
|
if (content.length == 0) {
|
||||||
return (T) null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (javaType.hasRawClass(CloudEvent.class)) {
|
if (javaType.hasRawClass(CloudEvent.class)) {
|
||||||
return (T) CloudEvent.deserialize(content);
|
return (T) CloudEvent.deserialize(content);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (javaType.isTypeOrSubTypeOf(MessageLite.class)) {
|
||||||
|
try {
|
||||||
|
Method method = javaType.getRawClass().getDeclaredMethod("parseFrom", byte[].class);
|
||||||
|
if (method != null) {
|
||||||
|
return (T) method.invoke(null, content);
|
||||||
|
}
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
// It was a best effort. Skip this try.
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return OBJECT_MAPPER.readValue(content, javaType);
|
return OBJECT_MAPPER.readValue(content, javaType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,9 +5,13 @@
|
||||||
|
|
||||||
package io.dapr.exceptions;
|
package io.dapr.exceptions;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||||
|
import io.grpc.Status;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an error message from Dapr.
|
* Represents an error message from Dapr.
|
||||||
*/
|
*/
|
||||||
|
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
|
||||||
public class DaprError {
|
public class DaprError {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -20,12 +24,20 @@ public class DaprError {
|
||||||
*/
|
*/
|
||||||
private String message;
|
private String message;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Error code from gRPC.
|
||||||
|
*/
|
||||||
|
private Integer code;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the error code.
|
* Gets the error code.
|
||||||
*
|
*
|
||||||
* @return Error code.
|
* @return Error code.
|
||||||
*/
|
*/
|
||||||
public String getErrorCode() {
|
public String getErrorCode() {
|
||||||
|
if ((errorCode == null) && (code != null)) {
|
||||||
|
return Status.fromCodeValue(code).getCode().name();
|
||||||
|
}
|
||||||
return errorCode;
|
return errorCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ import io.dapr.client.domain.State;
|
||||||
import io.dapr.client.domain.StateOptions;
|
import io.dapr.client.domain.StateOptions;
|
||||||
import io.dapr.client.domain.TransactionalStateOperation;
|
import io.dapr.client.domain.TransactionalStateOperation;
|
||||||
import io.dapr.config.Properties;
|
import io.dapr.config.Properties;
|
||||||
|
import io.dapr.exceptions.DaprException;
|
||||||
import io.dapr.utils.TypeRef;
|
import io.dapr.utils.TypeRef;
|
||||||
import okhttp3.OkHttpClient;
|
import okhttp3.OkHttpClient;
|
||||||
import okhttp3.ResponseBody;
|
import okhttp3.ResponseBody;
|
||||||
|
@ -189,13 +190,80 @@ public class DaprClientHttpTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void invokeServiceDaprError() {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod")
|
||||||
|
.respond(500,
|
||||||
|
ResponseBody.create(
|
||||||
|
"{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}",
|
||||||
|
MediaTypes.MEDIATYPE_JSON));
|
||||||
|
|
||||||
|
DaprException exception = assertThrows(DaprException.class, () -> {
|
||||||
|
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals("MYCODE", exception.getErrorCode());
|
||||||
|
assertEquals("MYCODE: My Message", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void invokeServiceDaprErrorFromGRPC() {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod")
|
||||||
|
.respond(500,
|
||||||
|
ResponseBody.create(
|
||||||
|
"{ \"code\": 7 }",
|
||||||
|
MediaTypes.MEDIATYPE_JSON));
|
||||||
|
|
||||||
|
DaprException exception = assertThrows(DaprException.class, () -> {
|
||||||
|
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals("PERMISSION_DENIED", exception.getErrorCode());
|
||||||
|
assertEquals("PERMISSION_DENIED: HTTP status code: 500", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void invokeServiceDaprErrorUnknownJSON() {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod")
|
||||||
|
.respond(500,
|
||||||
|
ResponseBody.create(
|
||||||
|
"{ \"anything\": 7 }",
|
||||||
|
MediaTypes.MEDIATYPE_JSON));
|
||||||
|
|
||||||
|
DaprException exception = assertThrows(DaprException.class, () -> {
|
||||||
|
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals("UNKNOWN", exception.getErrorCode());
|
||||||
|
assertEquals("UNKNOWN: { \"anything\": 7 }", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void invokeServiceDaprErrorEmptyString() {
|
||||||
|
mockInterceptor.addRule()
|
||||||
|
.post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod")
|
||||||
|
.respond(500,
|
||||||
|
ResponseBody.create(
|
||||||
|
"",
|
||||||
|
MediaTypes.MEDIATYPE_JSON));
|
||||||
|
|
||||||
|
DaprException exception = assertThrows(DaprException.class, () -> {
|
||||||
|
daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block();
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals("UNKNOWN", exception.getErrorCode());
|
||||||
|
assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeServiceMethodNull() {
|
public void invokeServiceMethodNull() {
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/publish/A")
|
.post("http://127.0.0.1:3000/v1.0/publish/A")
|
||||||
.respond(EXPECTED_RESULT);
|
.respond(EXPECTED_RESULT);
|
||||||
String event = "{ \"message\": \"This is a test\" }";
|
|
||||||
|
|
||||||
assertThrows(IllegalArgumentException.class, () ->
|
assertThrows(IllegalArgumentException.class, () ->
|
||||||
daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block());
|
daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block());
|
||||||
|
@ -337,7 +405,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseNull() {
|
public void invokeBindingResponseNull() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond(new byte[0]);
|
.respond(new byte[0]);
|
||||||
|
@ -348,7 +415,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseObject() {
|
public void invokeBindingResponseObject() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond("\"OK\"");
|
.respond("\"OK\"");
|
||||||
|
@ -370,7 +436,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseFloat() {
|
public void invokeBindingResponseFloat() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond("1.5");
|
.respond("1.5");
|
||||||
|
@ -381,7 +446,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseChar() {
|
public void invokeBindingResponseChar() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond("\"a\"");
|
.respond("\"a\"");
|
||||||
|
@ -392,7 +456,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseByte() {
|
public void invokeBindingResponseByte() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond("\"2\"");
|
.respond("\"2\"");
|
||||||
|
@ -403,7 +466,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseLong() {
|
public void invokeBindingResponseLong() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond("1");
|
.respond("1");
|
||||||
|
@ -414,7 +476,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingResponseInt() {
|
public void invokeBindingResponseInt() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond("1");
|
.respond("1");
|
||||||
|
@ -425,7 +486,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingNullName() {
|
public void invokeBindingNullName() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond(EXPECTED_RESULT);
|
.respond(EXPECTED_RESULT);
|
||||||
|
@ -436,7 +496,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void invokeBindingNullOpName() {
|
public void invokeBindingNullOpName() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond(EXPECTED_RESULT);
|
.respond(EXPECTED_RESULT);
|
||||||
|
@ -447,7 +506,6 @@ public class DaprClientHttpTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void bindingNoHotMono() {
|
public void bindingNoHotMono() {
|
||||||
Map<String, String> map = new HashMap<>();
|
|
||||||
mockInterceptor.addRule()
|
mockInterceptor.addRule()
|
||||||
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
|
||||||
.respond(EXPECTED_RESULT);
|
.respond(EXPECTED_RESULT);
|
||||||
|
|
|
@ -5,12 +5,19 @@
|
||||||
|
|
||||||
package io.dapr.serializer;
|
package io.dapr.serializer;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonParseException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
import com.google.protobuf.CodedOutputStream;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
import com.google.protobuf.Parser;
|
||||||
import io.dapr.client.domain.CloudEvent;
|
import io.dapr.client.domain.CloudEvent;
|
||||||
import io.dapr.utils.TypeRef;
|
import io.dapr.utils.TypeRef;
|
||||||
|
import io.dapr.v1.CommonProtos;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -23,6 +30,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
@ -393,6 +401,32 @@ public class DefaultObjectSerializerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void serializeProtoTest() throws Exception {
|
||||||
|
CommonProtos.Etag valueToSerialize = CommonProtos.Etag.newBuilder().setValue("myValue").build();
|
||||||
|
String expectedSerializedBase64Value = "CgdteVZhbHVl";
|
||||||
|
|
||||||
|
byte[] serializedValue = SERIALIZER.serialize(valueToSerialize);
|
||||||
|
assertEquals(expectedSerializedBase64Value, Base64.getEncoder().encodeToString(serializedValue));
|
||||||
|
assertNotNull(serializedValue);
|
||||||
|
CommonProtos.Etag deserializedValue = SERIALIZER.deserialize(serializedValue, CommonProtos.Etag.class);
|
||||||
|
assertEquals(valueToSerialize.getValue(), deserializedValue.getValue());
|
||||||
|
assertEquals(valueToSerialize, deserializedValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void serializeFakeProtoTest() throws Exception {
|
||||||
|
FakeProtoClass valueToSerialize = new FakeProtoClass();
|
||||||
|
String expectedSerializedBase64Value = "AQ==";
|
||||||
|
|
||||||
|
byte[] serializedValue = SERIALIZER.serialize(valueToSerialize);
|
||||||
|
assertEquals(expectedSerializedBase64Value, Base64.getEncoder().encodeToString(serializedValue));
|
||||||
|
assertNotNull(serializedValue);
|
||||||
|
|
||||||
|
// Tries to parse as JSON since FakeProtoClass does not have `parseFrom()` static method.
|
||||||
|
assertThrows(JsonParseException.class, () -> SERIALIZER.deserialize(serializedValue, FakeProtoClass.class));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void deserializeObjectTest() {
|
public void deserializeObjectTest() {
|
||||||
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
|
String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}";
|
||||||
|
@ -854,4 +888,63 @@ public class DefaultObjectSerializerTest {
|
||||||
|
|
||||||
return "\"" + content + "\"";
|
return "\"" + content + "\"";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class that simulates a proto class implementing MessageLite but does not have `parseFrom()` static method.
|
||||||
|
*/
|
||||||
|
public static final class FakeProtoClass implements MessageLite {
|
||||||
|
@Override
|
||||||
|
public void writeTo(CodedOutputStream codedOutputStream) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getSerializedSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Parser<? extends MessageLite> getParserForType() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteString toByteString() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] toByteArray() {
|
||||||
|
return new byte[]{0x1};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(OutputStream outputStream) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeDelimitedTo(OutputStream outputStream) throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder newBuilderForType() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Builder toBuilder() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageLite getDefaultInstanceForType() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isInitialized() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue