xds: log raw response messages in sync context (#7441)

This commit is contained in:
Chengyuan Zhang 2020-09-22 17:28:45 -07:00 committed by GitHub
parent b434df25cd
commit bc8c758a3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 164 additions and 137 deletions

View File

@ -185,49 +185,36 @@ final class LoadReportClient {
abstract void sendError(Exception error);
final void handleResponse(final LoadStatsResponseData response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
if (!initialResponseReceived) {
logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
initialResponseReceived = true;
}
reportAllClusters = response.getSendAllClusters();
if (reportAllClusters) {
logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
} else {
logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", response.getClustersList());
clusterNames = response.getClustersList();
}
long interval = response.getLoadReportingIntervalNanos();
logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
loadReportIntervalNano = interval;
scheduleNextLoadReport();
}
});
// Must run in syncContext.
final void handleResponse(LoadStatsResponseData response) {
if (closed) {
return;
}
if (!initialResponseReceived) {
logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
initialResponseReceived = true;
}
reportAllClusters = response.getSendAllClusters();
if (reportAllClusters) {
logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
} else {
logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", response.getClustersList());
clusterNames = response.getClustersList();
}
long interval = response.getLoadReportingIntervalNanos();
logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval);
loadReportIntervalNano = interval;
scheduleNextLoadReport();
}
final void handleRpcError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleStreamClosed(Status.fromThrowable(t));
}
});
// Must run in syncContext.
final void handleRpcError(Throwable t) {
handleStreamClosed(Status.fromThrowable(t));
}
final void handleRpcComplete() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleStreamClosed(
Status.UNAVAILABLE.withDescription("Closed by server"));
}
});
// Must run in syncContext.
final void handleRpcCompleted() {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
}
private void sendLoadReport() {
@ -324,19 +311,34 @@ final class LoadReportClient {
new StreamObserver<io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse>() {
@Override
public void onNext(
io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse response) {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleResponse(LoadStatsResponseData.fromEnvoyProtoV2(response));
final io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LoadStatsResponse:\n{0}", response);
handleResponse(LoadStatsResponseData.fromEnvoyProtoV2(response));
}
});
}
@Override
public void onError(Throwable t) {
handleRpcError(t);
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
handleRpcComplete();
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceStub
@ -369,19 +371,34 @@ final class LoadReportClient {
StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
new StreamObserver<LoadStatsResponse>() {
@Override
public void onNext(LoadStatsResponse response) {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response));
public void onNext(final LoadStatsResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response));
}
});
}
@Override
public void onError(Throwable t) {
handleRpcError(t);
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
handleRpcComplete();
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
LoadReportingServiceStub stubV3 =

View File

@ -1455,68 +1455,54 @@ final class XdsClientImpl extends XdsClient {
abstract void sendError(Exception error);
final void onDiscoveryResponse(final DiscoveryResponseData response) {
syncContext.execute(
new Runnable() {
@Override
public void run() {
if (closed) {
return;
}
responseReceived = true;
String respNonce = response.getNonce();
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
ResourceType resourceType = response.getResourceType();
switch (resourceType) {
case LDS:
ldsRespNonce = respNonce;
handleLdsResponse(response);
break;
case RDS:
rdsRespNonce = respNonce;
handleRdsResponse(response);
break;
case CDS:
cdsRespNonce = respNonce;
handleCdsResponse(response);
break;
case EDS:
edsRespNonce = respNonce;
handleEdsResponse(response);
break;
case UNKNOWN:
logger.log(
XdsLogLevel.WARNING,
"Received an unknown type of DiscoveryResponse\n{0}",
respNonce);
break;
default:
throw new AssertionError("Missing case in enum switch: " + resourceType);
}
}
});
// Must run in syncContext.
final void handleResponse(DiscoveryResponseData response) {
if (closed) {
return;
}
responseReceived = true;
String respNonce = response.getNonce();
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
ResourceType resourceType = response.getResourceType();
switch (resourceType) {
case LDS:
ldsRespNonce = respNonce;
handleLdsResponse(response);
break;
case RDS:
rdsRespNonce = respNonce;
handleRdsResponse(response);
break;
case CDS:
cdsRespNonce = respNonce;
handleCdsResponse(response);
break;
case EDS:
edsRespNonce = respNonce;
handleEdsResponse(response);
break;
case UNKNOWN:
logger.log(
XdsLogLevel.WARNING,
"Received an unknown type of DiscoveryResponse\n{0}",
respNonce);
break;
default:
throw new AssertionError("Missing case in enum switch: " + resourceType);
}
}
final void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleStreamClosed(Status.fromThrowable(t));
}
});
// Must run in syncContext.
final void handleRpcError(Throwable t) {
handleStreamClosed(Status.fromThrowable(t));
}
final void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleStreamClosed(
Status.UNAVAILABLE.withDescription("Closed by server"));
}
});
// Must run in syncContext.
final void handleRpcCompleted() {
handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
}
private void handleStreamClosed(Status error) {
@ -1733,27 +1719,40 @@ final class XdsClientImpl extends XdsClient {
StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse> responseReaderV2 =
new StreamObserver<io.envoyproxy.envoy.api.v2.DiscoveryResponse>() {
@Override
public void onNext(io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
DiscoveryResponseData responseData =
DiscoveryResponseData.fromEnvoyProtoV2(response);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG,
"Received {0} response:\n{1}",
responseData.getResourceType(),
respPrinter.print(response));
}
onDiscoveryResponse(responseData);
public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}",
ResourceType.fromTypeUrl(response.getTypeUrl()),
respPrinter.print(response));
}
DiscoveryResponseData responseData =
DiscoveryResponseData.fromEnvoyProtoV2(response);
handleResponse(responseData);
}
});
}
@Override
public void onError(Throwable t) {
AdsStreamV2.this.onError(t);
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
AdsStreamV2.this.onCompleted();
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
requestWriterV2 = stubV2.withWaitForReady().streamAggregatedResources(responseReaderV2);
@ -1787,27 +1786,38 @@ final class XdsClientImpl extends XdsClient {
void start() {
StreamObserver<DiscoveryResponse> responseReader = new StreamObserver<DiscoveryResponse>() {
@Override
public void onNext(DiscoveryResponse response) {
DiscoveryResponseData responseData =
DiscoveryResponseData.fromEnvoyProto(response);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG,
"Received {0} response:\n{1}",
responseData.getResourceType(),
respPrinter.print(response));
}
onDiscoveryResponse(responseData);
public void onNext(final DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}",
ResourceType.fromTypeUrl(response.getTypeUrl()), respPrinter.print(response));
}
DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response);
handleResponse(responseData);
}
});
}
@Override
public void onError(Throwable t) {
AdsStream.this.onError(t);
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}
@Override
public void onCompleted() {
AdsStream.this.onCompleted();
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
};
requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);