xds: change controlPlaneClient and loadReportClient to use xdsTransportFactory (#10829)

This commit is contained in:
yifeizhuang 2024-01-25 17:05:12 -08:00 committed by GitHub
parent 6d96e6588e
commit 8e1cc943b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 172 additions and 248 deletions

View File

@ -28,23 +28,22 @@ import com.google.rpc.Code;
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.Channel;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy; import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ProcessingTracker; import io.grpc.xds.XdsClient.ProcessingTracker;
import io.grpc.xds.XdsClient.ResourceStore; import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler; import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsTransportFactory.EventHandler;
import io.grpc.xds.XdsTransportFactory.StreamingCall;
import io.grpc.xds.XdsTransportFactory.XdsTransport;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -67,7 +66,7 @@ final class ControlPlaneClient {
private final InternalLogId logId; private final InternalLogId logId;
private final XdsLogger logger; private final XdsLogger logger;
private final ServerInfo serverInfo; private final ServerInfo serverInfo;
private final ManagedChannel channel; private final XdsTransport xdsTransport;
private final XdsResponseHandler xdsResponseHandler; private final XdsResponseHandler xdsResponseHandler;
private final ResourceStore resourceStore; private final ResourceStore resourceStore;
private final Context context; private final Context context;
@ -84,7 +83,7 @@ final class ControlPlaneClient {
private boolean shutdown; private boolean shutdown;
@Nullable @Nullable
private AbstractAdsStream adsStream; private AdsStream adsStream;
@Nullable @Nullable
private BackoffPolicy retryBackoffPolicy; private BackoffPolicy retryBackoffPolicy;
@Nullable @Nullable
@ -93,7 +92,7 @@ final class ControlPlaneClient {
/** An entity that manages ADS RPCs over a single channel. */ /** An entity that manages ADS RPCs over a single channel. */
// TODO: rename to XdsChannel // TODO: rename to XdsChannel
ControlPlaneClient( ControlPlaneClient(
XdsChannelFactory xdsChannelFactory, XdsTransport xdsTransport,
ServerInfo serverInfo, ServerInfo serverInfo,
Node bootstrapNode, Node bootstrapNode,
XdsResponseHandler xdsResponseHandler, XdsResponseHandler xdsResponseHandler,
@ -106,7 +105,7 @@ final class ControlPlaneClient {
Supplier<Stopwatch> stopwatchSupplier, Supplier<Stopwatch> stopwatchSupplier,
XdsClient.TimerLaunch timerLaunch) { XdsClient.TimerLaunch timerLaunch) {
this.serverInfo = checkNotNull(serverInfo, "serverInfo"); this.serverInfo = checkNotNull(serverInfo, "serverInfo");
this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
@ -121,12 +120,6 @@ final class ControlPlaneClient {
logger.log(XdsLogLevel.INFO, "Created"); logger.log(XdsLogLevel.INFO, "Created");
} }
/** The underlying channel. */
// Currently, only externally used for LrsClient.
Channel channel() {
return channel;
}
void shutdown() { void shutdown() {
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {
@Override @Override
@ -139,7 +132,7 @@ final class ControlPlaneClient {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
rpcRetryTimer.cancel(); rpcRetryTimer.cancel();
} }
channel.shutdown(); xdsTransport.shutdown();
} }
}); });
} }
@ -207,7 +200,7 @@ final class ControlPlaneClient {
} }
boolean isReady() { boolean isReady() {
return adsStream != null && adsStream.isReady(); return adsStream != null && adsStream.call != null && adsStream.call.isReady();
} }
/** /**
@ -234,10 +227,9 @@ final class ControlPlaneClient {
// Must be synchronized. // Must be synchronized.
private void startRpcStream() { private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet"); checkState(adsStream == null, "Previous adsStream has not been cleared yet");
adsStream = new AdsStreamV3();
Context prevContext = context.attach(); Context prevContext = context.attach();
try { try {
adsStream.start(); adsStream = new AdsStream();
} finally { } finally {
context.detach(prevContext); context.detach(prevContext);
} }
@ -271,7 +263,7 @@ final class ControlPlaneClient {
return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
} }
private abstract class AbstractAdsStream { private class AdsStream implements EventHandler<DiscoveryResponse> {
private boolean responseReceived; private boolean responseReceived;
private boolean closed; private boolean closed;
// Response nonce for the most recently received discovery responses of each resource type. // Response nonce for the most recently received discovery responses of each resource type.
@ -281,14 +273,15 @@ final class ControlPlaneClient {
// To avoid confusion, client-initiated requests will always use the nonce in // To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type. // most recently received responses of each resource type.
private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>(); private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
abstract void start(); private AdsStream() {
this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
abstract void sendError(Exception error); methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
call.start(this);
abstract boolean isReady(); }
abstract void request(int count);
/** /**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
@ -296,8 +289,30 @@ final class ControlPlaneClient {
* client-initiated discovery requests, use {@link * client-initiated discovery requests, use {@link
* #sendDiscoveryRequest(XdsResourceType, Collection)}. * #sendDiscoveryRequest(XdsResourceType, Collection)}.
*/ */
abstract void sendDiscoveryRequest(XdsResourceType<?> type, String version, void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
Collection<String> resources, String nonce, @Nullable String errorDetail); Collection<String> resources, String nonce,
@Nullable String errorDetail) {
DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapNode.toEnvoyProtoNode())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrl())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
DiscoveryRequest request = builder.build();
call.sendMessage(request);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
}
}
/** /**
* Sends a client-initiated discovery request. * Sends a client-initiated discovery request.
@ -308,6 +323,48 @@ final class ControlPlaneClient {
respNonces.getOrDefault(type, ""), null); respNonces.getOrDefault(type, ""), null);
} }
@Override
public void onReady() {
readyHandler();
}
@Override
public void onRecvMessage(DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
MessagePrinter.print(response));
}
if (type == null) {
logger.log(
XdsLogLevel.WARNING,
"Ignore an unknown type of DiscoveryResponse: {0}",
response.getTypeUrl());
call.startRecvMessage();
return;
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(() -> {
if (status.isOk()) {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
} else {
handleRpcStreamClosed(status);
}
});
}
final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources, final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
String nonce) { String nonce) {
checkNotNull(type, "type"); checkNotNull(type, "type");
@ -316,20 +373,13 @@ final class ControlPlaneClient {
} }
responseReceived = true; responseReceived = true;
respNonces.put(type, nonce); respNonces.put(type, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext); ProcessingTracker processingTracker = new ProcessingTracker(
() -> call.startRecvMessage(), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce, xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
processingTracker); processingTracker);
processingTracker.onComplete(); processingTracker.onComplete();
} }
final void handleRpcError(Throwable t) {
handleRpcStreamClosed(Status.fromThrowable(t));
}
final void handleRpcCompleted() {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
}
private void handleRpcStreamClosed(Status error) { private void handleRpcStreamClosed(Status error) {
if (closed) { if (closed) {
return; return;
@ -366,7 +416,7 @@ final class ControlPlaneClient {
} }
closed = true; closed = true;
cleanUp(); cleanUp();
sendError(error); call.sendError(error);
} }
private void cleanUp() { private void cleanUp() {
@ -375,115 +425,4 @@ final class ControlPlaneClient {
} }
} }
} }
private final class AdsStreamV3 extends AbstractAdsStream {
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;
@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}
@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);
final class AdsClientResponseObserver
implements ClientResponseObserver<DiscoveryRequest, DiscoveryResponse> {
@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}
@Override
public void onNext(final DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
MessagePrinter.print(response));
}
if (type == null) {
logger.log(
XdsLogLevel.WARNING,
"Ignore an unknown type of DiscoveryResponse: {0}",
response.getTypeUrl());
request(1);
return;
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}
@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
}
requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
}
@Override
void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
Collection<String> resources, String nonce,
@Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started");
DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapNode.toEnvoyProtoNode())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrl())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
DiscoveryRequest request = builder.build();
requestWriter.onNext(request);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
}
}
@Override
void request(int count) {
requestWriter.request(count);
}
@Override
void sendError(Exception error) {
requestWriter.onError(error);
}
}
} }

View File

@ -18,6 +18,7 @@ package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.ChannelCredentials; import io.grpc.ChannelCredentials;
import io.grpc.ClientCall; import io.grpc.ClientCall;
@ -30,14 +31,21 @@ import java.util.concurrent.TimeUnit;
final class GrpcXdsTransportFactory implements XdsTransportFactory { final class GrpcXdsTransportFactory implements XdsTransportFactory {
static final XdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = new GrpcXdsTransportFactory(); static final GrpcXdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY =
new GrpcXdsTransportFactory();
@Override @Override
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
return new GrpcXdsTransport(serverInfo); return new GrpcXdsTransport(serverInfo);
} }
private class GrpcXdsTransport implements XdsTransport { @VisibleForTesting
public XdsTransport createForTest(ManagedChannel channel) {
return new GrpcXdsTransport(channel);
}
@VisibleForTesting
static class GrpcXdsTransport implements XdsTransport {
private final ManagedChannel channel; private final ManagedChannel channel;
@ -49,6 +57,11 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory {
.build(); .build();
} }
@VisibleForTesting
public GrpcXdsTransport(ManagedChannel channel) {
this.channel = checkNotNull(channel, "channel");
}
@Override @Override
public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall( public <ReqT, RespT> StreamingCall<ReqT, RespT> createStreamingCall(
String fullMethodName, String fullMethodName,

View File

@ -27,19 +27,21 @@ import com.google.protobuf.util.Durations;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
import io.grpc.Channel;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy; import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.Stats.ClusterStats; import io.grpc.xds.Stats.ClusterStats;
import io.grpc.xds.Stats.DroppedRequests; import io.grpc.xds.Stats.DroppedRequests;
import io.grpc.xds.Stats.UpstreamLocalityStats; import io.grpc.xds.Stats.UpstreamLocalityStats;
import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsTransportFactory.EventHandler;
import io.grpc.xds.XdsTransportFactory.StreamingCall;
import io.grpc.xds.XdsTransportFactory.XdsTransport;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -55,7 +57,7 @@ import javax.annotation.Nullable;
final class LoadReportClient { final class LoadReportClient {
private final InternalLogId logId; private final InternalLogId logId;
private final XdsLogger logger; private final XdsLogger logger;
private final Channel channel; private final XdsTransport xdsTransport;
private final Context context; private final Context context;
private final Node node; private final Node node;
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
@ -64,7 +66,6 @@ final class LoadReportClient {
private final BackoffPolicy.Provider backoffPolicyProvider; private final BackoffPolicy.Provider backoffPolicyProvider;
@VisibleForTesting @VisibleForTesting
final LoadStatsManager2 loadStatsManager; final LoadStatsManager2 loadStatsManager;
private boolean started; private boolean started;
@Nullable @Nullable
private BackoffPolicy lrsRpcRetryPolicy; private BackoffPolicy lrsRpcRetryPolicy;
@ -73,10 +74,12 @@ final class LoadReportClient {
@Nullable @Nullable
@VisibleForTesting @VisibleForTesting
LrsStream lrsStream; LrsStream lrsStream;
private static final MethodDescriptor<LoadStatsRequest, LoadStatsResponse> method =
LoadReportingServiceGrpc.getStreamLoadStatsMethod();
LoadReportClient( LoadReportClient(
LoadStatsManager2 loadStatsManager, LoadStatsManager2 loadStatsManager,
Channel channel, XdsTransport xdsTransport,
Context context, Context context,
Node node, Node node,
SynchronizationContext syncContext, SynchronizationContext syncContext,
@ -84,7 +87,7 @@ final class LoadReportClient {
BackoffPolicy.Provider backoffPolicyProvider, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) { Supplier<Stopwatch> stopwatchSupplier) {
this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager"); this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
this.channel = checkNotNull(channel, "xdsChannel"); this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
this.context = checkNotNull(context, "context"); this.context = checkNotNull(context, "context");
this.syncContext = checkNotNull(syncContext, "syncContext"); this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(scheduledExecutorService, "timeService"); this.timerService = checkNotNull(scheduledExecutorService, "timeService");
@ -160,66 +163,62 @@ final class LoadReportClient {
return; return;
} }
checkState(lrsStream == null, "previous lbStream has not been cleared yet"); checkState(lrsStream == null, "previous lbStream has not been cleared yet");
lrsStream = new LrsStream();
retryStopwatch.reset().start(); retryStopwatch.reset().start();
Context prevContext = context.attach(); Context prevContext = context.attach();
try { try {
lrsStream.start(); lrsStream = new LrsStream();
} finally { } finally {
context.detach(prevContext); context.detach(prevContext);
} }
} }
private final class LrsStream { private final class LrsStream implements EventHandler<LoadStatsResponse> {
boolean initialResponseReceived; boolean initialResponseReceived;
boolean closed; boolean closed;
long intervalNano = -1; long intervalNano = -1;
boolean reportAllClusters; boolean reportAllClusters;
List<String> clusterNames; // clusters to report loads for, if not report all. List<String> clusterNames; // clusters to report loads for, if not report all.
ScheduledHandle loadReportTimer; ScheduledHandle loadReportTimer;
StreamObserver<LoadStatsRequest> lrsRequestWriterV3; private final StreamingCall<LoadStatsRequest, LoadStatsResponse> call;
void start() { LrsStream() {
StreamObserver<LoadStatsResponse> lrsResponseReaderV3 = this.call = xdsTransport.createStreamingCall(method.getFullMethodName(),
new StreamObserver<LoadStatsResponse>() { method.getRequestMarshaller(), method.getResponseMarshaller());
@Override call.start(this);
public void onNext(final LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
Durations.toNanos(response.getLoadReportingInterval()));
}
});
}
@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
.streamLoadStats(lrsResponseReaderV3);
logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
sendLoadStatsRequest(Collections.<ClusterStats>emptyList()); sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
} }
@Override
public void onReady() {}
@Override
public void onRecvMessage(LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
Durations.toNanos(response.getLoadReportingInterval()));
call.startRecvMessage();
}
});
}
@Override
public void onStatusReceived(final Status status) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (status.isOk()) {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
} else {
handleStreamClosed(status);
}
}
});
}
void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) { void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.Builder requestBuilder =
LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode()); LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
@ -227,14 +226,10 @@ final class LoadReportClient {
requestBuilder.addClusterStats(buildClusterStats(stats)); requestBuilder.addClusterStats(buildClusterStats(stats));
} }
LoadStatsRequest request = requestBuilder.build(); LoadStatsRequest request = requestBuilder.build();
lrsRequestWriterV3.onNext(request); call.sendMessage(request);
logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request); logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
} }
void sendError(Exception error) {
lrsRequestWriterV3.onError(error);
}
void handleRpcResponse(List<String> clusters, boolean sendAllClusters, void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
long loadReportIntervalNano) { long loadReportIntervalNano) {
if (closed) { if (closed) {
@ -256,14 +251,6 @@ final class LoadReportClient {
scheduleNextLoadReport(); scheduleNextLoadReport();
} }
void handleRpcError(Throwable t) {
handleStreamClosed(Status.fromThrowable(t));
}
void handleRpcCompleted() {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
}
private void sendLoadReport() { private void sendLoadReport() {
if (closed) { if (closed) {
return; return;
@ -330,7 +317,7 @@ final class LoadReportClient {
} }
closed = true; closed = true;
cleanUp(); cleanUp();
sendError(error); call.sendError(error);
} }
private void cleanUp() { private void cleanUp() {

View File

@ -17,6 +17,7 @@
package io.grpc.xds; package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import io.grpc.Context; import io.grpc.Context;
@ -26,7 +27,6 @@ import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TimeProvider; import io.grpc.internal.TimeProvider;
import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import io.grpc.xds.internal.security.TlsContextManagerImpl; import io.grpc.xds.internal.security.TlsContextManagerImpl;
import java.util.Map; import java.util.Map;
@ -124,7 +124,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
if (refCount == 0) { if (refCount == 0) {
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = new XdsClientImpl( xdsClient = new XdsClientImpl(
XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY, DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo, bootstrapInfo,
context, context,
scheduler, scheduler,

View File

@ -31,12 +31,9 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import io.grpc.ChannelCredentials;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.InternalLogId; import io.grpc.InternalLogId;
import io.grpc.LoadBalancerRegistry; import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
@ -100,7 +97,7 @@ final class XdsClientImpl extends XdsClient
private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>(); private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>(); private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>(); private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
private final XdsChannelFactory xdsChannelFactory; private final XdsTransportFactory xdsTransportFactory;
private final Bootstrapper.BootstrapInfo bootstrapInfo; private final Bootstrapper.BootstrapInfo bootstrapInfo;
private final Context context; private final Context context;
private final ScheduledExecutorService timeService; private final ScheduledExecutorService timeService;
@ -113,7 +110,7 @@ final class XdsClientImpl extends XdsClient
private volatile boolean isShutdown; private volatile boolean isShutdown;
XdsClientImpl( XdsClientImpl(
XdsChannelFactory xdsChannelFactory, XdsTransportFactory xdsTransportFactory,
Bootstrapper.BootstrapInfo bootstrapInfo, Bootstrapper.BootstrapInfo bootstrapInfo,
Context context, Context context,
ScheduledExecutorService timeService, ScheduledExecutorService timeService,
@ -121,7 +118,7 @@ final class XdsClientImpl extends XdsClient
Supplier<Stopwatch> stopwatchSupplier, Supplier<Stopwatch> stopwatchSupplier,
TimeProvider timeProvider, TimeProvider timeProvider,
TlsContextManager tlsContextManager) { TlsContextManager tlsContextManager) {
this.xdsChannelFactory = xdsChannelFactory; this.xdsTransportFactory = xdsTransportFactory;
this.bootstrapInfo = bootstrapInfo; this.bootstrapInfo = bootstrapInfo;
this.context = context; this.context = context;
this.timeService = timeService; this.timeService = timeService;
@ -142,8 +139,9 @@ final class XdsClientImpl extends XdsClient
if (serverChannelMap.containsKey(serverInfo)) { if (serverChannelMap.containsKey(serverInfo)) {
return; return;
} }
XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo);
ControlPlaneClient xdsChannel = new ControlPlaneClient( ControlPlaneClient xdsChannel = new ControlPlaneClient(
xdsChannelFactory, xdsTransport,
serverInfo, serverInfo,
bootstrapInfo.node(), bootstrapInfo.node(),
this, this,
@ -157,7 +155,7 @@ final class XdsClientImpl extends XdsClient
LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier); LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
loadStatsManagerMap.put(serverInfo, loadStatsManager); loadStatsManagerMap.put(serverInfo, loadStatsManager);
LoadReportClient lrsClient = new LoadReportClient( LoadReportClient lrsClient = new LoadReportClient(
loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext, loadStatsManager, xdsTransport, context, bootstrapInfo.node(), syncContext,
timeService, backoffPolicyProvider, stopwatchSupplier); timeService, backoffPolicyProvider, stopwatchSupplier);
serverChannelMap.put(serverInfo, xdsChannel); serverChannelMap.put(serverInfo, xdsChannel);
serverLrsClientMap.put(serverInfo, lrsClient); serverLrsClientMap.put(serverInfo, lrsClient);
@ -747,19 +745,4 @@ final class XdsClientImpl extends XdsClient
watcher.onChanged(update); watcher.onChanged(update);
} }
} }
abstract static class XdsChannelFactory {
static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
@Override
ManagedChannel create(ServerInfo serverInfo) {
String target = serverInfo.target();
ChannelCredentials channelCredentials = serverInfo.channelCredentials();
return Grpc.newChannelBuilder(target, channelCredentials)
.keepAliveTime(5, TimeUnit.MINUTES)
.build();
}
};
abstract ManagedChannel create(ServerInfo serverInfo);
}
} }

View File

@ -174,7 +174,9 @@ public class LoadReportClientTest {
when(backoffPolicy2.nextBackoffNanos()) when(backoffPolicy2.nextBackoffNanos())
.thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L)); .thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L));
addFakeStatsData(); addFakeStatsData();
lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, NODE, lrsClient = new LoadReportClient(loadStatsManager,
GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY.createForTest(channel),
Context.ROOT, NODE,
syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier()); fakeClock.getStopwatchSupplier());
syncContext.execute(new Runnable() { syncContext.execute(new Runnable() {

View File

@ -18,7 +18,7 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage; import static com.google.common.truth.Truth.assertWithMessage;
import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
@ -85,13 +85,13 @@ import io.grpc.xds.EnvoyServerProtoData.FailurePercentageEjection;
import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.EnvoyServerProtoData.FilterChain;
import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection;
import io.grpc.xds.FaultConfig.FractionalPercent.DenominatorType; import io.grpc.xds.FaultConfig.FractionalPercent.DenominatorType;
import io.grpc.xds.GrpcXdsTransportFactory.GrpcXdsTransport;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
import io.grpc.xds.XdsClient.ResourceMetadata; import io.grpc.xds.XdsClient.ResourceMetadata;
import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState; import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState;
import io.grpc.xds.XdsClient.ResourceUpdate; import io.grpc.xds.XdsClient.ResourceUpdate;
import io.grpc.xds.XdsClient.ResourceWatcher; import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.XdsEndpointResource.EdsUpdate;
@ -322,25 +322,25 @@ public abstract class XdsClientImplTestBase {
.start()); .start());
channel = channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
@Override @Override
ManagedChannel create(ServerInfo serverInfo) { public XdsTransport create(ServerInfo serverInfo) {
if (serverInfo.target().equals(SERVER_URI)) { if (serverInfo.target().equals(SERVER_URI)) {
return channel; return new GrpcXdsTransport(channel);
} }
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) { if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) {
if (channelForCustomAuthority == null) { if (channelForCustomAuthority == null) {
channelForCustomAuthority = cleanupRule.register( channelForCustomAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build()); InProcessChannelBuilder.forName(serverName).directExecutor().build());
} }
return channelForCustomAuthority; return new GrpcXdsTransport(channelForCustomAuthority);
} }
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
if (channelForEmptyAuthority == null) { if (channelForEmptyAuthority == null) {
channelForEmptyAuthority = cleanupRule.register( channelForEmptyAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build()); InProcessChannelBuilder.forName(serverName).directExecutor().build());
} }
return channelForEmptyAuthority; return new GrpcXdsTransport(channelForEmptyAuthority);
} }
throw new IllegalArgumentException("Can not create channel for " + serverInfo); throw new IllegalArgumentException("Can not create channel for " + serverInfo);
} }
@ -368,7 +368,7 @@ public abstract class XdsClientImplTestBase {
.build(); .build();
xdsClient = xdsClient =
new XdsClientImpl( new XdsClientImpl(
xdsChannelFactory, xdsTransportFactory,
bootstrapInfo, bootstrapInfo,
Context.ROOT, Context.ROOT,
fakeClock.getScheduledExecutorService(), fakeClock.getScheduledExecutorService(),
@ -3743,7 +3743,7 @@ public abstract class XdsClientImplTestBase {
private XdsClientImpl createXdsClient(String serverUri) { private XdsClientImpl createXdsClient(String serverUri) {
BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); BootstrapInfo bootstrapInfo = buildBootStrap(serverUri);
return new XdsClientImpl( return new XdsClientImpl(
DEFAULT_XDS_CHANNEL_FACTORY, DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo, bootstrapInfo,
Context.ROOT, Context.ROOT,
fakeClock.getScheduledExecutorService(), fakeClock.getScheduledExecutorService(),