gcp/observability: Change logging schema and set queue size limit for logs and batching delay (#6118)

This commit is contained in:
Zach Reyes 2023-03-14 20:20:09 -04:00 committed by GitHub
parent 16c3b7df7f
commit 7507ea6bcb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 22 deletions

View File

@ -21,6 +21,7 @@ package observability
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"google.golang.org/api/option" "google.golang.org/api/option"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -72,7 +73,7 @@ func newCloudLoggingExporter(ctx context.Context, config *config) (loggingExport
return &cloudLoggingExporter{ return &cloudLoggingExporter{
projectID: config.ProjectID, projectID: config.ProjectID,
client: c, client: c,
logger: c.Logger("microservices.googleapis.com/observability/grpc", gcplogging.CommonLabels(config.Labels)), logger: c.Logger("microservices.googleapis.com/observability/grpc", gcplogging.CommonLabels(config.Labels), gcplogging.BufferedByteLimit(1024*1024*50), gcplogging.DelayThreshold(time.Second*10)),
}, nil }, nil
} }

View File

@ -33,6 +33,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/binarylog"
iblog "google.golang.org/grpc/internal/binarylog" iblog "google.golang.org/grpc/internal/binarylog"
@ -44,6 +45,8 @@ var lExporter loggingExporter
var newLoggingExporter = newCloudLoggingExporter var newLoggingExporter = newCloudLoggingExporter
var canonicalString = internal.CanonicalString.(func(codes.Code) string)
// translateMetadata translates the metadata from Binary Logging format to // translateMetadata translates the metadata from Binary Logging format to
// its GrpcLogEntry equivalent. // its GrpcLogEntry equivalent.
func translateMetadata(m *binlogpb.Metadata) map[string]string { func translateMetadata(m *binlogpb.Metadata) map[string]string {
@ -153,7 +156,7 @@ type payload struct {
// Timeout is the RPC timeout value. // Timeout is the RPC timeout value.
Timeout time.Duration `json:"timeout,omitempty"` Timeout time.Duration `json:"timeout,omitempty"`
// StatusCode is the gRPC status code. // StatusCode is the gRPC status code.
StatusCode uint32 `json:"statusCode,omitempty"` StatusCode string `json:"statusCode,omitempty"`
// StatusMessage is the gRPC status message. // StatusMessage is the gRPC status message.
StatusMessage string `json:"statusMessage,omitempty"` StatusMessage string `json:"statusMessage,omitempty"`
// StatusDetails is the value of the grpc-status-details-bin metadata key, // StatusDetails is the value of the grpc-status-details-bin metadata key,
@ -170,9 +173,9 @@ type addrType int
const ( const (
typeUnknown addrType = iota // `json:"TYPE_UNKNOWN"` typeUnknown addrType = iota // `json:"TYPE_UNKNOWN"`
typeIPv4 // `json:"TYPE_IPV4"` ipv4 // `json:"IPV4"`
typeIPv6 // `json:"TYPE_IPV6"` ipv6 // `json:"IPV6"`
typeUnix // `json:"TYPE_UNIX"` unix // `json:"UNIX"`
) )
func (at addrType) MarshalJSON() ([]byte, error) { func (at addrType) MarshalJSON() ([]byte, error) {
@ -180,12 +183,12 @@ func (at addrType) MarshalJSON() ([]byte, error) {
switch at { switch at {
case typeUnknown: case typeUnknown:
buffer.WriteString("TYPE_UNKNOWN") buffer.WriteString("TYPE_UNKNOWN")
case typeIPv4: case ipv4:
buffer.WriteString("TYPE_IPV4") buffer.WriteString("IPV4")
case typeIPv6: case ipv6:
buffer.WriteString("TYPE_IPV6") buffer.WriteString("IPV6")
case typeUnix: case unix:
buffer.WriteString("TYPE_UNIX") buffer.WriteString("UNIX")
} }
buffer.WriteString(`"`) buffer.WriteString(`"`)
return buffer.Bytes(), nil return buffer.Bytes(), nil
@ -303,7 +306,7 @@ func (bml *binaryMethodLogger) buildGCPLoggingEntry(ctx context.Context, c iblog
case binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER: case binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER:
grpcLogEntry.Type = eventTypeServerTrailer grpcLogEntry.Type = eventTypeServerTrailer
grpcLogEntry.Payload.Metadata = translateMetadata(binLogEntry.GetTrailer().Metadata) grpcLogEntry.Payload.Metadata = translateMetadata(binLogEntry.GetTrailer().Metadata)
grpcLogEntry.Payload.StatusCode = binLogEntry.GetTrailer().GetStatusCode() grpcLogEntry.Payload.StatusCode = canonicalString(codes.Code(binLogEntry.GetTrailer().GetStatusCode()))
grpcLogEntry.Payload.StatusMessage = binLogEntry.GetTrailer().GetStatusMessage() grpcLogEntry.Payload.StatusMessage = binLogEntry.GetTrailer().GetStatusMessage()
grpcLogEntry.Payload.StatusDetails = binLogEntry.GetTrailer().GetStatusDetails() grpcLogEntry.Payload.StatusDetails = binLogEntry.GetTrailer().GetStatusDetails()
grpcLogEntry.PayloadTruncated = binLogEntry.GetPayloadTruncated() grpcLogEntry.PayloadTruncated = binLogEntry.GetPayloadTruncated()

View File

@ -231,7 +231,8 @@ func (s) TestClientRPCEventsLogAll(t *testing.T) {
SequenceID: 5, SequenceID: 5,
Authority: ss.Address, Authority: ss.Address,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -319,7 +320,8 @@ func (s) TestClientRPCEventsLogAll(t *testing.T) {
Authority: ss.Address, Authority: ss.Address,
SequenceID: 6, SequenceID: 6,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -438,7 +440,8 @@ func (s) TestServerRPCEventsLogAll(t *testing.T) {
SequenceID: 5, SequenceID: 5,
Authority: ss.Address, Authority: ss.Address,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -525,7 +528,8 @@ func (s) TestServerRPCEventsLogAll(t *testing.T) {
Authority: ss.Address, Authority: ss.Address,
SequenceID: 6, SequenceID: 6,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -745,7 +749,8 @@ func (s) TestClientRPCEventsTruncateHeaderAndMetadata(t *testing.T) {
SequenceID: 5, SequenceID: 5,
Authority: ss.Address, Authority: ss.Address,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -892,7 +897,8 @@ func (s) TestPrecedenceOrderingInConfiguration(t *testing.T) {
SequenceID: 5, SequenceID: 5,
Authority: ss.Address, Authority: ss.Address,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -959,7 +965,8 @@ func (s) TestPrecedenceOrderingInConfiguration(t *testing.T) {
Authority: ss.Address, Authority: ss.Address,
SequenceID: 3, SequenceID: 3,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }
@ -1080,14 +1087,14 @@ func (s) TestMarshalJSON(t *testing.T) {
Payload: payload{ Payload: payload{
Metadata: map[string]string{"header1": "value1"}, Metadata: map[string]string{"header1": "value1"},
Timeout: 20, Timeout: 20,
StatusCode: 3, StatusCode: "UNKNOWN",
StatusMessage: "ok", StatusMessage: "ok",
StatusDetails: []byte("ok"), StatusDetails: []byte("ok"),
MessageLength: 3, MessageLength: 3,
Message: []byte("wow"), Message: []byte("wow"),
}, },
Peer: address{ Peer: address{
Type: typeIPv4, Type: ipv4,
Address: "localhost", Address: "localhost",
IPPort: 16000, IPPort: 16000,
}, },
@ -1214,7 +1221,8 @@ func (s) TestMetadataTruncationAccountsKey(t *testing.T) {
SequenceID: 5, SequenceID: 5,
Authority: ss.Address, Authority: ss.Address,
Payload: payload{ Payload: payload{
Metadata: map[string]string{}, Metadata: map[string]string{},
StatusCode: "OK",
}, },
}, },
} }