Channelz: Entity Registration and Deletion (#1811)

This commit is contained in:
lyuxuan 2018-04-09 11:13:06 -07:00 committed by GitHub
parent 7316918402
commit 7f73c863c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1535 additions and 13 deletions

View File

@ -126,6 +126,8 @@ type BuildOptions struct {
// to a remote load balancer server. The Balancer implementations
// can ignore this if it doesn't need to talk to remote balancer.
Dialer func(context.Context, string) (net.Conn, error)
// ChannelzParentID is the entity parent's channelz unique identification number.
ChannelzParentID int64
}
// Builder creates a balancer.

573
channelz/funcs.go Normal file
View File

@ -0,0 +1,573 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package channelz defines APIs for enabling channelz service, entry
// registration/deletion, and accessing channelz data. It also defines channelz
// metric struct formats.
//
// All APIs in this package are experimental.
package channelz
import (
"sort"
"sync"
"sync/atomic"
"google.golang.org/grpc/grpclog"
)
var (
db dbWrapper
idGen idGenerator
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = 50
curState int32
)
// TurnOn turns on channelz data collection.
func TurnOn() {
if !IsOn() {
NewChannelzStorage()
atomic.StoreInt32(&curState, 1)
}
}
// IsOn returns whether channelz data collection is on.
func IsOn() bool {
return atomic.CompareAndSwapInt32(&curState, 1, 1)
}
// dbWarpper wraps around a reference to internal channelz data storage, and
// provide synchronized functionality to set and get the reference.
type dbWrapper struct {
mu sync.RWMutex
DB *channelMap
}
func (d *dbWrapper) set(db *channelMap) {
d.mu.Lock()
d.DB = db
d.mu.Unlock()
}
func (d *dbWrapper) get() *channelMap {
d.mu.RLock()
defer d.mu.RUnlock()
return d.DB
}
// NewChannelzStorage initializes channelz data storage and id generator.
//
// Note: This function is exported for testing purpose only. User should not call
// it in most cases.
func NewChannelzStorage() {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
listenSockets: make(map[int64]*listenSocket),
normalSockets: make(map[int64]*normalSocket),
servers: make(map[int64]*server),
subChannels: make(map[int64]*subChannel),
})
idGen.reset()
}
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
// boolean indicating whether there's more top channels to be queried for.
//
// The arg id specifies that only top channel with id at or above it will be included
// in the result. The returned slice is up to a length of EntryPerPage, and is
// sorted in ascending id order.
func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(id)
}
// GetServers returns a slice of server's ServerMetric, along with a
// boolean indicating whether there's more servers to be queried for.
//
// The arg id specifies that only server with id at or above it will be included
// in the result. The returned slice is up to a length of EntryPerPage, and is
// sorted in ascending id order.
func GetServers(id int64) ([]*ServerMetric, bool) {
return db.get().GetServers(id)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
// SocketMetric, along with a boolean indicating whether there's more sockets to
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
// included in the result. The returned slice is up to a length of EntryPerPage,
// and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(id, startID)
}
// GetChannel returns the ChannelMetric for the channel (identified by id).
func GetChannel(id int64) *ChannelMetric {
return db.get().GetChannel(id)
}
// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
func GetSubChannel(id int64) *SubChannelMetric {
return db.get().GetSubChannel(id)
}
// GetSocket returns the SocketInternalMetric for the socket (identified by id).
func GetSocket(id int64) *SocketMetric {
return db.get().GetSocket(id)
}
// RegisterChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
// assigned to this channel.
func RegisterChannel(c Channel, pid int64, ref string) int64 {
id := idGen.genID()
cn := &channel{
refName: ref,
c: c,
subChans: make(map[int64]string),
nestedChans: make(map[int64]string),
id: id,
pid: pid,
}
if pid == 0 {
db.get().addChannel(id, cn, true, pid, ref)
} else {
db.get().addChannel(id, cn, false, pid, ref)
}
return id
}
// RegisterSubChannel registers the given channel c in channelz database with ref
// as its reference name, and add it to the child list of its parent (identified
// by pid). It returns the unique channelz tracking id assigned to this subchannel.
func RegisterSubChannel(c Channel, pid int64, ref string) int64 {
if pid == 0 {
grpclog.Error("a SubChannel's parent id cannot be 0")
return 0
}
id := idGen.genID()
sc := &subChannel{
refName: ref,
c: c,
sockets: make(map[int64]string),
id: id,
pid: pid,
}
db.get().addSubChannel(id, sc, pid, ref)
return id
}
// RegisterServer registers the given server s in channelz database. It returns
// the unique channelz tracking id assigned to this server.
func RegisterServer(s Server, ref string) int64 {
id := idGen.genID()
svr := &server{
refName: ref,
s: s,
sockets: make(map[int64]string),
listenSockets: make(map[int64]string),
id: id,
}
db.get().addServer(id, svr)
return id
}
// RegisterListenSocket registers the given listen socket s in channelz database
// with ref as its reference name, and add it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this listen socket.
func RegisterListenSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
grpclog.Error("a ListenSocket's parent id cannot be 0")
return 0
}
id := idGen.genID()
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addListenSocket(id, ls, pid, ref)
return id
}
// RegisterNormalSocket registers the given normal socket s in channelz database
// with ref as its reference name, and add it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this normal socket.
func RegisterNormalSocket(s Socket, pid int64, ref string) int64 {
if pid == 0 {
grpclog.Error("a NormalSocket's parent id cannot be 0")
return 0
}
id := idGen.genID()
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid}
db.get().addNormalSocket(id, ns, pid, ref)
return id
}
// RemoveEntry removes an entry with unique channelz trakcing id to be id from
// channelz database.
func RemoveEntry(id int64) {
db.get().removeEntry(id)
}
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided in two two categories with respect to locking.
// 1. Methods acquire the global lock.
// 2. Methods that can only be called when global lock is held.
// A second type of method need always to be called inside a first type of method.
type channelMap struct {
mu sync.RWMutex
topLevelChannels map[int64]struct{}
servers map[int64]*server
channels map[int64]*channel
subChannels map[int64]*subChannel
listenSockets map[int64]*listenSocket
normalSockets map[int64]*normalSocket
}
func (c *channelMap) addServer(id int64, s *server) {
c.mu.Lock()
s.cm = c
c.servers[id] = s
c.mu.Unlock()
}
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) {
c.mu.Lock()
cn.cm = c
c.channels[id] = cn
if isTopChannel {
c.topLevelChannels[id] = struct{}{}
} else {
c.findEntry(pid).addChild(id, cn)
}
c.mu.Unlock()
}
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) {
c.mu.Lock()
sc.cm = c
c.subChannels[id] = sc
c.findEntry(pid).addChild(id, sc)
c.mu.Unlock()
}
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) {
c.mu.Lock()
ls.cm = c
c.listenSockets[id] = ls
c.findEntry(pid).addChild(id, ls)
c.mu.Unlock()
}
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) {
c.mu.Lock()
ns.cm = c
c.normalSockets[id] = ns
c.findEntry(pid).addChild(id, ns)
c.mu.Unlock()
}
// removeEntry triggers the removal of an entry, which may not indeed delete the
// entry, if it has to wait on the deletion of its children, or may lead to a chain
// of entry deletion. For example, deleting the last socket of a gracefully shutting
// down server will lead to the server being also deleted.
func (c *channelMap) removeEntry(id int64) {
c.mu.Lock()
c.findEntry(id).triggerDelete()
c.mu.Unlock()
}
// c.mu must be held by the caller.
func (c *channelMap) findEntry(id int64) entry {
var v entry
var ok bool
if v, ok = c.channels[id]; ok {
return v
}
if v, ok = c.subChannels[id]; ok {
return v
}
if v, ok = c.servers[id]; ok {
return v
}
if v, ok = c.listenSockets[id]; ok {
return v
}
if v, ok = c.normalSockets[id]; ok {
return v
}
return &dummyEntry{idNotFound: id}
}
// c.mu must be held by the caller
// deleteEntry simply deletes an entry from the channelMap. Before calling this
// method, caller must check this entry is ready to be deleted, i.e removeEntry()
// has been called on it, and no children still exist.
// Conditionals are ordered by the expected frequency of deletion of each entity
// type, in order to optimize performance.
func (c *channelMap) deleteEntry(id int64) {
var ok bool
if _, ok = c.normalSockets[id]; ok {
delete(c.normalSockets, id)
return
}
if _, ok = c.subChannels[id]; ok {
delete(c.subChannels, id)
return
}
if _, ok = c.channels[id]; ok {
delete(c.channels, id)
delete(c.topLevelChannels, id)
return
}
if _, ok = c.listenSockets[id]; ok {
delete(c.listenSockets, id)
return
}
if _, ok = c.servers[id]; ok {
delete(c.servers, id)
return
}
}
type int64Slice []int64
func (s int64Slice) Len() int { return len(s) }
func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
func copyMap(m map[int64]string) map[int64]string {
n := make(map[int64]string)
for k, v := range m {
n[k] = v
}
return n
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
c.mu.RLock()
l := len(c.topLevelChannels)
ids := make([]int64, 0, l)
cns := make([]*channel, 0, min(l, EntryPerPage))
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
var end bool
var t []*ChannelMetric
for i, v := range ids[idx:] {
if count == EntryPerPage {
break
}
if cn, ok := c.channels[v]; ok {
cns = append(cns, cn)
t = append(t, &ChannelMetric{
NestedChans: copyMap(cn.nestedChans),
SubChans: copyMap(cn.subChans),
})
count++
}
if i == len(ids[idx:])-1 {
end = true
break
}
}
c.mu.RUnlock()
if count == 0 {
end = true
}
for i, cn := range cns {
t[i].ChannelData = cn.c.ChannelzMetric()
t[i].ID = cn.id
t[i].RefName = cn.refName
}
return t, end
}
func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
c.mu.RLock()
l := len(c.servers)
ids := make([]int64, 0, l)
ss := make([]*server, 0, min(l, EntryPerPage))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
var end bool
var s []*ServerMetric
for i, v := range ids[idx:] {
if count == EntryPerPage {
break
}
if svr, ok := c.servers[v]; ok {
ss = append(ss, svr)
s = append(s, &ServerMetric{
ListenSockets: copyMap(svr.listenSockets),
})
count++
}
if i == len(ids[idx:])-1 {
end = true
break
}
}
c.mu.RUnlock()
if count == 0 {
end = true
}
for i, svr := range ss {
s[i].ServerData = svr.s.ChannelzMetric()
s[i].ID = svr.id
s[i].RefName = svr.refName
}
return s, end
}
func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
var svr *server
var ok bool
c.mu.RLock()
if svr, ok = c.servers[id]; !ok {
// server with id doesn't exist.
c.mu.RUnlock()
return nil, true
}
svrskts := svr.sockets
l := len(svrskts)
ids := make([]int64, 0, l)
sks := make([]*normalSocket, 0, min(l, EntryPerPage))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort((int64Slice(ids)))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
var end bool
for i, v := range ids[idx:] {
if count == EntryPerPage {
break
}
if ns, ok := c.normalSockets[v]; ok {
sks = append(sks, ns)
count++
}
if i == len(ids[idx:])-1 {
end = true
break
}
}
c.mu.RUnlock()
if count == 0 {
end = true
}
var s []*SocketMetric
for _, ns := range sks {
sm := &SocketMetric{}
sm.SocketData = ns.s.ChannelzMetric()
sm.ID = ns.id
sm.RefName = ns.refName
s = append(s, sm)
}
return s, end
}
func (c *channelMap) GetChannel(id int64) *ChannelMetric {
cm := &ChannelMetric{}
var cn *channel
var ok bool
c.mu.RLock()
if cn, ok = c.channels[id]; !ok {
// channel with id doesn't exist.
c.mu.RUnlock()
return nil
}
cm.NestedChans = copyMap(cn.nestedChans)
cm.SubChans = copyMap(cn.subChans)
c.mu.RUnlock()
cm.ChannelData = cn.c.ChannelzMetric()
cm.ID = cn.id
cm.RefName = cn.refName
return cm
}
func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
cm := &SubChannelMetric{}
var sc *subChannel
var ok bool
c.mu.RLock()
if sc, ok = c.subChannels[id]; !ok {
// subchannel with id doesn't exist.
c.mu.RUnlock()
return nil
}
cm.Sockets = copyMap(sc.sockets)
c.mu.RUnlock()
cm.ChannelData = sc.c.ChannelzMetric()
cm.ID = sc.id
cm.RefName = sc.refName
return cm
}
func (c *channelMap) GetSocket(id int64) *SocketMetric {
sm := &SocketMetric{}
c.mu.RLock()
if ls, ok := c.listenSockets[id]; ok {
c.mu.RUnlock()
sm.SocketData = ls.s.ChannelzMetric()
sm.ID = ls.id
sm.RefName = ls.refName
return sm
}
if ns, ok := c.normalSockets[id]; ok {
c.mu.RUnlock()
sm.SocketData = ns.s.ChannelzMetric()
sm.ID = ns.id
sm.RefName = ns.refName
return sm
}
c.mu.RUnlock()
return nil
}
type idGenerator struct {
id int64
}
func (i *idGenerator) reset() {
atomic.StoreInt64(&i.id, 0)
}
func (i *idGenerator) genID() int64 {
return atomic.AddInt64(&i.id, 1)
}

415
channelz/types.go Normal file
View File

@ -0,0 +1,415 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"net"
"time"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
)
// entry represents a node in the channelz database.
type entry interface {
// addChild adds a child e, whose channelz id is id to child list
addChild(id int64, e entry)
// deleteChild deletes a child with channelz id to be id from child list
deleteChild(id int64)
// triggerDelete tries to delete self from channelz database. However, if child
// list is not empty, then deletion from the database is on hold until the last
// child is deleted from database.
triggerDelete()
// deleteSelfIfReady check whether triggerDelete() has been called before, and whether child
// list is now empty. If both conditions are met, then delete self from database.
deleteSelfIfReady()
}
// dummyEntry is a fake entry to handle entry not found case.
type dummyEntry struct {
idNotFound int64
}
func (d *dummyEntry) addChild(id int64, e entry) {
// Note: It is possible for a normal program to reach here under race condition.
// For example, there could be a race between ClientConn.Close() info being propagated
// to addrConn and http2Client. ClientConn.Close() cancel the context and result
// in http2Client to error. The error info is then caught by transport monitor
// and before addrConn.tearDown() is called in side ClientConn.Close(). Therefore,
// the addrConn will create a new transport. And when registering the new transport in
// channelz, its parent addrConn could have already been torn down and deleted
// from channelz tracking, and thus reach the code here.
grpclog.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
}
func (d *dummyEntry) deleteChild(id int64) {
// It is possible for a normal program to reach here under race condition.
// Refer to the example described in addChild().
grpclog.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
}
func (d *dummyEntry) triggerDelete() {
grpclog.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
}
func (*dummyEntry) deleteSelfIfReady() {
// code should not reach here. deleteSelfIfReady is always called on an existing entry.
}
// ChannelMetric defines the info channelz provides for a specific Channel, which
// includes ChannelInternalMetric and channelz-specific data, such as channelz id,
// child list, etc.
type ChannelMetric struct {
// ID is the channelz id of this channel.
ID int64
// RefName is the human readable reference string of this channel.
RefName string
// ChannelData contains channel internal metric reported by the channel through
// ChannelzMetric().
ChannelData *ChannelInternalMetric
// NestedChans tracks the nested channel type children of this channel in the format of
// a map from nested channel channelz id to corresponding reference string.
NestedChans map[int64]string
// SubChans tracks the subchannel type children of this channel in the format of a
// map from subchannel channelz id to corresponding reference string.
SubChans map[int64]string
// Sockets tracks the socket type children of this channel in the format of a map
// from socket channelz id to corresponding reference string.
// Note current grpc implementation doesn't allow channel having sockets directly,
// therefore, this is field is unused.
Sockets map[int64]string
}
// SubChannelMetric defines the info channelz provides for a specific SubChannel,
// which includes ChannelInternalMetric and channelz-specific data, such as
// channelz id, child list, etc.
type SubChannelMetric struct {
// ID is the channelz id of this subchannel.
ID int64
// RefName is the human readable reference string of this subchannel.
RefName string
// ChannelData contains subchannel internal metric reported by the subchannel
// through ChannelzMetric().
ChannelData *ChannelInternalMetric
// NestedChans tracks the nested channel type children of this subchannel in the format of
// a map from nested channel channelz id to corresponding reference string.
// Note current grpc implementation doesn't allow subchannel to have nested channels
// as children, therefore, this field is unused.
NestedChans map[int64]string
// SubChans tracks the subchannel type children of this subchannel in the format of a
// map from subchannel channelz id to corresponding reference string.
// Note current grpc implementation doesn't allow subchannel to have subchannels
// as children, therefore, this field is unused.
SubChans map[int64]string
// Sockets tracks the socket type children of this subchannel in the format of a map
// from socket channelz id to corresponding reference string.
Sockets map[int64]string
}
// ChannelInternalMetric defines the struct that the implementor of Channel interface
// should return from ChannelzMetric().
type ChannelInternalMetric struct {
// current connectivity state of the channel.
State connectivity.State
// The target this channel originally tried to connect to. May be absent
Target string
// The number of calls started on the channel.
CallsStarted int64
// The number of calls that have completed with an OK status.
CallsSucceeded int64
// The number of calls that have a completed with a non-OK status.
CallsFailed int64
// The last time a call was started on the channel.
LastCallStartedTimestamp time.Time
//TODO: trace
}
// Channel is the interface that should be satisfied in order to be tracked by
// channelz as Channel or SubChannel.
type Channel interface {
ChannelzMetric() *ChannelInternalMetric
}
type channel struct {
refName string
c Channel
closeCalled bool
nestedChans map[int64]string
subChans map[int64]string
id int64
pid int64
cm *channelMap
}
func (c *channel) addChild(id int64, e entry) {
switch v := e.(type) {
case *subChannel:
c.subChans[id] = v.refName
case *channel:
c.nestedChans[id] = v.refName
default:
grpclog.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
}
}
func (c *channel) deleteChild(id int64) {
delete(c.subChans, id)
delete(c.nestedChans, id)
c.deleteSelfIfReady()
}
func (c *channel) triggerDelete() {
c.closeCalled = true
c.deleteSelfIfReady()
}
func (c *channel) deleteSelfIfReady() {
if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
return
}
c.cm.deleteEntry(c.id)
// not top channel
if c.pid != 0 {
c.cm.findEntry(c.pid).deleteChild(c.id)
}
}
type subChannel struct {
refName string
c Channel
closeCalled bool
sockets map[int64]string
id int64
pid int64
cm *channelMap
}
func (sc *subChannel) addChild(id int64, e entry) {
if v, ok := e.(*normalSocket); ok {
sc.sockets[id] = v.refName
} else {
grpclog.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
}
}
func (sc *subChannel) deleteChild(id int64) {
delete(sc.sockets, id)
sc.deleteSelfIfReady()
}
func (sc *subChannel) triggerDelete() {
sc.closeCalled = true
sc.deleteSelfIfReady()
}
func (sc *subChannel) deleteSelfIfReady() {
if !sc.closeCalled || len(sc.sockets) != 0 {
return
}
sc.cm.deleteEntry(sc.id)
sc.cm.findEntry(sc.pid).deleteChild(sc.id)
}
// SocketMetric defines the info channelz provides for a specific Socket, which
// includes SocketInternalMetric and channelz-specific data, such as channelz id, etc.
type SocketMetric struct {
// ID is the channelz id of this socket.
ID int64
// RefName is the human readable reference string of this socket.
RefName string
// SocketData contains socket internal metric reported by the socket through
// ChannelzMetric().
SocketData *SocketInternalMetric
}
// SocketInternalMetric defines the struct that the implementor of Socket interface
// should return from ChannelzMetric().
type SocketInternalMetric struct {
// The number of streams that have been started.
StreamsStarted int64
// The number of streams that have ended successfully with the EoS bit set for
// both end points.
StreamsSucceeded int64
// The number of incoming streams that have a completed with a non-OK status.
StreamsFailed int64
// The number of messages successfully sent on this socket.
MessagesSent int64
MessagesReceived int64
// The number of keep alives sent. This is typically implemented with HTTP/2
// ping messages.
KeepAlivesSent int64
// The last time a stream was created by this endpoint. Usually unset for
// servers.
LastLocalStreamCreatedTimestamp time.Time
// The last time a stream was created by the remote endpoint. Usually unset
// for clients.
LastRemoteStreamCreatedTimestamp time.Time
// The last time a message was sent by this endpoint.
LastMessageSentTimestamp time.Time
// The last time a message was received by this endpoint.
LastMessageReceivedTimestamp time.Time
// The amount of window, granted to the local endpoint by the remote endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
LocalFlowControlWindow int64
// The amount of window, granted to the remote endpoint by the local endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
RemoteFlowControlWindow int64
// The locally bound address.
LocalAddr net.Addr
// The remote bound address. May be absent.
RemoteAddr net.Addr
// Optional, represents the name of the remote endpoint, if different than
// the original target name.
RemoteName string
//TODO: socket options
//TODO: Security
}
// Socket is the interface that should be satisfied in order to be tracked by
// channelz as Socket.
type Socket interface {
ChannelzMetric() *SocketInternalMetric
}
type listenSocket struct {
refName string
s Socket
id int64
pid int64
cm *channelMap
}
func (ls *listenSocket) addChild(id int64, e entry) {
grpclog.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
}
func (ls *listenSocket) deleteChild(id int64) {
grpclog.Errorf("cannot delete a child (id = %d) from a listen socket", id)
}
func (ls *listenSocket) triggerDelete() {
ls.cm.deleteEntry(ls.id)
ls.cm.findEntry(ls.pid).deleteChild(ls.id)
}
func (ls *listenSocket) deleteSelfIfReady() {
grpclog.Errorf("cannot call deleteSelfIfReady on a listen socket")
}
type normalSocket struct {
refName string
s Socket
id int64
pid int64
cm *channelMap
}
func (ns *normalSocket) addChild(id int64, e entry) {
grpclog.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e)
}
func (ns *normalSocket) deleteChild(id int64) {
grpclog.Errorf("cannot delete a child (id = %d) from a normal socket", id)
}
func (ns *normalSocket) triggerDelete() {
ns.cm.deleteEntry(ns.id)
ns.cm.findEntry(ns.pid).deleteChild(ns.id)
}
func (ns *normalSocket) deleteSelfIfReady() {
grpclog.Errorf("cannot call deleteSelfIfReady on a normal socket")
}
// ServerMetric defines the info channelz provides for a specific Server, which
// includes ServerInternalMetric and channelz-specific data, such as channelz id,
// child list, etc.
type ServerMetric struct {
// ID is the channelz id of this server.
ID int64
// RefName is the human readable reference string of this server.
RefName string
// ServerData contains server internal metric reported by the server through
// ChannelzMetric().
ServerData *ServerInternalMetric
// ListenSockets tracks the listener socket type children of this server in the
// format of a map from socket channelz id to corresponding reference string.
ListenSockets map[int64]string
}
// ServerInternalMetric defines the struct that the implementor of Server interface
// should return from ChannelzMetric().
type ServerInternalMetric struct {
// The number of incoming calls started on the server.
CallsStarted int64
// The number of incoming calls that have completed with an OK status.
CallsSucceeded int64
// The number of incoming calls that have a completed with a non-OK status.
CallsFailed int64
// The last time a call was started on the server.
LastCallStartedTimestamp time.Time
//TODO: trace
}
// Server is the interface to be satisfied in order to be tracked by channelz as
// Server.
type Server interface {
ChannelzMetric() *ServerInternalMetric
}
type server struct {
refName string
s Server
closeCalled bool
sockets map[int64]string
listenSockets map[int64]string
id int64
cm *channelMap
}
func (s *server) addChild(id int64, e entry) {
switch v := e.(type) {
case *normalSocket:
s.sockets[id] = v.refName
case *listenSocket:
s.listenSockets[id] = v.refName
default:
grpclog.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
}
}
func (s *server) deleteChild(id int64) {
delete(s.sockets, id)
delete(s.listenSockets, id)
s.deleteSelfIfReady()
}
func (s *server) triggerDelete() {
s.closeCalled = true
s.deleteSelfIfReady()
}
func (s *server) deleteSelfIfReady() {
if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 {
return
}
s.cm.deleteEntry(s.id)
}

View File

@ -32,6 +32,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
@ -109,6 +110,7 @@ type dialOptions struct {
// This is to support grpclb.
resolverBuilder resolver.Builder
waitForHandshake bool
channelzParentID int64
}
const (
@ -116,6 +118,12 @@ const (
defaultClientMaxSendMessageSize = math.MaxInt32
)
// RegisterChannelz turns on channelz service.
// This is an EXPERIMENTAL API.
func RegisterChannelz() {
channelz.TurnOn()
}
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
@ -396,6 +404,14 @@ func WithAuthority(a string) DialOption {
}
}
// WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's
// parent. This function is used in nested channel creation (e.g. grpclb dial).
func WithChannelzParentID(id int64) DialOption {
return func(o *dialOptions) {
o.channelzParentID = id
}
}
// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
@ -423,6 +439,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
opt(&cc.dopts)
}
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
} else {
cc.channelzID = channelz.RegisterChannel(cc, 0, target)
}
}
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil {
return nil, errNoTransportSecurity
@ -538,8 +562,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
DialCreds: credsClone,
Dialer: cc.dopts.copts.Dialer,
DialCreds: credsClone,
Dialer: cc.dopts.copts.Dialer,
ChannelzParentID: cc.channelzID,
}
// Build the resolver.
@ -641,6 +666,8 @@ type ClientConn struct {
preBalancerName string // previous balancer name.
curAddresses []resolver.Address
balancerWrapper *ccBalancerWrapper
channelzID int64 // channelz unique identification number
}
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@ -804,6 +831,9 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
cc.mu.Unlock()
return nil, ErrClientConnClosing
}
if channelz.IsOn() {
ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
}
cc.conns[ac] = struct{}{}
cc.mu.Unlock()
return ac, nil
@ -822,6 +852,12 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
ac.tearDown(err)
}
// ChannelzMetric returns ChannelInternalMetric of current ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{}
}
// connect starts to creating transport and also starts the transport monitor
// goroutine for this ac.
// It does nothing if the ac is not IDLE.
@ -979,6 +1015,9 @@ func (cc *ClientConn) Close() error {
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
if channelz.IsOn() {
channelz.RemoveEntry(cc.channelzID)
}
return nil
}
@ -1012,6 +1051,8 @@ type addrConn struct {
// connectDeadline is the time by which all connection
// negotiations must complete.
connectDeadline time.Time
channelzID int64 // channelz unique identification number
}
// adjustParams updates parameters used to create transports upon
@ -1153,6 +1194,9 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline,
// Do not cancel in the success path because of
// this issue in Go1.6: https://github.com/golang/go/issues/15078.
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
if channelz.IsOn() {
copts.ChannelzParentID = ac.channelzID
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
if err != nil {
cancel()
@ -1399,6 +1443,9 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
if channelz.IsOn() {
channelz.RemoveEntry(ac.channelzID)
}
return
}
@ -1408,6 +1455,10 @@ func (ac *addrConn) getState() connectivity.State {
return ac.state
}
func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{}
}
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//

View File

@ -26,6 +26,8 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/connectivity"
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/grpclog"
@ -168,6 +170,7 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D
}
}
}
func (lb *lbBalancer) callRemoteBalancer() error {
lbClient := &loadBalancerClient{cc: lb.ccRemoteLB}
ctx, cancel := context.WithCancel(context.Background())
@ -243,9 +246,13 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) {
// Explicitly set pickfirst as the balancer.
dopts = append(dopts, WithBalancerName(PickFirstBalancerName))
dopts = append(dopts, withResolverBuilder(lb.manualResolver))
// Dial using manualResolver.Scheme, which is a random scheme generated
if channelz.IsOn() {
dopts = append(dopts, WithChannelzParentID(lb.opt.ChannelzParentID))
}
// DialContext using manualResolver.Scheme, which is a random scheme generated
// when init grpclb. The target name is not important.
cc, err := Dial("grpclb:///grpclb.server", dopts...)
cc, err := DialContext(context.Background(), "grpclb:///grpclb.server", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
}

View File

@ -37,6 +37,8 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
@ -97,11 +99,14 @@ type Server struct {
m map[string]*service // service name -> service info
events trace.EventLog
quit chan struct{}
done chan struct{}
quitOnce sync.Once
doneOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
quit chan struct{}
done chan struct{}
quitOnce sync.Once
doneOnce sync.Once
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
}
type options struct {
@ -343,6 +348,10 @@ func NewServer(opt ...ServerOption) *Server {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}
if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(s, "")
}
return s
}
@ -458,6 +467,23 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
return s.opts.creds.ServerHandshake(rawConn)
}
type listenSocket struct {
net.Listener
channelzID int64
}
func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{}
}
func (l *listenSocket) Close() error {
err := l.Listener.Close()
if channelz.IsOn() {
channelz.RemoveEntry(l.channelzID)
}
return err
}
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
@ -482,17 +508,29 @@ func (s *Server) Serve(lis net.Listener) error {
// Stop or GracefulStop called; block until done and return nil.
case <-s.quit:
<-s.done
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
default:
}
}()
s.lis[lis] = true
ls := &listenSocket{Listener: lis}
s.lis[ls] = true
if channelz.IsOn() {
ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "")
}
s.mu.Unlock()
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[lis] {
lis.Close()
delete(s.lis, lis)
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
@ -614,6 +652,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
InitialConnWindowSize: s.opts.initialConnWindowSize,
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
ChannelzParentID: s.channelzID,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@ -624,6 +663,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return nil
}
return st
}
@ -751,6 +791,12 @@ func (s *Server) removeConn(c io.Closer) {
}
}
// ChannelzMetric returns ServerInternalMetric of current server.
// This is an EXPERIMENTAL API.
func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
return &channelz.ServerInternalMetric{}
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
var (
outPayload *stats.OutPayload
@ -1224,6 +1270,12 @@ func (s *Server) Stop() {
})
}()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
s.mu.Lock()
listeners := s.lis
s.lis = nil
@ -1262,11 +1314,17 @@ func (s *Server) GracefulStop() {
})
}()
s.channelzRemoveOnce.Do(func() {
if channelz.IsOn() {
channelz.RemoveEntry(s.channelzID)
}
})
s.mu.Lock()
if s.conns == nil {
s.mu.Unlock()
return
}
for lis := range s.lis {
lis.Close()
}

350
test/channelz_test.go Normal file
View File

@ -0,0 +1,350 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package test
import (
"net"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/test/leakcheck"
)
func init() {
channelz.TurnOn()
}
func (te *test) startServers(ts testpb.TestServiceServer, num int) {
for i := 0; i < num; i++ {
te.startServer(ts)
te.srvs = append(te.srvs, te.srv)
te.srvAddrs = append(te.srvAddrs, te.srvAddr)
te.srv = nil
te.srvAddr = ""
}
}
func TestCZServerRegistrationAndDeletion(t *testing.T) {
defer leakcheck.Check(t)
testcases := []struct {
total int
start int64
length int
end bool
}{
{total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
{total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
{total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
{total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
}
for _, c := range testcases {
channelz.NewChannelzStorage()
e := tcpClearRREnv
te := newTest(t, e)
te.startServers(&testServer{security: e.security}, c.total)
ss, end := channelz.GetServers(c.start)
if len(ss) != c.length || end != c.end {
t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
}
te.tearDown()
ss, end = channelz.GetServers(c.start)
if len(ss) != 0 || !end {
t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
}
}
}
func TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
defer leakcheck.Check(t)
testcases := []struct {
total int
start int64
length int
end bool
}{
{total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
{total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
{total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
{total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
}
for _, c := range testcases {
channelz.NewChannelzStorage()
e := tcpClearRREnv
te := newTest(t, e)
var ccs []*grpc.ClientConn
for i := 0; i < c.total; i++ {
cc := te.clientConn()
te.cc = nil
// avoid making next dial blocking
te.srvAddr = ""
ccs = append(ccs, cc)
}
time.Sleep(10 * time.Millisecond)
tcs, end := channelz.GetTopChannels(c.start)
if len(tcs) != c.length || end != c.end {
t.Fatalf("GetTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
}
for _, cc := range ccs {
cc.Close()
}
tcs, end = channelz.GetTopChannels(c.start)
if len(tcs) != 0 || !end {
t.Fatalf("GetTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
}
te.tearDown()
}
}
func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
defer leakcheck.Check(t)
channelz.NewChannelzStorage()
e := tcpClearRREnv
// avoid calling API to set balancer type, which will void service config's change of balancer.
e.balancer = ""
te := newTest(t, e)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}}
r.InitialAddrs(resolvedAddrs)
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
time.Sleep(10 * time.Millisecond)
tcs, _ := channelz.GetTopChannels(0)
if len(tcs) != 1 {
t.Fatalf("There should only be one top channel, not %d", len(tcs))
}
if len(tcs[0].NestedChans) != 1 {
t.Fatalf("There should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans))
}
r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`)
r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}})
// wait for the shutdown of grpclb balancer
time.Sleep(10 * time.Millisecond)
tcs, _ = channelz.GetTopChannels(0)
if len(tcs) != 1 {
t.Fatalf("There should only be one top channel, not %d", len(tcs))
}
if len(tcs[0].NestedChans) != 0 {
t.Fatalf("There should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans))
}
}
func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
defer leakcheck.Check(t)
channelz.NewChannelzStorage()
e := tcpClearRREnv
num := 3 // number of backends
te := newTest(t, e)
var svrAddrs []resolver.Address
te.startServers(&testServer{security: e.security}, num)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
for _, a := range te.srvAddrs {
svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
}
r.InitialAddrs(svrAddrs)
te.resolverScheme = r.Scheme()
te.clientConn()
defer te.tearDown()
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
time.Sleep(100 * time.Millisecond)
tcs, _ := channelz.GetTopChannels(0)
if len(tcs) != 1 {
t.Fatalf("There should only be one top channel, not %d", len(tcs))
}
if len(tcs[0].SubChans) != num {
t.Fatalf("There should be %d subchannel not %d", num, len(tcs[0].SubChans))
}
count := 0
for k := range tcs[0].SubChans {
sc := channelz.GetSubChannel(k)
if sc == nil {
t.Fatalf("got <nil> subchannel")
}
count += len(sc.Sockets)
}
if count != num {
t.Fatalf("There should be %d sockets not %d", num, count)
}
r.NewAddress(svrAddrs[:len(svrAddrs)-1])
time.Sleep(100 * time.Millisecond)
tcs, _ = channelz.GetTopChannels(0)
if len(tcs[0].SubChans) != num-1 {
t.Fatalf("There should be %d subchannel not %d", num-1, len(tcs[0].SubChans))
}
count = 0
for k := range tcs[0].SubChans {
sc := channelz.GetSubChannel(k)
if sc == nil {
t.Fatalf("got <nil> subchannel")
}
count += len(sc.Sockets)
}
if count != num-1 {
t.Fatalf("There should be %d sockets not %d", num-1, count)
}
}
func TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
defer leakcheck.Check(t)
channelz.NewChannelzStorage()
e := tcpClearRREnv
num := 3 // number of clients
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
var ccs []*grpc.ClientConn
for i := 0; i < num; i++ {
cc := te.clientConn()
te.cc = nil
ccs = append(ccs, cc)
}
defer func() {
for _, c := range ccs[:len(ccs)-1] {
c.Close()
}
}()
time.Sleep(10 * time.Millisecond)
ss, _ := channelz.GetServers(0)
if len(ss) != 1 {
t.Fatalf("There should only be one server, not %d", len(ss))
}
if len(ss[0].ListenSockets) != 1 {
t.Fatalf("There should only be one server listen socket, not %d", len(ss[0].ListenSockets))
}
ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
if len(ns) != num {
t.Fatalf("There should be %d normal sockets not %d", num, len(ns))
}
ccs[len(ccs)-1].Close()
time.Sleep(10 * time.Millisecond)
ns, _ = channelz.GetServerSockets(ss[0].ID, 0)
if len(ns) != num-1 {
t.Fatalf("There should be %d normal sockets not %d", num-1, len(ns))
}
}
func TestCZServerListenSocketDeletion(t *testing.T) {
defer leakcheck.Check(t)
channelz.NewChannelzStorage()
s := grpc.NewServer()
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
go s.Serve(lis)
time.Sleep(10 * time.Millisecond)
ss, _ := channelz.GetServers(0)
if len(ss) != 1 {
t.Fatalf("There should only be one server, not %d", len(ss))
}
if len(ss[0].ListenSockets) != 1 {
t.Fatalf("There should only be one server listen socket, not %d", len(ss[0].ListenSockets))
}
lis.Close()
time.Sleep(10 * time.Millisecond)
ss, _ = channelz.GetServers(0)
if len(ss) != 1 {
t.Fatalf("There should be 1 server, not %d", len(ss))
}
s.Stop()
}
type dummyChannel struct{}
func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
return &channelz.ChannelInternalMetric{}
}
type dummySocket struct{}
func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{}
}
func TestCZRecusivelyDeletionOfEntry(t *testing.T) {
// +--+TopChan+---+
// | |
// v v
// +-+SubChan1+--+ SubChan2
// | |
// v v
// Socket1 Socket2
channelz.NewChannelzStorage()
topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "")
subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "")
sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
tcs, _ := channelz.GetTopChannels(0)
if tcs == nil || len(tcs) != 1 {
t.Fatalf("There should be one TopChannel entry")
}
if len(tcs[0].SubChans) != 2 {
t.Fatalf("There should be two SubChannel entries")
}
sc := channelz.GetSubChannel(subChanID1)
if sc == nil || len(sc.Sockets) != 2 {
t.Fatalf("There should be two Socket entries")
}
channelz.RemoveEntry(topChanID)
tcs, _ = channelz.GetTopChannels(0)
if tcs == nil || len(tcs) != 1 {
t.Fatalf("There should be one TopChannel entry")
}
channelz.RemoveEntry(subChanID1)
channelz.RemoveEntry(subChanID2)
tcs, _ = channelz.GetTopChannels(0)
if tcs == nil || len(tcs) != 1 {
t.Fatalf("There should be one TopChannel entry")
}
if len(tcs[0].SubChans) != 1 {
t.Fatalf("There should be one SubChannel entry")
}
channelz.RemoveEntry(sktID1)
channelz.RemoveEntry(sktID2)
tcs, _ = channelz.GetTopChannels(0)
if tcs != nil {
t.Fatalf("There should be no TopChannel entry")
}
}

View File

@ -68,6 +68,10 @@ import (
"google.golang.org/grpc/testdata"
)
func init() {
grpc.RegisterChannelz()
}
var (
// For headers:
testMetadata = metadata.MD{
@ -478,6 +482,10 @@ type test struct {
srv *grpc.Server
srvAddr string
// srvs and srvAddrs are set once startServers is called.
srvs []*grpc.Server
srvAddrs []string
cc *grpc.ClientConn // nil until requested via clientConn
restoreLogs func() // nil unless declareLogNoise is used
}
@ -498,6 +506,11 @@ func (te *test) tearDown() {
if te.srv != nil {
te.srv.Stop()
}
if len(te.srvs) != 0 {
for _, s := range te.srvs {
s.Stop()
}
}
}
// newTest returns a new test using the provided testing.T and

View File

@ -420,6 +420,10 @@ func (ht *serverHandlerTransport) runStream() {
}
}
func (ht *serverHandlerTransport) IncrMsgSent() {}
func (ht *serverHandlerTransport) IncrMsgRecv() {}
func (ht *serverHandlerTransport) Drain() {
panic("Drain() is not implemented")
}

View File

@ -30,6 +30,8 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
@ -101,6 +103,8 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
channelzID int64 // channelz unique identification number
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@ -238,6 +242,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
}
t.statsHandler.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "")
}
// Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity.
@ -665,6 +672,9 @@ func (t *http2Client) Close() error {
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Notify all active streams.
for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil)
@ -1188,3 +1198,10 @@ func (t *http2Client) Error() <-chan struct{} {
func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{}
}
func (t *http2Client) IncrMsgSent() {}
func (t *http2Client) IncrMsgRecv() {}

View File

@ -35,6 +35,8 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/channelz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
@ -90,6 +92,8 @@ type http2Server struct {
initialWindowSize int32
bdpEst *bdpEstimator
channelzID int64 // channelz unique identification number
mu sync.Mutex // guard the following
// drainChan is initialized when drain(...) is called the first time.
@ -218,6 +222,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
connBegin := &stats.ConnBegin{}
t.stats.HandleConn(t.ctx, connBegin)
}
if channelz.IsOn() {
t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
}
t.framer.writer.Flush()
defer func() {
@ -907,6 +914,9 @@ func (t *http2Server) Close() error {
t.controlBuf.finish()
t.cancel()
err := t.conn.Close()
if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID)
}
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@ -1027,6 +1037,13 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, nil
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
return &channelz.SocketInternalMetric{}
}
func (t *http2Server) IncrMsgSent() {}
func (t *http2Server) IncrMsgRecv() {}
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
func getJitter(v time.Duration) time.Duration {

View File

@ -413,6 +413,7 @@ type ServerConfig struct {
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
ChannelzParentID int64
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@ -448,6 +449,8 @@ type ConnectOptions struct {
WriteBufferSize int
// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
ReadBufferSize int
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID int64
}
// TargetInfo contains the information of the target such as network address and metadata.
@ -547,6 +550,12 @@ type ClientTransport interface {
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
// IncrMsgSent increments the number of message sent through this transport.
IncrMsgSent()
// IncrMsgRecv increments the number of message received through this transport.
IncrMsgRecv()
}
// ServerTransport is the common interface for all gRPC server-side transport
@ -580,6 +589,12 @@ type ServerTransport interface {
// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain()
// IncrMsgSent increments the number of message sent through this transport.
IncrMsgSent()
// IncrMsgRecv increments the number of message received through this transport.
IncrMsgRecv()
}
// streamErrorf creates an StreamError with the specified error code and description.