diff --git a/examples/pom.xml b/examples/pom.xml index 0947b19f2..857358b9f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -24,7 +24,7 @@ ${java.version} true false - 0.10.0 + 0.14.0 diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java index e810d48b8..16c684ca1 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java @@ -6,13 +6,18 @@ package io.dapr.examples; import io.dapr.examples.invoke.http.InvokeClient; +import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.api.trace.propagation.HttpTraceContext; -import io.opentelemetry.context.propagation.DefaultContextPropagators; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; @@ -21,6 +26,8 @@ import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.net.Socket; +import java.util.HashMap; +import java.util.Map; @Configuration @EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class}) @@ -31,22 +38,20 @@ public class OpenTelemetryConfig { private static final String ENDPOINT_V2_SPANS = "/api/v2/spans"; @Bean - public Tracer initTracer() throws Exception { - return createTracer("io.dapr.examples"); + public OpenTelemetry initOpenTelemetry() { + return createOpenTelemetry(); + } + + @Bean + public Tracer initTracer(@Autowired OpenTelemetry openTelemetry) { + return openTelemetry.getTracer(io.dapr.examples.tracing.InvokeClient.class.getCanonicalName()); } /** - * Creates an OpenTelemetry's tracer. - * @param instrumentationName Name of the instrumentation. - * @return New tracer's instance. + * Creates an opentelemetry instance. + * @return OpenTelemetry. */ - public static Tracer createTracer(String instrumentationName) { - OpenTelemetry.setGlobalPropagators( - DefaultContextPropagators.builder() - .addTextMapPropagator(HttpTraceContext.getInstance()) - .build()); - final Tracer tracer = OpenTelemetry.getGlobalTracer(instrumentationName); - + public static OpenTelemetry createOpenTelemetry() { // Only exports to Zipkin if it is up. Otherwise, ignore it. // This is helpful to avoid exceptions for examples that do not require Zipkin. if (isZipkinUp()) { @@ -57,13 +62,45 @@ public class OpenTelemetryConfig { .setServiceName(InvokeClient.class.getName()) .build(); - OpenTelemetrySdk.getGlobalTracerManagement() - .addSpanProcessor(SimpleSpanProcessor.builder(zipkinExporter).build()); + SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter)) + .build(); + + return OpenTelemetrySdk.builder() + .setTracerProvider(sdkTracerProvider) + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); } else { System.out.println("WARNING: Zipkin is not available."); } - return tracer; + return null; + } + + /** + * Converts current OpenTelemetry's context into Reactor's context. + * @return Reactor's context. + */ + public static reactor.util.context.Context getReactorContext() { + return getReactorContext(Context.current()); + } + + /** + * Converts given OpenTelemetry's context into Reactor's context. + * @param context OpenTelemetry's context. + * @return Reactor's context. + */ + public static reactor.util.context.Context getReactorContext(Context context) { + Map map = new HashMap<>(); + TextMapPropagator.Setter> setter = + (carrier, key, value) -> map.put(key, value); + + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter); + reactor.util.context.Context reactorContext = reactor.util.context.Context.empty(); + for (Map.Entry entry : map.entrySet()) { + reactorContext = reactorContext.put(entry.getKey(), entry.getValue()); + } + return reactorContext; } private static boolean isZipkinUp() { diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java index ba11db1fe..8f7c6e35e 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapPropagator; import org.jetbrains.annotations.Nullable; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView; @@ -20,6 +21,10 @@ import java.util.Collections; @Component public class OpenTelemetryInterceptor implements HandlerInterceptor { + + @Autowired + private OpenTelemetry openTelemetry; + private static final TextMapPropagator.Getter HTTP_SERVLET_REQUEST_GETTER = new TextMapPropagator.Getter<>() { @Override @@ -37,7 +42,7 @@ public class OpenTelemetryInterceptor implements HandlerInterceptor { @Override public boolean preHandle( HttpServletRequest request, HttpServletResponse response, Object handler) { - final TextMapPropagator textFormat = OpenTelemetry.getGlobalPropagators().getTextMapPropagator(); + final TextMapPropagator textFormat = openTelemetry.getPropagators().getTextMapPropagator(); // preHandle is called twice for asynchronous request. For more information, read: // https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/AsyncHandlerInterceptor.html if (request.getDispatcherType() == DispatcherType.ASYNC) { diff --git a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java index 264c7ae8a..cfdcf0ee6 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java +++ b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java @@ -12,12 +12,14 @@ import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.client.domain.InvokeMethodRequestBuilder; import io.dapr.examples.OpenTelemetryConfig; import io.dapr.utils.TypeRef; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; +import static io.dapr.examples.OpenTelemetryConfig.getReactorContext; + /** * 1. Build and install jars: * mvn clean install @@ -39,15 +41,18 @@ public class InvokeClient { * @param args Messages to be sent as request for the invoke API. */ public static void main(String[] args) throws Exception { - Tracer tracer = OpenTelemetryConfig.createTracer(InvokeClient.class.getCanonicalName()); + final OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); + final Tracer tracer = openTelemetry.getTracer(InvokeClient.class.getCanonicalName()); Span span = tracer.spanBuilder("Example's Main").setSpanKind(Span.Kind.CLIENT).startSpan(); try (DaprClient client = (new DaprClientBuilder()).build()) { for (String message : args) { try (Scope scope = span.makeCurrent()) { InvokeMethodRequestBuilder builder = new InvokeMethodRequestBuilder(SERVICE_APP_ID, "proxy_echo"); - InvokeMethodRequest request - = builder.withBody(message).withHttpExtension(HttpExtension.POST).withContext(Context.current()).build(); + InvokeMethodRequest request = builder + .withBody(message) + .withHttpExtension(HttpExtension.POST) + .withContext(getReactorContext()).build(); client.invokeMethod(request, TypeRef.get(byte[].class)) .map(r -> { System.out.println(new String(r.getObject())); @@ -74,4 +79,5 @@ public class InvokeClient { OpenTelemetrySdk.getGlobalTracerManagement().shutdown(); } + } diff --git a/examples/src/main/java/io/dapr/examples/tracing/README.md b/examples/src/main/java/io/dapr/examples/tracing/README.md index 6d0338bc5..90aa82b57 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/README.md +++ b/examples/src/main/java/io/dapr/examples/tracing/README.md @@ -145,16 +145,20 @@ public class TracingDemoMiddleServiceController { public Mono echo( @RequestAttribute(name = "opentelemetry-context") Context context, @RequestBody(required = false) String body) { - InvokeServiceRequestBuilder builder = new InvokeServiceRequestBuilder(INVOKE_APP_ID, "echo"); - InvokeServiceRequest request - = builder.withBody(body).withHttpExtension(HttpExtension.POST).withContext(context).build(); + InvokeMethodRequestBuilder builder = new InvokeMethodRequestBuilder(INVOKE_APP_ID, "echo"); + InvokeMethodRequest request = builder + .withBody(body) + .withHttpExtension(HttpExtension.POST) + .withContext(getReactorContext(context)).build(); return client.invokeMethod(request, TypeRef.get(byte[].class)).map(r -> r.getObject()); } // ... @PostMapping(path = "/proxy_sleep") public Mono sleep(@RequestAttribute(name = "opentelemetry-context") Context context) { - InvokeServiceRequestBuilder builder = new InvokeServiceRequestBuilder(INVOKE_APP_ID, "sleep"); - InvokeServiceRequest request = builder.withHttpExtension(HttpExtension.POST).withContext(context).build(); + InvokeMethodRequestBuilder builder = new InvokeMethodRequestBuilder(INVOKE_APP_ID, "sleep"); + InvokeMethodRequest request = builder + .withHttpExtension(HttpExtension.POST) + .withContext(getReactorContext(context)).build(); return client.invokeMethod(request, TypeRef.get(byte[].class)).then(); } } @@ -179,6 +183,29 @@ public class OpenTelemetryInterceptor implements HandlerInterceptor { } ``` +Then, `getReactorContext()` method is used to convert the OpenTelemetry's context to Reactor's context in the [OpenTelemetryConfig](../OpenTelemetryConfig.java) class: + +```java +@Configuration +@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class}) +public class OpenTelemetryConfig { + // ... + public static reactor.util.context.Context getReactorContext(Context context) { + Map map = new HashMap<>(); + TextMapPropagator.Setter> setter = + (carrier, key, value) -> map.put(key, value); + + GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter); + reactor.util.context.Context reactorContext = reactor.util.context.Context.empty(); + for (Map.Entry entry : map.entrySet()) { + reactorContext = reactorContext.put(entry.getKey(), entry.getValue()); + } + return reactorContext; + } + // ... +} +``` + Use the follow command to execute the service: ```sh diff --git a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java index 1cdcf3358..07a0eb9a3 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java +++ b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoMiddleServiceController.java @@ -19,6 +19,8 @@ import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; +import static io.dapr.examples.OpenTelemetryConfig.getReactorContext; + /** * SpringBoot Controller to handle service invocation. * @@ -47,8 +49,10 @@ public class TracingDemoMiddleServiceController { @RequestAttribute(name = "opentelemetry-context") Context context, @RequestBody(required = false) String body) { InvokeMethodRequestBuilder builder = new InvokeMethodRequestBuilder(INVOKE_APP_ID, "echo"); - InvokeMethodRequest request - = builder.withBody(body).withHttpExtension(HttpExtension.POST).withContext(context).build(); + InvokeMethodRequest request = builder + .withBody(body) + .withHttpExtension(HttpExtension.POST) + .withContext(getReactorContext(context)).build(); return client.invokeMethod(request, TypeRef.get(byte[].class)).map(r -> r.getObject()); } @@ -61,7 +65,9 @@ public class TracingDemoMiddleServiceController { @PostMapping(path = "/proxy_sleep") public Mono sleep(@RequestAttribute(name = "opentelemetry-context") Context context) { InvokeMethodRequestBuilder builder = new InvokeMethodRequestBuilder(INVOKE_APP_ID, "sleep"); - InvokeMethodRequest request = builder.withHttpExtension(HttpExtension.POST).withContext(context).build(); + InvokeMethodRequest request = builder + .withHttpExtension(HttpExtension.POST) + .withContext(getReactorContext(context)).build(); return client.invokeMethod(request, TypeRef.get(byte[].class)).then(); } diff --git a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoServiceController.java b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoServiceController.java index be333718e..7c54dc896 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/TracingDemoServiceController.java +++ b/examples/src/main/java/io/dapr/examples/tracing/TracingDemoServiceController.java @@ -7,8 +7,6 @@ package io.dapr.examples.tracing; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.examples.OpenTelemetryInterceptor; -import io.opentelemetry.api.trace.Tracer; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestHeader; @@ -39,12 +37,6 @@ public class TracingDemoServiceController { */ private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); - /** - * OpenTelemetry's tracer. - */ - @Autowired - private Tracer tracer; - /** * Handles the 'echo' method invocation. * diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java index 499068d96..033647d40 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java +++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprGrpcClient.java @@ -10,11 +10,8 @@ import com.google.protobuf.ByteString; import io.dapr.exceptions.DaprException; import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; -import io.opentelemetry.context.Context; import reactor.core.publisher.Mono; -import java.util.concurrent.Callable; - /** * A DaprClient over GRPC for Actor. */ diff --git a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java index 9869bc756..3bd79a2fe 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/client/DaprGrpcClientTest.java @@ -22,8 +22,6 @@ import static org.mockito.Mockito.*; public class DaprGrpcClientTest { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String ACTOR_TYPE = "MyActorType"; private static final String ACTOR_ID = "1234567890"; diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index b7c032f38..bc86a0285 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -26,9 +26,43 @@ 11 true 1.0.0-rc-2 + ${project.build.directory}/generated-sources + ${project.basedir}/proto + 1.33.1 + 3.13.0 + + commons-cli + commons-cli + 1.4 + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-api + ${grpc.version} + + + com.google.protobuf + protobuf-java-util + ${protobuf.version} + + + com.github.os72 + protoc-jar-maven-plugin + 3.10.1 + io.dapr dapr-sdk @@ -90,10 +124,49 @@ 3.9 test + + jakarta.annotation + jakarta.annotation-api + 1.3.5 + compile + + + com.github.os72 + protoc-jar-maven-plugin + 3.10.1 + + + generate-sources + + run + + + ${protobuf.version} + inputs + direct + true + + ${protobuf.input.directory} + + + + java + ${protobuf.output.directory} + + + grpc-java + ${protobuf.output.directory} + io.grpc:protoc-gen-grpc-java:${grpc.version} + + + + + + org.apache.maven.plugins maven-jar-plugin diff --git a/sdk-tests/proto/methodinvokeservice.proto b/sdk-tests/proto/methodinvokeservice.proto new file mode 100644 index 000000000..b16d0526a --- /dev/null +++ b/sdk-tests/proto/methodinvokeservice.proto @@ -0,0 +1,42 @@ +syntax = "proto3"; + +package daprtests; + +option java_outer_classname = "MethodInvokeServiceProtos"; +option java_package = "io.dapr.it"; + +service MethodInvokeService { + rpc PostMessage (PostMessageRequest) returns (PostMessageResponse) {} + rpc DeleteMessage (DeleteMessageRequest) returns (DeleteMessageResponse) {} + rpc GetMessages (GetMessagesRequest) returns (GetMessagesResponse) {} + rpc Sleep (SleepRequest) returns (SleepResponse) {} +} + +message PostMessageRequest { + int32 id = 1; + string message = 2; +} + +message PostMessageResponse { +} + +message DeleteMessageRequest { + int32 id = 1; +} + +message DeleteMessageResponse { +} + +message GetMessagesRequest { +} + +message GetMessagesResponse { + map messages = 1; +} + +message SleepRequest { + int32 seconds = 1; +} + +message SleepResponse { +} \ No newline at end of file diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index 5ce0c6ce9..daf8cc0eb 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -15,6 +15,9 @@ import java.util.LinkedList; import java.util.Map; import java.util.Queue; +import static io.dapr.client.DaprApiProtocol.GRPC; +import static io.dapr.client.DaprApiProtocol.HTTP; + public abstract class BaseIT { protected static final String STATE_STORE_NAME = "statestore"; @@ -31,7 +34,16 @@ public abstract class BaseIT { Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC); + return startDaprApp(testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, GRPC); + } + + protected static DaprRun startDaprApp( + String testName, + String successMessage, + Class serviceClass, + DaprApiProtocol appProtocol, + int maxWaitMilliseconds) throws Exception { + return startDaprApp(testName, successMessage, serviceClass, true, maxWaitMilliseconds, GRPC, appProtocol); } protected static DaprRun startDaprApp( @@ -41,14 +53,48 @@ public abstract class BaseIT { Boolean useAppPort, int maxWaitMilliseconds, DaprApiProtocol protocol) throws Exception { - return startDaprApp(testName, successMessage, serviceClass, useAppPort, true, maxWaitMilliseconds, protocol); + return startDaprApp( + testName, + successMessage, + serviceClass, + useAppPort, + true, + maxWaitMilliseconds, + protocol, + HTTP); + } + + protected static DaprRun startDaprApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds, + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) throws Exception { + return startDaprApp( + testName, + successMessage, + serviceClass, + useAppPort, + true, + maxWaitMilliseconds, + protocol, + appProtocol); } protected static DaprRun startDaprApp( String testName, int maxWaitMilliseconds) throws Exception { return startDaprApp( - testName, "You're up and running!", null, false, true, maxWaitMilliseconds, DaprApiProtocol.GRPC); + testName, + "You're up and running!", + null, + false, + true, + maxWaitMilliseconds, + GRPC, + HTTP); } protected static DaprRun startDaprApp( @@ -58,13 +104,15 @@ public abstract class BaseIT { Boolean useAppPort, Boolean useDaprPorts, int maxWaitMilliseconds, - DaprApiProtocol protocol) throws Exception { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), successMessage, maxWaitMilliseconds, - protocol).withServiceClass(serviceClass); + protocol, + appProtocol).withServiceClass(serviceClass); DaprRun run = builder.build(); TO_BE_STOPPED.add(run); DAPR_RUN_BUILDERS.put(run.getAppName(), builder); @@ -83,19 +131,38 @@ public abstract class BaseIT { testName, successMessage, serviceClass, useAppPort, maxWaitMilliseconds, DaprApiProtocol.GRPC); } + protected static ImmutablePair startSplitDaprAndApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds, + DaprApiProtocol protocol) throws Exception { + return startSplitDaprAndApp( + testName, + successMessage, + serviceClass, + useAppPort, + maxWaitMilliseconds, + protocol, + HTTP); + } + protected static ImmutablePair startSplitDaprAndApp( String testName, String successMessage, Class serviceClass, Boolean useAppPort, int maxWaitMilliseconds, - DaprApiProtocol protocol) throws Exception { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( testName, () -> DaprPorts.build(useAppPort, true, true), successMessage, maxWaitMilliseconds, - protocol).withServiceClass(serviceClass); + protocol, + appProtocol).withServiceClass(serviceClass); ImmutablePair runs = builder.splitBuild(); TO_BE_STOPPED.add(runs.left); TO_BE_STOPPED.add(runs.right); diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 8939785aa..6222160be 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -20,7 +20,7 @@ public class DaprRun implements Stoppable { private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!"; - private static final String DAPR_RUN = "dapr run --app-id %s --components-path ./components"; + private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s --components-path ./components"; // the arg in -Dexec.args is the app's port private static final String DAPR_COMMAND = @@ -45,11 +45,12 @@ public class DaprRun implements Stoppable { String successMessage, Class serviceClass, int maxWaitMilliseconds, - DaprApiProtocol protocol) { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) { // The app name needs to be deterministic since we depend on it to kill previous runs. this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName()); this.startCommand = - new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol)); + new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol, appProtocol)); this.listCommand = new Command( this.appName, "dapr list"); @@ -139,18 +140,28 @@ public class DaprRun implements Stoppable { System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.ports.getGrpcPort())); } System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); + System.getProperties().setProperty( + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), + DaprApiProtocol.GRPC.name()); } public void switchToGRPC() { System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.GRPC.name()); + System.getProperties().setProperty( + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), + DaprApiProtocol.GRPC.name()); } public void switchToHTTP() { System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name()); + System.getProperties().setProperty( + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), + DaprApiProtocol.HTTP.name()); } public void switchToProtocol(DaprApiProtocol protocol) { System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), protocol.name()); + System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name()); } public int getGrpcPort() { @@ -170,16 +181,17 @@ public class DaprRun implements Stoppable { } private static String buildDaprCommand( - String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol) { - StringBuilder stringBuilder = new StringBuilder(String.format(DAPR_RUN, appName)) - .append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "") - .append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "") - .append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "") - .append(serviceClass == null ? "" : - String.format(DAPR_COMMAND, serviceClass.getCanonicalName(), - ports.getAppPort() != null ? ports.getAppPort().toString() : "", - Properties.API_PROTOCOL.getName(), protocol, - Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol)); + String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol, DaprApiProtocol appProtocol) { + StringBuilder stringBuilder = + new StringBuilder(String.format(DAPR_RUN, appName, appProtocol.toString().toLowerCase())) + .append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "") + .append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "") + .append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "") + .append(serviceClass == null ? "" : + String.format(DAPR_COMMAND, serviceClass.getCanonicalName(), + ports.getAppPort() != null ? ports.getAppPort().toString() : "", + Properties.API_PROTOCOL.getName(), protocol, + Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol)); return stringBuilder.toString(); } @@ -210,17 +222,21 @@ public class DaprRun implements Stoppable { private DaprApiProtocol protocol; + private DaprApiProtocol appProtocol; + Builder( String testName, Supplier portsSupplier, String successMessage, int maxWaitMilliseconds, - DaprApiProtocol protocol) { + DaprApiProtocol protocol, + DaprApiProtocol appProtocol) { this.testName = testName; this.portsSupplier = portsSupplier; this.successMessage = successMessage; this.maxWaitMilliseconds = maxWaitMilliseconds; this.protocol = protocol; + this.appProtocol = appProtocol; } public Builder withServiceClass(Class serviceClass) { @@ -235,7 +251,8 @@ public class DaprRun implements Stoppable { this.successMessage, this.serviceClass, this.maxWaitMilliseconds, - this.protocol); + this.protocol, + this.appProtocol); } /** @@ -257,7 +274,8 @@ public class DaprRun implements Stoppable { DAPR_SUCCESS_MESSAGE, null, this.maxWaitMilliseconds, - this.protocol); + this.protocol, + this.appProtocol); return new ImmutablePair<>(appRun, daprRun); } diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java new file mode 100644 index 000000000..f0f280082 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java @@ -0,0 +1,149 @@ +package io.dapr.it.methodinvoke.grpc; + +import io.dapr.client.DaprApiProtocol; +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.domain.HttpExtension; +import io.dapr.exceptions.DaprException; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse; +import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.runners.Parameterized.Parameter; +import static org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class MethodInvokeIT extends BaseIT { + + //Number of messages to be sent: 10 + private static final int NUM_MESSAGES = 10; + + /** + * Parameters for this test. + * Param #1: useGrpc. + * @return Collection of parameter tuples. + */ + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { { false }, { true } }); + } + + /** + * Run of a Dapr application. + */ + private DaprRun daprRun = null; + + @Parameter + public boolean useGrpc; + + @Before + public void init() throws Exception { + daprRun = startDaprApp( + MethodInvokeIT.class.getSimpleName(), + MethodInvokeService.SUCCESS_MESSAGE, + MethodInvokeService.class, + DaprApiProtocol.GRPC, // appProtocol + 60000); + + if (this.useGrpc) { + daprRun.switchToGRPC(); + } else { + daprRun.switchToHTTP(); + } + + // Wait since service might be ready even after port is available. + Thread.sleep(2000); + } + + @Test + public void testInvoke() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("This is message #%d", i); + + PostMessageRequest req = PostMessageRequest.newBuilder().setId(i).setMessage(message).build(); + client.invokeMethod(daprRun.getAppName(), "postMessage", req, HttpExtension.POST).block(); + System.out.println("Invoke method messages : " + message); + } + + Map messages = client.invokeMethod( + daprRun.getAppName(), + "getMessages", + GetMessagesRequest.newBuilder().build(), + HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap(); + assertEquals(10, messages.size()); + + // Delete one message. + client.invokeMethod( + daprRun.getAppName(), + "deleteMessage", + DeleteMessageRequest.newBuilder().setId(1).build(), + HttpExtension.POST).block(); + messages = client.invokeMethod( + daprRun.getAppName(), + "getMessages", + GetMessagesRequest.newBuilder().build(), + HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap(); + assertEquals(9, messages.size()); + + // Now update one message. + client.invokeMethod( + daprRun.getAppName(), + "postMessage", + PostMessageRequest.newBuilder().setId(2).setMessage("updated message").build(), + HttpExtension.POST).block(); + messages = client.invokeMethod( + daprRun.getAppName(), + "getMessages", + GetMessagesRequest.newBuilder().build(), + HttpExtension.POST, GetMessagesResponse.class).block().getMessagesMap(); + assertEquals("updated message", messages.get(2)); + } + } + + @Test + public void testInvokeTimeout() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + long started = System.currentTimeMillis(); + SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build(); + String message = assertThrows(IllegalStateException.class, () -> + client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST) + .block(Duration.ofMillis(10))).getMessage(); + long delay = System.currentTimeMillis() - started; + assertTrue(delay <= 500); // 500 ms is a reasonable delay if the request timed out. + assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); + } + } + + @Test + public void testInvokeException() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + SleepRequest req = SleepRequest.newBuilder().setSeconds(-9).build(); + DaprException exception = assertThrows(DaprException.class, () -> + client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block()); + + assertEquals("UNKNOWN", exception.getErrorCode()); + if (this.useGrpc) { + assertEquals("UNKNOWN: ", exception.getMessage()); + } else { + assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); + } + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java new file mode 100644 index 000000000..0a3eca1bc --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.methodinvoke.grpc; + +import com.google.protobuf.Any; +import io.dapr.v1.AppCallbackGrpc; +import io.dapr.v1.CommonProtos; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageResponse; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest; +import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse; +import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest; +import static io.dapr.it.MethodInvokeServiceProtos.PostMessageResponse; +import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest; +import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse; + +public class MethodInvokeService { + + public static final String SUCCESS_MESSAGE = "application discovered on port "; + + /** + * Server mode: class that encapsulates all server-side logic for Grpc. + */ + private static class MyDaprService extends AppCallbackGrpc.AppCallbackImplBase { + + private final Map messages = Collections.synchronizedMap(new HashMap<>()); + + /** + * Server mode: Grpc server. + */ + private Server server; + + /** + * Server mode: starts listening on given port. + * + * @param port Port to listen on. + * @throws IOException Errors while trying to start service. + */ + private void start(int port) throws IOException { + this.server = ServerBuilder + .forPort(port) + .addService(this) + .build() + .start(); + System.out.printf("Server: started listening on port %d\n", port); + + // Now we handle ctrl+c (or any other JVM shutdown) + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("Server: shutting down gracefully ..."); + MyDaprService.this.server.shutdown(); + System.out.println("Server: Bye."); + })); + } + + /** + * Server mode: waits for shutdown trigger. + * + * @throws InterruptedException Propagated interrupted exception. + */ + private void awaitTermination() throws InterruptedException { + if (this.server != null) { + this.server.awaitTermination(); + } + } + + /** + * Server mode: this is the Dapr method to receive Invoke operations via Grpc. + * + * @param request Dapr envelope request, + * @param responseObserver Dapr envelope response. + */ + @Override + public void onInvoke(CommonProtos.InvokeRequest request, + StreamObserver responseObserver) { + try { + if ("postMessage".equals(request.getMethod())) { + PostMessageRequest req = PostMessageRequest.parseFrom(request.getData().getValue().toByteArray()); + + this.messages.put(req.getId(), req.getMessage()); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(PostMessageResponse.newBuilder().build())); + responseObserver.onNext(responseBuilder.build()); + } + if ("deleteMessage".equals(request.getMethod())) { + DeleteMessageRequest req = DeleteMessageRequest.parseFrom(request.getData().getValue().toByteArray()); + + this.messages.remove(req.getId()); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(DeleteMessageResponse.newBuilder().build())); + responseObserver.onNext(responseBuilder.build()); + } + if ("getMessages".equals(request.getMethod())) { + GetMessagesRequest.parseFrom(request.getData().getValue().toByteArray()); + + GetMessagesResponse res = GetMessagesResponse.newBuilder().putAllMessages(this.messages).build(); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(res)); + responseObserver.onNext(responseBuilder.build()); + } + if ("sleep".equals(request.getMethod())) { + SleepRequest req = SleepRequest.parseFrom(request.getData().getValue().toByteArray()); + + SleepResponse res = this.sleep(req); + + CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder(); + responseBuilder.setData(Any.pack(res)); + responseObserver.onNext(responseBuilder.build()); + } + } catch (Exception e) { + responseObserver.onError(e); + } finally { + responseObserver.onCompleted(); + } + } + + public SleepResponse sleep(SleepRequest request) { + if (request.getSeconds() < 0) { + throw new IllegalArgumentException("Sleep time cannot be negative."); + } + + try { + Thread.sleep(request.getSeconds() * 1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + new RuntimeException(e); + } + + // Now respond with current timestamp. + return SleepResponse.newBuilder().build(); + } + } + + /** + * This is the main method of this app. + * @param args The port to listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + + System.out.printf("Service starting on port %d ...\n", port); + + final MyDaprService service = new MyDaprService(); + service.start(port); + service.awaitTermination(); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java index cbc5319f8..71c3e7253 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeController.java @@ -82,6 +82,9 @@ public class MethodInvokeController { @PostMapping(path = "/sleep") public void sleep(@RequestBody int seconds) throws InterruptedException { + if (seconds < 0) { + throw new IllegalArgumentException("Sleep time cannot be negative."); + } Thread.sleep(seconds * 1000); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java index df273c2ad..3cc7ef97a 100644 --- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java @@ -3,8 +3,10 @@ package io.dapr.it.methodinvoke.http; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.HttpExtension; +import io.dapr.exceptions.DaprException; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; +import io.dapr.it.MethodInvokeServiceProtos; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -19,6 +21,7 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.runners.Parameterized.Parameter; @@ -55,7 +58,7 @@ public class MethodInvokeIT extends BaseIT { MethodInvokeService.SUCCESS_MESSAGE, MethodInvokeService.class, true, - 60000); + 30000); if (this.useGrpc) { daprRun.switchToGRPC(); @@ -143,4 +146,17 @@ public class MethodInvokeIT extends BaseIT { assertEquals("Timeout on blocking read for 10 MILLISECONDS", message); } } + + @Test + public void testInvokeException() throws Exception { + try (DaprClient client = new DaprClientBuilder().build()) { + MethodInvokeServiceProtos.SleepRequest req = MethodInvokeServiceProtos.SleepRequest.newBuilder().setSeconds(-9).build(); + DaprException exception = assertThrows(DaprException.class, () -> + client.invokeMethod(daprRun.getAppName(), "sleep", -9, HttpExtension.POST).block()); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertNotNull(exception.getMessage()); + assertTrue(exception.getMessage().contains("Internal Server Error")); + } + } } diff --git a/sdk/pom.xml b/sdk/pom.xml index 5203980d2..df338ef53 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -52,16 +52,6 @@ okhttp 4.9.0 - - io.opentelemetry - opentelemetry-api - 0.10.0 - - - io.opencensus - opencensus-impl-core - 0.28.2 - junit junit @@ -108,6 +98,12 @@ 2.3.5.RELEASE test + + io.grpc + grpc-testing + 1.33.1 + test + diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 47176a404..628d69580 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -26,6 +26,7 @@ import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; +import io.dapr.internal.opencensus.GrpcWrapper; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.NetworkUtils; import io.dapr.utils.TypeRef; @@ -38,29 +39,17 @@ import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ForwardingClientCall; import io.grpc.Metadata; -import io.grpc.Metadata.Key; import io.grpc.MethodDescriptor; import io.grpc.stub.StreamObserver; -import io.opencensus.implcore.trace.propagation.PropagationComponentImpl; -import io.opencensus.implcore.trace.propagation.TraceContextFormat; -import io.opencensus.trace.SpanContext; -import io.opencensus.trace.propagation.BinaryFormat; -import io.opencensus.trace.propagation.SpanContextParseException; -import io.opencensus.trace.propagation.TextFormat; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.propagation.TextMapPropagator; -import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.util.context.Context; import java.io.Closeable; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -73,18 +62,6 @@ import java.util.stream.Collectors; */ public class DaprClientGrpc extends AbstractDaprClient { - private static final TextMapPropagator.Setter> MAP_SETTER = - (mapper, key, value) -> { - if (mapper != null) { - mapper.put(key, value); - } - }; - - /** - * Binary formatter to generate grpc-trace-bin. - */ - private static final BinaryFormat OPENCENSUS_BINARY_FORMAT = new PropagationComponentImpl().getBinaryFormat(); - /** * The GRPC managed channel to be used. */ @@ -112,7 +89,7 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprObjectSerializer stateSerializer) { super(objectSerializer, stateSerializer); this.channel = closeableChannel; - this.asyncStub = populateWithInterceptors(asyncStub); + this.asyncStub = intercept(asyncStub); } private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { @@ -175,10 +152,8 @@ public class DaprClientGrpc extends AbstractDaprClient { } } - return this.createMono( - context, - it -> asyncStub.publishEvent(envelopeBuilder.build(), it) - ).thenReturn(new Response<>(context, null)); + return this.createMono(it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)) + .thenReturn(new Response<>(context, null)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -204,10 +179,9 @@ public class DaprClientGrpc extends AbstractDaprClient { // gRPC to gRPC does not handle metadata in Dapr runtime proto. // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342 - return this.createMono( - context, - it -> asyncStub.invokeService(envelope, it) - ).flatMap( + return this.createMono(it -> intercept(context, asyncStub).invokeService(envelope, it)) + .flatMap( it -> { try { return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type)); @@ -251,10 +225,9 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.InvokeBindingRequest envelope = builder.build(); - return this.createMono( - context, - it -> asyncStub.invokeBinding(envelope, it) - ).flatMap( + return this.createMono(it -> intercept(context, asyncStub) + .invokeBinding(envelope, it)) + .flatMap( it -> { try { return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); @@ -298,10 +271,8 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprProtos.GetStateRequest envelope = builder.build(); - return this.createMono( - context, - it -> asyncStub.getState(envelope, it) - ).map( + return this.createMono(it -> intercept(context, asyncStub).getState(envelope, it)) + .map( it -> { try { return buildStateKeyValue(it, key, options, type); @@ -346,10 +317,9 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprProtos.GetBulkStateRequest envelope = builder.build(); - return this.createMono( - context, - it -> asyncStub.getBulkState(envelope, it) - ).map( + return this.createMono(it -> intercept(context, asyncStub) + .getBulkState(envelope, it)) + .map( it -> it .getItemsList() @@ -430,10 +400,8 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.ExecuteStateTransactionRequest req = builder.build(); - return this.createMono( - context, - it -> asyncStub.executeStateTransaction(req, it) - ).thenReturn(new Response<>(context, null)); + return this.createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it)) + .thenReturn(new Response<>(context, null)); } catch (Exception e) { return DaprException.wrapMono(e); } @@ -458,10 +426,8 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.SaveStateRequest req = builder.build(); - return this.createMono( - context, - it -> asyncStub.saveState(req, it) - ).thenReturn(new Response<>(context, null)); + return this.createMono(it -> intercept(context, asyncStub).saveState(req, it)) + .thenReturn(new Response<>(context, null)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -544,10 +510,8 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprProtos.DeleteStateRequest req = builder.build(); - return this.createMono( - context, - it -> asyncStub.deleteState(req, it) - ).thenReturn(new Response<>(context, null)); + return this.createMono(it -> intercept(context, asyncStub).deleteState(req, it)) + .thenReturn(new Response<>(context, null)); } catch (Exception ex) { return DaprException.wrapMono(ex); } @@ -623,10 +587,8 @@ public class DaprClientGrpc extends AbstractDaprClient { } DaprProtos.GetSecretRequest req = requestBuilder.build(); - return this.createMono( - context, - it -> asyncStub.getSecret(req, it) - ).map(it -> new Response<>(context, it.getDataMap())); + return this.createMono(it -> intercept(context, asyncStub).getSecret(req, it)) + .map(it -> new Response<>(context, it.getDataMap())); } /** @@ -650,7 +612,8 @@ public class DaprClientGrpc extends AbstractDaprClient { DaprProtos.GetBulkSecretRequest envelope = builder.build(); - return this.createMono(context, it -> asyncStub.getBulkSecret(envelope, it)) + return this.createMono(it -> intercept(context, asyncStub) + .getBulkSecret(envelope, it)) .map(it -> { Map secretsMap = it.getDataMap(); if (secretsMap == null) { @@ -688,33 +651,23 @@ public class DaprClientGrpc extends AbstractDaprClient { * @param client GRPC client for Dapr. * @return Client after adding interceptors. */ - private static DaprGrpc.DaprStub populateWithInterceptors(DaprGrpc.DaprStub client) { + private static DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) { ClientInterceptor interceptor = new ClientInterceptor() { @Override public ClientCall interceptCall( - MethodDescriptor methodDescriptor, - CallOptions callOptions, - Channel channel) { + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { @Override - public void start(final Listener responseListener, final Metadata headers) { - // Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet: - // https://github.com/open-telemetry/opentelemetry-specification/issues/639 - // This should be the only use of OpenCensus SDK: populate "grpc-trace-bin". - Context context = Context.current(); - SpanContext opencensusSpanContext = extractOpenCensusSpanContext(context); - if (opencensusSpanContext != null) { - byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext); - headers.put(Key.of(Headers.GRPC_TRACE_BIN, Metadata.BINARY_BYTE_MARSHALLER), grpcTraceBin); - } - + public void start(final Listener responseListener, final Metadata metadata) { String daprApiToken = Properties.API_TOKEN.get(); if (daprApiToken != null) { - headers.put(Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); + metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); } - super.start(responseListener, headers); + super.start(responseListener, metadata); } }; } @@ -723,47 +676,18 @@ public class DaprClientGrpc extends AbstractDaprClient { } /** - * Extracts the context from OpenTelemetry and creates a SpanContext for OpenCensus. + * Populates GRPC client with interceptors for telemetry. * - * @param openTelemetryContext Context from OpenTelemetry. - * @return SpanContext for OpenCensus. + * @param context Reactor's context. + * @param client GRPC client for Dapr. + * @return Client after adding interceptors. */ - private static SpanContext extractOpenCensusSpanContext(Context openTelemetryContext) { - Map map = new HashMap<>(); - OpenTelemetry.getGlobalPropagators().getTextMapPropagator().inject( - openTelemetryContext, map, MAP_SETTER); - - if (!map.containsKey("traceparent")) { - // Trying to extract context without this key will throw an "expected" exception, so we avoid it here. - return null; - } - - try { - return new TraceContextFormat() - .extract(map, new TextFormat.Getter>() { - @Nullable - @Override - public String get(Map map, String key) { - return map.get(key); - } - }); - } catch (SpanContextParseException e) { - throw new DaprException(e); - } + private static DaprGrpc.DaprStub intercept(Context context, DaprGrpc.DaprStub client) { + return GrpcWrapper.intercept(context, client); } - private static Runnable wrap(Context context, Runnable runnable) { - if (context == null) { - return DaprException.wrap(runnable); - } - - return DaprException.wrap(context.wrap(runnable)); - } - - private Mono createMono(Context context, Consumer> consumer) { - return Mono.create( - sink -> wrap(context, () -> consumer.accept(createStreamObserver(sink))).run() - ); + private Mono createMono(Consumer> consumer) { + return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()); } private StreamObserver createStreamObserver(MonoSink sink) { diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 9e5a5b9d6..ec5caa320 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -29,8 +29,8 @@ import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.NetworkUtils; import io.dapr.utils.TypeRef; -import io.opentelemetry.context.Context; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.io.IOException; import java.util.ArrayList; diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index a5f532a5c..716581209 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -5,15 +5,11 @@ package io.dapr.client; -import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.client.domain.Metadata; import io.dapr.config.Properties; import io.dapr.exceptions.DaprError; import io.dapr.exceptions.DaprException; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.trace.propagation.HttpTraceContext; -import io.opentelemetry.context.Context; import okhttp3.Call; import okhttp3.Callback; import okhttp3.HttpUrl; @@ -24,6 +20,7 @@ import okhttp3.RequestBody; import okhttp3.ResponseBody; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -52,12 +49,6 @@ public class DaprHttp implements AutoCloseable { */ private static final String DEFAULT_HTTP_SCHEME = "http"; - /** - * Sets the headers for OpenTelemetry SDK. - */ - private static final HttpTraceContext.Setter OPENTELEMETRY_SETTER = - (requestBuilder, key, value) -> requestBuilder.addHeader(key, value); - /** * HTTP Methods supported. */ @@ -272,7 +263,8 @@ public class DaprHttp implements AutoCloseable { .url(urlBuilder.build()) .addHeader(HEADER_DAPR_REQUEST_ID, requestId); if (context != null) { - OpenTelemetry.getGlobalPropagators().getTextMapPropagator().inject(context, requestBuilder, OPENTELEMETRY_SETTER); + context.stream().forEach( + entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString())); } if (HttpMethods.GET.name().equals(method)) { requestBuilder.get(); @@ -308,14 +300,14 @@ public class DaprHttp implements AutoCloseable { * @param json Response body from Dapr. * @return DaprError or null if could not parse. */ - private static DaprError parseDaprError(byte[] json) throws IOException { + private static DaprError parseDaprError(byte[] json) { if ((json == null) || (json.length == 0)) { return null; } try { return OBJECT_MAPPER.readValue(json, DaprError.class); - } catch (JsonParseException e) { + } catch (IOException e) { throw new DaprException("UNKNOWN", new String(json, StandardCharsets.UTF_8)); } } @@ -347,14 +339,24 @@ public class DaprHttp implements AutoCloseable { @Override public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException { if (!response.isSuccessful()) { - DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response)); - if ((error != null) && (error.getErrorCode() != null) && (error.getMessage() != null)) { - future.completeExceptionally(new DaprException(error)); + try { + DaprError error = parseDaprError(getBodyBytesOrEmptyArray(response)); + if ((error != null) && (error.getErrorCode() != null)) { + if (error.getMessage() != null) { + future.completeExceptionally(new DaprException(error)); + } else { + future.completeExceptionally( + new DaprException(error.getErrorCode(), "HTTP status code: " + response.code())); + } + return; + } + + future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code())); + return; + } catch (DaprException e) { + future.completeExceptionally(e); return; } - - future.completeExceptionally(new DaprException("UNKNOWN", "HTTP status code: " + response.code())); - return; } Map mapHeaders = new HashMap<>(); diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java index db3a17a21..8a184885a 100644 --- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java +++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java @@ -10,10 +10,12 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.MessageLite; import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; import java.io.IOException; +import java.lang.reflect.Method; /** * Serializes and deserializes an internal object. @@ -54,6 +56,11 @@ public class ObjectSerializer { return (byte[]) state; } + // Proto buffer class is serialized directly. + if (state instanceof MessageLite) { + return ((MessageLite) state).toByteArray(); + } + // Not string, not primitive, so it is a complex type: we use JSON for that. return OBJECT_MAPPER.writeValueAsBytes(state); } @@ -94,7 +101,7 @@ public class ObjectSerializer { } if (content == null) { - return (T) null; + return null; } // Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[]. @@ -103,13 +110,26 @@ public class ObjectSerializer { } if (content.length == 0) { - return (T) null; + return null; } if (javaType.hasRawClass(CloudEvent.class)) { return (T) CloudEvent.deserialize(content); } + if (javaType.isTypeOrSubTypeOf(MessageLite.class)) { + try { + Method method = javaType.getRawClass().getDeclaredMethod("parseFrom", byte[].class); + if (method != null) { + return (T) method.invoke(null, content); + } + } catch (NoSuchMethodException e) { + // It was a best effort. Skip this try. + } catch (Exception e) { + throw new IOException(e); + } + } + return OBJECT_MAPPER.readValue(content, javaType); } diff --git a/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequest.java b/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequest.java index 61d0a3337..1122be31b 100644 --- a/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequestBuilder.java index 803da827c..060e8c480 100644 --- a/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/DeleteStateRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequest.java b/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequest.java index 82020212f..07013e74d 100644 --- a/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.List; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequestBuilder.java index 795d7daed..b35886ada 100644 --- a/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/ExecuteStateTransactionRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Arrays; import java.util.Collections; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequest.java index addd28b7d..a993f8303 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequestBuilder.java index 320d9cc72..3663511f6 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetBulkSecretRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequest.java index e7acefd1a..6d2b1add6 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.List; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequestBuilder.java index 28b5100d7..2c908d312 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetBulkStateRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Arrays; import java.util.Collections; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetSecretRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetSecretRequest.java index dff7586b1..676ea950a 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetSecretRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetSecretRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetSecretRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/GetSecretRequestBuilder.java index b7dd11f15..12e56de05 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetSecretRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetSecretRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetStateRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetStateRequest.java index ea8e70fd6..959ab3b83 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetStateRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetStateRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java index f9363c4e4..1eb42a914 100644 --- a/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequest.java b/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequest.java index c7cbf5099..600fca208 100644 --- a/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequestBuilder.java index 167575274..8d368580e 100644 --- a/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/InvokeBindingRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.HashMap; diff --git a/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequest.java b/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequest.java index 3bd9a316e..b21554d1f 100644 --- a/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; /** * A request to invoke a service. diff --git a/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequestBuilder.java index e698b202f..8ee25f316 100644 --- a/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/InvokeMethodRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; /** * Builds a request to invoke a service. diff --git a/sdk/src/main/java/io/dapr/client/domain/PublishEventRequest.java b/sdk/src/main/java/io/dapr/client/domain/PublishEventRequest.java index b68fb9f26..5a6ea2dbc 100644 --- a/sdk/src/main/java/io/dapr/client/domain/PublishEventRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/PublishEventRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Map; diff --git a/sdk/src/main/java/io/dapr/client/domain/PublishEventRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/PublishEventRequestBuilder.java index cbfd69a3d..31760ca30 100644 --- a/sdk/src/main/java/io/dapr/client/domain/PublishEventRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/PublishEventRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.Collections; import java.util.HashMap; diff --git a/sdk/src/main/java/io/dapr/client/domain/Response.java b/sdk/src/main/java/io/dapr/client/domain/Response.java index b3e14f3c4..25eae1acf 100644 --- a/sdk/src/main/java/io/dapr/client/domain/Response.java +++ b/sdk/src/main/java/io/dapr/client/domain/Response.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; /** * A Dapr Response. diff --git a/sdk/src/main/java/io/dapr/client/domain/SaveStateRequest.java b/sdk/src/main/java/io/dapr/client/domain/SaveStateRequest.java index 95598589a..e834de515 100644 --- a/sdk/src/main/java/io/dapr/client/domain/SaveStateRequest.java +++ b/sdk/src/main/java/io/dapr/client/domain/SaveStateRequest.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.List; diff --git a/sdk/src/main/java/io/dapr/client/domain/SaveStateRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/SaveStateRequestBuilder.java index 3e47e4a9f..7e2ed6c7d 100644 --- a/sdk/src/main/java/io/dapr/client/domain/SaveStateRequestBuilder.java +++ b/sdk/src/main/java/io/dapr/client/domain/SaveStateRequestBuilder.java @@ -5,7 +5,7 @@ package io.dapr.client.domain; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import java.util.ArrayList; import java.util.Arrays; diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprError.java b/sdk/src/main/java/io/dapr/exceptions/DaprError.java index 225552797..490cc8f84 100644 --- a/sdk/src/main/java/io/dapr/exceptions/DaprError.java +++ b/sdk/src/main/java/io/dapr/exceptions/DaprError.java @@ -5,9 +5,13 @@ package io.dapr.exceptions; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import io.grpc.Status; + /** * Represents an error message from Dapr. */ +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY) public class DaprError { /** @@ -20,12 +24,20 @@ public class DaprError { */ private String message; + /** + * Error code from gRPC. + */ + private Integer code; + /** * Gets the error code. * * @return Error code. */ public String getErrorCode() { + if ((errorCode == null) && (code != null)) { + return Status.fromCodeValue(code).getCode().name(); + } return errorCode; } diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/BigendianEncoding.java b/sdk/src/main/java/io/dapr/internal/opencensus/BigendianEncoding.java new file mode 100644 index 000000000..090a86adb --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/BigendianEncoding.java @@ -0,0 +1,156 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import java.util.Arrays; + +/** + * Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/BigendianEncoding.java + */ +final class BigendianEncoding { + + static final int LONG_BYTES = Long.SIZE / Byte.SIZE; + + static final int BYTE_BASE16 = 2; + + static final int LONG_BASE16 = BYTE_BASE16 * LONG_BYTES; + + private static final String ALPHABET = "0123456789abcdef"; + + private static final int ASCII_CHARACTERS = 128; + + private static final char[] ENCODING = buildEncodingArray(); + + private static final byte[] DECODING = buildDecodingArray(); + + private static char[] buildEncodingArray() { + char[] encoding = new char[512]; + for (int i = 0; i < 256; ++i) { + encoding[i] = ALPHABET.charAt(i >>> 4); + encoding[i | 0x100] = ALPHABET.charAt(i & 0xF); + } + return encoding; + } + + private static byte[] buildDecodingArray() { + byte[] decoding = new byte[ASCII_CHARACTERS]; + Arrays.fill(decoding, (byte) -1); + for (int i = 0; i < ALPHABET.length(); i++) { + char c = ALPHABET.charAt(i); + decoding[c] = (byte) i; + } + return decoding; + } + + /** + * Returns the {@code long} value whose big-endian representation is stored in the first 8 bytes + * of {@code bytes} starting from the {@code offset}. + * + * @param bytes the byte array representation of the {@code long}. + * @param offset the starting offset in the byte array. + * @return the {@code long} value whose big-endian representation is given. + * @throws IllegalArgumentException if {@code bytes} has fewer than 8 elements. + */ + static long longFromByteArray(byte[] bytes, int offset) { + Utils.checkArgument(bytes.length >= offset + LONG_BYTES, "array too small"); + return (bytes[offset] & 0xFFL) << 56 + | (bytes[offset + 1] & 0xFFL) << 48 + | (bytes[offset + 2] & 0xFFL) << 40 + | (bytes[offset + 3] & 0xFFL) << 32 + | (bytes[offset + 4] & 0xFFL) << 24 + | (bytes[offset + 5] & 0xFFL) << 16 + | (bytes[offset + 6] & 0xFFL) << 8 + | (bytes[offset + 7] & 0xFFL); + } + + /** + * Stores the big-endian representation of {@code value} in the {@code dest} starting from the + * {@code destOffset}. + * + * @param value the value to be converted. + * @param dest the destination byte array. + * @param destOffset the starting offset in the destination byte array. + */ + static void longToByteArray(long value, byte[] dest, int destOffset) { + Utils.checkArgument(dest.length >= destOffset + LONG_BYTES, "array too small"); + dest[destOffset + 7] = (byte) (value & 0xFFL); + dest[destOffset + 6] = (byte) (value >> 8 & 0xFFL); + dest[destOffset + 5] = (byte) (value >> 16 & 0xFFL); + dest[destOffset + 4] = (byte) (value >> 24 & 0xFFL); + dest[destOffset + 3] = (byte) (value >> 32 & 0xFFL); + dest[destOffset + 2] = (byte) (value >> 40 & 0xFFL); + dest[destOffset + 1] = (byte) (value >> 48 & 0xFFL); + dest[destOffset] = (byte) (value >> 56 & 0xFFL); + } + + /** + * Returns the {@code long} value whose base16 representation is stored in the first 16 chars of + * {@code chars} starting from the {@code offset}. + * + * @param chars the base16 representation of the {@code long}. + * @param offset the starting offset in the {@code CharSequence}. + */ + static long longFromBase16String(CharSequence chars, int offset) { + Utils.checkArgument(chars.length() >= offset + LONG_BASE16, "chars too small"); + return (decodeByte(chars.charAt(offset), chars.charAt(offset + 1)) & 0xFFL) << 56 + | (decodeByte(chars.charAt(offset + 2), chars.charAt(offset + 3)) & 0xFFL) << 48 + | (decodeByte(chars.charAt(offset + 4), chars.charAt(offset + 5)) & 0xFFL) << 40 + | (decodeByte(chars.charAt(offset + 6), chars.charAt(offset + 7)) & 0xFFL) << 32 + | (decodeByte(chars.charAt(offset + 8), chars.charAt(offset + 9)) & 0xFFL) << 24 + | (decodeByte(chars.charAt(offset + 10), chars.charAt(offset + 11)) & 0xFFL) << 16 + | (decodeByte(chars.charAt(offset + 12), chars.charAt(offset + 13)) & 0xFFL) << 8 + | (decodeByte(chars.charAt(offset + 14), chars.charAt(offset + 15)) & 0xFFL); + } + + /** + * Appends the base16 encoding of the specified {@code value} to the {@code dest}. + * + * @param value the value to be converted. + * @param dest the destination char array. + * @param destOffset the starting offset in the destination char array. + */ + static void longToBase16String(long value, char[] dest, int destOffset) { + byteToBase16((byte) (value >> 56 & 0xFFL), dest, destOffset); + byteToBase16((byte) (value >> 48 & 0xFFL), dest, destOffset + BYTE_BASE16); + byteToBase16((byte) (value >> 40 & 0xFFL), dest, destOffset + 2 * BYTE_BASE16); + byteToBase16((byte) (value >> 32 & 0xFFL), dest, destOffset + 3 * BYTE_BASE16); + byteToBase16((byte) (value >> 24 & 0xFFL), dest, destOffset + 4 * BYTE_BASE16); + byteToBase16((byte) (value >> 16 & 0xFFL), dest, destOffset + 5 * BYTE_BASE16); + byteToBase16((byte) (value >> 8 & 0xFFL), dest, destOffset + 6 * BYTE_BASE16); + byteToBase16((byte) (value & 0xFFL), dest, destOffset + 7 * BYTE_BASE16); + } + + /** + * Decodes the specified two character sequence, and returns the resulting {@code byte}. + * + * @param chars the character sequence to be decoded. + * @param offset the starting offset in the {@code CharSequence}. + * @return the resulting {@code byte} + * @throws IllegalArgumentException if the input is not a valid encoded string according to this + * encoding. + */ + static byte byteFromBase16String(CharSequence chars, int offset) { + Utils.checkArgument(chars.length() >= offset + 2, "chars too small"); + return decodeByte(chars.charAt(offset), chars.charAt(offset + 1)); + } + + private static byte decodeByte(char hi, char lo) { + Utils.checkArgument(lo < ASCII_CHARACTERS && DECODING[lo] != -1, "invalid character " + lo); + Utils.checkArgument(hi < ASCII_CHARACTERS && DECODING[hi] != -1, "invalid character " + hi); + int decoded = DECODING[hi] << 4 | DECODING[lo]; + return (byte) decoded; + } + + private static void byteToBase16(byte value, char[] dest, int destOffset) { + int b = value & 0xFF; + dest[destOffset] = ENCODING[b]; + dest[destOffset + 1] = ENCODING[b | 0x100]; + } + + private BigendianEncoding() { + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/BinaryFormatImpl.java b/sdk/src/main/java/io/dapr/internal/opencensus/BinaryFormatImpl.java new file mode 100644 index 000000000..1da74a1e1 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/BinaryFormatImpl.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/impl_core/src/main/java/io/opencensus/ + * implcore/trace/propagation/BinaryFormatImpl.java + */ +final class BinaryFormatImpl { + + private static final byte VERSION_ID = 0; + + private static final int VERSION_ID_OFFSET = 0; + + // The version_id/field_id size in bytes. + private static final byte ID_SIZE = 1; + + private static final byte TRACE_ID_FIELD_ID = 0; + + // TODO: clarify if offsets are correct here. While the specification suggests you should stop + // parsing when you hit an unknown field, it does not suggest that fields must be declared in + // ID order. Rather it only groups by data type order, in this case Trace Context + // https://github.com/census-instrumentation/opencensus-specs/blob/master/encodings/BinaryEncoding + // .md#deserialization-rules + private static final int TRACE_ID_FIELD_ID_OFFSET = VERSION_ID_OFFSET + ID_SIZE; + + private static final int TRACE_ID_OFFSET = TRACE_ID_FIELD_ID_OFFSET + ID_SIZE; + + private static final byte SPAN_ID_FIELD_ID = 1; + + private static final int SPAN_ID_FIELD_ID_OFFSET = TRACE_ID_OFFSET + TraceId.SIZE; + + private static final int SPAN_ID_OFFSET = SPAN_ID_FIELD_ID_OFFSET + ID_SIZE; + + private static final byte TRACE_OPTION_FIELD_ID = 2; + + private static final int TRACE_OPTION_FIELD_ID_OFFSET = SPAN_ID_OFFSET + SpanId.SIZE; + + private static final int TRACE_OPTIONS_OFFSET = TRACE_OPTION_FIELD_ID_OFFSET + ID_SIZE; + + /** + * Version, Trace and Span IDs are required fields. + */ + private static final int REQUIRED_FORMAT_LENGTH = 3 * ID_SIZE + TraceId.SIZE + SpanId.SIZE; + + private static final int ALL_FORMAT_LENGTH = REQUIRED_FORMAT_LENGTH + ID_SIZE + TraceOptions.SIZE; + + /** + * Generates the byte array for a span context. + * @param spanContext OpenCensus' span context. + * @return byte array for span context. + */ + byte[] toByteArray(SpanContext spanContext) { + checkNotNull(spanContext, "spanContext"); + byte[] bytes = new byte[ALL_FORMAT_LENGTH]; + bytes[VERSION_ID_OFFSET] = VERSION_ID; + bytes[TRACE_ID_FIELD_ID_OFFSET] = TRACE_ID_FIELD_ID; + spanContext.getTraceId().copyBytesTo(bytes, TRACE_ID_OFFSET); + bytes[SPAN_ID_FIELD_ID_OFFSET] = SPAN_ID_FIELD_ID; + spanContext.getSpanId().copyBytesTo(bytes, SPAN_ID_OFFSET); + bytes[TRACE_OPTION_FIELD_ID_OFFSET] = TRACE_OPTION_FIELD_ID; + spanContext.getTraceOptions().copyBytesTo(bytes, TRACE_OPTIONS_OFFSET); + return bytes; + } + +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java new file mode 100644 index 000000000..ccd7bada2 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/GrpcWrapper.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import io.dapr.config.Property; +import io.dapr.v1.DaprGrpc; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import reactor.util.context.Context; + +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Wraps a Dapr gRPC stub with telemetry interceptor. + */ +public final class GrpcWrapper { + + private static final Logger LOGGER = Logger.getLogger(Property.class.getName()); + + /** + * Binary formatter to generate grpc-trace-bin. + */ + private static final BinaryFormatImpl OPENCENSUS_BINARY_FORMAT = new BinaryFormatImpl(); + + private static final Metadata.Key GRPC_TRACE_BIN_KEY = + Metadata.Key.of("grpc-trace-bin", Metadata.BINARY_BYTE_MARSHALLER); + + private static final Metadata.Key TRACEPARENT_KEY = + Metadata.Key.of("traceparent", Metadata.ASCII_STRING_MARSHALLER); + + private static final Metadata.Key TRACESTATE_KEY = + Metadata.Key.of("tracestate", Metadata.ASCII_STRING_MARSHALLER); + + private GrpcWrapper() { + } + + /** + * Populates GRPC client with interceptors. + * + * @param context Reactor's context. + * @param client GRPC client for Dapr. + * @return Client after adding interceptors. + */ + public static DaprGrpc.DaprStub intercept(final Context context, DaprGrpc.DaprStub client) { + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions, + Channel channel) { + ClientCall clientCall = channel.newCall(methodDescriptor, callOptions); + return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { + @Override + public void start(final Listener responseListener, final Metadata metadata) { + Map map = (context == null ? Context.empty() : context) + .stream() + .filter(e -> (e.getKey() != null) && (e.getValue() != null)) + .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue())); + if (map.containsKey(GRPC_TRACE_BIN_KEY.name())) { + byte[] value = (byte[]) map.get(GRPC_TRACE_BIN_KEY.name()); + metadata.put(GRPC_TRACE_BIN_KEY, value); + } + if (map.containsKey(TRACEPARENT_KEY.name())) { + String value = map.get(TRACEPARENT_KEY.name()).toString(); + metadata.put(TRACEPARENT_KEY, value); + } + if (map.containsKey(TRACESTATE_KEY.name())) { + String value = map.get(TRACESTATE_KEY.name()).toString(); + metadata.put(TRACESTATE_KEY, value); + } + + // Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet: + // https://github.com/open-telemetry/opentelemetry-specification/issues/639 + // This should be the only use of OpenCensus SDK: populate "grpc-trace-bin". + SpanContext opencensusSpanContext = extractOpenCensusSpanContext(metadata); + if (opencensusSpanContext != null) { + byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext); + metadata.put(GRPC_TRACE_BIN_KEY, grpcTraceBin); + } + + super.start(responseListener, metadata); + } + }; + } + }; + return client.withInterceptors(interceptor); + } + + private static SpanContext extractOpenCensusSpanContext(Metadata metadata) { + if (!metadata.keys().contains(TRACEPARENT_KEY.name())) { + // Trying to extract context without this key will throw an "expected" exception, so we avoid it here. + return null; + } + + try { + return TraceContextFormat.extract(metadata); + } catch (RuntimeException e) { + LOGGER.log(Level.FINE, "Could not extract span context.", e); + return null; + } + } +} diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/SpanContext.java b/sdk/src/main/java/io/dapr/internal/opencensus/SpanContext.java new file mode 100644 index 000000000..f7559b471 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/SpanContext.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import javax.annotation.concurrent.Immutable; +import java.util.Arrays; + +/** + * A class that represents a span context. A span context contains the state that must propagate to + * child Spans and across process boundaries. It contains the identifiers (a {@link TraceId + * trace_id} and {@link SpanId span_id}) associated with the and a set of {@link + * TraceOptions options}. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/SpanContext.java

+ * @since 0.5 + */ +@Immutable +final class SpanContext { + + private final TraceId traceId; + + private final SpanId spanId; + + private final TraceOptions traceOptions; + + private final Tracestate tracestate; + + /** + * Creates a new {@code SpanContext} with the given identifiers and options. + * + * @param traceId the trace identifier of the span context. + * @param spanId the span identifier of the span context. + * @param traceOptions the trace options for the span context. + * @param tracestate the trace state for the span context. + * @return a new {@code SpanContext} with the given identifiers and options. + * @since 0.16 + */ + static SpanContext create( + TraceId traceId, SpanId spanId, TraceOptions traceOptions, Tracestate tracestate) { + return new SpanContext(traceId, spanId, traceOptions, tracestate); + } + + /** + * Returns the trace identifier associated with this {@code SpanContext}. + * + * @return the trace identifier associated with this {@code SpanContext}. + * @since 0.5 + */ + TraceId getTraceId() { + return traceId; + } + + /** + * Returns the span identifier associated with this {@code SpanContext}. + * + * @return the span identifier associated with this {@code SpanContext}. + * @since 0.5 + */ + SpanId getSpanId() { + return spanId; + } + + /** + * Returns the {@code TraceOptions} associated with this {@code SpanContext}. + * + * @return the {@code TraceOptions} associated with this {@code SpanContext}. + * @since 0.5 + */ + TraceOptions getTraceOptions() { + return traceOptions; + } + + /** + * Returns the {@code Tracestate} associated with this {@code SpanContext}. + * + * @return the {@code Tracestate} associated with this {@code SpanContext}. + * @since 0.5 + */ + Tracestate getTracestate() { + return tracestate; + } + + /** + * Returns true if this {@code SpanContext} is valid. + * + * @return true if this {@code SpanContext} is valid. + * @since 0.5 + */ + boolean isValid() { + return traceId.isValid() && spanId.isValid(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof SpanContext)) { + return false; + } + + SpanContext that = (SpanContext) obj; + return traceId.equals(that.traceId) + && spanId.equals(that.spanId) + && traceOptions.equals(that.traceOptions); + } + + @Override + public int hashCode() { + return Arrays.hashCode(new Object[]{traceId, spanId, traceOptions}); + } + + private SpanContext( + TraceId traceId, SpanId spanId, TraceOptions traceOptions, Tracestate tracestate) { + this.traceId = traceId; + this.spanId = spanId; + this.traceOptions = traceOptions; + this.tracestate = tracestate; + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/SpanId.java b/sdk/src/main/java/io/dapr/internal/opencensus/SpanId.java new file mode 100644 index 000000000..b7481d59c --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/SpanId.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import javax.annotation.concurrent.Immutable; + +/** + * A class that represents a span identifier. A valid span identifier is an 8-byte array with at + * least one non-zero byte. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/TraceId.java

+ * @since 0.5 + */ +@Immutable +final class SpanId implements Comparable { + + /** + * The size in bytes of the {@code SpanId}. + * + * @since 0.5 + */ + public static final int SIZE = 8; + + private static final int BASE16_SIZE = 2 * SIZE; + + private static final long INVALID_ID = 0; + + // The internal representation of the SpanId. + private final long id; + + private SpanId(long id) { + this.id = id; + } + + /** + * Returns a {@code SpanId} built from a lowercase base16 representation. + * + * @param src the lowercase base16 representation. + * @param srcOffset the offset in the buffer where the representation of the {@code SpanId} + * begins. + * @return a {@code SpanId} built from a lowercase base16 representation. + * @throws NullPointerException if {@code src} is null. + * @throws IllegalArgumentException if not enough characters in the {@code src} from the {@code + * srcOffset}. + * @since 0.11 + */ + static SpanId fromLowerBase16(CharSequence src, int srcOffset) { + Utils.checkNotNull(src, "src"); + return new SpanId(BigendianEncoding.longFromBase16String(src, srcOffset)); + } + + /** + * Returns the byte representation of the {@code SpanId}. + * + * @return the byte representation of the {@code SpanId}. + * @since 0.5 + */ + byte[] getBytes() { + byte[] bytes = new byte[SIZE]; + BigendianEncoding.longToByteArray(id, bytes, 0); + return bytes; + } + + /** + * Copies the byte array representations of the {@code SpanId} into the {@code dest} beginning at + * the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws NullPointerException if {@code dest} is null. + * @throws IndexOutOfBoundsException if {@code destOffset+SpanId.SIZE} is greater than {@code + * dest.length}. + * @since 0.5 + */ + void copyBytesTo(byte[] dest, int destOffset) { + BigendianEncoding.longToByteArray(id, dest, destOffset); + } + + /** + * Copies the lowercase base16 representations of the {@code SpanId} into the {@code dest} + * beginning at the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws IndexOutOfBoundsException if {@code destOffset + 2 * SpanId.SIZE} is greater than + * {@code dest.length}. + * @since 0.18 + */ + void copyLowerBase16To(char[] dest, int destOffset) { + BigendianEncoding.longToBase16String(id, dest, destOffset); + } + + /** + * Returns whether the span identifier is valid. A valid span identifier is an 8-byte array with + * at least one non-zero byte. + * + * @return {@code true} if the span identifier is valid. + * @since 0.5 + */ + boolean isValid() { + return id != INVALID_ID; + } + + /** + * Returns the lowercase base16 encoding of this {@code SpanId}. + * + * @return the lowercase base16 encoding of this {@code SpanId}. + * @since 0.11 + */ + String toLowerBase16() { + char[] chars = new char[BASE16_SIZE]; + copyLowerBase16To(chars, 0); + return new String(chars); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof SpanId)) { + return false; + } + + SpanId that = (SpanId) obj; + return id == that.id; + } + + @Override + public int hashCode() { + // Copied from Long.hashCode in java8. + return (int) (id ^ (id >>> 32)); + } + + @Override + public String toString() { + return "SpanId{spanId=" + toLowerBase16() + "}"; + } + + @Override + public int compareTo(SpanId that) { + // Copied from Long.compare in java8. + return (id < that.id) ? -1 : ((id == that.id) ? 0 : 1); + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/TraceContextFormat.java b/sdk/src/main/java/io/dapr/internal/opencensus/TraceContextFormat.java new file mode 100644 index 000000000..5bad050aa --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/TraceContextFormat.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import com.google.common.base.Splitter; +import io.grpc.Metadata; + +import java.util.List; +import java.util.regex.Pattern; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Implementation of the TraceContext propagation protocol. See w3c/distributed-tracing. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/impl_core/src/main/java/io/opencensus/implcore/ + * trace/propagation/TraceContextFormat.java

+ */ +class TraceContextFormat { + + private static final Tracestate TRACESTATE_DEFAULT = Tracestate.builder().build(); + private static final String TRACEPARENT = "traceparent"; + private static final String TRACESTATE = "tracestate"; + + private static final Metadata.Key TRACEPARENT_KEY = + Metadata.Key.of(TRACEPARENT, Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key TRACESTATE_KEY = + Metadata.Key.of(TRACESTATE, Metadata.ASCII_STRING_MARSHALLER); + + private static final int VERSION_SIZE = 2; + private static final char TRACEPARENT_DELIMITER = '-'; + private static final int TRACEPARENT_DELIMITER_SIZE = 1; + private static final int TRACE_ID_HEX_SIZE = 2 * TraceId.SIZE; + private static final int SPAN_ID_HEX_SIZE = 2 * SpanId.SIZE; + private static final int TRACE_OPTION_HEX_SIZE = 2 * TraceOptions.SIZE; + private static final int TRACE_ID_OFFSET = VERSION_SIZE + TRACEPARENT_DELIMITER_SIZE; + private static final int SPAN_ID_OFFSET = + TRACE_ID_OFFSET + TRACE_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE; + private static final int TRACE_OPTION_OFFSET = + SPAN_ID_OFFSET + SPAN_ID_HEX_SIZE + TRACEPARENT_DELIMITER_SIZE; + private static final int TRACEPARENT_HEADER_SIZE = TRACE_OPTION_OFFSET + TRACE_OPTION_HEX_SIZE; + private static final int TRACESTATE_MAX_MEMBERS = 32; + private static final char TRACESTATE_KEY_VALUE_DELIMITER = '='; + private static final char TRACESTATE_ENTRY_DELIMITER = ','; + private static final Splitter TRACESTATE_ENTRY_DELIMITER_SPLITTER = + Splitter.on(Pattern.compile("[ \t]*" + TRACESTATE_ENTRY_DELIMITER + "[ \t]*")); + + /** + * Extracts span context from gRPC's metadata. + * @param metadata gRPC's metadata. + * @return span context. + */ + static SpanContext extract(Metadata metadata) { + String traceparent = metadata.get(TRACEPARENT_KEY); + if (traceparent == null) { + throw new RuntimeException("Traceparent not present"); + } + + checkArgument( + traceparent.charAt(TRACE_OPTION_OFFSET - 1) == TRACEPARENT_DELIMITER + && (traceparent.length() == TRACEPARENT_HEADER_SIZE + || (traceparent.length() > TRACEPARENT_HEADER_SIZE + && traceparent.charAt(TRACEPARENT_HEADER_SIZE) == TRACEPARENT_DELIMITER)) + && traceparent.charAt(SPAN_ID_OFFSET - 1) == TRACEPARENT_DELIMITER + && traceparent.charAt(TRACE_OPTION_OFFSET - 1) == TRACEPARENT_DELIMITER, + "Missing or malformed TRACEPARENT."); + + TraceId traceId = TraceId.fromLowerBase16(traceparent, TRACE_ID_OFFSET); + SpanId spanId = SpanId.fromLowerBase16(traceparent, SPAN_ID_OFFSET); + TraceOptions traceOptions = TraceOptions.fromLowerBase16(traceparent, TRACE_OPTION_OFFSET); + + String tracestate = metadata.get(TRACESTATE_KEY); + if (tracestate == null || tracestate.isEmpty()) { + return SpanContext.create(traceId, spanId, traceOptions, TRACESTATE_DEFAULT); + } + Tracestate.Builder tracestateBuilder = Tracestate.builder(); + List listMembers = TRACESTATE_ENTRY_DELIMITER_SPLITTER.splitToList(tracestate); + checkArgument( + listMembers.size() <= TRACESTATE_MAX_MEMBERS, "Tracestate has too many elements."); + // Iterate in reverse order because when call builder set the elements is added in the + // front of the list. + for (int i = listMembers.size() - 1; i >= 0; i--) { + String listMember = listMembers.get(i); + int index = listMember.indexOf(TRACESTATE_KEY_VALUE_DELIMITER); + checkArgument(index != -1, "Invalid tracestate list-member format."); + tracestateBuilder.set( + listMember.substring(0, index), listMember.substring(index + 1, listMember.length())); + } + return SpanContext.create(traceId, spanId, traceOptions, tracestateBuilder.build()); + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/TraceId.java b/sdk/src/main/java/io/dapr/internal/opencensus/TraceId.java new file mode 100644 index 000000000..f62c82ae5 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/TraceId.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import javax.annotation.concurrent.Immutable; + +/** + * A class that represents a trace identifier. A valid trace identifier is a 16-byte array with at + * least one non-zero byte. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/TraceId.java.

+ * @since 0.5 + */ +@Immutable +final class TraceId implements Comparable { + /** + * The size in bytes of the {@code TraceId}. + * + * @since 0.5 + */ + static final int SIZE = 16; + private static final int BASE16_SIZE = 2 * BigendianEncoding.LONG_BASE16; + private static final long INVALID_ID = 0; + + // The internal representation of the TraceId. + private final long idHi; + private final long idLo; + + private TraceId(long idHi, long idLo) { + this.idHi = idHi; + this.idLo = idLo; + } + + /** + * Returns a {@code TraceId} built from a lowercase base16 representation. + * + * @param src the lowercase base16 representation. + * @param srcOffset the offset in the buffer where the representation of the {@code TraceId} + * begins. + * @return a {@code TraceId} built from a lowercase base16 representation. + * @throws NullPointerException if {@code src} is null. + * @throws IllegalArgumentException if not enough characters in the {@code src} from the {@code + * srcOffset}. + * @since 0.11 + */ + static TraceId fromLowerBase16(CharSequence src, int srcOffset) { + Utils.checkNotNull(src, "src"); + return new TraceId( + BigendianEncoding.longFromBase16String(src, srcOffset), + BigendianEncoding.longFromBase16String(src, srcOffset + BigendianEncoding.LONG_BASE16)); + } + + /** + * Copies the byte array representations of the {@code TraceId} into the {@code dest} beginning at + * the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws NullPointerException if {@code dest} is null. + * @throws IndexOutOfBoundsException if {@code destOffset+TraceId.SIZE} is greater than {@code + * dest.length}. + * @since 0.5 + */ + void copyBytesTo(byte[] dest, int destOffset) { + BigendianEncoding.longToByteArray(idHi, dest, destOffset); + BigendianEncoding.longToByteArray(idLo, dest, destOffset + BigendianEncoding.LONG_BYTES); + } + + /** + * Copies the lowercase base16 representations of the {@code TraceId} into the {@code dest} + * beginning at the {@code destOffset} offset. + * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws IndexOutOfBoundsException if {@code destOffset + 2 * TraceId.SIZE} is greater than + * {@code dest.length}. + * @since 0.18 + */ + void copyLowerBase16To(char[] dest, int destOffset) { + BigendianEncoding.longToBase16String(idHi, dest, destOffset); + BigendianEncoding.longToBase16String(idLo, dest, destOffset + BASE16_SIZE / 2); + } + + /** + * Returns whether the {@code TraceId} is valid. A valid trace identifier is a 16-byte array with + * at least one non-zero byte. + * + * @return {@code true} if the {@code TraceId} is valid. + * @since 0.5 + */ + boolean isValid() { + return idHi != INVALID_ID || idLo != INVALID_ID; + } + + /** + * Returns the lowercase base16 encoding of this {@code TraceId}. + * + * @return the lowercase base16 encoding of this {@code TraceId}. + * @since 0.11 + */ + String toLowerBase16() { + char[] chars = new char[BASE16_SIZE]; + copyLowerBase16To(chars, 0); + return new String(chars); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof TraceId)) { + return false; + } + + TraceId that = (TraceId) obj; + return idHi == that.idHi && idLo == that.idLo; + } + + @Override + public int hashCode() { + // Copied from Arrays.hashCode(long[]) + int result = 1; + result = 31 * result + ((int) (idHi ^ (idHi >>> 32))); + result = 31 * result + ((int) (idLo ^ (idLo >>> 32))); + return result; + } + + @Override + public String toString() { + return "TraceId{traceId=" + toLowerBase16() + "}"; + } + + @Override + public int compareTo(TraceId that) { + if (idHi == that.idHi) { + if (idLo == that.idLo) { + return 0; + } + return idLo < that.idLo ? -1 : 1; + } + return idHi < that.idHi ? -1 : 1; + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/TraceOptions.java b/sdk/src/main/java/io/dapr/internal/opencensus/TraceOptions.java new file mode 100644 index 000000000..c44f04ace --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/TraceOptions.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import javax.annotation.concurrent.Immutable; +import java.util.Arrays; + +/** + * A class that represents global trace options. These options are propagated to all child spans. + * These determine features such as whether a {@code Span} should be traced. It is implemented as a bitmask. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/TraceOptions.java

+ * @since 0.5 + */ +@Immutable +final class TraceOptions { + /** + * The size in bytes of the {@code TraceOptions}. + * + * @since 0.5 + */ + static final int SIZE = 1; + + // The set of enabled features is determined by all the enabled bits. + private final byte options; + + // Creates a new {@code TraceOptions} with the given options. + private TraceOptions(byte options) { + this.options = options; + } + + /** + * Returns a {@code TraceOption} built from a lowercase base16 representation. + * + * @param src the lowercase base16 representation. + * @param srcOffset the offset in the buffer where the representation of the {@code TraceOptions} + * begins. + * @return a {@code TraceOption} built from a lowercase base16 representation. + * @throws NullPointerException if {@code src} is null. + * @throws IllegalArgumentException if {@code src.length} is not {@code 2 * TraceOption.SIZE} OR + * if the {@code str} has invalid characters. + * @since 0.18 + */ + static TraceOptions fromLowerBase16(CharSequence src, int srcOffset) { + return new TraceOptions(BigendianEncoding.byteFromBase16String(src, srcOffset)); + } + + /** + * Copies the byte representations of the {@code TraceOptions} into the {@code dest} beginning at + * the {@code destOffset} offset. + * + *

Equivalent with (but faster because it avoids any new allocations): + * + *

{@code
+   * System.arraycopy(getBytes(), 0, dest, destOffset, TraceOptions.SIZE);
+   * }
+ * + * @param dest the destination buffer. + * @param destOffset the starting offset in the destination buffer. + * @throws NullPointerException if {@code dest} is null. + * @throws IndexOutOfBoundsException if {@code destOffset+TraceOptions.SIZE} is greater than + * {@code dest.length}. + * @since 0.5 + */ + void copyBytesTo(byte[] dest, int destOffset) { + Utils.checkIndex(destOffset, dest.length); + dest[destOffset] = options; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof TraceOptions)) { + return false; + } + + TraceOptions that = (TraceOptions) obj; + return options == that.options; + } + + @Override + public int hashCode() { + return Arrays.hashCode(new byte[]{options}); + } + +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/Tracestate.java b/sdk/src/main/java/io/dapr/internal/opencensus/Tracestate.java new file mode 100644 index 000000000..c87031b37 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/Tracestate.java @@ -0,0 +1,260 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +import javax.annotation.concurrent.Immutable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Carries tracing-system specific context in a list of key-value pairs. TraceState allows different + * vendors propagate additional information and inter-operate with their legacy Id formats. + * + *

Implementation is optimized for a small list of key-value pairs. + * + *

Key is opaque string up to 256 characters printable. It MUST begin with a lowercase letter, + * and can only contain lowercase letters a-z, digits 0-9, underscores _, dashes -, asterisks *, and + * forward slashes /. + * + *

Value is opaque string up to 256 characters printable ASCII RFC0020 characters (i.e., the + * range 0x20 to 0x7E) except comma , and =. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/trace/Tracestate.java

+ * @since 0.16 + */ +@Immutable +class Tracestate { + private static final int KEY_MAX_SIZE = 256; + private static final int VALUE_MAX_SIZE = 256; + private static final int MAX_KEY_VALUE_PAIRS = 32; + + private final List entries; + + private Tracestate(List entries) { + this.entries = entries; + } + + /** + * Returns the value to which the specified key is mapped, or null if this map contains no mapping + * for the key. + * + * @param key with which the specified value is to be associated + * @return the value to which the specified key is mapped, or null if this map contains no mapping for the key. + * @since 0.16 + */ + @javax.annotation.Nullable + String get(String key) { + for (Entry entry : getEntries()) { + if (entry.getKey().equals(key)) { + return entry.getValue(); + } + } + return null; + } + + /** + * Returns a {@link List} view of the mappings contained in this {@code TraceState}. + * + * @return a {@link List} view of the mappings contained in this {@code TraceState}. + * @since 0.16 + */ + List getEntries() { + return this.entries; + } + + /** + * Returns a {@code Builder} based on an empty {@code Tracestate}. + * + * @return a {@code Builder} based on an empty {@code Tracestate}. + * @since 0.16 + */ + static Builder builder() { + return new Builder(Builder.EMPTY); + } + + /** + * Builder class. + * + * @since 0.16 + */ + static final class Builder { + private final Tracestate parent; + @javax.annotation.Nullable + private ArrayList entries; + + // Needs to be in this class to avoid initialization deadlock because super class depends on + // subclass (the auto-value generate class). + private static final Tracestate EMPTY = create(Collections.emptyList()); + + private Builder(Tracestate parent) { + Utils.checkNotNull(parent, "parent"); + this.parent = parent; + this.entries = null; + } + + /** + * Adds or updates the {@code Entry} that has the given {@code key} if it is present. The new + * {@code Entry} will always be added in the front of the list of entries. + * + * @param key the key for the {@code Entry} to be added. + * @param value the value for the {@code Entry} to be added. + * @return this. + * @since 0.16 + */ + @SuppressWarnings("nullness") + Builder set(String key, String value) { + // Initially create the Entry to validate input. + Entry entry = new Entry(key, value); + if (entries == null) { + // Copy entries from the parent. + entries = new ArrayList(parent.getEntries()); + } + for (int i = 0; i < entries.size(); i++) { + if (entries.get(i).getKey().equals(entry.getKey())) { + entries.remove(i); + // Exit now because the entries list cannot contain duplicates. + break; + } + } + // Inserts the element at the front of this list. + entries.add(0, entry); + return this; + } + + /** + * Removes the {@code Entry} that has the given {@code key} if it is present. + * + * @param key the key for the {@code Entry} to be removed. + * @return this. + * @since 0.16 + */ + @SuppressWarnings("nullness") + Builder remove(String key) { + Utils.checkNotNull(key, "key"); + if (entries == null) { + // Copy entries from the parent. + entries = new ArrayList(parent.getEntries()); + } + for (int i = 0; i < entries.size(); i++) { + if (entries.get(i).getKey().equals(key)) { + entries.remove(i); + // Exit now because the entries list cannot contain duplicates. + break; + } + } + return this; + } + + /** + * Builds a TraceState by adding the entries to the parent in front of the key-value pairs list + * and removing duplicate entries. + * + * @return a TraceState with the new entries. + * @since 0.16 + */ + Tracestate build() { + if (entries == null) { + return parent; + } + return Tracestate.create(entries); + } + } + + /** + * Immutable key-value pair for {@code Tracestate}. + * + * @since 0.16 + */ + @Immutable + static class Entry { + + private final String key; + + private final String value; + + /** + * Creates a new {@code Entry} for the {@code Tracestate}. + * + * @param key the Entry's key. + * @param value the Entry's value. + * @since 0.16 + */ + Entry(String key, String value) { + Utils.checkNotNull(key, "key"); + Utils.checkNotNull(value, "value"); + Utils.checkArgument(validateKey(key), "Invalid key %s", key); + Utils.checkArgument(validateValue(value), "Invalid value %s", value); + this.key = key; + this.value = value; + } + + /** + * Returns the key {@code String}. + * + * @return the key {@code String}. + * @since 0.16 + */ + String getKey() { + return this.key; + } + + /** + * Returns the value {@code String}. + * + * @return the value {@code String}. + * @since 0.16 + */ + String getValue() { + return this.value; + } + } + + // Key is opaque string up to 256 characters printable. It MUST begin with a lowercase letter, and + // can only contain lowercase letters a-z, digits 0-9, underscores _, dashes -, asterisks *, and + // forward slashes /. + static boolean validateKey(String key) { + if (key.length() > KEY_MAX_SIZE + || key.isEmpty() + || key.charAt(0) < 'a' + || key.charAt(0) > 'z') { + return false; + } + for (int i = 1; i < key.length(); i++) { + char c = key.charAt(i); + if (!(c >= 'a' && c <= 'z') + && !(c >= '0' && c <= '9') + && c != '_' + && c != '-' + && c != '*' + && c != '/') { + return false; + } + } + return true; + } + + // Value is opaque string up to 256 characters printable ASCII RFC0020 characters (i.e., the range + // 0x20 to 0x7E) except comma , and =. + static boolean validateValue(String value) { + if (value.length() > VALUE_MAX_SIZE || value.charAt(value.length() - 1) == ' ' /* '\u0020' */) { + return false; + } + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + if (c == ',' || c == '=' || c < ' ' /* '\u0020' */ || c > '~' /* '\u007E' */) { + return false; + } + } + return true; + } + + private static Tracestate create(List entries) { + Utils.checkState(entries.size() <= MAX_KEY_VALUE_PAIRS, "Invalid size"); + return new Tracestate(Collections.unmodifiableList(entries)); + } +} \ No newline at end of file diff --git a/sdk/src/main/java/io/dapr/internal/opencensus/Utils.java b/sdk/src/main/java/io/dapr/internal/opencensus/Utils.java new file mode 100644 index 000000000..0208dfc28 --- /dev/null +++ b/sdk/src/main/java/io/dapr/internal/opencensus/Utils.java @@ -0,0 +1,156 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.internal.opencensus; + +/** + * General internal utility methods. + * + *

Code originally from https://github.com/census-instrumentation/opencensus-java/blob/ + * 446e9bde9b1f6c0317e3f310644997e5d6d5eab2/api/src/main/java/io/opencensus/internal/Utils.java

+ */ +final class Utils { + + private Utils() { + } + + /** + * Throws an {@link IllegalArgumentException} if the argument is false. This method is similar to + * {@code Preconditions.checkArgument(boolean, Object)} from Guava. + * + * @param isValid whether the argument check passed. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + */ + static void checkArgument( + boolean isValid, @javax.annotation.Nullable Object errorMessage) { + if (!isValid) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } + + /** + * Throws an {@link IllegalArgumentException} if the argument is false. This method is similar to + * {@code Preconditions.checkArgument(boolean, Object)} from Guava. + * + * @param expression a boolean expression + * @param errorMessageTemplate a template for the exception message should the check fail. The + * message is formed by replacing each {@code %s} placeholder in the template with an + * argument. These are matched by position - the first {@code %s} gets {@code + * errorMessageArgs[0]}, etc. Unmatched arguments will be appended to the formatted + * message in + * square braces. Unmatched placeholders will be left as-is. + * @param errorMessageArgs the arguments to be substituted into the message template. Arguments + * are converted to strings using {@link String#valueOf(Object)}. + * @throws IllegalArgumentException if {@code expression} is false + * @throws NullPointerException if the check fails and either {@code errorMessageTemplate} or + * {@code errorMessageArgs} is null (don't let this happen) + */ + static void checkArgument( + boolean expression, + String errorMessageTemplate, + @javax.annotation.Nullable Object... errorMessageArgs) { + if (!expression) { + throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs)); + } + } + + /** + * Throws an {@link IllegalStateException} if the argument is false. This method is similar to + * {@code Preconditions.checkState(boolean, Object)} from Guava. + * + * @param isValid whether the state check passed. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + */ + static void checkState(boolean isValid, @javax.annotation.Nullable Object errorMessage) { + if (!isValid) { + throw new IllegalStateException(String.valueOf(errorMessage)); + } + } + + /** + * Validates an index in an array or other container. This method throws an {@link + * IllegalArgumentException} if the size is negative and throws an {@link + * IndexOutOfBoundsException} if the index is negative or greater than or equal to the size. This + * method is similar to {@code Preconditions.checkElementIndex(int, int)} from Guava. + * + * @param index the index to validate. + * @param size the size of the array or container. + */ + static void checkIndex(int index, int size) { + if (size < 0) { + throw new IllegalArgumentException("Negative size: " + size); + } + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException("Index out of bounds: size=" + size + ", index=" + index); + } + } + + /** + * Throws a {@link NullPointerException} if the argument is null. This method is similar to {@code + * Preconditions.checkNotNull(Object, Object)} from Guava. + * + * @param arg the argument to check for null. + * @param errorMessage the message to use for the exception. Will be converted to a string using + * {@link String#valueOf(Object)}. + * @param Object checked. + * @return the argument, if it passes the null check. + */ + public static T checkNotNull(T arg, @javax.annotation.Nullable Object errorMessage) { + if (arg == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return arg; + } + + /** + * Substitutes each {@code %s} in {@code template} with an argument. These are matched by + * position: the first {@code %s} gets {@code args[0]}, etc. If there are more arguments than + * placeholders, the unmatched arguments will be appended to the end of the formatted message in + * square braces. + * + *

Copied from {@code Preconditions.format(String, Object...)} from Guava + * + * @param template a non-null string containing 0 or more {@code %s} placeholders. + * @param args the arguments to be substituted into the message template. Arguments are converted + * to strings using {@link String#valueOf(Object)}. Arguments can be null. + */ + // Note that this is somewhat-improperly used from Verify.java as well. + private static String format(String template, @javax.annotation.Nullable Object... args) { + // If no arguments return the template. + if (args == null) { + return template; + } + + // start substituting the arguments into the '%s' placeholders + StringBuilder builder = new StringBuilder(template.length() + 16 * args.length); + int templateStart = 0; + int i = 0; + while (i < args.length) { + int placeholderStart = template.indexOf("%s", templateStart); + if (placeholderStart == -1) { + break; + } + builder.append(template, templateStart, placeholderStart); + builder.append(args[i++]); + templateStart = placeholderStart + 2; + } + builder.append(template, templateStart, template.length()); + + // if we run out of placeholders, append the extra args in square braces + if (i < args.length) { + builder.append(" ["); + builder.append(args[i++]); + while (i < args.length) { + builder.append(", "); + builder.append(args[i++]); + } + builder.append(']'); + } + + return builder.toString(); + } +} \ No newline at end of file diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java new file mode 100644 index 000000000..13876f9b2 --- /dev/null +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTelemetryTest.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.client; + +import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.InvokeMethodRequestBuilder; +import io.dapr.client.domain.Response; +import io.dapr.serializer.DefaultObjectSerializer; +import io.dapr.utils.TypeRef; +import io.dapr.v1.CommonProtos; +import io.dapr.v1.DaprGrpc; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.ServerServiceDefinition; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import reactor.core.publisher.Mono; +import reactor.util.context.Context; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class DaprClientGrpcTelemetryTest { + + private static final Metadata.Key GRPC_TRACE_BIN_KEY = Metadata.Key.of(Headers.GRPC_TRACE_BIN, + Metadata.BINARY_BYTE_MARSHALLER); + + private static final Metadata.Key TRACEPARENT_KEY = Metadata.Key.of("traceparent", + Metadata.ASCII_STRING_MARSHALLER); + + private static final Metadata.Key TRACESTATE_KEY = Metadata.Key.of("tracestate", + Metadata.ASCII_STRING_MARSHALLER); + + /** + * This rule manages automatic graceful shutdown for the registered servers and channels at the + * end of test. + */ + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private DaprClient client; + + @Parameterized.Parameter + public Scenario scenario; + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Scenario[][]{ + { + new Scenario() {{ + traceparent = "00-0af7651916cd43dd8448eb211c80319c-b9c7c989f97918e1-01"; + tracestate = "congo=ucfJifl5GOE,rojo=00f067aa0ba902b7"; + expectGrpcTraceBin = true; + }} + }, + { + new Scenario() {{ + traceparent = null; + tracestate = null; + expectGrpcTraceBin = false; + }} + }, + { + new Scenario() {{ + traceparent = null; + tracestate = "congo=ucfJifl5GOE,rojo=00f067aa0ba902b7"; + expectGrpcTraceBin = false; + }} + }, + { + new Scenario() {{ + traceparent = "00-0af7651916cd43dd8448eb211c80319c-b9c7c989f97918e1-01"; + tracestate = null; + expectGrpcTraceBin = true; + }}, + }, + { + new Scenario() {{ + traceparent = "BAD FORMAT"; + tracestate = null; + expectGrpcTraceBin = false; + }}, + }, + { + new Scenario() {{ + traceparent = "00-0af7651916cd43dd8448eb211c80319c-b9c7c989f97918e1-01"; + tracestate = "INVALID"; + expectGrpcTraceBin = false; + }}, + }, + { + null + } + }); + } + + @Before + public void setup() throws IOException { + DaprGrpc.DaprImplBase daprImplBase = new DaprGrpc.DaprImplBase() { + + public void invokeService(io.dapr.v1.DaprProtos.InvokeServiceRequest request, + io.grpc.stub.StreamObserver responseObserver) { + responseObserver.onNext(CommonProtos.InvokeResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + }; + + ServerServiceDefinition service = ServerInterceptors.intercept(daprImplBase, new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall(ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + if (scenario == null) { + assertNull(metadata.get(TRACEPARENT_KEY)); + assertNull(metadata.get(TRACESTATE_KEY)); + assertNull(metadata.get(GRPC_TRACE_BIN_KEY)); + return serverCallHandler.startCall(serverCall, metadata); + } + + assertEquals(scenario.traceparent, metadata.get(TRACEPARENT_KEY)); + assertEquals(scenario.tracestate, metadata.get(TRACESTATE_KEY)); + assertTrue((metadata.get(GRPC_TRACE_BIN_KEY) != null) == scenario.expectGrpcTraceBin); + return serverCallHandler.startCall(serverCall, metadata); + } + }); + + // Generate a unique in-process server name. + String serverName = InProcessServerBuilder.generateName(); + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor() + .addService(service) + .build().start()); + + // Create a client channel and register for automatic graceful shutdown. + ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + Closeable closeableChannel = () -> { + if (channel != null && !channel.isShutdown()) { + channel.shutdown(); + } + }; + DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); + client = new DaprClientGrpc( + closeableChannel, asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + } + + @Test + public void invokeServiceVoidWithTracingTest() { + Context context = null; + if (scenario != null) { + context = Context.empty(); + if (scenario.traceparent != null) { + context = context.put("traceparent", scenario.traceparent); + } + if (scenario.tracestate != null) { + context = context.put("tracestate", scenario.tracestate); + } + } + InvokeMethodRequest req = new InvokeMethodRequestBuilder("appId", "method") + .withBody("request") + .withHttpExtension(HttpExtension.NONE) + .withContext(context) + .build(); + Mono> result = this.client.invokeMethod(req, TypeRef.get(Void.class)); + result.block(); + } + + @After + public void tearDown() throws Exception { + client.close(); + } + + public static class Scenario { + public String traceparent; + public String tracestate; + public boolean expectGrpcTraceBin; + } +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 0df44775b..ce5b9c697 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -17,6 +17,8 @@ import io.dapr.client.domain.GetBulkStateRequestBuilder; import io.dapr.client.domain.GetStateRequest; import io.dapr.client.domain.GetStateRequestBuilder; import io.dapr.client.domain.HttpExtension; +import io.dapr.client.domain.InvokeMethodRequest; +import io.dapr.client.domain.InvokeMethodRequestBuilder; import io.dapr.client.domain.Response; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; @@ -38,6 +40,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.stubbing.Answer; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.io.Closeable; import java.io.IOException; diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 146846b45..981c61288 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -14,6 +14,7 @@ import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.config.Properties; +import io.dapr.exceptions.DaprException; import io.dapr.utils.TypeRef; import okhttp3.OkHttpClient; import okhttp3.ResponseBody; @@ -189,13 +190,80 @@ public class DaprClientHttpTest { }); } + @Test + public void invokeServiceDaprError() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("MYCODE", exception.getErrorCode()); + assertEquals("MYCODE: My Message", exception.getMessage()); + } + + @Test + public void invokeServiceDaprErrorFromGRPC() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "{ \"code\": 7 }", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("PERMISSION_DENIED", exception.getErrorCode()); + assertEquals("PERMISSION_DENIED: HTTP status code: 500", exception.getMessage()); + } + + @Test + public void invokeServiceDaprErrorUnknownJSON() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "{ \"anything\": 7 }", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertEquals("UNKNOWN: { \"anything\": 7 }", exception.getMessage()); + } + + @Test + public void invokeServiceDaprErrorEmptyString() { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/myapp/method/mymethod") + .respond(500, + ResponseBody.create( + "", + MediaTypes.MEDIATYPE_JSON)); + + DaprException exception = assertThrows(DaprException.class, () -> { + daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + }); + + assertEquals("UNKNOWN", exception.getErrorCode()); + assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); + } + @Test public void invokeServiceMethodNull() { mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/publish/A") .respond(EXPECTED_RESULT); - String event = "{ \"message\": \"This is a test\" }"; assertThrows(IllegalArgumentException.class, () -> daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block()); @@ -337,7 +405,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseNull() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(new byte[0]); @@ -348,7 +415,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseObject() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"OK\""); @@ -370,7 +436,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseFloat() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1.5"); @@ -381,7 +446,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseChar() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"a\""); @@ -392,7 +456,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseByte() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("\"2\""); @@ -403,7 +466,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseLong() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1"); @@ -414,7 +476,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingResponseInt() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond("1"); @@ -425,7 +486,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingNullName() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); @@ -436,7 +496,6 @@ public class DaprClientHttpTest { @Test public void invokeBindingNullOpName() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); @@ -447,7 +506,6 @@ public class DaprClientHttpTest { @Test public void bindingNoHotMono() { - Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") .respond(EXPECTED_RESULT); diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java index c5a884cb3..50e02df30 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java @@ -5,7 +5,7 @@ package io.dapr.client; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import reactor.core.publisher.Mono; import java.util.Map; diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java index bffb461e5..e032538b5 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java @@ -6,7 +6,7 @@ package io.dapr.client; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; -import io.opentelemetry.context.Context; +import reactor.util.context.Context; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.ResponseBody; @@ -59,7 +59,7 @@ public class DaprHttpTest { assertEquals("xyz", Properties.API_TOKEN.get()); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.current()); + daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -75,7 +75,7 @@ public class DaprHttpTest { assertNull(Properties.API_TOKEN.get()); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.current()); + daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -91,7 +91,7 @@ public class DaprHttpTest { .respond(serializer.serialize(EXPECTED_RESULT)); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.current()); + daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -105,7 +105,7 @@ public class DaprHttpTest { .addHeader("Header", "Value"); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.current()); + daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -118,7 +118,7 @@ public class DaprHttpTest { .respond(serializer.serialize(EXPECTED_RESULT)); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.current()); + daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -130,7 +130,7 @@ public class DaprHttpTest { .get("http://127.0.0.1:3500/v1.0/get") .respond(serializer.serialize(EXPECTED_RESULT)); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); - Mono mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.current()); + Mono mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -148,7 +148,7 @@ public class DaprHttpTest { .respond(serializer.serialize(EXPECTED_RESULT)); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.current()); + daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -161,7 +161,7 @@ public class DaprHttpTest { .respond(500); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.current()); + daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -175,7 +175,7 @@ public class DaprHttpTest { .respond(500, ResponseBody.create(MediaType.parse("text"), "{\"errorCode\":null,\"message\":null}")); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); - Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.current()); + Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -188,7 +188,7 @@ public class DaprHttpTest { .respond(500, ResponseBody.create(MediaType.parse("application/json"), "{\"errorCode\":\"null\",\"message\":\"null\"}")); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); - Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.current()); + Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); assertEquals(EXPECTED_RESULT, body); @@ -231,11 +231,11 @@ public class DaprHttpTest { .respond(200, ResponseBody.create(MediaType.parse("application/json"), serializer.serialize(existingState))); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); - Mono response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.current()); + Mono response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.empty()); assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class)); - Mono responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.current()); + Mono responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.empty()); Mono responseDeleteKey = - daprHttp.invokeApi("DELETE", urlDeleteState.split("/"), null, null, Context.current()); + daprHttp.invokeApi("DELETE", urlDeleteState.split("/"), null, null, Context.empty()); assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class)); mockInterceptor.reset(); mockInterceptor.addRule() diff --git a/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java index 7cdd9ecc6..812e65bbe 100644 --- a/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java +++ b/sdk/src/test/java/io/dapr/serializer/DefaultObjectSerializerTest.java @@ -5,12 +5,19 @@ package io.dapr.serializer; +import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; import io.dapr.client.domain.CloudEvent; import io.dapr.utils.TypeRef; +import io.dapr.v1.CommonProtos; import org.junit.Test; import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.lang.reflect.Type; import java.util.ArrayList; @@ -23,6 +30,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -393,6 +401,32 @@ public class DefaultObjectSerializerTest { } } + @Test + public void serializeProtoTest() throws Exception { + CommonProtos.Etag valueToSerialize = CommonProtos.Etag.newBuilder().setValue("myValue").build(); + String expectedSerializedBase64Value = "CgdteVZhbHVl"; + + byte[] serializedValue = SERIALIZER.serialize(valueToSerialize); + assertEquals(expectedSerializedBase64Value, Base64.getEncoder().encodeToString(serializedValue)); + assertNotNull(serializedValue); + CommonProtos.Etag deserializedValue = SERIALIZER.deserialize(serializedValue, CommonProtos.Etag.class); + assertEquals(valueToSerialize.getValue(), deserializedValue.getValue()); + assertEquals(valueToSerialize, deserializedValue); + } + + @Test + public void serializeFakeProtoTest() throws Exception { + FakeProtoClass valueToSerialize = new FakeProtoClass(); + String expectedSerializedBase64Value = "AQ=="; + + byte[] serializedValue = SERIALIZER.serialize(valueToSerialize); + assertEquals(expectedSerializedBase64Value, Base64.getEncoder().encodeToString(serializedValue)); + assertNotNull(serializedValue); + + // Tries to parse as JSON since FakeProtoClass does not have `parseFrom()` static method. + assertThrows(JsonParseException.class, () -> SERIALIZER.deserialize(serializedValue, FakeProtoClass.class)); + } + @Test public void deserializeObjectTest() { String jsonToDeserialize = "{\"stringValue\":\"A String\",\"intValue\":2147483647,\"boolValue\":true,\"charValue\":\"a\",\"byteValue\":65,\"shortValue\":32767,\"longValue\":9223372036854775807,\"floatValue\":1.0,\"doubleValue\":1000.0}"; @@ -854,4 +888,63 @@ public class DefaultObjectSerializerTest { return "\"" + content + "\""; } + + /** + * Class that simulates a proto class implementing MessageLite but does not have `parseFrom()` static method. + */ + public static final class FakeProtoClass implements MessageLite { + @Override + public void writeTo(CodedOutputStream codedOutputStream) throws IOException { + } + + @Override + public int getSerializedSize() { + return 0; + } + + @Override + public Parser getParserForType() { + return null; + } + + @Override + public ByteString toByteString() { + return null; + } + + @Override + public byte[] toByteArray() { + return new byte[]{0x1}; + } + + @Override + public void writeTo(OutputStream outputStream) throws IOException { + + } + + @Override + public void writeDelimitedTo(OutputStream outputStream) throws IOException { + + } + + @Override + public Builder newBuilderForType() { + return null; + } + + @Override + public Builder toBuilder() { + return null; + } + + @Override + public MessageLite getDefaultInstanceForType() { + return null; + } + + @Override + public boolean isInitialized() { + return false; + } + } }