From 4b52639aa1ae9b25f07c1a269233d1aaf77828f3 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 5 Mar 2021 10:51:42 -0800 Subject: [PATCH] xds: implement per-RPC hash generation (#7922) Generates a hash value for each RPC based on the HashPolicies configured for the Route that the RPC is routed to. --- .../java/io/grpc/xds/ThreadSafeRandom.java | 7 + .../java/io/grpc/xds/XdsNameResolver.java | 104 ++++--- .../io/grpc/xds/WeightedRandomPickerTest.java | 5 + .../java/io/grpc/xds/XdsNameResolverTest.java | 276 ++++++++++++------ 4 files changed, 268 insertions(+), 124 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java b/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java index 6a01f0a3d7..1e844cede3 100644 --- a/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java +++ b/xds/src/main/java/io/grpc/xds/ThreadSafeRandom.java @@ -23,6 +23,8 @@ import javax.annotation.concurrent.ThreadSafe; interface ThreadSafeRandom { int nextInt(int bound); + long nextLong(); + final class ThreadSafeRandomImpl implements ThreadSafeRandom { static final ThreadSafeRandom instance = new ThreadSafeRandomImpl(); @@ -33,5 +35,10 @@ interface ThreadSafeRandom { public int nextInt(int bound) { return ThreadLocalRandom.current().nextInt(bound); } + + @Override + public long nextLong() { + return ThreadLocalRandom.current().nextLong(); + } } } diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index a106fc6b91..037f0e3aa0 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -57,6 +57,7 @@ import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl; import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route.RouteAction; import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight; +import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.XdsClient.LdsResourceWatcher; import io.grpc.xds.XdsClient.LdsUpdate; @@ -95,6 +96,8 @@ final class XdsNameResolver extends NameResolver { static final CallOptions.Key CLUSTER_SELECTION_KEY = CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY"); + static final CallOptions.Key RPC_HASH_KEY = + CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY"); @VisibleForTesting static boolean enableTimeout = Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT")); @@ -119,6 +122,7 @@ final class XdsNameResolver extends NameResolver { @VisibleForTesting static AtomicLong activeFaultInjectedStreamCounter = new AtomicLong(); + private final InternalLogId logId; private final XdsLogger logger; private final String authority; private final ServiceConfigParser serviceConfigParser; @@ -126,6 +130,7 @@ final class XdsNameResolver extends NameResolver { private final ScheduledExecutorService scheduler; private final XdsClientPoolFactory xdsClientPoolFactory; private final ThreadSafeRandom random; + private final XxHash64 hashFunc = XxHash64.INSTANCE; private final ConcurrentMap clusterRefs = new ConcurrentHashMap<>(); private final ConfigSelector configSelector = new ConfigSelector(); @@ -152,7 +157,8 @@ final class XdsNameResolver extends NameResolver { this.scheduler = checkNotNull(scheduler, "scheduler"); this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory"); this.random = checkNotNull(random, "random"); - logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name)); + logId = InternalLogId.allocate("xds-resolver", name); + logger = XdsLogger.withLogId(logId); logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name); } @@ -347,26 +353,33 @@ final class XdsNameResolver extends NameResolver { private final class ConfigSelector extends InternalConfigSelector { @Override public Result selectConfig(PickSubchannelArgs args) { - // Index ASCII headers by keys. - Map> asciiHeaders = new HashMap<>(); + // Index ASCII headers by key, multi-value headers are concatenated for matching purposes. + Map asciiHeaders = new HashMap<>(); Metadata headers = args.getHeaders(); for (String headerName : headers.keys()) { if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { continue; } Metadata.Key key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER); - asciiHeaders.put(headerName, headers.getAll(key)); + Iterable values = headers.getAll(key); + if (values != null) { + asciiHeaders.put(headerName, Joiner.on(",").join(values)); + } } + // Special hack for exposing headers: "content-type". + asciiHeaders.put("content-type", "application/grpc"); String cluster = null; Route selectedRoute = null; HttpFault selectedFaultConfig; + RoutingConfig routingCfg; do { - selectedFaultConfig = routingConfig.faultConfig; - for (Route route : routingConfig.routes) { + routingCfg = routingConfig; + selectedFaultConfig = routingCfg.faultConfig; + for (Route route : routingCfg.routes) { if (matchRoute(route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(), asciiHeaders, random)) { selectedRoute = route; - if (routingConfig.applyFaultInjection && route.httpFault() != null) { + if (routingCfg.applyFaultInjection && route.httpFault() != null) { selectedFaultConfig = route.httpFault(); } break; @@ -390,7 +403,7 @@ final class XdsNameResolver extends NameResolver { accumulator += weightedCluster.weight(); if (select < accumulator) { cluster = weightedCluster.name(); - if (routingConfig.applyFaultInjection && weightedCluster.httpFault() != null) { + if (routingCfg.applyFaultInjection && weightedCluster.httpFault() != null) { selectedFaultConfig = weightedCluster.httpFault(); } break; @@ -403,7 +416,7 @@ final class XdsNameResolver extends NameResolver { if (enableTimeout) { Long timeoutNano = selectedRoute.routeAction().timeoutNano(); if (timeoutNano == null) { - timeoutNano = routingConfig.fallbackTimeoutNano; + timeoutNano = routingCfg.fallbackTimeoutNano; } if (timeoutNano > 0) { rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNano); @@ -417,7 +430,6 @@ final class XdsNameResolver extends NameResolver { parsedServiceConfig.getError().augmentDescription( "Failed to parse service config (method config)")); } - final String finalCluster = cluster; if (selectedFaultConfig != null && selectedFaultConfig.maxActiveFaults() != null && activeFaultInjectedStreamCounter.get() >= selectedFaultConfig.maxActiveFaults()) { selectedFaultConfig = null; @@ -447,15 +459,18 @@ final class XdsNameResolver extends NameResolver { abortStatus = determineFaultAbortStatus(selectedFaultConfig.faultAbort(), headers); } } + final String finalCluster = cluster; final Long finalDelayNanos = delayNanos; final Status finalAbortStatus = abortStatus; + final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), asciiHeaders); class ClusterSelectionInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( final MethodDescriptor method, CallOptions callOptions, final Channel next) { final CallOptions callOptionsForCluster = - callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster); + callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster) + .withOption(RPC_HASH_KEY, hash); Supplier> configApplyingCallSupplier = new Supplier>() { @Override @@ -553,6 +568,36 @@ final class XdsNameResolver extends NameResolver { } } + private long generateHash(List hashPolicies, Map headers) { + Long hash = null; + for (HashPolicy policy : hashPolicies) { + Long newHash = null; + if (policy.type() == HashPolicy.Type.HEADER) { + if (headers.containsKey(policy.headerName())) { + String value = headers.get(policy.headerName()); + if (policy.regEx() != null && policy.regExSubstitution() != null) { + value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution()); + } + newHash = hashFunc.hashAsciiString(value); + } + } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) { + newHash = hashFunc.hashLong(logId.getId()); + } + if (newHash != null ) { + // Rotating the old value prevents duplicate hash rules from cancelling each other out + // and preserves all of the entropy. + long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0; + hash = oldHash ^ newHash; + } + // If the policy is a terminal policy and a hash has been generated, ignore + // the rest of the hash policies. + if (policy.isTerminal() && hash != null) { + break; + } + } + return hash == null ? random.nextLong() : hash; + } + @Nullable private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) { Long delayNanos; @@ -748,7 +793,7 @@ final class XdsNameResolver extends NameResolver { @VisibleForTesting static boolean matchRoute(RouteMatch routeMatch, String fullMethodName, - Map> headers, ThreadSafeRandom random) { + Map headers, ThreadSafeRandom random) { if (!matchPath(routeMatch.pathMatcher(), fullMethodName)) { return false; } @@ -774,18 +819,9 @@ final class XdsNameResolver extends NameResolver { } private static boolean matchHeaders( - List headerMatchers, Map> headers) { + List headerMatchers, Map headers) { for (HeaderMatcher headerMatcher : headerMatchers) { - Iterable headerValues = headers.get(headerMatcher.name()); - // Special cases for hiding headers: "grpc-previous-rpc-attempts". - if (headerMatcher.name().equals("grpc-previous-rpc-attempts")) { - headerValues = null; - } - // Special case for exposing headers: "content-type". - if (headerMatcher.name().equals("content-type")) { - headerValues = Collections.singletonList("application/grpc"); - } - if (!matchHeader(headerMatcher, headerValues)) { + if (!matchHeader(headerMatcher, headers.get(headerMatcher.name()))) { return false; } } @@ -793,33 +829,31 @@ final class XdsNameResolver extends NameResolver { } @VisibleForTesting - static boolean matchHeader(HeaderMatcher headerMatcher, - @Nullable Iterable headerValues) { + static boolean matchHeader(HeaderMatcher headerMatcher, @Nullable String value) { if (headerMatcher.present() != null) { - return (headerValues == null) == headerMatcher.present().equals(headerMatcher.inverted()); + return (value == null) == headerMatcher.present().equals(headerMatcher.inverted()); } - if (headerValues == null) { + if (value == null) { return false; } - String valueStr = Joiner.on(",").join(headerValues); boolean baseMatch; if (headerMatcher.exactValue() != null) { - baseMatch = headerMatcher.exactValue().equals(valueStr); + baseMatch = headerMatcher.exactValue().equals(value); } else if (headerMatcher.safeRegEx() != null) { - baseMatch = headerMatcher.safeRegEx().matches(valueStr); + baseMatch = headerMatcher.safeRegEx().matches(value); } else if (headerMatcher.range() != null) { long numValue; try { - numValue = Long.parseLong(valueStr); + numValue = Long.parseLong(value); baseMatch = numValue >= headerMatcher.range().start() && numValue <= headerMatcher.range().end(); } catch (NumberFormatException ignored) { baseMatch = false; } } else if (headerMatcher.prefix() != null) { - baseMatch = valueStr.startsWith(headerMatcher.prefix()); + baseMatch = value.startsWith(headerMatcher.prefix()); } else { - baseMatch = valueStr.endsWith(headerMatcher.suffix()); + baseMatch = value.endsWith(headerMatcher.suffix()); } return baseMatch != headerMatcher.inverted(); } @@ -1033,7 +1067,7 @@ final class XdsNameResolver extends NameResolver { } /** - * Grouping of the list of usable routes and their corresponding fallback timeout value. + * VirtualHost-level configuration for request routing. */ private static class RoutingConfig { private final long fallbackTimeoutNano; @@ -1042,7 +1076,7 @@ final class XdsNameResolver extends NameResolver { @Nullable private final HttpFault faultConfig; - private static RoutingConfig empty = + private static final RoutingConfig empty = new RoutingConfig(0L, Collections.emptyList(), false, null); private RoutingConfig( diff --git a/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java b/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java index 8418e80bf2..ecdd96a734 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedRandomPickerTest.java @@ -97,6 +97,11 @@ public class WeightedRandomPickerTest { assertThat(nextInt).isLessThan(bound); return nextInt; } + + @Override + public long nextLong() { + throw new UnsupportedOperationException("Should not be called"); + } } private final FakeRandom fakeRandom = new FakeRandom(); diff --git a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java index 2ece3ba9c9..422f0f1c62 100644 --- a/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java @@ -35,12 +35,14 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.re2j.Pattern; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; import io.grpc.InternalConfigSelector; import io.grpc.InternalConfigSelector.Result; import io.grpc.Metadata; @@ -133,11 +135,13 @@ public class XdsNameResolverTest { ArgumentCaptor errorCaptor; private XdsNameResolver resolver; private TestCall testCall; - private boolean originalEnableTimeout = XdsNameResolver.enableTimeout; - private AtomicLong originalFaultCounter = XdsNameResolver.activeFaultInjectedStreamCounter; + private boolean originalEnableTimeout; + private AtomicLong originalFaultCounter; @Before public void setUp() { + originalEnableTimeout = XdsNameResolver.enableTimeout; + originalFaultCounter = XdsNameResolver.activeFaultInjectedStreamCounter; XdsNameResolver.enableTimeout = true; XdsNameResolver.activeFaultInjectedStreamCounter = new AtomicLong(); resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler, @@ -331,6 +335,100 @@ public class XdsNameResolverTest { verifyNoMoreInteractions(mockListener); } + @Test + public void resolved_rpcHashingByHeader() { + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverLdsUpdate( + AUTHORITY, + Collections.singletonList( + Route.create( + RouteMatch.withPathExactOnly( + "/" + TestMethodDescriptors.voidMethod().getFullMethodName()), + RouteAction.forCluster(cluster1, Collections.singletonList(HashPolicy.forHeader( + false, "custom-key", Pattern.compile("value"), "val")), + null), null))); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + InternalConfigSelector configSelector = + resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY); + + // First call, with header "custom-key": "custom-value". + startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of("custom-key", "custom-value"), CallOptions.DEFAULT); + long hash1 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY); + + // Second call, with header "custom-key": "custom-val", "another-key": "another-value". + startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of("custom-key", "custom-val", "another-key", "another-value"), + CallOptions.DEFAULT); + long hash2 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY); + + // Third call, with header "custom-key": "value". + startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of("custom-key", "value"), CallOptions.DEFAULT); + long hash3 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY); + + assertThat(hash2).isEqualTo(hash1); + assertThat(hash3).isNotEqualTo(hash1); + } + + @Test + public void resolved_rpcHashingByChannelId() { + resolver.start(mockListener); + FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverLdsUpdate( + AUTHORITY, + Collections.singletonList( + Route.create( + RouteMatch.withPathExactOnly( + "/" + TestMethodDescriptors.voidMethod().getFullMethodName()), + RouteAction.forCluster(cluster1, Collections.singletonList( + HashPolicy.forChannelId(false)), null), null))); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + InternalConfigSelector configSelector = + resolutionResultCaptor.getValue().getAttributes().get(InternalConfigSelector.KEY); + + // First call, with header "custom-key": "value1". + startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of("custom-key", "value1"), + CallOptions.DEFAULT); + long hash1 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY); + + // Second call, with no custom header. + startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), + CallOptions.DEFAULT); + long hash2 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY); + + // A different resolver/Channel. + resolver.shutdown(); + reset(mockListener); + resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, scheduler, + xdsClientPoolFactory, mockRandom); + resolver.start(mockListener); + xdsClient = (FakeXdsClient) resolver.getXdsClient(); + xdsClient.deliverLdsUpdate( + AUTHORITY, + Collections.singletonList( + Route.create( + RouteMatch.withPathExactOnly( + "/" + TestMethodDescriptors.voidMethod().getFullMethodName()), + RouteAction.forCluster(cluster1, Collections.singletonList( + HashPolicy.forChannelId(false)), null), null))); + verify(mockListener).onResult(resolutionResultCaptor.capture()); + configSelector = resolutionResultCaptor.getValue().getAttributes().get( + InternalConfigSelector.KEY); + + // Third call, with no custom header. + startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), + CallOptions.DEFAULT); + long hash3 = testCall.callOptions.getOption(XdsNameResolver.RPC_HASH_KEY); + + assertThat(hash2).isEqualTo(hash1); + assertThat(hash3).isNotEqualTo(hash1); + } + @SuppressWarnings("unchecked") @Test public void resolved_resourceUpdateAfterCallStarted() { @@ -747,29 +845,26 @@ public class XdsNameResolverTest { ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); // no header abort key provided in metadata, rpc should succeed - Metadata metadata = new Metadata(); - ClientCall.Listener observer = startCall(configSelector, metadata); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); // header abort http status key provided, rpc should fail - metadata = new Metadata(); - metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404"); - metadata.put(HEADER_ABORT_PERCENTAGE_KEY, "60"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404", + HEADER_ABORT_PERCENTAGE_KEY.name(), "60"), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404")); // header abort grpc status key provided, rpc should fail - metadata = new Metadata(); - metadata.put(HEADER_ABORT_GRPC_STATUS_KEY, String.valueOf( - Status.UNAUTHENTICATED.getCode().value())); - metadata.put(HEADER_ABORT_PERCENTAGE_KEY, "60"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_ABORT_GRPC_STATUS_KEY.name(), + String.valueOf(Status.UNAUTHENTICATED.getCode().value()), + HEADER_ABORT_PERCENTAGE_KEY.name(), "60"), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNAUTHENTICATED); // header abort, both http and grpc code keys provided, rpc should fail with http code - metadata = new Metadata(); - metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404"); - metadata.put(HEADER_ABORT_GRPC_STATUS_KEY, String.valueOf( - Status.UNAUTHENTICATED.getCode().value())); - metadata.put(HEADER_ABORT_PERCENTAGE_KEY, "60"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404", + HEADER_ABORT_GRPC_STATUS_KEY.name(), + String.valueOf(Status.UNAUTHENTICATED.getCode().value()), + HEADER_ABORT_PERCENTAGE_KEY.name(), "60"), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404")); // header abort, no header rate, fix rate = 60 % @@ -784,9 +879,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - metadata = new Metadata(); - metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404"), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNIMPLEMENTED.withDescription("HTTP status code 404")); // header abort, no header rate, fix rate = 0 @@ -801,9 +895,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - metadata = new Metadata(); - metadata.put(HEADER_ABORT_HTTP_STATUS_KEY, "404"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_ABORT_HTTP_STATUS_KEY.name(), "404"), CallOptions.DEFAULT); verifyRpcSucceeded(observer); // fixed abort, fix rate = 60% @@ -820,7 +913,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated")); // fixed abort, fix rate = 40% @@ -837,7 +931,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); } @@ -860,14 +955,13 @@ public class XdsNameResolverTest { ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); // no header delay key provided in metadata, rpc should succeed immediately - Metadata metadata = new Metadata(); - ClientCall.Listener observer = startCall(configSelector, metadata); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); // header delay key provided, rpc should be delayed - metadata = new Metadata(); - metadata.put(HEADER_DELAY_KEY, "1000"); - metadata.put(HEADER_DELAY_PERCENTAGE_KEY, "60"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_DELAY_KEY.name(), "1000", HEADER_DELAY_PERCENTAGE_KEY.name(), "60"), + CallOptions.DEFAULT); verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000)); // header delay, no header rate, fix rate = 60 % @@ -882,9 +976,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - metadata = new Metadata(); - metadata.put(HEADER_DELAY_KEY, "1000"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_DELAY_KEY.name(), "1000"), CallOptions.DEFAULT); verifyRpcDelayed(observer, TimeUnit.MILLISECONDS.toNanos(1000)); // header delay, no header rate, fix rate = 0 @@ -899,9 +992,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - metadata = new Metadata(); - metadata.put(HEADER_DELAY_KEY, "1000"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(HEADER_DELAY_KEY.name(), "1000"), CallOptions.DEFAULT); verifyRpcSucceeded(observer); // fixed delay, fix rate = 60% @@ -916,7 +1008,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcDelayed(observer, 5000L); // fixed delay, fix rate = 40% @@ -931,7 +1024,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); } @@ -955,7 +1049,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startCall(configSelector, new Metadata()); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); } @@ -979,9 +1074,9 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - Metadata metadata = new Metadata(); - metadata.put(DOWNSTREAM_NODE_KEY, "node1.example.com"); - ClientCall.Listener observer = startCall(configSelector, metadata); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node1.example.com"), + CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNAUTHENTICATED.withDescription("unauthenticated")); // downstream node mismatch @@ -998,9 +1093,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - metadata = new Metadata(); - metadata.put(DOWNSTREAM_NODE_KEY, "node2.example.com"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of(DOWNSTREAM_NODE_KEY.name(), "node2.example.com"), CallOptions.DEFAULT); verifyRpcSucceeded(observer); // downstream node absent in headers @@ -1017,7 +1111,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); } @@ -1026,7 +1121,6 @@ public class XdsNameResolverTest { resolver.start(mockListener); FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient(); when(mockRandom.nextInt(1000_000)).thenReturn(500_000); // 50% - Metadata.Key faultKey = Metadata.Key.of("fault_key", Metadata.ASCII_STRING_MARSHALLER); // headers match HttpFault httpFilterFaultConfig = HttpFault.create( @@ -1041,9 +1135,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - Metadata metadata = new Metadata(); - metadata.put(faultKey, "fault_value"); - ClientCall.Listener observer = startCall(configSelector, metadata); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, ImmutableMap.of("fault_key", "fault_value"), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNAUTHENTICATED); // headers mismatch @@ -1060,9 +1153,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - metadata = new Metadata(); - metadata.put(faultKey, "value_not_match"); - observer = startCall(configSelector, metadata); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + ImmutableMap.of("fault_key", "value_not_match"), CallOptions.DEFAULT); verifyRpcSucceeded(observer); // headers absent @@ -1079,7 +1171,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer); } @@ -1103,13 +1196,16 @@ public class XdsNameResolverTest { // Send two calls, then the first call should delayed and the second call should not be delayed // because maxActiveFaults is exceeded. - ClientCall.Listener observer1 = startCall(configSelector, new Metadata()); + ClientCall.Listener observer1 = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); assertThat(testCall).isNull(); - ClientCall.Listener observer2 = startCall(configSelector, new Metadata()); + ClientCall.Listener observer2 = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcSucceeded(observer2); verifyRpcDelayed(observer1, 5000L); // Once all calls are finished, new call should be delayed. - ClientCall.Listener observer3 = startCall(configSelector, new Metadata()); + ClientCall.Listener observer3 = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcDelayed(observer3, 5000L); } @@ -1132,7 +1228,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startCall(configSelector, new Metadata()); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcDelayedThenAborted( observer, 5000L, Status.UNAUTHENTICATED.withDescription("unauthenticated")); } @@ -1164,7 +1261,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startCall(configSelector, new Metadata()); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.INTERNAL); // Route fault config override @@ -1180,7 +1278,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNKNOWN); // WeightedCluster fault config override @@ -1197,7 +1296,8 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); result = resolutionResultCaptor.getValue(); configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - observer = startCall(configSelector, new Metadata()); + observer = startNewCall(TestMethodDescriptors.voidMethod(), configSelector, + Collections.emptyMap(), CallOptions.DEFAULT); verifyRpcFailed(observer, Status.UNAVAILABLE); } @@ -1230,28 +1330,26 @@ public class XdsNameResolverTest { verify(mockListener).onResult(resolutionResultCaptor.capture()); ResolutionResult result = resolutionResultCaptor.getValue(); InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY); - ClientCall.Listener observer = startCall(configSelector, new Metadata()); + ClientCall.Listener observer = startNewCall(TestMethodDescriptors.voidMethod(), + configSelector, Collections.emptyMap(), CallOptions.DEFAULT);; verifyRpcFailed(observer, Status.UNKNOWN); } - private ClientCall.Listener startCall( - InternalConfigSelector configSelector, - Metadata metadata) { - testCall = null; - MethodDescriptor method = TestMethodDescriptors.voidMethod(); - Result result = configSelector.selectConfig( - new PickSubchannelArgsImpl(method, metadata, CallOptions.DEFAULT)); - assertThat(result.getStatus().isOk()).isTrue(); - ClientInterceptor interceptor = result.getInterceptor(); - ClientCall clientCall = interceptor.interceptCall( - method, CallOptions.DEFAULT, channel); + private ClientCall.Listener startNewCall( + MethodDescriptor method, InternalConfigSelector selector, + Map headers, CallOptions callOptions) { + Metadata metadata = new Metadata(); + for (String key : headers.keySet()) { + metadata.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), headers.get(key)); + } @SuppressWarnings("unchecked") - ClientCall.Listener observer = - (ClientCall.Listener) mock(ClientCall.Listener.class); - clientCall.start(observer, metadata); - clientCall.sendMessage(null); - clientCall.halfClose(); - return observer; + ClientCall.Listener listener = mock(ClientCall.Listener.class); + Result result = selector.selectConfig(new PickSubchannelArgsImpl( + method, metadata, callOptions)); + ClientCall call = ClientInterceptors.intercept(channel, + result.getInterceptor()).newCall(method, callOptions); + call.start(listener, metadata); + return listener; } private void verifyRpcSucceeded(ClientCall.Listener observer) { @@ -1287,7 +1385,7 @@ public class XdsNameResolverTest { @Test public void routeMatching_pathOnly() { - Map> headers = Collections.emptyMap(); + Map headers = Collections.emptyMap(); ThreadSafeRandom random = mock(ThreadSafeRandom.class); RouteMatch routeMatch1 = @@ -1320,12 +1418,12 @@ public class XdsNameResolverTest { @Test public void routeMatching_withHeaders() { - Map> headers = new HashMap<>(); - headers.put("authority", Collections.singletonList("foo.googleapis.com")); - headers.put("grpc-encoding", Collections.singletonList("gzip")); - headers.put("user-agent", Collections.singletonList("gRPC-Java")); - headers.put("content-length", Collections.singletonList("1000")); - headers.put("custom-key", Arrays.asList("custom-value1", "custom-value2")); + Map headers = new HashMap<>(); + headers.put("authority", "foo.googleapis.com"); + headers.put("grpc-encoding", "gzip"); + headers.put("user-agent", "gRPC-Java"); + headers.put("content-length", "1000"); + headers.put("custom-key", "custom-value1,custom-value2"); ThreadSafeRandom random = mock(ThreadSafeRandom.class); PathMatcher pathMatcher = PathMatcher.fromPath("/FooService/barMethod", true);