diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java index d9a31c41a..832a1203a 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntime.java @@ -18,9 +18,8 @@ import reactor.core.publisher.Mono; import java.io.Closeable; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Contains methods to register actor types. Registering the types allows the @@ -66,7 +65,7 @@ public class ActorRuntime implements Closeable { /** * Map of ActorType --> ActorManager. */ - private final Map actorManagers; + private final ConcurrentMap actorManagers; /** * The default constructor. This should not be called directly. @@ -100,7 +99,7 @@ public class ActorRuntime implements Closeable { } this.config = new ActorRuntimeConfig(); - this.actorManagers = Collections.synchronizedMap(new HashMap<>()); + this.actorManagers = new ConcurrentHashMap<>(); this.daprClient = daprClient; this.channel = channel; } @@ -138,7 +137,7 @@ public class ActorRuntime implements Closeable { * @throws IOException If cannot serialize config. */ public byte[] serializeConfig() throws IOException { - return this.INTERNAL_SERIALIZER.serialize(this.config); + return INTERNAL_SERIALIZER.serialize(this.config); } /** @@ -207,17 +206,18 @@ public class ActorRuntime implements Closeable { ActorTypeInformation actorTypeInfo = ActorTypeInformation.create(clazz); - ActorRuntimeContext context = new ActorRuntimeContext<>( - this, - objectSerializer, - actorFactory, - actorTypeInfo, - this.daprClient, - new DaprStateAsyncProvider(this.daprClient, stateSerializer)); - - // Create ActorManagers, override existing entry if registered again. - this.actorManagers.put(actorTypeInfo.getName(), new ActorManager(context)); - this.config.addRegisteredActorType(actorTypeInfo.getName()); + // Create ActorManager, if not yet registered. + this.actorManagers.computeIfAbsent(actorTypeInfo.getName(), (k) -> { + ActorRuntimeContext context = new ActorRuntimeContext<>( + this, + objectSerializer, + actorFactory, + actorTypeInfo, + this.daprClient, + new DaprStateAsyncProvider(this.daprClient, stateSerializer)); + this.config.addRegisteredActorType(actorTypeInfo.getName()); + return new ActorManager(context); + }); } /** diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeConfig.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeConfig.java index 2a88ebce6..e10f87fbe 100644 --- a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeConfig.java +++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorRuntimeConfig.java @@ -6,24 +6,25 @@ package io.dapr.actors.runtime; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * Represents the configuration for the Actor Runtime. */ public class ActorRuntimeConfig { - private Collection registeredActorTypes = new ArrayList<>(); + private List registeredActorTypes = new CopyOnWriteArrayList<>(); - private Duration actorIdleTimeout; + private volatile Duration actorIdleTimeout; - private Duration actorScanInterval; + private volatile Duration actorScanInterval; - private Duration drainOngoingCallTimeout; + private volatile Duration drainOngoingCallTimeout; - private Boolean drainBalancedActors; + private volatile Boolean drainBalancedActors; /** * Instantiates a new config for the Actor Runtime.