This commit is contained in:
Cassie Coyle 2025-08-28 01:36:19 -07:00 committed by GitHub
commit 8ee752d6de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 492 additions and 152 deletions

View File

@ -104,7 +104,7 @@ jobs:
./dist/linux_amd64/release/placement &
- name: Spin local environment
run: |
docker compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka
docker compose -f ./sdk-tests/deploy/local-test.yml up -d mongo kafka mysql
docker ps
- name: Install local ToxiProxy to simulate connectivity issues to Dapr sidecar
run: |

View File

@ -0,0 +1,88 @@
/*
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.actors.runtime;
import java.time.Instant;
/**
* Represents a state change for an actor.
*/
final class ActorState<T> {
/**
* Name of the state being changed.
*/
private final String name;
/**
* New value for the state being changed.
*/
private final T value;
/**
* Expiration.
*/
private final Instant expiration;
/**
* Creates a new instance of the metadata on actor state.
*
* @param name Name of the state being changed.
* @param value Value to be set.
*/
ActorState(String name, T value) {
this(name, value, null);
}
/**
* Creates a new instance of the metadata on actor state.
*
* @param name Name of the state being changed.
* @param value Value to be set.
* @param expiration When the value is set to expire (recommended but accepts null).
*/
ActorState(String name, T value, Instant expiration) {
this.name = name;
this.value = value;
this.expiration = expiration;
}
/**
* Gets the name of the state being changed.
*
* @return Name of the state.
*/
String getName() {
return name;
}
/**
* Gets the new value of the state being changed.
*
* @return New value.
*/
T getValue() {
return value;
}
/**
* Gets the expiration of the state.
*
* @return State expiration.
*/
Instant getExpiration() {
return expiration;
}
}

View File

@ -19,14 +19,9 @@ package io.dapr.actors.runtime;
public final class ActorStateChange {
/**
* Name of the state being changed.
* State being changed.
*/
private final String stateName;
/**
* New value for the state being changed.
*/
private final Object value;
private final ActorState state;
/**
* Type of change {@link ActorStateChangeKind}.
@ -36,32 +31,21 @@ public final class ActorStateChange {
/**
* Creates an actor state change.
*
* @param stateName Name of the state being changed.
* @param value New value for the state being changed.
* @param state State being changed.
* @param changeKind Kind of change.
*/
ActorStateChange(String stateName, Object value, ActorStateChangeKind changeKind) {
this.stateName = stateName;
this.value = value;
ActorStateChange(ActorState state, ActorStateChangeKind changeKind) {
this.state = state;
this.changeKind = changeKind;
}
/**
* Gets the name of the state being changed.
* Gets the state being changed.
*
* @return Name of the state.
* @return state.
*/
String getStateName() {
return stateName;
}
/**
* Gets the new value of the state being changed.
*
* @return New value.
*/
Object getValue() {
return value;
ActorState getState() {
return state;
}
/**

View File

@ -17,7 +17,10 @@ import io.dapr.actors.ActorId;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@ -68,10 +71,11 @@ public class ActorStateManager {
*
* @param stateName Name of the state being added.
* @param value Value to be added.
* @param expiration State's expiration.
* @param <T> Type of the object being added.
* @return Asynchronous void operation.
*/
public <T> Mono<Void> add(String stateName, T value) {
public <T> Mono<Void> add(String stateName, T value, Instant expiration) {
return Mono.fromSupplier(() -> {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
@ -84,7 +88,8 @@ public class ActorStateManager {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
if (metadata.kind == ActorStateChangeKind.REMOVE) {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value));
this.stateChangeTracker.put(
stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value, expiration));
return true;
}
@ -95,7 +100,8 @@ public class ActorStateManager {
throw new IllegalStateException("Duplicate state: " + stateName);
}
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value));
this.stateChangeTracker.put(
stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value, expiration));
return true;
}))
.then();
@ -130,6 +136,10 @@ public class ActorStateManager {
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
if (metadata.isExpired()) {
throw new NoSuchElementException("State is expired: " + stateName);
}
if (metadata.kind == ActorStateChangeKind.REMOVE) {
throw new NoSuchElementException("State is marked for removal: " + stateName);
}
@ -142,20 +152,37 @@ public class ActorStateManager {
this.stateProvider.load(this.actorTypeName, this.actorId, stateName, type)
.switchIfEmpty(Mono.error(new NoSuchElementException("State not found: " + stateName)))
.map(v -> {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v));
return (T) v;
this.stateChangeTracker.put(
stateName, new StateChangeMetadata(ActorStateChangeKind.NONE, v.getValue(), v.getExpiration()));
return (T) v.getValue();
}));
}
/**
* Updates a given key/value pair in the state store's cache.
* Use the variation that takes in an TTL instead.
*
* @param stateName Name of the state being updated.
* @param value Value to be set for given state.
* @param <T> Type of the value being set.
* @return Asynchronous void result.
*/
@Deprecated
public <T> Mono<Void> set(String stateName, T value) {
return this.set(stateName, value, Duration.ZERO);
}
/**
* Updates a given key/value pair in the state store's cache.
* Using TTL is highly recommended to avoid state to be left in the state store forever.
*
* @param stateName Name of the state being updated.
* @param value Value to be set for given state.
* @param ttl Time to live.
* @param <T> Type of the value being set.
* @return Asynchronous void result.
*/
public <T> Mono<Void> set(String stateName, T value, Duration ttl) {
return Mono.fromSupplier(() -> {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
@ -165,11 +192,12 @@ public class ActorStateManager {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
ActorStateChangeKind kind = metadata.kind;
if ((kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
if (metadata.isExpired() || (kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
kind = ActorStateChangeKind.UPDATE;
}
this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value));
var expiration = buildExpiration(ttl);
this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value, expiration));
return true;
}
@ -177,8 +205,10 @@ public class ActorStateManager {
}).filter(x -> x)
.switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)
.map(exists -> {
var expiration = buildExpiration(ttl);
this.stateChangeTracker.put(stateName,
new StateChangeMetadata(exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value));
new StateChangeMetadata(
exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value, expiration));
return exists;
}))
.then();
@ -208,7 +238,7 @@ public class ActorStateManager {
return true;
}
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null));
return true;
}
@ -218,7 +248,7 @@ public class ActorStateManager {
.switchIfEmpty(this.stateProvider.contains(this.actorTypeName, this.actorId, stateName))
.filter(exists -> exists)
.map(exists -> {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null, null));
return exists;
})
.then();
@ -239,7 +269,7 @@ public class ActorStateManager {
return this.stateChangeTracker.get(stateName);
}
).map(metadata -> {
if (metadata.kind == ActorStateChangeKind.REMOVE) {
if (metadata.isExpired() || (metadata.kind == ActorStateChangeKind.REMOVE)) {
return Boolean.FALSE;
}
@ -264,7 +294,8 @@ public class ActorStateManager {
continue;
}
changes.add(new ActorStateChange(tuple.getKey(), tuple.getValue().value, tuple.getValue().kind));
var actorState = new ActorState<>(tuple.getKey(), tuple.getValue().value, tuple.getValue().expiration);
changes.add(new ActorStateChange(actorState, tuple.getValue().kind));
}
return changes.toArray(new ActorStateChange[0]);
@ -288,12 +319,17 @@ public class ActorStateManager {
if (tuple.getValue().kind == ActorStateChangeKind.REMOVE) {
this.stateChangeTracker.remove(stateName);
} else {
StateChangeMetadata metadata = new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value);
StateChangeMetadata metadata =
new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value, tuple.getValue().expiration);
this.stateChangeTracker.put(stateName, metadata);
}
}
}
private static Instant buildExpiration(Duration ttl) {
return (ttl != null) && !ttl.isNegative() && !ttl.isZero() ? Instant.now().plus(ttl) : null;
}
/**
* Internal class to represent value and change kind.
*/
@ -309,15 +345,26 @@ public class ActorStateManager {
*/
private final Object value;
/**
* Expiration.
*/
private final Instant expiration;
/**
* Creates a new instance of the metadata on state change.
*
* @param kind Kind of change.
* @param value Value to be set.
* @param expiration When the value is set to expire (recommended but accepts null).
*/
private StateChangeMetadata(ActorStateChangeKind kind, Object value) {
private StateChangeMetadata(ActorStateChangeKind kind, Object value, Instant expiration) {
this.kind = kind;
this.value = value;
this.expiration = expiration;
}
private boolean isExpired() {
return (this.expiration != null) && Instant.now().isAfter(this.expiration);
}
}
}

View File

@ -25,28 +25,19 @@ final class ActorStateOperation {
private String operationType;
/**
* Key for the state to be persisted.
* State to be persisted.
*/
private String key;
/**
* Value of the state to be persisted.
*/
private Object value;
private ActorState state;
/**
* Instantiates a new Actor Timer.
*
* @param operationType Type of state operation.
* @param key Key to be persisted.
* @param value Value to be persisted.
* @param state Key to be persisted.
*/
ActorStateOperation(String operationType,
String key,
Object value) {
ActorStateOperation(String operationType, ActorState state) {
this.operationType = operationType;
this.key = key;
this.value = value;
this.state = state;
}
/**
@ -59,20 +50,12 @@ final class ActorStateOperation {
}
/**
* Gets the key to be persisted.
* Gets the state to be persisted.
*
* @return Key to be persisted.
* @return State to be persisted.
*/
public String getKey() {
return key;
public ActorState getState() {
return state;
}
/**
* Gets the value to be persisted.
*
* @return Value to be persisted.
*/
public Object getValue() {
return value;
}
}

View File

@ -28,9 +28,9 @@ interface DaprClient {
* @param actorType Type of actor.
* @param actorId Actor Identifier.
* @param keyName State name.
* @return Asynchronous result with current state value.
* @return Asynchronous result with current state.
*/
Mono<byte[]> getState(String actorType, String actorId, String keyName);
Mono<ActorState<byte[]>> getState(String actorType, String actorId, String keyName);
/**
* Saves state batch to Dapr.

View File

@ -29,6 +29,7 @@ import reactor.core.publisher.MonoSink;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@ -77,7 +78,7 @@ class DaprClientImpl implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<byte[]> getState(String actorType, String actorId, String keyName) {
public Mono<ActorState<byte[]>> getState(String actorType, String actorId, final String keyName) {
DaprProtos.GetActorStateRequest req =
DaprProtos.GetActorStateRequest.newBuilder()
.setActorType(actorType)
@ -86,7 +87,14 @@ class DaprClientImpl implements DaprClient {
.build();
return Mono.<DaprProtos.GetActorStateResponse>create(it ->
client.getActorState(req, createStreamObserver(it))).map(r -> r.getData().toByteArray());
client.getActorState(req, createStreamObserver(it))).map(r -> {
var expirationStr = r.getMetadataOrDefault("ttlExpireTime", null);
Instant expiration = null;
if ((expirationStr != null) && !expirationStr.isEmpty()) {
expiration = Instant.parse(expirationStr);
}
return new ActorState<>(keyName, r.getData().toByteArray(), expiration);
});
}
/**
@ -100,12 +108,26 @@ class DaprClientImpl implements DaprClient {
List<DaprProtos.TransactionalActorStateOperation> grpcOps = new ArrayList<>();
for (ActorStateOperation op : operations) {
String operationType = op.getOperationType();
String key = op.getKey();
Object value = op.getValue();
String key = op.getState().getName();
Object value = op.getState().getValue();
Instant expiration = op.getState().getExpiration();
Long ttlInSeconds = null;
if (expiration != null) {
ttlInSeconds = expiration.getEpochSecond() - Instant.now().getEpochSecond();
}
DaprProtos.TransactionalActorStateOperation.Builder opBuilder =
DaprProtos.TransactionalActorStateOperation.newBuilder()
.setOperationType(operationType)
.setKey(key);
if (ttlInSeconds != null) {
if (ttlInSeconds <= 0) {
// already expired, min is 1s.
ttlInSeconds = 1L;
}
opBuilder.putMetadata("ttlInSeconds", ttlInSeconds.toString());
}
if (value != null) {
if (value instanceof String) {
opBuilder.setValue(Any.newBuilder().setValue(ByteString.copyFrom((String) value, CHARSET)));

View File

@ -20,9 +20,13 @@ import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.io.IOException;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
/**
@ -67,8 +71,8 @@ class DaprStateAsyncProvider {
this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class;
}
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
Mono<byte[]> result = this.daprClient.getState(actorType, actorId.toString(), stateName);
<T> Mono<ActorState<T>> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
Mono<ActorState<byte[]>> result = this.daprClient.getState(actorType, actorId.toString(), stateName);
return result.flatMap(s -> {
try {
@ -76,19 +80,19 @@ class DaprStateAsyncProvider {
return Mono.empty();
}
T response = this.stateSerializer.deserialize(s, type);
T response = this.stateSerializer.deserialize(s.getValue(), type);
if (this.isStateSerializerDefault && (response instanceof byte[])) {
if (s.length == 0) {
if ((s.getValue() == null) || (s.getValue().length == 0)) {
return Mono.empty();
}
// Default serializer just passes through byte arrays, so we need to decode it here.
response = (T) OBJECT_MAPPER.readValue(s, byte[].class);
response = (T) OBJECT_MAPPER.readValue(s.getValue(), byte[].class);
}
if (response == null) {
return Mono.empty();
}
return Mono.just(response);
return Mono.just(new ActorState<>(s.getName(), response, s.getExpiration()));
} catch (IOException e) {
return Mono.error(new RuntimeException(e));
}
@ -96,8 +100,8 @@ class DaprStateAsyncProvider {
}
Mono<Boolean> contains(String actorType, ActorId actorId, String stateName) {
Mono<byte[]> result = this.daprClient.getState(actorType, actorId.toString(), stateName);
return result.map(s -> s.length > 0).defaultIfEmpty(false);
var result = this.daprClient.getState(actorType, actorId.toString(), stateName);
return result.map(s -> (s.getValue() != null) && (s.getValue().length > 0)).defaultIfEmpty(false);
}
/**
@ -139,14 +143,15 @@ class DaprStateAsyncProvider {
continue;
}
String key = stateChange.getStateName();
var state = stateChange.getState();
String key = state.getName();
Object value = null;
if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE)
|| (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
try {
byte[] data = this.stateSerializer.serialize(stateChange.getValue());
byte[] data = this.stateSerializer.serialize(state.getValue());
if (data != null) {
if (this.isStateSerializerDefault && !(stateChange.getValue() instanceof byte[])) {
if (this.isStateSerializerDefault && !(state.getValue() instanceof byte[])) {
// DefaultObjectSerializer is a JSON serializer, so we just pass it on.
value = new String(data, CHARSET);
} else {
@ -160,7 +165,7 @@ class DaprStateAsyncProvider {
}
}
operations.add(new ActorStateOperation(operationName, key, value));
operations.add(new ActorStateOperation(operationName, new ActorState(key, value, state.getExpiration())));
}
return this.daprClient.saveStateTransactionally(actorType, actorId.toString(), operations);

View File

@ -68,6 +68,10 @@ public class ActorStatefulTest {
Mono<String> setMessage(String message);
Mono<Boolean> setMessageFor1s(String message);
Mono<Boolean> setMessageAndWait(String message);
Mono<String> getMessage();
Mono<Boolean> hasMessage();
@ -197,7 +201,7 @@ public class ActorStatefulTest {
@Override
public Mono<Void> addMessage(String message) {
return super.getActorStateManager().add("message", message);
return super.getActorStateManager().add("message", message, null);
}
@Override
@ -205,6 +209,20 @@ public class ActorStatefulTest {
return super.getActorStateManager().set("message", message).thenReturn(executeSayMethod(message));
}
@Override
public Mono<Boolean> setMessageFor1s(String message) {
return super
.getActorStateManager().set("message", message, Duration.ofSeconds(1))
.then(super.getActorStateManager().contains("message"));
}
@Override
public Mono<Boolean> setMessageAndWait(String message) {
return super.getActorStateManager().set("message", message, Duration.ofSeconds(1))
.then(Mono.delay(Duration.ofMillis(1100)))
.then(super.getActorStateManager().contains("message"));
}
@Override
public Mono<String> getMessage() {
return super.getActorStateManager().get("message", String.class);
@ -223,20 +241,20 @@ public class ActorStatefulTest {
@Override
public Mono<Void> forceDuplicateException() {
// Second add should throw exception.
return super.getActorStateManager().add("message", "anything")
.then(super.getActorStateManager().add("message", "something else"));
return super.getActorStateManager().add("message", "anything", null)
.then(super.getActorStateManager().add("message", "something else", null));
}
@Override
public Mono<Void> forcePartialChange() {
return super.getActorStateManager().add("message", "first message")
return super.getActorStateManager().add("message", "first message", null)
.then(super.saveState())
.then(super.getActorStateManager().add("message", "second message"));
.then(super.getActorStateManager().add("message", "second message", null));
}
@Override
public Mono<Void> throwsWithoutSaving() {
return super.getActorStateManager().add("message", "first message")
return super.getActorStateManager().add("message", "first message", null)
.then(Mono.error(new IllegalCharsetNameException("random")));
}
@ -315,6 +333,50 @@ public class ActorStatefulTest {
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
}
@Test
public void actorStateTTL() throws Exception {
ActorProxy proxy = newActorProxy();
Assertions.assertEquals(
proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block());
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
Assertions.assertTrue(
proxy.invokeMethod("setMessageFor1s", "hello world expires in 1s", Boolean.class).block());
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
Assertions.assertEquals(
"hello world expires in 1s", proxy.invokeMethod("getMessage", String.class).block());
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
Thread.sleep(1100);
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
}
@Test
public void actorStateTTLExpiresInLocalCache() throws Exception {
ActorProxy proxy = newActorProxy();
Assertions.assertEquals(
proxy.getActorId().toString(), proxy.invokeMethod("getIdString", String.class).block());
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
//First, sets a message without TTL and checks it is saved.
proxy.invokeMethod("setMessage", "hello world").block();
Assertions.assertTrue(proxy.invokeMethod("hasMessage", Boolean.class).block());
Assertions.assertEquals(
"hello world", proxy.invokeMethod("getMessage", String.class).block());
// Now, sets a message that expires still in local cache, before it is sent to state store.
Assertions.assertFalse(
proxy.invokeMethod("setMessageAndWait", "expires while still in cache", Boolean.class).block());
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
Thread.sleep(1100);
Assertions.assertFalse(proxy.invokeMethod("hasMessage", Boolean.class).block());
}
@Test
public void lazyGet() {
ActorProxy proxy = newActorProxy();

View File

@ -63,8 +63,8 @@ public class DaprGrpcClientTest {
private static final byte[] RESPONSE_PAYLOAD = "\"hello world\"".getBytes();
private static final List<ActorStateOperation> OPERATIONS = Arrays.asList(
new ActorStateOperation("upsert", "mykey", "hello world".getBytes()),
new ActorStateOperation("delete", "mykey", null));
new ActorStateOperation("upsert", new ActorState("mykey", "hello world".getBytes(), null)),
new ActorStateOperation("delete", new ActorState("mykey", null, null)));
private final DaprGrpc.DaprImplBase serviceImpl = new CustomDaprClient();
@ -92,7 +92,7 @@ public class DaprGrpcClientTest {
@Test
public void getActorStateException() {
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY);
Mono<ActorState<byte[]>> result = client.getState(ACTOR_TYPE, ACTOR_EXCEPTION, KEY);
assertThrowsDaprException(
ExecutionException.class,
"UNKNOWN",
@ -102,8 +102,8 @@ public class DaprGrpcClientTest {
@Test
public void getActorState() {
Mono<byte[]> result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY);
assertArrayEquals(RESPONSE_PAYLOAD, result.block());
Mono<ActorState<byte[]>> result = client.getState(ACTOR_TYPE, ACTOR_ID, KEY);
assertArrayEquals(RESPONSE_PAYLOAD, result.block().getValue());
}
@Test
@ -130,8 +130,8 @@ public class DaprGrpcClientTest {
@Test
public void saveActorStateTransactionallyInvalidValueType() {
ActorStateOperation[] operations = new ActorStateOperation[]{
new ActorStateOperation("upsert", "mykey", 123),
new ActorStateOperation("delete", "mykey", null),
new ActorStateOperation("upsert", new ActorState("mykey", 123, null)),
new ActorStateOperation("delete", new ActorState("mykey", null, null)),
};
Mono<Void> result = client.saveStateTransactionally(ACTOR_TYPE, ACTOR_ID, Arrays.asList(operations));
@ -327,9 +327,9 @@ public class DaprGrpcClientTest {
for (ActorStateOperation operation : operations) {
boolean found = false;
for (DaprProtos.TransactionalActorStateOperation grpcOperation : argument.getOperationsList()) {
if (operation.getKey().equals(grpcOperation.getKey())
if (operation.getState().getName().equals(grpcOperation.getKey())
&& operation.getOperationType().equals(grpcOperation.getOperationType())
&& nullableEquals(operation.getValue(), grpcOperation.getValue())) {
&& nullableEquals(operation.getState().getValue(), grpcOperation.getValue())) {
found = true;
break;
}

View File

@ -19,6 +19,7 @@ import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@ -27,7 +28,7 @@ import java.util.Map;
*/
public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
private static final Map<String, byte[]> stateStore = new HashMap<>();
private static final Map<String, ActorState<byte[]>> stateStore = new HashMap<>();
private final DaprObjectSerializer serializer;
@ -37,7 +38,7 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
}
@Override
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
<T> Mono<ActorState<T>> load(String actorType, ActorId actorId, String stateName, TypeRef<T> type) {
return Mono.fromSupplier(() -> {
try {
String stateId = this.buildId(actorType, actorId, stateName);
@ -45,16 +46,38 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
throw new IllegalStateException("State not found.");
}
return this.serializer.deserialize(this.stateStore.get(stateId), type);
var state = this.stateStore.get(stateId);
if (state.getExpiration() != null) {
if (!state.getExpiration().isAfter(Instant.now())) {
throw new IllegalStateException("State expired.");
}
}
var v = this.serializer.deserialize(state.getValue(), type);
return new ActorState<>(stateName, v, state.getExpiration());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
private boolean contains(String stateId) {
if (!stateStore.containsKey(stateId)) {
return false;
}
var state = this.stateStore.get(stateId);
if (state.getExpiration() != null) {
if (!state.getExpiration().isAfter(Instant.now())) {
return false;
}
}
return true;
}
@Override
Mono<Boolean> contains(String actorType, ActorId actorId, String stateName) {
return Mono.fromSupplier(() -> stateStore.containsKey(this.buildId(actorType, actorId, stateName)));
return Mono.fromSupplier(() -> contains(this.buildId(actorType, actorId, stateName)));
}
@Override
@ -62,15 +85,16 @@ public class DaprInMemoryStateProvider extends DaprStateAsyncProvider {
return Mono.fromRunnable(() -> {
try {
for (ActorStateChange stateChange : stateChanges) {
String stateId = buildId(actorType, actorId, stateChange.getStateName());
String stateId = buildId(actorType, actorId, stateChange.getState().getName());
switch (stateChange.getChangeKind()) {
case REMOVE:
stateStore.remove(stateId);
break;
case ADD:
case UPDATE:
byte[] raw = this.serializer.serialize(stateChange.getValue());
stateStore.put(stateId, raw);
byte[] raw = this.serializer.serialize(stateChange.getState().getValue());
stateStore.put(stateId,
new ActorState<>(stateChange.getState().getName(), raw, stateChange.getState().getExpiration()));
break;
}
}

View File

@ -110,13 +110,13 @@ public class DaprStateAsyncProviderTest {
if (operation.getOperationType() == null) {
return false;
}
if (operation.getKey() == null) {
if (operation.getState().getName() == null) {
return false;
}
String opName = operation.getOperationType();
String key = operation.getKey();
Object value = operation.getValue();
String key = operation.getState().getName();
Object value = operation.getState().getValue();
foundInsertName |= "upsert".equals(opName) &&
"name".equals(key) &&
@ -153,58 +153,59 @@ public class DaprStateAsyncProviderTest {
DaprClient daprClient = mock(DaprClient.class);
when(daprClient
.getState(any(), any(), eq("name")))
.thenReturn(Mono.just(SERIALIZER.serialize("Jon Doe")));
.thenReturn(Mono.just(new ActorState<>("name", SERIALIZER.serialize("Jon Doe"))));
when(daprClient
.getState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just(SERIALIZER.serialize(98021)));
.thenReturn(Mono.just(new ActorState<>("zipcode", SERIALIZER.serialize(98021))));
when(daprClient
.getState(any(), any(), eq("goals")))
.thenReturn(Mono.just(SERIALIZER.serialize(98)));
.thenReturn(Mono.just(new ActorState<>("goals", SERIALIZER.serialize(98))));
when(daprClient
.getState(any(), any(), eq("balance")))
.thenReturn(Mono.just(SERIALIZER.serialize(46.55)));
.thenReturn(Mono.just(new ActorState<>("balance", SERIALIZER.serialize(46.55))));
when(daprClient
.getState(any(), any(), eq("active")))
.thenReturn(Mono.just(SERIALIZER.serialize(true)));
.thenReturn(Mono.just(new ActorState<>("active", SERIALIZER.serialize(true))));
when(daprClient
.getState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes()));
.thenReturn(Mono.just(new ActorState<>("customer", "{ \"id\": 1000, \"name\": \"Roxane\"}".getBytes())));
when(daprClient
.getState(any(), any(), eq("anotherCustomer")))
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}".getBytes()));
.thenReturn(Mono.just(new ActorState<>("anotherCustomer", "{ \"id\": 2000, \"name\": \"Max\"}".getBytes())));
when(daprClient
.getState(any(), any(), eq("nullCustomer")))
.thenReturn(Mono.empty());
when(daprClient
.getState(any(), any(), eq("bytes")))
.thenReturn(Mono.just("\"QQ==\"".getBytes()));
.thenReturn(Mono.just(new ActorState<>("bytes", "\"QQ==\"".getBytes())));
when(daprClient
.getState(any(), any(), eq("emptyBytes")))
.thenReturn(Mono.just(new byte[0]));
.thenReturn(Mono.just(new ActorState<>("emptyBytes", new byte[0])));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprClient, SERIALIZER);
Assertions.assertEquals("Jon Doe",
provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block());
provider.load("MyActor", new ActorId("123"), "name", TypeRef.STRING).block().getValue());
Assertions.assertEquals(98021,
(int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block());
(int) provider.load("MyActor", new ActorId("123"), "zipcode", TypeRef.INT).block().getValue());
Assertions.assertEquals(98,
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block().getValue());
Assertions.assertEquals(98,
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block());
(int) provider.load("MyActor", new ActorId("123"), "goals", TypeRef.INT).block().getValue());
Assertions.assertEquals(46.55,
(double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block(),
(double) provider.load("MyActor", new ActorId("123"), "balance", TypeRef.DOUBLE).block().getValue(),
EPSILON);
Assertions.assertEquals(true,
(boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block());
(boolean) provider.load("MyActor", new ActorId("123"), "active", TypeRef.BOOLEAN).block().getValue());
Assertions.assertEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block());
provider.load("MyActor", new ActorId("123"), "customer", TypeRef.get(Customer.class)).block().getValue());
Assertions.assertNotEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block());
provider.load(
"MyActor", new ActorId("123"), "anotherCustomer", TypeRef.get(Customer.class)).block().getValue());
Assertions.assertNull(
provider.load("MyActor", new ActorId("123"), "nullCustomer", TypeRef.get(Customer.class)).block());
Assertions.assertArrayEquals("A".getBytes(),
provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block());
provider.load("MyActor", new ActorId("123"), "bytes", TypeRef.get(byte[].class)).block().getValue());
Assertions.assertNull(
provider.load("MyActor", new ActorId("123"), "emptyBytes", TypeRef.get(byte[].class)).block());
}
@ -216,22 +217,28 @@ public class DaprStateAsyncProviderTest {
// Keys that exists.
when(daprClient
.getState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe".getBytes()));
.thenReturn(Mono.just(
new ActorState<>("name", "Jon Doe".getBytes())));
when(daprClient
.getState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021".getBytes()));
.thenReturn(Mono.just(
new ActorState<>("zipcode", "98021".getBytes())));
when(daprClient
.getState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98".getBytes()));
.thenReturn(Mono.just(
new ActorState<>("goals", "98".getBytes())));
when(daprClient
.getState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55".getBytes()));
.thenReturn(Mono.just(
new ActorState<>("balance", "46.55".getBytes())));
when(daprClient
.getState(any(), any(), eq("active")))
.thenReturn(Mono.just("true".getBytes()));
.thenReturn(Mono.just(
new ActorState<>("active", "true".getBytes())));
when(daprClient
.getState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes()));
.thenReturn(Mono.just(
new ActorState<>("customer", "{ \"id\": \"3000\", \"name\": \"Ely\" }".getBytes())));
// Keys that do not exist.
when(daprClient
@ -257,15 +264,15 @@ public class DaprStateAsyncProviderTest {
Assertions.assertFalse(provider.contains("MyActor", new ActorId("123"), null).block());
}
private final <T> ActorStateChange createInsertChange(String name, T value) {
return new ActorStateChange(name, value, ActorStateChangeKind.ADD);
private <T> ActorStateChange createInsertChange(String name, T value) {
return new ActorStateChange(new ActorState(name, value), ActorStateChangeKind.ADD);
}
private final <T> ActorStateChange createUpdateChange(String name, T value) {
return new ActorStateChange(name, value, ActorStateChangeKind.UPDATE);
private <T> ActorStateChange createUpdateChange(String name, T value) {
return new ActorStateChange(new ActorState(name, value), ActorStateChangeKind.UPDATE);
}
private final ActorStateChange createDeleteChange(String name) {
return new ActorStateChange(name, null, ActorStateChangeKind.REMOVE);
private ActorStateChange createDeleteChange(String name) {
return new ActorStateChange(new ActorState(name, null), ActorStateChangeKind.REMOVE);
}
}

View File

@ -0,0 +1,26 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mysql-actorstatestore
spec:
type: state.mysql
version: v1
metadata:
- name: connectionString
value: "root:test@tcp(127.0.0.1:3306)/?allowNativePasswords=true"
- name: cleanupIntervalInSeconds
value: "1"
- name: actorStateStore
value: "true"
scopes:
- actorstateit-statefulactorservice
- activationdeactivationit-demoactorservice
- actorexceptionit-myactorservice
- actormethodnameit-myactorservice
- actorreminderfailoveritone-myactorservice
- actorreminderfailoverittwo-myactorservice
- actorreminderrecoveryit
- actorsdkresiliencytit-demoactorservice
- actorstateit-statefulactorservice
- actortimerrecoveryit
- actorturnbasedconcurrencyit-myactorservice

View File

@ -10,5 +10,3 @@ spec:
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"

View File

@ -7,3 +7,6 @@ spec:
samplingRate: "1"
zipkin:
endpointAddress: http://localhost:9411/api/v2/spans
features:
- name: ActorStateTTL
enabled: true

View File

@ -26,3 +26,10 @@ services:
image: mongo
ports:
- "27017:27017"
mysql:
image: mysql
environment:
MYSQL_ROOT_PASSWORD: test
ports:
- '3306:3306'

View File

@ -169,4 +169,79 @@ public class ActorStateIT extends BaseIT {
assertArrayEquals(bytes, result);
}, 5000);
}
@ParameterizedTest
@MethodSource("data")
public void stateTTL(AppRun.AppProtocol serviceAppProtocol) throws Exception {
logger.debug("Starting actor runtime ...");
// The call below will fail if service cannot start successfully.
DaprRun runtime = startDaprApp(
this.getClass().getSimpleName(),
StatefulActorService.SUCCESS_MESSAGE,
StatefulActorService.class,
true,
60000,
serviceAppProtocol);
String message = "This is a message to be saved and retrieved.";
String name = "Jon Doe";
byte[] bytes = new byte[] { 0x1 };
ActorId actorId = new ActorId(
String.format("%d-%b-state-ttl", System.currentTimeMillis(), serviceAppProtocol));
String actorType = "StatefulActorTest";
logger.debug("Building proxy ...");
ActorProxyBuilder<ActorProxy> proxyBuilder =
new ActorProxyBuilder(actorType, ActorProxy.class, newActorClient());
ActorProxy proxy = proxyBuilder.build(actorId);
// wating for actor to be activated
Thread.sleep(2000);
// Validate conditional read works.
callWithRetry(() -> {
logger.debug("Invoking readMessage where data is not present yet ... ");
String result = proxy.invokeMethod("readMessage", String.class).block();
assertNull(result);
}, 5000);
callWithRetry(() -> {
logger.debug("Invoking writeMessageFor1s ... ");
proxy.invokeMethod("writeMessageFor1s", message).block();
}, 5000);
callWithRetry(() -> {
logger.debug("Invoking readMessage where data is probably still cached ... ");
String result = proxy.invokeMethod("readMessage", String.class).block();
assertEquals(message, result);
}, 5000);
logger.debug("Waiting, so actor can be deactivated ...");
Thread.sleep(10000);
logger.debug("Stopping service ...");
runtime.stop();
logger.debug("Starting service ...");
DaprRun run2 = startDaprApp(
this.getClass().getSimpleName(),
StatefulActorService.SUCCESS_MESSAGE,
StatefulActorService.class,
true,
60000,
serviceAppProtocol);
// Need new proxy builder because the proxy builder holds the channel.
proxyBuilder = new ActorProxyBuilder(actorType, ActorProxy.class, newActorClient());
ActorProxy newProxy = proxyBuilder.build(actorId);
// waiting for actor to be activated
Thread.sleep(2000);
callWithRetry(() -> {
logger.debug("Invoking readMessage where data is not cached and expired ... ");
String result = newProxy.invokeMethod("readMessage", String.class).block();
assertNull(result);
}, 5000);
}
}

View File

@ -17,6 +17,8 @@ public interface StatefulActor {
void writeMessage(String something);
void writeMessageFor1s(String something);
String readMessage();
void writeName(String something);

View File

@ -18,6 +18,8 @@ import io.dapr.actors.ActorType;
import io.dapr.actors.runtime.AbstractActor;
import io.dapr.actors.runtime.ActorRuntimeContext;
import java.time.Duration;
@ActorType(name = "StatefulActorTest")
public class StatefulActorImpl extends AbstractActor implements StatefulActor {
@ -30,6 +32,11 @@ public class StatefulActorImpl extends AbstractActor implements StatefulActor {
super.getActorStateManager().set("message", something).block();
}
@Override
public void writeMessageFor1s(String something) {
super.getActorStateManager().set("message", something, Duration.ofSeconds(1)).block();
}
@Override
public String readMessage() {
if (super.getActorStateManager().contains("message").block()) {