From 85bdb24fa9f961ce8c3cb650c65616c1f79dcc0b Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 11 Dec 2020 15:28:55 -0800 Subject: [PATCH] IT for failover actor reminder. (#411) * IT for failover actor reminder. * Refactoring for actor tests. --- .../it/actors/ActorReminderFailoverIT.java | 117 ++++++++++++++++++ .../it/actors/ActorReminderRecoveryIT.java | 69 +++-------- .../dapr/it/actors/ActorTimerRecoveryIT.java | 48 ++----- .../actors/ActorTurnBasedConcurrencyIT.java | 67 +++------- .../io/dapr/it/actors/MyActorTestUtils.java | 70 +++++++++++ .../io/dapr/it/actors/app/MyActorImpl.java | 4 +- 6 files changed, 232 insertions(+), 143 deletions(-) create mode 100644 sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderFailoverIT.java create mode 100644 sdk-tests/src/test/java/io/dapr/it/actors/MyActorTestUtils.java diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderFailoverIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderFailoverIT.java new file mode 100644 index 000000000..4cd0b1199 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderFailoverIT.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.actors; + +import io.dapr.actors.ActorId; +import io.dapr.actors.client.ActorProxy; +import io.dapr.actors.client.ActorProxyBuilder; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import io.dapr.it.actors.app.MyActorService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.UUID; + +import static io.dapr.it.actors.MyActorTestUtils.*; +import static org.junit.Assert.assertNotEquals; + +public class ActorReminderFailoverIT extends BaseIT { + + private static Logger logger = LoggerFactory.getLogger(ActorReminderFailoverIT.class); + + private static final String METHOD_NAME = "receiveReminder"; + + private ActorProxy proxy; + + private DaprRun firstAppRun; + + private DaprRun secondAppRun; + + private DaprRun clientAppRun; + + @Before + public void init() throws Exception { + firstAppRun = startDaprApp( + ActorReminderFailoverIT.class.getSimpleName() + "One", + "Started MyActorService", + MyActorService.class, + true, + 60000); + secondAppRun = startDaprApp( + ActorReminderFailoverIT.class.getSimpleName() + "Two", + "Started MyActorService", + MyActorService.class, + true, + 60000); + clientAppRun = startDaprApp( + ActorReminderFailoverIT.class.getSimpleName() + "Client", + 60000); + + Thread.sleep(3000); + + ActorId actorId = new ActorId(UUID.randomUUID().toString()); + String actorType="MyActorTest"; + logger.debug("Creating proxy builder"); + + ActorProxyBuilder proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class)); + logger.debug("Creating actorId"); + logger.debug("Building proxy"); + proxy = proxyBuilder.build(actorId); + } + + @After + public void tearDown() { + // call unregister + logger.debug("Calling actor method 'stopReminder' to unregister reminder"); + proxy.invokeActorMethod("stopReminder", "myReminder").block(); + } + + /** + * Create an actor, register a reminder, validates its content, restarts the runtime and confirms reminder continues. + * @throws Exception This test is not expected to throw. Thrown exceptions are bugs. + */ + @Test + public void reminderRecoveryTest() throws Exception { + clientAppRun.use(); + + logger.debug("Invoking actor method 'startReminder' which will register a reminder"); + proxy.invokeActorMethod("startReminder", "myReminder").block(); + + logger.debug("Pausing 7 seconds to allow reminder to fire"); + Thread.sleep(7000); + + List logs = fetchMethodCallLogs(proxy); + validateMethodCalls(logs, METHOD_NAME, 3); + + int originalActorHostIdentifier = Integer.parseInt( + proxy.invokeActorMethod("getIdentifier", String.class).block()); + if (originalActorHostIdentifier == firstAppRun.getHttpPort()) { + firstAppRun.stop(); + } + if (originalActorHostIdentifier == secondAppRun.getHttpPort()) { + secondAppRun.stop(); + } + + logger.debug("Pausing 10 seconds to allow failover to take place"); + Thread.sleep(10000); + List newLogs = fetchMethodCallLogs(proxy); + logger.debug("Pausing 10 seconds to allow reminder to fire a few times"); + Thread.sleep(10000); + List newLogs2 = fetchMethodCallLogs(proxy); + logger.debug("Check if there has been additional calls"); + validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 4); + + int newActorHostIdentifier = Integer.parseInt( + proxy.invokeActorMethod("getIdentifier", String.class).block()); + assertNotEquals(originalActorHostIdentifier, newActorHostIdentifier); + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java index 218a63f7c..f33c57534 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorReminderRecoveryIT.java @@ -19,17 +19,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; -import static org.junit.Assert.assertTrue; +import static io.dapr.it.actors.MyActorTestUtils.*; public class ActorReminderRecoveryIT extends BaseIT { - private static Logger logger = LoggerFactory.getLogger(ActorReminderRecoveryIT.class); + private static final Logger logger = LoggerFactory.getLogger(ActorReminderRecoveryIT.class); + + private static final String METHOD_NAME = "receiveReminder"; private ActorProxy proxy; @@ -75,64 +74,24 @@ public class ActorReminderRecoveryIT extends BaseIT { logger.debug("Pausing 7 seconds to allow reminder to fire"); Thread.sleep(7000); - ArrayList logs = getAppMethodCallLogs(proxy); - validateReminderCalls(logs, 3); + List logs = fetchMethodCallLogs(proxy); + validateMethodCalls(logs, METHOD_NAME, 3); // Restarts runtime only. + logger.info("Stopping Dapr sidecar"); runs.right.stop(); + logger.info("Starting Dapr sidecar"); runs.right.start(); + logger.info("Dapr sidecar started"); - logger.debug("Pausing 5 seconds to allow sidecar to be healthy"); - Thread.sleep(5000); - ArrayList newLogs = getAppMethodCallLogs(proxy); + logger.debug("Pausing 7 seconds to allow sidecar to be healthy"); + Thread.sleep(7000); + List newLogs = fetchMethodCallLogs(proxy); logger.debug("Pausing 10 seconds to allow reminder to fire a few times"); Thread.sleep(10000); - ArrayList newLogs2 = getAppMethodCallLogs(proxy); + List newLogs2 = fetchMethodCallLogs(proxy); logger.debug("Check if there has been additional calls"); - validateReminderCalls(newLogs2, countReminderCalls(newLogs) + 3); - } - - ArrayList getAppMethodCallLogs(ActorProxy proxy) { - ArrayList logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block(); - ArrayList trackers = new ArrayList(); - for(String t : logs) { - String[] toks = t.split("\\|"); - MethodEntryTracker m = new MethodEntryTracker( - toks[0].equals("Enter") ? true : false, - toks[1], - new Date(toks[2])); - trackers.add(m); - } - - return trackers; - } - - /** - * Validate the reminder has been invoked at least x times. - * @param logs logs with info about method entries and exits returned from the app - * @return number of successful invocations of reminder - */ - private int countReminderCalls(ArrayList logs) { - // Counts number of times reminder is invoked. - // Events for each actor method call include "enter" and "exit" calls, so they are divided by 2. - List calls = - logs.stream().filter(x -> x.getMethodName().equals(("receiveReminder"))).collect(Collectors.toList()); - System.out.printf( - "Size of reminder count list is %d, which means it's been invoked half that many times.\n", calls.size()); - return calls.size() / 2; - } - - /** - * Validate the reminder has been invoked at least x times. - * @param logs logs with info about method entries and exits returned from the app - * @param minimum minimum number of entries. - */ - void validateReminderCalls(ArrayList logs, int minimum) { - // Validate the reminder has been invoked at least x times. We cannot validate precisely because of - // differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above, - // we validate below with some margin. - int callsCount = countReminderCalls(logs); - assertTrue(callsCount >= minimum); + validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 3); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java index e0add6a54..99c4c44ba 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTimerRecoveryIT.java @@ -17,18 +17,18 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; +import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs; +import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; public class ActorTimerRecoveryIT extends BaseIT { - private static Logger logger = LoggerFactory.getLogger(ActorTimerRecoveryIT.class); + private static final Logger logger = LoggerFactory.getLogger(ActorTimerRecoveryIT.class); + + private static final String METHOD_NAME = "clock"; /** * Create an actor, register a timer, validates its content, restarts the Actor and confirms timer continues. @@ -59,8 +59,8 @@ public class ActorTimerRecoveryIT extends BaseIT { logger.debug("Pausing 7 seconds to allow timer to fire"); Thread.sleep(7000); - ArrayList logs = getAppMethodCallLogs(proxy); - validateTimerCalls(logs, 3); + List logs = fetchMethodCallLogs(proxy); + validateMethodCalls(logs, METHOD_NAME, 3); // Restarts app only. runs.left.stop(); @@ -68,8 +68,8 @@ public class ActorTimerRecoveryIT extends BaseIT { logger.debug("Pausing 10 seconds to allow timer to fire"); Thread.sleep(10000); - ArrayList newLogs = getAppMethodCallLogs(proxy); - validateTimerCalls(newLogs, 3); + List newLogs = fetchMethodCallLogs(proxy); + validateMethodCalls(newLogs, METHOD_NAME, 3); // Check that the restart actually happened by confirming the old logs are not in the new logs. for (MethodEntryTracker oldLog: logs) { @@ -83,34 +83,4 @@ public class ActorTimerRecoveryIT extends BaseIT { proxy.invokeActorMethod("stopTimer", "myTimer").block(); } - ArrayList getAppMethodCallLogs(ActorProxy proxy) { - ArrayList logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block(); - ArrayList trackers = new ArrayList(); - for(String t : logs) { - String[] toks = t.split("\\|"); - MethodEntryTracker m = new MethodEntryTracker( - toks[0].equals("Enter") ? true : false, - toks[1], - new Date(toks[2])); - trackers.add(m); - } - - return trackers; - } - - /** - * Validate the timer and reminder has been invoked at least x times. - * @param logs logs with info about method entries and exits returned from the app - * @param minimum minimum number of entries. - */ - void validateTimerCalls(ArrayList logs, int minimum) { - // Validate the timer has been invoked at least x times. We cannot validate precisely because of - // differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above, - // we validate below with some margin. Events for each actor method call include "enter" and "exit" - // calls, so they are divided by 2. - List timerInvocations = logs.stream().filter(x -> x.getMethodName().equals(("clock"))).collect(Collectors.toList()); - System.out.println("Size of timer count list is %d, which means it's been invoked half that many times" + timerInvocations.size()); - assertTrue(timerInvocations.size() / 2 >= minimum); - } - } diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java index 5303ce771..dd101ebf0 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/ActorTurnBasedConcurrencyIT.java @@ -20,22 +20,28 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Date; import java.util.List; import java.util.UUID; -import java.util.stream.Collectors; import static io.dapr.it.Retry.callWithRetry; +import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs; +import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class ActorTurnBasedConcurrencyIT extends BaseIT { - private static Logger logger = LoggerFactory.getLogger(ActorTurnBasedConcurrencyIT.class); + private static final Logger logger = LoggerFactory.getLogger(ActorTurnBasedConcurrencyIT.class); - private final String ACTOR_TYPE = "MyActorTest"; - private final String REMINDER_NAME = UUID.randomUUID().toString(); - private final String ACTOR_ID = "1"; + private static final String TIMER_METHOD_NAME = "clock"; + + private static final String REMINDER_METHOD_NAME = "receiveReminder"; + + private static final String ACTOR_TYPE = "MyActorTest"; + + private static final String REMINDER_NAME = UUID.randomUUID().toString(); + + private static final String ACTOR_ID = "1"; @After public void cleanUpTestCase() { @@ -112,10 +118,11 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT { logger.debug("Pausing 7 seconds to allow timer and reminders to fire"); Thread.sleep(7000); - ArrayList logs = getAppMethodCallLogs(proxy); + List logs = fetchMethodCallLogs(proxy); validateTurnBasedConcurrency(logs); - validateTimerCalls(logs); + validateMethodCalls(logs, TIMER_METHOD_NAME, 2); + validateMethodCalls(logs, REMINDER_METHOD_NAME, 3); // call unregister logger.debug("Calling actor method 'stopTimer' to unregister timer"); @@ -133,33 +140,18 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT { Thread.sleep(5000); // get history again, we don't additional timer/reminder calls - logs = getAppMethodCallLogs(proxy); - validateEventNotObserved(logs, "stopTimer", "clock"); - validateEventNotObserved(logs, "stopReminder", "receiveReminder"); + logs = fetchMethodCallLogs(proxy); + validateEventNotObserved(logs, "stopTimer", TIMER_METHOD_NAME); + validateEventNotObserved(logs, "stopReminder", REMINDER_METHOD_NAME); } - ArrayList getAppMethodCallLogs(ActorProxy proxy) { - ArrayList logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block(); - ArrayList trackers = new ArrayList(); - for(String t : logs) { - String[] toks = t.split("\\|"); - MethodEntryTracker m = new MethodEntryTracker( - toks[0].equals("Enter") ? true : false, - toks[1], - new Date(toks[2])); - trackers.add(m); - } - - return trackers; - } - /** * Validate turn-based concurrency enter and exit logging - we should see "Enter" and "Exit" alternate since * our app implementation service logs that on actor methods. * @param logs logs with info about method entries and exits returned from the app */ - void validateTurnBasedConcurrency(ArrayList logs) { + void validateTurnBasedConcurrency(List logs) { if (logs.size() == 0) { logger.warn("No logs"); return; @@ -186,25 +178,6 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT { } } - /** - * Validate the timer and reminder has been invoked at least x times. - * @param logs logs with info about method entries and exits returned from the app - */ - void validateTimerCalls(ArrayList logs) { - - // Validate the timer has been invoked at least x times. We cannot validate precisely because of - // differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above, - // we validate below with some margin. Events for each actor method call include "enter" and "exit" - // calls, so they are divided by 2. - List timerInvocations = logs.stream().filter(x -> x.getMethodName().equals(("clock"))).collect(Collectors.toList()); - System.out.println("Size of timer count list is %d, which means it's been invoked half that many times" + timerInvocations.size()); - assertTrue(timerInvocations.size() / 2 >= 2); - - List reminderInvocations = logs.stream().filter(x -> x.getMethodName().equals(("receiveReminder"))).collect(Collectors.toList()); - System.out.println("Size of reminder count list is %d, which means it's been invoked half that many times" + reminderInvocations.size()); - assertTrue(reminderInvocations.size() / 2 >= 3); - } - /** * Validates that after an event in "startingPointMethodName", the events in "methodNameThatShouldNotAppear" do not appear. * This can be used to validate that timers and reminders are stopped. @@ -213,7 +186,7 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT { * @param startingPointMethodName The name of the method after which "methodNameThatShouldNotAppear" should not appear * @param methodNameThatShouldNotAppear The method which should not appear */ - void validateEventNotObserved(ArrayList logs, String startingPointMethodName, String methodNameThatShouldNotAppear) throws Exception { + void validateEventNotObserved(List logs, String startingPointMethodName, String methodNameThatShouldNotAppear) { System.out.println("Validating event " + methodNameThatShouldNotAppear + " does not appear after event " + startingPointMethodName); int index = -1; for (int i = 0; i < logs.size(); i++) { diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/MyActorTestUtils.java b/sdk-tests/src/test/java/io/dapr/it/actors/MyActorTestUtils.java new file mode 100644 index 000000000..fb5895810 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/actors/MyActorTestUtils.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + */ + +package io.dapr.it.actors; + +import io.dapr.actors.client.ActorProxy; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; + +/** + * Utility class for tests that use MyActor class. + */ +public class MyActorTestUtils { + + private MyActorTestUtils() {} + + /** + * Count number of calls. + * @param logs logs with info about method entries and exits returned from the app + * @param methodName name of the method to be counted + * @return number of successful invocations of reminder + */ + static int countMethodCalls(List logs, String methodName) { + // Counts number of times reminder is invoked. + // Events for each actor method call include "enter" and "exit" calls, so they are divided by 2. + List calls = + logs.stream().filter(x -> x.getMethodName().equals(methodName)).collect(Collectors.toList()); + System.out.printf( + "Size of %s count list is %d, which means it's been invoked half that many times.\n", methodName, calls.size()); + return calls.size() / 2; + } + + /** + * Validate the number of call of a given method. + * @param logs logs with info about method entries and exits returned from the app + * @param methodName name of the method to be validated. + * @param minimum minimum number of entries. + */ + static void validateMethodCalls(List logs, String methodName, int minimum) { + int callsCount = countMethodCalls(logs, methodName); + assertTrue(callsCount >= minimum); + } + + /** + * Fetches the call log for the given Actor. + * @param proxy Actor proxy for the actor. + * @return List of call log. + */ + static List fetchMethodCallLogs(ActorProxy proxy) { + ArrayList logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block(); + ArrayList trackers = new ArrayList(); + for(String t : logs) { + String[] toks = t.split("\\|"); + MethodEntryTracker m = new MethodEntryTracker( + toks[0].equals("Enter") ? true : false, + toks[1], + new Date(toks[2])); + trackers.add(m); + } + + return trackers; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java index fe4c0d43f..89679711a 100644 --- a/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java +++ b/sdk-tests/src/test/java/io/dapr/it/actors/app/MyActorImpl.java @@ -140,8 +140,8 @@ public class MyActorImpl extends AbstractActor implements MyActor, Remindable Server reminded actor %s of: %s for %s @ %s hosted by instance id %s", + this.getId(), reminderName, state, utcNowAsString, System.getenv("DAPR_HTTP_PORT"))); this.formatAndLog(false, "receiveReminder"); });