Improve concurrent data structures in ActorRuntime (#479)

* Switch ActorManager registry to CHM in ActorRuntime

Synchronized HashMap may lead to unnecessary contention during
ActorManager lookups.

* Change ActorRuntimeConfig to be thread-safe
This commit is contained in:
Andrey Pechkurov 2021-02-04 20:45:33 +03:00 committed by GitHub
parent 5147ad0202
commit b7eb723445
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 23 deletions

View File

@ -18,9 +18,8 @@ import reactor.core.publisher.Mono;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.concurrent.ConcurrentHashMap;
import java.util.HashMap; import java.util.concurrent.ConcurrentMap;
import java.util.Map;
/** /**
* Contains methods to register actor types. Registering the types allows the * Contains methods to register actor types. Registering the types allows the
@ -66,7 +65,7 @@ public class ActorRuntime implements Closeable {
/** /**
* Map of ActorType --> ActorManager. * Map of ActorType --> ActorManager.
*/ */
private final Map<String, ActorManager> actorManagers; private final ConcurrentMap<String, ActorManager> actorManagers;
/** /**
* The default constructor. This should not be called directly. * The default constructor. This should not be called directly.
@ -100,7 +99,7 @@ public class ActorRuntime implements Closeable {
} }
this.config = new ActorRuntimeConfig(); this.config = new ActorRuntimeConfig();
this.actorManagers = Collections.synchronizedMap(new HashMap<>()); this.actorManagers = new ConcurrentHashMap<>();
this.daprClient = daprClient; this.daprClient = daprClient;
this.channel = channel; this.channel = channel;
} }
@ -138,7 +137,7 @@ public class ActorRuntime implements Closeable {
* @throws IOException If cannot serialize config. * @throws IOException If cannot serialize config.
*/ */
public byte[] serializeConfig() throws IOException { public byte[] serializeConfig() throws IOException {
return this.INTERNAL_SERIALIZER.serialize(this.config); return INTERNAL_SERIALIZER.serialize(this.config);
} }
/** /**
@ -207,17 +206,18 @@ public class ActorRuntime implements Closeable {
ActorTypeInformation<T> actorTypeInfo = ActorTypeInformation.create(clazz); ActorTypeInformation<T> actorTypeInfo = ActorTypeInformation.create(clazz);
ActorRuntimeContext<T> context = new ActorRuntimeContext<>( // Create ActorManager, if not yet registered.
this, this.actorManagers.computeIfAbsent(actorTypeInfo.getName(), (k) -> {
objectSerializer, ActorRuntimeContext<T> context = new ActorRuntimeContext<>(
actorFactory, this,
actorTypeInfo, objectSerializer,
this.daprClient, actorFactory,
new DaprStateAsyncProvider(this.daprClient, stateSerializer)); actorTypeInfo,
this.daprClient,
// Create ActorManagers, override existing entry if registered again. new DaprStateAsyncProvider(this.daprClient, stateSerializer));
this.actorManagers.put(actorTypeInfo.getName(), new ActorManager<T>(context)); this.config.addRegisteredActorType(actorTypeInfo.getName());
this.config.addRegisteredActorType(actorTypeInfo.getName()); return new ActorManager<T>(context);
});
} }
/** /**

View File

@ -6,24 +6,25 @@
package io.dapr.actors.runtime; package io.dapr.actors.runtime;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* Represents the configuration for the Actor Runtime. * Represents the configuration for the Actor Runtime.
*/ */
public class ActorRuntimeConfig { public class ActorRuntimeConfig {
private Collection<String> registeredActorTypes = new ArrayList<>(); private List<String> registeredActorTypes = new CopyOnWriteArrayList<>();
private Duration actorIdleTimeout; private volatile Duration actorIdleTimeout;
private Duration actorScanInterval; private volatile Duration actorScanInterval;
private Duration drainOngoingCallTimeout; private volatile Duration drainOngoingCallTimeout;
private Boolean drainBalancedActors; private volatile Boolean drainBalancedActors;
/** /**
* Instantiates a new config for the Actor Runtime. * Instantiates a new config for the Actor Runtime.