Support static Properties overrides on DaprClientBuilder (#1097)

This commit is contained in:
salaboy 2024-08-23 21:36:01 -05:00 committed by GitHub
parent 93a7bc5295
commit b808c92320
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 190 additions and 62 deletions

View File

@ -17,8 +17,8 @@ import okhttp3.OkHttpClient;
public class DaprHttpProxy extends io.dapr.client.DaprHttp { public class DaprHttpProxy extends io.dapr.client.DaprHttp {
public DaprHttpProxy(String hostname, int port, OkHttpClient httpClient) { public DaprHttpProxy(String hostname, int port, String daprApiToken, OkHttpClient httpClient) {
super(hostname, port, httpClient); super(hostname, port, daprApiToken, httpClient);
} }
} }

View File

@ -91,8 +91,8 @@ public class MethodInvokeIT extends BaseIT {
String message = assertThrows(StatusRuntimeException.class, () -> stub.sleep(req)).getMessage(); String message = assertThrows(StatusRuntimeException.class, () -> stub.sleep(req)).getMessage();
long delay = System.currentTimeMillis() - started; long delay = System.currentTimeMillis() - started;
assertTrue(delay >= TIMEOUT_MS, "Delay: " + delay + " is not greater than timeout: " + TIMEOUT_MS); assertTrue(delay >= TIMEOUT_MS, "Delay: " + delay + " is not greater than timeout: " + TIMEOUT_MS);
assertTrue(message.contains("DEADLINE_EXCEEDED")); assertTrue(message.contains("DEADLINE_EXCEEDED"), "The message contains DEADLINE_EXCEEDED: " + message);
assertTrue(message.contains("CallOptions deadline exceeded after")); assertTrue(message.contains("CallOptions deadline exceeded after"), "The message contains DEADLINE_EXCEEDED: " + message);
} }
} }

View File

@ -49,7 +49,7 @@ public class DaprWorkflowClient implements AutoCloseable {
* Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel. * Public constructor for DaprWorkflowClient. This layer constructs the GRPC Channel.
*/ */
public DaprWorkflowClient() { public DaprWorkflowClient() {
this(NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR)); this(NetworkUtils.buildGrpcManagedChannel(new Properties(), WORKFLOW_INTERCEPTOR));
} }
/** /**

View File

@ -14,6 +14,7 @@ limitations under the License.
package io.dapr.workflows.runtime; package io.dapr.workflows.runtime;
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder; import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
import io.dapr.config.Properties;
import io.dapr.utils.NetworkUtils; import io.dapr.utils.NetworkUtils;
import io.dapr.workflows.Workflow; import io.dapr.workflows.Workflow;
import io.dapr.workflows.internal.ApiTokenClientInterceptor; import io.dapr.workflows.internal.ApiTokenClientInterceptor;
@ -38,15 +39,20 @@ public class WorkflowRuntimeBuilder {
* Constructs the WorkflowRuntimeBuilder. * Constructs the WorkflowRuntimeBuilder.
*/ */
public WorkflowRuntimeBuilder() { public WorkflowRuntimeBuilder() {
this(LoggerFactory.getLogger(WorkflowRuntimeBuilder.class)); this(new Properties(), LoggerFactory.getLogger(WorkflowRuntimeBuilder.class));
} }
WorkflowRuntimeBuilder(Logger logger) { public WorkflowRuntimeBuilder(Logger logger) {
this(new Properties(), logger);
}
WorkflowRuntimeBuilder(Properties properties, Logger logger) {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel( this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(
NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR)); NetworkUtils.buildGrpcManagedChannel(properties, WORKFLOW_INTERCEPTOR));
this.logger = logger; this.logger = logger;
} }
/** /**
* Returns a WorkflowRuntime object. * Returns a WorkflowRuntime object.
* *

View File

@ -14,18 +14,25 @@ limitations under the License.
package io.dapr.client; package io.dapr.client;
import io.dapr.client.resiliency.ResiliencyOptions; import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.config.Property;
import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.NetworkUtils; import io.dapr.utils.NetworkUtils;
import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import java.util.HashMap;
import java.util.Map;
/** /**
* A builder for the DaprClient, * A builder for the DaprClient,
* Currently only gRPC and HTTP Client will be supported. * Currently only gRPC and HTTP Client will be supported.
*/ */
public class DaprClientBuilder { public class DaprClientBuilder {
private final Map<String, String> propertyOverrides = new HashMap<>();
/** /**
* Builder for Dapr's HTTP Client. * Builder for Dapr's HTTP Client.
*/ */
@ -105,6 +112,17 @@ public class DaprClientBuilder {
return this; return this;
} }
/**
* Allow to set up properties override for static properties.
* @param property that we want to override
* @param value the value of such property
* @return an instance of the setup Client
*/
public DaprClientBuilder withPropertyOverride(Property<?> property, String value) {
this.propertyOverrides.put(property.getName(), value);
return this;
}
/** /**
* Build an instance of the Client based on the provided setup. * Build an instance of the Client based on the provided setup.
* *
@ -132,8 +150,9 @@ public class DaprClientBuilder {
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number. * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
*/ */
private DaprClientImpl buildDaprClient() { private DaprClientImpl buildDaprClient() {
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(); final Properties properties = new Properties(this.propertyOverrides);
final DaprHttp daprHttp = this.daprHttpBuilder.build(); final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel(properties);
final DaprHttp daprHttp = this.daprHttpBuilder.build(properties);
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel); final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel); DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientImpl( return new DaprClientImpl(

View File

@ -151,6 +151,11 @@ public class DaprHttp implements AutoCloseable {
*/ */
private final OkHttpClient httpClient; private final OkHttpClient httpClient;
/**
* Dapr API Token required to interact with DAPR APIs.
*/
private final String daprApiToken;
/** /**
* Creates a new instance of {@link DaprHttp}. * Creates a new instance of {@link DaprHttp}.
* *
@ -158,9 +163,10 @@ public class DaprHttp implements AutoCloseable {
* @param port Port for calling Dapr. (e.g. 3500) * @param port Port for calling Dapr. (e.g. 3500)
* @param httpClient RestClient used for all API calls in this new instance. * @param httpClient RestClient used for all API calls in this new instance.
*/ */
DaprHttp(String hostname, int port, OkHttpClient httpClient) { DaprHttp(String hostname, int port, String daprApiToken, OkHttpClient httpClient) {
this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port); this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port);
this.httpClient = httpClient; this.httpClient = httpClient;
this.daprApiToken = daprApiToken;
} }
/** /**
@ -169,9 +175,10 @@ public class DaprHttp implements AutoCloseable {
* @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com") * @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com")
* @param httpClient RestClient used for all API calls in this new instance. * @param httpClient RestClient used for all API calls in this new instance.
*/ */
DaprHttp(String uri, OkHttpClient httpClient) { DaprHttp(String uri, String daprApiToken, OkHttpClient httpClient) {
this.uri = URI.create(uri); this.uri = URI.create(uri);
this.httpClient = httpClient; this.httpClient = httpClient;
this.daprApiToken = daprApiToken;
} }
/** /**
@ -314,7 +321,6 @@ public class DaprHttp implements AutoCloseable {
requestBuilder.method(method, body); requestBuilder.method(method, body);
} }
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) { if (daprApiToken != null) {
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken); requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
} }

View File

@ -21,6 +21,14 @@ import okhttp3.OkHttpClient;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static io.dapr.config.Properties.API_TOKEN;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS;
import static io.dapr.config.Properties.HTTP_CLIENT_MAX_REQUESTS;
import static io.dapr.config.Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS;
import static io.dapr.config.Properties.HTTP_ENDPOINT;
import static io.dapr.config.Properties.HTTP_PORT;
import static io.dapr.config.Properties.SIDECAR_IP;
/** /**
* A builder for the DaprHttp. * A builder for the DaprHttp.
*/ */
@ -43,38 +51,39 @@ public class DaprHttpBuilder {
*/ */
private static final int KEEP_ALIVE_DURATION = 30; private static final int KEEP_ALIVE_DURATION = 30;
/** /**
* Build an instance of the Http client based on the provided setup. * Build an instance of the Http client based on the provided setup.
* * @param properties to configure the DaprHttp client
* @return an instance of {@link DaprHttp} * @return an instance of {@link DaprHttp}
* @throws IllegalStateException if any required field is missing * @throws IllegalStateException if any required field is missing
*/ */
public DaprHttp build() { public DaprHttp build(Properties properties) {
return buildDaprHttp(); return buildDaprHttp(properties);
} }
/** /**
* Creates an instance of the HTTP Client. * Creates an instance of the HTTP Client.
* * @param properties to configure the DaprHttp client
* @return Instance of {@link DaprHttp} * @return Instance of {@link DaprHttp}
*/ */
private DaprHttp buildDaprHttp() { private DaprHttp buildDaprHttp(Properties properties) {
if (OK_HTTP_CLIENT == null) { if (OK_HTTP_CLIENT == null) {
synchronized (LOCK) { synchronized (LOCK) {
if (OK_HTTP_CLIENT == null) { if (OK_HTTP_CLIENT == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder(); OkHttpClient.Builder builder = new OkHttpClient.Builder();
Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get()); Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS));
builder.readTimeout(readTimeout); builder.readTimeout(readTimeout);
Dispatcher dispatcher = new Dispatcher(); Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(Properties.HTTP_CLIENT_MAX_REQUESTS.get()); dispatcher.setMaxRequests(properties.getValue(HTTP_CLIENT_MAX_REQUESTS));
// The maximum number of requests for each host to execute concurrently. // The maximum number of requests for each host to execute concurrently.
// Default value is 5 in okhttp which is totally UNACCEPTABLE! // Default value is 5 in okhttp which is totally UNACCEPTABLE!
// For sidecar case, set it the same as maxRequests. // For sidecar case, set it the same as maxRequests.
dispatcher.setMaxRequestsPerHost(Properties.HTTP_CLIENT_MAX_REQUESTS.get()); dispatcher.setMaxRequestsPerHost(HTTP_CLIENT_MAX_REQUESTS.get());
builder.dispatcher(dispatcher); builder.dispatcher(dispatcher);
ConnectionPool pool = new ConnectionPool(Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS.get(), ConnectionPool pool = new ConnectionPool(properties.getValue(HTTP_CLIENT_MAX_IDLE_CONNECTIONS),
KEEP_ALIVE_DURATION, TimeUnit.SECONDS); KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
builder.connectionPool(pool); builder.connectionPool(pool);
@ -83,11 +92,14 @@ public class DaprHttpBuilder {
} }
} }
String endpoint = Properties.HTTP_ENDPOINT.get(); String endpoint = properties.getValue(HTTP_ENDPOINT);
if ((endpoint != null) && !endpoint.isEmpty()) { if ((endpoint != null) && !endpoint.isEmpty()) {
return new DaprHttp(endpoint, OK_HTTP_CLIENT); return new DaprHttp(endpoint, properties.getValue(API_TOKEN), OK_HTTP_CLIENT);
} }
return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT); return new DaprHttp(properties.getValue(SIDECAR_IP), properties.getValue(HTTP_PORT), properties.getValue(API_TOKEN),
OK_HTTP_CLIENT);
} }
} }

View File

@ -18,6 +18,8 @@ import io.dapr.utils.NetworkUtils;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Duration; import java.time.Duration;
import java.util.Collections;
import java.util.Map;
/** /**
* Global properties for Dapr's SDK, using Supplier so they are dynamically resolved. * Global properties for Dapr's SDK, using Supplier so they are dynamically resolved.
@ -172,4 +174,39 @@ public class Properties {
"dapr.http.client.maxIdleConnections", "dapr.http.client.maxIdleConnections",
"DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS", "DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS",
DEFAULT_HTTP_CLIENT_MAX_IDLE_CONNECTIONS); DEFAULT_HTTP_CLIENT_MAX_IDLE_CONNECTIONS);
/**
* Mechanism to override properties set in a static context.
*/
private final Map<String, String> overrides;
/**
* Creates a new instance to handle Properties per instance.
*/
public Properties() {
this.overrides = null;
}
/**
* Creates a new instance to handle Properties per instance.
* @param overrides to override static properties
*/
public Properties(Map<String, String> overrides) {
this.overrides = Collections.unmodifiableMap(overrides);
}
/**
* Gets a property value taking in consideration the override values.
* @param <T> type of the property that we want to get the value from
* @param property to override static property value from overrides
* @return the property's value
*/
public <T> T getValue(Property<T> property) {
if (overrides != null) {
String override = overrides.get(property.getName());
return property.get(override);
} else {
return property.get();
}
}
} }

View File

@ -72,6 +72,24 @@ public abstract class Property<T> {
* @return Value from system property (1st) or env variable (2nd) or default (last). * @return Value from system property (1st) or env variable (2nd) or default (last).
*/ */
public T get() { public T get() {
return this.get(null);
}
/**
* Gets the value defined by system property first, then env variable or sticks to default.
* @param override overrides the property value
* @return Value from system property (1st) or env variable (2nd) or default (last).
*/
public T get(String override) {
if ((override != null) && !override.isEmpty()) {
try {
return this.parse(override);
} catch (IllegalArgumentException e) {
LOGGER.warning(String.format("Invalid override value in property: %s", this.name));
// OK, we tried. Falling back to system environment variable.
}
}
String propValue = System.getProperty(this.name); String propValue = System.getProperty(this.name);
if (propValue != null && !propValue.trim().isEmpty()) { if (propValue != null && !propValue.trim().isEmpty()) {
try { try {

View File

@ -24,6 +24,11 @@ import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static io.dapr.config.Properties.GRPC_ENDPOINT;
import static io.dapr.config.Properties.GRPC_PORT;
import static io.dapr.config.Properties.SIDECAR_IP;
/** /**
* Utility methods for network, internal to Dapr SDK. * Utility methods for network, internal to Dapr SDK.
*/ */
@ -102,12 +107,12 @@ public final class NetworkUtils {
/** /**
* Creates a GRPC managed channel. * Creates a GRPC managed channel.
* * @param properties instance to set up the GrpcEndpoint
* @param interceptors Optional interceptors to add to the channel. * @param interceptors Optional interceptors to add to the channel.
* @return GRPC managed channel to communicate with the sidecar. * @return GRPC managed channel to communicate with the sidecar.
*/ */
public static ManagedChannel buildGrpcManagedChannel(ClientInterceptor... interceptors) { public static ManagedChannel buildGrpcManagedChannel(Properties properties, ClientInterceptor... interceptors) {
var settings = GrpcEndpointSettings.parse(); var settings = GrpcEndpointSettings.parse(properties);
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(settings.endpoint) ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(settings.endpoint)
.userAgent(Version.getSdkVersion()); .userAgent(Version.getSdkVersion());
if (!settings.secure) { if (!settings.secure) {
@ -129,11 +134,11 @@ public final class NetworkUtils {
this.secure = secure; this.secure = secure;
} }
static GrpcEndpointSettings parse() { static GrpcEndpointSettings parse(Properties properties) {
String address = Properties.SIDECAR_IP.get(); String address = properties.getValue(SIDECAR_IP);
int port = Properties.GRPC_PORT.get(); int port = properties.getValue(GRPC_PORT);
boolean secure = false; boolean secure = false;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get(); String grpcEndpoint = properties.getValue(GRPC_ENDPOINT);
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) { if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
var matcher = GRPC_ENDPOINT_PATTERN.matcher(grpcEndpoint); var matcher = GRPC_ENDPOINT_PATTERN.matcher(grpcEndpoint);
if (!matcher.matches()) { if (!matcher.matches()) {

View File

@ -13,11 +13,15 @@ limitations under the License.
package io.dapr.client; package io.dapr.client;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprErrorDetails;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DaprObjectSerializer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -35,6 +39,18 @@ public class DaprClientBuilderTest {
assertNotNull(daprClient); assertNotNull(daprClient);
} }
@Test
public void buildWithOverrideSidecarIP() {
DaprClientBuilder daprClientBuilder = new DaprClientBuilder();
daprClientBuilder.withPropertyOverride(Properties.SIDECAR_IP, "unknown-host");
DaprClient daprClient = daprClientBuilder.build();
assertNotNull(daprClient);
DaprException thrown = assertThrows(DaprException.class, () -> { daprClient.getMetadata().block(); });
assertTrue(thrown.toString().contains("UNAVAILABLE"));
}
@Test @Test
public void noObjectSerializer() { public void noObjectSerializer() {
assertThrows(IllegalArgumentException.class, () -> { new DaprClientBuilder().withObjectSerializer(null);}); assertThrows(IllegalArgumentException.class, () -> { new DaprClientBuilder().withObjectSerializer(null);});

View File

@ -66,6 +66,8 @@ public class DaprClientHttpTest {
private String sidecarIp; private String sidecarIp;
private String daprApiToken;
private DaprClient daprClientHttp; private DaprClient daprClientHttp;
private DaprHttp daprHttp; private DaprHttp daprHttp;
@ -77,9 +79,10 @@ public class DaprClientHttpTest {
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get()); sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
daprApiToken = Properties.API_TOKEN.get();
mockInterceptor = new MockInterceptor(Behavior.UNORDERED); mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
daprHttp = new DaprHttp(sidecarIp, 3000, okHttpClient); daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, okHttpClient);
daprClientHttp = buildDaprClient(daprHttp); daprClientHttp = buildDaprClient(daprHttp);
} }
@ -97,7 +100,7 @@ public class DaprClientHttpTest {
@Test @Test
public void waitForSidecarTimeOutHealthCheck() throws Exception { public void waitForSidecarTimeOutHealthCheck() throws Exception {
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient); daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, okHttpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp); DaprClient daprClientHttp = buildDaprClient(daprHttp);
mockInterceptor.addRule() mockInterceptor.addRule()
@ -122,7 +125,7 @@ public class DaprClientHttpTest {
public void waitForSidecarBadHealthCheck() throws Exception { public void waitForSidecarBadHealthCheck() throws Exception {
int port = findFreePort(); int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp); DaprClient daprClientHttp = buildDaprClient(daprHttp);
mockInterceptor.addRule() mockInterceptor.addRule()
@ -147,7 +150,7 @@ public class DaprClientHttpTest {
public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception { public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception {
int port = findFreePort(); int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient); daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp); DaprClient daprClientHttp = buildDaprClient(daprHttp);
// Simulate a slow response // Simulate a slow response
@ -176,7 +179,7 @@ public class DaprClientHttpTest {
public void waitForSidecarOK() throws Exception { public void waitForSidecarOK() throws Exception {
int port = findFreePort(); int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port)); System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(sidecarIp, port, okHttpClient); daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp); DaprClient daprClientHttp = buildDaprClient(daprHttp);
mockInterceptor.addRule() mockInterceptor.addRule()
@ -207,7 +210,7 @@ public class DaprClientHttpTest {
} }
}); });
t.start(); t.start();
daprHttp = new DaprHttp(sidecarIp, port, okHttpClient); daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient);
DaprClient daprClientHttp = buildDaprClient(daprHttp); DaprClient daprClientHttp = buildDaprClient(daprHttp);
daprClientHttp.waitForSidecar(10000).block(); daprClientHttp.waitForSidecar(10000).block();
} }

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.client; package io.dapr.client;
import io.dapr.config.Properties;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -25,8 +26,9 @@ public class DaprHttpBuilderTest {
@Test @Test
public void singletonOkHttpClient() throws Exception { public void singletonOkHttpClient() throws Exception {
DaprHttp daprHttp = new DaprHttpBuilder().build(); Properties properties = new Properties();
DaprHttp anotherDaprHttp = new DaprHttpBuilder().build(); DaprHttp daprHttp = new DaprHttpBuilder().build(properties);
DaprHttp anotherDaprHttp = new DaprHttpBuilder().build(properties);
assertSame(getOkHttpClient(daprHttp), getOkHttpClient(anotherDaprHttp)); assertSame(getOkHttpClient(daprHttp), getOkHttpClient(anotherDaprHttp));
} }

View File

@ -34,7 +34,7 @@ public class DaprHttpStub extends DaprHttp {
* Instantiates a stub for DaprHttp * Instantiates a stub for DaprHttp
*/ */
public DaprHttpStub() { public DaprHttpStub() {
super(null, 3000, null); super(null, 3000, "stubToken", null);
} }
/** /**

View File

@ -56,6 +56,8 @@ public class DaprHttpTest {
private String sidecarIp; private String sidecarIp;
private String daprTokenApi;
private OkHttpClient okHttpClient; private OkHttpClient okHttpClient;
private MockInterceptor mockInterceptor; private MockInterceptor mockInterceptor;
@ -65,6 +67,7 @@ public class DaprHttpTest {
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get()); sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get());
daprTokenApi = Properties.API_TOKEN.get();
mockInterceptor = new MockInterceptor(Behavior.UNORDERED); mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
} }
@ -77,7 +80,7 @@ public class DaprHttpTest {
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
environmentVariables.set(Properties.API_TOKEN.getEnvName(), "xyz"); environmentVariables.set(Properties.API_TOKEN.getEnvName(), "xyz");
assertEquals("xyz", Properties.API_TOKEN.get()); assertEquals("xyz", Properties.API_TOKEN.get());
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty()); daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -93,7 +96,7 @@ public class DaprHttpTest {
.hasHeader(Headers.DAPR_API_TOKEN) .hasHeader(Headers.DAPR_API_TOKEN)
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
assertNull(Properties.API_TOKEN.get()); assertNull(Properties.API_TOKEN.get());
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty()); daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -109,7 +112,7 @@ public class DaprHttpTest {
mockInterceptor.addRule() mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state") .post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty()); daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -128,7 +131,7 @@ public class DaprHttpTest {
mockInterceptor.addRule() mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state") .post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
System.setProperty(Properties.SIDECAR_IP.getName(), prevSidecarIp); System.setProperty(Properties.SIDECAR_IP.getName(), prevSidecarIp);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty()); daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty());
@ -143,7 +146,7 @@ public class DaprHttpTest {
.post("http://" + sidecarIp + ":3500/v1.0/state") .post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT)) .respond(serializer.serialize(EXPECTED_RESULT))
.addHeader("Header", "Value"); .addHeader("Header", "Value");
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.empty()); daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -156,7 +159,7 @@ public class DaprHttpTest {
mockInterceptor.addRule() mockInterceptor.addRule()
.delete("http://" + sidecarIp + ":3500/v1.0/state") .delete("http://" + sidecarIp + ":3500/v1.0/state")
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.empty()); daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -167,7 +170,7 @@ public class DaprHttpTest {
@Test @Test
public void invokeHEADMethod() throws IOException { public void invokeHEADMethod() throws IOException {
mockInterceptor.addRule().head("http://127.0.0.1:3500/v1.0/state").respond(HttpURLConnection.HTTP_OK); mockInterceptor.addRule().head("http://127.0.0.1:3500/v1.0/state").respond(HttpURLConnection.HTTP_OK);
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("HEAD", "v1.0/state".split("/"), null, (String) null, null, Context.empty()); daprHttp.invokeApi("HEAD", "v1.0/state".split("/"), null, (String) null, null, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -179,7 +182,7 @@ public class DaprHttpTest {
mockInterceptor.addRule() mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/v1.0/get") .get("http://" + sidecarIp + ":3500/v1.0/get")
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.empty()); Mono<DaprHttp.Response> mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
String body = serializer.deserialize(response.getBody(), String.class); String body = serializer.deserialize(response.getBody(), String.class);
@ -196,7 +199,7 @@ public class DaprHttpTest {
mockInterceptor.addRule() mockInterceptor.addRule()
.get("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41") .get("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41")
.respond(serializer.serialize(EXPECTED_RESULT)); .respond(serializer.serialize(EXPECTED_RESULT));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.empty()); daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.empty());
DaprHttp.Response response = mono.block(); DaprHttp.Response response = mono.block();
@ -209,7 +212,7 @@ public class DaprHttpTest {
mockInterceptor.addRule() mockInterceptor.addRule()
.post("http://" + sidecarIp + ":3500/v1.0/state") .post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(500); .respond(500);
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = Mono<DaprHttp.Response> mono =
daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
StepVerifier.create(mono).expectError(RuntimeException.class).verify(); StepVerifier.create(mono).expectError(RuntimeException.class).verify();
@ -221,7 +224,7 @@ public class DaprHttpTest {
.post("http://" + sidecarIp + ":3500/v1.0/state") .post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(500, ResponseBody.create(MediaType.parse("text"), .respond(500, ResponseBody.create(MediaType.parse("text"),
"{\"errorCode\":null,\"message\":null}")); "{\"errorCode\":null,\"message\":null}"));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
StepVerifier.create(mono).expectError(RuntimeException.class).verify(); StepVerifier.create(mono).expectError(RuntimeException.class).verify();
} }
@ -232,7 +235,7 @@ public class DaprHttpTest {
.post("http://" + sidecarIp + ":3500/v1.0/state") .post("http://" + sidecarIp + ":3500/v1.0/state")
.respond(500, ResponseBody.create(MediaType.parse("application/json"), .respond(500, ResponseBody.create(MediaType.parse("application/json"),
"{\"errorCode\":\"null\",\"message\":\"null\"}")); "{\"errorCode\":\"null\",\"message\":\"null\"}"));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty());
StepVerifier.create(mono).expectError(RuntimeException.class).verify(); StepVerifier.create(mono).expectError(RuntimeException.class).verify();
} }
@ -253,7 +256,7 @@ public class DaprHttpTest {
.post("http://127.0.0.1:3500/v1.0/pubsub/publish") .post("http://127.0.0.1:3500/v1.0/pubsub/publish")
.respond(500, ResponseBody.create(MediaType.parse("application/json"), .respond(500, ResponseBody.create(MediaType.parse("application/json"),
payload)); payload));
DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/pubsub/publish".split("/"), null, null, Context.empty()); Mono<DaprHttp.Response> mono = daprHttp.invokeApi("POST", "v1.0/pubsub/publish".split("/"), null, null, Context.empty());
StepVerifier.create(mono).expectErrorMatches(e -> { StepVerifier.create(mono).expectErrorMatches(e -> {
assertEquals(DaprException.class, e.getClass()); assertEquals(DaprException.class, e.getClass());
@ -302,7 +305,7 @@ public class DaprHttpTest {
.get("http://" + sidecarIp + ":3500/" + urlExistingState) .get("http://" + sidecarIp + ":3500/" + urlExistingState)
.respond(200, ResponseBody.create(MediaType.parse("application/json"), .respond(200, ResponseBody.create(MediaType.parse("application/json"),
serializer.serialize(existingState))); serializer.serialize(existingState)));
DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, okHttpClient); DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient);
Mono<DaprHttp.Response> response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.empty()); Mono<DaprHttp.Response> response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.empty());
assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class)); assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class));
Mono<DaprHttp.Response> responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.empty()); Mono<DaprHttp.Response> responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.empty());

View File

@ -14,6 +14,7 @@ public class NetworkUtilsTest {
private final String defaultSidecarIP = "127.0.0.1"; private final String defaultSidecarIP = "127.0.0.1";
private ManagedChannel channel; private ManagedChannel channel;
private Properties properties = new Properties();
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
@ -31,7 +32,7 @@ public class NetworkUtilsTest {
@Test @Test
public void testBuildGrpcManagedChannel() { public void testBuildGrpcManagedChannel() {
channel = NetworkUtils.buildGrpcManagedChannel(); channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = String.format("%s:%s", defaultSidecarIP, defaultGrpcPort); String expectedAuthority = String.format("%s:%s", defaultSidecarIP, defaultGrpcPort);
Assertions.assertEquals(expectedAuthority, channel.authority()); Assertions.assertEquals(expectedAuthority, channel.authority());
@ -40,7 +41,7 @@ public class NetworkUtilsTest {
@Test @Test
public void testBuildGrpcManagedChannel_httpEndpointNoPort() { public void testBuildGrpcManagedChannel_httpEndpointNoPort() {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), "http://example.com"); System.setProperty(Properties.GRPC_ENDPOINT.getName(), "http://example.com");
channel = NetworkUtils.buildGrpcManagedChannel(); channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:80"; String expectedAuthority = "example.com:80";
Assertions.assertEquals(expectedAuthority, channel.authority()); Assertions.assertEquals(expectedAuthority, channel.authority());
@ -49,7 +50,7 @@ public class NetworkUtilsTest {
@Test @Test
public void testBuildGrpcManagedChannel_httpEndpointWithPort() { public void testBuildGrpcManagedChannel_httpEndpointWithPort() {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000"); System.setProperty(Properties.GRPC_ENDPOINT.getName(), "http://example.com:3000");
channel = NetworkUtils.buildGrpcManagedChannel(); channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:3000"; String expectedAuthority = "example.com:3000";
Assertions.assertEquals(expectedAuthority, channel.authority()); Assertions.assertEquals(expectedAuthority, channel.authority());
@ -58,7 +59,7 @@ public class NetworkUtilsTest {
@Test @Test
public void testBuildGrpcManagedChannel_httpsEndpointNoPort() { public void testBuildGrpcManagedChannel_httpsEndpointNoPort() {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), "https://example.com"); System.setProperty(Properties.GRPC_ENDPOINT.getName(), "https://example.com");
channel = NetworkUtils.buildGrpcManagedChannel(); channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:443"; String expectedAuthority = "example.com:443";
Assertions.assertEquals(expectedAuthority, channel.authority()); Assertions.assertEquals(expectedAuthority, channel.authority());
@ -67,7 +68,7 @@ public class NetworkUtilsTest {
@Test @Test
public void testBuildGrpcManagedChannel_httpsEndpointWithPort() { public void testBuildGrpcManagedChannel_httpsEndpointWithPort() {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000"); System.setProperty(Properties.GRPC_ENDPOINT.getName(), "https://example.com:3000");
channel = NetworkUtils.buildGrpcManagedChannel(); channel = NetworkUtils.buildGrpcManagedChannel(properties);
String expectedAuthority = "example.com:3000"; String expectedAuthority = "example.com:3000";
Assertions.assertEquals(expectedAuthority, channel.authority()); Assertions.assertEquals(expectedAuthority, channel.authority());
@ -138,7 +139,7 @@ public class NetworkUtilsTest {
boolean expectSecure boolean expectSecure
) { ) {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue); System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
var settings = NetworkUtils.GrpcEndpointSettings.parse(); var settings = NetworkUtils.GrpcEndpointSettings.parse(new Properties());
Assertions.assertEquals(expectedEndpoint, settings.endpoint); Assertions.assertEquals(expectedEndpoint, settings.endpoint);
Assertions.assertEquals(expectSecure, settings.secure); Assertions.assertEquals(expectSecure, settings.secure);
@ -147,7 +148,7 @@ public class NetworkUtilsTest {
private static void testGrpcEndpointParsingErrorScenario(String grpcEndpointEnvValue) { private static void testGrpcEndpointParsingErrorScenario(String grpcEndpointEnvValue) {
try { try {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue); System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
NetworkUtils.GrpcEndpointSettings.parse(); NetworkUtils.GrpcEndpointSettings.parse(new Properties());
Assert.fail(); Assert.fail();
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// Expected // Expected