From 76c848bb6fec6ba51076475314212bfd9d1b1d7c Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 16 Oct 2020 14:29:31 -0700 Subject: [PATCH] Fixes timer invocation after app restarts. (#363) --- .../examples/actors/http/DemoActorClient.java | 2 +- .../actors/http/DemoActorService.java | 2 +- .../io/dapr/actors/runtime/AbstractActor.java | 37 ++---- .../io/dapr/actors/runtime/ActorManager.java | 65 ++-------- .../io/dapr/actors/runtime/ActorRuntime.java | 5 +- .../dapr/actors/runtime/ActorTimerParams.java | 59 +++++++++ .../actors/runtime/ActorTypeUtilities.java | 56 --------- .../runtime/DaprStateAsyncProvider.java | 1 - .../dapr/actors/runtime/ActorManagerTest.java | 36 +++--- .../dapr/actors/runtime/ActorNoStateTest.java | 79 +++++++++++- .../actors/runtime/ActorStatefulTest.java | 14 ++- .../runtime/ActorTypeUtilitiesTest.java | 79 ++++++++++++ .../io/dapr/springboot/DaprController.java | 6 +- sdk-tests/pom.xml | 6 + .../src/test/java/io/dapr/it/AppRun.java | 92 ++++++++++++++ .../src/test/java/io/dapr/it/BaseIT.java | 53 +++++--- .../src/test/java/io/dapr/it/Command.java | 35 +++++- .../src/test/java/io/dapr/it/DaprPorts.java | 1 - .../src/test/java/io/dapr/it/DaprRun.java | 38 +++++- .../src/test/java/io/dapr/it/Stoppable.java | 15 +++ .../dapr/it/actors/ActorTimerRecoveryIT.java | 116 ++++++++++++++++++ 21 files changed, 605 insertions(+), 192 deletions(-) create mode 100644 sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java create mode 100644 sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTypeUtilitiesTest.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/AppRun.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/Stoppable.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java index 25194615e..3867693d3 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java @@ -17,7 +17,7 @@ import java.util.List; * mvn clean install * 2. Run the client: * dapr run --components-path ./components --app-id demoactorclient --dapr-http-port 3006 -- java -jar \ - * examples/target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.http.DemoActorClient + * target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.http.DemoActorClient */ public class DemoActorClient { diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java index 83a86f327..7de0d20ef 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java @@ -20,7 +20,7 @@ import java.time.Duration; * mvn clean install * 2. Run the server: * dapr run --components-path ./components --app-id demoactorservice --app-port 3000 --dapr-http-port 3005 \ - * -- java -jar examples/target/dapr-java-sdk-examples-exec.jar \ + * -- java -jar target/dapr-java-sdk-examples-exec.jar \ * io.dapr.examples.actors.http.DemoActorService -p 3000 */ public class DemoActorService { diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java index 898f01268..ead31a92e 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java @@ -11,8 +11,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; import java.time.Duration; -import java.util.HashMap; -import java.util.Map; +import java.util.UUID; /** * Represents the base class for actors. @@ -43,11 +42,6 @@ public abstract class AbstractActor { */ private final ActorTrace actorTrace; - /** - * Registered timers for this Actor. - */ - private final Map timers; - /** * Manager for the states in Actors. */ @@ -72,7 +66,6 @@ public abstract class AbstractActor { runtimeContext.getActorTypeInformation().getName(), id); this.actorTrace = runtimeContext.getActorTrace(); - this.timers = new HashMap<>(); this.started = false; } @@ -135,9 +128,9 @@ public abstract class AbstractActor { * @param period The time interval between invocations of the async callback. * Specify negative one (-1) milliseconds to disable periodic signaling. * @param Type for the state to be passed in to timer. - * @return Asynchronous result. + * @return Asynchronous result with timer's name. */ - protected Mono registerActorTimer( + protected Mono registerActorTimer( String timerName, String callback, T state, @@ -150,19 +143,17 @@ public abstract class AbstractActor { String name = timerName; if ((timerName == null) || (timerName.isEmpty())) { - name = String.format("%s_Timer_%d", this.id.toString(), this.timers.size() + 1); + name = String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID().toString()); } - ActorTimer actorTimer = new ActorTimer(this, name, callback, state, dueTime, period); - this.timers.put(name, actorTimer); - return actorTimer; + return new ActorTimer(this, name, callback, state, dueTime, period); }).flatMap(actorTimer -> { try { return this.actorRuntimeContext.getDaprClient().registerActorTimer( this.actorRuntimeContext.getActorTypeInformation().getName(), this.id.toString(), actorTimer.getName(), - INTERNAL_SERIALIZER.serialize(actorTimer)); + INTERNAL_SERIALIZER.serialize(actorTimer)).then(Mono.just(actorTimer.getName())); } catch (Exception e) { return Mono.error(e); } @@ -176,12 +167,10 @@ public abstract class AbstractActor { * @return Asynchronous void response. */ protected Mono unregisterTimer(String timerName) { - return Mono.fromSupplier(() -> getActorTimer(timerName)) - .flatMap(actorTimer -> this.actorRuntimeContext.getDaprClient().unregisterActorTimer( + return this.actorRuntimeContext.getDaprClient().unregisterActorTimer( this.actorRuntimeContext.getActorTypeInformation().getName(), this.id.toString(), - timerName)) - .then(Mono.fromRunnable(() -> this.timers.remove(timerName))); + timerName); } /** @@ -263,16 +252,6 @@ public abstract class AbstractActor { this.actorStateManager.clear(); } - /** - * Gets a given timer by name. - * - * @param timerName Timer name. - * @return Asynchronous void response. - */ - ActorTimer getActorTimer(String timerName) { - return timers.getOrDefault(timerName, null); - } - /** * Internal callback when an Actor is activated. * diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java index eab5d8309..c5cf928e7 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java @@ -108,33 +108,22 @@ class ActorManager { * * @param actorId Identifier for Actor. * @param timerName Name of timer being invoked. + * @param params Parameters for the timer. * @return Asynchronous void response. */ - Mono invokeTimer(ActorId actorId, String timerName) { + Mono invokeTimer(ActorId actorId, String timerName, byte[] params) { return Mono.fromSupplier(() -> { - AbstractActor actor = this.activeActors.getOrDefault(actorId, null); - if (actor == null) { - throw new IllegalArgumentException( - String.format("Could not find actor %s of type %s.", - actorId.toString(), - this.runtimeContext.getActorTypeInformation().getName())); + try { + return OBJECT_SERIALIZER.deserialize(params, ActorTimerParams.class); + } catch (Exception e) { + throw new RuntimeException(e); } - - ActorTimer actorTimer = actor.getActorTimer(timerName); - if (actorTimer == null) { - throw new IllegalStateException( - String.format("Could not find timer %s for actor %s.", - timerName, - this.runtimeContext.getActorTypeInformation().getName())); - } - - return actorTimer; - }).flatMap(actorTimer -> invokeMethod( - actorId, - ActorMethodContext.createForTimer(actorTimer.getName()), - actorTimer.getCallback(), - actorTimer.getState())) - .then(); + }).flatMap(p -> + invokeMethod( + actorId, + ActorMethodContext.createForTimer(timerName), + p.getCallback(), + p.getData())).then(); } /** @@ -200,36 +189,6 @@ class ActorManager { return invokeMethod(actorId, null, methodName, request); } - /** - * Internal method to actually invoke Actor's timer method. - * - * @param actorId Identifier for the Actor. - * @param context Method context to be invoked. - * @param methodName Method name to be invoked. - * @param input Input object to be passed in to the invoked method. - * @return Asynchronous void response. - */ - private Mono invokeMethod(ActorId actorId, ActorMethodContext context, String methodName, Object input) { - ActorMethodContext actorMethodContext = context; - if (actorMethodContext == null) { - actorMethodContext = ActorMethodContext.createForActor(methodName); - } - - return this.invoke(actorId, actorMethodContext, actor -> { - try { - // Finds the actor method with the given name and 1 or no parameter. - Method method = this.actorMethods.get(methodName); - - if (method.getReturnType().equals(Mono.class)) { - return invokeMonoMethod(actor, method, input); - } - return invokeMethod(actor, method, input); - } catch (Exception e) { - return Mono.error(e); - } - }); - } - /** * Internal method to actually invoke Actor's method. * diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index 9404efe36..8cab519dd 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -249,13 +249,14 @@ public class ActorRuntime { * @param actorTypeName Actor type name to invoke the method for. * @param actorId Actor id for the actor for which method will be invoked. * @param timerName The name of timer provided during registration. + * @param params Params to trigger timer. * @return Async void task. */ - public Mono invokeTimer(String actorTypeName, String actorId, String timerName) { + public Mono invokeTimer(String actorTypeName, String actorId, String timerName, byte[] params) { ActorId id = new ActorId(actorId); return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) .flatMap(m -> m.activateActor(id).thenReturn(m)) - .flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName)); + .flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName, params)); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java new file mode 100644 index 000000000..0c0cc5e0c --- /dev/null +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTimerParams.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.actors.runtime; + +/** + * Parameters for Actor Timer. + */ +final class ActorTimerParams { + + /** + * Callback function to be invoked in actor. + */ + private String callback; + + /** + * Data to be passed in as part of the timer trigger. + */ + private byte[] data; + + /** + * Sets the name of the callback function. + * + * @param callback Name of the callback function. + */ + public void setCallback(String callback) { + this.callback = callback; + } + + /** + * Gets the name of the callback function. + * + * @return Name of the callback function. + */ + public String getCallback() { + return this.callback; + } + + /** + * Sets the raw data for the callback function. + * + * @param data Raw data for the callback function. + */ + public void setData(byte[] data) { + this.data = data; + } + + /** + * Gets the raw data for the callback function. + * + * @return Raw data for the callback function. + */ + public byte[] getData() { + return data; + } + +} diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTypeUtilities.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTypeUtilities.java index a74090148..272991af8 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTypeUtilities.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorTypeUtilities.java @@ -12,33 +12,6 @@ import java.util.Arrays; */ final class ActorTypeUtilities { - /** - * Gets all interfaces that extend Actor. - * - * @param clazz Actor class. - * @return Array of Actor interfaces. - */ - public static Class[] getActorInterfaces(Class clazz) { - if (clazz == null) { - return new Class[0]; - } - - return Arrays.stream(clazz.getInterfaces()) - .filter(t -> AbstractActor.class.isAssignableFrom(t)) - .filter(t -> getNonActorParentClass(t) == null) - .toArray(Class[]::new); - } - - /** - * Determines if given class is an Actor interface. - * - * @param clazz Actor interface candidate. - * @return Whether this is an Actor interface. - */ - public static boolean isActorInterface(Class clazz) { - return (clazz != null) && clazz.isInterface() && (getNonActorParentClass(clazz) == null); - } - /** * Determines whether this is an Actor class. * @@ -64,33 +37,4 @@ final class ActorTypeUtilities { && isActor(clazz) && (Arrays.stream(clazz.getInterfaces()).filter(t -> t.equals(Remindable.class)).count() > 0); } - - /** - * Returns the parent class if it is not the {@link AbstractActor} parent - * class. - * - * @param clazz Actor class. - * @return Parent class or null if it is {@link AbstractActor}. - */ - public static Class getNonActorParentClass(Class clazz) { - if (clazz == null) { - return null; - } - - Class[] items = Arrays.stream(clazz.getInterfaces()) - .filter(t -> !t.equals(AbstractActor.class)) - .toArray(Class[]::new); - if (items.length == 0) { - return clazz; - } - - for (Class c : items) { - Class nonActorParent = getNonActorParentClass(c); - if (nonActorParent != null) { - return nonActorParent; - } - } - - return null; - } } diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java index 279673f63..0cafca2f7 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java @@ -176,7 +176,6 @@ class DaprStateAsyncProvider { writer.flush(); payload = writer.toByteArray(); } catch (IOException e) { - e.printStackTrace(); return Mono.error(e); } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java index ee5a475ad..b7159a755 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorManagerTest.java @@ -231,37 +231,37 @@ public class ActorManagerTest { } @Test(expected = IllegalArgumentException.class) - public void invokeTimerBeforeActivate() { + public void invokeTimerBeforeActivate() throws IOException { ActorId actorId = newActorId(); - this.manager.invokeTimer(actorId, "count").block(); - } - - @Test(expected = IllegalStateException.class) - public void activateThenInvokeTimerBeforeRegister() { - ActorId actorId = newActorId(); - this.manager.activateActor(actorId).block(); - this.manager.invokeTimer(actorId, "unknown").block(); + this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block(); } @Test - public void activateThenInvokeTimer() { + public void activateThenInvokeTimerBeforeRegister() throws IOException { ActorId actorId = newActorId(); this.manager.activateActor(actorId).block(); - this.manager.invokeTimer(actorId, "count").block(); + this.manager.invokeTimer(actorId, "unknown", createTimerParams("incrementCount", 2)).block(); + } + + @Test + public void activateThenInvokeTimer() throws IOException { + ActorId actorId = newActorId(); + this.manager.activateActor(actorId).block(); + this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block(); byte[] response = this.manager.invokeMethod(actorId, "getCount", null).block(); Assert.assertEquals("2", new String(response)); } @Test(expected = IllegalArgumentException.class) - public void activateInvokeTimerDeactivateThenInvokeTimer() { + public void activateInvokeTimerDeactivateThenInvokeTimer() throws IOException { ActorId actorId = newActorId(); this.manager.activateActor(actorId).block(); - this.manager.invokeTimer(actorId, "count").block(); + this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block(); byte[] response = this.manager.invokeMethod(actorId, "getCount", null).block(); Assert.assertEquals("2", new String(response)); this.manager.deactivateActor(actorId).block(); - this.manager.invokeTimer(actorId, "count").block(); + this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block(); } private byte[] createReminderParams(String data) throws IOException { @@ -270,6 +270,14 @@ public class ActorManagerTest { return INTERNAL_SERIALIZER.serialize(params); } + private byte[] createTimerParams(String callback, Object data) throws IOException { + byte[] serializedData = this.context.getObjectSerializer().serialize(data); + ActorTimerParams params = new ActorTimerParams(); + params.setCallback(callback); + params.setData(serializedData); + return INTERNAL_SERIALIZER.serialize(params); + } + private static ActorId newActorId() { return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet())); } diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java index c1a8c293d..fd5c99e72 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorNoStateTest.java @@ -15,6 +15,8 @@ import org.junit.Assert; import org.junit.Test; import reactor.core.publisher.Mono; +import java.lang.reflect.Proxy; +import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.ArgumentMatchers.any; @@ -34,26 +36,32 @@ public class ActorNoStateTest { public interface MyActor { // The test will only call the versions of this in a derived class to the user code base class. // The user code base class version will throw. + Mono getMyId(); Mono stringInStringOut(String input); Mono stringInBooleanOut(String input); Mono stringInVoidOutIntentionallyThrows(String input); Mono classInClassOut(MyData input); + Mono registerBadCallbackName(); + String registerTimerAutoName(); } @ActorType(name = "MyActor") public static class ActorImpl extends AbstractActor implements MyActor { - private final ActorId id; private boolean activated; private boolean methodReturningVoidInvoked; //public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { public ActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { super(runtimeContext, id); - this.id = id; this.activated = true; this.methodReturningVoidInvoked = false; } + @Override + public Mono getMyId() { + return Mono.fromSupplier(() -> super.getId().toString()); + } + @Override public Mono stringInStringOut(String s) { return Mono.fromSupplier(() -> { @@ -90,6 +98,16 @@ public class ActorNoStateTest { input.getNum() + input.getNum()); }); } + + @Override + public Mono registerBadCallbackName() { + return super.registerActorTimer("mytimer", "", "state", Duration.ofSeconds(1), Duration.ofSeconds(1)); + } + + @Override + public String registerTimerAutoName() { + return super.registerActorTimer("", "anything", "state", Duration.ofSeconds(1), Duration.ofSeconds(1)).block(); + } } static class MyData { @@ -115,6 +133,15 @@ public class ActorNoStateTest { } } + @Test + public void actorId() { + ActorProxy proxy = createActorProxy(); + + Assert.assertEquals( + proxy.getActorId().toString(), + proxy.invokeActorMethod("getMyId", String.class).block()); + } + @Test public void stringInStringOut() { ActorProxy proxy = createActorProxy(); @@ -163,6 +190,24 @@ public class ActorNoStateTest { response.getNum()); } + @Test(expected = IllegalArgumentException.class) + public void testBadTimerCallbackName() { + MyActor actor = createActorProxy(MyActor.class); + actor.registerBadCallbackName().block(); + } + + @Test + public void testAutoTimerName() { + MyActor actor = createActorProxy(MyActor.class); + String firstTimer = actor.registerTimerAutoName(); + Assert.assertTrue((firstTimer != null) && !firstTimer.isEmpty()); + + String secondTimer = actor.registerTimerAutoName(); + Assert.assertTrue((secondTimer != null) && !secondTimer.isEmpty()); + + Assert.assertNotEquals(firstTimer, secondTimer); + } + private static ActorId newActorId() { return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet())); } @@ -193,6 +238,36 @@ public class ActorNoStateTest { daprClient); } + private T createActorProxy(Class clazz) { + ActorId actorId = newActorId(); + + // Mock daprClient for ActorProxy only, not for runtime. + DaprClientStub daprClient = mock(DaprClientStub.class); + + when(daprClient.invokeActorMethod( + eq(context.getActorTypeInformation().getName()), + eq(actorId.toString()), + any(), + any())) + .thenAnswer(invocationOnMock -> + this.manager.invokeMethod( + new ActorId(invocationOnMock.getArgument(1, String.class)), + invocationOnMock.getArgument(2, String.class), + invocationOnMock.getArgument(3, byte[].class))); + + this.manager.activateActor(actorId).block(); + + ActorProxyForTestsImpl proxy = new ActorProxyForTestsImpl( + context.getActorTypeInformation().getName(), + actorId, + new DefaultObjectSerializer(), + daprClient); + return (T) Proxy.newProxyInstance( + ActorProxyForTestsImpl.class.getClassLoader(), + new Class[]{clazz}, + proxy); + } + private static ActorRuntimeContext createContext() { DaprClient daprClient = mock(DaprClient.class); diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java index afc431086..7af99614b 100644 --- a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorStatefulTest.java @@ -441,7 +441,7 @@ public class ActorStatefulTest { public void invokeTimer() { ActorProxy proxy = newActorProxy(); - this.manager.invokeTimer(proxy.getActorId(), "mytimer").block(); + this.manager.invokeTimer(proxy.getActorId(), "mytimer", "{ \"callback\": \"hasMessage\" }".getBytes()).block(); MyMethodContext preContext = proxy.invokeActorMethod("getPreCallMethodContext", MyMethodContext.class).block(); @@ -460,23 +460,25 @@ public class ActorStatefulTest { this.manager.deactivateActor(proxy.getActorId()).block(); - this.manager.invokeTimer(proxy.getActorId(), "mytimer").block(); + this.manager.invokeTimer(proxy.getActorId(), "mytimer", "{ \"callback\": \"hasMessage\" }".getBytes()).block(); } - @Test(expected = IllegalStateException.class) + @Test public void invokeTimerAfterUnregister() { ActorProxy proxy = newActorProxy(); proxy.invokeActorMethod("unregisterTimerAndReminder").block(); - this.manager.invokeTimer(proxy.getActorId(), "mytimer").block(); + // This call succeeds because the SDK does not control register/unregister timer, the Dapr runtime does. + this.manager.invokeTimer(proxy.getActorId(), "mytimer", "{ \"callback\": \"hasMessage\" }".getBytes()).block(); } - @Test(expected = IllegalStateException.class) + @Test public void invokeUnknownTimer() { ActorProxy proxy = newActorProxy(); - this.manager.invokeTimer(proxy.getActorId(), "unknown").block(); + // SDK does not control timers, Dapr runtime does - so an "unknown" timer can still be triggered. + this.manager.invokeTimer(proxy.getActorId(), "unknown", "{ \"callback\": \"hasMessage\" }".getBytes()).block(); } @Test diff --git a/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTypeUtilitiesTest.java b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTypeUtilitiesTest.java new file mode 100644 index 000000000..99aceb58f --- /dev/null +++ b/sdk-actors/src/test/java/io/dapr/actors/runtime/ActorTypeUtilitiesTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ +package io.dapr.actors.runtime; + +import io.dapr.actors.ActorId; +import io.dapr.utils.TypeRef; +import org.junit.Test; +import reactor.core.publisher.Mono; + +import java.io.Closeable; +import java.time.Duration; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ActorTypeUtilitiesTest { + + @Test + public void nullIsNotRemindable() { + assertFalse(ActorTypeUtilities.isRemindableActor(null)); + } + + @Test + public void nonActorIsNotRemindable() { + assertFalse(ActorTypeUtilities.isRemindableActor(String.class)); + } + + @Test + public void actorButNotRemindable() { + assertFalse(ActorTypeUtilities.isRemindableActor(NonRemindable.class)); + } + + @Test + public void actorWithInterfacesButNotRemindable() { + assertFalse(ActorTypeUtilities.isRemindableActor(NonRemindableWithInterfaces.class)); + } + + @Test + public void actorIsRemindable() { + assertTrue(ActorTypeUtilities.isRemindableActor(Remindable.class)); + } + + public static class NonRemindable extends AbstractActor { + + protected NonRemindable(ActorRuntimeContext runtimeContext, ActorId id) { + super(runtimeContext, id); + } + } + + public static class NonRemindableWithInterfaces extends AbstractActor implements Closeable { + + protected NonRemindableWithInterfaces(ActorRuntimeContext runtimeContext, ActorId id) { + super(runtimeContext, id); + } + + @Override + public void close() { + } + } + + public static class Remindable extends AbstractActor implements io.dapr.actors.runtime.Remindable { + + protected Remindable(ActorRuntimeContext runtimeContext, ActorId id) { + super(runtimeContext, id); + } + + @Override + public TypeRef getStateType() { + return null; + } + + @Override + public Mono receiveReminder(String reminderName, Object state, Duration dueTime, Duration period) { + return null; + } + } +} diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java index e7894bab3..886ae2173 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprController.java @@ -91,13 +91,15 @@ public class DaprController { * @param type Actor type. * @param id Actor Id. * @param timer Actor timer's name. + * @param body Raw request's body. * @return Void. */ @PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}") public Mono invokeActorTimer(@PathVariable("type") String type, @PathVariable("id") String id, - @PathVariable("timer") String timer) { - return ActorRuntime.getInstance().invokeTimer(type, id, timer); + @PathVariable("timer") String timer, + @RequestBody byte[] body) { + return ActorRuntime.getInstance().invokeTimer(type, id, timer, body); } /** diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index 0bf998ba8..07ac1dc28 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -84,6 +84,12 @@ 3.3.1.RELEASE test + + org.apache.commons + commons-lang3 + 3.9 + test + diff --git a/sdk-tests/src/test/java/io/dapr/it/AppRun.java b/sdk-tests/src/test/java/io/dapr/it/AppRun.java new file mode 100644 index 000000000..77fc94059 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/AppRun.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it; + +import io.dapr.config.Properties; + +import java.io.IOException; +import java.util.HashMap; + +import static io.dapr.it.Retry.callWithRetry; + + +/** + * This class runs an app outside Dapr but adds Dapr env variables. + */ +public class AppRun implements Stoppable { + + private static final String APP_COMMAND = + "mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\""; + + private final DaprPorts ports; + + private final int maxWaitMilliseconds; + + private final Command command; + + AppRun(DaprPorts ports, + String successMessage, + Class serviceClass, + int maxWaitMilliseconds) { + this.command = new Command( + successMessage, + buildCommand(serviceClass, ports), + new HashMap<>() {{ + put("DAPR_HTTP_PORT", ports.getHttpPort().toString()); + put("DAPR_GRPC_PORT", ports.getGrpcPort().toString()); + }}); + this.ports = ports; + this.maxWaitMilliseconds = maxWaitMilliseconds; + } + + public void start() throws InterruptedException, IOException { + long start = System.currentTimeMillis(); + // First, try to stop previous run (if left running). + this.stop(); + // Wait for the previous run to kill the prior process. + System.out.println("Starting application ..."); + this.command.run(); + if (this.ports.getAppPort() != null) { + long timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start); + callWithRetry(() -> { + System.out.println("Checking if app is listening on port ..."); + assertListeningOnPort(this.ports.getAppPort()); + }, timeLeft); + } + System.out.println("Application started."); + } + + @Override + public void stop() throws InterruptedException { + System.out.println("Stopping application ..."); + try { + this.command.stop(); + + System.out.println("Application stopped."); + } catch (RuntimeException e) { + System.out.println("Could not stop command: " + this.command.toString()); + } + } + + private static String buildCommand(Class serviceClass, DaprPorts ports) { + return String.format(APP_COMMAND, serviceClass.getCanonicalName(), + ports.getAppPort() != null ? ports.getAppPort().toString() : ""); + } + + private static void assertListeningOnPort(int port) { + System.out.printf("Checking port %d ...\n", port); + + java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(Properties.SIDECAR_IP.get(), port); + try (java.net.Socket socket = new java.net.Socket()) { + socket.connect(socketAddress, 1000); + } catch (Exception e) { + throw new RuntimeException(e); + } + + System.out.printf("Confirmed listening on port %d.\n", port); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index b5d7da727..334a823be 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -5,11 +5,13 @@ package io.dapr.it; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.AfterClass; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.junit.AfterClass; public abstract class BaseIT { @@ -17,7 +19,7 @@ public abstract class BaseIT { private static final Map DAPR_RUN_BUILDERS = new HashMap<>(); - private static final Collection DAPR_RUNS = new ArrayList<>(); + private static final Collection TO_BE_STOPPED = new ArrayList<>(); protected static DaprRun startDaprApp( String testName, @@ -35,29 +37,50 @@ public abstract class BaseIT { } protected static DaprRun startDaprApp( - String testName, - String successMessage, - Class serviceClass, - Boolean useAppPort, - Boolean useDaprPorts, - int maxWaitMilliseconds) throws Exception { + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + Boolean useDaprPorts, + int maxWaitMilliseconds) throws Exception { DaprRun.Builder builder = new DaprRun.Builder( - testName, - () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), - successMessage, - maxWaitMilliseconds).withServiceClass(serviceClass); + testName, + () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), + successMessage, + maxWaitMilliseconds).withServiceClass(serviceClass); DaprRun run = builder.build(); - DAPR_RUNS.add(run); + TO_BE_STOPPED.add(run); DAPR_RUN_BUILDERS.put(run.getAppName(), builder); run.start(); run.use(); return run; } + protected static ImmutablePair startSplitDaprAndApp( + String testName, + String successMessage, + Class serviceClass, + Boolean useAppPort, + int maxWaitMilliseconds) throws Exception { + DaprRun.Builder builder = new DaprRun.Builder( + testName, + () -> DaprPorts.build(useAppPort, true, true), + successMessage, + maxWaitMilliseconds).withServiceClass(serviceClass); + ImmutablePair runs = builder.splitBuild(); + TO_BE_STOPPED.add(runs.left); + TO_BE_STOPPED.add(runs.right); + DAPR_RUN_BUILDERS.put(runs.right.getAppName(), builder); + runs.left.start(); + runs.right.start(); + runs.right.use(); + return runs; + } + @AfterClass public static void cleanUp() throws Exception { - for (DaprRun app : DAPR_RUNS) { - app.stop(); + for (Stoppable toBeStopped : TO_BE_STOPPED) { + toBeStopped.stop(); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/Command.java b/sdk-tests/src/test/java/io/dapr/it/Command.java index c740908e9..9e1beedf2 100644 --- a/sdk-tests/src/test/java/io/dapr/it/Command.java +++ b/sdk-tests/src/test/java/io/dapr/it/Command.java @@ -9,30 +9,43 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; public class Command { private static final int SUCCESS_WAIT_TIMEOUT_MINUTES = 5; + private static final int DESTROY_WAIT_TIMEOUT_SECONDS = 5; + private final String successMessage; private final String command; private Process process; - public Command(String successMessage, String command) { + private Map env; + + public Command(String successMessage, String command, Map env) { this.successMessage = successMessage; this.command = command; + this.env = env; + } + + public Command(String successMessage, String command) { + this(successMessage, command, null); } public void run() throws InterruptedException, IOException { final AtomicBoolean success = new AtomicBoolean(false); final Semaphore finished = new Semaphore(0); - this.process = Runtime.getRuntime().exec(command); + ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", command); + if (this.env != null) { + processBuilder.environment().putAll(this.env); + } + this.process = processBuilder.start(); final Thread stdoutReader = new Thread(() -> { try { @@ -66,4 +79,20 @@ public class Command { throw new IllegalStateException("Could not find success criteria for command: " + command); } } + + public void stop() throws InterruptedException { + if (this.process != null) { + this.process.destroy(); + Thread.sleep(DESTROY_WAIT_TIMEOUT_SECONDS * 1000); + if (this.process.isAlive()) { + this.process.destroyForcibly(); + } + this.process = null; + } + } + + @Override + public String toString() { + return this.command; + } } diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java index a844b957d..a03b83050 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprPorts.java @@ -8,7 +8,6 @@ package io.dapr.it; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java index dd11c26b7..ede75dfe2 100644 --- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java +++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java @@ -6,6 +6,7 @@ package io.dapr.it; import io.dapr.config.Properties; +import org.apache.commons.lang3.tuple.ImmutablePair; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,7 +15,9 @@ import java.util.function.Supplier; import static io.dapr.it.Retry.callWithRetry; -public class DaprRun { +public class DaprRun implements Stoppable { + + private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!"; private static final String DAPR_RUN = "dapr run --app-id %s --components-path ./components"; @@ -114,6 +117,7 @@ public class DaprRun { System.out.println("Dapr application started."); } + @Override public void stop() throws InterruptedException, IOException { System.out.println("Stopping dapr application ..."); try { @@ -213,11 +217,33 @@ public class DaprRun { DaprRun build() { return new DaprRun( - this.testName, - this.portsSupplier.get(), - this.successMessage, - this.serviceClass, - this.maxWaitMilliseconds); + this.testName, + this.portsSupplier.get(), + this.successMessage, + this.serviceClass, + this.maxWaitMilliseconds); + } + + /** + * Builds app and dapr run separately. It can be useful to force the restart of one of them. + * @return Pair of AppRun and DaprRun. + */ + ImmutablePair splitBuild() { + DaprPorts ports = this.portsSupplier.get(); + AppRun appRun = new AppRun( + ports, + this.successMessage, + this.serviceClass, + this.maxWaitMilliseconds); + + DaprRun daprRun = new DaprRun( + this.testName, + ports, + DAPR_SUCCESS_MESSAGE, + null, + this.maxWaitMilliseconds); + + return new ImmutablePair<>(appRun, daprRun); } } } diff --git a/sdk-tests/src/test/java/io/dapr/it/Stoppable.java b/sdk-tests/src/test/java/io/dapr/it/Stoppable.java new file mode 100644 index 000000000..4f0e7aba1 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/Stoppable.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it; + + +import java.io.IOException; + +public interface Stoppable { + + void stop() throws InterruptedException, IOException; + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java new file mode 100644 index 000000000..4ce759101 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.actors; + +import io.dapr.actors.ActorId; +import io.dapr.actors.client.ActorProxy; +import io.dapr.actors.client.ActorProxyBuilder; +import io.dapr.it.AppRun; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.actors.app.MyActorService; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +public class ActorTimerRecoveryIT extends BaseIT { + + private static Logger logger = LoggerFactory.getLogger(ActorTimerRecoveryIT.class); + + /** + * Create an actor, register a timer, validates its content, restarts the Actor and confirms timer continues. + * @throws Exception This test is not expected to throw. Thrown exceptions are bugs. + */ + @Test + public void timerRecoveryTest() throws Exception { + ImmutablePair runs = startSplitDaprAndApp( + ActorTimerRecoveryIT.class.getSimpleName(), + "Started MyActorService", + MyActorService.class, + true, + 60000); + + Thread.sleep(3000); + String actorType="MyActorTest"; + logger.debug("Creating proxy builder"); + + ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class); + logger.debug("Creating actorId"); + ActorId actorId = new ActorId(UUID.randomUUID().toString()); + logger.debug("Building proxy"); + ActorProxy proxy = proxyBuilder.build(actorId); + + logger.debug("Invoking actor method 'startTimer' which will register a timer"); + proxy.invokeActorMethod("startTimer", "myTimer").block(); + + logger.debug("Pausing 7 seconds to allow timer to fire"); + Thread.sleep(7000); + + ArrayList logs = getAppMethodCallLogs(proxy); + validateTimerCalls(logs, 3); + + // Restarts app only. + runs.left.stop(); + runs.left.start(); + + logger.debug("Pausing 10 seconds to allow timer to fire"); + Thread.sleep(10000); + ArrayList newLogs = getAppMethodCallLogs(proxy); + validateTimerCalls(newLogs, 3); + + // Check that the restart actually happened by confirming the old logs are not in the new logs. + for (MethodEntryTracker oldLog: logs) { + for (MethodEntryTracker newLog: newLogs) { + assertNotEquals(oldLog.toString(), newLog.toString()); + } + } + + // call unregister + logger.debug("Calling actor method 'stopTimer' to unregister timer"); + proxy.invokeActorMethod("stopTimer", "myTimer").block(); + } + + ArrayList getAppMethodCallLogs(ActorProxy proxy) { + ArrayList logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block(); + ArrayList trackers = new ArrayList(); + for(String t : logs) { + String[] toks = t.split("\\|"); + MethodEntryTracker m = new MethodEntryTracker( + toks[0].equals("Enter") ? true : false, + toks[1], + new Date(toks[2])); + trackers.add(m); + } + + return trackers; + } + + /** + * Validate the timer and reminder has been invoked at least x times. + * @param logs logs with info about method entries and exits returned from the app + * @param minimum minimum number of entries. + */ + void validateTimerCalls(ArrayList logs, int minimum) { + // Validate the timer has been invoked at least x times. We cannot validate precisely because of + // differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above, + // we validate below with some margin. Events for each actor method call include "enter" and "exit" + // calls, so they are divided by 2. + List timerInvocations = logs.stream().filter(x -> x.getMethodName().equals(("clock"))).collect(Collectors.toList()); + System.out.println("Size of timer count list is %d, which means it's been invoked half that many times" + timerInvocations.size()); + assertTrue(timerInvocations.size() / 2 >= minimum); + } + +}