core: allow application to provide all threads - okhttp channel

This commit is contained in:
ZHANG Dapeng 2018-03-01 10:55:44 -08:00 committed by GitHub
parent fcc8ea5950
commit 3334a8a554
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 4 deletions

View File

@ -16,6 +16,7 @@
package io.grpc.okhttp; package io.grpc.okhttp;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIMEOUT_NANOS;
import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIME_NANOS; import static io.grpc.internal.GrpcUtil.DEFAULT_KEEPALIVE_TIME_NANOS;
import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED;
@ -105,6 +106,7 @@ public class OkHttpChannelBuilder extends
} }
private Executor transportExecutor; private Executor transportExecutor;
private ScheduledExecutorService scheduledExecutorService;
private SSLSocketFactory sslSocketFactory; private SSLSocketFactory sslSocketFactory;
private HostnameVerifier hostnameVerifier; private HostnameVerifier hostnameVerifier;
@ -318,11 +320,28 @@ public class OkHttpChannelBuilder extends
return this; return this;
} }
/**
* Provides a custom scheduled executor service.
*
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
* the channel is built, the builder will use a static cached thread pool.
*
* @return this
*
* @since 1.11.0
*/
public final OkHttpChannelBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService =
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
return this;
}
@Override @Override
@Internal @Internal
protected final ClientTransportFactory buildTransportFactory() { protected final ClientTransportFactory buildTransportFactory() {
boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED; boolean enableKeepAlive = keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED;
return new OkHttpTransportFactory(transportExecutor, return new OkHttpTransportFactory(transportExecutor, scheduledExecutorService,
createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(), createSocketFactory(), hostnameVerifier, connectionSpec, maxInboundMessageSize(),
enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, enableKeepAlive, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory); transportTracerFactory);
@ -391,6 +410,7 @@ public class OkHttpChannelBuilder extends
static final class OkHttpTransportFactory implements ClientTransportFactory { static final class OkHttpTransportFactory implements ClientTransportFactory {
private final Executor executor; private final Executor executor;
private final boolean usingSharedExecutor; private final boolean usingSharedExecutor;
private final boolean usingSharedScheduler;
private final TransportTracer.Factory transportTracerFactory; private final TransportTracer.Factory transportTracerFactory;
@Nullable @Nullable
private final SSLSocketFactory socketFactory; private final SSLSocketFactory socketFactory;
@ -402,11 +422,11 @@ public class OkHttpChannelBuilder extends
private final AtomicBackoff keepAliveTimeNanos; private final AtomicBackoff keepAliveTimeNanos;
private final long keepAliveTimeoutNanos; private final long keepAliveTimeoutNanos;
private final boolean keepAliveWithoutCalls; private final boolean keepAliveWithoutCalls;
private final ScheduledExecutorService timeoutService = private final ScheduledExecutorService timeoutService;
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private boolean closed; private boolean closed;
private OkHttpTransportFactory(Executor executor, private OkHttpTransportFactory(Executor executor,
@Nullable ScheduledExecutorService timeoutService,
@Nullable SSLSocketFactory socketFactory, @Nullable SSLSocketFactory socketFactory,
@Nullable HostnameVerifier hostnameVerifier, @Nullable HostnameVerifier hostnameVerifier,
ConnectionSpec connectionSpec, ConnectionSpec connectionSpec,
@ -416,6 +436,9 @@ public class OkHttpChannelBuilder extends
long keepAliveTimeoutNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory) { TransportTracer.Factory transportTracerFactory) {
usingSharedScheduler = timeoutService == null;
this.timeoutService = usingSharedScheduler
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : timeoutService;
this.socketFactory = socketFactory; this.socketFactory = socketFactory;
this.hostnameVerifier = hostnameVerifier; this.hostnameVerifier = hostnameVerifier;
this.connectionSpec = connectionSpec; this.connectionSpec = connectionSpec;
@ -483,7 +506,10 @@ public class OkHttpChannelBuilder extends
return; return;
} }
closed = true; closed = true;
if (usingSharedScheduler) {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService); SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
}
if (usingSharedExecutor) { if (usingSharedExecutor) {
SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor); SharedResourceHolder.release(SHARED_EXECUTOR, (ExecutorService) executor);

View File

@ -16,14 +16,20 @@
package io.grpc.okhttp; package io.grpc.okhttp;
import static io.grpc.internal.GrpcUtil.TIMER_SERVICE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import com.squareup.okhttp.ConnectionSpec; import com.squareup.okhttp.ConnectionSpec;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -128,5 +134,34 @@ public class OkHttpChannelBuilderTest {
builder.usePlaintext(); builder.usePlaintext();
assertNull(builder.createSocketFactory()); assertNull(builder.createSocketFactory());
} }
@Test
public void scheduledExecutorService_default() {
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forTarget("foo");
ClientTransportFactory clientTransportFactory = builder.buildTransportFactory();
assertSame(
SharedResourceHolder.get(TIMER_SERVICE),
clientTransportFactory.getScheduledExecutorService());
SharedResourceHolder.release(
TIMER_SERVICE, clientTransportFactory.getScheduledExecutorService());
clientTransportFactory.close();
}
@Test
public void scheduledExecutorService_custom() {
OkHttpChannelBuilder builder = OkHttpChannelBuilder.forTarget("foo");
ScheduledExecutorService scheduledExecutorService =
new FakeClock().getScheduledExecutorService();
OkHttpChannelBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
assertSame(builder, builder1);
ClientTransportFactory clientTransportFactory = builder1.buildTransportFactory();
assertSame(scheduledExecutorService, clientTransportFactory.getScheduledExecutorService());
clientTransportFactory.close();
}
} }