xds: have LoadReportClient support LRS v3

As noted in the design doc "The LRS protocol has a transport version, just like the xDS protocol itself does. Initially, we will use the server feature in the bootstrap file to determine the version of the LRS transport protocol. This means that there will not be any way to use a different transport protocol for LRS than for xDS."
This commit is contained in:
ZHANG Dapeng 2020-08-20 10:45:41 -07:00 committed by GitHub
parent c67dcb3b08
commit 00fee4d141
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 7 deletions

View File

@ -24,6 +24,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceStub;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.InternalLogId;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
@ -158,9 +162,11 @@ final class LoadReportClient {
private void startLrsRpc() {
checkState(lrsStream == null, "previous lbStream has not been cleared yet");
// TODO(zdapeng): implement LrsStreamV3 and instantiate lrsStream based on value of
// xdsChannel.useProtocolV3
if (xdsChannel.isUseProtocolV3()) {
lrsStream = new LrsStreamV3();
} else {
lrsStream = new LrsStreamV2();
}
retryStopwatch.reset().start();
lrsStream.start();
}
@ -355,6 +361,49 @@ final class LoadReportClient {
}
}
private final class LrsStreamV3 extends LrsStream {
StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
@Override
void start() {
StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
new StreamObserver<LoadStatsResponse>() {
@Override
public void onNext(LoadStatsResponse response) {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response));
}
@Override
public void onError(Throwable t) {
handleRpcError(t);
}
@Override
public void onCompleted() {
handleRpcComplete();
}
};
LoadReportingServiceStub stubV3 =
LoadReportingServiceGrpc.newStub(xdsChannel.getManagedChannel());
lrsRequestWriterV3 = stubV3.withWaitForReady().streamLoadStats(lrsResponseReaderV3);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(new LoadStatsRequestData(node, null));
}
@Override
void sendLoadStatsRequest(LoadStatsRequestData request) {
LoadStatsRequest requestProto = request.toEnvoyProtoV3();
lrsRequestWriterV3.onNext(requestProto);
logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", requestProto);
}
@Override
void sendError(Exception error) {
lrsRequestWriterV3.onError(error);
}
}
private static final class LoadStatsRequestData {
final Node node;
@Nullable
@ -376,6 +425,17 @@ final class LoadReportClient {
}
return builder.build();
}
LoadStatsRequest toEnvoyProtoV3() {
LoadStatsRequest.Builder builder = LoadStatsRequest.newBuilder();
builder.setNode(node.toEnvoyProtoNode());
if (clusterStatsList != null) {
for (ClusterStats stats : clusterStatsList) {
builder.addClusterStats(stats.toEnvoyProtoClusterStats());
}
}
return builder.build();
}
}
private static final class LoadStatsResponseData {
@ -409,5 +469,12 @@ final class LoadReportClient {
loadStatsResponse.getClustersList(),
Durations.toNanos(loadStatsResponse.getLoadReportingInterval()));
}
static LoadStatsResponseData fromEnvoyProtoV3(LoadStatsResponse loadStatsResponse) {
return new LoadStatsResponseData(
loadStatsResponse.getSendAllClusters(),
loadStatsResponse.getClustersList(),
Durations.toNanos(loadStatsResponse.getLoadReportingInterval()));
}
}
}

View File

@ -55,9 +55,9 @@ import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.envoyproxy.envoy.api.v2.core.AggregatedConfigSource;
import io.envoyproxy.envoy.api.v2.core.ConfigSource;
import io.envoyproxy.envoy.api.v2.core.HealthStatus;
import io.envoyproxy.envoy.api.v2.endpoint.ClusterStats;
import io.envoyproxy.envoy.api.v2.route.RedirectAction;
import io.envoyproxy.envoy.api.v2.route.WeightedCluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterStats;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager;
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.config.route.v3.QueryParameterMatcher;
@ -70,9 +70,9 @@ import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.SdsSecretConfig;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceImplBase;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc.LoadReportingServiceImplBase;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Context;
import io.grpc.Context.CancellationListener;
import io.grpc.ManagedChannel;