diff --git a/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java b/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java index ee64d79743..24af04d4d6 100644 --- a/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java +++ b/binder/src/androidTest/java/io/grpc/binder/internal/BinderTransportTest.java @@ -28,10 +28,13 @@ import io.grpc.binder.HostServices; import io.grpc.binder.InboundParcelablePolicy; import io.grpc.binder.SecurityPolicies; import io.grpc.internal.AbstractTransportTest; +import io.grpc.internal.FixedObjectPool; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.SharedResourcePool; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import org.junit.After; import org.junit.Ignore; @@ -48,8 +51,8 @@ import org.junit.runner.RunWith; public final class BinderTransportTest extends AbstractTransportTest { private final Context appContext = ApplicationProvider.getApplicationContext(); - private final ScheduledExecutorService scheduledExecutorService = - Executors.newScheduledThreadPool(2); + private final ObjectPool executorServicePool = + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); @Override @After @@ -63,7 +66,7 @@ public final class BinderTransportTest extends AbstractTransportTest { AndroidComponentAddress addr = HostServices.allocateService(appContext); BinderServer binderServer = new BinderServer(addr, - scheduledExecutorService, + executorServicePool, streamTracerFactories, SecurityPolicies.serverInternalOnly(), InboundParcelablePolicy.DEFAULT); @@ -95,8 +98,8 @@ public final class BinderTransportTest extends AbstractTransportTest { addr, BindServiceFlags.DEFAULTS, ContextCompat.getMainExecutor(appContext), - scheduledExecutorService, - MoreExecutors.directExecutor(), + executorServicePool, + new FixedObjectPool<>(MoreExecutors.directExecutor()), SecurityPolicies.internalOnly(), InboundParcelablePolicy.DEFAULT, eagAttrs()); diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java index 5b74b5f034..c9ea346702 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderServer.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderServer.java @@ -32,6 +32,7 @@ import io.grpc.binder.InboundParcelablePolicy; import io.grpc.binder.ServerSecurityPolicy; import io.grpc.internal.GrpcUtil; import io.grpc.internal.InternalServer; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; import io.grpc.internal.SharedResourceHolder; import java.io.IOException; @@ -53,8 +54,7 @@ import javax.annotation.concurrent.ThreadSafe; @ThreadSafe final class BinderServer implements InternalServer, LeakSafeOneWayBinder.TransactionHandler { - private final boolean useSharedTimer; - private final ScheduledExecutorService executorService; + private final ObjectPool executorServicePool; private final ImmutableList streamTracerFactories; private final AndroidComponentAddress listenAddress; private final LeakSafeOneWayBinder hostServiceBinder; @@ -64,19 +64,20 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac @GuardedBy("this") private ServerListener listener; + @GuardedBy("this") + private ScheduledExecutorService executorService; + @GuardedBy("this") private boolean shutdown; BinderServer( AndroidComponentAddress listenAddress, - @Nullable ScheduledExecutorService executorService, + ObjectPool executorServicePool, List streamTracerFactories, ServerSecurityPolicy serverSecurityPolicy, InboundParcelablePolicy inboundParcelablePolicy) { this.listenAddress = listenAddress; - useSharedTimer = executorService == null; - this.executorService = - useSharedTimer ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : executorService; + this.executorServicePool = executorServicePool; this.streamTracerFactories = ImmutableList.copyOf(checkNotNull(streamTracerFactories, "streamTracerFactories")); this.serverSecurityPolicy = checkNotNull(serverSecurityPolicy, "serverSecurityPolicy"); @@ -92,6 +93,7 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac @Override public synchronized void start(ServerListener serverListener) throws IOException { this.listener = serverListener; + executorService = executorServicePool.getObject(); } @Override @@ -119,14 +121,10 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac public synchronized void shutdown() { if (!shutdown) { shutdown = true; - if (useSharedTimer) { - // TODO: Transports may still be using this resource. They should - // be managing its use as well. - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, executorService); - } // Break the connection to the binder. We'll receive no more transactions. hostServiceBinder.detach(); listener.serverShutdown(); + executorService = executorServicePool.returnObject(executorService); } } @@ -156,7 +154,7 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac // Create a new transport and let our listener know about it. BinderTransport.BinderServerTransport transport = new BinderTransport.BinderServerTransport( - executorService, attrsBuilder.build(), streamTracerFactories, callbackBinder); + executorServicePool, attrsBuilder.build(), streamTracerFactories, callbackBinder); transport.setServerTransportListener(listener.transportCreated(transport)); return true; } diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java index a0d9d37b59..671ab84dd0 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderTransport.java @@ -52,6 +52,7 @@ import io.grpc.internal.FailingClientStream; import io.grpc.internal.GrpcAttributes; import io.grpc.internal.GrpcUtil; import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; @@ -169,6 +170,7 @@ abstract class BinderTransport // receive any data. } + private final ObjectPool executorServicePool; private final ScheduledExecutorService scheduledExecutorService; private final InternalLogId logId; private final LeakSafeOneWayBinder incomingBinder; @@ -206,12 +208,13 @@ abstract class BinderTransport private volatile boolean transmitWindowFull; private BinderTransport( - ScheduledExecutorService scheduledExecutorService, + ObjectPool executorServicePool, Attributes attributes, InternalLogId logId) { - this.scheduledExecutorService = scheduledExecutorService; + this.executorServicePool = executorServicePool; this.attributes = attributes; this.logId = logId; + scheduledExecutorService = executorServicePool.getObject(); incomingBinder = new LeakSafeOneWayBinder(this); ongoingCalls = new ConcurrentHashMap<>(); numOutgoingBytes = new AtomicLong(); @@ -250,6 +253,10 @@ abstract class BinderTransport abstract void notifyTerminated(); + void releaseExecutors() { + executorServicePool.returnObject(scheduledExecutorService); + } + @GuardedBy("this") boolean inState(TransportState transportState) { return this.transportState == transportState; @@ -304,6 +311,7 @@ abstract class BinderTransport } } notifyTerminated(); + releaseExecutors(); }); } } @@ -539,7 +547,8 @@ abstract class BinderTransport static final class BinderClientTransport extends BinderTransport implements ConnectionClientTransport, Bindable.Observer { - private final Executor blockingExecutor; + private final ObjectPool offloadExecutorPool; + private final Executor offloadExecutor; private final SecurityPolicy securityPolicy; private final Bindable serviceBinding; /** Number of ongoing calls which keep this transport "in-use". */ @@ -557,17 +566,18 @@ abstract class BinderTransport AndroidComponentAddress targetAddress, BindServiceFlags bindServiceFlags, Executor mainThreadExecutor, - ScheduledExecutorService scheduledExecutorService, - Executor blockingExecutor, + ObjectPool executorServicePool, + ObjectPool offloadExecutorPool, SecurityPolicy securityPolicy, InboundParcelablePolicy inboundParcelablePolicy, Attributes eagAttrs) { super( - scheduledExecutorService, + executorServicePool, buildClientAttributes(eagAttrs, sourceContext, targetAddress, inboundParcelablePolicy), buildLogId(sourceContext, targetAddress)); - this.blockingExecutor = blockingExecutor; + this.offloadExecutorPool = offloadExecutorPool; this.securityPolicy = securityPolicy; + this.offloadExecutor = offloadExecutorPool.getObject(); numInUseStreams = new AtomicInteger(); pingTracker = new PingTracker(TimeProvider.SYSTEM_TIME_PROVIDER, (id) -> sendPing(id)); @@ -581,6 +591,12 @@ abstract class BinderTransport this); } + @Override + void releaseExecutors() { + super.releaseExecutors(); + offloadExecutorPool.returnObject(offloadExecutor); + } + @Override public synchronized void onBound(IBinder binder) { sendSetupTransaction(binder); @@ -698,7 +714,7 @@ abstract class BinderTransport shutdownInternal( Status.UNAVAILABLE.withDescription("Malformed SETUP_TRANSPORT data"), true); } else { - blockingExecutor.execute(() -> checkSecurityPolicy(binder)); + offloadExecutor.execute(() -> checkSecurityPolicy(binder)); } } } @@ -790,11 +806,11 @@ abstract class BinderTransport @Nullable private ServerTransportListener serverTransportListener; BinderServerTransport( - ScheduledExecutorService scheduledExecutorService, + ObjectPool executorServicePool, Attributes attributes, List streamTracerFactories, IBinder callbackBinder) { - super(scheduledExecutorService, attributes, buildLogId(attributes)); + super(executorServicePool, attributes, buildLogId(attributes)); this.streamTracerFactories = streamTracerFactories; setOutgoingBinder(callbackBinder); } @@ -804,6 +820,7 @@ abstract class BinderTransport if (isShutdown()) { setState(TransportState.SHUTDOWN_TERMINATED); notifyTerminated(); + releaseExecutors(); } else { sendSetupTransaction(); // Check we're not shutdown again, since a failure inside sendSetupTransaction (or a diff --git a/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java b/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java index 15aaa73784..76cf9b6a6a 100644 --- a/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java +++ b/binder/src/test/java/io/grpc/binder/internal/BinderServerTransportTest.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.testing.TestingExecutors; import io.grpc.Attributes; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.internal.FixedObjectPool; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerTransportListener; import java.util.concurrent.ScheduledExecutorService; @@ -67,7 +68,10 @@ public final class BinderServerTransportTest { public void setUp() throws Exception { transport = new BinderTransport.BinderServerTransport( - executorService, Attributes.EMPTY, ImmutableList.of(), mockBinder); + new FixedObjectPool<>(executorService), + Attributes.EMPTY, + ImmutableList.of(), + mockBinder); } @Test