mirror of https://github.com/grpc/grpc-java.git
core: Make getTransport's fast path lock-free
This change exposed a pre-existing bug where shutdownNow wasn't called for decommissionedTransports. The bug is fixed and a test added in this commit. Fixes #2120
This commit is contained in:
parent
3d4ae36074
commit
6907d81109
|
|
@ -63,10 +63,10 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
|
@ -139,14 +139,16 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
private LoadBalancer<ClientTransport> loadBalancer;
|
||||
|
||||
/**
|
||||
* Maps EquivalentAddressGroups to transports for that server.
|
||||
* Maps EquivalentAddressGroups to transports for that server. "lock" must be held when mutating.
|
||||
*/
|
||||
@GuardedBy("lock")
|
||||
private final Map<EquivalentAddressGroup, TransportSet> transports =
|
||||
new HashMap<EquivalentAddressGroup, TransportSet>();
|
||||
// Even though we set a concurrency level of 1, this is better than Collections.synchronizedMap
|
||||
// because it doesn't need to acquire a lock for reads.
|
||||
private final ConcurrentMap<EquivalentAddressGroup, TransportSet> transports =
|
||||
new ConcurrentHashMap<EquivalentAddressGroup, TransportSet>(16, .75f, 1);
|
||||
|
||||
/**
|
||||
* TransportSets that are shutdown (but not yet terminated) due to channel idleness.
|
||||
* TransportSets that are shutdown (but not yet terminated) due to channel idleness or channel
|
||||
* shut down.
|
||||
*/
|
||||
@GuardedBy("lock")
|
||||
private final HashSet<TransportSet> decommissionedTransports = new HashSet<TransportSet>();
|
||||
|
|
@ -425,6 +427,8 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
maybeTerminateChannel();
|
||||
if (!terminated) {
|
||||
transportsCopy.addAll(transports.values());
|
||||
transports.clear();
|
||||
decommissionedTransports.addAll(transportsCopy);
|
||||
delayedTransportsCopy.addAll(delayedTransports);
|
||||
oobTransportsCopy.addAll(oobTransports);
|
||||
}
|
||||
|
|
@ -472,6 +476,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
List<OobTransportProviderImpl> oobTransportsCopy;
|
||||
synchronized (lock) {
|
||||
transportsCopy = new ArrayList<TransportSet>(transports.values());
|
||||
transportsCopy.addAll(decommissionedTransports);
|
||||
delayedTransportsCopy = new ArrayList<DelayedClientTransport>(delayedTransports);
|
||||
oobTransportsCopy = new ArrayList<OobTransportProviderImpl>(oobTransports);
|
||||
}
|
||||
|
|
@ -589,7 +594,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI
|
|||
@Override
|
||||
public ClientTransport getTransport(final EquivalentAddressGroup addressGroup) {
|
||||
checkNotNull(addressGroup, "addressGroup");
|
||||
TransportSet ts;
|
||||
TransportSet ts = transports.get(addressGroup);
|
||||
if (ts != null) {
|
||||
return ts.obtainActiveTransport();
|
||||
}
|
||||
synchronized (lock) {
|
||||
if (shutdown) {
|
||||
return SHUTDOWN_TRANSPORT;
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ import io.grpc.MethodDescriptor;
|
|||
import io.grpc.NameResolver;
|
||||
import io.grpc.ResolvedServerInfo;
|
||||
import io.grpc.ResolvedServerInfoGroup;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.TransportManager.InterimTransport;
|
||||
import io.grpc.TransportManager.OobTransportProvider;
|
||||
|
|
@ -303,6 +304,10 @@ public class ManagedChannelImplIdlenessTest {
|
|||
|
||||
channel.shutdown();
|
||||
verify(t1.transport).shutdown();
|
||||
channel.shutdownNow();
|
||||
verify(t0.transport).shutdownNow(any(Status.class));
|
||||
verify(t1.transport).shutdownNow(any(Status.class));
|
||||
|
||||
t1.listener.transportTerminated();
|
||||
assertFalse(channel.isTerminated());
|
||||
t0.listener.transportTerminated();
|
||||
|
|
|
|||
Loading…
Reference in New Issue