binarylog: consistently rename imports for binarylog proto (#5931)

This commit is contained in:
Easwar Swaminathan 2023-01-12 16:00:34 -08:00 committed by GitHub
parent bf3ad35240
commit be06d526c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 243 additions and 243 deletions

View File

@ -38,7 +38,7 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
testgrpc "google.golang.org/grpc/interop/grpc_testing" testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing"
) )
@ -64,10 +64,10 @@ var testSink = &testBinLogSink{}
type testBinLogSink struct { type testBinLogSink struct {
mu sync.Mutex mu sync.Mutex
buf []*pb.GrpcLogEntry buf []*binlogpb.GrpcLogEntry
} }
func (s *testBinLogSink) Write(e *pb.GrpcLogEntry) error { func (s *testBinLogSink) Write(e *binlogpb.GrpcLogEntry) error {
s.mu.Lock() s.mu.Lock()
s.buf = append(s.buf, e) s.buf = append(s.buf, e)
s.mu.Unlock() s.mu.Unlock()
@ -78,12 +78,12 @@ func (s *testBinLogSink) Close() error { return nil }
// Returns all client entris if client is true, otherwise return all server // Returns all client entris if client is true, otherwise return all server
// entries. // entries.
func (s *testBinLogSink) logEntries(client bool) []*pb.GrpcLogEntry { func (s *testBinLogSink) logEntries(client bool) []*binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_SERVER logger := binlogpb.GrpcLogEntry_LOGGER_SERVER
if client { if client {
logger = pb.GrpcLogEntry_LOGGER_CLIENT logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} }
var ret []*pb.GrpcLogEntry var ret []*binlogpb.GrpcLogEntry
s.mu.Lock() s.mu.Lock()
for _, e := range s.buf { for _, e := range s.buf {
if e.Logger == logger { if e.Logger == logger {
@ -481,31 +481,31 @@ type expectedData struct {
err error err error
} }
func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64) *pb.GrpcLogEntry { func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64) *binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_CLIENT logger := binlogpb.GrpcLogEntry_LOGGER_CLIENT
var peer *pb.Address var peer *binlogpb.Address
if !client { if !client {
logger = pb.GrpcLogEntry_LOGGER_SERVER logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
ed.te.clientAddrMu.Lock() ed.te.clientAddrMu.Lock()
peer = &pb.Address{ peer = &binlogpb.Address{
Address: ed.te.clientIP.String(), Address: ed.te.clientIP.String(),
IpPort: uint32(ed.te.clientPort), IpPort: uint32(ed.te.clientPort),
} }
if ed.te.clientIP.To4() != nil { if ed.te.clientIP.To4() != nil {
peer.Type = pb.Address_TYPE_IPV4 peer.Type = binlogpb.Address_TYPE_IPV4
} else { } else {
peer.Type = pb.Address_TYPE_IPV6 peer.Type = binlogpb.Address_TYPE_IPV6
} }
ed.te.clientAddrMu.Unlock() ed.te.clientAddrMu.Unlock()
} }
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Logger: logger, Logger: logger,
Payload: &pb.GrpcLogEntry_ClientHeader{ Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: &pb.ClientHeader{ ClientHeader: &binlogpb.ClientHeader{
Metadata: iblog.MdToMetadataProto(testMetadata), Metadata: iblog.MdToMetadataProto(testMetadata),
MethodName: ed.method, MethodName: ed.method,
Authority: ed.te.srvAddr, Authority: ed.te.srvAddr,
@ -515,29 +515,29 @@ func (ed *expectedData) newClientHeaderEntry(client bool, rpcID, inRPCID uint64)
} }
} }
func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64) *pb.GrpcLogEntry { func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64) *binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_SERVER logger := binlogpb.GrpcLogEntry_LOGGER_SERVER
var peer *pb.Address var peer *binlogpb.Address
if client { if client {
logger = pb.GrpcLogEntry_LOGGER_CLIENT logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
peer = &pb.Address{ peer = &binlogpb.Address{
Address: ed.te.srvIP.String(), Address: ed.te.srvIP.String(),
IpPort: uint32(ed.te.srvPort), IpPort: uint32(ed.te.srvPort),
} }
if ed.te.srvIP.To4() != nil { if ed.te.srvIP.To4() != nil {
peer.Type = pb.Address_TYPE_IPV4 peer.Type = binlogpb.Address_TYPE_IPV4
} else { } else {
peer.Type = pb.Address_TYPE_IPV6 peer.Type = binlogpb.Address_TYPE_IPV6
} }
} }
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Logger: logger, Logger: logger,
Payload: &pb.GrpcLogEntry_ServerHeader{ Payload: &binlogpb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{ ServerHeader: &binlogpb.ServerHeader{
Metadata: iblog.MdToMetadataProto(testMetadata), Metadata: iblog.MdToMetadataProto(testMetadata),
}, },
}, },
@ -545,23 +545,23 @@ func (ed *expectedData) newServerHeaderEntry(client bool, rpcID, inRPCID uint64)
} }
} }
func (ed *expectedData) newClientMessageEntry(client bool, rpcID, inRPCID uint64, msg proto.Message) *pb.GrpcLogEntry { func (ed *expectedData) newClientMessageEntry(client bool, rpcID, inRPCID uint64, msg proto.Message) *binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_CLIENT logger := binlogpb.GrpcLogEntry_LOGGER_CLIENT
if !client { if !client {
logger = pb.GrpcLogEntry_LOGGER_SERVER logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
data, err := proto.Marshal(msg) data, err := proto.Marshal(msg)
if err != nil { if err != nil {
grpclogLogger.Infof("binarylogging_testing: failed to marshal proto message: %v", err) grpclogLogger.Infof("binarylogging_testing: failed to marshal proto message: %v", err)
} }
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Logger: logger, Logger: logger,
Payload: &pb.GrpcLogEntry_Message{ Payload: &binlogpb.GrpcLogEntry_Message{
Message: &pb.Message{ Message: &binlogpb.Message{
Length: uint32(len(data)), Length: uint32(len(data)),
Data: data, Data: data,
}, },
@ -569,23 +569,23 @@ func (ed *expectedData) newClientMessageEntry(client bool, rpcID, inRPCID uint64
} }
} }
func (ed *expectedData) newServerMessageEntry(client bool, rpcID, inRPCID uint64, msg proto.Message) *pb.GrpcLogEntry { func (ed *expectedData) newServerMessageEntry(client bool, rpcID, inRPCID uint64, msg proto.Message) *binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_CLIENT logger := binlogpb.GrpcLogEntry_LOGGER_CLIENT
if !client { if !client {
logger = pb.GrpcLogEntry_LOGGER_SERVER logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
data, err := proto.Marshal(msg) data, err := proto.Marshal(msg)
if err != nil { if err != nil {
grpclogLogger.Infof("binarylogging_testing: failed to marshal proto message: %v", err) grpclogLogger.Infof("binarylogging_testing: failed to marshal proto message: %v", err)
} }
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Logger: logger, Logger: logger,
Payload: &pb.GrpcLogEntry_Message{ Payload: &binlogpb.GrpcLogEntry_Message{
Message: &pb.Message{ Message: &binlogpb.Message{
Length: uint32(len(data)), Length: uint32(len(data)),
Data: data, Data: data,
}, },
@ -593,34 +593,34 @@ func (ed *expectedData) newServerMessageEntry(client bool, rpcID, inRPCID uint64
} }
} }
func (ed *expectedData) newHalfCloseEntry(client bool, rpcID, inRPCID uint64) *pb.GrpcLogEntry { func (ed *expectedData) newHalfCloseEntry(client bool, rpcID, inRPCID uint64) *binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_CLIENT logger := binlogpb.GrpcLogEntry_LOGGER_CLIENT
if !client { if !client {
logger = pb.GrpcLogEntry_LOGGER_SERVER logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
Payload: nil, // No payload here. Payload: nil, // No payload here.
Logger: logger, Logger: logger,
} }
} }
func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64, stErr error) *pb.GrpcLogEntry { func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64, stErr error) *binlogpb.GrpcLogEntry {
logger := pb.GrpcLogEntry_LOGGER_SERVER logger := binlogpb.GrpcLogEntry_LOGGER_SERVER
var peer *pb.Address var peer *binlogpb.Address
if client { if client {
logger = pb.GrpcLogEntry_LOGGER_CLIENT logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
peer = &pb.Address{ peer = &binlogpb.Address{
Address: ed.te.srvIP.String(), Address: ed.te.srvIP.String(),
IpPort: uint32(ed.te.srvPort), IpPort: uint32(ed.te.srvPort),
} }
if ed.te.srvIP.To4() != nil { if ed.te.srvIP.To4() != nil {
peer.Type = pb.Address_TYPE_IPV4 peer.Type = binlogpb.Address_TYPE_IPV4
} else { } else {
peer.Type = pb.Address_TYPE_IPV6 peer.Type = binlogpb.Address_TYPE_IPV6
} }
} }
st, ok := status.FromError(stErr) st, ok := status.FromError(stErr)
@ -638,14 +638,14 @@ func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64
grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err) grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
} }
} }
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Logger: logger, Logger: logger,
Payload: &pb.GrpcLogEntry_Trailer{ Payload: &binlogpb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{ Trailer: &binlogpb.Trailer{
Metadata: iblog.MdToMetadataProto(testTrailerMetadata), Metadata: iblog.MdToMetadataProto(testTrailerMetadata),
// st will be nil if err was not a status error, but nil is ok. // st will be nil if err was not a status error, but nil is ok.
StatusCode: uint32(st.Code()), StatusCode: uint32(st.Code()),
@ -657,20 +657,20 @@ func (ed *expectedData) newServerTrailerEntry(client bool, rpcID, inRPCID uint64
} }
} }
func (ed *expectedData) newCancelEntry(rpcID, inRPCID uint64) *pb.GrpcLogEntry { func (ed *expectedData) newCancelEntry(rpcID, inRPCID uint64) *binlogpb.GrpcLogEntry {
return &pb.GrpcLogEntry{ return &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: rpcID, CallId: rpcID,
SequenceIdWithinCall: inRPCID, SequenceIdWithinCall: inRPCID,
Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: nil, Payload: nil,
} }
} }
func (ed *expectedData) toClientLogEntries() []*pb.GrpcLogEntry { func (ed *expectedData) toClientLogEntries() []*binlogpb.GrpcLogEntry {
var ( var (
ret []*pb.GrpcLogEntry ret []*binlogpb.GrpcLogEntry
idInRPC uint64 = 1 idInRPC uint64 = 1
) )
ret = append(ret, ed.newClientHeaderEntry(true, globalRPCID, idInRPC)) ret = append(ret, ed.newClientHeaderEntry(true, globalRPCID, idInRPC))
@ -726,9 +726,9 @@ func (ed *expectedData) toClientLogEntries() []*pb.GrpcLogEntry {
return ret return ret
} }
func (ed *expectedData) toServerLogEntries() []*pb.GrpcLogEntry { func (ed *expectedData) toServerLogEntries() []*binlogpb.GrpcLogEntry {
var ( var (
ret []*pb.GrpcLogEntry ret []*binlogpb.GrpcLogEntry
idInRPC uint64 = 1 idInRPC uint64 = 1
) )
ret = append(ret, ed.newClientHeaderEntry(false, globalRPCID, idInRPC)) ret = append(ret, ed.newClientHeaderEntry(false, globalRPCID, idInRPC))
@ -838,7 +838,7 @@ func runRPCs(t *testing.T, cc *rpcConfig) *expectedData {
// //
// This function is typically called with only two entries. It's written in this // This function is typically called with only two entries. It's written in this
// way so the code can be put in a for loop instead of copied twice. // way so the code can be put in a for loop instead of copied twice.
func equalLogEntry(entries ...*pb.GrpcLogEntry) (equal bool) { func equalLogEntry(entries ...*binlogpb.GrpcLogEntry) (equal bool) {
for i, e := range entries { for i, e := range entries {
// Clear out some fields we don't compare. // Clear out some fields we don't compare.
e.Timestamp = nil e.Timestamp = nil
@ -869,7 +869,7 @@ func testClientBinaryLog(t *testing.T, c *rpcConfig) error {
defer testSink.clear() defer testSink.clear()
expect := runRPCs(t, c) expect := runRPCs(t, c)
want := expect.toClientLogEntries() want := expect.toClientLogEntries()
var got []*pb.GrpcLogEntry var got []*binlogpb.GrpcLogEntry
// In racy cases, some entries are not logged when the RPC is finished (e.g. // In racy cases, some entries are not logged when the RPC is finished (e.g.
// context.Cancel). // context.Cancel).
// //
@ -969,7 +969,7 @@ func testServerBinaryLog(t *testing.T, c *rpcConfig) error {
defer testSink.clear() defer testSink.clear()
expect := runRPCs(t, c) expect := runRPCs(t, c)
want := expect.toServerLogEntries() want := expect.toServerLogEntries()
var got []*pb.GrpcLogEntry var got []*binlogpb.GrpcLogEntry
// In racy cases, some entries are not logged when the RPC is finished (e.g. // In racy cases, some entries are not logged when the RPC is finished (e.g.
// context.Cancel). This is unlikely to happen on server side, but it does // context.Cancel). This is unlikely to happen on server side, but it does
// no harm to retry. // no harm to retry.

View File

@ -26,7 +26,7 @@ import (
"fmt" "fmt"
"os" "os"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
iblog "google.golang.org/grpc/internal/binarylog" iblog "google.golang.org/grpc/internal/binarylog"
) )
@ -48,7 +48,7 @@ type Sink interface {
// entry. Some options are: proto bytes, or proto json. // entry. Some options are: proto bytes, or proto json.
// //
// Note this function needs to be thread-safe. // Note this function needs to be thread-safe.
Write(*pb.GrpcLogEntry) error Write(*binlogpb.GrpcLogEntry) error
// Close closes this sink and cleans up resources (e.g. the flushing // Close closes this sink and cleans up resources (e.g. the flushing
// goroutine). // goroutine).
Close() error Close() error

View File

@ -26,7 +26,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
) )
@ -79,7 +79,7 @@ func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
// Build is an internal only method for building the proto message out of the // Build is an internal only method for building the proto message out of the
// input event. It's made public to enable other library to reuse as much logic // input event. It's made public to enable other library to reuse as much logic
// in TruncatingMethodLogger as possible. // in TruncatingMethodLogger as possible.
func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry { func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
m := c.toProto() m := c.toProto()
timestamp, _ := ptypes.TimestampProto(time.Now()) timestamp, _ := ptypes.TimestampProto(time.Now())
m.Timestamp = timestamp m.Timestamp = timestamp
@ -87,11 +87,11 @@ func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *pb.GrpcLogEntry {
m.SequenceIdWithinCall = ml.idWithinCallGen.next() m.SequenceIdWithinCall = ml.idWithinCallGen.next()
switch pay := m.Payload.(type) { switch pay := m.Payload.(type) {
case *pb.GrpcLogEntry_ClientHeader: case *binlogpb.GrpcLogEntry_ClientHeader:
m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
case *pb.GrpcLogEntry_ServerHeader: case *binlogpb.GrpcLogEntry_ServerHeader:
m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
case *pb.GrpcLogEntry_Message: case *binlogpb.GrpcLogEntry_Message:
m.PayloadTruncated = ml.truncateMessage(pay.Message) m.PayloadTruncated = ml.truncateMessage(pay.Message)
} }
return m return m
@ -102,7 +102,7 @@ func (ml *TruncatingMethodLogger) Log(c LogEntryConfig) {
ml.sink.Write(ml.Build(c)) ml.sink.Write(ml.Build(c))
} }
func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) { func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
if ml.headerMaxLen == maxUInt { if ml.headerMaxLen == maxUInt {
return false return false
} }
@ -132,7 +132,7 @@ func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated
return truncated return truncated
} }
func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) { func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
if ml.messageMaxLen == maxUInt { if ml.messageMaxLen == maxUInt {
return false return false
} }
@ -145,7 +145,7 @@ func (ml *TruncatingMethodLogger) truncateMessage(msgPb *pb.Message) (truncated
// LogEntryConfig represents the configuration for binary log entry. // LogEntryConfig represents the configuration for binary log entry.
type LogEntryConfig interface { type LogEntryConfig interface {
toProto() *pb.GrpcLogEntry toProto() *binlogpb.GrpcLogEntry
} }
// ClientHeader configs the binary log entry to be a ClientHeader entry. // ClientHeader configs the binary log entry to be a ClientHeader entry.
@ -159,10 +159,10 @@ type ClientHeader struct {
PeerAddr net.Addr PeerAddr net.Addr
} }
func (c *ClientHeader) toProto() *pb.GrpcLogEntry { func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
// This function doesn't need to set all the fields (e.g. seq ID). The Log // This function doesn't need to set all the fields (e.g. seq ID). The Log
// function will set the fields when necessary. // function will set the fields when necessary.
clientHeader := &pb.ClientHeader{ clientHeader := &binlogpb.ClientHeader{
Metadata: mdToMetadataProto(c.Header), Metadata: mdToMetadataProto(c.Header),
MethodName: c.MethodName, MethodName: c.MethodName,
Authority: c.Authority, Authority: c.Authority,
@ -170,16 +170,16 @@ func (c *ClientHeader) toProto() *pb.GrpcLogEntry {
if c.Timeout > 0 { if c.Timeout > 0 {
clientHeader.Timeout = ptypes.DurationProto(c.Timeout) clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
} }
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Payload: &pb.GrpcLogEntry_ClientHeader{ Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: clientHeader, ClientHeader: clientHeader,
}, },
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
if c.PeerAddr != nil { if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr) ret.Peer = addrToProto(c.PeerAddr)
@ -195,19 +195,19 @@ type ServerHeader struct {
PeerAddr net.Addr PeerAddr net.Addr
} }
func (c *ServerHeader) toProto() *pb.GrpcLogEntry { func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Payload: &pb.GrpcLogEntry_ServerHeader{ Payload: &binlogpb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{ ServerHeader: &binlogpb.ServerHeader{
Metadata: mdToMetadataProto(c.Header), Metadata: mdToMetadataProto(c.Header),
}, },
}, },
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
if c.PeerAddr != nil { if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr) ret.Peer = addrToProto(c.PeerAddr)
@ -223,7 +223,7 @@ type ClientMessage struct {
Message interface{} Message interface{}
} }
func (c *ClientMessage) toProto() *pb.GrpcLogEntry { func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
var ( var (
data []byte data []byte
err error err error
@ -238,19 +238,19 @@ func (c *ClientMessage) toProto() *pb.GrpcLogEntry {
} else { } else {
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
} }
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Payload: &pb.GrpcLogEntry_Message{ Payload: &binlogpb.GrpcLogEntry_Message{
Message: &pb.Message{ Message: &binlogpb.Message{
Length: uint32(len(data)), Length: uint32(len(data)),
Data: data, Data: data,
}, },
}, },
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
return ret return ret
} }
@ -263,7 +263,7 @@ type ServerMessage struct {
Message interface{} Message interface{}
} }
func (c *ServerMessage) toProto() *pb.GrpcLogEntry { func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
var ( var (
data []byte data []byte
err error err error
@ -278,19 +278,19 @@ func (c *ServerMessage) toProto() *pb.GrpcLogEntry {
} else { } else {
grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte") grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
} }
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Payload: &pb.GrpcLogEntry_Message{ Payload: &binlogpb.GrpcLogEntry_Message{
Message: &pb.Message{ Message: &binlogpb.Message{
Length: uint32(len(data)), Length: uint32(len(data)),
Data: data, Data: data,
}, },
}, },
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
return ret return ret
} }
@ -300,15 +300,15 @@ type ClientHalfClose struct {
OnClientSide bool OnClientSide bool
} }
func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry { func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
Payload: nil, // No payload here. Payload: nil, // No payload here.
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
return ret return ret
} }
@ -324,7 +324,7 @@ type ServerTrailer struct {
PeerAddr net.Addr PeerAddr net.Addr
} }
func (c *ServerTrailer) toProto() *pb.GrpcLogEntry { func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
st, ok := status.FromError(c.Err) st, ok := status.FromError(c.Err)
if !ok { if !ok {
grpclogLogger.Info("binarylogging: error in trailer is not a status error") grpclogLogger.Info("binarylogging: error in trailer is not a status error")
@ -340,10 +340,10 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err) grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
} }
} }
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Payload: &pb.GrpcLogEntry_Trailer{ Payload: &binlogpb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{ Trailer: &binlogpb.Trailer{
Metadata: mdToMetadataProto(c.Trailer), Metadata: mdToMetadataProto(c.Trailer),
StatusCode: uint32(st.Code()), StatusCode: uint32(st.Code()),
StatusMessage: st.Message(), StatusMessage: st.Message(),
@ -352,9 +352,9 @@ func (c *ServerTrailer) toProto() *pb.GrpcLogEntry {
}, },
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
if c.PeerAddr != nil { if c.PeerAddr != nil {
ret.Peer = addrToProto(c.PeerAddr) ret.Peer = addrToProto(c.PeerAddr)
@ -367,15 +367,15 @@ type Cancel struct {
OnClientSide bool OnClientSide bool
} }
func (c *Cancel) toProto() *pb.GrpcLogEntry { func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
ret := &pb.GrpcLogEntry{ ret := &binlogpb.GrpcLogEntry{
Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
Payload: nil, Payload: nil,
} }
if c.OnClientSide { if c.OnClientSide {
ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
} else { } else {
ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
} }
return ret return ret
} }
@ -392,15 +392,15 @@ func metadataKeyOmit(key string) bool {
return strings.HasPrefix(key, "grpc-") return strings.HasPrefix(key, "grpc-")
} }
func mdToMetadataProto(md metadata.MD) *pb.Metadata { func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
ret := &pb.Metadata{} ret := &binlogpb.Metadata{}
for k, vv := range md { for k, vv := range md {
if metadataKeyOmit(k) { if metadataKeyOmit(k) {
continue continue
} }
for _, v := range vv { for _, v := range vv {
ret.Entry = append(ret.Entry, ret.Entry = append(ret.Entry,
&pb.MetadataEntry{ &binlogpb.MetadataEntry{
Key: k, Key: k,
Value: []byte(v), Value: []byte(v),
}, },
@ -410,26 +410,26 @@ func mdToMetadataProto(md metadata.MD) *pb.Metadata {
return ret return ret
} }
func addrToProto(addr net.Addr) *pb.Address { func addrToProto(addr net.Addr) *binlogpb.Address {
ret := &pb.Address{} ret := &binlogpb.Address{}
switch a := addr.(type) { switch a := addr.(type) {
case *net.TCPAddr: case *net.TCPAddr:
if a.IP.To4() != nil { if a.IP.To4() != nil {
ret.Type = pb.Address_TYPE_IPV4 ret.Type = binlogpb.Address_TYPE_IPV4
} else if a.IP.To16() != nil { } else if a.IP.To16() != nil {
ret.Type = pb.Address_TYPE_IPV6 ret.Type = binlogpb.Address_TYPE_IPV6
} else { } else {
ret.Type = pb.Address_TYPE_UNKNOWN ret.Type = binlogpb.Address_TYPE_UNKNOWN
// Do not set address and port fields. // Do not set address and port fields.
break break
} }
ret.Address = a.IP.String() ret.Address = a.IP.String()
ret.IpPort = uint32(a.Port) ret.IpPort = uint32(a.Port)
case *net.UnixAddr: case *net.UnixAddr:
ret.Type = pb.Address_TYPE_UNIX ret.Type = binlogpb.Address_TYPE_UNIX
ret.Address = a.String() ret.Address = a.String()
default: default:
ret.Type = pb.Address_TYPE_UNKNOWN ret.Type = binlogpb.Address_TYPE_UNKNOWN
} }
return ret return ret
} }

View File

@ -26,10 +26,10 @@ import (
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
dpb "github.com/golang/protobuf/ptypes/duration" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
) )
func (s) TestLog(t *testing.T) { func (s) TestLog(t *testing.T) {
@ -46,7 +46,7 @@ func (s) TestLog(t *testing.T) {
port6 := 796 port6 := 796
tcpAddr6, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("[%v]:%d", addr6, port6)) tcpAddr6, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("[%v]:%d", addr6, port6))
testProtoMsg := &pb.Message{ testProtoMsg := &binlogpb.Message{
Length: 1, Length: 1,
Data: []byte{'a'}, Data: []byte{'a'},
} }
@ -54,7 +54,7 @@ func (s) TestLog(t *testing.T) {
testCases := []struct { testCases := []struct {
config LogEntryConfig config LogEntryConfig
want *pb.GrpcLogEntry want *binlogpb.GrpcLogEntry
}{ }{
{ {
config: &ClientHeader{ config: &ClientHeader{
@ -67,31 +67,31 @@ func (s) TestLog(t *testing.T) {
Timeout: 2*time.Second + 3*time.Nanosecond, Timeout: 2*time.Second + 3*time.Nanosecond,
PeerAddr: tcpAddr, PeerAddr: tcpAddr,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Logger: pb.GrpcLogEntry_LOGGER_SERVER, Logger: binlogpb.GrpcLogEntry_LOGGER_SERVER,
Payload: &pb.GrpcLogEntry_ClientHeader{ Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: &pb.ClientHeader{ ClientHeader: &binlogpb.ClientHeader{
Metadata: &pb.Metadata{ Metadata: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "a", Value: []byte{'b'}}, {Key: "a", Value: []byte{'b'}},
{Key: "a", Value: []byte{'b', 'b'}}, {Key: "a", Value: []byte{'b', 'b'}},
}, },
}, },
MethodName: "testservice/testmethod", MethodName: "testservice/testmethod",
Authority: "test.service.io", Authority: "test.service.io",
Timeout: &dpb.Duration{ Timeout: &durationpb.Duration{
Seconds: 2, Seconds: 2,
Nanos: 3, Nanos: 3,
}, },
}, },
}, },
PayloadTruncated: false, PayloadTruncated: false,
Peer: &pb.Address{ Peer: &binlogpb.Address{
Type: pb.Address_TYPE_IPV4, Type: binlogpb.Address_TYPE_IPV4,
Address: addr, Address: addr,
IpPort: uint32(port), IpPort: uint32(port),
}, },
@ -103,15 +103,15 @@ func (s) TestLog(t *testing.T) {
MethodName: "testservice/testmethod", MethodName: "testservice/testmethod",
Authority: "test.service.io", Authority: "test.service.io",
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Logger: pb.GrpcLogEntry_LOGGER_SERVER, Logger: binlogpb.GrpcLogEntry_LOGGER_SERVER,
Payload: &pb.GrpcLogEntry_ClientHeader{ Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: &pb.ClientHeader{ ClientHeader: &binlogpb.ClientHeader{
Metadata: &pb.Metadata{}, Metadata: &binlogpb.Metadata{},
MethodName: "testservice/testmethod", MethodName: "testservice/testmethod",
Authority: "test.service.io", Authority: "test.service.io",
}, },
@ -127,16 +127,16 @@ func (s) TestLog(t *testing.T) {
}, },
PeerAddr: tcpAddr6, PeerAddr: tcpAddr6,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: &pb.GrpcLogEntry_ServerHeader{ Payload: &binlogpb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{ ServerHeader: &binlogpb.ServerHeader{
Metadata: &pb.Metadata{ Metadata: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "a", Value: []byte{'b'}}, {Key: "a", Value: []byte{'b'}},
{Key: "a", Value: []byte{'b', 'b'}}, {Key: "a", Value: []byte{'b', 'b'}},
}, },
@ -144,8 +144,8 @@ func (s) TestLog(t *testing.T) {
}, },
}, },
PayloadTruncated: false, PayloadTruncated: false,
Peer: &pb.Address{ Peer: &binlogpb.Address{
Type: pb.Address_TYPE_IPV6, Type: binlogpb.Address_TYPE_IPV6,
Address: addr6, Address: addr6,
IpPort: uint32(port6), IpPort: uint32(port6),
}, },
@ -156,14 +156,14 @@ func (s) TestLog(t *testing.T) {
OnClientSide: true, OnClientSide: true,
Message: testProtoMsg, Message: testProtoMsg,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: &pb.GrpcLogEntry_Message{ Payload: &binlogpb.GrpcLogEntry_Message{
Message: &pb.Message{ Message: &binlogpb.Message{
Length: uint32(len(testProtoBytes)), Length: uint32(len(testProtoBytes)),
Data: testProtoBytes, Data: testProtoBytes,
}, },
@ -177,14 +177,14 @@ func (s) TestLog(t *testing.T) {
OnClientSide: false, OnClientSide: false,
Message: testProtoMsg, Message: testProtoMsg,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
Logger: pb.GrpcLogEntry_LOGGER_SERVER, Logger: binlogpb.GrpcLogEntry_LOGGER_SERVER,
Payload: &pb.GrpcLogEntry_Message{ Payload: &binlogpb.GrpcLogEntry_Message{
Message: &pb.Message{ Message: &binlogpb.Message{
Length: uint32(len(testProtoBytes)), Length: uint32(len(testProtoBytes)),
Data: testProtoBytes, Data: testProtoBytes,
}, },
@ -197,12 +197,12 @@ func (s) TestLog(t *testing.T) {
config: &ClientHalfClose{ config: &ClientHalfClose{
OnClientSide: false, OnClientSide: false,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
Logger: pb.GrpcLogEntry_LOGGER_SERVER, Logger: binlogpb.GrpcLogEntry_LOGGER_SERVER,
Payload: nil, Payload: nil,
PayloadTruncated: false, PayloadTruncated: false,
Peer: nil, Peer: nil,
@ -214,23 +214,23 @@ func (s) TestLog(t *testing.T) {
Err: status.Errorf(codes.Unavailable, "test"), Err: status.Errorf(codes.Unavailable, "test"),
PeerAddr: tcpAddr, PeerAddr: tcpAddr,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: &pb.GrpcLogEntry_Trailer{ Payload: &binlogpb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{ Trailer: &binlogpb.Trailer{
Metadata: &pb.Metadata{}, Metadata: &binlogpb.Metadata{},
StatusCode: uint32(codes.Unavailable), StatusCode: uint32(codes.Unavailable),
StatusMessage: "test", StatusMessage: "test",
StatusDetails: nil, StatusDetails: nil,
}, },
}, },
PayloadTruncated: false, PayloadTruncated: false,
Peer: &pb.Address{ Peer: &binlogpb.Address{
Type: pb.Address_TYPE_IPV4, Type: binlogpb.Address_TYPE_IPV4,
Address: addr, Address: addr,
IpPort: uint32(port), IpPort: uint32(port),
}, },
@ -240,15 +240,15 @@ func (s) TestLog(t *testing.T) {
config: &ServerTrailer{ config: &ServerTrailer{
OnClientSide: true, OnClientSide: true,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: &pb.GrpcLogEntry_Trailer{ Payload: &binlogpb.GrpcLogEntry_Trailer{
Trailer: &pb.Trailer{ Trailer: &binlogpb.Trailer{
Metadata: &pb.Metadata{}, Metadata: &binlogpb.Metadata{},
StatusCode: uint32(codes.OK), StatusCode: uint32(codes.OK),
StatusMessage: "", StatusMessage: "",
StatusDetails: nil, StatusDetails: nil,
@ -262,12 +262,12 @@ func (s) TestLog(t *testing.T) {
config: &Cancel{ config: &Cancel{
OnClientSide: true, OnClientSide: true,
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: nil, Payload: nil,
PayloadTruncated: false, PayloadTruncated: false,
Peer: nil, Peer: nil,
@ -284,16 +284,16 @@ func (s) TestLog(t *testing.T) {
"a": {"b", "bb"}, "a": {"b", "bb"},
}, },
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
Logger: pb.GrpcLogEntry_LOGGER_SERVER, Logger: binlogpb.GrpcLogEntry_LOGGER_SERVER,
Payload: &pb.GrpcLogEntry_ClientHeader{ Payload: &binlogpb.GrpcLogEntry_ClientHeader{
ClientHeader: &pb.ClientHeader{ ClientHeader: &binlogpb.ClientHeader{
Metadata: &pb.Metadata{ Metadata: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "a", Value: []byte{'b'}}, {Key: "a", Value: []byte{'b'}},
{Key: "a", Value: []byte{'b', 'b'}}, {Key: "a", Value: []byte{'b', 'b'}},
}, },
@ -312,16 +312,16 @@ func (s) TestLog(t *testing.T) {
"a": {"b", "bb"}, "a": {"b", "bb"},
}, },
}, },
want: &pb.GrpcLogEntry{ want: &binlogpb.GrpcLogEntry{
Timestamp: nil, Timestamp: nil,
CallId: 1, CallId: 1,
SequenceIdWithinCall: 0, SequenceIdWithinCall: 0,
Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
Logger: pb.GrpcLogEntry_LOGGER_CLIENT, Logger: binlogpb.GrpcLogEntry_LOGGER_CLIENT,
Payload: &pb.GrpcLogEntry_ServerHeader{ Payload: &binlogpb.GrpcLogEntry_ServerHeader{
ServerHeader: &pb.ServerHeader{ ServerHeader: &binlogpb.ServerHeader{
Metadata: &pb.Metadata{ Metadata: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "a", Value: []byte{'b'}}, {Key: "a", Value: []byte{'b'}},
{Key: "a", Value: []byte{'b', 'b'}}, {Key: "a", Value: []byte{'b', 'b'}},
}, },
@ -336,7 +336,7 @@ func (s) TestLog(t *testing.T) {
buf.Reset() buf.Reset()
tc.want.SequenceIdWithinCall = uint64(i + 1) tc.want.SequenceIdWithinCall = uint64(i + 1)
ml.Log(tc.config) ml.Log(tc.config)
inSink := new(pb.GrpcLogEntry) inSink := new(binlogpb.GrpcLogEntry)
if err := proto.Unmarshal(buf.Bytes()[4:], inSink); err != nil { if err := proto.Unmarshal(buf.Bytes()[4:], inSink); err != nil {
t.Errorf("failed to unmarshal bytes in sink to proto: %v", err) t.Errorf("failed to unmarshal bytes in sink to proto: %v", err)
continue continue
@ -351,44 +351,44 @@ func (s) TestLog(t *testing.T) {
func (s) TestTruncateMetadataNotTruncated(t *testing.T) { func (s) TestTruncateMetadataNotTruncated(t *testing.T) {
testCases := []struct { testCases := []struct {
ml *TruncatingMethodLogger ml *TruncatingMethodLogger
mpPb *pb.Metadata mpPb *binlogpb.Metadata
}{ }{
{ {
ml: NewTruncatingMethodLogger(maxUInt, maxUInt), ml: NewTruncatingMethodLogger(maxUInt, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
}, },
}, },
}, },
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
}, },
}, },
}, },
{ {
ml: NewTruncatingMethodLogger(1, maxUInt), ml: NewTruncatingMethodLogger(1, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: nil}, {Key: "", Value: nil},
}, },
}, },
}, },
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1, 1}}, {Key: "", Value: []byte{1, 1}},
}, },
}, },
}, },
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
}, },
@ -398,8 +398,8 @@ func (s) TestTruncateMetadataNotTruncated(t *testing.T) {
// limit. // limit.
{ {
ml: NewTruncatingMethodLogger(1, maxUInt), ml: NewTruncatingMethodLogger(1, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
{Key: "grpc-trace-bin", Value: []byte("some.trace.key")}, {Key: "grpc-trace-bin", Value: []byte("some.trace.key")},
}, },
@ -418,14 +418,14 @@ func (s) TestTruncateMetadataNotTruncated(t *testing.T) {
func (s) TestTruncateMetadataTruncated(t *testing.T) { func (s) TestTruncateMetadataTruncated(t *testing.T) {
testCases := []struct { testCases := []struct {
ml *TruncatingMethodLogger ml *TruncatingMethodLogger
mpPb *pb.Metadata mpPb *binlogpb.Metadata
entryLen int entryLen int
}{ }{
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1, 1, 1}}, {Key: "", Value: []byte{1, 1, 1}},
}, },
}, },
@ -433,8 +433,8 @@ func (s) TestTruncateMetadataTruncated(t *testing.T) {
}, },
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
@ -444,8 +444,8 @@ func (s) TestTruncateMetadataTruncated(t *testing.T) {
}, },
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1, 1}}, {Key: "", Value: []byte{1, 1}},
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
}, },
@ -454,8 +454,8 @@ func (s) TestTruncateMetadataTruncated(t *testing.T) {
}, },
{ {
ml: NewTruncatingMethodLogger(2, maxUInt), ml: NewTruncatingMethodLogger(2, maxUInt),
mpPb: &pb.Metadata{ mpPb: &binlogpb.Metadata{
Entry: []*pb.MetadataEntry{ Entry: []*binlogpb.MetadataEntry{
{Key: "", Value: []byte{1}}, {Key: "", Value: []byte{1}},
{Key: "", Value: []byte{1, 1}}, {Key: "", Value: []byte{1, 1}},
}, },
@ -479,23 +479,23 @@ func (s) TestTruncateMetadataTruncated(t *testing.T) {
func (s) TestTruncateMessageNotTruncated(t *testing.T) { func (s) TestTruncateMessageNotTruncated(t *testing.T) {
testCases := []struct { testCases := []struct {
ml *TruncatingMethodLogger ml *TruncatingMethodLogger
msgPb *pb.Message msgPb *binlogpb.Message
}{ }{
{ {
ml: NewTruncatingMethodLogger(maxUInt, maxUInt), ml: NewTruncatingMethodLogger(maxUInt, maxUInt),
msgPb: &pb.Message{ msgPb: &binlogpb.Message{
Data: []byte{1}, Data: []byte{1},
}, },
}, },
{ {
ml: NewTruncatingMethodLogger(maxUInt, 3), ml: NewTruncatingMethodLogger(maxUInt, 3),
msgPb: &pb.Message{ msgPb: &binlogpb.Message{
Data: []byte{1, 1}, Data: []byte{1, 1},
}, },
}, },
{ {
ml: NewTruncatingMethodLogger(maxUInt, 2), ml: NewTruncatingMethodLogger(maxUInt, 2),
msgPb: &pb.Message{ msgPb: &binlogpb.Message{
Data: []byte{1, 1}, Data: []byte{1, 1},
}, },
}, },
@ -512,13 +512,13 @@ func (s) TestTruncateMessageNotTruncated(t *testing.T) {
func (s) TestTruncateMessageTruncated(t *testing.T) { func (s) TestTruncateMessageTruncated(t *testing.T) {
testCases := []struct { testCases := []struct {
ml *TruncatingMethodLogger ml *TruncatingMethodLogger
msgPb *pb.Message msgPb *binlogpb.Message
oldLength uint32 oldLength uint32
}{ }{
{ {
ml: NewTruncatingMethodLogger(maxUInt, 2), ml: NewTruncatingMethodLogger(maxUInt, 2),
msgPb: &pb.Message{ msgPb: &binlogpb.Message{
Length: 3, Length: 3,
Data: []byte{1, 1, 1}, Data: []byte{1, 1, 1},
}, },

View File

@ -26,7 +26,7 @@ import (
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
) )
var ( var (
@ -42,15 +42,15 @@ type Sink interface {
// Write will be called to write the log entry into the sink. // Write will be called to write the log entry into the sink.
// //
// It should be thread-safe so it can be called in parallel. // It should be thread-safe so it can be called in parallel.
Write(*pb.GrpcLogEntry) error Write(*binlogpb.GrpcLogEntry) error
// Close will be called when the Sink is replaced by a new Sink. // Close will be called when the Sink is replaced by a new Sink.
Close() error Close() error
} }
type noopSink struct{} type noopSink struct{}
func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil } func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
func (ns *noopSink) Close() error { return nil } func (ns *noopSink) Close() error { return nil }
// newWriterSink creates a binary log sink with the given writer. // newWriterSink creates a binary log sink with the given writer.
// //
@ -66,7 +66,7 @@ type writerSink struct {
out io.Writer out io.Writer
} }
func (ws *writerSink) Write(e *pb.GrpcLogEntry) error { func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
b, err := proto.Marshal(e) b, err := proto.Marshal(e)
if err != nil { if err != nil {
grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err) grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
@ -96,7 +96,7 @@ type bufferedSink struct {
done chan struct{} done chan struct{}
} }
func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error { func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
if !fs.flusherStarted { if !fs.flusherStarted {