Handle HTTP binding error. (#1024) (#1130)

Update binding http IT

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2024-09-18 11:13:17 -07:00 committed by GitHub
parent cc537a4c20
commit 436b7df427
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 433 additions and 45 deletions

View File

@ -0,0 +1,27 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
scopes:
- bindingit-httpoutputbinding-exception
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404-success
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
- name: errorIfNot2XX
value: "false"
scopes:
- bindingit-httpoutputbinding-ignore-error

View File

@ -6,14 +6,14 @@ services:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
- 2181:2181
kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
ports:
- 9092:9092
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

View File

@ -1,5 +1,5 @@
/*
* Copyright 2021 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.exceptions.DaprException;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.jupiter.api.Test;
@ -26,6 +27,7 @@ import java.util.List;
import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
@ -33,15 +35,50 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
public class BindingIT extends BaseIT {
private static final String BINDING_NAME = "sample123";
private static final String BINDING_OPERATION = "create";
public static class MyClass {
public MyClass() {
@Test
public void httpOutputBindingError() throws Exception {
startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
60000);
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains(
"error invoking output binding github-http-binding-404: received status code 404"));
}
}, 10000);
}
}
public String message;
@Test
public void httpOutputBindingErrorIgnoredByComponent() throws Exception {
startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
60000);
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404-success", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains("\"message\":\"Not Found\""));
assertTrue(new String(e.getPayload()).contains("\"documentation_url\":\"https://docs.github.com/rest\""));
}
}, 10000);
}
}
@Test
@ -53,11 +90,13 @@ public class BindingIT extends BaseIT {
true,
60000);
var bidingName = "sample123";
try(DaprClient client = new DaprClientBuilder().build()) {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();
bidingName, "create", "ping").block();
try {
Thread.sleep(1000);
@ -76,14 +115,14 @@ public class BindingIT extends BaseIT {
System.out.println("sending first message");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
// This is an example of sending a plain string. The input binding will receive
// cat
final String m = "cat";
System.out.println("sending " + m);
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> {
@ -115,4 +154,11 @@ public class BindingIT extends BaseIT {
}, 8000);
}
}
public static class MyClass {
public MyClass() {
}
public String message;
}
}

View File

@ -56,6 +56,7 @@ import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
@ -76,6 +77,7 @@ import io.dapr.v1.DaprProtos.PubsubSubscriptionRule;
import io.dapr.v1.DaprProtos.RegisteredComponents;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import reactor.core.publisher.Flux;
@ -99,6 +101,10 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;
/**
* Implementation of the Dapr client combining gRPC and HTTP (when applicable).
*
@ -486,12 +492,22 @@ public class DaprClientImpl extends AbstractDaprClient {
}
DaprProtos.InvokeBindingRequest envelope = builder.build();
Metadata responseMetadata = new Metadata();
return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
responseMetadata,
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
)
).flatMap(
it -> {
int httpStatusCode =
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
// Exception condition in a successful request.
// This is useful to send an exception due to an error from the HTTP binding component.
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));
}
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
@ -1201,17 +1217,39 @@ public class DaprClientImpl extends AbstractDaprClient {
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context);
}
/**
* Populates GRPC client with interceptors for telemetry.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @param metadataConsumer Consumer of gRPC metadata.
* @return Client after adding interceptors.
*/
private DaprGrpc.DaprStub intercept(
ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) {
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer);
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return this.createMono(null, consumer);
}
private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
createStreamObserver(sink, metadata))).run()));
}
private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
return this.createFlux(null, consumer);
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
}
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
@ -1220,7 +1258,7 @@ public class DaprClientImpl extends AbstractDaprClient {
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}
@Override
@ -1230,7 +1268,7 @@ public class DaprClientImpl extends AbstractDaprClient {
};
}
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
@ -1239,7 +1277,7 @@ public class DaprClientImpl extends AbstractDaprClient {
@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}
@Override

View File

@ -18,6 +18,7 @@ import io.dapr.client.domain.Metadata;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprError;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.utils.Version;
import okhttp3.Call;
import okhttp3.Callback;
@ -387,17 +388,19 @@ public class DaprHttp implements AutoCloseable {
@Override
public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException {
if (!response.isSuccessful()) {
int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code());
if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) {
try {
byte[] payload = getBodyBytesOrEmptyArray(response);
DaprError error = parseDaprError(payload);
if (error != null) {
future.completeExceptionally(new DaprException(error, payload, response.code()));
future.completeExceptionally(new DaprException(error, payload, httpStatusCode));
return;
}
future.completeExceptionally(
new DaprException("UNKNOWN", "", payload, response.code()));
new DaprException("UNKNOWN", "", payload, httpStatusCode));
return;
} catch (DaprException e) {
future.completeExceptionally(e);
@ -410,8 +413,24 @@ public class DaprHttp implements AutoCloseable {
response.headers().forEach(pair -> {
mapHeaders.put(pair.getFirst(), pair.getSecond());
});
future.complete(new Response(result, mapHeaders, response.code()));
future.complete(new Response(result, mapHeaders, httpStatusCode));
}
}
private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) {
if ((headerValue == null) || headerValue.isEmpty()) {
return defaultStatusCode;
}
// Metadata used to override status code with code received from HTTP binding.
try {
int httpStatusCode = Integer.parseInt(headerValue);
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return httpStatusCode;
}
return defaultStatusCode;
} catch (NumberFormatException nfe) {
return defaultStatusCode;
}
}
}

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.exceptions;
import com.google.rpc.Status;
import io.dapr.internal.exceptions.DaprHttpException;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;
import reactor.core.Exceptions;
@ -170,11 +171,33 @@ public class DaprException extends RuntimeException {
*/
public DaprException(
String errorCode, String message, Throwable cause, DaprErrorDetails errorDetails, byte[] payload) {
super(buildErrorMessage(errorCode, 0, message), cause);
this.httpStatusCode = 0;
this(errorCode, message, cause, errorDetails, payload, 0);
}
/**
* New exception from a server-side generated error code and message.
* @param errorCode Client-side error code.
* @param message Client-side error message.
* @param cause the cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A {@code null} value is
* permitted, and indicates that the cause is nonexistent or
* unknown.)
* @param errorDetails the status details for the error.
* @param payload Raw error payload.
* @param httpStatusCode Optional HTTP Status code for the error, 0 if not applicable.
*/
private DaprException(
String errorCode,
String message,
Throwable cause,
DaprErrorDetails errorDetails,
byte[] payload,
int httpStatusCode) {
super(buildErrorMessage(errorCode, httpStatusCode, message), cause);
this.errorCode = errorCode;
this.errorDetails = errorDetails == null ? DaprErrorDetails.EMPTY_INSTANCE : errorDetails;
this.payload = payload;
this.httpStatusCode = httpStatusCode;
}
/**
@ -305,9 +328,15 @@ public class DaprException extends RuntimeException {
return (DaprException) exception;
}
int httpStatusCode = 0;
byte[] httpPayload = null;
Throwable e = exception;
while (e != null) {
if (e instanceof StatusRuntimeException) {
if (e instanceof DaprHttpException) {
DaprHttpException daprHttpException = (DaprHttpException) e;
httpStatusCode = daprHttpException.getStatusCode();
httpPayload = daprHttpException.getPayload();
} else if (e instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) e;
Status status = StatusProto.fromThrowable(statusRuntimeException);
@ -318,8 +347,8 @@ public class DaprException extends RuntimeException {
statusRuntimeException.getStatus().getDescription(),
exception,
errorDetails,
status.toByteArray());
httpPayload != null ? httpPayload : status.toByteArray(),
httpStatusCode);
}
e = e.getCause();
@ -329,19 +358,30 @@ public class DaprException extends RuntimeException {
return (IllegalArgumentException) exception;
}
if (exception instanceof DaprHttpException) {
DaprHttpException daprHttpException = (DaprHttpException)exception;
return new DaprException(
io.grpc.Status.UNKNOWN.toString(),
null,
exception,
null,
daprHttpException.getPayload(),
daprHttpException.getStatusCode());
}
return new DaprException(exception);
}
private static String buildErrorMessage(String errorCode, int httpStatusCode, String message) {
String result = ((errorCode == null) || errorCode.isEmpty()) ? "UNKNOWN: " : errorCode + ": ";
if ((message == null) || message.isEmpty()) {
if (httpStatusCode > 0) {
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return result + "HTTP status code: " + httpStatusCode;
}
return result;
}
if (httpStatusCode > 0) {
if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) {
return result + message + " (HTTP status code: " + httpStatusCode + ")";
}
return result + message;

View File

@ -0,0 +1,126 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.internal.exceptions;
import io.grpc.Metadata;
import java.util.concurrent.ExecutionException;
/**
* Internal exception for propagating HTTP status code.
*/
public class DaprHttpException extends ExecutionException {
/**
* This is the metadata from HTTP binding that we want to automatically parse and make it as part of the exception.
*/
private static final Metadata.Key<String> GRPC_METADATA_KEY_HTTP_STATUS_CODE =
Metadata.Key.of("metadata.statuscode", Metadata.ASCII_STRING_MARSHALLER);
private final int statusCode;
private final byte[] payload;
/**
* Instantiates a new DaprHttpException, without http body.
* @param statusCode HTTP status code.
* @param cause Exception thrown.
*/
private DaprHttpException(int statusCode, Throwable cause) {
super(cause);
this.statusCode = statusCode;
this.payload = null;
}
/**
* Instantiates a new DaprHttpException with a given HTTP payload.
* @param statusCode HTTP status code.
* @param payload HTTP payload.
*/
public DaprHttpException(int statusCode, byte[] payload) {
super();
this.statusCode = statusCode;
this.payload = payload;
}
/**
* Creates an ExecutionException (can also be HttpException, if applicable).
* @param grpcMetadata Optional gRPC metadata.
* @param cause Exception triggered during execution.
* @return ExecutionException
*/
public static ExecutionException fromGrpcExecutionException(Metadata grpcMetadata, Throwable cause) {
int httpStatusCode = parseHttpStatusCode(grpcMetadata);
if (!isValidHttpStatusCode(httpStatusCode)) {
return new ExecutionException(cause);
}
return new DaprHttpException(httpStatusCode, cause);
}
public static boolean isValidHttpStatusCode(int statusCode) {
return statusCode >= 100 && statusCode <= 599; // Status codes range from 100 to 599
}
public static boolean isSuccessfulHttpStatusCode(int statusCode) {
return statusCode >= 200 && statusCode < 300;
}
private static int parseHttpStatusCode(Metadata grpcMetadata) {
if (grpcMetadata == null) {
return 0;
}
return parseHttpStatusCode(grpcMetadata.get(GRPC_METADATA_KEY_HTTP_STATUS_CODE));
}
/**
* Parses a given string value into an HTTP status code, 0 if invalid.
* @param value String value to be parsed.
* @return HTTP status code, 0 if not valid.
*/
public static int parseHttpStatusCode(String value) {
if ((value == null) || value.isEmpty()) {
return 0;
}
try {
int httpStatusCode = Integer.parseInt(value);
if (!isValidHttpStatusCode(httpStatusCode)) {
return 0;
}
return httpStatusCode;
} catch (NumberFormatException nfe) {
return 0;
}
}
/**
* Returns the HTTP Status code for the exception.
* @return HTTP Status code for the exception, 0 if not applicable.
*/
public int getStatusCode() {
return this.statusCode;
}
/**
* Returns the HTTP payload for the exception.
* @return HTTP payload, null if not present.
*/
public byte[] getPayload() {
return this.payload;
}
}

View File

@ -15,12 +15,16 @@ package io.dapr.internal.grpc;
import io.dapr.internal.grpc.interceptors.DaprApiTokenInterceptor;
import io.dapr.internal.grpc.interceptors.DaprAppIdInterceptor;
import io.dapr.internal.grpc.interceptors.DaprMetadataInterceptor;
import io.dapr.internal.grpc.interceptors.DaprTimeoutInterceptor;
import io.dapr.internal.grpc.interceptors.DaprTracingInterceptor;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.grpc.Metadata;
import io.grpc.stub.AbstractStub;
import reactor.util.context.ContextView;
import java.util.function.Consumer;
/**
* Class to be used as part of your service's client stub interceptor.
* Usage: myClientStub = DaprClientGrpcInterceptors.intercept(myClientStub);
@ -35,7 +39,7 @@ public class DaprClientGrpcInterceptors {
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(final String appId, final T client) {
return intercept(appId, client, null, null);
return intercept(appId, client, null, null, null);
}
/**
@ -45,7 +49,7 @@ public class DaprClientGrpcInterceptors {
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(final T client) {
return intercept(null, client, null, null);
return intercept(null, client, null, null, null);
}
/**
@ -58,7 +62,7 @@ public class DaprClientGrpcInterceptors {
*/
public static <T extends AbstractStub<T>> T intercept(
final String appId, final T client, final TimeoutPolicy timeoutPolicy) {
return intercept(appId, client, timeoutPolicy, null);
return intercept(appId, client, timeoutPolicy, null, null);
}
/**
@ -69,7 +73,7 @@ public class DaprClientGrpcInterceptors {
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(final T client, final TimeoutPolicy timeoutPolicy) {
return intercept(null, client, timeoutPolicy, null);
return intercept(null, client, timeoutPolicy, null, null);
}
/**
@ -82,7 +86,7 @@ public class DaprClientGrpcInterceptors {
*/
public static <T extends AbstractStub<T>> T intercept(
final String appId, final T client, final ContextView context) {
return intercept(appId, client, null, context);
return intercept(appId, client, null, context, null);
}
/**
@ -93,7 +97,7 @@ public class DaprClientGrpcInterceptors {
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(final T client, final ContextView context) {
return intercept(null, client, null, context);
return intercept(null, client, null, context, null);
}
/**
@ -108,7 +112,24 @@ public class DaprClientGrpcInterceptors {
final T client,
final TimeoutPolicy timeoutPolicy,
final ContextView context) {
return intercept(null, client, timeoutPolicy, context);
return intercept(null, client, timeoutPolicy, context, null);
}
/**
* Adds all Dapr interceptors to a gRPC async stub.
* @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param context Reactor context for tracing
* @param metadataConsumer Consumer of the gRPC metadata
* @param <T> async client type
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(
final T client,
final TimeoutPolicy timeoutPolicy,
final ContextView context,
final Consumer<Metadata> metadataConsumer) {
return intercept(null, client, timeoutPolicy, context, metadataConsumer);
}
/**
@ -117,6 +138,7 @@ public class DaprClientGrpcInterceptors {
* @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param context Reactor context for tracing
* @param metadataConsumer Consumer of the gRPC metadata
* @param <T> async client type
* @return async client instance with interceptors
*/
@ -124,7 +146,8 @@ public class DaprClientGrpcInterceptors {
final String appId,
final T client,
final TimeoutPolicy timeoutPolicy,
final ContextView context) {
final ContextView context,
final Consumer<Metadata> metadataConsumer) {
if (client == null) {
throw new IllegalArgumentException("client cannot be null");
}
@ -133,7 +156,8 @@ public class DaprClientGrpcInterceptors {
new DaprAppIdInterceptor(appId),
new DaprApiTokenInterceptor(),
new DaprTimeoutInterceptor(timeoutPolicy),
new DaprTracingInterceptor(context));
new DaprTracingInterceptor(context),
new DaprMetadataInterceptor(metadataConsumer));
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.internal.grpc.interceptors;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.function.Consumer;
/**
* Consumes gRPC metadata.
*/
public class DaprMetadataInterceptor implements ClientInterceptor {
private final Consumer<Metadata> metadataConsumer;
/**
* Creates an instance of the consumer for gRPC metadata.
* @param metadataConsumer gRPC metadata consumer
*/
public DaprMetadataInterceptor(Consumer<Metadata> metadataConsumer) {
this.metadataConsumer = metadataConsumer;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, callOptions);
return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
final ClientCall.Listener<RespT> headerListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
responseListener.onHeaders(headers);
if (metadataConsumer != null) {
metadataConsumer.accept(headers);
}
}
};
super.start(headerListener, metadata);
}
};
}
}