From bc8c758a3c2e3f355e81a293f3dadc965c72064a Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 22 Sep 2020 17:28:45 -0700 Subject: [PATCH] xds: log raw response messages in sync context (#7441) --- .../java/io/grpc/xds/LoadReportClient.java | 119 +++++++----- .../main/java/io/grpc/xds/XdsClientImpl.java | 182 +++++++++--------- 2 files changed, 164 insertions(+), 137 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index 119ee4dfdc..48d7b0a5cb 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -185,49 +185,36 @@ final class LoadReportClient { abstract void sendError(Exception error); - final void handleResponse(final LoadStatsResponseData response) { - syncContext.execute(new Runnable() { - @Override - public void run() { - if (closed) { - return; - } - if (!initialResponseReceived) { - logger.log(XdsLogLevel.DEBUG, "Initial LRS response received"); - initialResponseReceived = true; - } - reportAllClusters = response.getSendAllClusters(); - if (reportAllClusters) { - logger.log(XdsLogLevel.INFO, "Report loads for all clusters"); - } else { - logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", response.getClustersList()); - clusterNames = response.getClustersList(); - } - long interval = response.getLoadReportingIntervalNanos(); - logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval); - loadReportIntervalNano = interval; - scheduleNextLoadReport(); - } - }); + // Must run in syncContext. + final void handleResponse(LoadStatsResponseData response) { + if (closed) { + return; + } + if (!initialResponseReceived) { + logger.log(XdsLogLevel.DEBUG, "Initial LRS response received"); + initialResponseReceived = true; + } + reportAllClusters = response.getSendAllClusters(); + if (reportAllClusters) { + logger.log(XdsLogLevel.INFO, "Report loads for all clusters"); + } else { + logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", response.getClustersList()); + clusterNames = response.getClustersList(); + } + long interval = response.getLoadReportingIntervalNanos(); + logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", interval); + loadReportIntervalNano = interval; + scheduleNextLoadReport(); } - final void handleRpcError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleStreamClosed(Status.fromThrowable(t)); - } - }); + // Must run in syncContext. + final void handleRpcError(Throwable t) { + handleStreamClosed(Status.fromThrowable(t)); } - final void handleRpcComplete() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleStreamClosed( - Status.UNAVAILABLE.withDescription("Closed by server")); - } - }); + // Must run in syncContext. + final void handleRpcCompleted() { + handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); } private void sendLoadReport() { @@ -324,19 +311,34 @@ final class LoadReportClient { new StreamObserver() { @Override public void onNext( - io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse response) { - logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); - handleResponse(LoadStatsResponseData.fromEnvoyProtoV2(response)); + final io.envoyproxy.envoy.service.load_stats.v2.LoadStatsResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(XdsLogLevel.DEBUG, "Received LoadStatsResponse:\n{0}", response); + handleResponse(LoadStatsResponseData.fromEnvoyProtoV2(response)); + } + }); } @Override - public void onError(Throwable t) { - handleRpcError(t); + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcError(t); + } + }); } @Override public void onCompleted() { - handleRpcComplete(); + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcCompleted(); + } + }); } }; io.envoyproxy.envoy.service.load_stats.v2.LoadReportingServiceGrpc.LoadReportingServiceStub @@ -369,19 +371,34 @@ final class LoadReportClient { StreamObserver lrsResponseReaderV3 = new StreamObserver() { @Override - public void onNext(LoadStatsResponse response) { - logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); - handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response)); + public void onNext(final LoadStatsResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); + handleResponse(LoadStatsResponseData.fromEnvoyProtoV3(response)); + } + }); } @Override - public void onError(Throwable t) { - handleRpcError(t); + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcError(t); + } + }); } @Override public void onCompleted() { - handleRpcComplete(); + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcCompleted(); + } + }); } }; LoadReportingServiceStub stubV3 = diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index b427cdd7c9..ba43da1796 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -1455,68 +1455,54 @@ final class XdsClientImpl extends XdsClient { abstract void sendError(Exception error); - final void onDiscoveryResponse(final DiscoveryResponseData response) { - syncContext.execute( - new Runnable() { - @Override - public void run() { - if (closed) { - return; - } - responseReceived = true; - String respNonce = response.getNonce(); - // Nonce in each response is echoed back in the following ACK/NACK request. It is - // used for management server to identify which response the client is ACKing/NACking. - // To avoid confusion, client-initiated requests will always use the nonce in - // most recently received responses of each resource type. - ResourceType resourceType = response.getResourceType(); - switch (resourceType) { - case LDS: - ldsRespNonce = respNonce; - handleLdsResponse(response); - break; - case RDS: - rdsRespNonce = respNonce; - handleRdsResponse(response); - break; - case CDS: - cdsRespNonce = respNonce; - handleCdsResponse(response); - break; - case EDS: - edsRespNonce = respNonce; - handleEdsResponse(response); - break; - case UNKNOWN: - logger.log( - XdsLogLevel.WARNING, - "Received an unknown type of DiscoveryResponse\n{0}", - respNonce); - break; - default: - throw new AssertionError("Missing case in enum switch: " + resourceType); - } - } - }); + // Must run in syncContext. + final void handleResponse(DiscoveryResponseData response) { + if (closed) { + return; + } + responseReceived = true; + String respNonce = response.getNonce(); + // Nonce in each response is echoed back in the following ACK/NACK request. It is + // used for management server to identify which response the client is ACKing/NACking. + // To avoid confusion, client-initiated requests will always use the nonce in + // most recently received responses of each resource type. + ResourceType resourceType = response.getResourceType(); + switch (resourceType) { + case LDS: + ldsRespNonce = respNonce; + handleLdsResponse(response); + break; + case RDS: + rdsRespNonce = respNonce; + handleRdsResponse(response); + break; + case CDS: + cdsRespNonce = respNonce; + handleCdsResponse(response); + break; + case EDS: + edsRespNonce = respNonce; + handleEdsResponse(response); + break; + case UNKNOWN: + logger.log( + XdsLogLevel.WARNING, + "Received an unknown type of DiscoveryResponse\n{0}", + respNonce); + break; + default: + throw new AssertionError("Missing case in enum switch: " + resourceType); + } } - final void onError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleStreamClosed(Status.fromThrowable(t)); - } - }); + // Must run in syncContext. + final void handleRpcError(Throwable t) { + handleStreamClosed(Status.fromThrowable(t)); } - final void onCompleted() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleStreamClosed( - Status.UNAVAILABLE.withDescription("Closed by server")); - } - }); + // Must run in syncContext. + final void handleRpcCompleted() { + handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); } private void handleStreamClosed(Status error) { @@ -1733,27 +1719,40 @@ final class XdsClientImpl extends XdsClient { StreamObserver responseReaderV2 = new StreamObserver() { @Override - public void onNext(io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { - DiscoveryResponseData responseData = - DiscoveryResponseData.fromEnvoyProtoV2(response); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log( - XdsLogLevel.DEBUG, - "Received {0} response:\n{1}", - responseData.getResourceType(), - respPrinter.print(response)); - } - onDiscoveryResponse(responseData); + public void onNext(final io.envoyproxy.envoy.api.v2.DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}", + ResourceType.fromTypeUrl(response.getTypeUrl()), + respPrinter.print(response)); + } + DiscoveryResponseData responseData = + DiscoveryResponseData.fromEnvoyProtoV2(response); + handleResponse(responseData); + } + }); } @Override - public void onError(Throwable t) { - AdsStreamV2.this.onError(t); + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcError(t); + } + }); } @Override public void onCompleted() { - AdsStreamV2.this.onCompleted(); + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcCompleted(); + } + }); } }; requestWriterV2 = stubV2.withWaitForReady().streamAggregatedResources(responseReaderV2); @@ -1787,27 +1786,38 @@ final class XdsClientImpl extends XdsClient { void start() { StreamObserver responseReader = new StreamObserver() { @Override - public void onNext(DiscoveryResponse response) { - DiscoveryResponseData responseData = - DiscoveryResponseData.fromEnvoyProto(response); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log( - XdsLogLevel.DEBUG, - "Received {0} response:\n{1}", - responseData.getResourceType(), - respPrinter.print(response)); - } - onDiscoveryResponse(responseData); + public void onNext(final DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log(XdsLogLevel.DEBUG, "Received {0} response:\n{1}", + ResourceType.fromTypeUrl(response.getTypeUrl()), respPrinter.print(response)); + } + DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response); + handleResponse(responseData); + } + }); } @Override - public void onError(Throwable t) { - AdsStream.this.onError(t); + public void onError(final Throwable t) { + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcError(t); + } + }); } @Override public void onCompleted() { - AdsStream.this.onCompleted(); + syncContext.execute(new Runnable() { + @Override + public void run() { + handleRpcCompleted(); + } + }); } }; requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader);