IT for actor state + handle actor config. (#163)

This commit is contained in:
Artur Souza 2020-01-29 21:50:46 -08:00 committed by GitHub
parent e72f1fa919
commit 91ac017f7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 366 additions and 12 deletions

View File

@ -99,6 +99,15 @@ public class ActorRuntime {
return instance;
}
/**
* Gets the Actor configuration for this runtime.
*
* @return Actor configuration.
*/
public ActorRuntimeConfig getConfig() {
return this.config;
}
/**
* Gets the Actor configuration for this runtime.
*

View File

@ -5,14 +5,37 @@
package io.dapr.actors.runtime;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
class ActorRuntimeConfig {
/**
* Represents the configuration for the Actor Runtime.
*/
public class ActorRuntimeConfig {
private Collection<String> registeredActorTypes = new ArrayList<>();
private Duration actorIdleTimeout;
private Duration actorScanInterval;
private Duration drainOngoingCallTimeout;
private Boolean drainBalancedActors;
/**
* Instantiates a new config for the Actor Runtime.
*/
ActorRuntimeConfig() {
}
/**
* Adds a registered actor to the list of registered actors.
* @param actorTypeName Actor type that was registered.
* @return This instance.
*/
ActorRuntimeConfig addRegisteredActorType(String actorTypeName) {
if (actorTypeName == null) {
throw new IllegalArgumentException("Registered actor must have a type name.");
@ -22,12 +45,92 @@ class ActorRuntimeConfig {
return this;
}
/**
* Gets the list of registered actor types.
*
* @return List of registered actor types.
*/
Collection<String> getRegisteredActorTypes() {
return Collections.unmodifiableCollection(registeredActorTypes);
}
ActorRuntimeConfig setRegisteredActorTypes(Collection<String> registeredActorTypes) {
this.registeredActorTypes = registeredActorTypes;
/**
* Gets the duration for Actors' timeout.
*
* @return Duration for Actors' timeout.
*/
public Duration getActorIdleTimeout() {
return actorIdleTimeout;
}
/**
* Sets the duration for Actors' timeout.
*
* @param actorIdleTimeout Duration for Actors' timeout.
* @return This instance.
*/
public ActorRuntimeConfig setActorIdleTimeout(Duration actorIdleTimeout) {
this.actorIdleTimeout = actorIdleTimeout;
return this;
}
/**
* Gets the duration to scan for Actors.
*
* @return The duration to scan for Actors.
*/
public Duration getActorScanInterval() {
return actorScanInterval;
}
/**
* Sets the duration to scan for Actors.
*
* @param actorScanInterval The duration to scan for Actors.
* @return This instance.
*/
public ActorRuntimeConfig setActorScanInterval(Duration actorScanInterval) {
this.actorScanInterval = actorScanInterval;
return this;
}
/**
* Gets the timeout to drain ongoing calls.
*
* @return The timeout to drain ongoing calls.
*/
public Duration getDrainOngoingCallTimeout() {
return drainOngoingCallTimeout;
}
/**
* Sets the timeout to drain ongoing calls.
*
* @param drainOngoingCallTimeout The timeout to drain ongoing calls.
* @return This instance.
*/
public ActorRuntimeConfig setDrainOngoingCallTimeout(Duration drainOngoingCallTimeout) {
this.drainOngoingCallTimeout = drainOngoingCallTimeout;
return this;
}
/**
* Gets whether balanced actors should be drained.
*
* @return Whether balanced actors should be drained.
*/
public Boolean getDrainBalancedActors() {
return drainBalancedActors;
}
/**
* Sets whether balanced actors should be drained.
*
* @param drainBalancedActors Whether balanced actors should be drained.
* @return This instance.
*/
public ActorRuntimeConfig setDrainBalancedActors(Boolean drainBalancedActors) {
this.drainBalancedActors = drainBalancedActors;
return this;
}

View File

@ -197,4 +197,4 @@ class DaprStateAsyncProvider {
}
}
}
}

View File

@ -119,7 +119,21 @@ public class ObjectSerializer extends io.dapr.client.ObjectSerializer {
generator.writeString(actorClass);
}
generator.writeEndArray();
// TODO: handle configuration.
if (config.getActorIdleTimeout() != null) {
generator.writeStringField("actorIdleTimeout",
DurationUtils.ConvertDurationToDaprFormat(config.getActorIdleTimeout()));
}
if (config.getActorScanInterval() != null) {
generator.writeStringField("actorScanInterval",
DurationUtils.ConvertDurationToDaprFormat(config.getActorScanInterval()));
}
if (config.getDrainOngoingCallTimeout() != null) {
generator.writeStringField("drainOngoingCallTimeout",
DurationUtils.ConvertDurationToDaprFormat(config.getDrainOngoingCallTimeout()));
}
if (config.getDrainBalancedActors() != null) {
generator.writeBooleanField("drainBalancedActors", config.getDrainBalancedActors());
}
generator.writeEndObject();
generator.close();
writer.flush();

View File

@ -9,7 +9,7 @@ import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.actors.services.springboot.ActorService;
import io.dapr.it.actors.services.springboot.DemoActorService;
import io.dapr.it.services.EmptyService;
import io.dapr.serializer.DefaultObjectSerializer;
import java.util.List;
@ -33,8 +33,8 @@ public class ActivationDeactivationIT extends BaseIT {
// The call below will fail if service cannot start successfully.
startDaprApp(
ActivationDeactivationIT.class.getSimpleName(),
ActorService.SUCCESS_MESSAGE,
ActorService.class,
DemoActorService.SUCCESS_MESSAGE,
DemoActorService.class,
true,
60000);
}

View File

@ -0,0 +1,101 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors;
import io.dapr.actors.ActorId;
import io.dapr.actors.client.ActorProxy;
import io.dapr.actors.client.ActorProxyBuilder;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.actors.services.springboot.DemoActorService;
import io.dapr.it.actors.services.springboot.StatefulActor;
import io.dapr.it.actors.services.springboot.StatefulActorService;
import io.dapr.it.services.EmptyService;
import io.dapr.serializer.DefaultObjectSerializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.dapr.it.Retry.callWithRetry;
import static org.junit.Assert.*;
public class ActorStateIT extends BaseIT {
private static Logger logger = LoggerFactory.getLogger(ActorStateIT.class);
@Test
public void writeReadState() throws Exception {
logger.debug("Starting actor runtime ...");
// The call below will fail if service cannot start successfully.
DaprRun runtime = startDaprApp(
this.getClass().getSimpleName(),
StatefulActorService.SUCCESS_MESSAGE,
StatefulActorService.class,
true,
60000);
String message = "This is a message to be saved and retrieved.";
ActorId actorId = new ActorId(Long.toString(System.currentTimeMillis()));
String actorType = "StatefulActorTest";
logger.debug("Building proxy ...");
ActorProxyBuilder proxyBuilder = new ActorProxyBuilder(actorType);
ActorProxy proxy = proxyBuilder.build(actorId);
callWithRetry(() -> {
logger.debug("Invoking writeMessage ... ");
proxy.invokeActorMethod("writeMessage", message).block();
}, 5000);
callWithRetry(() -> {
logger.debug("Invoking readMessage where data is probably still cached ... ");
String result = proxy.invokeActorMethod("readMessage", String.class).block();
assertEquals(message, result);
}, 5000);
// writeData uses an object instead of String to test serialization.
StatefulActor.MyData mydata = new StatefulActor.MyData();
mydata.value = "My data value.";
callWithRetry(() -> {
logger.debug("Invoking writeData with object ... ");
proxy.invokeActorMethod("writeData", mydata).block();
}, 5000);
callWithRetry(() -> {
logger.debug("Invoking readData where data is probably still cached ... ");
StatefulActor.MyData result = proxy.invokeActorMethod("readData", StatefulActor.MyData.class).block();
assertEquals(mydata.value, result.value);
}, 5000);
logger.debug("Waiting, so actor can be deactivated ...");
Thread.sleep(10000);
logger.debug("Stopping service ...");
runtime.stop();
logger.debug("Starting service ...");
startDaprApp(
this.getClass().getSimpleName(),
StatefulActorService.SUCCESS_MESSAGE,
StatefulActorService.class,
true,
60000);
ActorProxy newProxy = proxyBuilder.build(actorId);
callWithRetry(() -> {
logger.debug("Invoking readMessage where data is not cached ... ");
String result = newProxy.invokeActorMethod("readMessage", String.class).block();
assertEquals(message, result);
}, 5000);
callWithRetry(() -> {
logger.debug("Invoking readData where data is not cached ... ");
StatefulActor.MyData result = newProxy.invokeActorMethod("readData", StatefulActor.MyData.class).block();
assertEquals(mydata.value, result.value);
}, 5000);
logger.debug("Finished testing actor string state.");
}
}

View File

@ -21,8 +21,8 @@ public class DaprController {
}
@GetMapping("/dapr/config")
public String daprConfig() throws Exception {
return "{\"actorIdleTimeout\":\"5s\",\"actorScanInterval\":\"2s\",\"drainOngoingCallTimeout\":\"1s\",\"drainBalancedActors\":true,\"entities\":[\"DemoActorTest\"]}";
public byte[] daprConfig() throws Exception {
return ActorRuntime.getInstance().serializeConfig();
}
@PostMapping(path = "/actors/{type}/{id}")

View File

@ -11,4 +11,8 @@ public interface DemoActor {
String say(String something);
List<String> retrieveActiveActors();
void writeMessage(String something);
String readMessage();
}

View File

@ -49,6 +49,20 @@ public class DemoActorImpl extends AbstractActor implements DemoActor {
return Collections.unmodifiableList(ACTIVE_ACTOR);
}
@Override
public void writeMessage(String something) {
super.getActorStateManager().set("message", something).block();
}
@Override
public String readMessage() {
if (super.getActorStateManager().contains("message").block()) {
return super.getActorStateManager().get("message", String.class).block();
}
return null;
}
@Override
protected Mono<Void> onActivate() {
return Mono.fromRunnable(() -> ACTIVE_ACTOR.add(super.getId().toString())).then(super.onActivate());

View File

@ -8,7 +8,9 @@ package io.dapr.it.actors.services.springboot;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.serializer.DefaultObjectSerializer;
public class ActorService {
import java.time.Duration;
public class DemoActorService {
public static final String SUCCESS_MESSAGE = "actors: established connection to placement service at localhost";
@ -22,9 +24,12 @@ public class ActorService {
// If port string is not valid, it will throw an exception.
long port = Long.parseLong(args[0]);
ActorRuntime.getInstance().getConfig().setActorIdleTimeout(Duration.ofSeconds(5));
ActorRuntime.getInstance().getConfig().setActorScanInterval(Duration.ofSeconds(2));
ActorRuntime.getInstance().getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(10));
ActorRuntime.getInstance().getConfig().setDrainBalancedActors(true);
ActorRuntime.getInstance().registerActor(
DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
DaprApplication.start(port);
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors.services.springboot;
public interface StatefulActor {
void writeMessage(String something);
String readMessage();
void writeData(MyData something);
MyData readData();
class MyData {
public String value;
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors.services.springboot;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.AbstractActor;
import io.dapr.actors.runtime.ActorRuntimeContext;
import io.dapr.actors.runtime.ActorType;
@ActorType(name = "StatefulActorTest")
public class StatefulActorImpl extends AbstractActor implements StatefulActor {
public StatefulActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
@Override
public void writeMessage(String something) {
super.getActorStateManager().set("message", something).block();
}
@Override
public String readMessage() {
if (super.getActorStateManager().contains("message").block()) {
return super.getActorStateManager().get("message", String.class).block();
}
return null;
}
@Override
public void writeData(MyData something) {
super.getActorStateManager().set("mydata", something).block();
}
@Override
public MyData readData() {
if (super.getActorStateManager().contains("mydata").block()) {
return super.getActorStateManager().get("mydata", MyData.class).block();
}
return null;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.actors.services.springboot;
import io.dapr.actors.runtime.ActorRuntime;
import io.dapr.serializer.DefaultObjectSerializer;
import java.time.Duration;
public class StatefulActorService {
public static final String SUCCESS_MESSAGE = "actors: established connection to placement service at localhost";
/**
* Starts the service.
*
* @param args Expects the port as only argument.
* @throws Exception If cannot start service.
*/
public static void main(String[] args) throws Exception {
// If port string is not valid, it will throw an exception.
long port = Long.parseLong(args[0]);
ActorRuntime.getInstance().getConfig().setActorIdleTimeout(Duration.ofSeconds(5));
ActorRuntime.getInstance().getConfig().setActorScanInterval(Duration.ofSeconds(2));
ActorRuntime.getInstance().getConfig().setDrainOngoingCallTimeout(Duration.ofSeconds(10));
ActorRuntime.getInstance().getConfig().setDrainBalancedActors(true);
ActorRuntime.getInstance().registerActor(
StatefulActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
DaprApplication.start(port);
}
}