mirror of https://github.com/grpc/grpc-go.git
channelz: channel tracing (#2262)
* channelz: channel trancing * add service * update * uuu * better testing * switch to single API * fix lint * fix review comments * fix fix review * uuuupdate * switch on channel type, instead of using boolean
This commit is contained in:
parent
9cc4fdbde2
commit
acd1429515
|
@ -73,6 +73,35 @@ func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectiv
|
|||
}
|
||||
}
|
||||
|
||||
func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
|
||||
pbt := &channelzpb.ChannelTrace{}
|
||||
pbt.NumEventsLogged = ct.EventNum
|
||||
if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
|
||||
pbt.CreationTimestamp = ts
|
||||
}
|
||||
var events []*channelzpb.ChannelTraceEvent
|
||||
for _, e := range ct.Events {
|
||||
cte := &channelzpb.ChannelTraceEvent{
|
||||
Description: e.Desc,
|
||||
Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity),
|
||||
}
|
||||
if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
|
||||
cte.Timestamp = ts
|
||||
}
|
||||
if e.RefID != 0 {
|
||||
switch e.RefType {
|
||||
case channelz.RefChannel:
|
||||
cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
|
||||
case channelz.RefSubChannel:
|
||||
cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
|
||||
}
|
||||
}
|
||||
events = append(events, cte)
|
||||
}
|
||||
pbt.Events = events
|
||||
return pbt
|
||||
}
|
||||
|
||||
func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
|
||||
c := &channelzpb.Channel{}
|
||||
c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
|
||||
|
@ -104,6 +133,7 @@ func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
|
|||
sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
|
||||
}
|
||||
c.SocketRef = sockets
|
||||
c.Data.Trace = channelTraceToProto(cm.Trace)
|
||||
return c
|
||||
}
|
||||
|
||||
|
@ -138,6 +168,7 @@ func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchann
|
|||
sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
|
||||
}
|
||||
sc.SocketRef = sockets
|
||||
sc.Data.Trace = channelTraceToProto(cm.Trace)
|
||||
return sc
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
@ -403,40 +404,131 @@ func TestGetServerSockets(t *testing.T) {
|
|||
|
||||
func TestGetChannel(t *testing.T) {
|
||||
channelz.NewChannelzStorage()
|
||||
refNames := []string{"top channel 1", "nested channel 1", "nested channel 2", "nested channel 3"}
|
||||
refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
|
||||
ids := make([]int64, 4)
|
||||
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
|
||||
channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1])
|
||||
channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
|
||||
ids[2] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
|
||||
channelz.AddTraceEvent(ids[2], &channelz.TraceEventDesc{
|
||||
Desc: "SubChannel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3])
|
||||
channelz.AddTraceEvent(ids[3], &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3]),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{
|
||||
Desc: "Resolver returns an empty address list",
|
||||
Severity: channelz.CtWarning,
|
||||
})
|
||||
svr := newCZServer()
|
||||
resp, _ := svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[0]})
|
||||
metrics := resp.GetChannel()
|
||||
subChans := metrics.GetSubchannelRef()
|
||||
if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2] {
|
||||
t.Fatalf("GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2], Name: refNames[2]}}, subChans)
|
||||
t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2], Name: refNames[2]}}, subChans)
|
||||
}
|
||||
nestedChans := metrics.GetChannelRef()
|
||||
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1] {
|
||||
t.Fatalf("GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1], Name: refNames[1]}}, nestedChans)
|
||||
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1], Name: refNames[1]}}, nestedChans)
|
||||
}
|
||||
trace := metrics.GetData().GetTrace()
|
||||
want := []struct {
|
||||
desc string
|
||||
severity channelzpb.ChannelTraceEvent_Severity
|
||||
childID int64
|
||||
childRef string
|
||||
}{
|
||||
{desc: "Channel Created", severity: channelzpb.ChannelTraceEvent_CT_INFO},
|
||||
{desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1], childRef: refNames[1]},
|
||||
{desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2]), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2], childRef: refNames[2]},
|
||||
{desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), severity: channelzpb.ChannelTraceEvent_CT_INFO},
|
||||
{desc: "Resolver returns an empty address list", severity: channelzpb.ChannelTraceEvent_CT_WARNING},
|
||||
}
|
||||
|
||||
for i, e := range trace.Events {
|
||||
if e.GetDescription() != want[i].desc {
|
||||
t.Fatalf("trace: GetDescription want %#v, got %#v", want[i].desc, e.GetDescription())
|
||||
}
|
||||
if e.GetSeverity() != want[i].severity {
|
||||
t.Fatalf("trace: GetSeverity want %#v, got %#v", want[i].severity, e.GetSeverity())
|
||||
}
|
||||
if want[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) {
|
||||
t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference")
|
||||
}
|
||||
if e.GetChannelRef().GetChannelId() != want[i].childID || e.GetChannelRef().GetName() != want[i].childRef {
|
||||
if e.GetSubchannelRef().GetSubchannelId() != want[i].childID || e.GetSubchannelRef().GetName() != want[i].childRef {
|
||||
t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", want[i].childID, want[i].childRef, e.GetChannelRef(), e.GetSubchannelRef())
|
||||
}
|
||||
}
|
||||
}
|
||||
resp, _ = svr.GetChannel(context.Background(), &channelzpb.GetChannelRequest{ChannelId: ids[1]})
|
||||
metrics = resp.GetChannel()
|
||||
nestedChans = metrics.GetChannelRef()
|
||||
if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3] {
|
||||
t.Fatalf("GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3], Name: refNames[3]}}, nestedChans)
|
||||
t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3], Name: refNames[3]}}, nestedChans)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetSubChannel(t *testing.T) {
|
||||
var (
|
||||
subchanCreated = "SubChannel Created"
|
||||
subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
|
||||
subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
|
||||
)
|
||||
channelz.NewChannelzStorage()
|
||||
refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
|
||||
ids := make([]int64, 4)
|
||||
ids[0] = channelz.RegisterChannel(&dummyChannel{}, 0, refNames[0])
|
||||
channelz.AddTraceEvent(ids[0], &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
ids[1] = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
|
||||
channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{
|
||||
Desc: subchanCreated,
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[0]),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
ids[2] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2])
|
||||
ids[3] = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3])
|
||||
channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{
|
||||
Desc: subchanConnectivityChange,
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
channelz.AddTraceEvent(ids[1], &channelz.TraceEventDesc{
|
||||
Desc: subChanPickNewAddress,
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
svr := newCZServer()
|
||||
resp, _ := svr.GetSubchannel(context.Background(), &channelzpb.GetSubchannelRequest{SubchannelId: ids[1]})
|
||||
metrics := resp.GetSubchannel()
|
||||
|
@ -445,7 +537,35 @@ func TestGetSubChannel(t *testing.T) {
|
|||
ids[3]: refNames[3],
|
||||
}
|
||||
if !reflect.DeepEqual(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) {
|
||||
t.Fatalf("GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef())
|
||||
t.Fatalf("metrics.GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef())
|
||||
}
|
||||
|
||||
trace := metrics.GetData().GetTrace()
|
||||
wantTrace := []struct {
|
||||
desc string
|
||||
severity channelzpb.ChannelTraceEvent_Severity
|
||||
childID int64
|
||||
childRef string
|
||||
}{
|
||||
{desc: subchanCreated, severity: channelzpb.ChannelTraceEvent_CT_INFO},
|
||||
{desc: subchanConnectivityChange, severity: channelzpb.ChannelTraceEvent_CT_INFO},
|
||||
{desc: subChanPickNewAddress, severity: channelzpb.ChannelTraceEvent_CT_INFO},
|
||||
}
|
||||
for i, e := range trace.Events {
|
||||
if e.GetDescription() != wantTrace[i].desc {
|
||||
t.Fatalf("trace: GetDescription want %#v, got %#v", wantTrace[i].desc, e.GetDescription())
|
||||
}
|
||||
if e.GetSeverity() != wantTrace[i].severity {
|
||||
t.Fatalf("trace: GetSeverity want %#v, got %#v", wantTrace[i].severity, e.GetSeverity())
|
||||
}
|
||||
if wantTrace[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) {
|
||||
t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference")
|
||||
}
|
||||
if e.GetChannelRef().GetChannelId() != wantTrace[i].childID || e.GetChannelRef().GetName() != wantTrace[i].childRef {
|
||||
if e.GetSubchannelRef().GetSubchannelId() != wantTrace[i].childID || e.GetSubchannelRef().GetName() != wantTrace[i].childRef {
|
||||
t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", wantTrace[i].childID, wantTrace[i].childRef, e.GetChannelRef(), e.GetSubchannelRef())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
109
clientconn.go
109
clientconn.go
|
@ -137,9 +137,22 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|||
if channelz.IsOn() {
|
||||
if cc.dopts.channelzParentID != 0 {
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
} else {
|
||||
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Channel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
cc.csMgr.channelzID = cc.channelzID
|
||||
}
|
||||
|
||||
if !cc.dopts.insecure {
|
||||
|
@ -308,6 +321,7 @@ type connectivityStateManager struct {
|
|||
mu sync.Mutex
|
||||
state connectivity.State
|
||||
notifyChan chan struct{}
|
||||
channelzID int64
|
||||
}
|
||||
|
||||
// updateState updates the connectivity.State of ClientConn.
|
||||
|
@ -323,6 +337,12 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
|
|||
return
|
||||
}
|
||||
csm.state = state
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
if csm.notifyChan != nil {
|
||||
// There are other goroutines waiting on this channel.
|
||||
close(csm.notifyChan)
|
||||
|
@ -500,10 +520,26 @@ func (cc *ClientConn) switchBalancer(name string) {
|
|||
}
|
||||
|
||||
builder := balancer.Get(name)
|
||||
// TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
|
||||
// we reuse previous one?
|
||||
if channelz.IsOn() {
|
||||
if builder == nil {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
|
||||
Severity: channelz.CtWarning,
|
||||
})
|
||||
} else {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
}
|
||||
if builder == nil {
|
||||
grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
|
||||
builder = newPickfirstBuilder()
|
||||
}
|
||||
|
||||
cc.preBalancerName = cc.curBalancerName
|
||||
cc.curBalancerName = builder.Name()
|
||||
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
|
||||
|
@ -541,6 +577,14 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
|
|||
}
|
||||
if channelz.IsOn() {
|
||||
ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Subchannel Created",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
}
|
||||
cc.conns[ac] = struct{}{}
|
||||
cc.mu.Unlock()
|
||||
|
@ -605,7 +649,7 @@ func (ac *addrConn) connect() error {
|
|||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
ac.state = connectivity.Connecting
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.mu.Unlock()
|
||||
|
||||
|
@ -690,6 +734,17 @@ func (cc *ClientConn) handleServiceConfig(js string) error {
|
|||
if cc.dopts.disableServiceConfig {
|
||||
return nil
|
||||
}
|
||||
if cc.scRaw == js {
|
||||
return nil
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
|
||||
// The special formatting of \"%s\" instead of %q is to provide nice printing of service config
|
||||
// for human consumption.
|
||||
Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
sc, err := parseServiceConfig(js)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -788,6 +843,19 @@ func (cc *ClientConn) Close() error {
|
|||
ac.tearDown(ErrClientConnClosing)
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
ted := &channelz.TraceEventDesc{
|
||||
Desc: "Channel Deleted",
|
||||
Severity: channelz.CtINFO,
|
||||
}
|
||||
if cc.dopts.channelzParentID != 0 {
|
||||
ted.Parent = &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
}
|
||||
}
|
||||
channelz.AddTraceEvent(cc.channelzID, ted)
|
||||
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
|
||||
// the entity beng deleted, and thus prevent it from being deleted right away.
|
||||
channelz.RemoveEntry(cc.channelzID)
|
||||
}
|
||||
return nil
|
||||
|
@ -807,7 +875,8 @@ type addrConn struct {
|
|||
mu sync.Mutex
|
||||
curAddr resolver.Address
|
||||
reconnectIdx int // The index in addrs list to start reconnecting from.
|
||||
state connectivity.State
|
||||
// Use updateConnectivityState for updating addrConn's connectivity state.
|
||||
state connectivity.State
|
||||
// ready is closed and becomes nil when a new transport is up or failed
|
||||
// due to timeout.
|
||||
ready chan struct{}
|
||||
|
@ -830,6 +899,16 @@ type addrConn struct {
|
|||
czData *channelzData
|
||||
}
|
||||
|
||||
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
|
||||
ac.state = s
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// adjustParams updates parameters used to create transports upon
|
||||
// receiving a GoAway.
|
||||
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
|
||||
|
@ -916,7 +995,7 @@ func (ac *addrConn) resetTransport() error {
|
|||
}
|
||||
ac.printf("connecting")
|
||||
if ac.state != connectivity.Connecting {
|
||||
ac.state = connectivity.Connecting
|
||||
ac.updateConnectivityState(connectivity.Connecting)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
}
|
||||
// copy ac.addrs in case of race
|
||||
|
@ -939,6 +1018,12 @@ func (ac *addrConn) resetTransport() error {
|
|||
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions, resetBackoff chan struct{}) (bool, error) {
|
||||
for i := ridx; i < len(addrs); i++ {
|
||||
addr := addrs[i]
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
target := transport.TargetInfo{
|
||||
Addr: addr.Addr,
|
||||
Metadata: addr.Metadata,
|
||||
|
@ -999,7 +1084,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
|
|||
return false, errConnClosing
|
||||
}
|
||||
ac.printf("ready")
|
||||
ac.state = connectivity.Ready
|
||||
ac.updateConnectivityState(connectivity.Ready)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.transport = newTr
|
||||
ac.curAddr = addr
|
||||
|
@ -1025,7 +1110,7 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
|
|||
ac.mu.Unlock()
|
||||
return false, errConnClosing
|
||||
}
|
||||
ac.state = connectivity.TransientFailure
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
if ac.ready != nil {
|
||||
|
@ -1114,7 +1199,7 @@ func (ac *addrConn) transportMonitor() {
|
|||
}
|
||||
// Set connectivity state to TransientFailure before calling
|
||||
// resetTransport. Transition READY->CONNECTING is not valid.
|
||||
ac.state = connectivity.TransientFailure
|
||||
ac.updateConnectivityState(connectivity.TransientFailure)
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
ac.cc.resolveNow(resolver.ResolveNowOption{})
|
||||
ac.curAddr = resolver.Address{}
|
||||
|
@ -1175,7 +1260,7 @@ func (ac *addrConn) tearDown(err error) {
|
|||
// address removal and GoAway.
|
||||
ac.transport.GracefulClose()
|
||||
}
|
||||
ac.state = connectivity.Shutdown
|
||||
ac.updateConnectivityState(connectivity.Shutdown)
|
||||
ac.tearDownErr = err
|
||||
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
|
||||
if ac.events != nil {
|
||||
|
@ -1187,6 +1272,16 @@ func (ac *addrConn) tearDown(err error) {
|
|||
ac.ready = nil
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Subchannel Deleted",
|
||||
Severity: channelz.CtINFO,
|
||||
Parent: &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
|
||||
Severity: channelz.CtINFO,
|
||||
},
|
||||
})
|
||||
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
|
||||
// the entity beng deleted, and thus prevent it from being deleted right away.
|
||||
channelz.RemoveEntry(ac.channelzID)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,16 +27,22 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxTraceEntry int32 = 30
|
||||
)
|
||||
|
||||
var (
|
||||
db dbWrapper
|
||||
idGen idGenerator
|
||||
// EntryPerPage defines the number of channelz entries to be shown on a web page.
|
||||
EntryPerPage = 50
|
||||
curState int32
|
||||
EntryPerPage = 50
|
||||
curState int32
|
||||
maxTraceEntry = defaultMaxTraceEntry
|
||||
)
|
||||
|
||||
// TurnOn turns on channelz data collection.
|
||||
|
@ -52,6 +58,22 @@ func IsOn() bool {
|
|||
return atomic.CompareAndSwapInt32(&curState, 1, 1)
|
||||
}
|
||||
|
||||
// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
|
||||
// Setting it to 0 will disable channel tracing.
|
||||
func SetMaxTraceEntry(i int32) {
|
||||
atomic.StoreInt32(&maxTraceEntry, i)
|
||||
}
|
||||
|
||||
// ResetMaxTraceEntryToDefault resets the the maximum number of trace entry per entity to default.
|
||||
func ResetMaxTraceEntryToDefault() {
|
||||
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
|
||||
}
|
||||
|
||||
func getMaxTraceEntry() int {
|
||||
i := atomic.LoadInt32(&maxTraceEntry)
|
||||
return int(i)
|
||||
}
|
||||
|
||||
// dbWarpper wraps around a reference to internal channelz data storage, and
|
||||
// provide synchronized functionality to set and get the reference.
|
||||
type dbWrapper struct {
|
||||
|
@ -146,6 +168,7 @@ func RegisterChannel(c Channel, pid int64, ref string) int64 {
|
|||
nestedChans: make(map[int64]string),
|
||||
id: id,
|
||||
pid: pid,
|
||||
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
|
||||
}
|
||||
if pid == 0 {
|
||||
db.get().addChannel(id, cn, true, pid, ref)
|
||||
|
@ -170,6 +193,7 @@ func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
|
|||
sockets: make(map[int64]string),
|
||||
id: id,
|
||||
pid: pid,
|
||||
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
|
||||
}
|
||||
db.get().addSubChannel(id, sc, pid, ref)
|
||||
return id
|
||||
|
@ -226,6 +250,24 @@ func RemoveEntry(id int64) {
|
|||
db.get().removeEntry(id)
|
||||
}
|
||||
|
||||
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe the event to be added
|
||||
// to the channel trace.
|
||||
// The Parent field is optional. It is used for event that will be recorded in the entity's parent
|
||||
// trace also.
|
||||
type TraceEventDesc struct {
|
||||
Desc string
|
||||
Severity Severity
|
||||
Parent *TraceEventDesc
|
||||
}
|
||||
|
||||
// AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
|
||||
func AddTraceEvent(id int64, desc *TraceEventDesc) {
|
||||
if getMaxTraceEntry() == 0 {
|
||||
return
|
||||
}
|
||||
db.get().traceEvent(id, desc)
|
||||
}
|
||||
|
||||
// channelMap is the storage data structure for channelz.
|
||||
// Methods of channelMap can be divided in two two categories with respect to locking.
|
||||
// 1. Methods acquire the global lock.
|
||||
|
@ -251,6 +293,7 @@ func (c *channelMap) addServer(id int64, s *server) {
|
|||
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
|
||||
c.mu.Lock()
|
||||
cn.cm = c
|
||||
cn.trace.cm = c
|
||||
c.channels[id] = cn
|
||||
if isTopChannel {
|
||||
c.topLevelChannels[id] = struct{}{}
|
||||
|
@ -263,6 +306,7 @@ func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid in
|
|||
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
|
||||
c.mu.Lock()
|
||||
sc.cm = c
|
||||
sc.trace.cm = c
|
||||
c.subChannels[id] = sc
|
||||
c.findEntry(pid).addChild(id, sc)
|
||||
c.mu.Unlock()
|
||||
|
@ -284,16 +328,25 @@ func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref
|
|||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// removeEntry triggers the removal of an entry, which may not indeed delete the
|
||||
// entry, if it has to wait on the deletion of its children, or may lead to a chain
|
||||
// of entry deletion. For example, deleting the last socket of a gracefully shutting
|
||||
// down server will lead to the server being also deleted.
|
||||
// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
|
||||
// wait on the deletion of its children and until no other entity's channel trace references it.
|
||||
// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
|
||||
// shutting down server will lead to the server being also deleted.
|
||||
func (c *channelMap) removeEntry(id int64) {
|
||||
c.mu.Lock()
|
||||
c.findEntry(id).triggerDelete()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// c.mu must be held by the caller
|
||||
func (c *channelMap) decrTraceRefCount(id int64) {
|
||||
e := c.findEntry(id)
|
||||
if v, ok := e.(tracedChannel); ok {
|
||||
v.decrTraceRefCount()
|
||||
e.deleteSelfIfReady()
|
||||
}
|
||||
}
|
||||
|
||||
// c.mu must be held by the caller.
|
||||
func (c *channelMap) findEntry(id int64) entry {
|
||||
var v entry
|
||||
|
@ -347,6 +400,39 @@ func (c *channelMap) deleteEntry(id int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
|
||||
c.mu.Lock()
|
||||
child := c.findEntry(id)
|
||||
childTC, ok := child.(tracedChannel)
|
||||
if !ok {
|
||||
c.mu.Unlock()
|
||||
return
|
||||
}
|
||||
childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
|
||||
if desc.Parent != nil {
|
||||
parent := c.findEntry(child.getParentID())
|
||||
var chanType RefChannelType
|
||||
switch child.(type) {
|
||||
case *channel:
|
||||
chanType = RefChannel
|
||||
case *subChannel:
|
||||
chanType = RefSubChannel
|
||||
}
|
||||
if parentTC, ok := parent.(tracedChannel); ok {
|
||||
parentTC.getChannelTrace().append(&TraceEvent{
|
||||
Desc: desc.Parent.Desc,
|
||||
Severity: desc.Parent.Severity,
|
||||
Timestamp: time.Now(),
|
||||
RefID: id,
|
||||
RefName: childTC.getRefName(),
|
||||
RefType: chanType,
|
||||
})
|
||||
childTC.incrTraceRefCount()
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
type int64Slice []int64
|
||||
|
||||
func (s int64Slice) Len() int { return len(s) }
|
||||
|
@ -408,6 +494,7 @@ func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
|
|||
t[i].ChannelData = cn.c.ChannelzMetric()
|
||||
t[i].ID = cn.id
|
||||
t[i].RefName = cn.refName
|
||||
t[i].Trace = cn.trace.dumpData()
|
||||
}
|
||||
return t, end
|
||||
}
|
||||
|
@ -470,7 +557,7 @@ func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric,
|
|||
for k := range svrskts {
|
||||
ids = append(ids, k)
|
||||
}
|
||||
sort.Sort((int64Slice(ids)))
|
||||
sort.Sort(int64Slice(ids))
|
||||
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
|
||||
count := 0
|
||||
var end bool
|
||||
|
@ -518,6 +605,7 @@ func (c *channelMap) GetChannel(id int64) *ChannelMetric {
|
|||
cm.ChannelData = cn.c.ChannelzMetric()
|
||||
cm.ID = cn.id
|
||||
cm.RefName = cn.refName
|
||||
cm.Trace = cn.trace.dumpData()
|
||||
return cm
|
||||
}
|
||||
|
||||
|
@ -536,6 +624,7 @@ func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
|
|||
cm.ChannelData = sc.c.ChannelzMetric()
|
||||
cm.ID = sc.id
|
||||
cm.RefName = sc.refName
|
||||
cm.Trace = sc.trace.dumpData()
|
||||
return cm
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ package channelz
|
|||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/connectivity"
|
||||
|
@ -40,6 +42,8 @@ type entry interface {
|
|||
// deleteSelfIfReady check whether triggerDelete() has been called before, and whether child
|
||||
// list is now empty. If both conditions are met, then delete self from database.
|
||||
deleteSelfIfReady()
|
||||
// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
|
||||
getParentID() int64
|
||||
}
|
||||
|
||||
// dummyEntry is a fake entry to handle entry not found case.
|
||||
|
@ -73,6 +77,10 @@ func (*dummyEntry) deleteSelfIfReady() {
|
|||
// code should not reach here. deleteSelfIfReady is always called on an existing entry.
|
||||
}
|
||||
|
||||
func (*dummyEntry) getParentID() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// ChannelMetric defines the info channelz provides for a specific Channel, which
|
||||
// includes ChannelInternalMetric and channelz-specific data, such as channelz id,
|
||||
// child list, etc.
|
||||
|
@ -95,6 +103,8 @@ type ChannelMetric struct {
|
|||
// Note current grpc implementation doesn't allow channel having sockets directly,
|
||||
// therefore, this is field is unused.
|
||||
Sockets map[int64]string
|
||||
// Trace contains the most recent traced events.
|
||||
Trace *ChannelTrace
|
||||
}
|
||||
|
||||
// SubChannelMetric defines the info channelz provides for a specific SubChannel,
|
||||
|
@ -121,6 +131,8 @@ type SubChannelMetric struct {
|
|||
// Sockets tracks the socket type children of this subchannel in the format of a map
|
||||
// from socket channelz id to corresponding reference string.
|
||||
Sockets map[int64]string
|
||||
// Trace contains the most recent traced events.
|
||||
Trace *ChannelTrace
|
||||
}
|
||||
|
||||
// ChannelInternalMetric defines the struct that the implementor of Channel interface
|
||||
|
@ -138,7 +150,35 @@ type ChannelInternalMetric struct {
|
|||
CallsFailed int64
|
||||
// The last time a call was started on the channel.
|
||||
LastCallStartedTimestamp time.Time
|
||||
//TODO: trace
|
||||
}
|
||||
|
||||
// ChannelTrace stores traced events on a channel/subchannel and related info.
|
||||
type ChannelTrace struct {
|
||||
// EventNum is the number of events that ever got traced (i.e. including those that have been deleted)
|
||||
EventNum int64
|
||||
// CreationTime is the creation time of the trace.
|
||||
CreationTime time.Time
|
||||
// Events stores the most recent trace events (up to $maxTraceEntry, newer event will overwrite the
|
||||
// oldest one)
|
||||
Events []*TraceEvent
|
||||
}
|
||||
|
||||
// TraceEvent represent a single trace event
|
||||
type TraceEvent struct {
|
||||
// Desc is a simple description of the trace event.
|
||||
Desc string
|
||||
// Severity states the severity of this trace event.
|
||||
Severity Severity
|
||||
// Timestamp is the event time.
|
||||
Timestamp time.Time
|
||||
// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is
|
||||
// involved in this event.
|
||||
// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside [])
|
||||
RefID int64
|
||||
// RefName is the reference name for the entity that gets referenced in the event.
|
||||
RefName string
|
||||
// RefType indicates the referenced entity type, i.e Channel or SubChannel.
|
||||
RefType RefChannelType
|
||||
}
|
||||
|
||||
// Channel is the interface that should be satisfied in order to be tracked by
|
||||
|
@ -147,6 +187,12 @@ type Channel interface {
|
|||
ChannelzMetric() *ChannelInternalMetric
|
||||
}
|
||||
|
||||
type dummyChannel struct{}
|
||||
|
||||
func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric {
|
||||
return &ChannelInternalMetric{}
|
||||
}
|
||||
|
||||
type channel struct {
|
||||
refName string
|
||||
c Channel
|
||||
|
@ -156,6 +202,10 @@ type channel struct {
|
|||
id int64
|
||||
pid int64
|
||||
cm *channelMap
|
||||
trace *channelTrace
|
||||
// traceRefCount is the number of trace events that reference this channel.
|
||||
// Non-zero traceRefCount means the trace of this channel cannot be deleted.
|
||||
traceRefCount int32
|
||||
}
|
||||
|
||||
func (c *channel) addChild(id int64, e entry) {
|
||||
|
@ -180,25 +230,96 @@ func (c *channel) triggerDelete() {
|
|||
c.deleteSelfIfReady()
|
||||
}
|
||||
|
||||
func (c *channel) deleteSelfIfReady() {
|
||||
func (c *channel) getParentID() int64 {
|
||||
return c.pid
|
||||
}
|
||||
|
||||
// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
|
||||
// deleting the channel reference from its parent's child list.
|
||||
//
|
||||
// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
|
||||
// corresponding grpc object has been invoked, and the channel does not have any children left.
|
||||
//
|
||||
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
|
||||
func (c *channel) deleteSelfFromTree() (deleted bool) {
|
||||
if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
|
||||
return
|
||||
return false
|
||||
}
|
||||
c.cm.deleteEntry(c.id)
|
||||
// not top channel
|
||||
if c.pid != 0 {
|
||||
c.cm.findEntry(c.pid).deleteChild(c.id)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
|
||||
// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
|
||||
// channel, and its memory will be garbage collected.
|
||||
//
|
||||
// The trace reference count of the channel must be 0 in order to be deleted from the map. This is
|
||||
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
|
||||
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
|
||||
// by grpc, the reference to the grpc object is reset to a dummy object.
|
||||
//
|
||||
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
|
||||
//
|
||||
// It returns a bool to indicate whether the channel can be safely deleted from map.
|
||||
func (c *channel) deleteSelfFromMap() (delete bool) {
|
||||
if c.getTraceRefCount() != 0 {
|
||||
c.c = &dummyChannel{}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfIfReady tries to delete the channel itself from the channelz database.
|
||||
// The delete process includes two steps:
|
||||
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
|
||||
// parent's child list.
|
||||
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
|
||||
// will return entry not found error.
|
||||
func (c *channel) deleteSelfIfReady() {
|
||||
if !c.deleteSelfFromTree() {
|
||||
return
|
||||
}
|
||||
if !c.deleteSelfFromMap() {
|
||||
return
|
||||
}
|
||||
c.cm.deleteEntry(c.id)
|
||||
c.trace.clear()
|
||||
}
|
||||
|
||||
func (c *channel) getChannelTrace() *channelTrace {
|
||||
return c.trace
|
||||
}
|
||||
|
||||
func (c *channel) incrTraceRefCount() {
|
||||
atomic.AddInt32(&c.traceRefCount, 1)
|
||||
}
|
||||
|
||||
func (c *channel) decrTraceRefCount() {
|
||||
atomic.AddInt32(&c.traceRefCount, -1)
|
||||
}
|
||||
|
||||
func (c *channel) getTraceRefCount() int {
|
||||
i := atomic.LoadInt32(&c.traceRefCount)
|
||||
return int(i)
|
||||
}
|
||||
|
||||
func (c *channel) getRefName() string {
|
||||
return c.refName
|
||||
}
|
||||
|
||||
type subChannel struct {
|
||||
refName string
|
||||
c Channel
|
||||
closeCalled bool
|
||||
sockets map[int64]string
|
||||
id int64
|
||||
pid int64
|
||||
cm *channelMap
|
||||
refName string
|
||||
c Channel
|
||||
closeCalled bool
|
||||
sockets map[int64]string
|
||||
id int64
|
||||
pid int64
|
||||
cm *channelMap
|
||||
trace *channelTrace
|
||||
traceRefCount int32
|
||||
}
|
||||
|
||||
func (sc *subChannel) addChild(id int64, e entry) {
|
||||
|
@ -219,12 +340,82 @@ func (sc *subChannel) triggerDelete() {
|
|||
sc.deleteSelfIfReady()
|
||||
}
|
||||
|
||||
func (sc *subChannel) deleteSelfIfReady() {
|
||||
func (sc *subChannel) getParentID() int64 {
|
||||
return sc.pid
|
||||
}
|
||||
|
||||
// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which
|
||||
// means deleting the subchannel reference from its parent's child list.
|
||||
//
|
||||
// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of
|
||||
// the corresponding grpc object has been invoked, and the subchannel does not have any children left.
|
||||
//
|
||||
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
|
||||
func (sc *subChannel) deleteSelfFromTree() (deleted bool) {
|
||||
if !sc.closeCalled || len(sc.sockets) != 0 {
|
||||
return false
|
||||
}
|
||||
sc.cm.findEntry(sc.pid).deleteChild(sc.id)
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means
|
||||
// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query
|
||||
// the subchannel, and its memory will be garbage collected.
|
||||
//
|
||||
// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is
|
||||
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
|
||||
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
|
||||
// by grpc, the reference to the grpc object is reset to a dummy object.
|
||||
//
|
||||
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
|
||||
//
|
||||
// It returns a bool to indicate whether the channel can be safely deleted from map.
|
||||
func (sc *subChannel) deleteSelfFromMap() (delete bool) {
|
||||
if sc.getTraceRefCount() != 0 {
|
||||
// free the grpc struct (i.e. addrConn)
|
||||
sc.c = &dummyChannel{}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
|
||||
// The delete process includes two steps:
|
||||
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
|
||||
// its parent's child list.
|
||||
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
|
||||
// by id will return entry not found error.
|
||||
func (sc *subChannel) deleteSelfIfReady() {
|
||||
if !sc.deleteSelfFromTree() {
|
||||
return
|
||||
}
|
||||
if !sc.deleteSelfFromMap() {
|
||||
return
|
||||
}
|
||||
sc.cm.deleteEntry(sc.id)
|
||||
sc.cm.findEntry(sc.pid).deleteChild(sc.id)
|
||||
sc.trace.clear()
|
||||
}
|
||||
|
||||
func (sc *subChannel) getChannelTrace() *channelTrace {
|
||||
return sc.trace
|
||||
}
|
||||
|
||||
func (sc *subChannel) incrTraceRefCount() {
|
||||
atomic.AddInt32(&sc.traceRefCount, 1)
|
||||
}
|
||||
|
||||
func (sc *subChannel) decrTraceRefCount() {
|
||||
atomic.AddInt32(&sc.traceRefCount, -1)
|
||||
}
|
||||
|
||||
func (sc *subChannel) getTraceRefCount() int {
|
||||
i := atomic.LoadInt32(&sc.traceRefCount)
|
||||
return int(i)
|
||||
}
|
||||
|
||||
func (sc *subChannel) getRefName() string {
|
||||
return sc.refName
|
||||
}
|
||||
|
||||
// SocketMetric defines the info channelz provides for a specific Socket, which
|
||||
|
@ -318,6 +509,10 @@ func (ls *listenSocket) deleteSelfIfReady() {
|
|||
grpclog.Errorf("cannot call deleteSelfIfReady on a listen socket")
|
||||
}
|
||||
|
||||
func (ls *listenSocket) getParentID() int64 {
|
||||
return ls.pid
|
||||
}
|
||||
|
||||
type normalSocket struct {
|
||||
refName string
|
||||
s Socket
|
||||
|
@ -343,6 +538,10 @@ func (ns *normalSocket) deleteSelfIfReady() {
|
|||
grpclog.Errorf("cannot call deleteSelfIfReady on a normal socket")
|
||||
}
|
||||
|
||||
func (ns *normalSocket) getParentID() int64 {
|
||||
return ns.pid
|
||||
}
|
||||
|
||||
// ServerMetric defines the info channelz provides for a specific Server, which
|
||||
// includes ServerInternalMetric and channelz-specific data, such as channelz id,
|
||||
// child list, etc.
|
||||
|
@ -370,7 +569,6 @@ type ServerInternalMetric struct {
|
|||
CallsFailed int64
|
||||
// The last time a call was started on the server.
|
||||
LastCallStartedTimestamp time.Time
|
||||
//TODO: trace
|
||||
}
|
||||
|
||||
// Server is the interface to be satisfied in order to be tracked by channelz as
|
||||
|
@ -417,3 +615,88 @@ func (s *server) deleteSelfIfReady() {
|
|||
}
|
||||
s.cm.deleteEntry(s.id)
|
||||
}
|
||||
|
||||
func (s *server) getParentID() int64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
type tracedChannel interface {
|
||||
getChannelTrace() *channelTrace
|
||||
incrTraceRefCount()
|
||||
decrTraceRefCount()
|
||||
getRefName() string
|
||||
}
|
||||
|
||||
type channelTrace struct {
|
||||
cm *channelMap
|
||||
createdTime time.Time
|
||||
eventCount int64
|
||||
mu sync.Mutex
|
||||
events []*TraceEvent
|
||||
}
|
||||
|
||||
func (c *channelTrace) append(e *TraceEvent) {
|
||||
c.mu.Lock()
|
||||
if len(c.events) == getMaxTraceEntry() {
|
||||
del := c.events[0]
|
||||
c.events = c.events[1:]
|
||||
if del.RefID != 0 {
|
||||
// start recursive cleanup in a goroutine to not block the call originated from grpc.
|
||||
go func() {
|
||||
// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func.
|
||||
c.cm.mu.Lock()
|
||||
c.cm.decrTraceRefCount(del.RefID)
|
||||
c.cm.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
}
|
||||
e.Timestamp = time.Now()
|
||||
c.events = append(c.events, e)
|
||||
c.eventCount++
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *channelTrace) clear() {
|
||||
c.mu.Lock()
|
||||
for _, e := range c.events {
|
||||
if e.RefID != 0 {
|
||||
// caller should have already held the c.cm.mu lock.
|
||||
c.cm.decrTraceRefCount(e.RefID)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Severity is the severity level of a trace event.
|
||||
// The canonical enumeration of all valid values is here:
|
||||
// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126.
|
||||
type Severity int
|
||||
|
||||
const (
|
||||
// CtUNKNOWN indicates unknown severity of a trace event.
|
||||
CtUNKNOWN Severity = iota
|
||||
// CtINFO indicates info level severity of a trace event.
|
||||
CtINFO
|
||||
// CtWarning indicates warning level severity of a trace event.
|
||||
CtWarning
|
||||
// CtError indicates error level severity of a trace event.
|
||||
CtError
|
||||
)
|
||||
|
||||
// RefChannelType is the type of the entity being referenced in a trace event.
|
||||
type RefChannelType int
|
||||
|
||||
const (
|
||||
// RefChannel indicates the referenced entity is a Channel.
|
||||
RefChannel RefChannelType = iota
|
||||
// RefSubChannel indicates the referenced entity is a SubChannel.
|
||||
RefSubChannel
|
||||
)
|
||||
|
||||
func (c *channelTrace) dumpData() *ChannelTrace {
|
||||
c.mu.Lock()
|
||||
ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime}
|
||||
ct.Events = c.events[:len(c.events)]
|
||||
c.mu.Unlock()
|
||||
return ct
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err er
|
|||
if b.sc == nil {
|
||||
b.sc, err = b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
|
||||
if err != nil {
|
||||
//TODO(yuxuanli): why not change the cc state to Idle?
|
||||
grpclog.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -23,17 +23,19 @@ import (
|
|||
"strings"
|
||||
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
// ccResolverWrapper is a wrapper on top of cc for resolvers.
|
||||
// It implements resolver.ClientConnection interface.
|
||||
type ccResolverWrapper struct {
|
||||
cc *ClientConn
|
||||
resolver resolver.Resolver
|
||||
addrCh chan []resolver.Address
|
||||
scCh chan string
|
||||
done chan struct{}
|
||||
cc *ClientConn
|
||||
resolver resolver.Resolver
|
||||
addrCh chan []resolver.Address
|
||||
scCh chan string
|
||||
done chan struct{}
|
||||
lastAddressesCount int
|
||||
}
|
||||
|
||||
// split2 returns the values from strings.SplitN(s, sep, 2).
|
||||
|
@ -114,6 +116,9 @@ func (ccr *ccResolverWrapper) watcher() {
|
|||
default:
|
||||
}
|
||||
grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
|
||||
if channelz.IsOn() {
|
||||
ccr.addChannelzTraceEvent(addrs)
|
||||
}
|
||||
ccr.cc.handleResolvedAddrs(addrs, nil)
|
||||
case sc := <-ccr.scCh:
|
||||
select {
|
||||
|
@ -156,3 +161,29 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
|
|||
}
|
||||
ccr.scCh <- sc
|
||||
}
|
||||
|
||||
func (ccr *ccResolverWrapper) addChannelzTraceEvent(addrs []resolver.Address) {
|
||||
if len(addrs) == 0 && ccr.lastAddressesCount != 0 {
|
||||
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: "Resolver returns an empty address list",
|
||||
Severity: channelz.CtWarning,
|
||||
})
|
||||
} else if len(addrs) != 0 && ccr.lastAddressesCount == 0 {
|
||||
var s string
|
||||
for i, a := range addrs {
|
||||
if a.ServerName != "" {
|
||||
s += a.Addr + "(" + a.ServerName + ")"
|
||||
} else {
|
||||
s += a.Addr
|
||||
}
|
||||
if i != len(addrs)-1 {
|
||||
s += " "
|
||||
}
|
||||
}
|
||||
channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
|
||||
Desc: fmt.Sprintf("Resolver returns a non-empty address list (previous one was empty) %q", s),
|
||||
Severity: channelz.CtINFO,
|
||||
})
|
||||
}
|
||||
ccr.lastAddressesCount = len(addrs)
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue