Fix the use of scheduler pools in BinderServer. (#8210)

Switch to using scheduled pools in BinderServer.
This commit is contained in:
markb74 2021-05-27 13:37:22 +02:00 committed by GitHub
parent bfcba82dd5
commit f88d362bc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 51 additions and 29 deletions

View File

@ -28,10 +28,13 @@ import io.grpc.binder.HostServices;
import io.grpc.binder.InboundParcelablePolicy; import io.grpc.binder.InboundParcelablePolicy;
import io.grpc.binder.SecurityPolicies; import io.grpc.binder.SecurityPolicies;
import io.grpc.internal.AbstractTransportTest; import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer; import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import java.util.List; import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.junit.After; import org.junit.After;
import org.junit.Ignore; import org.junit.Ignore;
@ -48,8 +51,8 @@ import org.junit.runner.RunWith;
public final class BinderTransportTest extends AbstractTransportTest { public final class BinderTransportTest extends AbstractTransportTest {
private final Context appContext = ApplicationProvider.getApplicationContext(); private final Context appContext = ApplicationProvider.getApplicationContext();
private final ScheduledExecutorService scheduledExecutorService = private final ObjectPool<ScheduledExecutorService> executorServicePool =
Executors.newScheduledThreadPool(2); SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
@Override @Override
@After @After
@ -63,7 +66,7 @@ public final class BinderTransportTest extends AbstractTransportTest {
AndroidComponentAddress addr = HostServices.allocateService(appContext); AndroidComponentAddress addr = HostServices.allocateService(appContext);
BinderServer binderServer = new BinderServer(addr, BinderServer binderServer = new BinderServer(addr,
scheduledExecutorService, executorServicePool,
streamTracerFactories, streamTracerFactories,
SecurityPolicies.serverInternalOnly(), SecurityPolicies.serverInternalOnly(),
InboundParcelablePolicy.DEFAULT); InboundParcelablePolicy.DEFAULT);
@ -95,8 +98,8 @@ public final class BinderTransportTest extends AbstractTransportTest {
addr, addr,
BindServiceFlags.DEFAULTS, BindServiceFlags.DEFAULTS,
ContextCompat.getMainExecutor(appContext), ContextCompat.getMainExecutor(appContext),
scheduledExecutorService, executorServicePool,
MoreExecutors.directExecutor(), new FixedObjectPool<>(MoreExecutors.directExecutor()),
SecurityPolicies.internalOnly(), SecurityPolicies.internalOnly(),
InboundParcelablePolicy.DEFAULT, InboundParcelablePolicy.DEFAULT,
eagAttrs()); eagAttrs());

View File

@ -32,6 +32,7 @@ import io.grpc.binder.InboundParcelablePolicy;
import io.grpc.binder.ServerSecurityPolicy; import io.grpc.binder.ServerSecurityPolicy;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer; import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener; import io.grpc.internal.ServerListener;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import java.io.IOException; import java.io.IOException;
@ -53,8 +54,7 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe @ThreadSafe
final class BinderServer implements InternalServer, LeakSafeOneWayBinder.TransactionHandler { final class BinderServer implements InternalServer, LeakSafeOneWayBinder.TransactionHandler {
private final boolean useSharedTimer; private final ObjectPool<ScheduledExecutorService> executorServicePool;
private final ScheduledExecutorService executorService;
private final ImmutableList<ServerStreamTracer.Factory> streamTracerFactories; private final ImmutableList<ServerStreamTracer.Factory> streamTracerFactories;
private final AndroidComponentAddress listenAddress; private final AndroidComponentAddress listenAddress;
private final LeakSafeOneWayBinder hostServiceBinder; private final LeakSafeOneWayBinder hostServiceBinder;
@ -64,19 +64,20 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac
@GuardedBy("this") @GuardedBy("this")
private ServerListener listener; private ServerListener listener;
@GuardedBy("this")
private ScheduledExecutorService executorService;
@GuardedBy("this") @GuardedBy("this")
private boolean shutdown; private boolean shutdown;
BinderServer( BinderServer(
AndroidComponentAddress listenAddress, AndroidComponentAddress listenAddress,
@Nullable ScheduledExecutorService executorService, ObjectPool<ScheduledExecutorService> executorServicePool,
List<? extends ServerStreamTracer.Factory> streamTracerFactories, List<? extends ServerStreamTracer.Factory> streamTracerFactories,
ServerSecurityPolicy serverSecurityPolicy, ServerSecurityPolicy serverSecurityPolicy,
InboundParcelablePolicy inboundParcelablePolicy) { InboundParcelablePolicy inboundParcelablePolicy) {
this.listenAddress = listenAddress; this.listenAddress = listenAddress;
useSharedTimer = executorService == null; this.executorServicePool = executorServicePool;
this.executorService =
useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : executorService;
this.streamTracerFactories = this.streamTracerFactories =
ImmutableList.copyOf(checkNotNull(streamTracerFactories, "streamTracerFactories")); ImmutableList.copyOf(checkNotNull(streamTracerFactories, "streamTracerFactories"));
this.serverSecurityPolicy = checkNotNull(serverSecurityPolicy, "serverSecurityPolicy"); this.serverSecurityPolicy = checkNotNull(serverSecurityPolicy, "serverSecurityPolicy");
@ -92,6 +93,7 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac
@Override @Override
public synchronized void start(ServerListener serverListener) throws IOException { public synchronized void start(ServerListener serverListener) throws IOException {
this.listener = serverListener; this.listener = serverListener;
executorService = executorServicePool.getObject();
} }
@Override @Override
@ -119,14 +121,10 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac
public synchronized void shutdown() { public synchronized void shutdown() {
if (!shutdown) { if (!shutdown) {
shutdown = true; shutdown = true;
if (useSharedTimer) {
// TODO: Transports may still be using this resource. They should
// be managing its use as well.
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, executorService);
}
// Break the connection to the binder. We'll receive no more transactions. // Break the connection to the binder. We'll receive no more transactions.
hostServiceBinder.detach(); hostServiceBinder.detach();
listener.serverShutdown(); listener.serverShutdown();
executorService = executorServicePool.returnObject(executorService);
} }
} }
@ -156,7 +154,7 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac
// Create a new transport and let our listener know about it. // Create a new transport and let our listener know about it.
BinderTransport.BinderServerTransport transport = BinderTransport.BinderServerTransport transport =
new BinderTransport.BinderServerTransport( new BinderTransport.BinderServerTransport(
executorService, attrsBuilder.build(), streamTracerFactories, callbackBinder); executorServicePool, attrsBuilder.build(), streamTracerFactories, callbackBinder);
transport.setServerTransportListener(listener.transportCreated(transport)); transport.setServerTransportListener(listener.transportCreated(transport));
return true; return true;
} }

View File

@ -52,6 +52,7 @@ import io.grpc.internal.FailingClientStream;
import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcAttributes;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
@ -169,6 +170,7 @@ abstract class BinderTransport
// receive any data. // receive any data.
} }
private final ObjectPool<ScheduledExecutorService> executorServicePool;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final InternalLogId logId; private final InternalLogId logId;
private final LeakSafeOneWayBinder incomingBinder; private final LeakSafeOneWayBinder incomingBinder;
@ -206,12 +208,13 @@ abstract class BinderTransport
private volatile boolean transmitWindowFull; private volatile boolean transmitWindowFull;
private BinderTransport( private BinderTransport(
ScheduledExecutorService scheduledExecutorService, ObjectPool<ScheduledExecutorService> executorServicePool,
Attributes attributes, Attributes attributes,
InternalLogId logId) { InternalLogId logId) {
this.scheduledExecutorService = scheduledExecutorService; this.executorServicePool = executorServicePool;
this.attributes = attributes; this.attributes = attributes;
this.logId = logId; this.logId = logId;
scheduledExecutorService = executorServicePool.getObject();
incomingBinder = new LeakSafeOneWayBinder(this); incomingBinder = new LeakSafeOneWayBinder(this);
ongoingCalls = new ConcurrentHashMap<>(); ongoingCalls = new ConcurrentHashMap<>();
numOutgoingBytes = new AtomicLong(); numOutgoingBytes = new AtomicLong();
@ -250,6 +253,10 @@ abstract class BinderTransport
abstract void notifyTerminated(); abstract void notifyTerminated();
void releaseExecutors() {
executorServicePool.returnObject(scheduledExecutorService);
}
@GuardedBy("this") @GuardedBy("this")
boolean inState(TransportState transportState) { boolean inState(TransportState transportState) {
return this.transportState == transportState; return this.transportState == transportState;
@ -304,6 +311,7 @@ abstract class BinderTransport
} }
} }
notifyTerminated(); notifyTerminated();
releaseExecutors();
}); });
} }
} }
@ -539,7 +547,8 @@ abstract class BinderTransport
static final class BinderClientTransport extends BinderTransport static final class BinderClientTransport extends BinderTransport
implements ConnectionClientTransport, Bindable.Observer { implements ConnectionClientTransport, Bindable.Observer {
private final Executor blockingExecutor; private final ObjectPool<? extends Executor> offloadExecutorPool;
private final Executor offloadExecutor;
private final SecurityPolicy securityPolicy; private final SecurityPolicy securityPolicy;
private final Bindable serviceBinding; private final Bindable serviceBinding;
/** Number of ongoing calls which keep this transport "in-use". */ /** Number of ongoing calls which keep this transport "in-use". */
@ -557,17 +566,18 @@ abstract class BinderTransport
AndroidComponentAddress targetAddress, AndroidComponentAddress targetAddress,
BindServiceFlags bindServiceFlags, BindServiceFlags bindServiceFlags,
Executor mainThreadExecutor, Executor mainThreadExecutor,
ScheduledExecutorService scheduledExecutorService, ObjectPool<ScheduledExecutorService> executorServicePool,
Executor blockingExecutor, ObjectPool<? extends Executor> offloadExecutorPool,
SecurityPolicy securityPolicy, SecurityPolicy securityPolicy,
InboundParcelablePolicy inboundParcelablePolicy, InboundParcelablePolicy inboundParcelablePolicy,
Attributes eagAttrs) { Attributes eagAttrs) {
super( super(
scheduledExecutorService, executorServicePool,
buildClientAttributes(eagAttrs, sourceContext, targetAddress, inboundParcelablePolicy), buildClientAttributes(eagAttrs, sourceContext, targetAddress, inboundParcelablePolicy),
buildLogId(sourceContext, targetAddress)); buildLogId(sourceContext, targetAddress));
this.blockingExecutor = blockingExecutor; this.offloadExecutorPool = offloadExecutorPool;
this.securityPolicy = securityPolicy; this.securityPolicy = securityPolicy;
this.offloadExecutor = offloadExecutorPool.getObject();
numInUseStreams = new AtomicInteger(); numInUseStreams = new AtomicInteger();
pingTracker = new PingTracker(TimeProvider.SYSTEM_TIME_PROVIDER, (id) -> sendPing(id)); pingTracker = new PingTracker(TimeProvider.SYSTEM_TIME_PROVIDER, (id) -> sendPing(id));
@ -581,6 +591,12 @@ abstract class BinderTransport
this); this);
} }
@Override
void releaseExecutors() {
super.releaseExecutors();
offloadExecutorPool.returnObject(offloadExecutor);
}
@Override @Override
public synchronized void onBound(IBinder binder) { public synchronized void onBound(IBinder binder) {
sendSetupTransaction(binder); sendSetupTransaction(binder);
@ -698,7 +714,7 @@ abstract class BinderTransport
shutdownInternal( shutdownInternal(
Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true); Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true);
} else { } else {
blockingExecutor.execute(() -> checkSecurityPolicy(binder)); offloadExecutor.execute(() -> checkSecurityPolicy(binder));
} }
} }
} }
@ -790,11 +806,11 @@ abstract class BinderTransport
@Nullable private ServerTransportListener serverTransportListener; @Nullable private ServerTransportListener serverTransportListener;
BinderServerTransport( BinderServerTransport(
ScheduledExecutorService scheduledExecutorService, ObjectPool<ScheduledExecutorService> executorServicePool,
Attributes attributes, Attributes attributes,
List<ServerStreamTracer.Factory> streamTracerFactories, List<ServerStreamTracer.Factory> streamTracerFactories,
IBinder callbackBinder) { IBinder callbackBinder) {
super(scheduledExecutorService, attributes, buildLogId(attributes)); super(executorServicePool, attributes, buildLogId(attributes));
this.streamTracerFactories = streamTracerFactories; this.streamTracerFactories = streamTracerFactories;
setOutgoingBinder(callbackBinder); setOutgoingBinder(callbackBinder);
} }
@ -804,6 +820,7 @@ abstract class BinderTransport
if (isShutdown()) { if (isShutdown()) {
setState(TransportState.SHUTDOWN_TERMINATED); setState(TransportState.SHUTDOWN_TERMINATED);
notifyTerminated(); notifyTerminated();
releaseExecutors();
} else { } else {
sendSetupTransaction(); sendSetupTransaction();
// Check we're not shutdown again, since a failure inside sendSetupTransaction (or a // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a

View File

@ -31,6 +31,7 @@ import com.google.common.util.concurrent.testing.TestingExecutors;
import io.grpc.Attributes; import io.grpc.Attributes;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerTransportListener; import io.grpc.internal.ServerTransportListener;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -67,7 +68,10 @@ public final class BinderServerTransportTest {
public void setUp() throws Exception { public void setUp() throws Exception {
transport = transport =
new BinderTransport.BinderServerTransport( new BinderTransport.BinderServerTransport(
executorService, Attributes.EMPTY, ImmutableList.of(), mockBinder); new FixedObjectPool<>(executorService),
Attributes.EMPTY,
ImmutableList.of(),
mockBinder);
} }
@Test @Test