Fix bi-di subscription to support dapr-api-token (#1142)

* Fix bi-di subscription to support dapr-api-token

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Remove dapr-api-token from actor services

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Handle dapr-api-token for split run tests

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix more tests requiring dapr-api-token

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix IT for HelloWorldClientIT

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2024-10-16 07:20:37 -07:00 committed by GitHub
parent 8a0913d4a2
commit 6a6901d43e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 242 additions and 199 deletions

View File

@ -59,7 +59,7 @@ public class ActorClient implements AutoCloseable {
* @param overrideProperties Override properties. * @param overrideProperties Override properties.
*/ */
public ActorClient(Properties overrideProperties) { public ActorClient(Properties overrideProperties) {
this(buildManagedChannel(overrideProperties), null); this(buildManagedChannel(overrideProperties), null, overrideProperties.getValue(Properties.API_TOKEN));
} }
/** /**
@ -69,7 +69,7 @@ public class ActorClient implements AutoCloseable {
* @param resiliencyOptions Client resiliency options. * @param resiliencyOptions Client resiliency options.
*/ */
public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) { public ActorClient(Properties overrideProperties, ResiliencyOptions resiliencyOptions) {
this(buildManagedChannel(overrideProperties), resiliencyOptions); this(buildManagedChannel(overrideProperties), resiliencyOptions, overrideProperties.getValue(Properties.API_TOKEN));
} }
/** /**
@ -80,9 +80,10 @@ public class ActorClient implements AutoCloseable {
*/ */
private ActorClient( private ActorClient(
ManagedChannel grpcManagedChannel, ManagedChannel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) { ResiliencyOptions resiliencyOptions,
String daprApiToken) {
this.grpcManagedChannel = grpcManagedChannel; this.grpcManagedChannel = grpcManagedChannel;
this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions); this.daprClient = buildDaprClient(grpcManagedChannel, resiliencyOptions, daprApiToken);
} }
/** /**
@ -136,7 +137,11 @@ public class ActorClient implements AutoCloseable {
*/ */
private static DaprClient buildDaprClient( private static DaprClient buildDaprClient(
Channel grpcManagedChannel, Channel grpcManagedChannel,
ResiliencyOptions resiliencyOptions) { ResiliencyOptions resiliencyOptions,
return new DaprClientImpl(DaprGrpc.newStub(grpcManagedChannel), resiliencyOptions); String daprApiToken) {
return new DaprClientImpl(
DaprGrpc.newStub(grpcManagedChannel),
resiliencyOptions,
daprApiToken);
} }
} }

View File

@ -42,11 +42,6 @@ import java.util.function.Consumer;
*/ */
class DaprClientImpl implements DaprClient { class DaprClientImpl implements DaprClient {
/**
* Timeout policy for SDK calls to Dapr API.
*/
private final TimeoutPolicy timeoutPolicy;
/** /**
* Retry policy for SDK calls to Dapr API. * Retry policy for SDK calls to Dapr API.
*/ */
@ -57,16 +52,22 @@ class DaprClientImpl implements DaprClient {
*/ */
private final DaprGrpc.DaprStub client; private final DaprGrpc.DaprStub client;
/**
* gRPC client interceptors.
*/
private final DaprClientGrpcInterceptors grpcInterceptors;
/** /**
* Internal constructor. * Internal constructor.
* *
* @param grpcClient Dapr's GRPC client. * @param grpcClient Dapr's GRPC client.
* @param resiliencyOptions Client resiliency options (optional) * @param resiliencyOptions Client resiliency options (optional).
* @param daprApiToken Dapr API token (optional).
*/ */
DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions) { DaprClientImpl(DaprGrpc.DaprStub grpcClient, ResiliencyOptions resiliencyOptions, String daprApiToken) {
this.client = intercept(grpcClient); this.client = grpcClient;
this.timeoutPolicy = new TimeoutPolicy( this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken,
resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()));
this.retryPolicy = new RetryPolicy( this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()); resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
} }
@ -85,54 +86,11 @@ class DaprClientImpl implements DaprClient {
.build(); .build();
return Mono.deferContextual( return Mono.deferContextual(
context -> this.<DaprProtos.InvokeActorResponse>createMono( context -> this.<DaprProtos.InvokeActorResponse>createMono(
it -> intercept(context, this.timeoutPolicy, client).invokeActor(req, it) it -> this.grpcInterceptors.intercept(client, context).invokeActor(req, it)
) )
).map(r -> r.getData().toByteArray()); ).map(r -> r.getData().toByteArray());
} }
/**
* Populates GRPC client with interceptors.
*
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private DaprGrpc.DaprStub intercept(DaprGrpc.DaprStub client) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, timeoutPolicy.apply(options));
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
metadata.put(Metadata.Key.of("dapr-api-token", Metadata.ASCII_STRING_MARSHALLER), daprApiToken);
}
super.start(responseListener, metadata);
}
};
}
};
return client.withInterceptors(interceptor);
}
/**
* Populates GRPC client with interceptors for telemetry.
*
* @param context Reactor's context.
* @param timeoutPolicy Timeout policy for gRPC call.
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(
ContextView context, TimeoutPolicy timeoutPolicy, DaprGrpc.DaprStub client) {
return DaprClientGrpcInterceptors.intercept(client, timeoutPolicy, context);
}
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) { private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply( return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run())); Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));

View File

@ -106,7 +106,7 @@ public class DaprGrpcClientTest {
InProcessChannelBuilder.forName(serverName).directExecutor().build()); InProcessChannelBuilder.forName(serverName).directExecutor().build());
// Create a HelloWorldClient using the in-process channel; // Create a HelloWorldClient using the in-process channel;
client = new DaprClientImpl(DaprGrpc.newStub(channel), null); client = new DaprClientImpl(DaprGrpc.newStub(channel), null, null);
} }
@Test @Test

View File

@ -30,7 +30,10 @@ import okhttp3.Response;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -40,6 +43,7 @@ import static io.dapr.it.Retry.callWithRetry;
public class DaprRun implements Stoppable { public class DaprRun implements Stoppable {
private static final String DEFAULT_DAPR_API_TOKEN = UUID.randomUUID().toString();
private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!"; private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!";
private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " + private static final String DAPR_RUN = "dapr run --app-id %s --app-protocol %s " +
@ -68,19 +72,41 @@ public class DaprRun implements Stoppable {
private final boolean hasAppHealthCheck; private final boolean hasAppHealthCheck;
private final Map<Property<?>, String> propertyOverrides;
private DaprRun(String testName, private DaprRun(String testName,
DaprPorts ports, DaprPorts ports,
String successMessage, String successMessage,
Class serviceClass, Class serviceClass,
int maxWaitMilliseconds, int maxWaitMilliseconds,
AppRun.AppProtocol appProtocol) { AppRun.AppProtocol appProtocol) {
this(
testName,
ports,
successMessage,
serviceClass,
maxWaitMilliseconds,
appProtocol,
resolveDaprApiToken(serviceClass));
}
private DaprRun(String testName,
DaprPorts ports,
String successMessage,
Class serviceClass,
int maxWaitMilliseconds,
AppRun.AppProtocol appProtocol,
String daprApiToken) {
// The app name needs to be deterministic since we depend on it to kill previous runs. // The app name needs to be deterministic since we depend on it to kill previous runs.
this.appName = serviceClass == null ? this.appName = serviceClass == null ?
testName.toLowerCase() : testName.toLowerCase() :
String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase(); String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase();
this.appProtocol = appProtocol; this.appProtocol = appProtocol;
this.startCommand = this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, appProtocol)); new Command(
successMessage,
buildDaprCommand(this.appName, serviceClass, ports, appProtocol),
daprApiToken == null ? null : Map.of("DAPR_API_TOKEN", daprApiToken));
this.listCommand = new Command( this.listCommand = new Command(
this.appName, this.appName,
"dapr list"); "dapr list");
@ -91,6 +117,10 @@ public class DaprRun implements Stoppable {
this.maxWaitMilliseconds = maxWaitMilliseconds; this.maxWaitMilliseconds = maxWaitMilliseconds;
this.started = new AtomicBoolean(false); this.started = new AtomicBoolean(false);
this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass); this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass);
this.propertyOverrides = daprApiToken == null ? ports.getPropertyOverrides() :
Collections.unmodifiableMap(new HashMap<>(ports.getPropertyOverrides()) {{
put(Properties.API_TOKEN, daprApiToken);
}});
} }
public void start() throws InterruptedException, IOException { public void start() throws InterruptedException, IOException {
@ -149,7 +179,7 @@ public class DaprRun implements Stoppable {
} }
public Map<Property<?>, String> getPropertyOverrides() { public Map<Property<?>, String> getPropertyOverrides() {
return this.ports.getPropertyOverrides(); return this.propertyOverrides;
} }
public DaprClientBuilder newDaprClientBuilder() { public DaprClientBuilder newDaprClientBuilder() {
@ -239,17 +269,13 @@ public class DaprRun implements Stoppable {
public DaprClient newDaprClient() { public DaprClient newDaprClient() {
return new DaprClientBuilder() return new DaprClientBuilder()
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) .withPropertyOverrides(this.getPropertyOverrides())
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
.build(); .build();
} }
public DaprPreviewClient newDaprPreviewClient() { public DaprPreviewClient newDaprPreviewClient() {
return new DaprClientBuilder() return new DaprClientBuilder()
.withPropertyOverride(Properties.GRPC_PORT, ports.getGrpcPort().toString()) .withPropertyOverrides(this.getPropertyOverrides())
.withPropertyOverride(Properties.HTTP_PORT, ports.getHttpPort().toString())
.withPropertyOverride(Properties.SIDECAR_IP, "127.0.0.1")
.buildPreviewClient(); .buildPreviewClient();
} }
@ -298,6 +324,22 @@ public class DaprRun implements Stoppable {
return false; return false;
} }
private static String resolveDaprApiToken(Class serviceClass) {
if (serviceClass != null) {
DaprRunConfig daprRunConfig = (DaprRunConfig) serviceClass.getAnnotation(DaprRunConfig.class);
if (daprRunConfig != null) {
if (!daprRunConfig.enableDaprApiToken()) {
return null;
}
// We use the clas name itself as the token. Just needs to be deterministic.
return serviceClass.getCanonicalName();
}
}
// By default, we use a token.
return DEFAULT_DAPR_API_TOKEN;
}
private static void assertListeningOnPort(int port) { private static void assertListeningOnPort(int port) {
System.out.printf("Checking port %d ...\n", port); System.out.printf("Checking port %d ...\n", port);
@ -325,6 +367,8 @@ public class DaprRun implements Stoppable {
private AppRun.AppProtocol appProtocol; private AppRun.AppProtocol appProtocol;
private String daprApiToken;
Builder( Builder(
String testName, String testName,
Supplier<DaprPorts> portsSupplier, Supplier<DaprPorts> portsSupplier,
@ -336,6 +380,7 @@ public class DaprRun implements Stoppable {
this.successMessage = successMessage; this.successMessage = successMessage;
this.maxWaitMilliseconds = maxWaitMilliseconds; this.maxWaitMilliseconds = maxWaitMilliseconds;
this.appProtocol = appProtocol; this.appProtocol = appProtocol;
this.daprApiToken = UUID.randomUUID().toString();
} }
public Builder withServiceClass(Class serviceClass) { public Builder withServiceClass(Class serviceClass) {
@ -371,7 +416,8 @@ public class DaprRun implements Stoppable {
DAPR_SUCCESS_MESSAGE, DAPR_SUCCESS_MESSAGE,
null, null,
this.maxWaitMilliseconds, this.maxWaitMilliseconds,
this.appProtocol); this.appProtocol,
resolveDaprApiToken(serviceClass));
return new ImmutablePair<>(appRun, daprRun); return new ImmutablePair<>(appRun, daprRun);
} }

View File

@ -26,4 +26,6 @@ import java.lang.annotation.Target;
public @interface DaprRunConfig { public @interface DaprRunConfig {
boolean enableAppHealthCheck() default false; boolean enableAppHealthCheck() default false;
boolean enableDaprApiToken() default true;
} }

View File

@ -14,7 +14,10 @@ limitations under the License.
package io.dapr.it.actors.app; package io.dapr.it.actors.app;
import io.dapr.actors.runtime.ActorRuntime; import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.it.DaprRunConfig;
// Enable dapr-api-token once runtime supports it in standalone mode.
@DaprRunConfig(enableDaprApiToken = false)
public class MyActorService { public class MyActorService {
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running"; public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running";

View File

@ -14,10 +14,12 @@ limitations under the License.
package io.dapr.it.actors.services.springboot; package io.dapr.it.actors.services.springboot;
import io.dapr.actors.runtime.ActorRuntime; import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.it.DaprRunConfig;
import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer;
import java.time.Duration; import java.time.Duration;
@DaprRunConfig(enableDaprApiToken = false)
public class StatefulActorService { public class StatefulActorService {
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running"; public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running";

View File

@ -13,13 +13,10 @@ limitations under the License.
package io.dapr.it.state; package io.dapr.it.state;
import io.dapr.config.Properties;
import io.dapr.it.BaseIT; import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos; import io.dapr.v1.DaprProtos;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -34,45 +31,44 @@ public class HelloWorldClientIT extends BaseIT {
false, false,
2000 2000
); );
ManagedChannel channel = try (var client = daprRun.newDaprClientBuilder().build()) {
ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), daprRun.getGrpcPort()).usePlaintext().build(); var stub = client.newGrpcStub("n/a", DaprGrpc::newBlockingStub);
DaprGrpc.DaprBlockingStub client = DaprGrpc.newBlockingStub(channel);
String key = "mykey"; String key = "mykey";
{ {
DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest
.newBuilder() .newBuilder()
.setStoreName(STATE_STORE_NAME) .setStoreName(STATE_STORE_NAME)
.setKey(key) .setKey(key)
.build(); .build();
DaprProtos.GetStateResponse response = client.getState(req); DaprProtos.GetStateResponse response = stub.getState(req);
String value = response.getData().toStringUtf8(); String value = response.getData().toStringUtf8();
System.out.println("Got: " + value); System.out.println("Got: " + value);
Assertions.assertEquals("Hello World", value); Assertions.assertEquals("Hello World", value);
} }
// Then, delete it. // Then, delete it.
{ {
DaprProtos.DeleteStateRequest req = DaprProtos.DeleteStateRequest DaprProtos.DeleteStateRequest req = DaprProtos.DeleteStateRequest
.newBuilder() .newBuilder()
.setStoreName(STATE_STORE_NAME) .setStoreName(STATE_STORE_NAME)
.setKey(key) .setKey(key)
.build(); .build();
client.deleteState(req); stub.deleteState(req);
System.out.println("Deleted!"); System.out.println("Deleted!");
} }
{ {
DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest DaprProtos.GetStateRequest req = DaprProtos.GetStateRequest
.newBuilder() .newBuilder()
.setStoreName(STATE_STORE_NAME) .setStoreName(STATE_STORE_NAME)
.setKey(key) .setKey(key)
.build(); .build();
DaprProtos.GetStateResponse response = client.getState(req); DaprProtos.GetStateResponse response = stub.getState(req);
String value = response.getData().toStringUtf8(); String value = response.getData().toStringUtf8();
System.out.println("Got: " + value); System.out.println("Got: " + value);
Assertions.assertEquals("", value); Assertions.assertEquals("", value);
}
} }
channel.shutdown();
} }
} }

View File

@ -14,7 +14,9 @@ limitations under the License.
package io.dapr.it.state; package io.dapr.it.state;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import io.dapr.client.DaprClientBuilder;
import io.dapr.config.Properties; import io.dapr.config.Properties;
import io.dapr.internal.grpc.DaprClientGrpcInterceptors;
import io.dapr.v1.CommonProtos.StateItem; import io.dapr.v1.CommonProtos.StateItem;
import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprGrpc.DaprBlockingStub; import io.dapr.v1.DaprGrpc.DaprBlockingStub;
@ -38,8 +40,11 @@ public class HelloWorldGrpcStateService {
// If port string is not valid, it will throw an exception. // If port string is not valid, it will throw an exception.
int grpcPortInt = Integer.parseInt(grpcPort); int grpcPortInt = Integer.parseInt(grpcPort);
ManagedChannel channel = ManagedChannelBuilder.forAddress(Properties.SIDECAR_IP.get(), grpcPortInt).usePlaintext().build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(
DaprBlockingStub client = DaprGrpc.newBlockingStub(channel); Properties.SIDECAR_IP.get(), grpcPortInt).usePlaintext().build();
DaprClientGrpcInterceptors interceptors = new DaprClientGrpcInterceptors(
Properties.API_TOKEN.get(), null);
DaprBlockingStub client = interceptors.intercept(DaprGrpc.newBlockingStub(channel));
String key = "mykey"; String key = "mykey";
// First, write key-value pair. // First, write key-value pair.

View File

@ -173,6 +173,7 @@ public class DaprClientBuilder {
daprHttp, daprHttp,
this.objectSerializer, this.objectSerializer,
this.stateSerializer, this.stateSerializer,
this.resiliencyOptions); this.resiliencyOptions,
properties.getValue(Properties.API_TOKEN));
} }
} }

View File

@ -119,11 +119,6 @@ public class DaprClientImpl extends AbstractDaprClient {
*/ */
private final GrpcChannelFacade channel; private final GrpcChannelFacade channel;
/**
* The timeout policy.
*/
private final TimeoutPolicy timeoutPolicy;
/** /**
* The retry policy. * The retry policy.
*/ */
@ -141,9 +136,10 @@ public class DaprClientImpl extends AbstractDaprClient {
*/ */
private final DaprHttp httpClient; private final DaprHttp httpClient;
private final DaprClientGrpcInterceptors grpcInterceptors;
/** /**
* Default access level constructor, in order to create an instance of this * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
* class use io.dapr.client.DaprClientBuilder
* *
* @param channel Facade for the managed GRPC channel * @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub * @param asyncStub async gRPC stub
@ -157,7 +153,27 @@ public class DaprClientImpl extends AbstractDaprClient {
DaprHttp httpClient, DaprHttp httpClient,
DaprObjectSerializer objectSerializer, DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) { DaprObjectSerializer stateSerializer) {
this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null); this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, null);
}
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @param daprApiToken Dapr API Token.
* @see DaprClientBuilder
*/
DaprClientImpl(
GrpcChannelFacade channel,
DaprGrpc.DaprStub asyncStub,
DaprHttp httpClient,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer,
String daprApiToken) {
this(channel, asyncStub, httpClient, objectSerializer, stateSerializer, null, daprApiToken);
} }
/** /**
@ -169,6 +185,7 @@ public class DaprClientImpl extends AbstractDaprClient {
* @param objectSerializer Serializer for transient request/response objects. * @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects. * @param stateSerializer Serializer for state objects.
* @param resiliencyOptions Client-level override for resiliency options. * @param resiliencyOptions Client-level override for resiliency options.
* @param daprApiToken Dapr API Token.
* @see DaprClientBuilder * @see DaprClientBuilder
*/ */
DaprClientImpl( DaprClientImpl(
@ -177,15 +194,47 @@ public class DaprClientImpl extends AbstractDaprClient {
DaprHttp httpClient, DaprHttp httpClient,
DaprObjectSerializer objectSerializer, DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer, DaprObjectSerializer stateSerializer,
ResiliencyOptions resiliencyOptions) { ResiliencyOptions resiliencyOptions,
String daprApiToken) {
this(
channel,
asyncStub,
httpClient,
objectSerializer,
stateSerializer,
new TimeoutPolicy(resiliencyOptions == null ? null : resiliencyOptions.getTimeout()),
new RetryPolicy(resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries()),
daprApiToken);
}
/**
* Instantiates a new DaprClient.
*
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param httpClient client for http service invocation
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @param timeoutPolicy Client-level timeout policy.
* @param retryPolicy Client-level retry policy.
* @param daprApiToken Dapr API Token.
* @see DaprClientBuilder
*/
private DaprClientImpl(
GrpcChannelFacade channel,
DaprGrpc.DaprStub asyncStub,
DaprHttp httpClient,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer,
TimeoutPolicy timeoutPolicy,
RetryPolicy retryPolicy,
String daprApiToken) {
super(objectSerializer, stateSerializer); super(objectSerializer, stateSerializer);
this.channel = channel; this.channel = channel;
this.asyncStub = asyncStub; this.asyncStub = asyncStub;
this.httpClient = httpClient; this.httpClient = httpClient;
this.timeoutPolicy = new TimeoutPolicy( this.retryPolicy = retryPolicy;
resiliencyOptions == null ? null : resiliencyOptions.getTimeout()); this.grpcInterceptors = new DaprClientGrpcInterceptors(daprApiToken, timeoutPolicy);
this.retryPolicy = new RetryPolicy(
resiliencyOptions == null ? null : resiliencyOptions.getMaxRetries());
} }
private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) { private CommonProtos.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
@ -215,7 +264,7 @@ public class DaprClientImpl extends AbstractDaprClient {
*/ */
public <T extends AbstractStub<T>> T newGrpcStub(String appId, Function<Channel, T> stubBuilder) { public <T extends AbstractStub<T>> T newGrpcStub(String appId, Function<Channel, T> stubBuilder) {
// Adds Dapr interceptors to populate gRPC metadata automatically. // Adds Dapr interceptors to populate gRPC metadata automatically.
return DaprClientGrpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel()), timeoutPolicy); return this.grpcInterceptors.intercept(appId, stubBuilder.apply(this.channel.getGrpcChannel()));
} }
/** /**
@ -425,7 +474,8 @@ public class DaprClientImpl extends AbstractDaprClient {
SubscriptionListener<T> listener, SubscriptionListener<T> listener,
TypeRef<T> type, TypeRef<T> type,
DaprProtos.SubscribeTopicEventsRequestAlpha1 request) { DaprProtos.SubscribeTopicEventsRequestAlpha1 request) {
Subscription<T> subscription = new Subscription<>(this.asyncStub, request, listener, response -> { var interceptedStub = this.grpcInterceptors.intercept(this.asyncStub);
Subscription<T> subscription = new Subscription<>(interceptedStub, request, listener, response -> {
if (response.getEventMessage() == null) { if (response.getEventMessage() == null) {
return null; return null;
} }
@ -1268,7 +1318,7 @@ public class DaprClientImpl extends AbstractDaprClient {
* @return Client after adding interceptors. * @return Client after adding interceptors.
*/ */
private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) { private DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context); return this.grpcInterceptors.intercept(client, context);
} }
/** /**
@ -1281,7 +1331,7 @@ public class DaprClientImpl extends AbstractDaprClient {
*/ */
private DaprGrpc.DaprStub intercept( private DaprGrpc.DaprStub intercept(
ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) { ContextView context, DaprGrpc.DaprStub client, Consumer<Metadata> metadataConsumer) {
return DaprClientGrpcInterceptors.intercept(client, this.timeoutPolicy, context, metadataConsumer); return this.grpcInterceptors.intercept(client, context, metadataConsumer);
} }
private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) { private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {

View File

@ -31,15 +31,17 @@ import java.util.function.Consumer;
*/ */
public class DaprClientGrpcInterceptors { public class DaprClientGrpcInterceptors {
/** private final String daprApiToken;
* Adds all Dapr interceptors to a gRPC async stub.
* @param appId the appId to be invoked private final TimeoutPolicy timeoutPolicy;
* @param client gRPC client
* @param <T> async client type public DaprClientGrpcInterceptors() {
* @return async client instance with interceptors this(null, null);
*/ }
public static <T extends AbstractStub<T>> T intercept(final String appId, final T client) {
return intercept(appId, client, null, null, null); public DaprClientGrpcInterceptors(String daprApiToken, TimeoutPolicy timeoutPolicy) {
this.daprApiToken = daprApiToken;
this.timeoutPolicy = timeoutPolicy;
} }
/** /**
@ -48,45 +50,21 @@ public class DaprClientGrpcInterceptors {
* @param <T> async client type * @param <T> async client type
* @return async client instance with interceptors * @return async client instance with interceptors
*/ */
public static <T extends AbstractStub<T>> T intercept(final T client) { public <T extends AbstractStub<T>> T intercept(final T client) {
return intercept(null, client, null, null, null); return intercept(null, client, null, null);
} }
/** /**
* Adds all Dapr interceptors to a gRPC async stub. * Adds all Dapr interceptors to a gRPC async stub.
* @param appId the appId to be invoked * @param appId Application ID to invoke.
* @param client gRPC client * @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param <T> async client type * @param <T> async client type
* @return async client instance with interceptors * @return async client instance with interceptors
*/ */
public static <T extends AbstractStub<T>> T intercept( public <T extends AbstractStub<T>> T intercept(
final String appId, final T client, final TimeoutPolicy timeoutPolicy) { final String appId,
return intercept(appId, client, timeoutPolicy, null, null); final T client) {
} return this.intercept(appId, client, null, null);
/**
* Adds all Dapr interceptors to a gRPC async stub.
* @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param <T> async client type
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(final T client, final TimeoutPolicy timeoutPolicy) {
return intercept(null, client, timeoutPolicy, null, null);
}
/**
* Adds all Dapr interceptors to a gRPC async stub.
* @param appId the appId to be invoked
* @param client gRPC client
* @param context Reactor context for tracing
* @param <T> async client type
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(
final String appId, final T client, final ContextView context) {
return intercept(appId, client, null, context, null);
} }
/** /**
@ -96,56 +74,39 @@ public class DaprClientGrpcInterceptors {
* @param <T> async client type * @param <T> async client type
* @return async client instance with interceptors * @return async client instance with interceptors
*/ */
public static <T extends AbstractStub<T>> T intercept(final T client, final ContextView context) { public <T extends AbstractStub<T>> T intercept(
return intercept(null, client, null, context, null);
}
/**
* Adds all Dapr interceptors to a gRPC async stub.
* @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param context Reactor context for tracing
* @param <T> async client type
* @return async client instance with interceptors
*/
public static <T extends AbstractStub<T>> T intercept(
final T client, final T client,
final TimeoutPolicy timeoutPolicy,
final ContextView context) { final ContextView context) {
return intercept(null, client, timeoutPolicy, context, null); return intercept(null, client, context, null);
} }
/** /**
* Adds all Dapr interceptors to a gRPC async stub. * Adds all Dapr interceptors to a gRPC async stub.
* @param client gRPC client * @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param context Reactor context for tracing * @param context Reactor context for tracing
* @param metadataConsumer Consumer of the gRPC metadata * @param metadataConsumer Consumer of the gRPC metadata
* @param <T> async client type * @param <T> async client type
* @return async client instance with interceptors * @return async client instance with interceptors
*/ */
public static <T extends AbstractStub<T>> T intercept( public <T extends AbstractStub<T>> T intercept(
final T client, final T client,
final TimeoutPolicy timeoutPolicy,
final ContextView context, final ContextView context,
final Consumer<Metadata> metadataConsumer) { final Consumer<Metadata> metadataConsumer) {
return intercept(null, client, timeoutPolicy, context, metadataConsumer); return this.intercept(null, client, context, metadataConsumer);
} }
/** /**
* Adds all Dapr interceptors to a gRPC async stub. * Adds all Dapr interceptors to a gRPC async stub.
* @param appId the appId to be invoked * @param appId Application ID to invoke.
* @param client gRPC client * @param client gRPC client
* @param timeoutPolicy timeout policy for gRPC call
* @param context Reactor context for tracing * @param context Reactor context for tracing
* @param metadataConsumer Consumer of the gRPC metadata * @param metadataConsumer Consumer of the gRPC metadata
* @param <T> async client type * @param <T> async client type
* @return async client instance with interceptors * @return async client instance with interceptors
*/ */
public static <T extends AbstractStub<T>> T intercept( public <T extends AbstractStub<T>> T intercept(
final String appId, final String appId,
final T client, final T client,
final TimeoutPolicy timeoutPolicy,
final ContextView context, final ContextView context,
final Consumer<Metadata> metadataConsumer) { final Consumer<Metadata> metadataConsumer) {
if (client == null) { if (client == null) {
@ -154,8 +115,8 @@ public class DaprClientGrpcInterceptors {
return client.withInterceptors( return client.withInterceptors(
new DaprAppIdInterceptor(appId), new DaprAppIdInterceptor(appId),
new DaprApiTokenInterceptor(), new DaprApiTokenInterceptor(this.daprApiToken),
new DaprTimeoutInterceptor(timeoutPolicy), new DaprTimeoutInterceptor(this.timeoutPolicy),
new DaprTracingInterceptor(context), new DaprTracingInterceptor(context),
new DaprMetadataInterceptor(metadataConsumer)); new DaprMetadataInterceptor(metadataConsumer));
} }

View File

@ -24,10 +24,23 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
/** /**
* Class to be used as part of your service's client stub interceptor to include Dapr tokens. * Class to be used as part of your service's client stub interceptor to include the Dapr API token.
*/ */
public class DaprApiTokenInterceptor implements ClientInterceptor { public class DaprApiTokenInterceptor implements ClientInterceptor {
/**
* Dapr API Token.
*/
private final String token;
/**
* Instantiates an interceptor to inject the Dapr API Token.
* @param token Dapr API Token.
*/
public DaprApiTokenInterceptor(String token) {
this.token = token;
}
@Override @Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, MethodDescriptor<ReqT, RespT> methodDescriptor,
@ -37,9 +50,10 @@ public class DaprApiTokenInterceptor implements ClientInterceptor {
return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) { return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) {
@Override @Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) { public void start(final Listener<RespT> responseListener, final Metadata metadata) {
String daprApiToken = Properties.API_TOKEN.get(); if (DaprApiTokenInterceptor.this.token != null) {
if (daprApiToken != null) { metadata.put(
metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER), daprApiToken); Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER),
DaprApiTokenInterceptor.this.token);
} }
super.start(responseListener, metadata); super.start(responseListener, metadata);
} }

View File

@ -238,7 +238,7 @@ null,
.build(); .build();
return Mono.deferContextual( return Mono.deferContextual(
context -> this.<CommonProtos.InvokeResponse>createMono( context -> this.<CommonProtos.InvokeResponse>createMono(
it -> DaprClientGrpcInterceptors.intercept(daprStub, context).invokeService(req, it) it -> new DaprClientGrpcInterceptors().intercept(daprStub, context).invokeService(req, it)
) )
).then(); ).then();
} }