diff --git a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java index 9cea2f8f7b..e84e525f51 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetChannelBuilder.java @@ -17,6 +17,7 @@ package io.grpc.cronet; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import com.google.common.annotations.VisibleForTesting; @@ -73,6 +74,9 @@ public final class CronetChannelBuilder extends throw new UnsupportedOperationException("call forAddress(String, int, CronetEngine) instead"); } + @Nullable + private ScheduledExecutorService scheduledExecutorService; + private final CronetEngine cronetEngine; private boolean alwaysUsePut = false; @@ -161,12 +165,30 @@ public final class CronetChannelBuilder extends return this; } + /** + * Provides a custom scheduled executor service. + * + *

It's an optional parameter. If the user has not provided a scheduled executor service when + * the channel is built, the builder will use a static cached thread pool. + * + * @return this + * + * @since 1.12.0 + */ + public final CronetChannelBuilder scheduledExecutorService( + ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = + checkNotNull(scheduledExecutorService, "scheduledExecutorService"); + return this; + } + @Override protected final ClientTransportFactory buildTransportFactory() { return new CronetTransportFactory( new TaggingStreamFactory( cronetEngine, trafficStatsTagSet, trafficStatsTag, trafficStatsUidSet, trafficStatsUid), MoreExecutors.directExecutor(), + scheduledExecutorService, maxMessageSize, alwaysUsePut, transportTracerFactory.create()); @@ -180,20 +202,24 @@ public final class CronetChannelBuilder extends @VisibleForTesting static class CronetTransportFactory implements ClientTransportFactory { - private final ScheduledExecutorService timeoutService = - SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); + private final ScheduledExecutorService timeoutService; private final Executor executor; private final int maxMessageSize; private final boolean alwaysUsePut; private final StreamBuilderFactory streamFactory; private final TransportTracer transportTracer; + private final boolean usingSharedScheduler; private CronetTransportFactory( StreamBuilderFactory streamFactory, Executor executor, + @Nullable ScheduledExecutorService timeoutService, int maxMessageSize, boolean alwaysUsePut, TransportTracer transportTracer) { + usingSharedScheduler = timeoutService == null; + this.timeoutService = usingSharedScheduler + ? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService; this.maxMessageSize = maxMessageSize; this.alwaysUsePut = alwaysUsePut; this.streamFactory = streamFactory; @@ -216,7 +242,9 @@ public final class CronetChannelBuilder extends @Override public void close() { - SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); + if (usingSharedScheduler) { + SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); + } } } diff --git a/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java b/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java index febff9b90a..94d2f48028 100644 --- a/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java +++ b/cronet/src/test/java/io/grpc/cronet/CronetChannelBuilderTest.java @@ -16,15 +16,21 @@ package io.grpc.cronet; +import static io.grpc.internal.GrpcUtil.TIMER_SERVICE; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.cronet.CronetChannelBuilder.CronetTransportFactory; +import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.SharedResourceHolder; import io.grpc.testing.TestMethodDescriptors; import java.net.InetSocketAddress; +import java.util.concurrent.ScheduledExecutorService; import org.chromium.net.ExperimentalCronetEngine; import org.junit.Before; import org.junit.Test; @@ -73,4 +79,31 @@ public final class CronetChannelBuilderTest { assertFalse(stream.idempotent); } + + @Test + public void scheduledExecutorService_default() { + CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); + ClientTransportFactory clientTransportFactory = builder.buildTransportFactory(); + assertSame( + SharedResourceHolder.get(TIMER_SERVICE), + clientTransportFactory.getScheduledExecutorService()); + + SharedResourceHolder.release( + TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService()); + clientTransportFactory.close(); + } + + @Test + public void scheduledExecutorService_custom() { + CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine); + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + + CronetChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService); + assertSame(builder, builder1); + + ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory(); + assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService()); + + clientTransportFactory.close(); + } }