diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b41377098..3724d7efd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,7 @@ jobs: DAPR_RUNTIME_VER: 1.1.0-rc.1 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh DAPR_CLI_REF: - DAPR_REF: + DAPR_REF: 266eb0b5d300c576c3360c7f0582d795579ee457 OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} GPG_KEY: ${{ secrets.GPG_KEY }} diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index 7227dc499..89e2a7836 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -68,31 +68,16 @@ public class DaprRun implements Stoppable { this.stop(); // Wait for the previous run to kill the prior process. long timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); - callWithRetry(() -> { - System.out.println("Checking if previous run for Dapr application has stopped ..."); - try { - this.listCommand.run(); - throw new RuntimeException("Previous run for app has not stopped yet!"); - } catch (IllegalStateException e) { - // Success because we the list command did not find the app id. - } catch (Exception e) { - throw new RuntimeException(e); - } - }, timeLeft); + System.out.println("Checking if previous run for Dapr application has stopped ..."); + checkRunState(timeLeft, false); System.out.println("Starting dapr application ..."); this.startCommand.run(); this.started.set(true); timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); - callWithRetry(() -> { - System.out.println("Checking if Dapr application has started ..."); - try { - this.listCommand.run(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, timeLeft); + System.out.println("Checking if Dapr application has started ..."); + checkRunState(timeLeft, true); if (this.ports.getAppPort() != null) { timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); @@ -180,6 +165,25 @@ public class DaprRun implements Stoppable { return appName; } + public void checkRunState(long timeout, boolean shouldBeRunning) throws InterruptedException { + callWithRetry(() -> { + try { + this.listCommand.run(); + + if (!shouldBeRunning) { + throw new RuntimeException("Previous run for app has not stopped yet!"); + } + } catch (IllegalStateException e) { + // Bad case if the app is supposed to be running. + if (shouldBeRunning) { + throw e; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }, timeout); + } + private static String buildDaprCommand( String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol, DaprApiProtocol appProtocol) { StringBuilder stringBuilder = diff --git a/sdk-tests/src/test/java/io/dapr/it/api/ApiIT.java b/sdk-tests/src/test/java/io/dapr/it/api/ApiIT.java new file mode 100644 index 000000000..e4da1b6ca --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/api/ApiIT.java @@ -0,0 +1,55 @@ +package io.dapr.it.api; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.actors.ActorReminderRecoveryIT; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class ApiIT extends BaseIT { + + private static final Logger logger = LoggerFactory.getLogger(ApiIT.class); + private static final int DEFAULT_TIMEOUT = 60000; + + /** + * Parameters for this test. + * Param #1: useGrpc. + * + * @return Collection of parameter tuples. + */ + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][]{{false}, {true}}); + } + + @Parameterized.Parameter + public boolean useGrpc; + + @Test + public void testShutdownAPI() throws Exception { + DaprRun run = startDaprApp(this.getClass().getSimpleName(), DEFAULT_TIMEOUT); + + if (this.useGrpc) { + run.switchToGRPC(); + } else { + run.switchToHTTP(); + } + + try (DaprClient client = new DaprClientBuilder().build()) { + logger.info("Sending shutdown request."); + client.shutdown().block(); + + logger.info("Ensuring dapr has stopped."); + run.checkRunState(DEFAULT_TIMEOUT, false); + } + } +} diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index ef6a07677..48b2c8b8e 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -550,4 +550,11 @@ public interface DaprClient extends AutoCloseable { * @return Key-value pairs for the secret. */ Mono>> getBulkSecret(GetBulkSecretRequest request); + + /** + * Gracefully shutdown the dapr runtime. + * + * @return a Mono plan of type Void. + */ + Mono shutdown(); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 92cb523f6..f841a4592 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -659,6 +659,17 @@ public class DaprClientGrpc extends AbstractDaprClient { } } + /** + * {@inheritDoc} + */ + @Override + public Mono shutdown() { + return Mono.subscriberContext().flatMap( + context -> this.createMono( + it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it)) + ).then(); + } + /** * Populates GRPC client with interceptors. * diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 76a8c6c05..bd26e3f0a 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -644,6 +644,18 @@ public class DaprClientHttp extends AbstractDaprClient { client.close(); } + /** + * {@inheritDoc} + */ + @Override + public Mono shutdown() { + String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" }; + return Mono.subscriberContext().flatMap( + context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments, + null, null, context)) + .then(); + } + /** * Converts metadata map into Query params. * @param metadata metadata map diff --git a/sdk/src/main/java/io/dapr/client/DaprClientProxy.java b/sdk/src/main/java/io/dapr/client/DaprClientProxy.java index 22171eba9..26eee85ef 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientProxy.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientProxy.java @@ -499,4 +499,12 @@ class DaprClientProxy implements DaprClient { methodInvocationOverrideClient.close(); } } + + /** + * {@inheritDoc} + */ + @Override + public Mono shutdown() { + return client.shutdown(); + } } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index c3f2cd999..c1fb29bc2 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -2142,6 +2142,19 @@ public class DaprClientGrpcTest { } } + @Test + public void shutdownTest() { + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(Empty.getDefaultInstance()); + observer.onCompleted(); + return null; + }).when(daprStub).shutdown(any(Empty.class), any()); + + Mono result = client.shutdown(); + result.block(); + } + private DaprProtos.GetStateResponse buildFutureGetStateEnvelop(T value, String etag) throws IOException { return buildGetStateResponse(value, etag); } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 429b177e5..16854dd5f 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -1267,6 +1267,16 @@ public class DaprClientHttpTest { daprClientHttp.close(); } + @Test + public void shutdown() throws Exception { + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/shutdown") + .respond(204); + + final Mono mono = daprClientHttp.shutdown(); + assertNull(mono.block()); + } + private static final class BodyMatcher implements Matcher { private final String expected;