Handle HTTP binding error. (#1024)

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2024-03-04 07:40:59 -08:00 committed by GitHub
parent d55095c0e1
commit 2f3e7f883d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 412 additions and 96 deletions

View File

@ -44,7 +44,7 @@ jobs:
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.13.0-rc.1
DAPR_RUNTIME_VER: 1.13.0-rc.2
DAPR_RUNTIME_VER: 1.13.0-rc.10
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:

View File

@ -38,7 +38,7 @@ jobs:
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.13.0-rc.1
DAPR_RUNTIME_VER: 1.13.0-rc.5
DAPR_RUNTIME_VER: 1.13.0-rc.10
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.actors.client;
import com.google.protobuf.ByteString;
import io.dapr.client.DaprClientGrpc;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
@ -128,7 +129,7 @@ class DaprGrpcClient implements DaprClient {
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
return DaprClientGrpc.intercept(context, client, null);
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {

View File

@ -0,0 +1,27 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
scopes:
- bindingit-httpoutputbinding-exception
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404-success
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
- name: errorIfNot2XX
value: "false"
scopes:
- bindingit-httpoutputbinding-ignore-error

View File

@ -1,5 +1,5 @@
/*
* Copyright 2021 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -17,9 +17,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.exceptions.DaprException;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -28,6 +28,7 @@ import java.util.List;
import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
@ -35,21 +36,73 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
public class BindingIT extends BaseIT {
private static final String BINDING_NAME = "sample123";
private static final String BINDING_OPERATION = "create";
public static class MyClass {
public MyClass() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void httpOutputBindingError(boolean useGrpc) throws Exception {
DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
public String message;
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains(
"error invoking output binding github-http-binding-404: received status code 404"));
}
}, 10000);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void httpOutputBindingErrorIgnoredByComponent(boolean useGrpc) throws Exception {
DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404-success", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
// consistent between HTTP and gRPC.
assertEquals(
"{\"message\":\"Not Found\",\"documentation_url\":\"https://docs.github.com/rest\"}",
new String(e.getPayload()));
}
}, 10000);
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void inputOutputBinding(boolean useGrpc) throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
final String bidingName = "sample123";
String serviceNameVariant = useGrpc ? "-grpc" : "-http";
DaprRun daprRun = startDaprApp(
@ -69,7 +122,7 @@ public class BindingIT extends BaseIT {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();
bidingName, "create", "ping").block();
try {
Thread.sleep(1000);
@ -88,14 +141,14 @@ public class BindingIT extends BaseIT {
System.out.println("sending first message");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
// This is an example of sending a plain string. The input binding will receive
// cat
final String m = "cat";
System.out.println("sending " + m);
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> {
@ -127,4 +180,11 @@ public class BindingIT extends BaseIT {
}, 8000);
}
}
public static class MyClass {
public MyClass() {
}
public String message;
}
}

View File

@ -50,7 +50,7 @@ import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.serializer.DaprObjectSerializer;
@ -65,6 +65,7 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
@ -81,10 +82,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;
import static io.dapr.internal.opencensus.GrpcWrapper.appendTracingToMetadata;
/**
* An adapter for the GRPC Client.
*
@ -351,6 +356,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
*/
@Override
public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type) {
Metadata responseMetadata = new Metadata();
try {
final String name = request.getName();
final String operation = request.getOperation();
@ -377,10 +383,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
responseMetadata,
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
)
).flatMap(
it -> {
int httpStatusCode =
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
// Exception condition in a successful request.
// This is useful to send an exception due to an error from the HTTP binding component.
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));
}
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
@ -1155,21 +1170,74 @@ public class DaprClientGrpc extends AbstractDaprClient {
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
private static DaprGrpc.DaprStub intercept(
ContextView context,
DaprGrpc.DaprStub client) {
return intercept(context, client, null);
}
/**
* Populates GRPC client with interceptors for telemetry - internal use only.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @param metadataConsumer Handles metadata result.
* @return Client after adding interceptors.
*/
public static DaprGrpc.DaprStub intercept(
ContextView context,
DaprGrpc.DaprStub client,
Consumer<Metadata> metadataConsumer) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, options);
return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
appendTracingToMetadata(context, metadata);
final ClientCall.Listener<RespT> headerListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
responseListener.onHeaders(headers);
if (metadataConsumer != null) {
metadataConsumer.accept(headers);
}
}
};
super.start(headerListener, metadata);
}
};
}
};
return client.withInterceptors(interceptor);
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return this.createMono(null, consumer);
}
private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
createStreamObserver(sink, metadata))).run()));
}
private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
return this.createFlux(null, consumer);
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
@ -1178,7 +1246,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}
@Override
@ -1188,7 +1256,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
};
}
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
@ -1197,7 +1265,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}
@Override

View File

@ -18,6 +18,7 @@ import io.dapr.client.domain.Metadata;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.utils.Version;
import okhttp3.Call;
import okhttp3.Callback;
@ -381,17 +382,19 @@ public class DaprHttp implements AutoCloseable {
@Override
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
if (!response.isSuccessful()) {
int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code());
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
try {
byte[] payload = getBodyBytesOrEmptyArray(response);
DaprError error = parseDaprError(payload);
if (error != null) {
future.completeExceptionally(new DaprException(error, payload, response.code()));
future.completeExceptionally(new DaprException(error, payload, httpStatusCode));
return;
}
future.completeExceptionally(
new DaprException("UNKNOWN", "", payload, response.code()));
new DaprException("UNKNOWN", "", payload, httpStatusCode));
return;
} catch (DaprException e) {
future.completeExceptionally(e);
@ -404,8 +407,24 @@ public class DaprHttp implements AutoCloseable {
response.headers().forEach(pair -> {
mapHeaders.put(pair.getFirst(), pair.getSecond());
});
future.complete(new Response(result, mapHeaders, response.code()));
future.complete(new Response(result, mapHeaders, httpStatusCode));
}
}
private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) {
if ((headerValue == null) || headerValue.isEmpty()) {
return defaultStatusCode;
}
// Metadata used to override status code with code received from HTTP binding.
try {
int httpStatusCode = Integer.parseInt(headerValue);
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return httpStatusCode;
}
return defaultStatusCode;
} catch (NumberFormatException nfe) {
return defaultStatusCode;
}
}
}

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.exceptions;
import com.google.rpc.Status;
import io.dapr.internal.exceptions.DaprHttpException;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import reactor.core.Exceptions;
@ -170,11 +171,33 @@ public class DaprException extends RuntimeException {
*/
public DaprException(
String errorCode, String message, Throwable cause, DaprErrorDetails errorDetails, byte[] payload) {
super(buildErrorMessage(errorCode, 0, message), cause);
this.httpStatusCode = 0;
this(errorCode, message, cause, errorDetails, payload, 0);
}
/**
* New exception from a server-side generated error code and message.
* @param errorCode Client-side error code.
* @param message Client-side error message.
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A {@code null} value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
* @param errorDetails the status details for the error.
* @param payload Raw error payload.
* @param httpStatusCode Optional HTTP Status code for the error, 0 if not applicable.
*/
private DaprException(
String errorCode,
String message,
Throwable cause,
DaprErrorDetails errorDetails,
byte[] payload,
int httpStatusCode) {
super(buildErrorMessage(errorCode, httpStatusCode, message), cause);
this.errorCode = errorCode;
this.errorDetails = errorDetails == null ? DaprErrorDetails.EMPTY_INSTANCE : errorDetails;
this.payload = payload;
this.httpStatusCode = httpStatusCode;
}
/**
@ -305,21 +328,27 @@ public class DaprException extends RuntimeException {
return (DaprException) exception;
}
int httpStatusCode = 0;
byte[] httpPayload = null;
Throwable e = exception;
while (e != null) {
if (e instanceof StatusRuntimeException) {
if (e instanceof DaprHttpException) {
DaprHttpException daprHttpException = (DaprHttpException) e;
httpStatusCode = daprHttpException.getStatusCode();
httpPayload = daprHttpException.getPayload();
} else if (e instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
Status status = StatusProto.fromThrowable(statusRuntimeException);
DaprErrorDetails errorDetails = new DaprErrorDetails(status);
return new DaprException(
statusRuntimeException.getStatus().getCode().toString(),
statusRuntimeException.getStatus().getDescription(),
exception,
errorDetails,
status.toByteArray());
statusRuntimeException.getStatus().getCode().toString(),
statusRuntimeException.getStatus().getDescription(),
exception,
errorDetails,
httpPayload != null ? httpPayload : status.toByteArray(),
httpStatusCode);
}
e = e.getCause();
@ -329,19 +358,30 @@ public class DaprException extends RuntimeException {
return (IllegalArgumentException) exception;
}
if (exception instanceof DaprHttpException) {
DaprHttpException daprHttpException = (DaprHttpException)exception;
return new DaprException(
io.grpc.Status.UNKNOWN.toString(),
null,
exception,
null,
daprHttpException.getPayload(),
daprHttpException.getStatusCode());
}
return new DaprException(exception);
}
private static String buildErrorMessage(String errorCode, int httpStatusCode, String message) {
String result = ((errorCode == null) || errorCode.isEmpty()) ? "UNKNOWN: " : errorCode + ": ";
if ((message == null) || message.isEmpty()) {
if (httpStatusCode > 0) {
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return result + "HTTP status code: " + httpStatusCode;
}
return result;
}
if (httpStatusCode > 0) {
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return result + message + " (HTTP status code: " + httpStatusCode + ")";
}
return result + message;

View File

@ -0,0 +1,126 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.internal.exceptions;
import io.grpc.Metadata;
import java.util.concurrent.ExecutionException;
/**
* Internal exception for propagating HTTP status code.
*/
public class DaprHttpException extends ExecutionException {
/**
* This is the metadata from HTTP binding that we want to automatically parse and make it as part of the exception.
*/
private static final Metadata.Key<String> GRPC_METADATA_KEY_HTTP_STATUS_CODE =
Metadata.Key.of("metadata.statuscode", Metadata.ASCII_STRING_MARSHALLER);
private final int statusCode;
private final byte[] payload;
/**
* Instantiates a new DaprHttpException, without http body.
* @param statusCode HTTP status code.
* @param cause Exception thrown.
*/
private DaprHttpException(int statusCode, Throwable cause) {
super(cause);
this.statusCode = statusCode;
this.payload = null;
}
/**
* Instantiates a new DaprHttpException with a given HTTP payload.
* @param statusCode HTTP status code.
* @param payload HTTP payload.
*/
public DaprHttpException(int statusCode, byte[] payload) {
super();
this.statusCode = statusCode;
this.payload = payload;
}
/**
* Creates an ExecutionException (can also be HttpException, if applicable).
* @param grpcMetadata Optional gRPC metadata.
* @param cause Exception triggered during execution.
* @return ExecutionException
*/
public static ExecutionException fromGrpcExecutionException(Metadata grpcMetadata, Throwable cause) {
int httpStatusCode = parseHttpStatusCode(grpcMetadata);
if (!isValidHttpStatusCode(httpStatusCode)) {
return new ExecutionException(cause);
}
return new DaprHttpException(httpStatusCode, cause);
}
public static boolean isValidHttpStatusCode(int statusCode) {
return statusCode >= 100 && statusCode <= 599; // Status codes range from 100 to 599
}
public static boolean isSuccessfulHttpStatusCode(int statusCode) {
return statusCode >= 200 && statusCode < 300;
}
private static int parseHttpStatusCode(Metadata grpcMetadata) {
if (grpcMetadata == null) {
return 0;
}
return parseHttpStatusCode(grpcMetadata.get(GRPC_METADATA_KEY_HTTP_STATUS_CODE));
}
/**
* Parses a given string value into an HTTP status code, 0 if invalid.
* @param value String value to be parsed.
* @return HTTP status code, 0 if not valid.
*/
public static int parseHttpStatusCode(String value) {
if ((value == null) || value.isEmpty()) {
return 0;
}
try {
int httpStatusCode = Integer.parseInt(value);
if (!isValidHttpStatusCode(httpStatusCode)) {
return 0;
}
return httpStatusCode;
} catch (NumberFormatException nfe) {
return 0;
}
}
/**
* Returns the HTTP Status code for the exception.
* @return HTTP Status code for the exception, 0 if not applicable.
*/
public int getStatusCode() {
return this.statusCode;
}
/**
* Returns the HTTP payload for the exception.
* @return HTTP payload, null if not present.
*/
public byte[] getPayload() {
return this.payload;
}
}

View File

@ -14,14 +14,7 @@ limitations under the License.
package io.dapr.internal.opencensus;
import io.dapr.config.Property;
import io.dapr.v1.DaprGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
@ -55,55 +48,37 @@ public final class GrpcWrapper {
}
/**
* Populates GRPC client with interceptors.
* Populates metadata with tracing headers.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
* @param context Context containing tracing information.
* @param metadata Metadata where tracing values will be added to.
*/
public static DaprGrpc.DaprStub intercept(final ContextView context, DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
Map<String, Object> map = (context == null ? Context.empty() : context)
.stream()
.filter(e -> (e.getKey() != null) && (e.getValue() != null))
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue()));
if (map.containsKey(GRPC_TRACE_BIN_KEY.name())) {
byte[] value = (byte[]) map.get(GRPC_TRACE_BIN_KEY.name());
metadata.put(GRPC_TRACE_BIN_KEY, value);
}
if (map.containsKey(TRACEPARENT_KEY.name())) {
String value = map.get(TRACEPARENT_KEY.name()).toString();
metadata.put(TRACEPARENT_KEY, value);
}
if (map.containsKey(TRACESTATE_KEY.name())) {
String value = map.get(TRACESTATE_KEY.name()).toString();
metadata.put(TRACESTATE_KEY, value);
}
public static void appendTracingToMetadata(ContextView context, Metadata metadata) {
Map<String, Object> map = (context == null ? Context.empty() : context)
.stream()
.filter(e -> (e.getKey() != null) && (e.getValue() != null))
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue()));
if (map.containsKey(GRPC_TRACE_BIN_KEY.name())) {
byte[] value = (byte[]) map.get(GRPC_TRACE_BIN_KEY.name());
metadata.put(GRPC_TRACE_BIN_KEY, value);
}
if (map.containsKey(TRACEPARENT_KEY.name())) {
String value = map.get(TRACEPARENT_KEY.name()).toString();
metadata.put(TRACEPARENT_KEY, value);
}
if (map.containsKey(TRACESTATE_KEY.name())) {
String value = map.get(TRACESTATE_KEY.name()).toString();
metadata.put(TRACESTATE_KEY, value);
}
// Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet:
// https://github.com/open-telemetry/opentelemetry-specification/issues/639
// This should be the only use of OpenCensus SDK: populate "grpc-trace-bin".
SpanContext opencensusSpanContext = extractOpenCensusSpanContext(metadata);
if (opencensusSpanContext != null) {
byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext);
metadata.put(GRPC_TRACE_BIN_KEY, grpcTraceBin);
}
super.start(responseListener, metadata);
}
};
}
};
return client.withInterceptors(interceptor);
// Dapr only supports "grpc-trace-bin" for GRPC and OpenTelemetry SDK does not support that yet:
// https://github.com/open-telemetry/opentelemetry-specification/issues/639
// This should be the only use of OpenCensus SDK: populate "grpc-trace-bin".
SpanContext opencensusSpanContext = extractOpenCensusSpanContext(metadata);
if (opencensusSpanContext != null) {
byte[] grpcTraceBin = OPENCENSUS_BINARY_FORMAT.toByteArray(opencensusSpanContext);
metadata.put(GRPC_TRACE_BIN_KEY, grpcTraceBin);
}
}
private static SpanContext extractOpenCensusSpanContext(Metadata metadata) {