From 00fee4d141163edb358cdd83315c6e91027a6918 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Thu, 20 Aug 2020 10:45:41 -0700 Subject: [PATCH] xds: have LoadReportClient support LRS v3 As noted in the design doc "The LRS protocol has a transport version, just like the xDS protocol itself does. Initially, we will use the server feature in the bootstrap file to determine the version of the LRS transport protocol. This means that there will not be any way to use a different transport protocol for LRS than for xDS." --- .../java/io/grpc/xds/LoadReportClient.java | 73 ++++++++++++++++++- .../java/io/grpc/xds/XdsClientImplTest.java | 8 +- 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index d5126c6891..c9cff2db6c 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -24,6 +24,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; import io.grpc.InternalLogId; import io.grpc.Status; import io.grpc.SynchronizationContext; @@ -158,9 +162,11 @@ final class LoadReportClient { private void startLrsRpc() { checkState(lrsStream == null, "previous lbStream has not been cleared yet"); - // TODO(zdapeng): implement LrsStreamV3 and instantiate lrsStream based on value of - // xdsChannel.useProtocolV3 - lrsStream = new LrsStreamV2(); + if (xdsChannel.isUseProtocolV3()) { + lrsStream = new LrsStreamV3(); + } else { + lrsStream = new LrsStreamV2(); + } retryStopwatch.reset().start(); lrsStream.start(); } @@ -355,6 +361,49 @@ final class LoadReportClient { } } + private final class LrsStreamV3 extends LrsStream { + StreamObserver lrsRequestWriterV3; + + @Override + void start() { + StreamObserver lrsResponseReaderV3 = + new StreamObserver() { + @Override + public void onNext(LoadStatsResponse response) { + logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); + handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response)); + } + + @Override + public void onError(Throwable t) { + handleRpcError(t); + } + + @Override + public void onCompleted() { + handleRpcComplete(); + } + }; + LoadReportingServiceStub stubV3 = + LoadReportingServiceGrpc.newStub(xdsChannel.getManagedChannel()); + lrsRequestWriterV3 = stubV3.withWaitForReady().streamLoadStats(lrsResponseReaderV3); + logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); + sendLoadStatsRequest(new LoadStatsRequestData(node, null)); + } + + @Override + void sendLoadStatsRequest(LoadStatsRequestData request) { + LoadStatsRequest requestProto = request.toEnvoyProtoV3(); + lrsRequestWriterV3.onNext(requestProto); + logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", requestProto); + } + + @Override + void sendError(Exception error) { + lrsRequestWriterV3.onError(error); + } + } + private static final class LoadStatsRequestData { final Node node; @Nullable @@ -376,6 +425,17 @@ final class LoadReportClient { } return builder.build(); } + + LoadStatsRequest toEnvoyProtoV3() { + LoadStatsRequest.Builder builder = LoadStatsRequest.newBuilder(); + builder.setNode(node.toEnvoyProtoNode()); + if (clusterStatsList != null) { + for (ClusterStats stats : clusterStatsList) { + builder.addClusterStats(stats.toEnvoyProtoClusterStats()); + } + } + return builder.build(); + } } private static final class LoadStatsResponseData { @@ -409,5 +469,12 @@ final class LoadReportClient { loadStatsResponse.getClustersList(), Durations.toNanos(loadStatsResponse.getLoadReportingInterval())); } + + static LoadStatsResponseData fromEnvoyProtoV3(LoadStatsResponse loadStatsResponse) { + return new LoadStatsResponseData( + loadStatsResponse.getSendAllClusters(), + loadStatsResponse.getClustersList(), + Durations.toNanos(loadStatsResponse.getLoadReportingInterval())); + } } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index d2cf73ab71..149dc19cdd 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -55,9 +55,9 @@ import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext; import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; import io.envoyproxy.envoy.api.v2.core.ConfigSource; import io.envoyproxy.envoy.api.v2.core.HealthStatus; -import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; import io.envoyproxy.envoy.api.v2.route.RedirectAction; import io.envoyproxy.envoy.api.v2.route.WeightedCluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; import io.envoyproxy.envoy.config.route.v3.QueryParameterMatcher; @@ -70,9 +70,9 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; -import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.ManagedChannel;