mirror of https://github.com/grpc/grpc-java.git
core: separate service config parsing and add NR.Helper method
This commit is contained in:
parent
aed4e40b60
commit
0c23735cfc
|
|
@ -16,11 +16,16 @@
|
|||
|
||||
package io.grpc;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
|
|
@ -242,5 +247,94 @@ public abstract class NameResolver {
|
|||
public SynchronizationContext getSynchronizationContext() {
|
||||
throw new UnsupportedOperationException("Not implemented");
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses and validates the service configuration chosen by the name resolver. This will
|
||||
* return a {@link ConfigOrError} which contains either the successfully parsed config, or the
|
||||
* {@link Status} representing the failure to parse. Implementations are expected to not throw
|
||||
* exceptions but return a Status representing the failure.
|
||||
*
|
||||
* @param rawServiceConfig The {@link Map} representation of the service config
|
||||
* @return a tuple of the fully parsed and validated channel configuration, else the Status.
|
||||
* @since 1.20.0
|
||||
*/
|
||||
public ConfigOrError<?> parseServiceConfig(Map<String, ?> rawServiceConfig) {
|
||||
return ConfigOrError.fromError(
|
||||
Status.INTERNAL.withDescription("service config parsing not supported"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents either a successfully parsed service config, containing all necessary parts to be
|
||||
* later applied by the channel, or a Status containing the error encountered while parsing.
|
||||
*
|
||||
* @param <T> The message type of the config.
|
||||
* @since 1.20.0
|
||||
*/
|
||||
public static final class ConfigOrError<T> {
|
||||
|
||||
/**
|
||||
* Returns a {@link ConfigOrError} for the successfully parsed config.
|
||||
*
|
||||
* @since 1.20.0
|
||||
*/
|
||||
public static <T> ConfigOrError<T> fromConfig(T config) {
|
||||
return new ConfigOrError<>(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link ConfigOrError} for the failure to parse the config.
|
||||
*
|
||||
* @param status a non-OK status
|
||||
*
|
||||
* @since 1.20.0
|
||||
*/
|
||||
public static <T> ConfigOrError<T> fromError(Status status) {
|
||||
return new ConfigOrError<>(status);
|
||||
}
|
||||
|
||||
private final Status status;
|
||||
private final T config;
|
||||
|
||||
private ConfigOrError(T config) {
|
||||
this.config = checkNotNull(config, "config");
|
||||
this.status = null;
|
||||
}
|
||||
|
||||
private ConfigOrError(Status status) {
|
||||
this.config = null;
|
||||
this.status = checkNotNull(status, "status");
|
||||
checkArgument(!status.isOk(), "cannot use OK status: %s", status);
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.20.0
|
||||
*/
|
||||
@Nullable
|
||||
public T getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 1.20.0
|
||||
*/
|
||||
@Nullable
|
||||
public Status getError() {
|
||||
return status;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
if (config != null) {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("config", config)
|
||||
.toString();
|
||||
} else {
|
||||
assert status != null;
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("error", status)
|
||||
.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -542,25 +542,17 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
this.target = checkNotNull(builder.target, "target");
|
||||
this.logId = InternalLogId.allocate("Channel", target);
|
||||
this.nameResolverFactory = builder.getNameResolverFactory();
|
||||
final ProxyDetector proxyDetector =
|
||||
ProxyDetector proxyDetector =
|
||||
builder.proxyDetector != null ? builder.proxyDetector : GrpcUtil.getDefaultProxyDetector();
|
||||
final int defaultPort = builder.getDefaultPort();
|
||||
this.nameResolverHelper = new NameResolver.Helper() {
|
||||
@Override
|
||||
public int getDefaultPort() {
|
||||
return defaultPort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProxyDetector getProxyDetector() {
|
||||
return proxyDetector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SynchronizationContext getSynchronizationContext() {
|
||||
return syncContext;
|
||||
}
|
||||
};
|
||||
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
|
||||
this.nameResolverHelper =
|
||||
new NrHelper(
|
||||
builder.getDefaultPort(),
|
||||
proxyDetector,
|
||||
syncContext,
|
||||
retryEnabled,
|
||||
builder.maxRetryAttempts,
|
||||
builder.maxHedgedAttempts);
|
||||
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverHelper);
|
||||
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
|
||||
maxTraceEvents = builder.maxTraceEvents;
|
||||
|
|
@ -584,7 +576,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
|
||||
this.scheduledExecutorForBalancer =
|
||||
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());
|
||||
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
|
||||
|
||||
serviceConfigInterceptor = new ServiceConfigInterceptor(
|
||||
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
|
||||
Channel channel = new RealChannel(nameResolver.getServiceAuthority());
|
||||
|
|
@ -1661,4 +1653,64 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
delegate.execute(command);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static final class NrHelper extends NameResolver.Helper {
|
||||
|
||||
private final int defaultPort;
|
||||
private final ProxyDetector proxyDetector;
|
||||
private final SynchronizationContext syncCtx;
|
||||
private final boolean retryEnabled;
|
||||
private final int maxRetryAttemptsLimit;
|
||||
private final int maxHedgedAttemptsLimit;
|
||||
|
||||
NrHelper(
|
||||
int defaultPort,
|
||||
ProxyDetector proxyDetector,
|
||||
SynchronizationContext syncCtx,
|
||||
boolean retryEnabled,
|
||||
int maxRetryAttemptsLimit,
|
||||
int maxHedgedAttemptsLimit) {
|
||||
this.defaultPort = defaultPort;
|
||||
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector");
|
||||
this.syncCtx = checkNotNull(syncCtx, "syncCtx");
|
||||
this.retryEnabled = retryEnabled;
|
||||
this.maxRetryAttemptsLimit = maxRetryAttemptsLimit;
|
||||
this.maxHedgedAttemptsLimit = maxHedgedAttemptsLimit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDefaultPort() {
|
||||
return defaultPort;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProxyDetector getProxyDetector() {
|
||||
return proxyDetector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SynchronizationContext getSynchronizationContext() {
|
||||
return syncCtx;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public ConfigOrError<ManagedChannelServiceConfig> parseServiceConfig(
|
||||
Map<String, ?> rawServiceConfig) {
|
||||
// TODO(carl-mastrangelo): Change the type from Map<String, Object> to Map<String, ?>
|
||||
Map<String, Object> cfg = (Map<String, Object>) rawServiceConfig;
|
||||
try {
|
||||
return ConfigOrError.fromConfig(
|
||||
ManagedChannelServiceConfig.fromServiceConfig(
|
||||
cfg,
|
||||
retryEnabled,
|
||||
maxRetryAttemptsLimit,
|
||||
maxHedgedAttemptsLimit));
|
||||
} catch (RuntimeException e) {
|
||||
return ConfigOrError.fromError(
|
||||
Status.UNKNOWN.withDescription("failed to parse service config").withCause(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,281 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Verify.verify;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Strings;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.Status.Code;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* {@link ManagedChannelServiceConfig} is a fully parsed and validated representation of service
|
||||
* configuration data.
|
||||
*/
|
||||
final class ManagedChannelServiceConfig {
|
||||
|
||||
private static final Logger logger =
|
||||
Logger.getLogger(ManagedChannelServiceConfig.class.getName());
|
||||
|
||||
private final Map<String, MethodInfo> serviceMethodMap;
|
||||
private final Map<String, MethodInfo> serviceMap;
|
||||
|
||||
ManagedChannelServiceConfig(
|
||||
Map<String, MethodInfo> serviceMethodMap,
|
||||
Map<String, MethodInfo> serviceMap) {
|
||||
this.serviceMethodMap = Collections.unmodifiableMap(new HashMap<>(serviceMethodMap));
|
||||
this.serviceMap = Collections.unmodifiableMap(new HashMap<>(serviceMap));
|
||||
}
|
||||
|
||||
static ManagedChannelServiceConfig fromServiceConfig(
|
||||
Map<String, ?> serviceConfig,
|
||||
boolean retryEnabled,
|
||||
int maxRetryAttemptsLimit,
|
||||
int maxHedgedAttemptsLimit) {
|
||||
Map<String, MethodInfo> serviceMethodMap = new HashMap<>();
|
||||
Map<String, MethodInfo> serviceMap = new HashMap<>();
|
||||
|
||||
// Try and do as much validation here before we swap out the existing configuration. In case
|
||||
// the input is invalid, we don't want to lose the existing configuration.
|
||||
|
||||
List<Map<String, ?>> methodConfigs =
|
||||
ServiceConfigUtil.getMethodConfigFromServiceConfig(serviceConfig);
|
||||
|
||||
if (methodConfigs == null) {
|
||||
// this is surprising, but possible.
|
||||
return new ManagedChannelServiceConfig(serviceMethodMap, serviceMap);
|
||||
}
|
||||
|
||||
for (Map<String, ?> methodConfig : methodConfigs) {
|
||||
MethodInfo info = new MethodInfo(
|
||||
methodConfig, retryEnabled, maxRetryAttemptsLimit, maxHedgedAttemptsLimit);
|
||||
|
||||
List<Map<String, ?>> nameList =
|
||||
ServiceConfigUtil.getNameListFromMethodConfig(methodConfig);
|
||||
|
||||
checkArgument(
|
||||
nameList != null && !nameList.isEmpty(), "no names in method config %s", methodConfig);
|
||||
for (Map<String, ?> name : nameList) {
|
||||
String serviceName = ServiceConfigUtil.getServiceFromName(name);
|
||||
checkArgument(!Strings.isNullOrEmpty(serviceName), "missing service name");
|
||||
String methodName = ServiceConfigUtil.getMethodFromName(name);
|
||||
if (Strings.isNullOrEmpty(methodName)) {
|
||||
// Service scoped config
|
||||
checkArgument(
|
||||
!serviceMap.containsKey(serviceName), "Duplicate service %s", serviceName);
|
||||
serviceMap.put(serviceName, info);
|
||||
} else {
|
||||
// Method scoped config
|
||||
String fullMethodName = MethodDescriptor.generateFullMethodName(serviceName, methodName);
|
||||
checkArgument(
|
||||
!serviceMethodMap.containsKey(fullMethodName),
|
||||
"Duplicate method name %s",
|
||||
fullMethodName);
|
||||
serviceMethodMap.put(fullMethodName, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new ManagedChannelServiceConfig(serviceMethodMap, serviceMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the per-service configuration for the channel.
|
||||
*/
|
||||
Map<String, MethodInfo> getServiceMap() {
|
||||
return serviceMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the per-method configuration for the channel.
|
||||
*/
|
||||
Map<String, MethodInfo> getServiceMethodMap() {
|
||||
return serviceMethodMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent of MethodConfig from a ServiceConfig with restrictions from Channel setting.
|
||||
*/
|
||||
static final class MethodInfo {
|
||||
// TODO(carl-mastrangelo): add getters for these fields and make them private.
|
||||
final Long timeoutNanos;
|
||||
final Boolean waitForReady;
|
||||
final Integer maxInboundMessageSize;
|
||||
final Integer maxOutboundMessageSize;
|
||||
final RetryPolicy retryPolicy;
|
||||
final HedgingPolicy hedgingPolicy;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param retryEnabled when false, the argument maxRetryAttemptsLimit will have no effect.
|
||||
*/
|
||||
MethodInfo(
|
||||
Map<String, ?> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit,
|
||||
int maxHedgedAttemptsLimit) {
|
||||
timeoutNanos = ServiceConfigUtil.getTimeoutFromMethodConfig(methodConfig);
|
||||
waitForReady = ServiceConfigUtil.getWaitForReadyFromMethodConfig(methodConfig);
|
||||
maxInboundMessageSize =
|
||||
ServiceConfigUtil.getMaxResponseMessageBytesFromMethodConfig(methodConfig);
|
||||
if (maxInboundMessageSize != null) {
|
||||
checkArgument(
|
||||
maxInboundMessageSize >= 0,
|
||||
"maxInboundMessageSize %s exceeds bounds", maxInboundMessageSize);
|
||||
}
|
||||
maxOutboundMessageSize =
|
||||
ServiceConfigUtil.getMaxRequestMessageBytesFromMethodConfig(methodConfig);
|
||||
if (maxOutboundMessageSize != null) {
|
||||
checkArgument(
|
||||
maxOutboundMessageSize >= 0,
|
||||
"maxOutboundMessageSize %s exceeds bounds", maxOutboundMessageSize);
|
||||
}
|
||||
|
||||
Map<String, ?> retryPolicyMap =
|
||||
retryEnabled ? ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig) : null;
|
||||
retryPolicy = retryPolicyMap == null
|
||||
? RetryPolicy.DEFAULT : retryPolicy(retryPolicyMap, maxRetryAttemptsLimit);
|
||||
|
||||
Map<String, ?> hedgingPolicyMap =
|
||||
retryEnabled ? ServiceConfigUtil.getHedgingPolicyFromMethodConfig(methodConfig) : null;
|
||||
hedgingPolicy = hedgingPolicyMap == null
|
||||
? HedgingPolicy.DEFAULT : hedgingPolicy(hedgingPolicyMap, maxHedgedAttemptsLimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(
|
||||
timeoutNanos,
|
||||
waitForReady,
|
||||
maxInboundMessageSize,
|
||||
maxOutboundMessageSize,
|
||||
retryPolicy,
|
||||
hedgingPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof MethodInfo)) {
|
||||
return false;
|
||||
}
|
||||
MethodInfo that = (MethodInfo) other;
|
||||
return Objects.equal(this.timeoutNanos, that.timeoutNanos)
|
||||
&& Objects.equal(this.waitForReady, that.waitForReady)
|
||||
&& Objects.equal(this.maxInboundMessageSize, that.maxInboundMessageSize)
|
||||
&& Objects.equal(this.maxOutboundMessageSize, that.maxOutboundMessageSize)
|
||||
&& Objects.equal(this.retryPolicy, that.retryPolicy)
|
||||
&& Objects.equal(this.hedgingPolicy, that.hedgingPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("timeoutNanos", timeoutNanos)
|
||||
.add("waitForReady", waitForReady)
|
||||
.add("maxInboundMessageSize", maxInboundMessageSize)
|
||||
.add("maxOutboundMessageSize", maxOutboundMessageSize)
|
||||
.add("retryPolicy", retryPolicy)
|
||||
.add("hedgingPolicy", hedgingPolicy)
|
||||
.toString();
|
||||
}
|
||||
|
||||
private static RetryPolicy retryPolicy(Map<String, ?> retryPolicy, int maxAttemptsLimit) {
|
||||
int maxAttempts = checkNotNull(
|
||||
ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy),
|
||||
"maxAttempts cannot be empty");
|
||||
checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
|
||||
maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
|
||||
|
||||
long initialBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getInitialBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"initialBackoff cannot be empty");
|
||||
checkArgument(
|
||||
initialBackoffNanos > 0,
|
||||
"initialBackoffNanos must be greater than 0: %s",
|
||||
initialBackoffNanos);
|
||||
|
||||
long maxBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getMaxBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"maxBackoff cannot be empty");
|
||||
checkArgument(
|
||||
maxBackoffNanos > 0, "maxBackoff must be greater than 0: %s", maxBackoffNanos);
|
||||
|
||||
double backoffMultiplier = checkNotNull(
|
||||
ServiceConfigUtil.getBackoffMultiplierFromRetryPolicy(retryPolicy),
|
||||
"backoffMultiplier cannot be empty");
|
||||
checkArgument(
|
||||
backoffMultiplier > 0,
|
||||
"backoffMultiplier must be greater than 0: %s",
|
||||
backoffMultiplier);
|
||||
|
||||
List<String> rawCodes =
|
||||
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
|
||||
checkNotNull(rawCodes, "rawCodes must be present");
|
||||
checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
|
||||
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
|
||||
// service config doesn't say if duplicates are allowed, so just accept them.
|
||||
for (String rawCode : rawCodes) {
|
||||
verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
|
||||
codes.add(Code.valueOf(rawCode));
|
||||
}
|
||||
Set<Code> retryableStatusCodes = Collections.unmodifiableSet(codes);
|
||||
|
||||
return new RetryPolicy(
|
||||
maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier,
|
||||
retryableStatusCodes);
|
||||
}
|
||||
|
||||
private static HedgingPolicy hedgingPolicy(
|
||||
Map<String, ?> hedgingPolicy, int maxAttemptsLimit) {
|
||||
int maxAttempts = checkNotNull(
|
||||
ServiceConfigUtil.getMaxAttemptsFromHedgingPolicy(hedgingPolicy),
|
||||
"maxAttempts cannot be empty");
|
||||
checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
|
||||
maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
|
||||
|
||||
long hedgingDelayNanos = checkNotNull(
|
||||
ServiceConfigUtil.getHedgingDelayNanosFromHedgingPolicy(hedgingPolicy),
|
||||
"hedgingDelay cannot be empty");
|
||||
checkArgument(
|
||||
hedgingDelayNanos >= 0, "hedgingDelay must not be negative: %s", hedgingDelayNanos);
|
||||
|
||||
List<String> rawCodes =
|
||||
ServiceConfigUtil.getNonFatalStatusCodesFromHedgingPolicy(hedgingPolicy);
|
||||
checkNotNull(rawCodes, "rawCodes must be present");
|
||||
checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
|
||||
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
|
||||
// service config doesn't say if duplicates are allowed, so just accept them.
|
||||
for (String rawCode : rawCodes) {
|
||||
verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
|
||||
codes.add(Code.valueOf(rawCode));
|
||||
}
|
||||
Set<Code> nonFatalStatusCodes = Collections.unmodifiableSet(codes);
|
||||
|
||||
return new HedgingPolicy(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,31 +16,19 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Verify.verify;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.base.Strings;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.Channel;
|
||||
import io.grpc.ClientCall;
|
||||
import io.grpc.ClientInterceptor;
|
||||
import io.grpc.Deadline;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.Status.Code;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import javax.annotation.CheckForNull;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
|
|
@ -49,14 +37,9 @@ import javax.annotation.Nonnull;
|
|||
*/
|
||||
final class ServiceConfigInterceptor implements ClientInterceptor {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ServiceConfigInterceptor.class.getName());
|
||||
|
||||
// Map from method name to MethodInfo
|
||||
@VisibleForTesting
|
||||
final AtomicReference<Map<String, MethodInfo>> serviceMethodMap
|
||||
= new AtomicReference<>();
|
||||
@VisibleForTesting
|
||||
final AtomicReference<Map<String, MethodInfo>> serviceMap
|
||||
final AtomicReference<ManagedChannelServiceConfig> managedChannelServiceConfig
|
||||
= new AtomicReference<>();
|
||||
|
||||
private final boolean retryEnabled;
|
||||
|
|
@ -74,207 +57,10 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
}
|
||||
|
||||
void handleUpdate(@Nonnull Map<String, ?> serviceConfig) {
|
||||
Map<String, MethodInfo> newServiceMethodConfigs = new HashMap<>();
|
||||
Map<String, MethodInfo> newServiceConfigs = new HashMap<>();
|
||||
|
||||
// Try and do as much validation here before we swap out the existing configuration. In case
|
||||
// the input is invalid, we don't want to lose the existing configuration.
|
||||
|
||||
List<Map<String, ?>> methodConfigs =
|
||||
ServiceConfigUtil.getMethodConfigFromServiceConfig(serviceConfig);
|
||||
if (methodConfigs == null) {
|
||||
logger.log(Level.FINE, "No method configs found, skipping");
|
||||
ManagedChannelServiceConfig conf = ManagedChannelServiceConfig.fromServiceConfig(
|
||||
serviceConfig, retryEnabled, maxRetryAttemptsLimit, maxHedgedAttemptsLimit);
|
||||
managedChannelServiceConfig.set(conf);
|
||||
nameResolveComplete = true;
|
||||
return;
|
||||
}
|
||||
|
||||
for (Map<String, ?> methodConfig : methodConfigs) {
|
||||
MethodInfo info = new MethodInfo(
|
||||
methodConfig, retryEnabled, maxRetryAttemptsLimit, maxHedgedAttemptsLimit);
|
||||
|
||||
List<Map<String, ?>> nameList =
|
||||
ServiceConfigUtil.getNameListFromMethodConfig(methodConfig);
|
||||
|
||||
checkArgument(
|
||||
nameList != null && !nameList.isEmpty(), "no names in method config %s", methodConfig);
|
||||
for (Map<String, ?> name : nameList) {
|
||||
String serviceName = ServiceConfigUtil.getServiceFromName(name);
|
||||
checkArgument(!Strings.isNullOrEmpty(serviceName), "missing service name");
|
||||
String methodName = ServiceConfigUtil.getMethodFromName(name);
|
||||
if (Strings.isNullOrEmpty(methodName)) {
|
||||
// Service scoped config
|
||||
checkArgument(
|
||||
!newServiceConfigs.containsKey(serviceName), "Duplicate service %s", serviceName);
|
||||
newServiceConfigs.put(serviceName, info);
|
||||
} else {
|
||||
// Method scoped config
|
||||
String fullMethodName = MethodDescriptor.generateFullMethodName(serviceName, methodName);
|
||||
checkArgument(
|
||||
!newServiceMethodConfigs.containsKey(fullMethodName),
|
||||
"Duplicate method name %s",
|
||||
fullMethodName);
|
||||
newServiceMethodConfigs.put(fullMethodName, info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Okay, service config is good, swap it.
|
||||
serviceMethodMap.set(Collections.unmodifiableMap(newServiceMethodConfigs));
|
||||
serviceMap.set(Collections.unmodifiableMap(newServiceConfigs));
|
||||
nameResolveComplete = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Equivalent of MethodConfig from a ServiceConfig with restrictions from Channel setting.
|
||||
*/
|
||||
static final class MethodInfo {
|
||||
final Long timeoutNanos;
|
||||
final Boolean waitForReady;
|
||||
final Integer maxInboundMessageSize;
|
||||
final Integer maxOutboundMessageSize;
|
||||
final RetryPolicy retryPolicy;
|
||||
final HedgingPolicy hedgingPolicy;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param retryEnabled when false, the argument maxRetryAttemptsLimit will have no effect.
|
||||
*/
|
||||
MethodInfo(
|
||||
Map<String, ?> methodConfig, boolean retryEnabled, int maxRetryAttemptsLimit,
|
||||
int maxHedgedAttemptsLimit) {
|
||||
timeoutNanos = ServiceConfigUtil.getTimeoutFromMethodConfig(methodConfig);
|
||||
waitForReady = ServiceConfigUtil.getWaitForReadyFromMethodConfig(methodConfig);
|
||||
maxInboundMessageSize =
|
||||
ServiceConfigUtil.getMaxResponseMessageBytesFromMethodConfig(methodConfig);
|
||||
if (maxInboundMessageSize != null) {
|
||||
checkArgument(
|
||||
maxInboundMessageSize >= 0,
|
||||
"maxInboundMessageSize %s exceeds bounds", maxInboundMessageSize);
|
||||
}
|
||||
maxOutboundMessageSize =
|
||||
ServiceConfigUtil.getMaxRequestMessageBytesFromMethodConfig(methodConfig);
|
||||
if (maxOutboundMessageSize != null) {
|
||||
checkArgument(
|
||||
maxOutboundMessageSize >= 0,
|
||||
"maxOutboundMessageSize %s exceeds bounds", maxOutboundMessageSize);
|
||||
}
|
||||
|
||||
Map<String, ?> retryPolicyMap =
|
||||
retryEnabled ? ServiceConfigUtil.getRetryPolicyFromMethodConfig(methodConfig) : null;
|
||||
retryPolicy = retryPolicyMap == null
|
||||
? RetryPolicy.DEFAULT : retryPolicy(retryPolicyMap, maxRetryAttemptsLimit);
|
||||
|
||||
Map<String, ?> hedgingPolicyMap =
|
||||
retryEnabled ? ServiceConfigUtil.getHedgingPolicyFromMethodConfig(methodConfig) : null;
|
||||
hedgingPolicy = hedgingPolicyMap == null
|
||||
? HedgingPolicy.DEFAULT : hedgingPolicy(hedgingPolicyMap, maxHedgedAttemptsLimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(
|
||||
timeoutNanos, waitForReady, maxInboundMessageSize, maxOutboundMessageSize, retryPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (!(other instanceof MethodInfo)) {
|
||||
return false;
|
||||
}
|
||||
MethodInfo that = (MethodInfo) other;
|
||||
return Objects.equal(this.timeoutNanos, that.timeoutNanos)
|
||||
&& Objects.equal(this.waitForReady, that.waitForReady)
|
||||
&& Objects.equal(this.maxInboundMessageSize, that.maxInboundMessageSize)
|
||||
&& Objects.equal(this.maxOutboundMessageSize, that.maxOutboundMessageSize)
|
||||
&& Objects.equal(this.retryPolicy, that.retryPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return MoreObjects.toStringHelper(this)
|
||||
.add("timeoutNanos", timeoutNanos)
|
||||
.add("waitForReady", waitForReady)
|
||||
.add("maxInboundMessageSize", maxInboundMessageSize)
|
||||
.add("maxOutboundMessageSize", maxOutboundMessageSize)
|
||||
.add("retryPolicy", retryPolicy)
|
||||
.toString();
|
||||
}
|
||||
|
||||
private static RetryPolicy retryPolicy(Map<String, ?> retryPolicy, int maxAttemptsLimit) {
|
||||
int maxAttempts = checkNotNull(
|
||||
ServiceConfigUtil.getMaxAttemptsFromRetryPolicy(retryPolicy),
|
||||
"maxAttempts cannot be empty");
|
||||
checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
|
||||
maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
|
||||
|
||||
long initialBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getInitialBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"initialBackoff cannot be empty");
|
||||
checkArgument(
|
||||
initialBackoffNanos > 0,
|
||||
"initialBackoffNanos must be greater than 0: %s",
|
||||
initialBackoffNanos);
|
||||
|
||||
long maxBackoffNanos = checkNotNull(
|
||||
ServiceConfigUtil.getMaxBackoffNanosFromRetryPolicy(retryPolicy),
|
||||
"maxBackoff cannot be empty");
|
||||
checkArgument(
|
||||
maxBackoffNanos > 0, "maxBackoff must be greater than 0: %s", maxBackoffNanos);
|
||||
|
||||
double backoffMultiplier = checkNotNull(
|
||||
ServiceConfigUtil.getBackoffMultiplierFromRetryPolicy(retryPolicy),
|
||||
"backoffMultiplier cannot be empty");
|
||||
checkArgument(
|
||||
backoffMultiplier > 0,
|
||||
"backoffMultiplier must be greater than 0: %s",
|
||||
backoffMultiplier);
|
||||
|
||||
List<String> rawCodes =
|
||||
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
|
||||
checkNotNull(rawCodes, "rawCodes must be present");
|
||||
checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
|
||||
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
|
||||
// service config doesn't say if duplicates are allowed, so just accept them.
|
||||
for (String rawCode : rawCodes) {
|
||||
verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
|
||||
codes.add(Code.valueOf(rawCode));
|
||||
}
|
||||
Set<Code> retryableStatusCodes = Collections.unmodifiableSet(codes);
|
||||
|
||||
return new RetryPolicy(
|
||||
maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier,
|
||||
retryableStatusCodes);
|
||||
}
|
||||
}
|
||||
|
||||
private static HedgingPolicy hedgingPolicy(
|
||||
Map<String, ?> hedgingPolicy, int maxAttemptsLimit) {
|
||||
int maxAttempts = checkNotNull(
|
||||
ServiceConfigUtil.getMaxAttemptsFromHedgingPolicy(hedgingPolicy),
|
||||
"maxAttempts cannot be empty");
|
||||
checkArgument(maxAttempts >= 2, "maxAttempts must be greater than 1: %s", maxAttempts);
|
||||
maxAttempts = Math.min(maxAttempts, maxAttemptsLimit);
|
||||
|
||||
long hedgingDelayNanos = checkNotNull(
|
||||
ServiceConfigUtil.getHedgingDelayNanosFromHedgingPolicy(hedgingPolicy),
|
||||
"hedgingDelay cannot be empty");
|
||||
checkArgument(
|
||||
hedgingDelayNanos >= 0, "hedgingDelay must not be negative: %s", hedgingDelayNanos);
|
||||
|
||||
List<String> rawCodes =
|
||||
ServiceConfigUtil.getNonFatalStatusCodesFromHedgingPolicy(hedgingPolicy);
|
||||
checkNotNull(rawCodes, "rawCodes must be present");
|
||||
checkArgument(!rawCodes.isEmpty(), "rawCodes can't be empty");
|
||||
EnumSet<Code> codes = EnumSet.noneOf(Code.class);
|
||||
// service config doesn't say if duplicates are allowed, so just accept them.
|
||||
for (String rawCode : rawCodes) {
|
||||
verify(!"OK".equals(rawCode), "rawCode can not be \"OK\"");
|
||||
codes.add(Code.valueOf(rawCode));
|
||||
}
|
||||
Set<Code> nonFatalStatusCodes = Collections.unmodifiableSet(codes);
|
||||
|
||||
return new HedgingPolicy(maxAttempts, hedgingDelayNanos, nonFatalStatusCodes);
|
||||
}
|
||||
|
||||
static final CallOptions.Key<RetryPolicy.Provider> RETRY_POLICY_KEY =
|
||||
|
|
@ -395,17 +181,14 @@ final class ServiceConfigInterceptor implements ClientInterceptor {
|
|||
|
||||
@CheckForNull
|
||||
private MethodInfo getMethodInfo(MethodDescriptor<?, ?> method) {
|
||||
Map<String, MethodInfo> localServiceMethodMap = serviceMethodMap.get();
|
||||
ManagedChannelServiceConfig mcsc = managedChannelServiceConfig.get();
|
||||
MethodInfo info = null;
|
||||
if (localServiceMethodMap != null) {
|
||||
info = localServiceMethodMap.get(method.getFullMethodName());
|
||||
}
|
||||
if (info == null) {
|
||||
Map<String, MethodInfo> localServiceMap = serviceMap.get();
|
||||
if (localServiceMap != null) {
|
||||
info = localServiceMap.get(
|
||||
MethodDescriptor.extractFullServiceName(method.getFullMethodName()));
|
||||
if (mcsc != null) {
|
||||
info = mcsc.getServiceMethodMap().get(method.getFullMethodName());
|
||||
}
|
||||
if (info == null && mcsc != null) {
|
||||
String serviceName = MethodDescriptor.extractFullServiceName(method.getFullMethodName());
|
||||
info = mcsc.getServiceMap().get(serviceName);
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,14 +91,18 @@ import io.grpc.Metadata;
|
|||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.NameResolver.Helper.ConfigOrError;
|
||||
import io.grpc.ProxiedSocketAddress;
|
||||
import io.grpc.ProxyDetector;
|
||||
import io.grpc.SecurityLevel;
|
||||
import io.grpc.ServerMethodDefinition;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.Status.Code;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.ClientTransportFactory.ClientTransportOptions;
|
||||
import io.grpc.internal.InternalSubchannel.TransportLogger;
|
||||
import io.grpc.internal.ManagedChannelImpl.NrHelper;
|
||||
import io.grpc.internal.TestUtils.MockClientTransportInfo;
|
||||
import io.grpc.stub.ClientCalls;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
|
|
@ -3376,6 +3380,59 @@ public class ManagedChannelImplTest {
|
|||
assertEquals(SERVICE_NAME, channel.authority());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nameResolverHelper_emptyConfigSucceeds() {
|
||||
int defaultPort = 1;
|
||||
ProxyDetector proxyDetector = GrpcUtil.getDefaultProxyDetector();
|
||||
SynchronizationContext syncCtx =
|
||||
new SynchronizationContext(Thread.currentThread().getUncaughtExceptionHandler());
|
||||
boolean retryEnabled = false;
|
||||
int maxRetryAttemptsLimit = 2;
|
||||
int maxHedgedAttemptsLimit = 3;
|
||||
|
||||
NrHelper nrh = new NrHelper(
|
||||
defaultPort,
|
||||
proxyDetector,
|
||||
syncCtx,
|
||||
retryEnabled,
|
||||
maxRetryAttemptsLimit,
|
||||
maxHedgedAttemptsLimit);
|
||||
|
||||
ConfigOrError<ManagedChannelServiceConfig> coe =
|
||||
nrh.parseServiceConfig(ImmutableMap.<String, Object>of());
|
||||
|
||||
assertThat(coe.getConfig()).isNotNull();
|
||||
assertThat(coe.getConfig().getServiceMap()).isEmpty();
|
||||
assertThat(coe.getConfig().getServiceMethodMap()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nameResolverHelper_badConfigFails() {
|
||||
int defaultPort = 1;
|
||||
ProxyDetector proxyDetector = GrpcUtil.getDefaultProxyDetector();
|
||||
SynchronizationContext syncCtx =
|
||||
new SynchronizationContext(Thread.currentThread().getUncaughtExceptionHandler());
|
||||
boolean retryEnabled = false;
|
||||
int maxRetryAttemptsLimit = 2;
|
||||
int maxHedgedAttemptsLimit = 3;
|
||||
|
||||
NrHelper nrh = new NrHelper(
|
||||
defaultPort,
|
||||
proxyDetector,
|
||||
syncCtx,
|
||||
retryEnabled,
|
||||
maxRetryAttemptsLimit,
|
||||
maxHedgedAttemptsLimit);
|
||||
|
||||
ConfigOrError<ManagedChannelServiceConfig> coe =
|
||||
nrh.parseServiceConfig(ImmutableMap.<String, Object>of("methodConfig", "bogus"));
|
||||
|
||||
assertThat(coe.getError()).isNotNull();
|
||||
assertThat(coe.getError().getCode()).isEqualTo(Code.UNKNOWN);
|
||||
assertThat(coe.getError().getDescription()).contains("failed to parse service config");
|
||||
assertThat(coe.getError().getCause()).isInstanceOf(ClassCastException.class);
|
||||
}
|
||||
|
||||
private static final class ChannelBuilder
|
||||
extends AbstractManagedChannelImplBuilder<ChannelBuilder> {
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ import io.grpc.Channel;
|
|||
import io.grpc.Deadline;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.internal.ServiceConfigInterceptor.MethodInfo;
|
||||
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
|
@ -345,13 +345,13 @@ public class ServiceConfigInterceptorTest {
|
|||
|
||||
interceptor.handleUpdate(serviceConfig1);
|
||||
|
||||
assertThat(interceptor.serviceMap.get()).isNotEmpty();
|
||||
assertThat(interceptor.serviceMethodMap.get()).isEmpty();
|
||||
assertThat(interceptor.managedChannelServiceConfig.get().getServiceMap()).isNotEmpty();
|
||||
assertThat(interceptor.managedChannelServiceConfig.get().getServiceMethodMap()).isEmpty();
|
||||
|
||||
interceptor.handleUpdate(serviceConfig2);
|
||||
|
||||
assertThat(interceptor.serviceMap.get()).isEmpty();
|
||||
assertThat(interceptor.serviceMethodMap.get()).isNotEmpty();
|
||||
assertThat(interceptor.managedChannelServiceConfig.get().getServiceMap()).isEmpty();
|
||||
assertThat(interceptor.managedChannelServiceConfig.get().getServiceMethodMap()).isNotEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -363,11 +363,11 @@ public class ServiceConfigInterceptorTest {
|
|||
|
||||
interceptor.handleUpdate(serviceConfig);
|
||||
|
||||
assertThat(interceptor.serviceMethodMap.get())
|
||||
assertThat(interceptor.managedChannelServiceConfig.get().getServiceMethodMap())
|
||||
.containsExactly(
|
||||
methodDescriptor.getFullMethodName(),
|
||||
new MethodInfo(methodConfig, false, 1, 1));
|
||||
assertThat(interceptor.serviceMap.get()).containsExactly(
|
||||
assertThat(interceptor.managedChannelServiceConfig.get().getServiceMap()).containsExactly(
|
||||
"service2", new MethodInfo(methodConfig, false, 1, 1));
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue