diff --git a/channelz/service/service.go b/channelz/service/service.go index 4ffe13447..0a6f97eef 100644 --- a/channelz/service/service.go +++ b/channelz/service/service.go @@ -73,6 +73,35 @@ func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectiv } } +func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { + pbt := &channelzpb.ChannelTrace{} + pbt.NumEventsLogged = ct.EventNum + if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { + pbt.CreationTimestamp = ts + } + var events []*channelzpb.ChannelTraceEvent + for _, e := range ct.Events { + cte := &channelzpb.ChannelTraceEvent{ + Description: e.Desc, + Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), + } + if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { + cte.Timestamp = ts + } + if e.RefID != 0 { + switch e.RefType { + case channelz.RefChannel: + cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} + case channelz.RefSubChannel: + cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} + } + } + events = append(events, cte) + } + pbt.Events = events + return pbt +} + func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel { c := &channelzpb.Channel{} c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} @@ -104,6 +133,7 @@ func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel { sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) } c.SocketRef = sockets + c.Data.Trace = channelTraceToProto(cm.Trace) return c } @@ -138,6 +168,7 @@ func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchann sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) } sc.SocketRef = sockets + sc.Data.Trace = channelTraceToProto(cm.Trace) return sc } diff --git a/channelz/service/service_test.go b/channelz/service/service_test.go index 10e00e8c5..5455c34e0 100644 --- a/channelz/service/service_test.go +++ b/channelz/service/service_test.go @@ -19,6 +19,7 @@ package service import ( + "fmt" "net" "reflect" "strconv" @@ -403,40 +404,131 @@ func TestGetServerSockets(t *testing.T) { func TestGetChannel(t *testing.T) { channelz.NewChannelzStorage() - refNames := []string{"top channel 1", "nested channel 1", "nested channel 2", "nested channel 3"} + refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"} ids := make([]int64, 4) ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0]) + channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{ + Desc: "Channel Created", + Severity: channelz.CtINFO, + }) ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1]) + channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{ + Desc: "Channel Created", + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]), + Severity: channelz.CtINFO, + }, + }) + ids[2] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2]) + channelz.AddTraceEvent(ids[2], &channelz.TraceEventDesc{ + Desc: "SubChannel Created", + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]), + Severity: channelz.CtINFO, + }, + }) ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3]) + channelz.AddTraceEvent(ids[3], 
&channelz.TraceEventDesc{ + Desc: "Channel Created", + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3]), + Severity: channelz.CtINFO, + }, + }) + channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), + Severity: channelz.CtINFO, + }) + channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{ + Desc: "Resolver returns an empty address list", + Severity: channelz.CtWarning, + }) svr := newCZServer() resp, _ := svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[0]}) metrics := resp.GetChannel() subChans := metrics.GetSubchannelRef() if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2] { - t.Fatalf("GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2], Name: refNames[2]}}, subChans) + t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2], Name: refNames[2]}}, subChans) } nestedChans := metrics.GetChannelRef() if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1] { - t.Fatalf("GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1], Name: refNames[1]}}, nestedChans) + t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1], Name: refNames[1]}}, nestedChans) + } + trace := metrics.GetData().GetTrace() + want := []struct { + desc string + severity channelzpb.ChannelTraceEvent_Severity + childID int64 + childRef string + }{ + {desc: "Channel Created", severity: channelzpb.ChannelTraceEvent_CT_INFO}, + {desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1], childRef: refNames[1]}, + {desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2], childRef: refNames[2]}, + {desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), severity: channelzpb.ChannelTraceEvent_CT_INFO}, + {desc: "Resolver returns an empty address list", severity: channelzpb.ChannelTraceEvent_CT_WARNING}, } + for i, e := range trace.Events { + if e.GetDescription() != want[i].desc { + t.Fatalf("trace: GetDescription want %#v, got %#v", want[i].desc, e.GetDescription()) + } + if e.GetSeverity() != want[i].severity { + t.Fatalf("trace: GetSeverity want %#v, got %#v", want[i].severity, e.GetSeverity()) + } + if want[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) { + t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference") + } + if e.GetChannelRef().GetChannelId() != want[i].childID || e.GetChannelRef().GetName() != want[i].childRef { + if e.GetSubchannelRef().GetSubchannelId() != want[i].childID || e.GetSubchannelRef().GetName() != want[i].childRef { + t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", want[i].childID, want[i].childRef, e.GetChannelRef(), e.GetSubchannelRef()) + } + } + } resp, _ = svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[1]}) metrics = resp.GetChannel() nestedChans = metrics.GetChannelRef() if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3] { - t.Fatalf("GetChannelRef() want %#v, got %#v", 
[]*channelzpb.ChannelRef{{ChannelId: ids[3], Name: refNames[3]}}, nestedChans) + t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3], Name: refNames[3]}}, nestedChans) } } func TestGetSubChannel(t *testing.T) { + var ( + subchanCreated = "SubChannel Created" + subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) + subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0") + ) channelz.NewChannelzStorage() refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"} ids := make([]int64, 4) ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0]) + channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{ + Desc: "Channel Created", + Severity: channelz.CtINFO, + }) ids[1] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1]) + channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{ + Desc: subchanCreated, + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) created", ids[1]), + Severity: channelz.CtINFO, + }, + }) ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2]) ids[3] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3]) + channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{ + Desc: subchanConnectivityChange, + Severity: channelz.CtINFO, + }) + channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{ + Desc: subChanPickNewAddress, + Severity: channelz.CtINFO, + }) svr := newCZServer() resp, _ := svr.GetSubchannel(context.Background(), &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]}) metrics := resp.GetSubchannel() @@ -445,7 +537,35 @@ func TestGetSubChannel(t *testing.T) { ids[3]: refNames[3], } if !reflect.DeepEqual(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) { - t.Fatalf("GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef()) + t.Fatalf("metrics.GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef()) + } + + trace := metrics.GetData().GetTrace() + wantTrace := []struct { + desc string + severity channelzpb.ChannelTraceEvent_Severity + childID int64 + childRef string + }{ + {desc: subchanCreated, severity: channelzpb.ChannelTraceEvent_CT_INFO}, + {desc: subchanConnectivityChange, severity: channelzpb.ChannelTraceEvent_CT_INFO}, + {desc: subChanPickNewAddress, severity: channelzpb.ChannelTraceEvent_CT_INFO}, + } + for i, e := range trace.Events { + if e.GetDescription() != wantTrace[i].desc { + t.Fatalf("trace: GetDescription want %#v, got %#v", wantTrace[i].desc, e.GetDescription()) + } + if e.GetSeverity() != wantTrace[i].severity { + t.Fatalf("trace: GetSeverity want %#v, got %#v", wantTrace[i].severity, e.GetSeverity()) + } + if wantTrace[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) { + t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference") + } + if e.GetChannelRef().GetChannelId() != wantTrace[i].childID || e.GetChannelRef().GetName() != wantTrace[i].childRef { + if e.GetSubchannelRef().GetSubchannelId() != wantTrace[i].childID || e.GetSubchannelRef().GetName() != wantTrace[i].childRef { + t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", wantTrace[i].childID, wantTrace[i].childRef, e.GetChannelRef(), e.GetSubchannelRef()) + } + } } } diff --git a/clientconn.go b/clientconn.go index 318ac4073..c4e896f82 100644 --- a/clientconn.go +++ b/clientconn.go @@ -137,9 +137,22 @@ 
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if channelz.IsOn() { if cc.dopts.channelzParentID != 0 { cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) + channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ + Desc: "Channel Created", + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID), + Severity: channelz.CtINFO, + }, + }) } else { cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target) + channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ + Desc: "Channel Created", + Severity: channelz.CtINFO, + }) } + cc.csMgr.channelzID = cc.channelzID } if !cc.dopts.insecure { @@ -308,6 +321,7 @@ type connectivityStateManager struct { mu sync.Mutex state connectivity.State notifyChan chan struct{} + channelzID int64 } // updateState updates the connectivity.State of ClientConn. @@ -323,6 +337,12 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) { return } csm.state = state + if channelz.IsOn() { + channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel Connectivity change to %v", state), + Severity: channelz.CtINFO, + }) + } if csm.notifyChan != nil { // There are other goroutines waiting on this channel. close(csm.notifyChan) @@ -500,10 +520,26 @@ func (cc *ClientConn) switchBalancer(name string) { } builder := balancer.Get(name) + // TODO(yuxuanli): If the user sends a service config that does not contain a valid balancer name, should + // we reuse the previous one? + if channelz.IsOn() { + if builder == nil { + channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName), + Severity: channelz.CtWarning, + }) + } else { + channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Channel switches to new LB policy %q", name), + Severity: channelz.CtINFO, + }) + } + } if builder == nil { grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) builder = newPickfirstBuilder() } + cc.preBalancerName = cc.curBalancerName cc.curBalancerName = builder.Name() cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) @@ -541,6 +577,14 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { } if channelz.IsOn() { ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") + channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ + Desc: "Subchannel Created", + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), + Severity: channelz.CtINFO, + }, + }) } cc.conns[ac] = struct{}{} cc.mu.Unlock() @@ -605,7 +649,7 @@ func (ac *addrConn) connect() error { ac.mu.Unlock() return nil } - ac.state = connectivity.Connecting + ac.updateConnectivityState(connectivity.Connecting) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.mu.Unlock() @@ -690,6 +734,17 @@ func (cc *ClientConn) handleServiceConfig(js string) error { if cc.dopts.disableServiceConfig { return nil } + if cc.scRaw == js { + return nil + } + if channelz.IsOn() { + channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ + // The special formatting of \"%s\" instead of %q is to provide nice printing of service config + // for human consumption. 
+ Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js), + Severity: channelz.CtINFO, + }) + } sc, err := parseServiceConfig(js) if err != nil { return err @@ -788,6 +843,19 @@ func (cc *ClientConn) Close() error { ac.tearDown(ErrClientConnClosing) } if channelz.IsOn() { + ted := &channelz.TraceEventDesc{ + Desc: "Channel Deleted", + Severity: channelz.CtINFO, + } + if cc.dopts.channelzParentID != 0 { + ted.Parent = &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), + Severity: channelz.CtINFO, + } + } + channelz.AddTraceEvent(cc.channelzID, ted) + // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to + // the entity beng deleted, and thus prevent it from being deleted right away. channelz.RemoveEntry(cc.channelzID) } return nil @@ -807,7 +875,8 @@ type addrConn struct { mu sync.Mutex curAddr resolver.Address reconnectIdx int // The index in addrs list to start reconnecting from. - state connectivity.State + // Use updateConnectivityState for updating addrConn's connectivity state. + state connectivity.State // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -830,6 +899,16 @@ type addrConn struct { czData *channelzData } +func (ac *addrConn) updateConnectivityState(s connectivity.State) { + ac.state = s + if channelz.IsOn() { + channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s), + Severity: channelz.CtINFO, + }) + } +} + // adjustParams updates parameters used to create transports upon // receiving a GoAway. func (ac *addrConn) adjustParams(r transport.GoAwayReason) { @@ -916,7 +995,7 @@ func (ac *addrConn) resetTransport() error { } ac.printf("connecting") if ac.state != connectivity.Connecting { - ac.state = connectivity.Connecting + ac.updateConnectivityState(connectivity.Connecting) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) } // copy ac.addrs in case of race @@ -939,6 +1018,12 @@ func (ac *addrConn) resetTransport() error { func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions, resetBackoff chan struct{}) (bool, error) { for i := ridx; i < len(addrs); i++ { addr := addrs[i] + if channelz.IsOn() { + channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), + Severity: channelz.CtINFO, + }) + } target := transport.TargetInfo{ Addr: addr.Addr, Metadata: addr.Metadata, @@ -999,7 +1084,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, return false, errConnClosing } ac.printf("ready") - ac.state = connectivity.Ready + ac.updateConnectivityState(connectivity.Ready) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.transport = newTr ac.curAddr = addr @@ -1025,7 +1110,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, ac.mu.Unlock() return false, errConnClosing } - ac.state = connectivity.TransientFailure + ac.updateConnectivityState(connectivity.TransientFailure) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.resolveNow(resolver.ResolveNowOption{}) if ac.ready != nil { @@ -1114,7 +1199,7 @@ func (ac *addrConn) transportMonitor() { } // Set connectivity state to TransientFailure before calling // resetTransport. Transition READY->CONNECTING is not valid. 
- ac.state = connectivity.TransientFailure + ac.updateConnectivityState(connectivity.TransientFailure) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.cc.resolveNow(resolver.ResolveNowOption{}) ac.curAddr = resolver.Address{} @@ -1175,7 +1260,7 @@ func (ac *addrConn) tearDown(err error) { // address removal and GoAway. ac.transport.GracefulClose() } - ac.state = connectivity.Shutdown + ac.updateConnectivityState(connectivity.Shutdown) ac.tearDownErr = err ac.cc.handleSubConnStateChange(ac.acbw, ac.state) if ac.events != nil { @@ -1187,6 +1272,16 @@ func (ac *addrConn) tearDown(err error) { ac.ready = nil } if channelz.IsOn() { + channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ + Desc: "Subchannel Deleted", + Severity: channelz.CtINFO, + Parent: &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID), + Severity: channelz.CtINFO, + }, + }) + // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to + // the entity being deleted, and thus prevent it from being deleted right away. channelz.RemoveEntry(ac.channelzID) } } diff --git a/internal/channelz/funcs.go b/internal/channelz/funcs.go index 586a0336b..539875b5a 100644 --- a/internal/channelz/funcs.go +++ b/internal/channelz/funcs.go @@ -27,16 +27,22 @@ import ( "sort" "sync" "sync/atomic" + "time" "google.golang.org/grpc/grpclog" ) +const ( + defaultMaxTraceEntry int32 = 30 +) + var ( db dbWrapper idGen idGenerator // EntryPerPage defines the number of channelz entries to be shown on a web page. - EntryPerPage = 50 - curState int32 + EntryPerPage = 50 + curState int32 + maxTraceEntry = defaultMaxTraceEntry ) // TurnOn turns on channelz data collection. @@ -52,6 +58,22 @@ func IsOn() bool { return atomic.CompareAndSwapInt32(&curState, 1, 1) } +// SetMaxTraceEntry sets the maximum number of trace entries per entity (i.e. channel/subchannel). +// Setting it to 0 will disable channel tracing. +func SetMaxTraceEntry(i int32) { + atomic.StoreInt32(&maxTraceEntry, i) +} + +// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per entity to the default. +func ResetMaxTraceEntryToDefault() { + atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry) +} + +func getMaxTraceEntry() int { + i := atomic.LoadInt32(&maxTraceEntry) + return int(i) +} + // dbWrapper wraps around a reference to internal channelz data storage, and // provides synchronized functionality to set and get the reference. type dbWrapper struct { @@ -146,6 +168,7 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 { nestedChans: make(map[int64]string), id: id, pid: pid, + trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, } if pid == 0 { db.get().addChannel(id, cn, true, pid, ref) @@ -170,6 +193,7 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 { sockets: make(map[int64]string), id: id, pid: pid, + trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())}, } db.get().addSubChannel(id, sc, pid, ref) return id @@ -226,6 +250,24 @@ func RemoveEntry(id int64) { db.get().removeEntry(id) } +// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added +// to the channel trace. +// The Parent field is optional. It is used for an event that will also be recorded in the entity's parent +// trace. 
+type TraceEventDesc struct { + Desc string + Severity Severity + Parent *TraceEventDesc +} + +// AddTraceEvent adds a trace event related to the entity with the specified id, using the provided TraceEventDesc. +func AddTraceEvent(id int64, desc *TraceEventDesc) { + if getMaxTraceEntry() == 0 { + return + } + db.get().traceEvent(id, desc) +} + // channelMap is the storage data structure for channelz. // Methods of channelMap can be divided into two categories with respect to locking. // 1. Methods acquire the global lock. @@ -251,6 +293,7 @@ func (c *channelMap) addServer(id int64, s *server) { func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) { c.mu.Lock() cn.cm = c + cn.trace.cm = c c.channels[id] = cn if isTopChannel { c.topLevelChannels[id] = struct{}{} @@ -263,6 +306,7 @@ func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) { c.mu.Lock() sc.cm = c + sc.trace.cm = c c.subChannels[id] = sc c.findEntry(pid).addChild(id, sc) c.mu.Unlock() @@ -284,16 +328,25 @@ func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref c.mu.Unlock() } -// removeEntry triggers the removal of an entry, which may not indeed delete the -// entry, if it has to wait on the deletion of its children, or may lead to a chain -// of entry deletion. For example, deleting the last socket of a gracefully shutting -// down server will lead to the server being also deleted. +// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to +// wait on the deletion of its children and until no other entity's channel trace references it. +// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully +// shutting down server will lead to the server being also deleted. func (c *channelMap) removeEntry(id int64) { c.mu.Lock() c.findEntry(id).triggerDelete() c.mu.Unlock() } +// c.mu must be held by the caller. +func (c *channelMap) decrTraceRefCount(id int64) { + e := c.findEntry(id) + if v, ok := e.(tracedChannel); ok { + v.decrTraceRefCount() + e.deleteSelfIfReady() + } +} + // c.mu must be held by the caller. 
func (c *channelMap) findEntry(id int64) entry { var v entry @@ -347,6 +400,39 @@ func (c *channelMap) deleteEntry(id int64) { } } +func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) { + c.mu.Lock() + child := c.findEntry(id) + childTC, ok := child.(tracedChannel) + if !ok { + c.mu.Unlock() + return + } + childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()}) + if desc.Parent != nil { + parent := c.findEntry(child.getParentID()) + var chanType RefChannelType + switch child.(type) { + case *channel: + chanType = RefChannel + case *subChannel: + chanType = RefSubChannel + } + if parentTC, ok := parent.(tracedChannel); ok { + parentTC.getChannelTrace().append(&TraceEvent{ + Desc: desc.Parent.Desc, + Severity: desc.Parent.Severity, + Timestamp: time.Now(), + RefID: id, + RefName: childTC.getRefName(), + RefType: chanType, + }) + childTC.incrTraceRefCount() + } + } + c.mu.Unlock() +} + type int64Slice []int64 func (s int64Slice) Len() int { return len(s) } @@ -408,6 +494,7 @@ func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) { t[i].ChannelData = cn.c.ChannelzMetric() t[i].ID = cn.id t[i].RefName = cn.refName + t[i].Trace = cn.trace.dumpData() } return t, end } @@ -470,7 +557,7 @@ func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, for k := range svrskts { ids = append(ids, k) } - sort.Sort((int64Slice(ids))) + sort.Sort(int64Slice(ids)) idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) count := 0 var end bool @@ -518,6 +605,7 @@ func (c *channelMap) GetChannel(id int64) *ChannelMetric { cm.ChannelData = cn.c.ChannelzMetric() cm.ID = cn.id cm.RefName = cn.refName + cm.Trace = cn.trace.dumpData() return cm } @@ -536,6 +624,7 @@ func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { cm.ChannelData = sc.c.ChannelzMetric() cm.ID = sc.id cm.RefName = sc.refName + cm.Trace = sc.trace.dumpData() return cm } diff --git a/internal/channelz/types.go b/internal/channelz/types.go index 6fd6bb388..17c2274cb 100644 --- a/internal/channelz/types.go +++ b/internal/channelz/types.go @@ -20,6 +20,8 @@ package channelz import ( "net" + "sync" + "sync/atomic" "time" "google.golang.org/grpc/connectivity" @@ -40,6 +42,8 @@ type entry interface { // deleteSelfIfReady checks whether triggerDelete() has been called before, and whether child // list is now empty. If both conditions are met, then delete self from database. deleteSelfIfReady() + // getParentID returns the parent ID of the entry. A parent ID of 0 means no parent. + getParentID() int64 } // dummyEntry is a fake entry to handle entry not found case. @@ -73,6 +77,10 @@ func (*dummyEntry) deleteSelfIfReady() { // code should not reach here. deleteSelfIfReady is always called on an existing entry. } +func (*dummyEntry) getParentID() int64 { + return 0 +} + // ChannelMetric defines the info channelz provides for a specific Channel, which // includes ChannelInternalMetric and channelz-specific data, such as channelz id, // child list, etc. @@ -95,6 +103,8 @@ type ChannelMetric struct { // Note current grpc implementation doesn't allow channel having sockets directly, // therefore, this field is unused. Sockets map[int64]string + // Trace contains the most recent traced events. 
+ Trace *ChannelTrace } // SubChannelMetric defines the info channelz provides for a specific SubChannel, @@ -121,6 +131,8 @@ type SubChannelMetric struct { // Sockets tracks the socket type children of this subchannel in the format of a map // from socket channelz id to corresponding reference string. Sockets map[int64]string + // Trace contains the most recent traced events. + Trace *ChannelTrace } // ChannelInternalMetric defines the struct that the implementor of Channel interface @@ -138,7 +150,35 @@ type ChannelInternalMetric struct { CallsFailed int64 // The last time a call was started on the channel. LastCallStartedTimestamp time.Time - //TODO: trace } + +// ChannelTrace stores traced events on a channel/subchannel and related info. +type ChannelTrace struct { + // EventNum is the number of events that ever got traced (i.e. including those that have been deleted) + EventNum int64 + // CreationTime is the creation time of the trace. + CreationTime time.Time + // Events stores the most recent trace events (up to $maxTraceEntry; a newer event will overwrite the + // oldest one) + Events []*TraceEvent } + +// TraceEvent represents a single trace event. +type TraceEvent struct { + // Desc is a simple description of the trace event. + Desc string + // Severity states the severity of this trace event. + Severity Severity + // Timestamp is the event time. + Timestamp time.Time + // RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is + // involved in this event. + // e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside []) + RefID int64 + // RefName is the reference name for the entity that gets referenced in the event. + RefName string + // RefType indicates the referenced entity type, i.e. Channel or SubChannel. + RefType RefChannelType } // Channel is the interface that should be satisfied in order to be tracked by @@ -147,6 +187,12 @@ type Channel interface { ChannelzMetric() *ChannelInternalMetric } +type dummyChannel struct{} + +func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric { + return &ChannelInternalMetric{} +} + type channel struct { refName string c Channel @@ -156,6 +202,10 @@ type channel struct { id int64 pid int64 cm *channelMap + trace *channelTrace + // traceRefCount is the number of trace events that reference this channel. + // Non-zero traceRefCount means the trace of this channel cannot be deleted. + traceRefCount int32 } func (c *channel) addChild(id int64, e entry) { @@ -180,25 +230,96 @@ func (c *channel) triggerDelete() { c.deleteSelfIfReady() } -func (c *channel) deleteSelfIfReady() { +func (c *channel) getParentID() int64 { + return c.pid +} + +// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means +// deleting the channel reference from its parent's child list. +// +// In order for a channel to be deleted from the tree, it must meet the criteria that removal of the +// corresponding grpc object has been invoked, and the channel does not have any children left. +// +// The returned boolean value indicates whether the channel has been successfully deleted from tree. 
+func (c *channel) deleteSelfFromTree() (deleted bool) { if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 { - return + return false } - c.cm.deleteEntry(c.id) // not top channel if c.pid != 0 { c.cm.findEntry(c.pid).deleteChild(c.id) } + return true +} + +// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means +// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the +// channel, and its memory will be garbage collected. +// +// The trace reference count of the channel must be 0 in order to be deleted from the map. This is +// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, +// the trace of the referenced entity must not be deleted. In order to release the resource allocated +// by grpc, the reference to the grpc object is reset to a dummy object. +// +// deleteSelfFromMap must be called after deleteSelfFromTree returns true. +// +// It returns a bool to indicate whether the channel can be safely deleted from map. +func (c *channel) deleteSelfFromMap() (delete bool) { + if c.getTraceRefCount() != 0 { + c.c = &dummyChannel{} + return false + } + return true +} + +// deleteSelfIfReady tries to delete the channel itself from the channelz database. +// The delete process includes two steps: +// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its +// parent's child list. +// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id +// will return entry not found error. +func (c *channel) deleteSelfIfReady() { + if !c.deleteSelfFromTree() { + return + } + if !c.deleteSelfFromMap() { + return + } + c.cm.deleteEntry(c.id) + c.trace.clear() +} + +func (c *channel) getChannelTrace() *channelTrace { + return c.trace +} + +func (c *channel) incrTraceRefCount() { + atomic.AddInt32(&c.traceRefCount, 1) +} + +func (c *channel) decrTraceRefCount() { + atomic.AddInt32(&c.traceRefCount, -1) +} + +func (c *channel) getTraceRefCount() int { + i := atomic.LoadInt32(&c.traceRefCount) + return int(i) +} + +func (c *channel) getRefName() string { + return c.refName } type subChannel struct { - refName string - c Channel - closeCalled bool - sockets map[int64]string - id int64 - pid int64 - cm *channelMap + refName string + c Channel + closeCalled bool + sockets map[int64]string + id int64 + pid int64 + cm *channelMap + trace *channelTrace + traceRefCount int32 } func (sc *subChannel) addChild(id int64, e entry) { @@ -219,12 +340,82 @@ func (sc *subChannel) triggerDelete() { sc.deleteSelfIfReady() } -func (sc *subChannel) deleteSelfIfReady() { +func (sc *subChannel) getParentID() int64 { + return sc.pid +} + +// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which +// means deleting the subchannel reference from its parent's child list. +// +// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of +// the corresponding grpc object has been invoked, and the subchannel does not have any children left. +// +// The returned boolean value indicates whether the channel has been successfully deleted from tree. 
+func (sc *subChannel) deleteSelfFromTree() (deleted bool) { if !sc.closeCalled || len(sc.sockets) != 0 { + return false + } + sc.cm.findEntry(sc.pid).deleteChild(sc.id) + return true +} + +// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means +// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query +// the subchannel, and its memory will be garbage collected. +// +// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is +// specified in the channel tracing gRFC that as long as some other trace has reference to an entity, +// the trace of the referenced entity must not be deleted. In order to release the resource allocated +// by grpc, the reference to the grpc object is reset to a dummy object. +// +// deleteSelfFromMap must be called after deleteSelfFromTree returns true. +// +// It returns a bool to indicate whether the channel can be safely deleted from map. +func (sc *subChannel) deleteSelfFromMap() (delete bool) { + if sc.getTraceRefCount() != 0 { + // free the grpc struct (i.e. addrConn) + sc.c = &dummyChannel{} + return false + } + return true +} + +// deleteSelfIfReady tries to delete the subchannel itself from the channelz database. +// The delete process includes two steps: +// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from +// its parent's child list. +// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup +// by id will return entry not found error. +func (sc *subChannel) deleteSelfIfReady() { + if !sc.deleteSelfFromTree() { + return + } + if !sc.deleteSelfFromMap() { return } sc.cm.deleteEntry(sc.id) - sc.cm.findEntry(sc.pid).deleteChild(sc.id) + sc.trace.clear() +} + +func (sc *subChannel) getChannelTrace() *channelTrace { + return sc.trace +} + +func (sc *subChannel) incrTraceRefCount() { + atomic.AddInt32(&sc.traceRefCount, 1) +} + +func (sc *subChannel) decrTraceRefCount() { + atomic.AddInt32(&sc.traceRefCount, -1) +} + +func (sc *subChannel) getTraceRefCount() int { + i := atomic.LoadInt32(&sc.traceRefCount) + return int(i) +} + +func (sc *subChannel) getRefName() string { + return sc.refName } // SocketMetric defines the info channelz provides for a specific Socket, which @@ -318,6 +509,10 @@ func (ls *listenSocket) deleteSelfIfReady() { grpclog.Errorf("cannot call deleteSelfIfReady on a listen socket") } +func (ls *listenSocket) getParentID() int64 { + return ls.pid +} + type normalSocket struct { refName string s Socket @@ -343,6 +538,10 @@ func (ns *normalSocket) deleteSelfIfReady() { grpclog.Errorf("cannot call deleteSelfIfReady on a normal socket") } +func (ns *normalSocket) getParentID() int64 { + return ns.pid +} + // ServerMetric defines the info channelz provides for a specific Server, which // includes ServerInternalMetric and channelz-specific data, such as channelz id, // child list, etc. @@ -370,7 +569,6 @@ type ServerInternalMetric struct { CallsFailed int64 // The last time a call was started on the server. 
LastCallStartedTimestamp time.Time - //TODO: trace } // Server is the interface to be satisfied in order to be tracked by channelz as @@ -417,3 +615,88 @@ func (s *server) deleteSelfIfReady() { } s.cm.deleteEntry(s.id) } + +func (s *server) getParentID() int64 { + return 0 +} + +type tracedChannel interface { + getChannelTrace() *channelTrace + incrTraceRefCount() + decrTraceRefCount() + getRefName() string +} + +type channelTrace struct { + cm *channelMap + createdTime time.Time + eventCount int64 + mu sync.Mutex + events []*TraceEvent +} + +func (c *channelTrace) append(e *TraceEvent) { + c.mu.Lock() + if len(c.events) == getMaxTraceEntry() { + del := c.events[0] + c.events = c.events[1:] + if del.RefID != 0 { + // start recursive cleanup in a goroutine to not block the call originated from grpc. + go func() { + // need to acquire c.cm.mu lock to call the unlocked attemptCleanup func. + c.cm.mu.Lock() + c.cm.decrTraceRefCount(del.RefID) + c.cm.mu.Unlock() + }() + } + } + e.Timestamp = time.Now() + c.events = append(c.events, e) + c.eventCount++ + c.mu.Unlock() +} + +func (c *channelTrace) clear() { + c.mu.Lock() + for _, e := range c.events { + if e.RefID != 0 { + // caller should have already held the c.cm.mu lock. + c.cm.decrTraceRefCount(e.RefID) + } + } + c.mu.Unlock() +} + +// Severity is the severity level of a trace event. +// The canonical enumeration of all valid values is here: +// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126. +type Severity int + +const ( + // CtUNKNOWN indicates unknown severity of a trace event. + CtUNKNOWN Severity = iota + // CtINFO indicates info level severity of a trace event. + CtINFO + // CtWarning indicates warning level severity of a trace event. + CtWarning + // CtError indicates error level severity of a trace event. + CtError +) + +// RefChannelType is the type of the entity being referenced in a trace event. +type RefChannelType int + +const ( + // RefChannel indicates the referenced entity is a Channel. + RefChannel RefChannelType = iota + // RefSubChannel indicates the referenced entity is a SubChannel. + RefSubChannel +) + +func (c *channelTrace) dumpData() *ChannelTrace { + c.mu.Lock() + ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime} + ct.Events = c.events[:len(c.events)] + c.mu.Unlock() + return ct +} diff --git a/pickfirst.go b/pickfirst.go index bf659d49d..bda4309c0 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -56,6 +56,7 @@ func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err er if b.sc == nil { b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) if err != nil { + //TODO(yuxuanli): why not change the cc state to Idle? grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) return } diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 494d6931e..a6c02ac9e 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -23,17 +23,19 @@ import ( "strings" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/resolver" ) // ccResolverWrapper is a wrapper on top of cc for resolvers. // It implements resolver.ClientConnection interface. 
type ccResolverWrapper struct { - cc *ClientConn - resolver resolver.Resolver - addrCh chan []resolver.Address - scCh chan string - done chan struct{} + cc *ClientConn + resolver resolver.Resolver + addrCh chan []resolver.Address + scCh chan string + done chan struct{} + lastAddressesCount int } // split2 returns the values from strings.SplitN(s, sep, 2). @@ -114,6 +116,9 @@ func (ccr *ccResolverWrapper) watcher() { default: } grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs) + if channelz.IsOn() { + ccr.addChannelzTraceEvent(addrs) + } ccr.cc.handleResolvedAddrs(addrs, nil) case sc := <-ccr.scCh: select { @@ -156,3 +161,29 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { } ccr.scCh <- sc } + +func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) { + if len(addrs) == 0 && ccr.lastAddressesCount != 0 { + channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{ + Desc: "Resolver returns an empty address list", + Severity: channelz.CtWarning, + }) + } else if len(addrs) != 0 && ccr.lastAddressesCount == 0 { + var s string + for i, a := range addrs { + if a.ServerName != "" { + s += a.Addr + "(" + a.ServerName + ")" + } else { + s += a.Addr + } + if i != len(addrs)-1 { + s += " " + } + } + channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{ + Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s), + Severity: channelz.CtINFO, + }) + } + ccr.lastAddressesCount = len(addrs) +} diff --git a/test/channelz_test.go b/test/channelz_test.go index f4e781ae5..86e851e50 100644 --- a/test/channelz_test.go +++ b/test/channelz_test.go @@ -29,7 +29,9 @@ import ( "golang.org/x/net/http2" "google.golang.org/grpc" _ "google.golang.org/grpc/balancer/grpclb" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/leakcheck" "google.golang.org/grpc/keepalive" @@ -121,7 +123,7 @@ func TestCZTopChannelRegistrationAndDeletion(t *testing.T) { } if err := verifyResultWithDelay(func() (bool, error) { if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != c.length || end != c.end { - return false, fmt.Errorf("GetTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end) + return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end) } return true, nil }); err != nil { @@ -134,7 +136,7 @@ func TestCZTopChannelRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != 0 || !end { - return false, fmt.Errorf("GetTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end) + return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end) } return true, nil }); err != nil { @@ -162,10 +164,10 @@ func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0) if len(tcs) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tcs)) + return false, fmt.Errorf("there should only be one top channel, not 
%d", len(tcs)) } if len(tcs[0].NestedChans) != 1 { - return false, fmt.Errorf("There should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) + return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) } return true, nil }); err != nil { @@ -179,10 +181,10 @@ func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0) if len(tcs) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tcs)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) } if len(tcs[0].NestedChans) != 0 { - return false, fmt.Errorf("There should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) + return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) } return true, nil }); err != nil { @@ -212,10 +214,10 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0) if len(tcs) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tcs)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) } if len(tcs[0].SubChans) != num { - return false, fmt.Errorf("There should be %d subchannel not %d", num, len(tcs[0].SubChans)) + return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans)) } count := 0 for k := range tcs[0].SubChans { @@ -226,7 +228,7 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { count += len(sc.Sockets) } if count != num { - return false, fmt.Errorf("There should be %d sockets not %d", num, count) + return false, fmt.Errorf("there should be %d sockets not %d", num, count) } return true, nil @@ -239,10 +241,10 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0) if len(tcs) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tcs)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) } if len(tcs[0].SubChans) != num-1 { - return false, fmt.Errorf("There should be %d subchannel not %d", num-1, len(tcs[0].SubChans)) + return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(tcs[0].SubChans)) } count := 0 for k := range tcs[0].SubChans { @@ -253,7 +255,7 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { count += len(sc.Sockets) } if count != num-1 { - return false, fmt.Errorf("There should be %d sockets not %d", num-1, count) + return false, fmt.Errorf("there should be %d sockets not %d", num-1, count) } return true, nil @@ -285,14 +287,14 @@ func TestCZServerSocketRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } if len(ss[0].ListenSockets) != 1 { - return false, fmt.Errorf("There should only be one server listen socket, not %d", len(ss[0].ListenSockets)) + return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets)) } ns, _ := channelz.GetServerSockets(ss[0].ID, 0) if len(ns) != num 
{ - return false, fmt.Errorf("There should be %d normal sockets not %d", num, len(ns)) + return false, fmt.Errorf("there should be %d normal sockets not %d", num, len(ns)) } svrID = ss[0].ID return true, nil @@ -305,7 +307,7 @@ func TestCZServerSocketRegistrationAndDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ns, _ := channelz.GetServerSockets(svrID, 0) if len(ns) != num-1 { - return false, fmt.Errorf("There should be %d normal sockets not %d", num-1, len(ns)) + return false, fmt.Errorf("there should be %d normal sockets not %d", num-1, len(ns)) } return true, nil }); err != nil { @@ -325,10 +327,10 @@ func TestCZServerListenSocketDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } if len(ss[0].ListenSockets) != 1 { - return false, fmt.Errorf("There should only be one server listen socket, not %d", len(ss[0].ListenSockets)) + return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets)) } return true, nil }); err != nil { @@ -339,7 +341,7 @@ func TestCZServerListenSocketDeletion(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should be 1 server, not %d", len(ss)) + return false, fmt.Errorf("there should be 1 server, not %d", len(ss)) } return true, nil }); err != nil { @@ -461,10 +463,10 @@ func TestCZChannelMetrics(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tcs, _ := channelz.GetTopChannels(0) if len(tcs) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tcs)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) } if len(tcs[0].SubChans) != num { - return false, fmt.Errorf("There should be %d subchannel not %d", num, len(tcs[0].SubChans)) + return false, fmt.Errorf("there should be %d subchannel not %d", num, len(tcs[0].SubChans)) } var cst, csu, cf int64 for k := range tcs[0].SubChans { @@ -477,22 +479,22 @@ func TestCZChannelMetrics(t *testing.T) { cf += sc.ChannelData.CallsFailed } if cst != 3 { - return false, fmt.Errorf("There should be 3 CallsStarted not %d", cst) + return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst) } if csu != 1 { - return false, fmt.Errorf("There should be 1 CallsSucceeded not %d", csu) + return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu) } if cf != 1 { - return false, fmt.Errorf("There should be 1 CallsFailed not %d", cf) + return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf) } if tcs[0].ChannelData.CallsStarted != 3 { - return false, fmt.Errorf("There should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted) + return false, fmt.Errorf("there should be 3 CallsStarted not %d", tcs[0].ChannelData.CallsStarted) } if tcs[0].ChannelData.CallsSucceeded != 1 { - return false, fmt.Errorf("There should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded) + return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", tcs[0].ChannelData.CallsSucceeded) } if tcs[0].ChannelData.CallsFailed != 1 { - return false, fmt.Errorf("There should be 1 CallsFailed not %d", tcs[0].ChannelData.CallsFailed) + return false, fmt.Errorf("there should be 1 CallsFailed not %d", 
tcs[0].ChannelData.CallsFailed) } return true, nil }); err != nil { @@ -539,16 +541,16 @@ func TestCZServerMetrics(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } if ss[0].ServerData.CallsStarted != 3 { - return false, fmt.Errorf("There should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted) + return false, fmt.Errorf("there should be 3 CallsStarted not %d", ss[0].ServerData.CallsStarted) } if ss[0].ServerData.CallsSucceeded != 1 { - return false, fmt.Errorf("There should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded) + return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", ss[0].ServerData.CallsSucceeded) } if ss[0].ServerData.CallsFailed != 1 { - return false, fmt.Errorf("There should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed) + return false, fmt.Errorf("there should be 1 CallsFailed not %d", ss[0].ServerData.CallsFailed) } return true, nil }); err != nil { @@ -802,10 +804,10 @@ func TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tchan, _ := channelz.GetTopChannels(0) if len(tchan) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tchan)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) } if len(tchan[0].SubChans) != 1 { - return false, fmt.Errorf("There should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) + return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) } for scID = range tchan[0].SubChans { @@ -813,10 +815,10 @@ func TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) { } sc := channelz.GetSubChannel(scID) if sc == nil { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not 0", scID) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID) } if len(sc.Sockets) != 1 { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) } for skID = range sc.Sockets { break @@ -903,10 +905,10 @@ func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testi if err := verifyResultWithDelay(func() (bool, error) { tchan, _ := channelz.GetTopChannels(0) if len(tchan) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tchan)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) } if len(tchan[0].SubChans) != 1 { - return false, fmt.Errorf("There should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) + return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) } var id int64 for id = range tchan[0].SubChans { @@ -914,10 +916,10 @@ func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testi } sc := channelz.GetSubChannel(id) if sc == nil { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not 0", id) + return false, fmt.Errorf("there should only be one socket under 
subchannel %d, not 0", id) } if len(sc.Sockets) != 1 { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) } for id = range sc.Sockets { break @@ -929,16 +931,16 @@ func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testi } ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } ns, _ := channelz.GetServerSockets(ss[0].ID, 0) if len(ns) != 1 { - return false, fmt.Errorf("There should be one server normal socket, not %d", len(ns)) + return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns)) } sktData = ns[0].SocketData if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 { - return false, fmt.Errorf("Server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed) + return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed) } return true, nil }); err != nil { @@ -969,10 +971,10 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tchan, _ := channelz.GetTopChannels(0) if len(tchan) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tchan)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) } if len(tchan[0].SubChans) != 1 { - return false, fmt.Errorf("There should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) + return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) } var id int64 for id = range tchan[0].SubChans { @@ -980,10 +982,10 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { } sc := channelz.GetSubChannel(id) if sc == nil { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not 0", id) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) } if len(sc.Sockets) != 1 { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) } for id = range sc.Sockets { break @@ -992,16 +994,16 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { sktData := skt.SocketData // 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 { - return false, fmt.Errorf("Client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) + return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) } ss, _ := channelz.GetServers(0) if len(ss) != 1 { - 
return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } ns, _ := channelz.GetServerSockets(ss[0].ID, 0) sktData = ns[0].SocketData if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 { - return false, fmt.Errorf("Server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) + return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) } cliSktID, svrSktID = id, ss[0].ID return true, nil @@ -1017,16 +1019,16 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 // Remote: 65536 - 5 (Length-Prefixed-Message size) * 10 - 10011 = 55475 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 { - return false, fmt.Errorf("Client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) + return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) } ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } ns, _ := channelz.GetServerSockets(svrSktID, 0) sktData = ns[0].SocketData if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 { - return false, fmt.Errorf("Server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) + return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) } return true, nil }); err != nil { @@ -1042,16 +1044,16 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) { // Local: 65536 - 5 (Length-Prefixed-Message size) * 10 = 65486 // Remote: 65536 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 { - return false, fmt.Errorf("Client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) + return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) } ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } ns, _ := channelz.GetServerSockets(svrSktID, 0) sktData = ns[0].SocketData if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 { - return false, fmt.Errorf("Server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) + return false, fmt.Errorf("server: 
(LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow) } return true, nil }); err != nil { @@ -1074,10 +1076,10 @@ func TestCZClientSocketMetricsKeepAlive(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { tchan, _ := channelz.GetTopChannels(0) if len(tchan) != 1 { - return false, fmt.Errorf("There should only be one top channel, not %d", len(tchan)) + return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan)) } if len(tchan[0].SubChans) != 1 { - return false, fmt.Errorf("There should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) + return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(tchan[0].SubChans)) } var id int64 for id = range tchan[0].SubChans { @@ -1085,17 +1087,17 @@ func TestCZClientSocketMetricsKeepAlive(t *testing.T) { } sc := channelz.GetSubChannel(id) if sc == nil { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not 0", id) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id) } if len(sc.Sockets) != 1 { - return false, fmt.Errorf("There should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) + return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(sc.Sockets)) } for id = range sc.Sockets { break } skt := channelz.GetSocket(id) if skt.SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives. - return false, fmt.Errorf("There should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent) + return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", skt.SocketData.KeepAlivesSent) } return true, nil }); err != nil { @@ -1119,7 +1121,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should only be one server, not %d", len(ss)) + return false, fmt.Errorf("there should only be one server, not %d", len(ss)) } svrID = ss[0].ID return true, nil @@ -1132,7 +1134,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { ns, _ := channelz.GetServerSockets(svrID, 0) sktData := ns[0].SocketData if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 { - return false, fmt.Errorf("Server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) + return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (1, 1, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) } return true, nil }); err != nil { @@ -1144,7 +1146,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { ns, _ := channelz.GetServerSockets(svrID, 0) sktData := ns[0].SocketData if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 
1 || sktData.MessagesReceived != 1 { - return false, fmt.Errorf("Server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) + return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) } return true, nil }); err != nil { @@ -1156,7 +1158,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) { ns, _ := channelz.GetServerSockets(svrID, 0) sktData := ns[0].SocketData if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 { - return false, fmt.Errorf("Server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) + return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived) } return true, nil }); err != nil { @@ -1179,14 +1181,650 @@ func TestCZServerSocketMetricsKeepAlive(t *testing.T) { if err := verifyResultWithDelay(func() (bool, error) { ss, _ := channelz.GetServers(0) if len(ss) != 1 { - return false, fmt.Errorf("There should be one server, not %d", len(ss)) + return false, fmt.Errorf("there should be one server, not %d", len(ss)) } ns, _ := channelz.GetServerSockets(ss[0].ID, 0) if len(ns) != 1 { - return false, fmt.Errorf("There should be one server normal socket, not %d", len(ns)) + return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns)) } if ns[0].SocketData.KeepAlivesSent != 2 { // doIdleCallToInvokeKeepAlive func is set up to send 2 KeepAlives. - return false, fmt.Errorf("There should be 2 KeepAlives sent, not %d", ns[0].SocketData.KeepAlivesSent) + return false, fmt.Errorf("there should be 2 KeepAlives sent, not %d", ns[0].SocketData.KeepAlivesSent) + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZChannelTraceCreationDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + // avoid calling the API to set the balancer type, which would override the balancer specified by the service config. 
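A note on the assignment that follows: with the test env's balancer left empty, nothing pins the LB policy at dial time, so the policy is controlled entirely by what the manual resolver pushes. That is what lets this test create a grpclb nested channel and later tear it down. A condensed sketch of that flow, using only resolver calls that already appear in this patch (the addresses are placeholders):

// Drive balancer selection purely through the manual resolver.
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
// A grpclb-typed address makes the ClientConn spin up a nested grpclb channel.
r.InitialAddrs([]resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}})
// Switching the policy via service config and replacing the address list
// should shut the grpclb nested channel down again.
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})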
+ e.balancer = "" + te := newTest(t, e) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} + r.InitialAddrs(resolvedAddrs) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + var nestedConn int64 + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].NestedChans) != 1 { + return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) + } + for k := range tcs[0].NestedChans { + nestedConn = k + } + for _, e := range tcs[0].Trace.Events { + if e.RefID == nestedConn && e.RefType != channelz.RefChannel { + return false, fmt.Errorf("nested channel trace event should have RefChannel as its RefType") + } + } + ncm := channelz.GetChannel(nestedConn) + if ncm.Trace == nil { + return false, fmt.Errorf("trace for nested channel should not be empty") + } + if len(ncm.Trace.Events) == 0 { + return false, fmt.Errorf("there should be at least one trace event for the nested channel, not 0") + } + if ncm.Trace.Events[0].Desc != "Channel Created" { + return false, fmt.Errorf("the first trace event should be \"Channel Created\", not %q", ncm.Trace.Events[0].Desc) + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}}) + + // wait for the shutdown of the grpclb balancer + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].NestedChans) != 0 { + return false, fmt.Errorf("there should be 0 nested channels from grpclb, not %d", len(tcs[0].NestedChans)) + } + ncm := channelz.GetChannel(nestedConn) + if ncm == nil { + return false, fmt.Errorf("nested channel should still exist due to parent's trace reference") + } + if ncm.Trace == nil { + return false, fmt.Errorf("trace for nested channel should not be empty") + } + if len(ncm.Trace.Events) == 0 { + return false, fmt.Errorf("there should be at least one trace event for the nested channel, not 0") + } + if ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc != "Channel Deleted" { + return false, fmt.Errorf("the last trace event should be \"Channel Deleted\", not %q", ncm.Trace.Events[len(ncm.Trace.Events)-1].Desc) + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZSubChannelTraceCreationDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + te.startServer(&testServer{security: e.security}) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}}) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + var subConn int64 + // Here, we just wait for all sockets to be up. In the future, if we implement + // IDLE, we may need to make several rpc calls to create the sockets. 
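These trace tests all poll through verifyResultWithDelay, since channelz state is updated asynchronously with respect to resolver and connectivity activity. The helper's implementation is not part of this diff; a minimal, self-contained sketch of the pattern it implies, where the 10ms interval and 1s deadline are my assumptions, not necessarily the test suite's values:

package main

import (
	"fmt"
	"time"
)

// verifyResultWithDelay retries f until it succeeds or a deadline passes,
// returning the last error f reported.
func verifyResultWithDelay(f func() (bool, error)) error {
	var lastErr error
	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
		var ok bool
		if ok, lastErr = f(); ok {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return lastErr
}

func main() {
	start := time.Now()
	fmt.Println(verifyResultWithDelay(func() (bool, error) {
		if time.Since(start) < 50*time.Millisecond {
			return false, fmt.Errorf("condition not met yet")
		}
		return true, nil
	})) // prints <nil> once the condition flips to true
}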
+ if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 1 { + return false, fmt.Errorf("there should be 1 subchannel, not %d", len(tcs[0].SubChans)) + } + for k := range tcs[0].SubChans { + subConn = k + } + for _, e := range tcs[0].Trace.Events { + if e.RefID == subConn && e.RefType != channelz.RefSubChannel { + return false, fmt.Errorf("subchannel trace event should have RefSubChannel as its RefType") + } + } + scm := channelz.GetSubChannel(subConn) + if scm == nil { + return false, fmt.Errorf("subChannel does not exist") + } + if scm.Trace == nil { + return false, fmt.Errorf("trace for subChannel should not be empty") + } + if len(scm.Trace.Events) == 0 { + return false, fmt.Errorf("there should be at least one trace event for the subChannel, not 0") + } + if scm.Trace.Events[0].Desc != "Subchannel Created" { + return false, fmt.Errorf("the first trace event should be \"Subchannel Created\", not %q", scm.Trace.Events[0].Desc) + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + r.NewAddress([]resolver.Address{}) + + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 0 { + return false, fmt.Errorf("there should be 0 subchannels, not %d", len(tcs[0].SubChans)) + } + scm := channelz.GetSubChannel(subConn) + if scm == nil { + return false, fmt.Errorf("subChannel should still exist due to parent's trace reference") + } + if scm.Trace == nil { + return false, fmt.Errorf("trace for SubChannel should not be empty") + } + if len(scm.Trace.Events) == 0 { + return false, fmt.Errorf("there should be at least one trace event for the subChannel, not 0") + } + if scm.Trace.Events[len(scm.Trace.Events)-1].Desc != "Subchannel Deleted" { + return false, fmt.Errorf("the last trace event should be \"Subchannel Deleted\", not %q", scm.Trace.Events[len(scm.Trace.Events)-1].Desc) + } + + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZChannelAddressResolutionChange(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + e.balancer = "" + te := newTest(t, e) + te.startServer(&testServer{security: e.security}) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}}) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + var cid int64 + // Here, we just wait for all sockets to be up. In the future, if we implement + // IDLE, we may need to make several rpc calls to create the sockets. 
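The verification blocks below all repeat one scan: walk Trace.Events newest-to-oldest and fail only if index 0 is reached without a match. Factored out, the pattern looks like the sketch below; traceContains and the trimmed-down event type are illustrative stand-ins for the channelz types, not part of the patch:

// traceEvent is a simplified stand-in for channelz.TraceEvent.
type traceEvent struct{ Desc string }

// traceContains reports whether any event description equals want,
// scanning from the newest event backwards.
func traceContains(events []traceEvent, want string) bool {
	for i := len(events) - 1; i >= 0; i-- {
		if events[i].Desc == want {
			return true
		}
	}
	return false
}

With such a helper each verify closure would reduce to a single traceContains check; the tests below inline the loop instead, which keeps them free of extra helpers at the cost of some repetition.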
+ if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + cid = tcs[0].ID + for i := len(tcs[0].Trace.Events) - 1; i >= 0; i-- { + if tcs[0].Trace.Events[i].Desc == fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", te.srvAddr) { + break + } + if i == 0 { + return false, fmt.Errorf("trace events do not contain the expected address resolution from an empty address state") + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + + if err := verifyResultWithDelay(func() (bool, error) { + cm := channelz.GetChannel(cid) + for i := len(cm.Trace.Events) - 1; i >= 0; i-- { + if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name) { + break + } + if i == 0 { + return false, fmt.Errorf("trace events do not contain the expected LB policy change") + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + newSc := `{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "EmptyCall" + } + ], + "waitForReady": false, + "timeout": ".001s" + } + ] +}` + + r.NewServiceConfig(newSc) + + if err := verifyResultWithDelay(func() (bool, error) { + cm := channelz.GetChannel(cid) + + for i := len(cm.Trace.Events) - 1; i >= 0; i-- { + if cm.Trace.Events[i].Desc == fmt.Sprintf("Channel has a new service config \"%s\"", newSc) { + break + } + if i == 0 { + return false, fmt.Errorf("trace events do not contain the expected service config update") + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + r.NewAddress([]resolver.Address{}) + + if err := verifyResultWithDelay(func() (bool, error) { + cm := channelz.GetChannel(cid) + for i := len(cm.Trace.Events) - 1; i >= 0; i-- { + if cm.Trace.Events[i].Desc == "Resolver returns an empty address list" { + break + } + if i == 0 { + return false, fmt.Errorf("trace events do not contain the expected empty address list resolution") + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZSubChannelPickedNewAddress(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + e.balancer = "" + te := newTest(t, e) + te.startServers(&testServer{security: e.security}, 3) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + var svrAddrs []resolver.Address + for _, a := range te.srvAddrs { + svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) + } + r.InitialAddrs(svrAddrs) + te.resolverScheme = r.Scheme() + cc := te.clientConn() + defer te.tearDown() + tc := testpb.NewTestServiceClient(cc) + // make sure the connection is up + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) + } + te.srvs[0].Stop() + te.srvs[1].Stop() + // Here, we just wait for all sockets to be up. In the future, if we implement + // IDLE, we may need to make several rpc calls to create the sockets. 
+ if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 1 { + return false, fmt.Errorf("there should be 1 subchannel, not %d", len(tcs[0].SubChans)) + } + var subConn int64 + for k := range tcs[0].SubChans { + subConn = k + } + scm := channelz.GetSubChannel(subConn) + if scm.Trace == nil { + return false, fmt.Errorf("trace for SubChannel should not be empty") + } + if len(scm.Trace.Events) == 0 { + return false, fmt.Errorf("there should be at least one trace event for the subChannel, not 0") + } + for i := len(scm.Trace.Events) - 1; i >= 0; i-- { + if scm.Trace.Events[i].Desc == fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2]) { + break + } + if i == 0 { + return false, fmt.Errorf("trace events do not contain the expected event of the subchannel picking a new address") + } + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZSubChannelConnectivityState(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + te.startServer(&testServer{security: e.security}) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}}) + te.resolverScheme = r.Scheme() + cc := te.clientConn() + defer te.tearDown() + tc := testpb.NewTestServiceClient(cc) + // make sure the connection is up + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) + } + var subConn int64 + te.srv.Stop() + + if err := verifyResultWithDelay(func() (bool, error) { + // we need to obtain the SubChannel id before it gets deleted from the Channel's children list (due + // to the effect of r.NewAddress([]resolver.Address{})). + if subConn == 0 { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 1 { + return false, fmt.Errorf("there should be 1 subchannel, not %d", len(tcs[0].SubChans)) + } + for k := range tcs[0].SubChans { + // get the SubChannel id for further trace inquiry. + subConn = k + } + } + scm := channelz.GetSubChannel(subConn) + if scm == nil { + return false, fmt.Errorf("subChannel should still exist due to parent's trace reference") + } + if scm.Trace == nil { + return false, fmt.Errorf("trace for SubChannel should not be empty") + } + if len(scm.Trace.Events) == 0 { + return false, fmt.Errorf("there should be at least one trace event for the subChannel, not 0") + } + var ready, connecting, transient, shutdown int + for _, e := range scm.Trace.Events { + if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) { + transient++ + } + } + // Make sure the SubChannel has already seen transient failure before shutting it down through + // r.NewAddress([]resolver.Address{}). 
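The bookkeeping around this point tallies how many times each connectivity transition shows up in the subchannel's trace, and the same hand-maintained counters recur in TestCZChannelConnectivityState below. As a standalone sketch of the idea (the helper name is mine, not part of the patch):

// tally counts occurrences of each event description, so assertions like
// "exactly one READY, at least one CONNECTING" become map lookups against
// descriptions such as
// fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready).
func tally(descs []string) map[string]int {
	m := make(map[string]int)
	for _, d := range descs {
		m[d]++
	}
	return m
}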
+ if transient == 0 { + return false, fmt.Errorf("transient failure has not happened on the SubChannel yet") + } + transient = 0 + r.NewAddress([]resolver.Address{}) + for _, e := range scm.Trace.Events { + if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) { + ready++ + } + if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting) { + connecting++ + } + if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure) { + transient++ + } + if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown) { + shutdown++ + } + } + // example: + // Subchannel Created + // Subchannel's connectivity state changed to CONNECTING + // Subchannel picked a new address: "localhost:36011" + // Subchannel's connectivity state changed to READY + // Subchannel's connectivity state changed to TRANSIENT_FAILURE + // Subchannel's connectivity state changed to CONNECTING + // Subchannel picked a new address: "localhost:36011" + // Subchannel's connectivity state changed to SHUTDOWN + // Subchannel Deleted + if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 { + return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown) + } + + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZChannelConnectivityState(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + te.startServer(&testServer{security: e.security}) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}}) + te.resolverScheme = r.Scheme() + cc := te.clientConn() + defer te.tearDown() + tc := testpb.NewTestServiceClient(cc) + // make sure the connection is up + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) + } + te.srv.Stop() + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + + var ready, connecting, transient int + for _, e := range tcs[0].Trace.Events { + if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready) { + ready++ + } + if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting) { + connecting++ + } + if e.Desc == fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure) { + transient++ + } + } + + // example: + // Channel Created + // Addresses resolved (from empty address state): "localhost:40467" + // SubChannel (id: 4) Created + // Channel's connectivity state changed to CONNECTING + // Channel's connectivity state changed to READY + // Channel's connectivity state changed to TRANSIENT_FAILURE + // Channel's connectivity state changed to CONNECTING + // Channel's connectivity state changed to TRANSIENT_FAILURE + if ready != 1 || connecting < 1 || transient < 1 { + return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient) + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZTraceOverwriteChannelDeletion(t *testing.T) { + defer leakcheck.Check(t) + 
channelz.NewChannelzStorage() + e := tcpClearRREnv + // avoid calling the API to set the balancer type, which would override the balancer specified by the service config. + e.balancer = "" + te := newTest(t, e) + channelz.SetMaxTraceEntry(1) + defer channelz.ResetMaxTraceEntryToDefault() + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} + r.InitialAddrs(resolvedAddrs) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + var nestedConn int64 + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].NestedChans) != 1 { + return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) + } + for k := range tcs[0].NestedChans { + nestedConn = k + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}}) + + // wait for the shutdown of the grpclb balancer + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].NestedChans) != 0 { + return false, fmt.Errorf("there should be 0 nested channels from grpclb, not %d", len(tcs[0].NestedChans)) + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + // verify that the nested channel no longer exists because the trace event referencing it was overwritten. + if err := verifyResultWithDelay(func() (bool, error) { + cm := channelz.GetChannel(nestedConn) + if cm != nil { + return false, fmt.Errorf("nested channel should have been deleted since its parent's trace no longer contains any reference to it") + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZTraceOverwriteSubChannelDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + channelz.SetMaxTraceEntry(1) + defer channelz.ResetMaxTraceEntryToDefault() + te.startServer(&testServer{security: e.security}) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}}) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + var subConn int64 + // Here, we just wait for all sockets to be up. In the future, if we implement + // IDLE, we may need to make several rpc calls to create the sockets. 
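Why SetMaxTraceEntry(1), set at the top of this test, forces the deletion asserted at the end: with the trace capped at one entry, the parent's next trace event evicts the creation event that referenced the subchannel, releasing the last thing keeping the already-deleted child alive in channelz. A schematic sketch of that eviction; the types and the release callback are illustrative, not the actual channelz internals:

// trace keeps at most max events; evicting an event that references a child
// entity releases that reference, which can make the child collectable.
type event struct {
	desc    string
	childID int64 // 0 when the event references no child entity
}

type trace struct {
	max    int
	events []event
}

func (t *trace) add(e event, release func(childID int64)) {
	if len(t.events) >= t.max {
		old := t.events[0]
		t.events = t.events[1:]
		if old.childID != 0 {
			release(old.childID) // may trigger deletion of an orphaned child
		}
	}
	t.events = append(t.events, e)
}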
+ if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 1 { + return false, fmt.Errorf("there should be 1 subchannel, not %d", len(tcs[0].SubChans)) + } + for k := range tcs[0].SubChans { + subConn = k + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + r.NewAddress([]resolver.Address{}) + + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 0 { + return false, fmt.Errorf("there should be 0 subchannels, not %d", len(tcs[0].SubChans)) + } + return true, nil + }); err != nil { + t.Fatal(err) + } + + // verify that the subchannel no longer exists because the trace event referencing it was overwritten. + if err := verifyResultWithDelay(func() (bool, error) { + cm := channelz.GetSubChannel(subConn) + if cm != nil { + return false, fmt.Errorf("subchannel should have been deleted since its parent's trace no longer contains any reference to it") + } + return true, nil + }); err != nil { + t.Fatal(err) + } +} + +func TestCZTraceTopChannelDeletionTraceClear(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + te.startServer(&testServer{security: e.security}) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + r.InitialAddrs([]resolver.Address{{Addr: te.srvAddr}}) + te.resolverScheme = r.Scheme() + te.clientConn() + var subConn int64 + // Here, we just wait for all sockets to be up. In the future, if we implement + // IDLE, we may need to make several rpc calls to create the sockets. + if err := verifyResultWithDelay(func() (bool, error) { + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != 1 { + return false, fmt.Errorf("there should be 1 subchannel, not %d", len(tcs[0].SubChans)) + } + for k := range tcs[0].SubChans { + subConn = k + } + return true, nil + }); err != nil { + t.Fatal(err) + } + te.tearDown() + // verify that the subchannel no longer exists because its parent channel was deleted and its trace cleared. + if err := verifyResultWithDelay(func() (bool, error) { + cm := channelz.GetSubChannel(subConn) + if cm != nil { + return false, fmt.Errorf("subchannel should have been deleted since its parent's trace no longer contains any reference to it") } return true, nil }); err != nil {