channelz: include channelz identifier in logs (#5192)

This commit is contained in:
Easwar Swaminathan 2022-02-23 07:30:06 -08:00 committed by GitHub
parent 02f384d41a
commit a73725f42d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 637 additions and 408 deletions

View File

@ -27,6 +27,7 @@ import (
"net"
"strings"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
@ -192,7 +193,7 @@ type BuildOptions struct {
// server can ignore this field.
Authority string
// ChannelzParentID is the parent ClientConn's channelz ID.
ChannelzParentID int64
ChannelzParentID *channelz.Identifier
// CustomUserAgent is the custom user agent set on the parent ClientConn.
// The balancer should set the same custom user agent if it creates a
// ClientConn.

View File

@ -35,7 +35,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@ -240,9 +239,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first"}`))
dopts = append(dopts, grpc.WithResolvers(lb.manualResolver))
if channelz.IsOn() {
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
}
dopts = append(dopts, grpc.WithChannelzParentID(lb.opt.ChannelzParentID))
// Enable Keepalive for grpclb client.
dopts = append(dopts, grpc.WithKeepaliveParams(keepalive.ClientParameters{

View File

@ -184,6 +184,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
ac, err := ccb.cc.newAddrConn(addrs, opts)
if err != nil {
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}

View File

@ -31,6 +31,7 @@ import (
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/status"
)
@ -123,12 +124,16 @@ type server struct {
startedErr chan error // sent nil or an error after server starts
mu sync.Mutex
conns map[transport.ServerTransport]bool
channelzID *channelz.Identifier
}
type ctxKey string
func newTestServer() *server {
return &server{startedErr: make(chan error, 1)}
return &server{
startedErr: make(chan error, 1),
channelzID: channelz.NewIdentifierForTesting(channelz.RefServer, time.Now().Unix(), nil),
}
}
// start starts server. Other goroutines should block on s.startedErr for further operations.
@ -158,10 +163,12 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32) {
return
}
config := &transport.ServerConfig{
MaxStreams: maxStreams,
MaxStreams: maxStreams,
ChannelzParentID: s.channelzID,
}
st, err := transport.NewServerTransport(conn, config)
if err != nil {
t.Errorf("failed to create server transport: %v", err)
continue
}
s.mu.Lock()

36
channelz/channelz.go Normal file
View File

@ -0,0 +1,36 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package channelz exports internals of the channelz implementation as required
// by other gRPC packages.
//
// The implementation of the channelz spec as defined in
// https://github.com/grpc/proposal/blob/master/A14-channelz.md, is provided by
// the `internal/channelz` package.
//
// Experimental
//
// Notice: All APIs in this package are experimental and may be removed in a
// later release.
package channelz
import "google.golang.org/grpc/internal/channelz"
// Identifier is an opaque identifier which uniquely identifies an entity in the
// channelz database.
type Identifier = channelz.Identifier

View File

@ -28,15 +28,17 @@ package service
import (
"context"
"reflect"
"strconv"
"testing"
"github.com/golang/protobuf/ptypes"
durpb "github.com/golang/protobuf/ptypes/duration"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sys/unix"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/protobuf/testing/protocmp"
)
func init() {
@ -139,20 +141,27 @@ func (s) TestGetSocketOptions(t *testing.T) {
},
}
svr := newCZServer()
ids := make([]int64, len(ss))
ids := make([]*channelz.Identifier, len(ss))
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
for i, s := range ss {
ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
ids[i], _ = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
defer channelz.RemoveEntry(ids[i])
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i, s := range ss {
resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i]})
metrics := resp.GetSocket()
if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) {
t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics))
resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i].Int()})
got, want := resp.GetSocket().GetRef(), &channelzpb.SocketRef{SocketId: ids[i].Int(), Name: strconv.Itoa(i)}
if !cmp.Equal(got, want, cmpopts.IgnoreUnexported(channelzpb.SocketRef{})) {
t.Fatalf("resp.GetSocket() returned metrics.GetRef() = %#v, want %#v", got, want)
}
socket, err := socketProtoToStruct(resp.GetSocket())
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(s, socket, protocmp.Transform(), cmp.AllowUnexported(dummySocket{})); diff != "" {
t.Fatalf("unexpected socket, diff (-want +got):\n%s", diff)
}
}
}

View File

@ -22,18 +22,21 @@ import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/protobuf/testing/protocmp"
)
func init() {
@ -61,14 +64,6 @@ type protoToSocketOptFunc func([]*channelzpb.SocketOption) *channelz.SocketOptio
// It is only defined under linux environment on x86 architecture.
var protoToSocketOpt protoToSocketOptFunc
// emptyTime is used for detecting unset value of time.Time type.
// For go1.7 and earlier, ptypes.Timestamp will fill in the loc field of time.Time
// with &utcLoc. However zero value of a time.Time type value loc field is nil.
// This behavior will make reflect.DeepEqual fail upon unset time.Time field,
// and cause false positive fatal error.
// TODO: Go1.7 is no longer supported - does this need a change?
var emptyTime time.Time
const defaultTestTimeout = 10 * time.Second
type dummyChannel struct {
@ -149,7 +144,7 @@ func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
}
}
func channelProtoToStruct(c *channelzpb.Channel) *dummyChannel {
func channelProtoToStruct(c *channelzpb.Channel) (*dummyChannel, error) {
dc := &dummyChannel{}
pdata := c.GetData()
switch pdata.GetState().GetState() {
@ -170,29 +165,29 @@ func channelProtoToStruct(c *channelzpb.Channel) *dummyChannel {
dc.callsStarted = pdata.CallsStarted
dc.callsSucceeded = pdata.CallsSucceeded
dc.callsFailed = pdata.CallsFailed
if t, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
dc.lastCallStartedTimestamp = t
}
ts, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp())
if err != nil {
return nil, err
}
return dc
dc.lastCallStartedTimestamp = ts
return dc, nil
}
func serverProtoToStruct(s *channelzpb.Server) *dummyServer {
func serverProtoToStruct(s *channelzpb.Server) (*dummyServer, error) {
ds := &dummyServer{}
pdata := s.GetData()
ds.callsStarted = pdata.CallsStarted
ds.callsSucceeded = pdata.CallsSucceeded
ds.callsFailed = pdata.CallsFailed
if t, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastCallStartedTimestamp = t
}
ts, err := ptypes.Timestamp(pdata.GetLastCallStartedTimestamp())
if err != nil {
return nil, err
}
return ds
ds.lastCallStartedTimestamp = ts
return ds, nil
}
func socketProtoToStruct(s *channelzpb.Socket) *dummySocket {
func socketProtoToStruct(s *channelzpb.Socket) (*dummySocket, error) {
ds := &dummySocket{}
pdata := s.GetData()
ds.streamsStarted = pdata.GetStreamsStarted()
@ -201,26 +196,26 @@ func socketProtoToStruct(s *channelzpb.Socket) *dummySocket {
ds.messagesSent = pdata.GetMessagesSent()
ds.messagesReceived = pdata.GetMessagesReceived()
ds.keepAlivesSent = pdata.GetKeepAlivesSent()
if t, err := ptypes.Timestamp(pdata.GetLastLocalStreamCreatedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastLocalStreamCreatedTimestamp = t
}
ts, err := ptypes.Timestamp(pdata.GetLastLocalStreamCreatedTimestamp())
if err != nil {
return nil, err
}
if t, err := ptypes.Timestamp(pdata.GetLastRemoteStreamCreatedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastRemoteStreamCreatedTimestamp = t
}
ds.lastLocalStreamCreatedTimestamp = ts
ts, err = ptypes.Timestamp(pdata.GetLastRemoteStreamCreatedTimestamp())
if err != nil {
return nil, err
}
if t, err := ptypes.Timestamp(pdata.GetLastMessageSentTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastMessageSentTimestamp = t
}
ds.lastRemoteStreamCreatedTimestamp = ts
ts, err = ptypes.Timestamp(pdata.GetLastMessageSentTimestamp())
if err != nil {
return nil, err
}
if t, err := ptypes.Timestamp(pdata.GetLastMessageReceivedTimestamp()); err == nil {
if !t.Equal(emptyTime) {
ds.lastMessageReceivedTimestamp = t
}
ds.lastMessageSentTimestamp = ts
ts, err = ptypes.Timestamp(pdata.GetLastMessageReceivedTimestamp())
if err != nil {
return nil, err
}
ds.lastMessageReceivedTimestamp = ts
if v := pdata.GetLocalFlowControlWindow(); v != nil {
ds.localFlowControlWindow = v.Value
}
@ -240,7 +235,7 @@ func socketProtoToStruct(s *channelzpb.Socket) *dummySocket {
ds.remoteAddr = protoToAddr(remote)
}
ds.remoteName = s.GetRemoteName()
return ds
return ds, nil
}
func protoToSecurity(protoSecurity *channelzpb.Security) credentials.ChannelzSecurityValue {
@ -325,7 +320,7 @@ func (s) TestGetTopChannels(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
for _, c := range tcs {
id := channelz.RegisterChannel(c, 0, "")
id := channelz.RegisterChannel(c, nil, "")
defer channelz.RemoveEntry(id)
}
s := newCZServer()
@ -336,12 +331,16 @@ func (s) TestGetTopChannels(t *testing.T) {
t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
}
for i, c := range resp.GetChannel() {
if !reflect.DeepEqual(channelProtoToStruct(c), tcs[i]) {
t.Fatalf("dummyChannel: %d, want: %#v, got: %#v", i, tcs[i], channelProtoToStruct(c))
channel, err := channelProtoToStruct(c)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(tcs[i], channel, protocmp.Transform(), cmp.AllowUnexported(dummyChannel{})); diff != "" {
t.Fatalf("unexpected channel, diff (-want +got):\n%s", diff)
}
}
for i := 0; i < 50; i++ {
id := channelz.RegisterChannel(tcs[0], 0, "")
id := channelz.RegisterChannel(tcs[0], nil, "")
defer channelz.RemoveEntry(id)
}
resp, _ = s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
@ -385,8 +384,12 @@ func (s) TestGetServers(t *testing.T) {
t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
}
for i, s := range resp.GetServer() {
if !reflect.DeepEqual(serverProtoToStruct(s), ss[i]) {
t.Fatalf("dummyServer: %d, want: %#v, got: %#v", i, ss[i], serverProtoToStruct(s))
server, err := serverProtoToStruct(s)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(ss[i], server, protocmp.Transform(), cmp.AllowUnexported(dummyServer{})); diff != "" {
t.Fatalf("unexpected server, diff (-want +got):\n%s", diff)
}
}
for i := 0; i < 50; i++ {
@ -405,34 +408,34 @@ func (s) TestGetServerSockets(t *testing.T) {
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
ids := make([]*channelz.Identifier, 3)
ids[0], _ = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0})
resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: 0})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
}
// GetServerSockets only return normal sockets.
want := map[int64]string{
ids[1]: refNames[1],
ids[2]: refNames[2],
ids[1].Int(): refNames[1],
ids[2].Int(): refNames[2],
}
if !reflect.DeepEqual(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef())
}
for i := 0; i < 50; i++ {
id := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "")
id, _ := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "")
defer channelz.RemoveEntry(id)
}
resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: 0})
resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: 0})
if resp.GetEnd() {
t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
}
@ -446,10 +449,10 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
ids := make([]int64, 3)
ids[0] = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
ids := make([]*channelz.Identifier, 3)
ids[0], _ = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
ids[1], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
@ -458,16 +461,16 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
defer cancel()
// Make GetServerSockets with startID = ids[1]+1, so socket-1 won't be
// included in the response.
resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID, StartSocketId: ids[1] + 1})
resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: ids[1].Int() + 1})
if !resp.GetEnd() {
t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
}
// GetServerSockets only return normal socket-2, socket-1 should be
// filtered by start ID.
want := map[int64]string{
ids[2]: refNames[2],
ids[2].Int(): refNames[2],
}
if !reflect.DeepEqual(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef())
}
}
@ -475,38 +478,45 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
func (s) TestGetChannel(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1])
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]),
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1].Int()),
Severity: channelz.CtInfo,
},
})
ids[2] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
var err error
ids[2], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
if err != nil {
t.Fatalf("channelz.RegisterSubChannel() failed: %v", err)
}
channelz.AddTraceEvent(logger, ids[2], 0, &channelz.TraceEventDesc{
Desc: "SubChannel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]),
Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2].Int()),
Severity: channelz.CtInfo,
},
})
ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3])
channelz.AddTraceEvent(logger, ids[3], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3]),
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3].Int()),
Severity: channelz.CtInfo,
},
})
@ -518,21 +528,23 @@ func (s) TestGetChannel(t *testing.T) {
Desc: "Resolver returns an empty address list",
Severity: channelz.CtWarning,
})
for _, id := range ids {
defer channelz.RemoveEntry(id)
}
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[0]})
resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[0].Int()})
metrics := resp.GetChannel()
subChans := metrics.GetSubchannelRef()
if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2] {
t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2], Name: refNames[2]}}, subChans)
if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2].Int() {
t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2].Int(), Name: refNames[2]}}, subChans)
}
nestedChans := metrics.GetChannelRef()
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1] {
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1], Name: refNames[1]}}, nestedChans)
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1].Int() {
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1].Int(), Name: refNames[1]}}, nestedChans)
}
trace := metrics.GetData().GetTrace()
want := []struct {
@ -542,14 +554,14 @@ func (s) TestGetChannel(t *testing.T) {
childRef string
}{
{desc: "Channel Created", severity: channelzpb.ChannelTraceEvent_CT_INFO},
{desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1], childRef: refNames[1]},
{desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2], childRef: refNames[2]},
{desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1].Int()), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1].Int(), childRef: refNames[1]},
{desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2].Int()), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2].Int(), childRef: refNames[2]},
{desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), severity: channelzpb.ChannelTraceEvent_CT_INFO},
{desc: "Resolver returns an empty address list", severity: channelzpb.ChannelTraceEvent_CT_WARNING},
}
for i, e := range trace.Events {
if e.GetDescription() != want[i].desc {
if !strings.Contains(e.GetDescription(), want[i].desc) {
t.Fatalf("trace: GetDescription want %#v, got %#v", want[i].desc, e.GetDescription())
}
if e.GetSeverity() != want[i].severity {
@ -564,11 +576,11 @@ func (s) TestGetChannel(t *testing.T) {
}
}
}
resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[1]})
resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[1].Int()})
metrics = resp.GetChannel()
nestedChans = metrics.GetChannelRef()
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3] {
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3], Name: refNames[3]}}, nestedChans)
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3].Int() {
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3].Int(), Name: refNames[3]}}, nestedChans)
}
}
@ -581,23 +593,27 @@ func (s) TestGetSubChannel(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer cleanupWrapper(czCleanup, t)
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
ids := make([]int64, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
ids := make([]*channelz.Identifier, 4)
ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
ids[1] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
var err error
ids[1], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
if err != nil {
t.Fatalf("channelz.RegisterSubChannel() failed: %v", err)
}
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: subchanCreated,
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[0]),
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[0].Int()),
Severity: channelz.CtInfo,
},
})
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2])
ids[3] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3])
ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2])
ids[3], _ = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3])
channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
Desc: subchanConnectivityChange,
Severity: channelz.CtInfo,
@ -612,13 +628,13 @@ func (s) TestGetSubChannel(t *testing.T) {
svr := newCZServer()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]})
resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: ids[1].Int()})
metrics := resp.GetSubchannel()
want := map[int64]string{
ids[2]: refNames[2],
ids[3]: refNames[3],
ids[2].Int(): refNames[2],
ids[3].Int(): refNames[3],
}
if !reflect.DeepEqual(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) {
if !cmp.Equal(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) {
t.Fatalf("metrics.GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef())
}
@ -726,20 +742,27 @@ func (s) TestGetSocket(t *testing.T) {
},
}
svr := newCZServer()
ids := make([]int64, len(ss))
ids := make([]*channelz.Identifier, len(ss))
svrID := channelz.RegisterServer(&dummyServer{}, "")
defer channelz.RemoveEntry(svrID)
for i, s := range ss {
ids[i] = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
ids[i], _ = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
defer channelz.RemoveEntry(ids[i])
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i, s := range ss {
resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i]})
metrics := resp.GetSocket()
if !reflect.DeepEqual(metrics.GetRef(), &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}) || !reflect.DeepEqual(socketProtoToStruct(metrics), s) {
t.Fatalf("resp.GetSocket() want: metrics.GetRef() = %#v and %#v, got: metrics.GetRef() = %#v and %#v", &channelzpb.SocketRef{SocketId: ids[i], Name: strconv.Itoa(i)}, s, metrics.GetRef(), socketProtoToStruct(metrics))
resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i].Int()})
got, want := resp.GetSocket().GetRef(), &channelzpb.SocketRef{SocketId: ids[i].Int(), Name: strconv.Itoa(i)}
if !cmp.Equal(got, want, cmpopts.IgnoreUnexported(channelzpb.SocketRef{})) {
t.Fatalf("resp.GetSocket() returned metrics.GetRef() = %#v, want %#v", got, want)
}
socket, err := socketProtoToStruct(resp.GetSocket())
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(s, socket, protocmp.Transform(), cmp.AllowUnexported(dummySocket{})); diff != "" {
t.Fatalf("unexpected socket, diff (-want +got):\n%s", diff)
}
}
}

View File

@ -159,23 +159,20 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
Severity: channelz.CtInfo,
},
})
} else {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
channelz.Info(logger, cc.channelzID, "Channel Created")
}
cc.csMgr.channelzID = cc.channelzID
pid := cc.dopts.channelzParentID
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, pid, target)
ted := &channelz.TraceEventDesc{
Desc: "Channel created",
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != nil {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID.Int()),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelzID, 1, ted)
cc.csMgr.channelzID = cc.channelzID
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
@ -398,7 +395,7 @@ type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
channelzID int64
channelzID *channelz.Identifier
}
// updateState updates the connectivity.State of ClientConn.
@ -490,7 +487,7 @@ type ClientConn struct {
firstResolveEvent *grpcsync.Event
channelzID int64 // channelz unique identification number
channelzID *channelz.Identifier
czData *channelzData
lceMu sync.Mutex // protects lastConnectionError
@ -768,17 +765,21 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSub
cc.mu.Unlock()
return nil, ErrClientConnClosing
}
if channelz.IsOn() {
ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
Severity: channelz.CtInfo,
},
})
var err error
ac.channelzID, err = channelz.RegisterSubChannel(ac, cc.channelzID, "")
if err != nil {
return nil, err
}
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel created",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID.Int()),
Severity: channelz.CtInfo,
},
})
cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil
@ -1085,22 +1086,22 @@ func (cc *ClientConn) Close() error {
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
if channelz.IsOn() {
ted := &channelz.TraceEventDesc{
Desc: "Channel Deleted",
ted := &channelz.TraceEventDesc{
Desc: "Channel deleted",
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != nil {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID.Int()),
Severity: channelz.CtInfo,
}
if cc.dopts.channelzParentID != 0 {
ted.Parent = &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(cc.channelzID)
}
channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
// deleted right away.
channelz.RemoveEntry(cc.channelzID)
return nil
}
@ -1130,7 +1131,7 @@ type addrConn struct {
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
channelzID int64 // channelz unique identification number.
channelzID *channelz.Identifier
czData *channelzData
}
@ -1312,14 +1313,12 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
defer cancel()
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
copts.ChannelzParentID = ac.channelzID
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
if err != nil {
// newTr is either nil, or closed.
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
return err
}
@ -1332,7 +1331,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
newTr.Close(transport.ErrConnClosing)
if connectCtx.Err() == context.DeadlineExceeded {
err := errors.New("failed to receive server preface within timeout")
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %s: %v", addr, err)
return err
}
return nil
@ -1497,19 +1496,18 @@ func (ac *addrConn) tearDown(err error) {
curTr.GracefulClose()
ac.mu.Lock()
}
if channelz.IsOn() {
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel Deleted",
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
Desc: "Subchannel deleted",
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID.Int()),
Severity: channelz.CtInfo,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
Severity: channelz.CtInfo,
},
})
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
// the entity being deleted, and thus prevent it from being deleted right away.
channelz.RemoveEntry(ac.channelzID)
}
},
})
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from
// being deleted right away.
channelz.RemoveEntry(ac.channelzID)
ac.mu.Unlock()
}

View File

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
@ -57,7 +58,7 @@ type dialOptions struct {
callOptions []CallOption
// This is used by WithBalancerName dial option.
balancerBuilder balancer.Builder
channelzParentID int64
channelzParentID *channelz.Identifier
disableServiceConfig bool
disableRetry bool
disableHealthCheck bool
@ -498,7 +499,7 @@ func WithAuthority(a string) DialOption {
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithChannelzParentID(id int64) DialOption {
func WithChannelzParentID(id *channelz.Identifier) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.channelzParentID = id
})

View File

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
@ -474,17 +475,15 @@ func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) {
// to the balancergroup at creation time is passed to child policies.
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
const (
balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
parent = int64(1234)
userAgent = "ua"
defaultTestTimeout = 1 * time.Second
balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
userAgent = "ua"
)
// Setup the stub balancer such that we can read the build options passed to
// it in the UpdateClientConnState method.
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, 1234, nil),
CustomUserAgent: userAgent,
}
stub.Register(balancerName, stub.BalancerFuncs{

View File

@ -25,6 +25,7 @@ package channelz
import (
"context"
"errors"
"fmt"
"sort"
"sync"
@ -184,54 +185,77 @@ func GetServer(id int64) *ServerMetric {
return db.get().GetServer(id)
}
// RegisterChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
// assigned to this channel.
func RegisterChannel(c Channel, pid int64, ref string) int64 {
// RegisterChannel registers the given channel c in the channelz database with
// ref as its reference name, and adds it to the child list of its parent
// (identified by pid). pid == nil means no parent.
//
// Returns a unique channelz identifier assigned to this channel.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
id := idGen.genID()
var parent int64
isTopChannel := true
if pid != nil {
isTopChannel = false
parent = pid.Int()
}
if !IsOn() {
return newIdentifer(RefChannel, id, pid)
}
cn := &channel{
refName: ref,
c: c,
subChans: make(map[int64]string),
nestedChans: make(map[int64]string),
id: id,
pid: pid,
pid: parent,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
if pid == 0 {
db.get().addChannel(id, cn, true, pid)
} else {
db.get().addChannel(id, cn, false, pid)
}
return id
db.get().addChannel(id, cn, isTopChannel, parent)
return newIdentifer(RefChannel, id, pid)
}
// RegisterSubChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). It returns the unique channelz tracking id assigned to this subchannel.
func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
if pid == 0 {
logger.Error("a SubChannel's parent id cannot be 0")
return 0
// RegisterSubChannel registers the given subChannel c in the channelz database
// with ref as its reference name, and adds it to the child list of its parent
// (identified by pid).
//
// Returns a unique channelz identifier assigned to this subChannel.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
id := idGen.genID()
if !IsOn() {
return newIdentifer(RefSubChannel, id, pid), nil
}
sc := &subChannel{
refName: ref,
c: c,
sockets: make(map[int64]string),
id: id,
pid: pid,
pid: pid.Int(),
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
db.get().addSubChannel(id, sc, pid)
return id
db.get().addSubChannel(id, sc, pid.Int())
return newIdentifer(RefSubChannel, id, pid), nil
}
// RegisterServer registers the given server s in channelz database. It returns
// the unique channelz tracking id assigned to this server.
func RegisterServer(s Server, ref string) int64 {
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterServer(s Server, ref string) *Identifier {
id := idGen.genID()
if !IsOn() {
return newIdentifer(RefServer, id, nil)
}
svr := &server{
refName: ref,
s: s,
@ -240,71 +264,92 @@ func RegisterServer(s Server, ref string) int64 {
id: id,
}
db.get().addServer(id, svr)
return id
return newIdentifer(RefServer, id, nil)
}
// RegisterListenSocket registers the given listen socket s in channelz database
// with ref as its reference name, and add it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this listen socket.
func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
logger.Error("a ListenSocket's parent id cannot be 0")
return 0
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
if pid == nil {
return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := idGen.genID()
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addListenSocket(id, ls, pid)
return id
if !IsOn() {
return newIdentifer(RefListenSocket, id, pid), nil
}
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
db.get().addListenSocket(id, ls, pid.Int())
return newIdentifer(RefListenSocket, id, pid), nil
}
// RegisterNormalSocket registers the given normal socket s in channelz database
// with ref as its reference name, and add it to the child list of its parent
// with ref as its reference name, and adds it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this normal socket.
func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
logger.Error("a NormalSocket's parent id cannot be 0")
return 0
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
if pid == nil {
return nil, errors.New("a NormalSocket's parent id cannot be 0")
}
id := idGen.genID()
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addNormalSocket(id, ns, pid)
return id
if !IsOn() {
return newIdentifer(RefNormalSocket, id, pid), nil
}
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
db.get().addNormalSocket(id, ns, pid.Int())
return newIdentifer(RefNormalSocket, id, pid), nil
}
// RemoveEntry removes an entry with unique channelz tracking id to be id from
// channelz database.
func RemoveEntry(id int64) {
db.get().removeEntry(id)
//
// If channelz is not turned ON, this function is a no-op.
func RemoveEntry(id *Identifier) {
if !IsOn() {
return
}
db.get().removeEntry(id.Int())
}
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
// to the channel trace.
// The Parent field is optional. It is used for event that will be recorded in the entity's parent
// trace also.
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
// the event to be added to the channel trace.
//
// The Parent field is optional. It is used for an event that will be recorded
// in the entity's parent trace.
type TraceEventDesc struct {
Desc string
Severity Severity
Parent *TraceEventDesc
}
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
func AddTraceEvent(l grpclog.DepthLoggerV2, id int64, depth int, desc *TraceEventDesc) {
for d := desc; d != nil; d = d.Parent {
switch d.Severity {
case CtUnknown, CtInfo:
l.InfoDepth(depth+1, d.Desc)
case CtWarning:
l.WarningDepth(depth+1, d.Desc)
case CtError:
l.ErrorDepth(depth+1, d.Desc)
}
// AddTraceEvent adds trace related to the entity with specified id, using the
// provided TraceEventDesc.
//
// If channelz is not turned ON, this will simply log the event descriptions.
func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
// Log only the trace description associated with the bottom most entity.
switch desc.Severity {
case CtUnknown, CtInfo:
l.InfoDepth(depth+1, withParens(id)+desc.Desc)
case CtWarning:
l.WarningDepth(depth+1, withParens(id)+desc.Desc)
case CtError:
l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
}
if getMaxTraceEntry() == 0 {
return
}
db.get().traceEvent(id, desc)
if IsOn() {
db.get().traceEvent(id.Int(), desc)
}
}
// channelMap is the storage data structure for channelz.

75
internal/channelz/id.go Normal file
View File

@ -0,0 +1,75 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import "fmt"
// Identifier is an opaque identifier which uniquely identifies an entity in the
// channelz database.
type Identifier struct {
typ RefChannelType
id int64
str string
pid *Identifier
}
// Type returns the entity type corresponding to id.
func (id *Identifier) Type() RefChannelType {
return id.typ
}
// Int returns the integer identifier corresponding to id.
func (id *Identifier) Int() int64 {
return id.id
}
// String returns a string representation of the entity corresponding to id.
//
// This includes some information about the parent as well. Examples:
// Top-level channel: [Channel #channel-number]
// Nested channel: [Channel #parent-channel-number Channel #channel-number]
// Sub channel: [Channel #parent-channel SubChannel #subchannel-number]
func (id *Identifier) String() string {
return id.str
}
// Equal returns true if other is the same as id.
func (id *Identifier) Equal(other *Identifier) bool {
if (id != nil) != (other != nil) {
return false
}
if id == nil && other == nil {
return true
}
return id.typ == other.typ && id.id == other.id && id.pid == other.pid
}
// NewIdentifierForTesting returns a new opaque identifier to be used only for
// testing purposes.
func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier {
return newIdentifer(typ, id, pid)
}
func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier {
str := fmt.Sprintf("%s #%d", typ, id)
if pid != nil {
str = fmt.Sprintf("%s %s", pid, str)
}
return &Identifier{typ: typ, id: id, str: str, pid: pid}
}

View File

@ -26,77 +26,54 @@ import (
var logger = grpclog.Component("channelz")
func withParens(id *Identifier) string {
return "[" + id.String() + "] "
}
// Info logs and adds a trace event if channelz is on.
func Info(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtInfo,
})
} else {
l.InfoDepth(1, args...)
}
func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtInfo,
})
}
// Infof logs and adds a trace event if channelz is on.
func Infof(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Severity: CtInfo,
})
} else {
l.InfoDepth(1, msg)
}
func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprintf(format, args...),
Severity: CtInfo,
})
}
// Warning logs and adds a trace event if channelz is on.
func Warning(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtWarning,
})
} else {
l.WarningDepth(1, args...)
}
func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtWarning,
})
}
// Warningf logs and adds a trace event if channelz is on.
func Warningf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Severity: CtWarning,
})
} else {
l.WarningDepth(1, msg)
}
func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprintf(format, args...),
Severity: CtWarning,
})
}
// Error logs and adds a trace event if channelz is on.
func Error(l grpclog.DepthLoggerV2, id int64, args ...interface{}) {
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtError,
})
} else {
l.ErrorDepth(1, args...)
}
func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...interface{}) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprint(args...),
Severity: CtError,
})
}
// Errorf logs and adds a trace event if channelz is on.
func Errorf(l grpclog.DepthLoggerV2, id int64, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
if IsOn() {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: msg,
Severity: CtError,
})
} else {
l.ErrorDepth(1, msg)
}
func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...interface{}) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
Desc: fmt.Sprintf(format, args...),
Severity: CtError,
})
}

View File

@ -686,12 +686,33 @@ const (
type RefChannelType int
const (
// RefUnknown indicates an unknown entity type, the zero value for this type.
RefUnknown RefChannelType = iota
// RefChannel indicates the referenced entity is a Channel.
RefChannel RefChannelType = iota
RefChannel
// RefSubChannel indicates the referenced entity is a SubChannel.
RefSubChannel
// RefServer indicates the referenced entity is a Server.
RefServer
// RefListenSocket indicates the referenced entity is a ListenSocket.
RefListenSocket
// RefNormalSocket indicates the referenced entity is a NormalSocket.
RefNormalSocket
)
var refChannelTypeToString = map[RefChannelType]string{
RefUnknown: "Unknown",
RefChannel: "Channel",
RefSubChannel: "SubChannel",
RefServer: "Server",
RefListenSocket: "ListenSocket",
RefNormalSocket: "NormalSocket",
}
func (r RefChannelType) String() string {
return refChannelTypeToString[r]
}
func (c *channelTrace) dumpData() *ChannelTrace {
c.mu.Lock()
ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime}

View File

@ -132,7 +132,7 @@ type http2Client struct {
kpDormant bool
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
channelzID *channelz.Identifier
czData *channelzData
onGoAway func(GoAwayReason)
@ -351,8 +351,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
return nil, err
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
@ -898,9 +899,7 @@ func (t *http2Client) Close(err error) {
t.controlBuf.finish()
t.cancel()
t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
channelz.RemoveEntry(t.channelzID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()

View File

@ -117,7 +117,7 @@ type http2Server struct {
idle time.Time
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
channelzID *channelz.Identifier
czData *channelzData
bufferPool *bufferPool
@ -275,12 +275,12 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
return nil, err
}
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
t.framer.writer.Flush()
defer func() {
@ -1210,9 +1210,7 @@ func (t *http2Server) Close() {
if err := t.conn.Close(); err != nil && logger.V(logLevel) {
logger.Infof("transport: error closing conn during Close: %v", err)
}
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
channelz.RemoveEntry(t.channelzID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()

View File

@ -31,6 +31,7 @@ import (
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/keepalive"
)
@ -252,11 +253,15 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
// logic is running even without any active streams.
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
}}, connCh)
copts := ConnectOptions{
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
PermitWithoutStream: true,
},
}
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
@ -284,10 +289,14 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
// active streams, and therefore the transport stays open.
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
}}, connCh)
copts := ConnectOptions{
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
},
}
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
@ -313,10 +322,14 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
// transport even when there is an active stream.
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
client, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
}}, connCh)
copts := ConnectOptions{
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
KeepaliveParams: keepalive.ClientParameters{
Time: 1 * time.Second,
Timeout: 1 * time.Second,
},
}
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))

View File

@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
@ -529,7 +530,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
ChannelzParentID int64
ChannelzParentID *channelz.Identifier
MaxHeaderListSize *uint32
HeaderTableSize *uint32
}
@ -563,7 +564,7 @@ type ConnectOptions struct {
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID int64
ChannelzParentID *channelz.Identifier
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.

View File

@ -41,6 +41,7 @@ import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/internal/testutils"
@ -56,16 +57,6 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
type server struct {
lis net.Listener
port string
startedErr chan error // error (or nil) with server start value
mu sync.Mutex
conns map[ServerTransport]bool
h *testStreamHandler
ready chan struct{}
}
var (
expectedRequest = []byte("ping")
expectedResponse = []byte("pong")
@ -299,6 +290,25 @@ func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *Stream) {
}
}
type server struct {
lis net.Listener
port string
startedErr chan error // error (or nil) with server start value
mu sync.Mutex
conns map[ServerTransport]bool
h *testStreamHandler
ready chan struct{}
channelzID *channelz.Identifier
}
func newTestServer() *server {
return &server{
startedErr: make(chan error, 1),
ready: make(chan struct{}),
channelzID: channelz.NewIdentifierForTesting(channelz.RefServer, time.Now().Unix(), nil),
}
}
// start starts server. Other goroutines should block on s.readyChan for further operations.
func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
var err error
@ -422,9 +432,10 @@ func (s *server) addr() string {
return s.lis.Addr().String()
}
func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hType) *server {
server := &server{startedErr: make(chan error, 1), ready: make(chan struct{})}
go server.start(t, port, serverConfig, ht)
func setUpServerOnly(t *testing.T, port int, sc *ServerConfig, ht hType) *server {
server := newTestServer()
sc.ChannelzParentID = server.channelzID
go server.start(t, port, sc, ht)
server.wait(t, 2*time.Second)
return server
}
@ -433,9 +444,11 @@ func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
}
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, serverConfig, ht)
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, sc, ht)
addr := resolver.Address{Addr: "localhost:" + server.port}
copts.ChannelzParentID = channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func() {}, func(GoAwayReason) {}, func() {})
if connErr != nil {
@ -1299,11 +1312,14 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
}()
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {})
copts := ConnectOptions{ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil)}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("Error while creating client transport: %v", err)
}
defer ct.Close(fmt.Errorf("closed manually by test"))
str, err := ct.NewStream(connectCtx, &CallHdr{})
if err != nil {
t.Fatalf("Error while creating stream: %v", err)
@ -2181,7 +2197,11 @@ func (s) TestClientHandshakeInfo(t *testing.T) {
defer cancel()
creds := &attrTransportCreds{}
tr, err := NewClientTransport(ctx, context.Background(), addr, ConnectOptions{TransportCredentials: creds}, func() {}, func(GoAwayReason) {}, func() {})
copts := ConnectOptions{
TransportCredentials: creds,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}
@ -2218,7 +2238,11 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
}
tr, err := NewClientTransport(ctx, context.Background(), addr, ConnectOptions{Dialer: dialer}, func() {}, func(GoAwayReason) {}, func() {})
copts := ConnectOptions{
Dialer: dialer,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefSubChannel, time.Now().Unix(), nil),
}
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func() {}, func(GoAwayReason) {}, func() {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
}

View File

@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/serviceconfig"
)
@ -139,13 +140,18 @@ type Address struct {
// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
func (a *Address) Equal(o Address) bool {
func (a Address) Equal(o Address) bool {
return a.Addr == o.Addr && a.ServerName == o.ServerName &&
a.Attributes.Equal(o.Attributes) &&
a.BalancerAttributes.Equal(o.BalancerAttributes) &&
a.Type == o.Type && a.Metadata == o.Metadata
}
// String returns JSON formatted string representation of the address.
func (a Address) String() string {
return pretty.ToJSON(a)
}
// BuildOptions includes additional information for the builder to create
// the resolver.
type BuildOptions struct {

View File

@ -19,7 +19,6 @@
package grpc
import (
"fmt"
"strings"
"sync"
@ -27,6 +26,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
@ -97,10 +97,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if ccr.done.HasFired() {
return nil
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(s)
}
ccr.addChannelzTraceEvent(s)
ccr.curState = s
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
@ -125,10 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
if ccr.done.HasFired() {
return
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending new addresses to cc: %v", addrs)
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
}
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
ccr.curState.Addresses = addrs
ccr.cc.updateResolverState(ccr.curState, nil)
}
@ -141,7 +135,7 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
if ccr.done.HasFired() {
return
}
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %v", sc)
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: got new service config: %s", sc)
if ccr.cc.dopts.disableServiceConfig {
channelz.Info(logger, ccr.cc.channelzID, "Service config lookups disabled; ignoring config")
return
@ -151,9 +145,7 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
return
}
if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
}
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
ccr.curState.ServiceConfig = scpr
ccr.cc.updateResolverState(ccr.curState, nil)
}
@ -180,8 +172,5 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
} else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
updates = append(updates, "resolver returned new addresses")
}
channelz.AddTraceEvent(logger, ccr.cc.channelzID, 0, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
Severity: channelz.CtInfo,
})
channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
}

View File

@ -134,7 +134,7 @@ type Server struct {
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
channelzID *channelz.Identifier
czData *channelzData
serverWorkerChannels []chan *serverWorkerData
@ -584,9 +584,8 @@ func NewServer(opt ...ServerOption) *Server {
s.initServerWorkers()
}
if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
channelz.Info(logger, s.channelzID, "Server created")
return s
}
@ -712,7 +711,7 @@ var ErrServerStopped = errors.New("grpc: the server has been stopped")
type listenSocket struct {
net.Listener
channelzID int64
channelzID *channelz.Identifier
}
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
@ -724,9 +723,8 @@ func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
func (l *listenSocket) Close() error {
err := l.Listener.Close()
if channelz.IsOn() {
channelz.RemoveEntry(l.channelzID)
}
channelz.RemoveEntry(l.channelzID)
channelz.Info(logger, l.channelzID, "ListenSocket deleted")
return err
}
@ -759,11 +757,6 @@ func (s *Server) Serve(lis net.Listener) error {
ls := &listenSocket{Listener: lis}
s.lis[ls] = true
if channelz.IsOn() {
ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
}
s.mu.Unlock()
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
@ -773,8 +766,16 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Unlock()
}()
var tempDelay time.Duration // how long to sleep on accept failure
var err error
ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
if err != nil {
s.mu.Unlock()
return err
}
s.mu.Unlock()
channelz.Info(logger, ls.channelzID, "ListenSocket created")
var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
@ -1709,11 +1710,7 @@ func (s *Server) Stop() {
s.done.Fire()
}()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
listeners := s.lis
@ -1751,11 +1748,7 @@ func (s *Server) GracefulStop() {
s.quit.Fire()
defer s.done.Fire()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()

View File

@ -23,12 +23,13 @@ import (
"crypto/tls"
"fmt"
"net"
"reflect"
"regexp"
"strings"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/grpclb"
@ -455,11 +456,11 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
// Socket1 Socket2
czCleanup := channelz.NewChannelzStorageForTesting()
defer czCleanupWrapper(czCleanup, t)
topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
topChanID := channelz.RegisterChannel(&dummyChannel{}, nil, "")
subChanID1, _ := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
subChanID2, _ := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
sktID1, _ := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
sktID2, _ := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
tcs, _ := channelz.GetTopChannels(0, 0)
if tcs == nil || len(tcs) != 1 {
@ -468,7 +469,7 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
if len(tcs[0].SubChans) != 2 {
t.Fatalf("There should be two SubChannel entries")
}
sc := channelz.GetSubChannel(subChanID1)
sc := channelz.GetSubChannel(subChanID1.Int())
if sc == nil || len(sc.Sockets) != 2 {
t.Fatalf("There should be two Socket entries")
}
@ -1380,7 +1381,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
if !ok {
return false, fmt.Errorf("the SocketData.Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.SocketData.Security)
}
if !reflect.DeepEqual(securityVal.RemoteCertificate, cert.Certificate[0]) {
if !cmp.Equal(securityVal.RemoteCertificate, cert.Certificate[0]) {
return false, fmt.Errorf("SocketData.Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
}
for _, v := range cipherSuites {
@ -1397,6 +1398,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
czCleanup := channelz.NewChannelzStorageForTesting()
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
// avoid calling API to set balancer type, which will void service config's change of balancer.
e.balancer = ""
@ -1407,6 +1409,7 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
te.resolverScheme = r.Scheme()
te.clientConn(grpc.WithResolvers(r))
defer te.tearDown()
var nestedConn int64
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
@ -1431,8 +1434,9 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
if len(ncm.Trace.Events) == 0 {
return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
}
if ncm.Trace.Events[0].Desc != "Channel Created" {
return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc)
pattern := `Channel created`
if ok, _ := regexp.MatchString(pattern, ncm.Trace.Events[0].Desc); !ok {
return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, ncm.Trace.Events[0].Desc)
}
return true, nil
}); err != nil {
@ -1460,8 +1464,9 @@ func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
if len(ncm.Trace.Events) == 0 {
return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
}
if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" {
return false, fmt.Errorf("the first trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[0].Desc)
pattern := `Channel created`
if ok, _ := regexp.MatchString(pattern, ncm.Trace.Events[0].Desc); !ok {
return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, ncm.Trace.Events[0].Desc)
}
return true, nil
}); err != nil {
@ -1509,8 +1514,9 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
if len(scm.Trace.Events) == 0 {
return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
}
if scm.Trace.Events[0].Desc != "Subchannel Created" {
return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc)
pattern := `Subchannel created`
if ok, _ := regexp.MatchString(pattern, scm.Trace.Events[0].Desc); !ok {
return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, scm.Trace.Events[0].Desc)
}
return true, nil
}); err != nil {
@ -1551,10 +1557,12 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
if len(scm.Trace.Events) == 0 {
return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
}
if got, want := scm.Trace.Events[len(scm.Trace.Events)-1].Desc, "Subchannel Deleted"; got != want {
return false, fmt.Errorf("the last trace event should be %q, not %q", want, got)
}
pattern := `Subchannel deleted`
desc := scm.Trace.Events[len(scm.Trace.Events)-1].Desc
if ok, _ := regexp.MatchString(pattern, desc); !ok {
return false, fmt.Errorf("the last trace event should be %q, not %q", pattern, desc)
}
return true, nil
}); err != nil {
t.Fatal(err)
@ -1600,7 +1608,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
if err := verifyResultWithDelay(func() (bool, error) {
cm := channelz.GetChannel(cid)
for i := len(cm.Trace.Events) - 1; i >= 0; i-- {
if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) {
if strings.Contains(cm.Trace.Events[i].Desc, fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name)) {
break
}
if i == 0 {
@ -1725,7 +1733,7 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
}
for i := len(scm.Trace.Events) - 1; i >= 0; i-- {
if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) {
if strings.Contains(scm.Trace.Events[i].Desc, fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2])) {
break
}
if i == 0 {
@ -1756,9 +1764,9 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
var subConn int64
te.srv.Stop()
var subConn int64
if err := verifyResultWithDelay(func() (bool, error) {
// we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
// to effect of r.UpdateState(resolver.State{Addresses:[]resolver.Address{}}))
@ -1773,6 +1781,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
for k := range tcs[0].SubChans {
// get the SubChannel id for further trace inquiry.
subConn = k
t.Logf("SubChannel Id is %d", subConn)
}
}
scm := channelz.GetSubChannel(subConn)
@ -1786,8 +1795,10 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
}
var ready, connecting, transient, shutdown int
t.Log("SubChannel trace events seen so far...")
for _, e := range scm.Trace.Events {
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
t.Log(e.Desc)
if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) {
transient++
}
}
@ -1798,17 +1809,19 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
}
transient = 0
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
t.Log("SubChannel trace events seen so far...")
for _, e := range scm.Trace.Events {
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
t.Log(e.Desc)
if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)) {
ready++
}
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) {
if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting)) {
connecting++
}
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) {
if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) {
transient++
}
if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) {
if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown)) {
shutdown++
}
}
@ -1851,6 +1864,7 @@ func (s) TestCZChannelConnectivityState(t *testing.T) {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
te.srv.Stop()
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
@ -1858,14 +1872,16 @@ func (s) TestCZChannelConnectivityState(t *testing.T) {
}
var ready, connecting, transient int
t.Log("Channel trace events seen so far...")
for _, e := range tcs[0].Trace.Events {
if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) {
t.Log(e.Desc)
if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready)) {
ready++
}
if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) {
if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting)) {
connecting++
}
if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) {
if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure)) {
transient++
}
}

View File

@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/testutils"
@ -516,7 +517,6 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
const (
balancerName = "stubBalancer-TestClusterManagerForwardsBalancerBuildOptions"
parent = int64(1234)
userAgent = "ua"
defaultTestTimeout = 1 * time.Second
)
@ -526,7 +526,7 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
ccsCh := testutils.NewChannel()
bOpts := balancer.BuildOptions{
DialCreds: insecure.NewCredentials(),
ChannelzParentID: parent,
ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, 1234, nil),
CustomUserAgent: userAgent,
}
stub.Register(balancerName, stub.BalancerFuncs{