api, core: make scheduled executor service accessible for NameResolver.Args (#6455)

Added new API on NameResolver.Args to access ScheduledExecutorService, which is wrapped transport executor.
This commit is contained in:
Chengyuan Zhang 2019-11-21 16:14:13 -08:00 committed by GitHub
parent 81efecd86a
commit eb21c646b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 12 deletions

View File

@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@ -412,6 +413,7 @@ public abstract class NameResolver {
private final ProxyDetector proxyDetector; private final ProxyDetector proxyDetector;
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
private final ServiceConfigParser serviceConfigParser; private final ServiceConfigParser serviceConfigParser;
@Nullable private final ScheduledExecutorService scheduledExecutorService;
@Nullable private final ChannelLogger channelLogger; @Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor; @Nullable private final Executor executor;
@ -420,12 +422,14 @@ public abstract class NameResolver {
ProxyDetector proxyDetector, ProxyDetector proxyDetector,
SynchronizationContext syncContext, SynchronizationContext syncContext,
ServiceConfigParser serviceConfigParser, ServiceConfigParser serviceConfigParser,
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger, @Nullable ChannelLogger channelLogger,
@Nullable Executor executor) { @Nullable Executor executor) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set"); this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set"); this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set"); this.syncContext = checkNotNull(syncContext, "syncContext not set");
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set"); this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
this.scheduledExecutorService = scheduledExecutorService;
this.channelLogger = channelLogger; this.channelLogger = channelLogger;
this.executor = executor; this.executor = executor;
} }
@ -460,6 +464,25 @@ public abstract class NameResolver {
return syncContext; return syncContext;
} }
/**
* Returns a {@link ScheduledExecutorService} for scheduling delayed tasks.
*
* <p>This service is a shared resource and is only meant for quick tasks. DO NOT block or run
* time-consuming tasks.
*
* <p>The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()}
* and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called.
*
* @since 1.26.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public ScheduledExecutorService getScheduledExecutorService() {
if (scheduledExecutorService == null) {
throw new IllegalStateException("ScheduledExecutorService not set in Builder");
}
return scheduledExecutorService;
}
/** /**
* Returns the {@link ServiceConfigParser}. * Returns the {@link ServiceConfigParser}.
* *
@ -501,6 +524,7 @@ public abstract class NameResolver {
.add("proxyDetector", proxyDetector) .add("proxyDetector", proxyDetector)
.add("syncContext", syncContext) .add("syncContext", syncContext)
.add("serviceConfigParser", serviceConfigParser) .add("serviceConfigParser", serviceConfigParser)
.add("scheduledExecutorService", scheduledExecutorService)
.add("channelLogger", channelLogger) .add("channelLogger", channelLogger)
.add("executor", executor) .add("executor", executor)
.toString(); .toString();
@ -517,6 +541,7 @@ public abstract class NameResolver {
builder.setProxyDetector(proxyDetector); builder.setProxyDetector(proxyDetector);
builder.setSynchronizationContext(syncContext); builder.setSynchronizationContext(syncContext);
builder.setServiceConfigParser(serviceConfigParser); builder.setServiceConfigParser(serviceConfigParser);
builder.setScheduledExecutorService(scheduledExecutorService);
builder.setChannelLogger(channelLogger); builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor); builder.setOffloadExecutor(executor);
return builder; return builder;
@ -541,6 +566,7 @@ public abstract class NameResolver {
private ProxyDetector proxyDetector; private ProxyDetector proxyDetector;
private SynchronizationContext syncContext; private SynchronizationContext syncContext;
private ServiceConfigParser serviceConfigParser; private ServiceConfigParser serviceConfigParser;
private ScheduledExecutorService scheduledExecutorService;
private ChannelLogger channelLogger; private ChannelLogger channelLogger;
private Executor executor; private Executor executor;
@ -577,6 +603,16 @@ public abstract class NameResolver {
return this; return this;
} }
/**
* See {@link Args#getScheduledExecutorService}.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
public Builder setScheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
return this;
}
/** /**
* See {@link Args#getServiceConfigParser}. This is a required field. * See {@link Args#getServiceConfigParser}. This is a required field.
* *
@ -618,7 +654,7 @@ public abstract class NameResolver {
return return
new Args( new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser, defaultPort, proxyDetector, syncContext, serviceConfigParser,
channelLogger, executor); scheduledExecutorService, channelLogger, executor);
} }
} }
} }

View File

@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -45,6 +46,8 @@ public class NameResolverTest {
private final SynchronizationContext syncContext = private final SynchronizationContext syncContext =
new SynchronizationContext(mock(UncaughtExceptionHandler.class)); new SynchronizationContext(mock(UncaughtExceptionHandler.class));
private final ServiceConfigParser parser = mock(ServiceConfigParser.class); private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
private final ScheduledExecutorService scheduledExecutorService =
mock(ScheduledExecutorService.class);
private final ChannelLogger channelLogger = mock(ChannelLogger.class); private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor(); private final Executor executor = Executors.newSingleThreadExecutor();
private URI uri; private URI uri;
@ -62,6 +65,7 @@ public class NameResolverTest {
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser); assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor); assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
@ -70,6 +74,7 @@ public class NameResolverTest {
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector); assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext); assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser); assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger); assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor); assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
@ -254,6 +259,7 @@ public class NameResolverTest {
.setProxyDetector(proxyDetector) .setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext) .setSynchronizationContext(syncContext)
.setServiceConfigParser(parser) .setServiceConfigParser(parser)
.setScheduledExecutorService(scheduledExecutorService)
.setChannelLogger(channelLogger) .setChannelLogger(channelLogger)
.setOffloadExecutor(executor) .setOffloadExecutor(executor)
.build(); .build();

View File

@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final NameResolver.Args nameResolverArgs; private final NameResolver.Args nameResolverArgs;
private final AutoConfiguredLoadBalancerFactory loadBalancerFactory; private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
private final ClientTransportFactory transportFactory; private final ClientTransportFactory transportFactory;
private final ScheduledExecutorForBalancer scheduledExecutorForBalancer; private final RestrictedScheduledExecutor scheduledExecutor;
private final Executor executor; private final Executor executor;
private final ObjectPool<? extends Executor> executorPool; private final ObjectPool<? extends Executor> executorPool;
private final ObjectPool<? extends Executor> balancerRpcExecutorPool; private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
@ -562,6 +562,12 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.target = checkNotNull(builder.target, "target"); this.target = checkNotNull(builder.target, "target");
this.logId = InternalLogId.allocate("Channel", target); this.logId = InternalLogId.allocate("Channel", target);
this.timeProvider = checkNotNull(timeProvider, "timeProvider"); this.timeProvider = checkNotNull(timeProvider, "timeProvider");
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutor =
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
maxTraceEvents = builder.maxTraceEvents; maxTraceEvents = builder.maxTraceEvents;
channelTracer = new ChannelTracer( channelTracer = new ChannelTracer(
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(), logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
@ -581,6 +587,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
.setDefaultPort(builder.getDefaultPort()) .setDefaultPort(builder.getDefaultPort())
.setProxyDetector(proxyDetector) .setProxyDetector(proxyDetector)
.setSynchronizationContext(syncContext) .setSynchronizationContext(syncContext)
.setScheduledExecutorService(scheduledExecutor)
.setServiceConfigParser( .setServiceConfigParser(
new ScParser( new ScParser(
retryEnabled, retryEnabled,
@ -598,18 +605,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
}) })
.build(); .build();
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs); this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"); this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool); this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext); this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener); this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider; this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutorForBalancer =
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());
serviceConfigInterceptor = new ServiceConfigInterceptor( serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts); retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
this.defaultServiceConfig = builder.defaultServiceConfig; this.defaultServiceConfig = builder.defaultServiceConfig;
@ -1269,7 +1269,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override @Override
public ScheduledExecutorService getScheduledExecutorService() { public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorForBalancer; return scheduledExecutor;
} }
@Override @Override
@ -1736,10 +1736,10 @@ final class ManagedChannelImpl extends ManagedChannel implements
} }
} }
private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService { private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
final ScheduledExecutorService delegate; final ScheduledExecutorService delegate;
private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) { private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
this.delegate = checkNotNull(delegate, "delegate"); this.delegate = checkNotNull(delegate, "delegate");
} }