xdsclient/transport: reduce chattiness of logs (#5992)

This commit is contained in:
Easwar Swaminathan 2023-02-24 13:13:13 -08:00 committed by GitHub
parent 6fe609daff
commit 3775f633ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 19 deletions

View File

@ -63,6 +63,9 @@ func (pl *PrefixLogger) Errorf(format string, args ...interface{}) {
// Debugf does info logging at verbose level 2.
func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
// rewrite PrefixLogger a little to ensure that we don't use the global
// `Logger` here, and instead use the `logger` field.
if !Logger.V(2) {
return
}
@ -73,6 +76,15 @@ func (pl *PrefixLogger) Debugf(format string, args ...interface{}) {
return
}
InfoDepth(1, fmt.Sprintf(format, args...))
}
// V reports whether verbosity level l is at least the requested verbose level.
func (pl *PrefixLogger) V(l int) bool {
// TODO(6044): Refactor interfaces LoggerV2 and DepthLogger, and maybe
// rewrite PrefixLogger a little to ensure that we don't use the global
// `Logger` here, and instead use the `logger` field.
return Logger.V(l)
}
// NewPrefixLogger creates a prefix logger with the given prefix.

View File

@ -120,19 +120,19 @@ func (t *Transport) lrsRunner(ctx context.Context) {
defer cancel()
stream, err := v3lrsgrpc.NewLoadReportingServiceClient(t.cc).StreamLoadStats(streamCtx)
if err != nil {
t.logger.Warningf("Failed to create LRS stream: %v", err)
t.logger.Warningf("Creating LRS stream to server %q failed: %v", t.serverURI, err)
return false
}
t.logger.Infof("Created LRS stream to server: %s", t.serverURI)
t.logger.Infof("Created LRS stream to server %q", t.serverURI)
if err := t.sendFirstLoadStatsRequest(stream, node); err != nil {
t.logger.Warningf("Failed to send first LRS request: %v", err)
t.logger.Warningf("Sending first LRS request failed: %v", err)
return false
}
clusters, interval, err := t.recvFirstLoadStatsResponse(stream)
if err != nil {
t.logger.Warningf("Failed to read from LRS stream: %v", err)
t.logger.Warningf("Reading from LRS stream failed: %v", err)
return false
}
@ -160,7 +160,7 @@ func (t *Transport) sendLoads(ctx context.Context, stream lrsStream, clusterName
return
}
if err := t.sendLoadStatsRequest(stream, t.lrsStore.Stats(clusterNames)); err != nil {
t.logger.Warningf("Failed to write to LRS stream: %v", err)
t.logger.Warningf("Writing to LRS stream failed: %v", err)
return
}
}
@ -168,7 +168,9 @@ func (t *Transport) sendLoads(ctx context.Context, stream lrsStream, clusterName
func (t *Transport) sendFirstLoadStatsRequest(stream lrsStream, node *v3corepb.Node) error {
req := &v3lrspb.LoadStatsRequest{Node: node}
t.logger.Debugf("Sending initial LoadStatsRequest: %s", pretty.ToJSON(req))
if t.logger.V(perRPCVerbosityLevel) {
t.logger.Infof("Sending initial LoadStatsRequest: %s", pretty.ToJSON(req))
}
err := stream.Send(req)
if err == io.EOF {
return getStreamError(stream)
@ -181,7 +183,9 @@ func (t *Transport) recvFirstLoadStatsResponse(stream lrsStream) ([]string, time
if err != nil {
return nil, 0, fmt.Errorf("failed to receive first LoadStatsResponse: %v", err)
}
t.logger.Debugf("Received first LoadStatsResponse: %s", pretty.ToJSON(resp))
if t.logger.V(perRPCVerbosityLevel) {
t.logger.Infof("Received first LoadStatsResponse: %s", pretty.ToJSON(resp))
}
interval, err := ptypes.Duration(resp.GetLoadReportingInterval())
if err != nil {
@ -251,7 +255,9 @@ func (t *Transport) sendLoadStatsRequest(stream lrsStream, loads []*load.Data) e
}
req := &v3lrspb.LoadStatsRequest{ClusterStats: clusterStats}
t.logger.Debugf("Sending LRS loads: %s", pretty.ToJSON(req))
if t.logger.V(perRPCVerbosityLevel) {
t.logger.Infof("Sending LRS loads: %s", pretty.ToJSON(req))
}
err := stream.Send(req)
if err == io.EOF {
return getStreamError(stream)

View File

@ -45,6 +45,12 @@ import (
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)
// Any per-RPC level logs which print complete request or response messages
// should be gated at this verbosity level. Other per-RPC level logs which print
// terse output should be at `INFO` and verbosity 2, which corresponds to using
// the `Debugf` method on the logger.
const perRPCVerbosityLevel = 9
type adsStream = v3adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// Transport provides a resource-type agnostic implementation of the xDS
@ -264,19 +270,26 @@ func (t *Transport) sendAggregatedDiscoveryServiceRequest(stream adsStream, reso
}
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("sending ADS request %s failed: %v", pretty.ToJSON(req), err)
return err
}
if t.logger.V(perRPCVerbosityLevel) {
t.logger.Infof("ADS request sent: %v", pretty.ToJSON(req))
} else {
t.logger.Debugf("ADS request sent for type %q, resources: %v, version %q, nonce %q", resourceURL, resourceNames, version, nonce)
}
t.logger.Debugf("ADS request sent: %v", pretty.ToJSON(req))
return nil
}
func (t *Transport) recvAggregatedDiscoveryServiceResponse(stream adsStream) (resources []*anypb.Any, resourceURL, version, nonce string, err error) {
resp, err := stream.Recv()
if err != nil {
return nil, "", "", "", fmt.Errorf("failed to read ADS response: %v", err)
return nil, "", "", "", err
}
if t.logger.V(perRPCVerbosityLevel) {
t.logger.Infof("ADS response received: %v", pretty.ToJSON(resp))
} else {
t.logger.Debugf("ADS response received for type %q, version %q, nonce %q", resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce())
}
t.logger.Infof("ADS response received, type: %v", resp.GetTypeUrl())
t.logger.Debugf("ADS response received: %v", pretty.ToJSON(resp))
return resp.GetResources(), resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), nil
}
@ -307,7 +320,7 @@ func (t *Transport) adsRunner(ctx context.Context) {
stream, err := t.newAggregatedDiscoveryServiceStream(ctx, t.cc)
if err != nil {
t.adsStreamErrHandler(err)
t.logger.Warningf("ADS stream creation failed: %v", err)
t.logger.Warningf("Creating new ADS stream failed: %v", err)
return false
}
t.logger.Infof("ADS stream created")
@ -377,7 +390,7 @@ func (t *Transport) send(ctx context.Context) {
continue
}
if err := t.sendAggregatedDiscoveryServiceRequest(stream, resources, url, version, nonce, nackErr); err != nil {
t.logger.Warningf("ADS request for {resources: %q, url: %v, version: %q, nonce: %q} failed: %v", resources, url, version, nonce, err)
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, version, nonce, err)
// Send failed, clear the current stream.
stream = nil
}
@ -410,7 +423,7 @@ func (t *Transport) sendExisting(stream adsStream) bool {
for url, resources := range t.resources {
if err := t.sendAggregatedDiscoveryServiceRequest(stream, mapToSlice(resources), url, t.versions[url], "", nil); err != nil {
t.logger.Warningf("ADS request failed: %v", err)
t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, t.versions[url], "", err)
return false
}
}
@ -427,7 +440,7 @@ func (t *Transport) recv(stream adsStream) bool {
resources, url, rVersion, nonce, err := t.recvAggregatedDiscoveryServiceResponse(stream)
if err != nil {
t.adsStreamErrHandler(err)
t.logger.Warningf("ADS stream is closed with error: %v", err)
t.logger.Warningf("ADS stream closed: %v", err)
return msgReceived
}
msgReceived = true
@ -454,7 +467,7 @@ func (t *Transport) recv(stream adsStream) bool {
nackErr: err,
})
t.mu.Unlock()
t.logger.Warningf("Sending NACK for resource type: %v, version: %v, nonce: %v, reason: %v", url, rVersion, nonce, err)
t.logger.Warningf("Sending NACK for resource type: %q, version: %q, nonce: %q, reason: %v", url, rVersion, nonce, err)
continue
}
t.adsRequestCh.Put(&ackRequest{
@ -463,7 +476,7 @@ func (t *Transport) recv(stream adsStream) bool {
stream: stream,
version: rVersion,
})
t.logger.Infof("Sending ACK for resource type: %v, version: %v, nonce: %v", url, rVersion, nonce)
t.logger.Debugf("Sending ACK for resource type: %q, version: %q, nonce: %q", url, rVersion, nonce)
}
}