Fixes timer invocation after app restarts. (#363)

This commit is contained in:
Artur Souza 2020-10-16 14:29:31 -07:00 committed by GitHub
parent 1573a46ed4
commit 76c848bb6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 605 additions and 192 deletions

View File

@ -17,7 +17,7 @@ import java.util.List;
* mvn clean install * mvn clean install
* 2. Run the client: * 2. Run the client:
* dapr run --components-path ./components --app-id demoactorclient --dapr-http-port 3006 -- java -jar \ * dapr run --components-path ./components --app-id demoactorclient --dapr-http-port 3006 -- java -jar \
* examples/target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.http.DemoActorClient * target/dapr-java-sdk-examples-exec.jar io.dapr.examples.actors.http.DemoActorClient
*/ */
public class DemoActorClient { public class DemoActorClient {

View File

@ -20,7 +20,7 @@ import java.time.Duration;
* mvn clean install * mvn clean install
* 2. Run the server: * 2. Run the server:
* dapr run --components-path ./components --app-id demoactorservice --app-port 3000 --dapr-http-port 3005 \ * dapr run --components-path ./components --app-id demoactorservice --app-port 3000 --dapr-http-port 3005 \
* -- java -jar examples/target/dapr-java-sdk-examples-exec.jar \ * -- java -jar target/dapr-java-sdk-examples-exec.jar \
* io.dapr.examples.actors.http.DemoActorService -p 3000 * io.dapr.examples.actors.http.DemoActorService -p 3000
*/ */
public class DemoActorService { public class DemoActorService {

View File

@ -11,8 +11,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.HashMap; import java.util.UUID;
import java.util.Map;
/** /**
* Represents the base class for actors. * Represents the base class for actors.
@ -43,11 +42,6 @@ public abstract class AbstractActor {
*/ */
private final ActorTrace actorTrace; private final ActorTrace actorTrace;
/**
* Registered timers for this Actor.
*/
private final Map<String, ActorTimer> timers;
/** /**
* Manager for the states in Actors. * Manager for the states in Actors.
*/ */
@ -72,7 +66,6 @@ public abstract class AbstractActor {
runtimeContext.getActorTypeInformation().getName(), runtimeContext.getActorTypeInformation().getName(),
id); id);
this.actorTrace = runtimeContext.getActorTrace(); this.actorTrace = runtimeContext.getActorTrace();
this.timers = new HashMap<>();
this.started = false; this.started = false;
} }
@ -135,9 +128,9 @@ public abstract class AbstractActor {
* @param period The time interval between invocations of the async callback. * @param period The time interval between invocations of the async callback.
* Specify negative one (-1) milliseconds to disable periodic signaling. * Specify negative one (-1) milliseconds to disable periodic signaling.
* @param <T> Type for the state to be passed in to timer. * @param <T> Type for the state to be passed in to timer.
* @return Asynchronous result. * @return Asynchronous result with timer's name.
*/ */
protected <T> Mono<Void> registerActorTimer( protected <T> Mono<String> registerActorTimer(
String timerName, String timerName,
String callback, String callback,
T state, T state,
@ -150,19 +143,17 @@ public abstract class AbstractActor {
String name = timerName; String name = timerName;
if ((timerName == null) || (timerName.isEmpty())) { if ((timerName == null) || (timerName.isEmpty())) {
name = String.format("%s_Timer_%d", this.id.toString(), this.timers.size() + 1); name = String.format("%s_Timer_%s", this.id.toString(), UUID.randomUUID().toString());
} }
ActorTimer actorTimer = new ActorTimer(this, name, callback, state, dueTime, period); return new ActorTimer(this, name, callback, state, dueTime, period);
this.timers.put(name, actorTimer);
return actorTimer;
}).flatMap(actorTimer -> { }).flatMap(actorTimer -> {
try { try {
return this.actorRuntimeContext.getDaprClient().registerActorTimer( return this.actorRuntimeContext.getDaprClient().registerActorTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(), this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(), this.id.toString(),
actorTimer.getName(), actorTimer.getName(),
INTERNAL_SERIALIZER.serialize(actorTimer)); INTERNAL_SERIALIZER.serialize(actorTimer)).then(Mono.just(actorTimer.getName()));
} catch (Exception e) { } catch (Exception e) {
return Mono.error(e); return Mono.error(e);
} }
@ -176,12 +167,10 @@ public abstract class AbstractActor {
* @return Asynchronous void response. * @return Asynchronous void response.
*/ */
protected Mono<Void> unregisterTimer(String timerName) { protected Mono<Void> unregisterTimer(String timerName) {
return Mono.fromSupplier(() -> getActorTimer(timerName)) return this.actorRuntimeContext.getDaprClient().unregisterActorTimer(
.flatMap(actorTimer -> this.actorRuntimeContext.getDaprClient().unregisterActorTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(), this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(), this.id.toString(),
timerName)) timerName);
.then(Mono.fromRunnable(() -> this.timers.remove(timerName)));
} }
/** /**
@ -263,16 +252,6 @@ public abstract class AbstractActor {
this.actorStateManager.clear(); this.actorStateManager.clear();
} }
/**
* Gets a given timer by name.
*
* @param timerName Timer name.
* @return Asynchronous void response.
*/
ActorTimer getActorTimer(String timerName) {
return timers.getOrDefault(timerName, null);
}
/** /**
* Internal callback when an Actor is activated. * Internal callback when an Actor is activated.
* *

View File

@ -108,33 +108,22 @@ class ActorManager<T extends AbstractActor> {
* *
* @param actorId Identifier for Actor. * @param actorId Identifier for Actor.
* @param timerName Name of timer being invoked. * @param timerName Name of timer being invoked.
* @param params Parameters for the timer.
* @return Asynchronous void response. * @return Asynchronous void response.
*/ */
Mono<Void> invokeTimer(ActorId actorId, String timerName) { Mono<Void> invokeTimer(ActorId actorId, String timerName, byte[] params) {
return Mono.fromSupplier(() -> { return Mono.fromSupplier(() -> {
AbstractActor actor = this.activeActors.getOrDefault(actorId, null); try {
if (actor == null) { return OBJECT_SERIALIZER.deserialize(params, ActorTimerParams.class);
throw new IllegalArgumentException( } catch (Exception e) {
String.format("Could not find actor %s of type %s.", throw new RuntimeException(e);
actorId.toString(),
this.runtimeContext.getActorTypeInformation().getName()));
} }
}).flatMap(p ->
ActorTimer actorTimer = actor.getActorTimer(timerName); invokeMethod(
if (actorTimer == null) { actorId,
throw new IllegalStateException( ActorMethodContext.createForTimer(timerName),
String.format("Could not find timer %s for actor %s.", p.getCallback(),
timerName, p.getData())).then();
this.runtimeContext.getActorTypeInformation().getName()));
}
return actorTimer;
}).flatMap(actorTimer -> invokeMethod(
actorId,
ActorMethodContext.createForTimer(actorTimer.getName()),
actorTimer.getCallback(),
actorTimer.getState()))
.then();
} }
/** /**
@ -200,36 +189,6 @@ class ActorManager<T extends AbstractActor> {
return invokeMethod(actorId, null, methodName, request); return invokeMethod(actorId, null, methodName, request);
} }
/**
* Internal method to actually invoke Actor's timer method.
*
* @param actorId Identifier for the Actor.
* @param context Method context to be invoked.
* @param methodName Method name to be invoked.
* @param input Input object to be passed in to the invoked method.
* @return Asynchronous void response.
*/
private Mono<Object> invokeMethod(ActorId actorId, ActorMethodContext context, String methodName, Object input) {
ActorMethodContext actorMethodContext = context;
if (actorMethodContext == null) {
actorMethodContext = ActorMethodContext.createForActor(methodName);
}
return this.invoke(actorId, actorMethodContext, actor -> {
try {
// Finds the actor method with the given name and 1 or no parameter.
Method method = this.actorMethods.get(methodName);
if (method.getReturnType().equals(Mono.class)) {
return invokeMonoMethod(actor, method, input);
}
return invokeMethod(actor, method, input);
} catch (Exception e) {
return Mono.error(e);
}
});
}
/** /**
* Internal method to actually invoke Actor's method. * Internal method to actually invoke Actor's method.
* *

View File

@ -249,13 +249,14 @@ public class ActorRuntime {
* @param actorTypeName Actor type name to invoke the method for. * @param actorTypeName Actor type name to invoke the method for.
* @param actorId Actor id for the actor for which method will be invoked. * @param actorId Actor id for the actor for which method will be invoked.
* @param timerName The name of timer provided during registration. * @param timerName The name of timer provided during registration.
* @param params Params to trigger timer.
* @return Async void task. * @return Async void task.
*/ */
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName) { public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName, byte[] params) {
ActorId id = new ActorId(actorId); ActorId id = new ActorId(actorId);
return Mono.fromSupplier(() -> this.getActorManager(actorTypeName)) return Mono.fromSupplier(() -> this.getActorManager(actorTypeName))
.flatMap(m -> m.activateActor(id).thenReturn(m)) .flatMap(m -> m.activateActor(id).thenReturn(m))
.flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName)); .flatMap(m -> ((ActorManager)m).invokeTimer(new ActorId(actorId), timerName, params));
} }
/** /**

View File

@ -0,0 +1,59 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
/**
* Parameters for Actor Timer.
*/
final class ActorTimerParams {
/**
* Callback function to be invoked in actor.
*/
private String callback;
/**
* Data to be passed in as part of the timer trigger.
*/
private byte[] data;
/**
* Sets the name of the callback function.
*
* @param callback Name of the callback function.
*/
public void setCallback(String callback) {
this.callback = callback;
}
/**
* Gets the name of the callback function.
*
* @return Name of the callback function.
*/
public String getCallback() {
return this.callback;
}
/**
* Sets the raw data for the callback function.
*
* @param data Raw data for the callback function.
*/
public void setData(byte[] data) {
this.data = data;
}
/**
* Gets the raw data for the callback function.
*
* @return Raw data for the callback function.
*/
public byte[] getData() {
return data;
}
}

View File

@ -12,33 +12,6 @@ import java.util.Arrays;
*/ */
final class ActorTypeUtilities { final class ActorTypeUtilities {
/**
* Gets all interfaces that extend Actor.
*
* @param clazz Actor class.
* @return Array of Actor interfaces.
*/
public static Class[] getActorInterfaces(Class clazz) {
if (clazz == null) {
return new Class[0];
}
return Arrays.stream(clazz.getInterfaces())
.filter(t -> AbstractActor.class.isAssignableFrom(t))
.filter(t -> getNonActorParentClass(t) == null)
.toArray(Class[]::new);
}
/**
* Determines if given class is an Actor interface.
*
* @param clazz Actor interface candidate.
* @return Whether this is an Actor interface.
*/
public static boolean isActorInterface(Class clazz) {
return (clazz != null) && clazz.isInterface() && (getNonActorParentClass(clazz) == null);
}
/** /**
* Determines whether this is an Actor class. * Determines whether this is an Actor class.
* *
@ -64,33 +37,4 @@ final class ActorTypeUtilities {
&& isActor(clazz) && isActor(clazz)
&& (Arrays.stream(clazz.getInterfaces()).filter(t -> t.equals(Remindable.class)).count() > 0); && (Arrays.stream(clazz.getInterfaces()).filter(t -> t.equals(Remindable.class)).count() > 0);
} }
/**
* Returns the parent class if it is not the {@link AbstractActor} parent
* class.
*
* @param clazz Actor class.
* @return Parent class or null if it is {@link AbstractActor}.
*/
public static Class getNonActorParentClass(Class clazz) {
if (clazz == null) {
return null;
}
Class[] items = Arrays.stream(clazz.getInterfaces())
.filter(t -> !t.equals(AbstractActor.class))
.toArray(Class[]::new);
if (items.length == 0) {
return clazz;
}
for (Class c : items) {
Class nonActorParent = getNonActorParentClass(c);
if (nonActorParent != null) {
return nonActorParent;
}
}
return null;
}
} }

View File

@ -176,7 +176,6 @@ class DaprStateAsyncProvider {
writer.flush(); writer.flush();
payload = writer.toByteArray(); payload = writer.toByteArray();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace();
return Mono.error(e); return Mono.error(e);
} }

View File

@ -231,37 +231,37 @@ public class ActorManagerTest {
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void invokeTimerBeforeActivate() { public void invokeTimerBeforeActivate() throws IOException {
ActorId actorId = newActorId(); ActorId actorId = newActorId();
this.manager.invokeTimer(actorId, "count").block(); this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block();
}
@Test(expected = IllegalStateException.class)
public void activateThenInvokeTimerBeforeRegister() {
ActorId actorId = newActorId();
this.manager.activateActor(actorId).block();
this.manager.invokeTimer(actorId, "unknown").block();
} }
@Test @Test
public void activateThenInvokeTimer() { public void activateThenInvokeTimerBeforeRegister() throws IOException {
ActorId actorId = newActorId(); ActorId actorId = newActorId();
this.manager.activateActor(actorId).block(); this.manager.activateActor(actorId).block();
this.manager.invokeTimer(actorId, "count").block(); this.manager.invokeTimer(actorId, "unknown", createTimerParams("incrementCount", 2)).block();
}
@Test
public void activateThenInvokeTimer() throws IOException {
ActorId actorId = newActorId();
this.manager.activateActor(actorId).block();
this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block();
byte[] response = this.manager.invokeMethod(actorId, "getCount", null).block(); byte[] response = this.manager.invokeMethod(actorId, "getCount", null).block();
Assert.assertEquals("2", new String(response)); Assert.assertEquals("2", new String(response));
} }
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void activateInvokeTimerDeactivateThenInvokeTimer() { public void activateInvokeTimerDeactivateThenInvokeTimer() throws IOException {
ActorId actorId = newActorId(); ActorId actorId = newActorId();
this.manager.activateActor(actorId).block(); this.manager.activateActor(actorId).block();
this.manager.invokeTimer(actorId, "count").block(); this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block();
byte[] response = this.manager.invokeMethod(actorId, "getCount", null).block(); byte[] response = this.manager.invokeMethod(actorId, "getCount", null).block();
Assert.assertEquals("2", new String(response)); Assert.assertEquals("2", new String(response));
this.manager.deactivateActor(actorId).block(); this.manager.deactivateActor(actorId).block();
this.manager.invokeTimer(actorId, "count").block(); this.manager.invokeTimer(actorId, "count", createTimerParams("incrementCount", 2)).block();
} }
private byte[] createReminderParams(String data) throws IOException { private byte[] createReminderParams(String data) throws IOException {
@ -270,6 +270,14 @@ public class ActorManagerTest {
return INTERNAL_SERIALIZER.serialize(params); return INTERNAL_SERIALIZER.serialize(params);
} }
private byte[] createTimerParams(String callback, Object data) throws IOException {
byte[] serializedData = this.context.getObjectSerializer().serialize(data);
ActorTimerParams params = new ActorTimerParams();
params.setCallback(callback);
params.setData(serializedData);
return INTERNAL_SERIALIZER.serialize(params);
}
private static ActorId newActorId() { private static ActorId newActorId() {
return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet())); return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet()));
} }

View File

@ -15,6 +15,8 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
@ -34,26 +36,32 @@ public class ActorNoStateTest {
public interface MyActor { public interface MyActor {
// The test will only call the versions of this in a derived class to the user code base class. // The test will only call the versions of this in a derived class to the user code base class.
// The user code base class version will throw. // The user code base class version will throw.
Mono<String> getMyId();
Mono<String> stringInStringOut(String input); Mono<String> stringInStringOut(String input);
Mono<Boolean> stringInBooleanOut(String input); Mono<Boolean> stringInBooleanOut(String input);
Mono<Void> stringInVoidOutIntentionallyThrows(String input); Mono<Void> stringInVoidOutIntentionallyThrows(String input);
Mono<MyData> classInClassOut(MyData input); Mono<MyData> classInClassOut(MyData input);
Mono<String> registerBadCallbackName();
String registerTimerAutoName();
} }
@ActorType(name = "MyActor") @ActorType(name = "MyActor")
public static class ActorImpl extends AbstractActor implements MyActor { public static class ActorImpl extends AbstractActor implements MyActor {
private final ActorId id;
private boolean activated; private boolean activated;
private boolean methodReturningVoidInvoked; private boolean methodReturningVoidInvoked;
//public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { //public MyActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
public ActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { public ActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id); super(runtimeContext, id);
this.id = id;
this.activated = true; this.activated = true;
this.methodReturningVoidInvoked = false; this.methodReturningVoidInvoked = false;
} }
@Override
public Mono<String> getMyId() {
return Mono.fromSupplier(() -> super.getId().toString());
}
@Override @Override
public Mono<String> stringInStringOut(String s) { public Mono<String> stringInStringOut(String s) {
return Mono.fromSupplier(() -> { return Mono.fromSupplier(() -> {
@ -90,6 +98,16 @@ public class ActorNoStateTest {
input.getNum() + input.getNum()); input.getNum() + input.getNum());
}); });
} }
@Override
public Mono<String> registerBadCallbackName() {
return super.registerActorTimer("mytimer", "", "state", Duration.ofSeconds(1), Duration.ofSeconds(1));
}
@Override
public String registerTimerAutoName() {
return super.registerActorTimer("", "anything", "state", Duration.ofSeconds(1), Duration.ofSeconds(1)).block();
}
} }
static class MyData { static class MyData {
@ -115,6 +133,15 @@ public class ActorNoStateTest {
} }
} }
@Test
public void actorId() {
ActorProxy proxy = createActorProxy();
Assert.assertEquals(
proxy.getActorId().toString(),
proxy.invokeActorMethod("getMyId", String.class).block());
}
@Test @Test
public void stringInStringOut() { public void stringInStringOut() {
ActorProxy proxy = createActorProxy(); ActorProxy proxy = createActorProxy();
@ -163,6 +190,24 @@ public class ActorNoStateTest {
response.getNum()); response.getNum());
} }
@Test(expected = IllegalArgumentException.class)
public void testBadTimerCallbackName() {
MyActor actor = createActorProxy(MyActor.class);
actor.registerBadCallbackName().block();
}
@Test
public void testAutoTimerName() {
MyActor actor = createActorProxy(MyActor.class);
String firstTimer = actor.registerTimerAutoName();
Assert.assertTrue((firstTimer != null) && !firstTimer.isEmpty());
String secondTimer = actor.registerTimerAutoName();
Assert.assertTrue((secondTimer != null) && !secondTimer.isEmpty());
Assert.assertNotEquals(firstTimer, secondTimer);
}
private static ActorId newActorId() { private static ActorId newActorId() {
return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet())); return new ActorId(Integer.toString(ACTOR_ID_COUNT.incrementAndGet()));
} }
@ -193,6 +238,36 @@ public class ActorNoStateTest {
daprClient); daprClient);
} }
private <T> T createActorProxy(Class<T> clazz) {
ActorId actorId = newActorId();
// Mock daprClient for ActorProxy only, not for runtime.
DaprClientStub daprClient = mock(DaprClientStub.class);
when(daprClient.invokeActorMethod(
eq(context.getActorTypeInformation().getName()),
eq(actorId.toString()),
any(),
any()))
.thenAnswer(invocationOnMock ->
this.manager.invokeMethod(
new ActorId(invocationOnMock.getArgument(1, String.class)),
invocationOnMock.getArgument(2, String.class),
invocationOnMock.getArgument(3, byte[].class)));
this.manager.activateActor(actorId).block();
ActorProxyForTestsImpl proxy = new ActorProxyForTestsImpl(
context.getActorTypeInformation().getName(),
actorId,
new DefaultObjectSerializer(),
daprClient);
return (T) Proxy.newProxyInstance(
ActorProxyForTestsImpl.class.getClassLoader(),
new Class[]{clazz},
proxy);
}
private static <T extends AbstractActor> ActorRuntimeContext createContext() { private static <T extends AbstractActor> ActorRuntimeContext createContext() {
DaprClient daprClient = mock(DaprClient.class); DaprClient daprClient = mock(DaprClient.class);

View File

@ -441,7 +441,7 @@ public class ActorStatefulTest {
public void invokeTimer() { public void invokeTimer() {
ActorProxy proxy = newActorProxy(); ActorProxy proxy = newActorProxy();
this.manager.invokeTimer(proxy.getActorId(), "mytimer").block(); this.manager.invokeTimer(proxy.getActorId(), "mytimer", "{ \"callback\": \"hasMessage\" }".getBytes()).block();
MyMethodContext preContext = MyMethodContext preContext =
proxy.invokeActorMethod("getPreCallMethodContext", MyMethodContext.class).block(); proxy.invokeActorMethod("getPreCallMethodContext", MyMethodContext.class).block();
@ -460,23 +460,25 @@ public class ActorStatefulTest {
this.manager.deactivateActor(proxy.getActorId()).block(); this.manager.deactivateActor(proxy.getActorId()).block();
this.manager.invokeTimer(proxy.getActorId(), "mytimer").block(); this.manager.invokeTimer(proxy.getActorId(), "mytimer", "{ \"callback\": \"hasMessage\" }".getBytes()).block();
} }
@Test(expected = IllegalStateException.class) @Test
public void invokeTimerAfterUnregister() { public void invokeTimerAfterUnregister() {
ActorProxy proxy = newActorProxy(); ActorProxy proxy = newActorProxy();
proxy.invokeActorMethod("unregisterTimerAndReminder").block(); proxy.invokeActorMethod("unregisterTimerAndReminder").block();
this.manager.invokeTimer(proxy.getActorId(), "mytimer").block(); // This call succeeds because the SDK does not control register/unregister timer, the Dapr runtime does.
this.manager.invokeTimer(proxy.getActorId(), "mytimer", "{ \"callback\": \"hasMessage\" }".getBytes()).block();
} }
@Test(expected = IllegalStateException.class) @Test
public void invokeUnknownTimer() { public void invokeUnknownTimer() {
ActorProxy proxy = newActorProxy(); ActorProxy proxy = newActorProxy();
this.manager.invokeTimer(proxy.getActorId(), "unknown").block(); // SDK does not control timers, Dapr runtime does - so an "unknown" timer can still be triggered.
this.manager.invokeTimer(proxy.getActorId(), "unknown", "{ \"callback\": \"hasMessage\" }".getBytes()).block();
} }
@Test @Test

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.utils.TypeRef;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.time.Duration;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ActorTypeUtilitiesTest {
@Test
public void nullIsNotRemindable() {
assertFalse(ActorTypeUtilities.isRemindableActor(null));
}
@Test
public void nonActorIsNotRemindable() {
assertFalse(ActorTypeUtilities.isRemindableActor(String.class));
}
@Test
public void actorButNotRemindable() {
assertFalse(ActorTypeUtilities.isRemindableActor(NonRemindable.class));
}
@Test
public void actorWithInterfacesButNotRemindable() {
assertFalse(ActorTypeUtilities.isRemindableActor(NonRemindableWithInterfaces.class));
}
@Test
public void actorIsRemindable() {
assertTrue(ActorTypeUtilities.isRemindableActor(Remindable.class));
}
public static class NonRemindable extends AbstractActor {
protected NonRemindable(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
}
public static class NonRemindableWithInterfaces extends AbstractActor implements Closeable {
protected NonRemindableWithInterfaces(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
@Override
public void close() {
}
}
public static class Remindable extends AbstractActor implements io.dapr.actors.runtime.Remindable {
protected Remindable(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
@Override
public TypeRef getStateType() {
return null;
}
@Override
public Mono<Void> receiveReminder(String reminderName, Object state, Duration dueTime, Duration period) {
return null;
}
}
}

View File

@ -91,13 +91,15 @@ public class DaprController {
* @param type Actor type. * @param type Actor type.
* @param id Actor Id. * @param id Actor Id.
* @param timer Actor timer's name. * @param timer Actor timer's name.
* @param body Raw request's body.
* @return Void. * @return Void.
*/ */
@PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}") @PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}")
public Mono<Void> invokeActorTimer(@PathVariable("type") String type, public Mono<Void> invokeActorTimer(@PathVariable("type") String type,
@PathVariable("id") String id, @PathVariable("id") String id,
@PathVariable("timer") String timer) { @PathVariable("timer") String timer,
return ActorRuntime.getInstance().invokeTimer(type, id, timer); @RequestBody byte[] body) {
return ActorRuntime.getInstance().invokeTimer(type, id, timer, body);
} }
/** /**

View File

@ -84,6 +84,12 @@
<version>3.3.1.RELEASE</version> <version>3.3.1.RELEASE</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -0,0 +1,92 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it;
import io.dapr.config.Properties;
import java.io.IOException;
import java.util.HashMap;
import static io.dapr.it.Retry.callWithRetry;
/**
* This class runs an app outside Dapr but adds Dapr env variables.
*/
public class AppRun implements Stoppable {
private static final String APP_COMMAND =
"mvn exec:java -D exec.mainClass=%s -D exec.classpathScope=test -D exec.args=\"%s\"";
private final DaprPorts ports;
private final int maxWaitMilliseconds;
private final Command command;
AppRun(DaprPorts ports,
String successMessage,
Class serviceClass,
int maxWaitMilliseconds) {
this.command = new Command(
successMessage,
buildCommand(serviceClass, ports),
new HashMap<>() {{
put("DAPR_HTTP_PORT", ports.getHttpPort().toString());
put("DAPR_GRPC_PORT", ports.getGrpcPort().toString());
}});
this.ports = ports;
this.maxWaitMilliseconds = maxWaitMilliseconds;
}
public void start() throws InterruptedException, IOException {
long start = System.currentTimeMillis();
// First, try to stop previous run (if left running).
this.stop();
// Wait for the previous run to kill the prior process.
System.out.println("Starting application ...");
this.command.run();
if (this.ports.getAppPort() != null) {
long timeLeft = this.maxWaitMilliseconds - (System.currentTimeMillis() - start);
callWithRetry(() -> {
System.out.println("Checking if app is listening on port ...");
assertListeningOnPort(this.ports.getAppPort());
}, timeLeft);
}
System.out.println("Application started.");
}
@Override
public void stop() throws InterruptedException {
System.out.println("Stopping application ...");
try {
this.command.stop();
System.out.println("Application stopped.");
} catch (RuntimeException e) {
System.out.println("Could not stop command: " + this.command.toString());
}
}
private static String buildCommand(Class serviceClass, DaprPorts ports) {
return String.format(APP_COMMAND, serviceClass.getCanonicalName(),
ports.getAppPort() != null ? ports.getAppPort().toString() : "");
}
private static void assertListeningOnPort(int port) {
System.out.printf("Checking port %d ...\n", port);
java.net.SocketAddress socketAddress = new java.net.InetSocketAddress(Properties.SIDECAR_IP.get(), port);
try (java.net.Socket socket = new java.net.Socket()) {
socket.connect(socketAddress, 1000);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.printf("Confirmed listening on port %d.\n", port);
}
}

View File

@ -5,11 +5,13 @@
package io.dapr.it; package io.dapr.it;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.AfterClass;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.junit.AfterClass;
public abstract class BaseIT { public abstract class BaseIT {
@ -17,7 +19,7 @@ public abstract class BaseIT {
private static final Map<String, DaprRun.Builder> DAPR_RUN_BUILDERS = new HashMap<>(); private static final Map<String, DaprRun.Builder> DAPR_RUN_BUILDERS = new HashMap<>();
private static final Collection<DaprRun> DAPR_RUNS = new ArrayList<>(); private static final Collection<Stoppable> TO_BE_STOPPED = new ArrayList<>();
protected static DaprRun startDaprApp( protected static DaprRun startDaprApp(
String testName, String testName,
@ -35,29 +37,50 @@ public abstract class BaseIT {
} }
protected static DaprRun startDaprApp( protected static DaprRun startDaprApp(
String testName, String testName,
String successMessage, String successMessage,
Class serviceClass, Class serviceClass,
Boolean useAppPort, Boolean useAppPort,
Boolean useDaprPorts, Boolean useDaprPorts,
int maxWaitMilliseconds) throws Exception { int maxWaitMilliseconds) throws Exception {
DaprRun.Builder builder = new DaprRun.Builder( DaprRun.Builder builder = new DaprRun.Builder(
testName, testName,
() -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts), () -> DaprPorts.build(useAppPort, useDaprPorts, useDaprPorts),
successMessage, successMessage,
maxWaitMilliseconds).withServiceClass(serviceClass); maxWaitMilliseconds).withServiceClass(serviceClass);
DaprRun run = builder.build(); DaprRun run = builder.build();
DAPR_RUNS.add(run); TO_BE_STOPPED.add(run);
DAPR_RUN_BUILDERS.put(run.getAppName(), builder); DAPR_RUN_BUILDERS.put(run.getAppName(), builder);
run.start(); run.start();
run.use(); run.use();
return run; return run;
} }
protected static ImmutablePair<AppRun, DaprRun> startSplitDaprAndApp(
String testName,
String successMessage,
Class serviceClass,
Boolean useAppPort,
int maxWaitMilliseconds) throws Exception {
DaprRun.Builder builder = new DaprRun.Builder(
testName,
() -> DaprPorts.build(useAppPort, true, true),
successMessage,
maxWaitMilliseconds).withServiceClass(serviceClass);
ImmutablePair<AppRun, DaprRun> runs = builder.splitBuild();
TO_BE_STOPPED.add(runs.left);
TO_BE_STOPPED.add(runs.right);
DAPR_RUN_BUILDERS.put(runs.right.getAppName(), builder);
runs.left.start();
runs.right.start();
runs.right.use();
return runs;
}
@AfterClass @AfterClass
public static void cleanUp() throws Exception { public static void cleanUp() throws Exception {
for (DaprRun app : DAPR_RUNS) { for (Stoppable toBeStopped : TO_BE_STOPPED) {
app.stop(); toBeStopped.stop();
} }
} }

View File

@ -9,30 +9,43 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class Command { public class Command {
private static final int SUCCESS_WAIT_TIMEOUT_MINUTES = 5; private static final int SUCCESS_WAIT_TIMEOUT_MINUTES = 5;
private static final int DESTROY_WAIT_TIMEOUT_SECONDS = 5;
private final String successMessage; private final String successMessage;
private final String command; private final String command;
private Process process; private Process process;
public Command(String successMessage, String command) { private Map<String, String> env;
public Command(String successMessage, String command, Map<String, String> env) {
this.successMessage = successMessage; this.successMessage = successMessage;
this.command = command; this.command = command;
this.env = env;
}
public Command(String successMessage, String command) {
this(successMessage, command, null);
} }
public void run() throws InterruptedException, IOException { public void run() throws InterruptedException, IOException {
final AtomicBoolean success = new AtomicBoolean(false); final AtomicBoolean success = new AtomicBoolean(false);
final Semaphore finished = new Semaphore(0); final Semaphore finished = new Semaphore(0);
this.process = Runtime.getRuntime().exec(command); ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", command);
if (this.env != null) {
processBuilder.environment().putAll(this.env);
}
this.process = processBuilder.start();
final Thread stdoutReader = new Thread(() -> { final Thread stdoutReader = new Thread(() -> {
try { try {
@ -66,4 +79,20 @@ public class Command {
throw new IllegalStateException("Could not find success criteria for command: " + command); throw new IllegalStateException("Could not find success criteria for command: " + command);
} }
} }
public void stop() throws InterruptedException {
if (this.process != null) {
this.process.destroy();
Thread.sleep(DESTROY_WAIT_TIMEOUT_SECONDS * 1000);
if (this.process.isAlive()) {
this.process.destroyForcibly();
}
this.process = null;
}
}
@Override
public String toString() {
return this.command;
}
} }

View File

@ -8,7 +8,6 @@ package io.dapr.it;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;

View File

@ -6,6 +6,7 @@
package io.dapr.it; package io.dapr.it;
import io.dapr.config.Properties; import io.dapr.config.Properties;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -14,7 +15,9 @@ import java.util.function.Supplier;
import static io.dapr.it.Retry.callWithRetry; import static io.dapr.it.Retry.callWithRetry;
public class DaprRun { public class DaprRun implements Stoppable {
private static final String DAPR_SUCCESS_MESSAGE = "You're up and running!";
private static final String DAPR_RUN = "dapr run --app-id %s --components-path ./components"; private static final String DAPR_RUN = "dapr run --app-id %s --components-path ./components";
@ -114,6 +117,7 @@ public class DaprRun {
System.out.println("Dapr application started."); System.out.println("Dapr application started.");
} }
@Override
public void stop() throws InterruptedException, IOException { public void stop() throws InterruptedException, IOException {
System.out.println("Stopping dapr application ..."); System.out.println("Stopping dapr application ...");
try { try {
@ -213,11 +217,33 @@ public class DaprRun {
DaprRun build() { DaprRun build() {
return new DaprRun( return new DaprRun(
this.testName, this.testName,
this.portsSupplier.get(), this.portsSupplier.get(),
this.successMessage, this.successMessage,
this.serviceClass, this.serviceClass,
this.maxWaitMilliseconds); this.maxWaitMilliseconds);
}
/**
* Builds app and dapr run separately. It can be useful to force the restart of one of them.
* @return Pair of AppRun and DaprRun.
*/
ImmutablePair<AppRun, DaprRun> splitBuild() {
DaprPorts ports = this.portsSupplier.get();
AppRun appRun = new AppRun(
ports,
this.successMessage,
this.serviceClass,
this.maxWaitMilliseconds);
DaprRun daprRun = new DaprRun(
this.testName,
ports,
DAPR_SUCCESS_MESSAGE,
null,
this.maxWaitMilliseconds);
return new ImmutablePair<>(appRun, daprRun);
} }
} }
} }

View File

@ -0,0 +1,15 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it;
import java.io.IOException;
public interface Stoppable {
void stop() throws InterruptedException, IOException;
}

View File

@ -0,0 +1,116 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.AppRun;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.app.MyActorService;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class ActorTimerRecoveryIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorTimerRecoveryIT.class);
/**
* Create an actor, register a timer, validates its content, restarts the Actor and confirms timer continues.
* @throws Exception This test is not expected to throw. Thrown exceptions are bugs.
*/
@Test
public void timerRecoveryTest() throws Exception {
ImmutablePair<AppRun, DaprRun> runs = startSplitDaprAndApp(
ActorTimerRecoveryIT.class.getSimpleName(),
"Started MyActorService",
MyActorService.class,
true,
60000);
Thread.sleep(3000);
String actorType="MyActorTest";
logger.debug("Creating proxy builder");
ActorProxyBuilder<ActorProxy> proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class);
logger.debug("Creating actorId");
ActorId actorId = new ActorId(UUID.randomUUID().toString());
logger.debug("Building proxy");
ActorProxy proxy = proxyBuilder.build(actorId);
logger.debug("Invoking actor method 'startTimer' which will register a timer");
proxy.invokeActorMethod("startTimer", "myTimer").block();
logger.debug("Pausing 7 seconds to allow timer to fire");
Thread.sleep(7000);
ArrayList<MethodEntryTracker> logs = getAppMethodCallLogs(proxy);
validateTimerCalls(logs, 3);
// Restarts app only.
runs.left.stop();
runs.left.start();
logger.debug("Pausing 10 seconds to allow timer to fire");
Thread.sleep(10000);
ArrayList<MethodEntryTracker> newLogs = getAppMethodCallLogs(proxy);
validateTimerCalls(newLogs, 3);
// Check that the restart actually happened by confirming the old logs are not in the new logs.
for (MethodEntryTracker oldLog: logs) {
for (MethodEntryTracker newLog: newLogs) {
assertNotEquals(oldLog.toString(), newLog.toString());
}
}
// call unregister
logger.debug("Calling actor method 'stopTimer' to unregister timer");
proxy.invokeActorMethod("stopTimer", "myTimer").block();
}
ArrayList<MethodEntryTracker> getAppMethodCallLogs(ActorProxy proxy) {
ArrayList<String> logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block();
ArrayList<MethodEntryTracker> trackers = new ArrayList<MethodEntryTracker>();
for(String t : logs) {
String[] toks = t.split("\\|");
MethodEntryTracker m = new MethodEntryTracker(
toks[0].equals("Enter") ? true : false,
toks[1],
new Date(toks[2]));
trackers.add(m);
}
return trackers;
}
/**
* Validate the timer and reminder has been invoked at least x times.
* @param logs logs with info about method entries and exits returned from the app
* @param minimum minimum number of entries.
*/
void validateTimerCalls(ArrayList<MethodEntryTracker> logs, int minimum) {
// Validate the timer has been invoked at least x times. We cannot validate precisely because of
// differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above,
// we validate below with some margin. Events for each actor method call include "enter" and "exit"
// calls, so they are divided by 2.
List<MethodEntryTracker> timerInvocations = logs.stream().filter(x -> x.getMethodName().equals(("clock"))).collect(Collectors.toList());
System.out.println("Size of timer count list is %d, which means it's been invoked half that many times" + timerInvocations.size());
assertTrue(timerInvocations.size() / 2 >= minimum);
}
}