xds: implement xDS timeout (#7481)

The xDS timeout takes the per-route timeout value from RouteAction.max_stream_duration.grpc_timeout_header_max, or from RouteAction.max_stream_duration.max_stream_duration if the former is not set. If neither is set, it falls back to the max_stream_duration setting in HttpConnectionManager.common_http_options, retrieved from the Route's upstream Listener resource. The final timeout value applied to the call is the minimum of the xDS timeout value and the per-call timeout set by the application.
This commit is contained in:
Chengyuan Zhang 2020-10-14 17:53:30 -07:00 committed by GitHub
parent ef90da036d
commit d25f5acf1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 162 additions and 103 deletions

View File

@ -42,7 +42,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
@ -1118,7 +1117,8 @@ final class EnvoyProtoData {
* See corresponding Envoy proto message {@link io.envoyproxy.envoy.config.route.v3.RouteAction}.
*/
static final class RouteAction {
private final long timeoutNano;
@Nullable
private final Long timeoutNano;
// Exactly one of the following fields is non-null.
@Nullable
private final String cluster;
@ -1126,16 +1126,14 @@ final class EnvoyProtoData {
private final List<ClusterWeight> weightedClusters;
@VisibleForTesting
RouteAction(
long timeoutNano,
@Nullable String cluster,
RouteAction(@Nullable Long timeoutNano, @Nullable String cluster,
@Nullable List<ClusterWeight> weightedClusters) {
this.timeoutNano = timeoutNano;
this.cluster = cluster;
this.weightedClusters = weightedClusters;
}
@Nullable
Long getTimeoutNano() {
return timeoutNano;
}
@ -1172,7 +1170,9 @@ final class EnvoyProtoData {
@Override
public String toString() {
ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
toStringHelper.add("timeout", timeoutNano + "ns");
if (timeoutNano != null) {
toStringHelper.add("timeout", timeoutNano + "ns");
}
if (cluster != null) {
toStringHelper.add("cluster", cluster);
}
@ -1212,14 +1212,15 @@ final class EnvoyProtoData {
return StructOrError.fromError(
"Unknown cluster specifier: " + proto.getClusterSpecifierCase());
}
long timeoutNano = TimeUnit.SECONDS.toNanos(15L); // default 15s
if (proto.hasMaxGrpcTimeout()) {
timeoutNano = Durations.toNanos(proto.getMaxGrpcTimeout());
} else if (proto.hasTimeout()) {
timeoutNano = Durations.toNanos(proto.getTimeout());
}
if (timeoutNano == 0) {
timeoutNano = Long.MAX_VALUE;
Long timeoutNano = null;
if (proto.hasMaxStreamDuration()) {
io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration maxStreamDuration
= proto.getMaxStreamDuration();
if (maxStreamDuration.hasGrpcTimeoutHeaderMax()) {
timeoutNano = Durations.toNanos(maxStreamDuration.getGrpcTimeoutHeaderMax());
} else if (maxStreamDuration.hasMaxStreamDuration()) {
timeoutNano = Durations.toNanos(maxStreamDuration.getMaxStreamDuration());
}
}
return StructOrError.fromStruct(new RouteAction(timeoutNano, cluster, weightedClusters));
}

View File

@ -86,7 +86,7 @@ final class XdsNameResolver extends NameResolver {
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
private final ConfigSelector configSelector = new ConfigSelector();
private volatile List<Route> currentRoutes = Collections.emptyList();
private volatile RoutingConfig routingConfig = RoutingConfig.empty;
private Listener2 listener;
private ObjectPool<XdsClient> xdsClientPool;
private XdsClient xdsClient;
@ -326,7 +326,7 @@ final class XdsNameResolver extends NameResolver {
String cluster = null;
Route selectedRoute = null;
do {
for (Route route : currentRoutes) {
for (Route route : routingConfig.routes) {
if (route.getRouteMatch().matches(
"/" + args.getMethodDescriptor().getFullMethodName(), asciiHeaders)) {
selectedRoute = route;
@ -359,8 +359,13 @@ final class XdsNameResolver extends NameResolver {
// TODO(chengyuanzhang): avoid service config generation and parsing for each call.
Map<String, ?> rawServiceConfig = Collections.emptyMap();
if (enableTimeout) {
rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(
selectedRoute.getRouteAction().getTimeoutNano());
Long timeoutNano = selectedRoute.getRouteAction().getTimeoutNano();
if (timeoutNano == null) {
timeoutNano = routingConfig.fallbackTimeoutNano;
}
if (timeoutNano > 0) {
rawServiceConfig = generateServiceConfigWithMethodTimeoutConfig(timeoutNano);
}
}
ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
Object config = parsedServiceConfig.getConfig();
@ -430,10 +435,12 @@ final class XdsNameResolver extends NameResolver {
private String rdsResource;
@Nullable
private RdsResourceWatcher rdsWatcher;
private long httpMaxStreamDurationNano;
@Override
public void onChanged(LdsUpdate update) {
logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
httpMaxStreamDurationNano = update.getHttpMaxStreamDurationNano();
List<VirtualHost> virtualHosts = update.getVirtualHosts();
String rdsName = update.getRdsName();
if (rdsName != null && rdsName.equals(rdsResource)) {
@ -479,6 +486,8 @@ final class XdsNameResolver extends NameResolver {
private void updateRoutes(List<VirtualHost> virtualHosts) {
VirtualHost virtualHost = findVirtualHostForHostName(virtualHosts, authority);
if (virtualHost == null) {
logger.log(XdsLogLevel.WARNING,
"Failed to find virtual host matching hostname {0}", authority);
listener.onResult(emptyResult);
return;
}
@ -515,7 +524,7 @@ final class XdsNameResolver extends NameResolver {
}
// Make newly added clusters selectable by config selector and deleted clusters no longer
// selectable.
currentRoutes = routes;
routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routes);
shouldUpdateResult = false;
for (String cluster : deletedClusters) {
int count = clusterRefs.get(cluster).decrementAndGet();
@ -560,4 +569,19 @@ final class XdsNameResolver extends NameResolver {
}
}
}
/**
 * Immutable grouping of the list of usable routes and their corresponding fallback timeout value.
 *
 * <p>An instance is a snapshot: both values are fixed at construction so that a reader holding a
 * reference always observes a route list together with the fallback timeout it was built with.
 * The route list itself is stored as given (no defensive copy is made).
 */
private static final class RoutingConfig {
  /** Shared instance with no routes and a fallback timeout of zero nanoseconds. */
  private static final RoutingConfig empty =
      new RoutingConfig(0L, Collections.<Route>emptyList());

  /** Timeout, in nanoseconds, kept alongside the routes as their fallback value. */
  private final long fallbackTimeoutNano;
  /** Routes that are currently usable. */
  private final List<Route> routes;

  /**
   * Creates a routing snapshot.
   *
   * @param fallbackTimeoutNano fallback timeout in nanoseconds for the given routes
   * @param routes the usable routes; stored by reference
   */
  private RoutingConfig(long fallbackTimeoutNano, List<Route> routes) {
    this.fallbackTimeoutNano = fallbackTimeoutNano;
    this.routes = routes;
  }
}
}

View File

@ -29,6 +29,7 @@ import com.google.re2j.Pattern;
import io.envoyproxy.envoy.config.core.v3.RuntimeFractionalPercent;
import io.envoyproxy.envoy.config.route.v3.QueryParameterMatcher;
import io.envoyproxy.envoy.config.route.v3.RedirectAction;
import io.envoyproxy.envoy.config.route.v3.RouteAction.MaxStreamDuration;
import io.envoyproxy.envoy.config.route.v3.WeightedCluster;
import io.envoyproxy.envoy.type.matcher.v3.RegexMatcher;
import io.envoyproxy.envoy.type.v3.FractionalPercent;
@ -212,7 +213,7 @@ public class EnvoyProtoDataTest {
new Route(
new RouteMatch(PathMatcher.fromPath("/service/method", false),
Collections.<HeaderMatcher>emptyList(), null),
new RouteAction(TimeUnit.SECONDS.toNanos(15L), "cluster-foo", null)));
new RouteAction(null, "cluster-foo", null)));
io.envoyproxy.envoy.config.route.v3.Route unsupportedProto =
io.envoyproxy.envoy.config.route.v3.Route.newBuilder()
@ -370,74 +371,84 @@ public class EnvoyProtoDataTest {
}
@Test
public void convertRouteAction() {
// cluster_specifier = cluster, default timeout
io.envoyproxy.envoy.config.route.v3.RouteAction proto1 =
public void convertRouteAction_cluster() {
io.envoyproxy.envoy.config.route.v3.RouteAction proto =
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setCluster("cluster-foo")
.build();
StructOrError<RouteAction> struct1 = RouteAction.fromEnvoyProtoRouteAction(proto1);
assertThat(struct1.getErrorDetail()).isNull();
assertThat(struct1.getStruct().getTimeoutNano())
.isEqualTo(TimeUnit.SECONDS.toNanos(15L)); // default value
assertThat(struct1.getStruct().getCluster()).isEqualTo("cluster-foo");
assertThat(struct1.getStruct().getWeightedCluster()).isNull();
StructOrError<RouteAction> struct = RouteAction.fromEnvoyProtoRouteAction(proto);
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct().getCluster()).isEqualTo("cluster-foo");
assertThat(struct.getStruct().getWeightedCluster()).isNull();
}
// cluster_specifier = cluster, infinity timeout
io.envoyproxy.envoy.config.route.v3.RouteAction proto2 =
@Test
public void convertRouteAction_weightedCluster() {
io.envoyproxy.envoy.config.route.v3.RouteAction proto =
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setMaxGrpcTimeout(Durations.fromNanos(0))
.setTimeout(Durations.fromMicros(20L))
.setCluster("cluster-foo")
.build();
StructOrError<RouteAction> struct2 = RouteAction.fromEnvoyProtoRouteAction(proto2);
assertThat(struct2.getStruct().getTimeoutNano())
.isEqualTo(Long.MAX_VALUE); // infinite
// cluster_specifier = cluster, infinity timeout
io.envoyproxy.envoy.config.route.v3.RouteAction proto3 =
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setTimeout(Durations.fromNanos(0))
.setCluster("cluster-foo")
.build();
StructOrError<RouteAction> struct3 = RouteAction.fromEnvoyProtoRouteAction(proto3);
assertThat(struct3.getStruct().getTimeoutNano()).isEqualTo(Long.MAX_VALUE); // infinite
// cluster_specifier = cluster_header
io.envoyproxy.envoy.config.route.v3.RouteAction proto4 =
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setClusterHeader("cluster-bar")
.build();
StructOrError<RouteAction> struct4 = RouteAction.fromEnvoyProtoRouteAction(proto4);
assertThat(struct4).isNull();
// cluster_specifier = weighted_cluster
io.envoyproxy.envoy.config.route.v3.RouteAction proto5 =
io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
.setMaxGrpcTimeout(Durations.fromSeconds(6L))
.setTimeout(Durations.fromMicros(20L))
.setWeightedClusters(
WeightedCluster.newBuilder()
.addClusters(
WeightedCluster.ClusterWeight
.newBuilder()
.setName("cluster-baz")
.setWeight(UInt32Value.newBuilder().setValue(100))))
.setName("cluster-foo")
.setWeight(UInt32Value.newBuilder().setValue(30)))
.addClusters(WeightedCluster.ClusterWeight
.newBuilder()
.setName("cluster-bar")
.setWeight(UInt32Value.newBuilder().setValue(70))))
.build();
StructOrError<RouteAction> struct5 = RouteAction.fromEnvoyProtoRouteAction(proto5);
assertThat(struct5.getErrorDetail()).isNull();
assertThat(struct5.getStruct().getTimeoutNano())
.isEqualTo(TimeUnit.SECONDS.toNanos(6L));
assertThat(struct5.getStruct().getCluster()).isNull();
assertThat(struct5.getStruct().getWeightedCluster())
.containsExactly(new ClusterWeight("cluster-baz", 100));
StructOrError<RouteAction> struct = RouteAction.fromEnvoyProtoRouteAction(proto);
assertThat(struct.getErrorDetail()).isNull();
assertThat(struct.getStruct().getCluster()).isNull();
assertThat(struct.getStruct().getWeightedCluster()).containsExactly(
new ClusterWeight("cluster-foo", 30), new ClusterWeight("cluster-bar", 70));
}
// cluster_specifier unset
io.envoyproxy.envoy.config.route.v3.RouteAction unsetProto =
@Test
public void convertRouteAction_unspecifiedClusterError() {
io.envoyproxy.envoy.config.route.v3.RouteAction proto =
io.envoyproxy.envoy.config.route.v3.RouteAction.getDefaultInstance();
StructOrError<RouteAction> unsetStruct = RouteAction.fromEnvoyProtoRouteAction(unsetProto);
assertThat(unsetStruct.getErrorDetail()).isNotNull();
StructOrError<RouteAction> unsetStruct = RouteAction.fromEnvoyProtoRouteAction(proto);
assertThat(unsetStruct.getStruct()).isNull();
assertThat(unsetStruct.getErrorDetail()).isNotNull();
}
@Test
public void convertRouteAction_timeoutByGrpcTimeoutHeaderMax() {
  // Both durations are populated; the converted timeout is expected to equal the
  // grpc_timeout_header_max value (5s), not the max_stream_duration value (20ms).
  MaxStreamDuration streamDuration =
      MaxStreamDuration.newBuilder()
          .setGrpcTimeoutHeaderMax(Durations.fromSeconds(5L))
          .setMaxStreamDuration(Durations.fromMillis(20L))
          .build();
  io.envoyproxy.envoy.config.route.v3.RouteAction envoyAction =
      io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
          .setCluster("cluster-foo")
          .setMaxStreamDuration(streamDuration)
          .build();

  StructOrError<RouteAction> converted = RouteAction.fromEnvoyProtoRouteAction(envoyAction);

  Long actualTimeoutNano = converted.getStruct().getTimeoutNano();
  assertThat(actualTimeoutNano).isEqualTo(TimeUnit.SECONDS.toNanos(5L));
}
@Test
public void convertRouteAction_timeoutByMaxStreamDuration() {
  // Only max_stream_duration is populated, so it is expected to supply the timeout (5s).
  MaxStreamDuration streamDuration =
      MaxStreamDuration.newBuilder()
          .setMaxStreamDuration(Durations.fromSeconds(5L))
          .build();
  io.envoyproxy.envoy.config.route.v3.RouteAction envoyAction =
      io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder()
          .setCluster("cluster-foo")
          .setMaxStreamDuration(streamDuration)
          .build();

  StructOrError<RouteAction> converted = RouteAction.fromEnvoyProtoRouteAction(envoyAction);

  Long actualTimeoutNano = converted.getStruct().getTimeoutNano();
  assertThat(actualTimeoutNano).isEqualTo(TimeUnit.SECONDS.toNanos(5L));
}
@Test
public void convertRouteAction_timeoutUnset() {
  // No max_stream_duration is set on the proto; the converted timeout is expected to be null.
  io.envoyproxy.envoy.config.route.v3.RouteAction.Builder actionBuilder =
      io.envoyproxy.envoy.config.route.v3.RouteAction.newBuilder();
  actionBuilder.setCluster("cluster-foo");
  io.envoyproxy.envoy.config.route.v3.RouteAction envoyAction = actionBuilder.build();

  StructOrError<RouteAction> converted = RouteAction.fromEnvoyProtoRouteAction(envoyAction);

  Long actualTimeoutNano = converted.getStruct().getTimeoutNano();
  assertThat(actualTimeoutNano).isNull();
}
@Test

View File

@ -264,7 +264,7 @@ public class XdsNameResolverTest {
public void resolving_matchingVirtualHostNotFoundInLdsResource() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverVirtualHostsViaLds(AUTHORITY, buildUnmatchedVirtualHosts());
xdsClient.deliverLdsUpdate(AUTHORITY, 0L, buildUnmatchedVirtualHosts());
assertEmptyResolutionResult();
}
@ -275,7 +275,7 @@ public class XdsNameResolverTest {
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRdsName(AUTHORITY, rdsResource);
assertThat(xdsClient.rdsWatcher).isNotNull();
xdsClient.deliverVirtualHostsViaRds(rdsResource, buildUnmatchedVirtualHosts());
xdsClient.deliverRdsUpdate(rdsResource, buildUnmatchedVirtualHosts());
assertEmptyResolutionResult();
}
@ -291,6 +291,43 @@ public class XdsNameResolverTest {
Collections.singletonList(route2)));
}
@SuppressWarnings("unchecked")
@Test
public void resolved_noTimeout() {
  resolver.start(mockListener);
  FakeXdsClient fakeClient = (FakeXdsClient) resolver.getXdsClient();

  // Route without a per-route timeout, delivered with an HTTP max stream duration of 0.
  RouteAction actionWithoutTimeout = new RouteAction(null, cluster1, null);
  Route onlyRoute =
      new Route(
          RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), actionWithoutTimeout);
  VirtualHost host =
      new VirtualHost(
          "does not matter",
          Collections.singletonList(AUTHORITY),
          Collections.singletonList(onlyRoute));
  fakeClient.deliverLdsUpdate(AUTHORITY, 0L, Collections.singletonList(host));

  verify(mockListener).onResult(resolutionResultCaptor.capture());
  ResolutionResult resolution = resolutionResultCaptor.getValue();
  InternalConfigSelector selector =
      resolution.getAttributes().get(InternalConfigSelector.KEY);
  PickSubchannelArgsImpl pickArgs =
      new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT);
  Result selection = selector.selectConfig(pickArgs);

  // The call is routed to the cluster, and the per-call config is expected to be empty.
  assertThat(selection.getStatus().isOk()).isTrue();
  assertThat(selection.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY))
      .isEqualTo(cluster1);
  Map<String, ?> selectedConfig = (Map<String, ?>) selection.getConfig();
  assertThat(selectedConfig).isEmpty();
}
@Test
public void resolved_fallbackToHttpMaxStreamDurationAsTimeout() {
  resolver.start(mockListener);
  FakeXdsClient fakeClient = (FakeXdsClient) resolver.getXdsClient();

  // Route without a per-route timeout, delivered with an HTTP max stream duration of 5s.
  RouteAction actionWithoutTimeout = new RouteAction(null, cluster1, null);
  Route onlyRoute =
      new Route(
          RouteMatch.withPathExactOnly(call1.getFullMethodNameForPath()), actionWithoutTimeout);
  VirtualHost host =
      new VirtualHost(
          "does not matter",
          Collections.singletonList(AUTHORITY),
          Collections.singletonList(onlyRoute));
  long httpMaxStreamDurationNano = TimeUnit.SECONDS.toNanos(5L);
  fakeClient.deliverLdsUpdate(
      AUTHORITY, httpMaxStreamDurationNano, Collections.singletonList(host));

  verify(mockListener).onResult(resolutionResultCaptor.capture());
  ResolutionResult resolution = resolutionResultCaptor.getValue();
  InternalConfigSelector selector =
      resolution.getAttributes().get(InternalConfigSelector.KEY);

  // The selected call is expected to carry the 5-second fallback timeout.
  assertCallSelectResult(call1, selector, cluster1, 5.0);
}
@Test
public void resolved_simpleCallSucceeds() {
InternalConfigSelector configSelector = resolveToClusters();
@ -320,7 +357,7 @@ public class XdsNameResolverTest {
reset(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -355,7 +392,7 @@ public class XdsNameResolverTest {
InternalConfigSelector configSelector = resolveToClusters();
reset(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -386,7 +423,7 @@ public class XdsNameResolverTest {
reset(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -402,7 +439,7 @@ public class XdsNameResolverTest {
Arrays.asList(cluster1, cluster2, "another-cluster"),
(Map<String, ?>) result.getServiceConfig().getConfig());
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -420,13 +457,13 @@ public class XdsNameResolverTest {
InternalConfigSelector configSelector = resolveToClusters();
Result result = assertCallSelectResult(call1, configSelector, cluster1, 15.0);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Collections.singletonList(
new Route(
RouteMatch.withPathExactOnly(call2.getFullMethodNameForPath()),
new RouteAction(TimeUnit.SECONDS.toNanos(15L), cluster2, null))));
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -445,7 +482,7 @@ public class XdsNameResolverTest {
when(mockRandom.nextInt(anyInt())).thenReturn(90, 10);
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -501,7 +538,7 @@ public class XdsNameResolverTest {
private InternalConfigSelector resolveToClusters() {
resolver.start(mockListener);
FakeXdsClient xdsClient = (FakeXdsClient) resolver.getXdsClient();
xdsClient.deliverRoutesViaLds(
xdsClient.deliverLdsUpdate(
AUTHORITY,
Arrays.asList(
new Route(
@ -761,7 +798,7 @@ public class XdsNameResolverTest {
// no-op
}
void deliverVirtualHostsViaLds(final String resourceName,
void deliverLdsUpdate(final String resourceName, final long httpMaxStreamDurationNano,
final List<VirtualHost> virtualHosts) {
syncContext.execute(new Runnable() {
@Override
@ -770,6 +807,7 @@ public class XdsNameResolverTest {
return;
}
LdsUpdate.Builder updateBuilder = LdsUpdate.newBuilder();
updateBuilder.setHttpMaxStreamDurationNano(httpMaxStreamDurationNano);
for (VirtualHost virtualHost : virtualHosts) {
updateBuilder.addVirtualHost(virtualHost);
}
@ -778,7 +816,7 @@ public class XdsNameResolverTest {
});
}
void deliverRoutesViaLds(final String resourceName, final List<Route> routes) {
void deliverLdsUpdate(final String resourceName, final List<Route> routes) {
syncContext.execute(new Runnable() {
@Override
public void run() {
@ -816,8 +854,7 @@ public class XdsNameResolverTest {
});
}
void deliverVirtualHostsViaRds(final String resourceName,
final List<VirtualHost> virtualHosts) {
void deliverRdsUpdate(final String resourceName, final List<VirtualHost> virtualHosts) {
syncContext.execute(new Runnable() {
@Override
public void run() {
@ -829,20 +866,6 @@ public class XdsNameResolverTest {
});
}
void deliverRoutesViaRds(final String resourceName, final List<Route> routes) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!resourceName.equals(rdsResource)) {
return;
}
VirtualHost virtualHost =
new VirtualHost("virtual-host", Collections.singletonList(AUTHORITY), routes);
rdsWatcher.onChanged(RdsUpdate.fromVirtualHosts(Collections.singletonList(virtualHost)));
}
});
}
void deliverRdsResourceNotFound(final String resourceName) {
syncContext.execute(new Runnable() {
@Override