Adds method to wait for Dapr sidecar. (#433)

This commit is contained in:
Artur Souza 2020-12-30 19:29:23 -08:00 committed by GitHub
parent 1a6675add8
commit 6a44d51279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 204 additions and 6 deletions

View File

@ -43,6 +43,10 @@ public class StateClient {
///...
public static void main(String[] args) throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
System.out.println("Waiting for Dapr sidecar ...");
client.waitForSidecar(10000).block();
System.out.println("Dapr sidecar is ready.");
String message = args.length == 0 ? " " : args[0];
MyClass myClass = new MyClass();
@ -102,7 +106,8 @@ public class StateClient {
```
The code uses the `DaprClient` created by the `DaprClientBuilder`. Notice that this builder uses default settings. Internally, it is using `DefaultObjectSerializer` for two properties: `objectSerializer` is for Dapr's sent and received objects, and `stateSerializer` is for objects to be persisted.
This example performs multiple operations:
This example performs multiple operations:
* `client.waitForSidecar(...)` for waiting until Dapr sidecar is ready.
* `client.saveState(...)` for persisting an instance of `MyClass`.
* `client.getState(...)` operation in order to retrieve back the persisted state using the same key.
* `client.executeStateTransaction(...)` operation in order to update existing state and add new state.

View File

@ -46,6 +46,10 @@ public class StateClient {
*/
public static void main(String[] args) throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
System.out.println("Waiting for Dapr sidecar ...");
client.waitForSidecar(10000).block();
System.out.println("Dapr sidecar is ready.");
String message = args.length == 0 ? " " : args[0];
MyClass myClass = new MyClass();

View File

@ -32,6 +32,13 @@ import java.util.Map;
*/
public interface DaprClient extends AutoCloseable {
/**
* Waits for the sidecar, giving up after timeout.
* @param timeoutInMilliseconds Timeout in milliseconds to wait for sidecar.
* @return a Mono plan of type Void.
*/
Mono<Void> waitForSidecar(int timeoutInMilliseconds);
/**
* Publish an event.
*

View File

@ -26,6 +26,7 @@ import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprGrpc;
@ -131,6 +132,20 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
/**
* {@inheritDoc}
*/

View File

@ -26,6 +26,7 @@ import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import io.opentelemetry.context.Context;
import reactor.core.publisher.Mono;
@ -101,6 +102,20 @@ public class DaprClientHttp extends AbstractDaprClient {
this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
/**
* {@inheritDoc}
*/

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.utils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
/**
* Utility methods for network, internal to Dapr SDK.
*/
public final class NetworkUtils {
private NetworkUtils() {
}
/**
* Tries to connect to a socket, retrying every 1 second.
* @param host Host to connect to.
* @param port Port to connect to.
* @param timeoutInMilliseconds Timeout in milliseconds to give up trying.
* @throws InterruptedException If retry is interrupted.
*/
public static void waitForSocket(String host, int port, int timeoutInMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
Retry.callWithRetry(() -> {
try {
try (Socket socket = new Socket()) {
// timeout cannot be negative.
// zero timeout means infinite, so 1 is the practical minimum.
int remainingTimeout = (int) Math.max(1, timeoutInMilliseconds - (System.currentTimeMillis() - started));
socket.connect(new InetSocketAddress(host, port), remainingTimeout);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}, timeoutInMilliseconds);
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.utils;
class Retry {
private static final long RETRY_WAIT_MILLISECONDS = 1000;
private Retry() {
}
static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
while (true) {
Throwable exception;
try {
function.run();
return;
} catch (Exception e) {
exception = e;
} catch (AssertionError e) {
exception = e;
}
long elapsed = System.currentTimeMillis() - started;
if (elapsed >= retryTimeoutMilliseconds) {
if (exception instanceof RuntimeException) {
throw (RuntimeException)exception;
}
throw new RuntimeException(exception);
}
long remaining = retryTimeoutMilliseconds - elapsed;
Thread.sleep(Math.min(remaining, RETRY_WAIT_MILLISECONDS));
}
}
}

View File

@ -23,6 +23,7 @@ import io.dapr.client.domain.Response;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
@ -40,6 +41,8 @@ import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@ -51,7 +54,9 @@ import java.util.concurrent.ExecutionException;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static io.dapr.utils.TestUtils.findFreePort;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.doNothing;
@ -89,6 +94,30 @@ public class DaprClientGrpcTest {
verifyNoMoreInteractions(closeable);
}
@Test
public void waitForSidecarTimeout() throws Exception {
int port = findFreePort();
System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port));
assertThrows(RuntimeException.class, () -> adapter.waitForSidecar(1).block());
}
@Test
public void waitForSidecarTimeoutOK() throws Exception {
try (ServerSocket serverSocket = new ServerSocket(0)) {
final int port = serverSocket.getLocalPort();
System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port));
Thread t = new Thread(() -> {
try {
try (Socket socket = serverSocket.accept()) {
}
} catch (IOException e) {
}
});
t.start();
adapter.waitForSidecar(10000).block();
}
}
@Test
public void publishEventExceptionThrownTest() {
when(client.publishEvent(any(DaprProtos.PublishEventRequest.class)))

View File

@ -6,8 +6,8 @@ package io.dapr.client;
import com.fasterxml.jackson.core.JsonParseException;
import io.dapr.client.domain.DeleteStateRequestBuilder;
import io.dapr.client.domain.GetStateRequestBuilder;
import io.dapr.client.domain.GetBulkStateRequestBuilder;
import io.dapr.client.domain.GetStateRequestBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.Response;
import io.dapr.client.domain.State;
@ -26,6 +26,8 @@ import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@ -35,10 +37,8 @@ import java.util.List;
import java.util.Map;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static io.dapr.utils.TestUtils.findFreePort;
import static org.junit.Assert.*;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
@ -65,6 +65,34 @@ public class DaprClientHttpTest {
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
}
@Test
public void waitForSidecarTimeout() throws Exception {
int port = findFreePort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
assertThrows(RuntimeException.class, () -> daprClientHttp.waitForSidecar(1).block());
}
@Test
public void waitForSidecarTimeoutOK() throws Exception {
try (ServerSocket serverSocket = new ServerSocket(0)) {
final int port = serverSocket.getLocalPort();
System.setProperty(Properties.HTTP_PORT.getName(), Integer.toString(port));
Thread t = new Thread(() -> {
try {
try (Socket socket = serverSocket.accept()) {
}
} catch (IOException e) {
}
});
t.start();
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, okHttpClient);
DaprClientHttp daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.waitForSidecar(10000).block();
}
}
@Test
public void publishEventInvokation() {
mockInterceptor.addRule()

View File

@ -9,6 +9,11 @@ import io.dapr.exceptions.DaprException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashSet;
import java.util.Set;
public final class TestUtils {
private TestUtils() {}
@ -46,4 +51,11 @@ public final class TestUtils {
Assertions.assertEquals(expectedErrorCode, daprException.getErrorCode());
Assertions.assertEquals(expectedErrorMessage, daprException.getMessage());
}
public static int findFreePort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
socket.setReuseAddress(true);
return socket.getLocalPort();
}
}
}