diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index d5126c6891..c9cff2db6c 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -24,6 +24,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; import io.grpc.InternalLogId; import io.grpc.Status; import io.grpc.SynchronizationContext; @@ -158,9 +162,11 @@ final class LoadReportClient { private void startLrsRpc() { checkState(lrsStream == null, "previous lbStream has not been cleared yet"); - // TODO(zdapeng): implement LrsStreamV3 and instantiate lrsStream based on value of - // xdsChannel.useProtocolV3 - lrsStream = new LrsStreamV2(); + if (xdsChannel.isUseProtocolV3()) { + lrsStream = new LrsStreamV3(); + } else { + lrsStream = new LrsStreamV2(); + } retryStopwatch.reset().start(); lrsStream.start(); } @@ -355,6 +361,49 @@ final class LoadReportClient { } } + private final class LrsStreamV3 extends LrsStream { + StreamObserver lrsRequestWriterV3; + + @Override + void start() { + StreamObserver lrsResponseReaderV3 = + new StreamObserver() { + @Override + public void onNext(LoadStatsResponse response) { + logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); + handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response)); + } + + @Override + public void onError(Throwable t) { + handleRpcError(t); + } + + @Override + public void onCompleted() { + handleRpcComplete(); + } + }; + LoadReportingServiceStub stubV3 = + LoadReportingServiceGrpc.newStub(xdsChannel.getManagedChannel()); + lrsRequestWriterV3 = stubV3.withWaitForReady().streamLoadStats(lrsResponseReaderV3); + logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); + sendLoadStatsRequest(new LoadStatsRequestData(node, null)); + } + + @Override + void sendLoadStatsRequest(LoadStatsRequestData request) { + LoadStatsRequest requestProto = request.toEnvoyProtoV3(); + lrsRequestWriterV3.onNext(requestProto); + logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", requestProto); + } + + @Override + void sendError(Exception error) { + lrsRequestWriterV3.onError(error); + } + } + private static final class LoadStatsRequestData { final Node node; @Nullable @@ -376,6 +425,17 @@ final class LoadReportClient { } return builder.build(); } + + LoadStatsRequest toEnvoyProtoV3() { + LoadStatsRequest.Builder builder = LoadStatsRequest.newBuilder(); + builder.setNode(node.toEnvoyProtoNode()); + if (clusterStatsList != null) { + for (ClusterStats stats : clusterStatsList) { + builder.addClusterStats(stats.toEnvoyProtoClusterStats()); + } + } + return builder.build(); + } } private static final class LoadStatsResponseData { @@ -409,5 +469,12 @@ final class LoadReportClient { loadStatsResponse.getClustersList(), Durations.toNanos(loadStatsResponse.getLoadReportingInterval())); } + + static LoadStatsResponseData fromEnvoyProtoV3(LoadStatsResponse loadStatsResponse) { + return new LoadStatsResponseData( + loadStatsResponse.getSendAllClusters(), + loadStatsResponse.getClustersList(), + Durations.toNanos(loadStatsResponse.getLoadReportingInterval())); + } } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index d2cf73ab71..149dc19cdd 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -55,9 +55,9 @@ import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext; import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource; import io.envoyproxy.envoy.api.v2.core.ConfigSource; import io.envoyproxy.envoy.api.v2.core.HealthStatus; -import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats; import io.envoyproxy.envoy.api.v2.route.RedirectAction; import io.envoyproxy.envoy.api.v2.route.WeightedCluster; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats; import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager; import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; import io.envoyproxy.envoy.config.route.v3.QueryParameterMatcher; @@ -70,9 +70,9 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; -import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase; -import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest; -import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse; +import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; +import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; import io.grpc.Context; import io.grpc.Context.CancellationListener; import io.grpc.ManagedChannel;