From 4dbcbde14d4446af6e0d15a9d15871e17a9fdae5 Mon Sep 17 00:00:00 2001 From: Sky Ao Date: Mon, 18 Sep 2023 12:28:03 +0800 Subject: [PATCH] Support Dapr API token in workflow client(fix DCO) (#916) * change to be public to reuse in workflow subproject Signed-off-by: Sky Ao * update buildGrpcManagedChannel() method to accept optional parameters for grpc client interceptors Signed-off-by: Sky Ao * support dapr API token while build grpc management channel Signed-off-by: Sky Ao --------- Signed-off-by: Sky Ao --- .../workflows/client/DaprWorkflowClient.java | 33 ++++++++++++++++++- sdk/src/main/java/io/dapr/client/Headers.java | 8 ++--- .../main/java/io/dapr/utils/NetworkUtils.java | 7 +++- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index 6e05a7f06..2915b2c71 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -17,9 +17,18 @@ import com.microsoft.durabletask.DurableTaskClient; import com.microsoft.durabletask.DurableTaskGrpcClientBuilder; import com.microsoft.durabletask.OrchestrationMetadata; import com.microsoft.durabletask.PurgeResult; +import io.dapr.client.Headers; +import io.dapr.config.Properties; import io.dapr.utils.NetworkUtils; import io.dapr.workflows.Workflow; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import javax.annotation.Nullable; @@ -39,7 +48,7 @@ public class DaprWorkflowClient implements AutoCloseable { * Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel. */ public DaprWorkflowClient() { - this(NetworkUtils.buildGrpcManagedChannel()); + this(NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR)); } /** @@ -239,4 +248,26 @@ public class DaprWorkflowClient implements AutoCloseable { } } } + + private static ClientInterceptor WORKFLOW_INTERCEPTOR = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, + CallOptions options, + Channel channel) { + // TBD: do we need timeout in workflow client? + ClientCall clientCall = channel.newCall(methodDescriptor, options); + return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) { + @Override + public void start(final Listener responseListener, final Metadata metadata) { + String daprApiToken = Properties.API_TOKEN.get(); + if (daprApiToken != null) { + metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); + } + super.start(responseListener, metadata); + } + }; + } + }; } + diff --git a/sdk/src/main/java/io/dapr/client/Headers.java b/sdk/src/main/java/io/dapr/client/Headers.java index c65a5741e..9ae7ebef5 100644 --- a/sdk/src/main/java/io/dapr/client/Headers.java +++ b/sdk/src/main/java/io/dapr/client/Headers.java @@ -16,20 +16,20 @@ package io.dapr.client; /** * Common headers for GRPC and HTTP communication. */ -class Headers { +public final class Headers { /** * OpenCensus's metadata for GRPC. */ - static final String GRPC_TRACE_BIN = "grpc-trace-bin"; + public static final String GRPC_TRACE_BIN = "grpc-trace-bin"; /** * Token for authentication from Application to Dapr runtime. */ - static final String DAPR_API_TOKEN = "dapr-api-token"; + public static final String DAPR_API_TOKEN = "dapr-api-token"; /** * Header for Api Logging User-Agent. */ - static final String DAPR_USER_AGENT = "User-Agent"; + public static final String DAPR_USER_AGENT = "User-Agent"; } diff --git a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java index 71e1be12c..e5b4ad0f2 100644 --- a/sdk/src/main/java/io/dapr/utils/NetworkUtils.java +++ b/sdk/src/main/java/io/dapr/utils/NetworkUtils.java @@ -14,6 +14,7 @@ limitations under the License. package io.dapr.utils; import io.dapr.config.Properties; +import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -57,9 +58,10 @@ public final class NetworkUtils { /** * Creates a GRPC managed channel. + * @param interceptors Optional interceptors to add to the channel. * @return GRPC managed channel to communicate with the sidecar. */ - public static ManagedChannel buildGrpcManagedChannel() { + public static ManagedChannel buildGrpcManagedChannel(ClientInterceptor... interceptors) { String address = Properties.SIDECAR_IP.get(); int port = Properties.GRPC_PORT.get(); boolean insecure = true; @@ -78,6 +80,9 @@ public final class NetworkUtils { if (insecure) { builder = builder.usePlaintext(); } + if (interceptors != null && interceptors.length > 0) { + builder = builder.intercept(interceptors); + } return builder.build(); }