cronet: allow application to provide all threads

This commit is contained in:
ZHANG Dapeng 2018-03-21 15:04:31 -07:00 committed by GitHub
parent 7c69c3a867
commit ca55b6f7e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 3 deletions

View File

@ -17,6 +17,7 @@
package io.grpc.cronet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import com.google.common.annotations.VisibleForTesting;
@ -73,6 +74,9 @@ public final class CronetChannelBuilder extends
throw new UnsupportedOperationException("call forAddress(String, int, CronetEngine) instead");
}
@Nullable
private ScheduledExecutorService scheduledExecutorService;
private final CronetEngine cronetEngine;
private boolean alwaysUsePut = false;
@ -161,12 +165,30 @@ public final class CronetChannelBuilder extends
return this;
}
/**
* Provides a custom scheduled executor service.
*
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
* the channel is built, the builder will use a static cached thread pool.
*
* @return this
*
* @since 1.12.0
*/
public final CronetChannelBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService =
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
return this;
}
@Override
protected final ClientTransportFactory buildTransportFactory() {
return new CronetTransportFactory(
new TaggingStreamFactory(
cronetEngine, trafficStatsTagSet, trafficStatsTag, trafficStatsUidSet, trafficStatsUid),
MoreExecutors.directExecutor(),
scheduledExecutorService,
maxMessageSize,
alwaysUsePut,
transportTracerFactory.create());
@ -180,20 +202,24 @@ public final class CronetChannelBuilder extends
@VisibleForTesting
static class CronetTransportFactory implements ClientTransportFactory {
private final ScheduledExecutorService timeoutService =
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private final ScheduledExecutorService timeoutService;
private final Executor executor;
private final int maxMessageSize;
private final boolean alwaysUsePut;
private final StreamBuilderFactory streamFactory;
private final TransportTracer transportTracer;
private final boolean usingSharedScheduler;
private CronetTransportFactory(
StreamBuilderFactory streamFactory,
Executor executor,
@Nullable ScheduledExecutorService timeoutService,
int maxMessageSize,
boolean alwaysUsePut,
TransportTracer transportTracer) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
this.maxMessageSize = maxMessageSize;
this.alwaysUsePut = alwaysUsePut;
this.streamFactory = streamFactory;
@ -216,7 +242,9 @@ public final class CronetChannelBuilder extends
@Override
public void close() {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
if (usingSharedScheduler) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
}
}
}

View File

@ -16,15 +16,21 @@
package io.grpc.cronet;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.cronet.CronetChannelBuilder.CronetTransportFactory;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.testing.TestMethodDescriptors;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import org.chromium.net.ExperimentalCronetEngine;
import org.junit.Before;
import org.junit.Test;
@ -73,4 +79,31 @@ public final class CronetChannelBuilderTest {
assertFalse(stream.idempotent);
}
@Test
public void scheduledExecutorService_default() {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
assertSame(
SharedResourceHolder.get(TIMER_SERVICE),
clientTransportFactory.getScheduledExecutorService());
SharedResourceHolder.release(
TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
clientTransportFactory.close();
}
@Test
public void scheduledExecutorService_custom() {
CronetChannelBuilder builder = CronetChannelBuilder.forAddress("address", 1234, mockEngine);
ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
CronetChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
assertSame(builder, builder1);
ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();
assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());
clientTransportFactory.close();
}
}