xds: implement per-RPC hash generation (#7922)

Generates a hash value for each RPC based on the HashPolicies configured for the Route that the RPC is routed to.
This commit is contained in:
Chengyuan Zhang 2021-03-05 10:51:42 -08:00 committed by GitHub
parent b5c0a4a97a
commit 4b52639aa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 268 additions and 124 deletions

View File

@ -23,6 +23,8 @@ import javax.annotation.concurrent.ThreadSafe;
interface ThreadSafeRandom {
int nextInt(int bound);
long nextLong();
final class ThreadSafeRandomImpl implements ThreadSafeRandom {
static final ThreadSafeRandom instance = new ThreadSafeRandomImpl();
@ -33,5 +35,10 @@ interface ThreadSafeRandom {
public int nextInt(int bound) {
return ThreadLocalRandom.current().nextInt(bound);
}
@Override
public long nextLong() {
return ThreadLocalRandom.current().nextLong();
}
}
}

View File

@ -57,6 +57,7 @@ import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
import io.grpc.xds.VirtualHost.Route;
import io.grpc.xds.VirtualHost.Route.RouteAction;
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
import io.grpc.xds.VirtualHost.Route.RouteMatch;
import io.grpc.xds.XdsClient.LdsResourceWatcher;
import io.grpc.xds.XdsClient.LdsUpdate;
@ -95,6 +96,8 @@ final class XdsNameResolver extends NameResolver {
static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
static final CallOptions.Key<Long> RPC_HASH_KEY =
CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
@VisibleForTesting
static boolean enableTimeout =
Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
@ -119,6 +122,7 @@ final class XdsNameResolver extends NameResolver {
@VisibleForTesting
static AtomicLong activeFaultInjectedStreamCounter = new AtomicLong();
private final InternalLogId logId;
private final XdsLogger logger;
private final String authority;
private final ServiceConfigParser serviceConfigParser;
@ -126,6 +130,7 @@ final class XdsNameResolver extends NameResolver {
private final ScheduledExecutorService scheduler;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final XxHash64 hashFunc = XxHash64.INSTANCE;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
private final ConfigSelector configSelector = new ConfigSelector();
@ -152,7 +157,8 @@ final class XdsNameResolver extends NameResolver {
this.scheduler = checkNotNull(scheduler, "scheduler");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
logId = InternalLogId.allocate("xds-resolver", name);
logger = XdsLogger.withLogId(logId);
logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
}
@ -347,26 +353,33 @@ final class XdsNameResolver extends NameResolver {
private final class ConfigSelector extends InternalConfigSelector {
@Override
public Result selectConfig(PickSubchannelArgs args) {
// Index ASCII headers by keys.
Map<String, Iterable<String>> asciiHeaders = new HashMap<>();
// Index ASCII headers by key, multi-value headers are concatenated for matching purposes.
Map<String, String> asciiHeaders = new HashMap<>();
Metadata headers = args.getHeaders();
for (String headerName : headers.keys()) {
if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
continue;
}
Metadata.Key<String> key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
asciiHeaders.put(headerName, headers.getAll(key));
Iterable<String> values = headers.getAll(key);
if (values != null) {
asciiHeaders.put(headerName, Joiner.on(",").join(values));
}
}
// Special hack for exposing headers: "content-type".
asciiHeaders.put("content-type", "application/grpc");
String cluster = null;
Route selectedRoute = null;
HttpFault selectedFaultConfig;
RoutingConfig routingCfg;
do {
selectedFaultConfig = routingConfig.faultConfig;
for (Route route : routingConfig.routes) {
routingCfg = routingConfig;
selectedFaultConfig = routingCfg.faultConfig;
for (Route route : routingCfg.routes) {
if (matchRoute(route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(),
asciiHeaders, random)) {
selectedRoute = route;
if (routingConfig.applyFaultInjection && route.httpFault() != null) {
if (routingCfg.applyFaultInjection && route.httpFault() != null) {
selectedFaultConfig = route.httpFault();
}
break;
@ -390,7 +403,7 @@ final class XdsNameResolver extends NameResolver {
accumulator += weightedCluster.weight();
if (select < accumulator) {
cluster = weightedCluster.name();
if (routingConfig.applyFaultInjection && weightedCluster.httpFault() != null) {
if (routingCfg.applyFaultInjection && weightedCluster.httpFault() != null) {
selectedFaultConfig = weightedCluster.httpFault();
}
break;
@ -403,7 +416,7 @@ final class XdsNameResolver extends NameResolver {
if (enableTimeout) {
Long timeoutNano = selectedRoute.routeAction().timeoutNano();
if (timeoutNano == null) {
timeoutNano = routingConfig.fallbackTimeoutNano;
timeoutNano = routingCfg.fallbackTimeoutNano;
}
if (timeoutNano > 0) {
rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNano);
@ -417,7 +430,6 @@ final class XdsNameResolver extends NameResolver {
parsedServiceConfig.getError().augmentDescription(
"Failed to parse service config (method config)"));
}
final String finalCluster = cluster;
if (selectedFaultConfig != null && selectedFaultConfig.maxActiveFaults() != null
&& activeFaultInjectedStreamCounter.get() >= selectedFaultConfig.maxActiveFaults()) {
selectedFaultConfig = null;
@ -447,15 +459,18 @@ final class XdsNameResolver extends NameResolver {
abortStatus = determineFaultAbortStatus(selectedFaultConfig.faultAbort(), headers);
}
}
final String finalCluster = cluster;
final Long finalDelayNanos = delayNanos;
final Status finalAbortStatus = abortStatus;
final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), asciiHeaders);
class ClusterSelectionInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
final Channel next) {
final CallOptions callOptionsForCluster =
callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster);
callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
.withOption(RPC_HASH_KEY, hash);
Supplier<ClientCall<ReqT, RespT>> configApplyingCallSupplier =
new Supplier<ClientCall<ReqT, RespT>>() {
@Override
@ -553,6 +568,36 @@ final class XdsNameResolver extends NameResolver {
}
}
private long generateHash(List<HashPolicy> hashPolicies, Map<String, String> headers) {
Long hash = null;
for (HashPolicy policy : hashPolicies) {
Long newHash = null;
if (policy.type() == HashPolicy.Type.HEADER) {
if (headers.containsKey(policy.headerName())) {
String value = headers.get(policy.headerName());
if (policy.regEx() != null && policy.regExSubstitution() != null) {
value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
}
newHash = hashFunc.hashAsciiString(value);
}
} else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
newHash = hashFunc.hashLong(logId.getId());
}
if (newHash != null ) {
// Rotating the old value prevents duplicate hash rules from cancelling each other out
// and preserves all of the entropy.
long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
hash = oldHash ^ newHash;
}
// If the policy is a terminal policy and a hash has been generated, ignore
// the rest of the hash policies.
if (policy.isTerminal() && hash != null) {
break;
}
}
return hash == null ? random.nextLong() : hash;
}
@Nullable
private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
Long delayNanos;
@ -748,7 +793,7 @@ final class XdsNameResolver extends NameResolver {
@VisibleForTesting
static boolean matchRoute(RouteMatch routeMatch, String fullMethodName,
Map<String, Iterable<String>> headers, ThreadSafeRandom random) {
Map<String, String> headers, ThreadSafeRandom random) {
if (!matchPath(routeMatch.pathMatcher(), fullMethodName)) {
return false;
}
@ -774,18 +819,9 @@ final class XdsNameResolver extends NameResolver {
}
private static boolean matchHeaders(
List<HeaderMatcher> headerMatchers, Map<String, Iterable<String>> headers) {
List<HeaderMatcher> headerMatchers, Map<String, String> headers) {
for (HeaderMatcher headerMatcher : headerMatchers) {
Iterable<String> headerValues = headers.get(headerMatcher.name());
// Special cases for hiding headers: "grpc-previous-rpc-attempts".
if (headerMatcher.name().equals("grpc-previous-rpc-attempts")) {
headerValues = null;
}
// Special case for exposing headers: "content-type".
if (headerMatcher.name().equals("content-type")) {
headerValues = Collections.singletonList("application/grpc");
}
if (!matchHeader(headerMatcher, headerValues)) {
if (!matchHeader(headerMatcher, headers.get(headerMatcher.name()))) {
return false;
}
}
@ -793,33 +829,31 @@ final class XdsNameResolver extends NameResolver {
}
@VisibleForTesting
static boolean matchHeader(HeaderMatcher headerMatcher,
@Nullable Iterable<String> headerValues) {
static boolean matchHeader(HeaderMatcher headerMatcher, @Nullable String value) {
if (headerMatcher.present() != null) {
return (headerValues == null) == headerMatcher.present().equals(headerMatcher.inverted());
return (value == null) == headerMatcher.present().equals(headerMatcher.inverted());
}
if (headerValues == null) {
if (value == null) {
return false;
}
String valueStr = Joiner.on(",").join(headerValues);
boolean baseMatch;
if (headerMatcher.exactValue() != null) {
baseMatch = headerMatcher.exactValue().equals(valueStr);
baseMatch = headerMatcher.exactValue().equals(value);
} else if (headerMatcher.safeRegEx() != null) {
baseMatch = headerMatcher.safeRegEx().matches(valueStr);
baseMatch = headerMatcher.safeRegEx().matches(value);
} else if (headerMatcher.range() != null) {
long numValue;
try {
numValue = Long.parseLong(valueStr);
numValue = Long.parseLong(value);
baseMatch = numValue >= headerMatcher.range().start()
&& numValue <= headerMatcher.range().end();
} catch (NumberFormatException ignored) {
baseMatch = false;
}
} else if (headerMatcher.prefix() != null) {
baseMatch = valueStr.startsWith(headerMatcher.prefix());
baseMatch = value.startsWith(headerMatcher.prefix());
} else {
baseMatch = valueStr.endsWith(headerMatcher.suffix());
baseMatch = value.endsWith(headerMatcher.suffix());
}
return baseMatch != headerMatcher.inverted();
}
@ -1033,7 +1067,7 @@ final class XdsNameResolver extends NameResolver {
}
/**
* Grouping of the list of usable routes and their corresponding fallback timeout value.
* VirtualHost-level configuration for request routing.
*/
private static class RoutingConfig {
private final long fallbackTimeoutNano;
@ -1042,7 +1076,7 @@ final class XdsNameResolver extends NameResolver {
@Nullable
private final HttpFault faultConfig;
private static RoutingConfig empty =
private static final RoutingConfig empty =
new RoutingConfig(0L, Collections.<Route>emptyList(), false, null);
private RoutingConfig(

View File

@ -97,6 +97,11 @@ public class WeightedRandomPickerTest {
assertThat(nextInt).isLessThan(bound);
return nextInt;
}
@Override
public long nextLong() {
throw new UnsupportedOperationException("Should not be called");
}
}
private final FakeRandom fakeRandom = new FakeRandom();

View File

@ -35,12 +35,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.re2j.Pattern;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.InternalConfigSelector;
import io.grpc.InternalConfigSelector.Result;
import io.grpc.Metadata;
@ -133,11 +135,13 @@ public class XdsNameResolverTest {
ArgumentCaptor<Status> errorCaptor;
private XdsNameResolver resolver;
private TestCall<?, ?> testCall;
private boolean originalEnableTimeout = XdsNameResolver.enableTimeout;
private AtomicLong originalFaultCounter = XdsNameResolver.activeFaultInjectedStreamCounter;
private boolean originalEnableTimeout;
private AtomicLong originalFaultCounter;
@Before
public void setUp() {
originalEnableTimeout = XdsNameResolver.enableTimeout;
originalFaultCounter = XdsNameResolver.activeFaultInjectedStreamCounter;
XdsNameResolver.enableTimeout = true;
XdsNameResolver.activeFaultInjectedStreamCounter = new AtomicLong();
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler,
@ -331,6 +335,100 @@ public class XdsNameResolverTest {
verifyNoMoreInteractions(mockListener);
}
@Test
public void resolved_rpcHashingByHeader() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
AUTHORITY,
Collections.singletonList(
Route.create(
RouteMatch.withPathExactOnly(
"/" + TestMethodDescriptors.voidMethod().getFullMethodName()),
RouteAction.forCluster(cluster1, Collections.singletonList(HashPolicy.forHeader(
false, "custom-key", Pattern.compile("value"), "val")),
null), null)));
verify(mockListener).onResult(resolutionResultCaptor.capture());
InternalConfigSelector configSelector =
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
// First call, with header "custom-key": "custom-value".
startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of("custom-key", "custom-value"), CallOptions.DEFAULT);
long hash1 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY);
// Second call, with header "custom-key": "custom-val", "another-key": "another-value".
startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of("custom-key", "custom-val", "another-key", "another-value"),
CallOptions.DEFAULT);
long hash2 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY);
// Third call, with header "custom-key": "value".
startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of("custom-key", "value"), CallOptions.DEFAULT);
long hash3 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY);
assertThat(hash2).isEqualTo(hash1);
assertThat(hash3).isNotEqualTo(hash1);
}
@Test
public void resolved_rpcHashingByChannelId() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
AUTHORITY,
Collections.singletonList(
Route.create(
RouteMatch.withPathExactOnly(
"/" + TestMethodDescriptors.voidMethod().getFullMethodName()),
RouteAction.forCluster(cluster1, Collections.singletonList(
HashPolicy.forChannelId(false)), null), null)));
verify(mockListener).onResult(resolutionResultCaptor.capture());
InternalConfigSelector configSelector =
resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY);
// First call, with header "custom-key": "value1".
startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of("custom-key", "value1"),
CallOptions.DEFAULT);
long hash1 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY);
// Second call, with no custom header.
startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(),
CallOptions.DEFAULT);
long hash2 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY);
// A different resolver/Channel.
resolver.shutdown();
reset(mockListener);
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler,
xdsClientPoolFactory, mockRandom);
resolver.start(mockListener);
xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverLdsUpdate(
AUTHORITY,
Collections.singletonList(
Route.create(
RouteMatch.withPathExactOnly(
"/" + TestMethodDescriptors.voidMethod().getFullMethodName()),
RouteAction.forCluster(cluster1, Collections.singletonList(
HashPolicy.forChannelId(false)), null), null)));
verify(mockListener).onResult(resolutionResultCaptor.capture());
configSelector = resolutionResultCaptor.getValue().getAttributes().get(
InternalConfigSelector.KEY);
// Third call, with no custom header.
startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(),
CallOptions.DEFAULT);
long hash3 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY);
assertThat(hash2).isEqualTo(hash1);
assertThat(hash3).isNotEqualTo(hash1);
}
@SuppressWarnings("unchecked")
@Test
public void resolved_resourceUpdateAfterCallStarted() {
@ -747,29 +845,26 @@ public class XdsNameResolverTest {
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
// no header abort key provided in metadata, rpc should succeed
Metadata metadata = new Metadata();
ClientCall.Listener<Void> observer = startCall(configSelector, metadata);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// header abort http status key provided, rpc should fail
metadata = new Metadata();
metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404");
metadata.put(HEADER_ABORT_PERCENTAGE_KEY, "60");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404",
HEADER_ABORT_PERCENTAGE_KEY.name(), "60"), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404"));
// header abort grpc status key provided, rpc should fail
metadata = new Metadata();
metadata.put(HEADER_ABORT_GRPC_STATUS_KEY, String.valueOf(
Status.UNAUTHENTICATED.getCode().value()));
metadata.put(HEADER_ABORT_PERCENTAGE_KEY, "60");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_ABORT_GRPC_STATUS_KEY.name(),
String.valueOf(Status.UNAUTHENTICATED.getCode().value()),
HEADER_ABORT_PERCENTAGE_KEY.name(), "60"), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAUTHENTICATED);
// header abort, both http and grpc code keys provided, rpc should fail with http code
metadata = new Metadata();
metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404");
metadata.put(HEADER_ABORT_GRPC_STATUS_KEY, String.valueOf(
Status.UNAUTHENTICATED.getCode().value()));
metadata.put(HEADER_ABORT_PERCENTAGE_KEY, "60");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404",
HEADER_ABORT_GRPC_STATUS_KEY.name(),
String.valueOf(Status.UNAUTHENTICATED.getCode().value()),
HEADER_ABORT_PERCENTAGE_KEY.name(), "60"), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404"));
// header abort, no header rate, fix rate = 60 %
@ -784,9 +879,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
metadata = new Metadata();
metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404"), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404"));
// header abort, no header rate, fix rate = 0
@ -801,9 +895,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
metadata = new Metadata();
metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404"), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// fixed abort, fix rate = 60%
@ -820,7 +913,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated"));
// fixed abort, fix rate = 40%
@ -837,7 +931,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@ -860,14 +955,13 @@ public class XdsNameResolverTest {
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
// no header delay key provided in metadata, rpc should succeed immediately
Metadata metadata = new Metadata();
ClientCall.Listener<Void> observer = startCall(configSelector, metadata);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// header delay key provided, rpc should be delayed
metadata = new Metadata();
metadata.put(HEADER_DELAY_KEY, "1000");
metadata.put(HEADER_DELAY_PERCENTAGE_KEY, "60");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_DELAY_KEY.name(), "1000", HEADER_DELAY_PERCENTAGE_KEY.name(), "60"),
CallOptions.DEFAULT);
verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000));
// header delay, no header rate, fix rate = 60 %
@ -882,9 +976,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
metadata = new Metadata();
metadata.put(HEADER_DELAY_KEY, "1000");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_DELAY_KEY.name(), "1000"), CallOptions.DEFAULT);
verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000));
// header delay, no header rate, fix rate = 0
@ -899,9 +992,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
metadata = new Metadata();
metadata.put(HEADER_DELAY_KEY, "1000");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(HEADER_DELAY_KEY.name(), "1000"), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// fixed delay, fix rate = 60%
@ -916,7 +1008,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcDelayed(observer, 5000L);
// fixed delay, fix rate = 40%
@ -931,7 +1024,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@ -955,7 +1049,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@ -979,9 +1074,9 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
Metadata metadata = new Metadata();
metadata.put(DOWNSTREAM_NODE_KEY, "node1.example.com");
ClientCall.Listener<Void> observer = startCall(configSelector, metadata);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node1.example.com"),
CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated"));
// downstream node mismatch
@ -998,9 +1093,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
metadata = new Metadata();
metadata.put(DOWNSTREAM_NODE_KEY, "node2.example.com");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node2.example.com"), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// downstream node absent in headers
@ -1017,7 +1111,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@ -1026,7 +1121,6 @@ public class XdsNameResolverTest {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50%
Metadata.Key<String> faultKey = Metadata.Key.of("fault_key", Metadata.ASCII_STRING_MARSHALLER);
// headers match
HttpFault httpFilterFaultConfig = HttpFault.create(
@ -1041,9 +1135,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
Metadata metadata = new Metadata();
metadata.put(faultKey, "fault_value");
ClientCall.Listener<Void> observer = startCall(configSelector, metadata);
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, ImmutableMap.of("fault_key", "fault_value"), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAUTHENTICATED);
// headers mismatch
@ -1060,9 +1153,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
metadata = new Metadata();
metadata.put(faultKey, "value_not_match");
observer = startCall(configSelector, metadata);
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
ImmutableMap.of("fault_key", "value_not_match"), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
// headers absent
@ -1079,7 +1171,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer);
}
@ -1103,13 +1196,16 @@ public class XdsNameResolverTest {
// Send two calls, then the first call should delayed and the second call should not be delayed
// because maxActiveFaults is exceeded.
ClientCall.Listener<Void> observer1 = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer1 = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
assertThat(testCall).isNull();
ClientCall.Listener<Void> observer2 = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer2 = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcSucceeded(observer2);
verifyRpcDelayed(observer1, 5000L);
// Once all calls are finished, new call should be delayed.
ClientCall.Listener<Void> observer3 = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer3 = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcDelayed(observer3, 5000L);
}
@ -1132,7 +1228,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcDelayedThenAborted(
observer, 5000L, Status.UNAUTHENTICATED.withDescription("unauthenticated"));
}
@ -1164,7 +1261,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.INTERNAL);
// Route fault config override
@ -1180,7 +1278,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNKNOWN);
// WeightedCluster fault config override
@ -1197,7 +1296,8 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
result = resolutionResultCaptor.getValue();
configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
observer = startCall(configSelector, new Metadata());
observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector,
Collections.<String, String>emptyMap(), CallOptions.DEFAULT);
verifyRpcFailed(observer, Status.UNAVAILABLE);
}
@ -1230,28 +1330,26 @@ public class XdsNameResolverTest {
verify(mockListener).onResult(resolutionResultCaptor.capture());
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
ClientCall.Listener<Void> observer = startCall(configSelector, new Metadata());
ClientCall.Listener<Void> observer = startNewCall(TestMethodDescriptors.voidMethod(),
configSelector, Collections.<String, String>emptyMap(), CallOptions.DEFAULT);;
verifyRpcFailed(observer, Status.UNKNOWN);
}
private ClientCall.Listener<Void> startCall(
InternalConfigSelector configSelector,
Metadata metadata) {
testCall = null;
MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod();
Result result = configSelector.selectConfig(
new PickSubchannelArgsImpl(method, metadata, CallOptions.DEFAULT));
assertThat(result.getStatus().isOk()).isTrue();
ClientInterceptor interceptor = result.getInterceptor();
ClientCall<Void, Void> clientCall = interceptor.interceptCall(
method, CallOptions.DEFAULT, channel);
private <ReqT, RespT> ClientCall.Listener<RespT> startNewCall(
MethodDescriptor<ReqT, RespT> method, InternalConfigSelector selector,
Map<String, String> headers, CallOptions callOptions) {
Metadata metadata = new Metadata();
for (String key : headers.keySet()) {
metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), headers.get(key));
}
@SuppressWarnings("unchecked")
ClientCall.Listener<Void> observer =
(ClientCall.Listener<Void>) mock(ClientCall.Listener.class);
clientCall.start(observer, metadata);
clientCall.sendMessage(null);
clientCall.halfClose();
return observer;
ClientCall.Listener<RespT> listener = mock(ClientCall.Listener.class);
Result result = selector.selectConfig(new PickSubchannelArgsImpl(
method, metadata, callOptions));
ClientCall<ReqT, RespT> call = ClientInterceptors.intercept(channel,
result.getInterceptor()).newCall(method, callOptions);
call.start(listener, metadata);
return listener;
}
private void verifyRpcSucceeded(ClientCall.Listener<Void> observer) {
@ -1287,7 +1385,7 @@ public class XdsNameResolverTest {
@Test
public void routeMatching_pathOnly() {
Map<String, Iterable<String>> headers = Collections.emptyMap();
Map<String, String> headers = Collections.emptyMap();
ThreadSafeRandom random = mock(ThreadSafeRandom.class);
RouteMatch routeMatch1 =
@ -1320,12 +1418,12 @@ public class XdsNameResolverTest {
@Test
public void routeMatching_withHeaders() {
Map<String, Iterable<String>> headers = new HashMap<>();
headers.put("authority", Collections.singletonList("foo.googleapis.com"));
headers.put("grpc-encoding", Collections.singletonList("gzip"));
headers.put("user-agent", Collections.singletonList("gRPC-Java"));
headers.put("content-length", Collections.singletonList("1000"));
headers.put("custom-key", Arrays.asList("custom-value1", "custom-value2"));
Map<String, String> headers = new HashMap<>();
headers.put("authority", "foo.googleapis.com");
headers.put("grpc-encoding", "gzip");
headers.put("user-agent", "gRPC-Java");
headers.put("content-length", "1000");
headers.put("custom-key", "custom-value1,custom-value2");
ThreadSafeRandom random = mock(ThreadSafeRandom.class);
PathMatcher pathMatcher = PathMatcher.fromPath("/FooService/barMethod", true);