Add shutdown API support (#530)

* Add shutdown API support

https://github.com/dapr/java-sdk/issues/529

* Utilize POST instead of GET for shutdown API
This commit is contained in:
halspang 2021-04-07 11:42:00 -07:00 committed by GitHub
parent bdf8e86b87
commit a654861def
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 140 additions and 20 deletions

View File

@ -26,7 +26,7 @@ jobs:
DAPR_RUNTIME_VER: 1.1.0-rc.1 DAPR_RUNTIME_VER: 1.1.0-rc.1
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF: DAPR_CLI_REF:
DAPR_REF: DAPR_REF: 266eb0b5d300c576c3360c7f0582d795579ee457
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }} GPG_KEY: ${{ secrets.GPG_KEY }}

View File

@ -68,31 +68,16 @@ public class DaprRun implements Stoppable {
this.stop(); this.stop();
// Wait for the previous run to kill the prior process. // Wait for the previous run to kill the prior process.
long timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); long timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
callWithRetry(() -> { System.out.println("Checking if previous run for Dapr application has stopped ...");
System.out.println("Checking if previous run for Dapr application has stopped ..."); checkRunState(timeLeft, false);
try {
this.listCommand.run();
throw new RuntimeException("Previous run for app has not stopped yet!");
} catch (IllegalStateException e) {
// Success because we the list command did not find the app id.
} catch (Exception e) {
throw new RuntimeException(e);
}
}, timeLeft);
System.out.println("Starting dapr application ..."); System.out.println("Starting dapr application ...");
this.startCommand.run(); this.startCommand.run();
this.started.set(true); this.started.set(true);
timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
callWithRetry(() -> { System.out.println("Checking if Dapr application has started ...");
System.out.println("Checking if Dapr application has started ..."); checkRunState(timeLeft, true);
try {
this.listCommand.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, timeLeft);
if (this.ports.getAppPort() != null) { if (this.ports.getAppPort() != null) {
timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
@ -180,6 +165,25 @@ public class DaprRun implements Stoppable {
return appName; return appName;
} }
public void checkRunState(long timeout, boolean shouldBeRunning) throws InterruptedException {
callWithRetry(() -> {
try {
this.listCommand.run();
if (!shouldBeRunning) {
throw new RuntimeException("Previous run for app has not stopped yet!");
}
} catch (IllegalStateException e) {
// Bad case if the app is supposed to be running.
if (shouldBeRunning) {
throw e;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}, timeout);
}
private static String buildDaprCommand( private static String buildDaprCommand(
String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol, DaprApiProtocol appProtocol) { String appName, Class serviceClass, DaprPorts ports, DaprApiProtocol protocol, DaprApiProtocol appProtocol) {
StringBuilder stringBuilder = StringBuilder stringBuilder =

View File

@ -0,0 +1,55 @@
package io.dapr.it.api;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.ActorReminderRecoveryIT;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
@RunWith(Parameterized.class)
public class ApiIT extends BaseIT {
private static final Logger logger = LoggerFactory.getLogger(ApiIT.class);
private static final int DEFAULT_TIMEOUT = 60000;
/**
* Parameters for this test.
* Param #1: useGrpc.
*
* @return Collection of parameter tuples.
*/
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{{false}, {true}});
}
@Parameterized.Parameter
public boolean useGrpc;
@Test
public void testShutdownAPI() throws Exception {
DaprRun run = startDaprApp(this.getClass().getSimpleName(), DEFAULT_TIMEOUT);
if (this.useGrpc) {
run.switchToGRPC();
} else {
run.switchToHTTP();
}
try (DaprClient client = new DaprClientBuilder().build()) {
logger.info("Sending shutdown request.");
client.shutdown().block();
logger.info("Ensuring dapr has stopped.");
run.checkRunState(DEFAULT_TIMEOUT, false);
}
}
}

View File

@ -550,4 +550,11 @@ public interface DaprClient extends AutoCloseable {
* @return Key-value pairs for the secret. * @return Key-value pairs for the secret.
*/ */
Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest request); Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest request);
/**
* Gracefully shutdown the dapr runtime.
*
* @return a Mono plan of type Void.
*/
Mono<Void> shutdown();
} }

View File

@ -659,6 +659,17 @@ public class DaprClientGrpc extends AbstractDaprClient {
} }
} }
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> shutdown() {
return Mono.subscriberContext().flatMap(
context -> this.<Empty>createMono(
it -> intercept(context, asyncStub).shutdown(Empty.getDefaultInstance(), it))
).then();
}
/** /**
* Populates GRPC client with interceptors. * Populates GRPC client with interceptors.
* *

View File

@ -644,6 +644,18 @@ public class DaprClientHttp extends AbstractDaprClient {
client.close(); client.close();
} }
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> shutdown() {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
return Mono.subscriberContext().flatMap(
context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
null, null, context))
.then();
}
/** /**
* Converts metadata map into Query params. * Converts metadata map into Query params.
* @param metadata metadata map * @param metadata metadata map

View File

@ -499,4 +499,12 @@ class DaprClientProxy implements DaprClient {
methodInvocationOverrideClient.close(); methodInvocationOverrideClient.close();
} }
} }
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> shutdown() {
return client.shutdown();
}
} }

View File

@ -2142,6 +2142,19 @@ public class DaprClientGrpcTest {
} }
} }
@Test
public void shutdownTest() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
observer.onNext(Empty.getDefaultInstance());
observer.onCompleted();
return null;
}).when(daprStub).shutdown(any(Empty.class), any());
Mono<Void> result = client.shutdown();
result.block();
}
private <T> DaprProtos.GetStateResponse buildFutureGetStateEnvelop(T value, String etag) throws IOException { private <T> DaprProtos.GetStateResponse buildFutureGetStateEnvelop(T value, String etag) throws IOException {
return buildGetStateResponse(value, etag); return buildGetStateResponse(value, etag);
} }

View File

@ -1267,6 +1267,16 @@ public class DaprClientHttpTest {
daprClientHttp.close(); daprClientHttp.close();
} }
@Test
public void shutdown() throws Exception {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/shutdown")
.respond(204);
final Mono<Void> mono = daprClientHttp.shutdown();
assertNull(mono.block());
}
private static final class BodyMatcher implements Matcher { private static final class BodyMatcher implements Matcher {
private final String expected; private final String expected;