IT for failover actor reminder. (#411)

* IT for failover actor reminder.

* Refactoring for actor tests.
This commit is contained in:
Artur Souza 2020-12-11 15:28:55 -08:00 committed by GitHub
parent 3f51e29dde
commit 85bdb24fa9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 232 additions and 143 deletions

View File

@ -0,0 +1,117 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.app.MyActorService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.UUID;
import static io.dapr.it.actors.MyActorTestUtils.*;
import static org.junit.Assert.assertNotEquals;
public class ActorReminderFailoverIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorReminderFailoverIT.class);
private static final String METHOD_NAME = "receiveReminder";
private ActorProxy proxy;
private DaprRun firstAppRun;
private DaprRun secondAppRun;
private DaprRun clientAppRun;
@Before
public void init() throws Exception {
firstAppRun = startDaprApp(
ActorReminderFailoverIT.class.getSimpleName() + "One",
"Started MyActorService",
MyActorService.class,
true,
60000);
secondAppRun = startDaprApp(
ActorReminderFailoverIT.class.getSimpleName() + "Two",
"Started MyActorService",
MyActorService.class,
true,
60000);
clientAppRun = startDaprApp(
ActorReminderFailoverIT.class.getSimpleName() + "Client",
60000);
Thread.sleep(3000);
ActorId actorId = new ActorId(UUID.randomUUID().toString());
String actorType="MyActorTest";
logger.debug("Creating proxy builder");
ActorProxyBuilder<ActorProxy> proxyBuilder = deferClose(new ActorProxyBuilder(actorType, ActorProxy.class));
logger.debug("Creating actorId");
logger.debug("Building proxy");
proxy = proxyBuilder.build(actorId);
}
@After
public void tearDown() {
// call unregister
logger.debug("Calling actor method 'stopReminder' to unregister reminder");
proxy.invokeActorMethod("stopReminder", "myReminder").block();
}
/**
* Create an actor, register a reminder, validates its content, restarts the runtime and confirms reminder continues.
* @throws Exception This test is not expected to throw. Thrown exceptions are bugs.
*/
@Test
public void reminderRecoveryTest() throws Exception {
clientAppRun.use();
logger.debug("Invoking actor method 'startReminder' which will register a reminder");
proxy.invokeActorMethod("startReminder", "myReminder").block();
logger.debug("Pausing 7 seconds to allow reminder to fire");
Thread.sleep(7000);
List<MethodEntryTracker> logs = fetchMethodCallLogs(proxy);
validateMethodCalls(logs, METHOD_NAME, 3);
int originalActorHostIdentifier = Integer.parseInt(
proxy.invokeActorMethod("getIdentifier", String.class).block());
if (originalActorHostIdentifier == firstAppRun.getHttpPort()) {
firstAppRun.stop();
}
if (originalActorHostIdentifier == secondAppRun.getHttpPort()) {
secondAppRun.stop();
}
logger.debug("Pausing 10 seconds to allow failover to take place");
Thread.sleep(10000);
List<MethodEntryTracker> newLogs = fetchMethodCallLogs(proxy);
logger.debug("Pausing 10 seconds to allow reminder to fire a few times");
Thread.sleep(10000);
List<MethodEntryTracker> newLogs2 = fetchMethodCallLogs(proxy);
logger.debug("Check if there has been additional calls");
validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 4);
int newActorHostIdentifier = Integer.parseInt(
proxy.invokeActorMethod("getIdentifier", String.class).block());
assertNotEquals(originalActorHostIdentifier, newActorHostIdentifier);
}
}

View File

@ -19,17 +19,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
import static io.dapr.it.actors.MyActorTestUtils.*;
public class ActorReminderRecoveryIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorReminderRecoveryIT.class);
private static final Logger logger = LoggerFactory.getLogger(ActorReminderRecoveryIT.class);
private static final String METHOD_NAME = "receiveReminder";
private ActorProxy proxy;
@ -75,64 +74,24 @@ public class ActorReminderRecoveryIT extends BaseIT {
logger.debug("Pausing 7 seconds to allow reminder to fire");
Thread.sleep(7000);
ArrayList<MethodEntryTracker> logs = getAppMethodCallLogs(proxy);
validateReminderCalls(logs, 3);
List<MethodEntryTracker> logs = fetchMethodCallLogs(proxy);
validateMethodCalls(logs, METHOD_NAME, 3);
// Restarts runtime only.
logger.info("Stopping Dapr sidecar");
runs.right.stop();
logger.info("Starting Dapr sidecar");
runs.right.start();
logger.info("Dapr sidecar started");
logger.debug("Pausing 5 seconds to allow sidecar to be healthy");
Thread.sleep(5000);
ArrayList<MethodEntryTracker> newLogs = getAppMethodCallLogs(proxy);
logger.debug("Pausing 7 seconds to allow sidecar to be healthy");
Thread.sleep(7000);
List<MethodEntryTracker> newLogs = fetchMethodCallLogs(proxy);
logger.debug("Pausing 10 seconds to allow reminder to fire a few times");
Thread.sleep(10000);
ArrayList<MethodEntryTracker> newLogs2 = getAppMethodCallLogs(proxy);
List<MethodEntryTracker> newLogs2 = fetchMethodCallLogs(proxy);
logger.debug("Check if there has been additional calls");
validateReminderCalls(newLogs2, countReminderCalls(newLogs) + 3);
}
ArrayList<MethodEntryTracker> getAppMethodCallLogs(ActorProxy proxy) {
ArrayList<String> logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block();
ArrayList<MethodEntryTracker> trackers = new ArrayList<MethodEntryTracker>();
for(String t : logs) {
String[] toks = t.split("\\|");
MethodEntryTracker m = new MethodEntryTracker(
toks[0].equals("Enter") ? true : false,
toks[1],
new Date(toks[2]));
trackers.add(m);
}
return trackers;
}
/**
* Validate the reminder has been invoked at least x times.
* @param logs logs with info about method entries and exits returned from the app
* @return number of successful invocations of reminder
*/
private int countReminderCalls(ArrayList<MethodEntryTracker> logs) {
// Counts number of times reminder is invoked.
// Events for each actor method call include "enter" and "exit" calls, so they are divided by 2.
List<MethodEntryTracker> calls =
logs.stream().filter(x -> x.getMethodName().equals(("receiveReminder"))).collect(Collectors.toList());
System.out.printf(
"Size of reminder count list is %d, which means it's been invoked half that many times.\n", calls.size());
return calls.size() / 2;
}
/**
* Validate the reminder has been invoked at least x times.
* @param logs logs with info about method entries and exits returned from the app
* @param minimum minimum number of entries.
*/
void validateReminderCalls(ArrayList<MethodEntryTracker> logs, int minimum) {
// Validate the reminder has been invoked at least x times. We cannot validate precisely because of
// differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above,
// we validate below with some margin.
int callsCount = countReminderCalls(logs);
assertTrue(callsCount >= minimum);
validateMethodCalls(newLogs2, METHOD_NAME, countMethodCalls(newLogs, METHOD_NAME) + 3);
}
}

View File

@ -17,18 +17,18 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs;
import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class ActorTimerRecoveryIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorTimerRecoveryIT.class);
private static final Logger logger = LoggerFactory.getLogger(ActorTimerRecoveryIT.class);
private static final String METHOD_NAME = "clock";
/**
* Create an actor, register a timer, validates its content, restarts the Actor and confirms timer continues.
@ -59,8 +59,8 @@ public class ActorTimerRecoveryIT extends BaseIT {
logger.debug("Pausing 7 seconds to allow timer to fire");
Thread.sleep(7000);
ArrayList<MethodEntryTracker> logs = getAppMethodCallLogs(proxy);
validateTimerCalls(logs, 3);
List<MethodEntryTracker> logs = fetchMethodCallLogs(proxy);
validateMethodCalls(logs, METHOD_NAME, 3);
// Restarts app only.
runs.left.stop();
@ -68,8 +68,8 @@ public class ActorTimerRecoveryIT extends BaseIT {
logger.debug("Pausing 10 seconds to allow timer to fire");
Thread.sleep(10000);
ArrayList<MethodEntryTracker> newLogs = getAppMethodCallLogs(proxy);
validateTimerCalls(newLogs, 3);
List<MethodEntryTracker> newLogs = fetchMethodCallLogs(proxy);
validateMethodCalls(newLogs, METHOD_NAME, 3);
// Check that the restart actually happened by confirming the old logs are not in the new logs.
for (MethodEntryTracker oldLog: logs) {
@ -83,34 +83,4 @@ public class ActorTimerRecoveryIT extends BaseIT {
proxy.invokeActorMethod("stopTimer", "myTimer").block();
}
ArrayList<MethodEntryTracker> getAppMethodCallLogs(ActorProxy proxy) {
ArrayList<String> logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block();
ArrayList<MethodEntryTracker> trackers = new ArrayList<MethodEntryTracker>();
for(String t : logs) {
String[] toks = t.split("\\|");
MethodEntryTracker m = new MethodEntryTracker(
toks[0].equals("Enter") ? true : false,
toks[1],
new Date(toks[2]));
trackers.add(m);
}
return trackers;
}
/**
* Validate the timer and reminder has been invoked at least x times.
* @param logs logs with info about method entries and exits returned from the app
* @param minimum minimum number of entries.
*/
void validateTimerCalls(ArrayList<MethodEntryTracker> logs, int minimum) {
// Validate the timer has been invoked at least x times. We cannot validate precisely because of
// differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above,
// we validate below with some margin. Events for each actor method call include "enter" and "exit"
// calls, so they are divided by 2.
List<MethodEntryTracker> timerInvocations = logs.stream().filter(x -> x.getMethodName().equals(("clock"))).collect(Collectors.toList());
System.out.println("Size of timer count list is %d, which means it's been invoked half that many times" + timerInvocations.size());
assertTrue(timerInvocations.size() / 2 >= minimum);
}
}

View File

@ -20,22 +20,28 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static io.dapr.it.Retry.callWithRetry;
import static io.dapr.it.actors.MyActorTestUtils.fetchMethodCallLogs;
import static io.dapr.it.actors.MyActorTestUtils.validateMethodCalls;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class ActorTurnBasedConcurrencyIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorTurnBasedConcurrencyIT.class);
private static final Logger logger = LoggerFactory.getLogger(ActorTurnBasedConcurrencyIT.class);
private final String ACTOR_TYPE = "MyActorTest";
private final String REMINDER_NAME = UUID.randomUUID().toString();
private final String ACTOR_ID = "1";
private static final String TIMER_METHOD_NAME = "clock";
private static final String REMINDER_METHOD_NAME = "receiveReminder";
private static final String ACTOR_TYPE = "MyActorTest";
private static final String REMINDER_NAME = UUID.randomUUID().toString();
private static final String ACTOR_ID = "1";
@After
public void cleanUpTestCase() {
@ -112,10 +118,11 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
logger.debug("Pausing 7 seconds to allow timer and reminders to fire");
Thread.sleep(7000);
ArrayList<MethodEntryTracker> logs = getAppMethodCallLogs(proxy);
List<MethodEntryTracker> logs = fetchMethodCallLogs(proxy);
validateTurnBasedConcurrency(logs);
validateTimerCalls(logs);
validateMethodCalls(logs, TIMER_METHOD_NAME, 2);
validateMethodCalls(logs, REMINDER_METHOD_NAME, 3);
// call unregister
logger.debug("Calling actor method 'stopTimer' to unregister timer");
@ -133,33 +140,18 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
Thread.sleep(5000);
// get history again, we don't additional timer/reminder calls
logs = getAppMethodCallLogs(proxy);
validateEventNotObserved(logs, "stopTimer", "clock");
validateEventNotObserved(logs, "stopReminder", "receiveReminder");
logs = fetchMethodCallLogs(proxy);
validateEventNotObserved(logs, "stopTimer", TIMER_METHOD_NAME);
validateEventNotObserved(logs, "stopReminder", REMINDER_METHOD_NAME);
}
ArrayList<MethodEntryTracker> getAppMethodCallLogs(ActorProxy proxy) {
ArrayList<String> logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block();
ArrayList<MethodEntryTracker> trackers = new ArrayList<MethodEntryTracker>();
for(String t : logs) {
String[] toks = t.split("\\|");
MethodEntryTracker m = new MethodEntryTracker(
toks[0].equals("Enter") ? true : false,
toks[1],
new Date(toks[2]));
trackers.add(m);
}
return trackers;
}
/**
* Validate turn-based concurrency enter and exit logging - we should see "Enter" and "Exit" alternate since
* our app implementation service logs that on actor methods.
* @param logs logs with info about method entries and exits returned from the app
*/
void validateTurnBasedConcurrency(ArrayList<MethodEntryTracker> logs) {
void validateTurnBasedConcurrency(List<MethodEntryTracker> logs) {
if (logs.size() == 0) {
logger.warn("No logs");
return;
@ -186,25 +178,6 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
}
}
/**
* Validate the timer and reminder has been invoked at least x times.
* @param logs logs with info about method entries and exits returned from the app
*/
void validateTimerCalls(ArrayList<MethodEntryTracker> logs) {
// Validate the timer has been invoked at least x times. We cannot validate precisely because of
// differences due issues like how loaded the machine may be. Based on its dueTime and period, and our sleep above,
// we validate below with some margin. Events for each actor method call include "enter" and "exit"
// calls, so they are divided by 2.
List<MethodEntryTracker> timerInvocations = logs.stream().filter(x -> x.getMethodName().equals(("clock"))).collect(Collectors.toList());
System.out.println("Size of timer count list is %d, which means it's been invoked half that many times" + timerInvocations.size());
assertTrue(timerInvocations.size() / 2 >= 2);
List<MethodEntryTracker> reminderInvocations = logs.stream().filter(x -> x.getMethodName().equals(("receiveReminder"))).collect(Collectors.toList());
System.out.println("Size of reminder count list is %d, which means it's been invoked half that many times" + reminderInvocations.size());
assertTrue(reminderInvocations.size() / 2 >= 3);
}
/**
* Validates that after an event in "startingPointMethodName", the events in "methodNameThatShouldNotAppear" do not appear.
* This can be used to validate that timers and reminders are stopped.
@ -213,7 +186,7 @@ public class ActorTurnBasedConcurrencyIT extends BaseIT {
* @param startingPointMethodName The name of the method after which "methodNameThatShouldNotAppear" should not appear
* @param methodNameThatShouldNotAppear The method which should not appear
*/
void validateEventNotObserved(ArrayList<MethodEntryTracker> logs, String startingPointMethodName, String methodNameThatShouldNotAppear) throws Exception {
void validateEventNotObserved(List<MethodEntryTracker> logs, String startingPointMethodName, String methodNameThatShouldNotAppear) {
System.out.println("Validating event " + methodNameThatShouldNotAppear + " does not appear after event " + startingPointMethodName);
int index = -1;
for (int i = 0; i < logs.size(); i++) {

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors;
import io.dapr.actors.client.ActorProxy;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.Assert.assertTrue;
/**
* Utility class for tests that use MyActor class.
*/
public class MyActorTestUtils {
private MyActorTestUtils() {}
/**
* Count number of calls.
* @param logs logs with info about method entries and exits returned from the app
* @param methodName name of the method to be counted
* @return number of successful invocations of reminder
*/
static int countMethodCalls(List<MethodEntryTracker> logs, String methodName) {
// Counts number of times reminder is invoked.
// Events for each actor method call include "enter" and "exit" calls, so they are divided by 2.
List<MethodEntryTracker> calls =
logs.stream().filter(x -> x.getMethodName().equals(methodName)).collect(Collectors.toList());
System.out.printf(
"Size of %s count list is %d, which means it's been invoked half that many times.\n", methodName, calls.size());
return calls.size() / 2;
}
/**
* Validate the number of call of a given method.
* @param logs logs with info about method entries and exits returned from the app
* @param methodName name of the method to be validated.
* @param minimum minimum number of entries.
*/
static void validateMethodCalls(List<MethodEntryTracker> logs, String methodName, int minimum) {
int callsCount = countMethodCalls(logs, methodName);
assertTrue(callsCount >= minimum);
}
/**
* Fetches the call log for the given Actor.
* @param proxy Actor proxy for the actor.
* @return List of call log.
*/
static List<MethodEntryTracker> fetchMethodCallLogs(ActorProxy proxy) {
ArrayList<String> logs = proxy.invokeActorMethod("getCallLog", ArrayList.class).block();
ArrayList<MethodEntryTracker> trackers = new ArrayList<MethodEntryTracker>();
for(String t : logs) {
String[] toks = t.split("\\|");
MethodEntryTracker m = new MethodEntryTracker(
toks[0].equals("Enter") ? true : false,
toks[1],
new Date(toks[2]));
trackers.add(m);
}
return trackers;
}
}

View File

@ -140,8 +140,8 @@ public class MyActorImpl extends AbstractActor implements MyActor, Remindable<St
// Handles the request by printing message.
System.out.println(String.format(
"^^^^^^^^^^^^^^Server reminded actor %s of: %s for %s @ %s",
this.getId(), reminderName, state, utcNowAsString));
"> Server reminded actor %s of: %s for %s @ %s hosted by instance id %s",
this.getId(), reminderName, state, utcNowAsString, System.getenv("DAPR_HTTP_PORT")));
this.formatAndLog(false, "receiveReminder");
});