channelz: use atomic instead of mutex (#2218)

This commit is contained in:
lyuxuan 2018-08-06 11:17:12 -07:00 committed by GitHub
parent a344a35754
commit f4da7eee53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 166 deletions

View File

@ -135,6 +135,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
@ -374,12 +375,8 @@ type ClientConn struct {
balancerWrapper *ccBalancerWrapper
retryThrottler atomic.Value
channelzID int64 // channelz unique identification number
czmu sync.RWMutex
callsStarted int64
callsSucceeded int64
callsFailed int64
lastCallStartedTime time.Time
channelzID int64 // channelz unique identification number
czData *channelzData
}
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@ -532,9 +529,10 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
cc: cc,
addrs: addrs,
dopts: cc.dopts,
cc: cc,
addrs: addrs,
dopts: cc.dopts,
czData: new(channelzData),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@ -567,16 +565,13 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
// ChannelzMetric returns ChannelInternalMetric of current ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
state := cc.GetState()
cc.czmu.RLock()
defer cc.czmu.RUnlock()
return &channelz.ChannelInternalMetric{
State: state,
State: cc.GetState(),
Target: cc.target,
CallsStarted: cc.callsStarted,
CallsSucceeded: cc.callsSucceeded,
CallsFailed: cc.callsFailed,
LastCallStartedTimestamp: cc.lastCallStartedTime,
CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
}
}
@ -587,23 +582,16 @@ func (cc *ClientConn) Target() string {
}
func (cc *ClientConn) incrCallsStarted() {
cc.czmu.Lock()
cc.callsStarted++
// TODO(yuxuanli): will make this a time.Time pointer improve performance?
cc.lastCallStartedTime = time.Now()
cc.czmu.Unlock()
atomic.AddInt64(&cc.czData.callsStarted, 1)
atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
}
func (cc *ClientConn) incrCallsSucceeded() {
cc.czmu.Lock()
cc.callsSucceeded++
cc.czmu.Unlock()
atomic.AddInt64(&cc.czData.callsSucceeded, 1)
}
func (cc *ClientConn) incrCallsFailed() {
cc.czmu.Lock()
cc.callsFailed++
cc.czmu.Unlock()
atomic.AddInt64(&cc.czData.callsFailed, 1)
}
// connect starts to creating transport and also starts the transport monitor
@ -822,12 +810,8 @@ type addrConn struct {
// negotiations must complete.
connectDeadline time.Time
channelzID int64 // channelz unique identification number
czmu sync.RWMutex
callsStarted int64
callsSucceeded int64
callsFailed int64
lastCallStartedTime time.Time
channelzID int64 // channelz unique identification number
czData *channelzData
}
// adjustParams updates parameters used to create transports upon
@ -1197,36 +1181,27 @@ func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
ac.mu.Lock()
addr := ac.curAddr.Addr
ac.mu.Unlock()
state := ac.getState()
ac.czmu.RLock()
defer ac.czmu.RUnlock()
return &channelz.ChannelInternalMetric{
State: state,
State: ac.getState(),
Target: addr,
CallsStarted: ac.callsStarted,
CallsSucceeded: ac.callsSucceeded,
CallsFailed: ac.callsFailed,
LastCallStartedTimestamp: ac.lastCallStartedTime,
CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
}
}
func (ac *addrConn) incrCallsStarted() {
ac.czmu.Lock()
ac.callsStarted++
ac.lastCallStartedTime = time.Now()
ac.czmu.Unlock()
atomic.AddInt64(&ac.czData.callsStarted, 1)
atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
}
func (ac *addrConn) incrCallsSucceeded() {
ac.czmu.Lock()
ac.callsSucceeded++
ac.czmu.Unlock()
atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}
func (ac *addrConn) incrCallsFailed() {
ac.czmu.Lock()
ac.callsFailed++
ac.czmu.Unlock()
atomic.AddInt64(&ac.czData.callsFailed, 1)
}
type retryThrottler struct {

View File

@ -111,19 +111,7 @@ type http2Client struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czmu sync.RWMutex
kpCount int64
// The number of streams that have started, including already finished ones.
streamsStarted int64
// The number of streams that have ended successfully by receiving EoS bit set
// frame from server.
streamsSucceeded int64
streamsFailed int64
lastStreamCreated time.Time
msgSent int64
msgRecv int64
lastMsgSent time.Time
lastMsgRecv time.Time
czData *channelzData
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@ -234,6 +222,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
@ -550,10 +539,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
}
t.activeStreams[id] = s
if channelz.IsOn() {
t.czmu.Lock()
t.streamsStarted++
t.lastStreamCreated = time.Now()
t.czmu.Unlock()
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
var sendPing bool
// If the number of active streams change from 0 to 1, then check if keepalive
@ -707,13 +694,11 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
}
t.mu.Unlock()
if channelz.IsOn() {
t.czmu.Lock()
if eosReceived {
t.streamsSucceeded++
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
t.streamsFailed++
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
t.czmu.Unlock()
}
},
rst: rst,
@ -1263,9 +1248,7 @@ func (t *http2Client) keepalive() {
} else {
t.mu.Unlock()
if channelz.IsOn() {
t.czmu.Lock()
t.kpCount++
t.czmu.Unlock()
atomic.AddInt64(&t.czData.kpCount, 1)
}
// Send ping.
t.controlBuf.put(p)
@ -1305,17 +1288,16 @@ func (t *http2Client) GoAway() <-chan struct{} {
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
t.czmu.RLock()
s := channelz.SocketInternalMetric{
StreamsStarted: t.streamsStarted,
StreamsSucceeded: t.streamsSucceeded,
StreamsFailed: t.streamsFailed,
MessagesSent: t.msgSent,
MessagesReceived: t.msgRecv,
KeepAlivesSent: t.kpCount,
LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
LastMessageSentTimestamp: t.lastMsgSent,
LastMessageReceivedTimestamp: t.lastMsgRecv,
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
@ -1325,23 +1307,18 @@ func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Client) IncrMsgSent() {
t.czmu.Lock()
t.msgSent++
t.lastMsgSent = time.Now()
t.czmu.Unlock()
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
t.czmu.Lock()
t.msgRecv++
t.lastMsgRecv = time.Now()
t.czmu.Unlock()
atomic.AddInt64(&t.czData.msgRecv, 1)
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {

View File

@ -118,19 +118,7 @@ type http2Server struct {
// Fields below are for channelz metric collection.
channelzID int64 // channelz unique identification number
czmu sync.RWMutex
kpCount int64
// The number of streams that have started, including already finished ones.
streamsStarted int64
// The number of streams that have ended successfully by sending frame with
// EoS bit set.
streamsSucceeded int64
streamsFailed int64
lastStreamCreated time.Time
msgSent int64
msgRecv int64
lastMsgSent time.Time
lastMsgRecv time.Time
czData *channelzData
}
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@ -231,6 +219,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
}
t.controlBuf = newControlBuffer(t.ctxDone)
if dynamicWindow {
@ -392,10 +381,8 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
}
t.mu.Unlock()
if channelz.IsOn() {
t.czmu.Lock()
t.streamsStarted++
t.lastStreamCreated = time.Now()
t.czmu.Unlock()
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
@ -977,9 +964,7 @@ func (t *http2Server) keepalive() {
}
pingSent = true
if channelz.IsOn() {
t.czmu.Lock()
t.kpCount++
t.czmu.Unlock()
atomic.AddInt64(&t.czData.kpCount, 1)
}
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
@ -1044,13 +1029,11 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hd
}
t.mu.Unlock()
if channelz.IsOn() {
t.czmu.Lock()
if eosReceived {
t.streamsSucceeded++
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
} else {
t.streamsFailed++
atomic.AddInt64(&t.czData.streamsFailed, 1)
}
t.czmu.Unlock()
}
},
}
@ -1138,17 +1121,16 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
t.czmu.RLock()
s := channelz.SocketInternalMetric{
StreamsStarted: t.streamsStarted,
StreamsSucceeded: t.streamsSucceeded,
StreamsFailed: t.streamsFailed,
MessagesSent: t.msgSent,
MessagesReceived: t.msgRecv,
KeepAlivesSent: t.kpCount,
LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
LastMessageSentTimestamp: t.lastMsgSent,
LastMessageReceivedTimestamp: t.lastMsgRecv,
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
@ -1158,23 +1140,18 @@ func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
t.czmu.RUnlock()
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Server) IncrMsgSent() {
t.czmu.Lock()
t.msgSent++
t.lastMsgSent = time.Now()
t.czmu.Unlock()
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
func (t *http2Server) IncrMsgRecv() {
t.czmu.Lock()
t.msgRecv++
t.lastMsgRecv = time.Now()
t.czmu.Unlock()
atomic.AddInt64(&t.czData.msgRecv, 1)
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
func (t *http2Server) getOutFlowWindow() int64 {

View File

@ -683,3 +683,27 @@ const (
// "too_many_pings".
GoAwayTooManyPings GoAwayReason = 2
)
// channelzData is used to store channelz related data for http2Client and http2Server.
// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
type channelzData struct {
kpCount int64
// The number of streams that have started, including already finished ones.
streamsStarted int64
// Client side: The number of streams that have ended successfully by receiving
// EoS bit set frame from server.
// Server side: The number of streams that have ended successfully by sending
// frame with EoS bit set.
streamsSucceeded int64
streamsFailed int64
// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
// instead of time.Time since it's more costly to atomically update time.Time variable than int64
// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
lastStreamCreatedTime int64
msgSent int64
msgRecv int64
lastMsgSentTime int64
lastMsgRecvTime int64
}

View File

@ -750,6 +750,19 @@ func parseDialTarget(target string) (net string, addr string) {
return net, target
}
// channelzData is used to store channelz related data for ClientConn, addrConn and Server.
// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
type channelzData struct {
callsStarted int64
callsFailed int64
callsSucceeded int64
// lastCallStartedTime stores the timestamp that last call starts. It is of int64 type instead of
// time.Time since it's more costly to atomically update time.Time variable than int64 variable.
lastCallStartedTime int64
}
// The SupportPackageIsVersion variables are referenced from generated protocol
// buffer files to ensure compatibility with the gRPC version used. The latest
// support package version is 5.

View File

@ -30,6 +30,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"io/ioutil"
@ -104,12 +105,8 @@ type Server struct {
channelzRemoveOnce sync.Once
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
channelzID int64 // channelz unique identification number
czmu sync.RWMutex
callsStarted int64
callsFailed int64
callsSucceeded int64
lastCallStartedTime time.Time
channelzID int64 // channelz unique identification number
czData *channelzData
}
type options struct {
@ -357,12 +354,13 @@ func NewServer(opt ...ServerOption) *Server {
o(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
quit: make(chan struct{}),
done: make(chan struct{}),
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
quit: make(chan struct{}),
done: make(chan struct{}),
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
@ -784,33 +782,25 @@ func (s *Server) removeConn(c io.Closer) {
// ChannelzMetric returns ServerInternalMetric of current server.
// This is an EXPERIMENTAL API.
func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric {
s.czmu.RLock()
defer s.czmu.RUnlock()
return &channelz.ServerInternalMetric{
CallsStarted: s.callsStarted,
CallsSucceeded: s.callsSucceeded,
CallsFailed: s.callsFailed,
LastCallStartedTimestamp: s.lastCallStartedTime,
CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
}
}
func (s *Server) incrCallsStarted() {
s.czmu.Lock()
s.callsStarted++
s.lastCallStartedTime = time.Now()
s.czmu.Unlock()
atomic.AddInt64(&s.czData.callsStarted, 1)
atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
}
func (s *Server) incrCallsSucceeded() {
s.czmu.Lock()
s.callsSucceeded++
s.czmu.Unlock()
atomic.AddInt64(&s.czData.callsSucceeded, 1)
}
func (s *Server) incrCallsFailed() {
s.czmu.Lock()
s.callsFailed++
s.czmu.Unlock()
atomic.AddInt64(&s.czData.callsFailed, 1)
}
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {