mirror of https://github.com/grpc/grpc-go.git
stats: Add RPC event for blocking for a picker update (#6422)
This commit is contained in:
parent 02946a3f37
commit 7aab9c05b7
@@ -348,7 +348,7 @@ func (cc *ClientConn) exitIdleMode() error {
     cc.idlenessState = ccIdlenessStateExitingIdle
     exitedIdle := false
     if cc.blockingpicker == nil {
-        cc.blockingpicker = newPickerWrapper()
+        cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
     } else {
         cc.blockingpicker.exitIdleMode()
         exitedIdle = true
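For orientation (my aside, not part of the diff): cc.dopts.copts.StatsHandlers is populated from the stats handlers a caller registers at dial time, so the event added by this commit flows to whatever handler the application already uses. A minimal sketch of that registration path, with a hypothetical noopHandler type:

package main

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/stats"
)

// noopHandler is a hypothetical stats.Handler; a real one would inspect the
// events it receives in HandleRPC.
type noopHandler struct{}

func (noopHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context   { return ctx }
func (noopHandler) HandleRPC(context.Context, stats.RPCStats)                         {}
func (noopHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }
func (noopHandler) HandleConn(context.Context, stats.ConnStats)                       {}

func main() {
    // WithStatsHandler is the public API that, as I read the dial-option
    // plumbing, feeds the handler list passed to newPickerWrapper above.
    cc, err := grpc.Dial("localhost:50051", // hypothetical target
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithStatsHandler(noopHandler{}))
    if err != nil {
        panic(err)
    }
    defer cc.Close()
}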
@@ -28,21 +28,26 @@ import (
     "google.golang.org/grpc/internal/channelz"
     istatus "google.golang.org/grpc/internal/status"
     "google.golang.org/grpc/internal/transport"
+    "google.golang.org/grpc/stats"
     "google.golang.org/grpc/status"
 )

 // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
 // actions and unblocks when there's a picker update.
 type pickerWrapper struct {
-    mu         sync.Mutex
-    done       bool
-    idle       bool
-    blockingCh chan struct{}
-    picker     balancer.Picker
+    mu            sync.Mutex
+    done          bool
+    idle          bool
+    blockingCh    chan struct{}
+    picker        balancer.Picker
+    statsHandlers []stats.Handler // to record blocking picker calls
 }

-func newPickerWrapper() *pickerWrapper {
-    return &pickerWrapper{blockingCh: make(chan struct{})}
+func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
+    return &pickerWrapper{
+        blockingCh:    make(chan struct{}),
+        statsHandlers: statsHandlers,
+    }
 }

 // updatePicker is called by UpdateBalancerState. It unblocks all blocked picks.
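One detail about the new statsHandlers field (my aside, not part of the diff): the updated tests below pass nil to newPickerWrapper, which is safe because ranging over a nil slice in Go performs zero iterations, so pick() needs no nil check before looping over pw.statsHandlers. A tiny sketch:

package main

import "fmt"

func main() {
    var handlers []func(string) // nil slice, standing in for []stats.Handler
    // Ranging over a nil slice simply runs zero iterations; no panic, no nil check.
    for _, h := range handlers {
        h("never reached")
    }
    fmt.Println("handlers:", len(handlers)) // prints: handlers: 0
}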
@@ -95,6 +100,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
     var ch chan struct{}

     var lastPickErr error
+
     for {
         pw.mu.Lock()
         if pw.done {
@@ -129,6 +135,20 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
             continue
         }

+        // If the channel is set, it means that the pick call had to wait for a
+        // new picker at some point. Either it's the first iteration and this
+        // function received the first picker, or a picker errored with
+        // ErrNoSubConnAvailable or errored with failfast set to false, which
+        // will trigger a continue to the next iteration. In the first case this
+        // conditional will hit if this call had to block (the channel is set).
+        // In the second case, the only way it will get to this conditional is
+        // if there is a new picker.
+        if ch != nil {
+            for _, sh := range pw.statsHandlers {
+                sh.HandleRPC(ctx, &stats.PickerUpdated{})
+            }
+        }
+
         ch = pw.blockingCh
         p := pw.picker
         pw.mu.Unlock()
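To make the callout above concrete (my sketch, not part of the diff; countingHandler and blockedPicks are made-up names): a stats.Handler sees the event as a *stats.PickerUpdated value in HandleRPC, delivered each time a pick attempt resumes after having waited for a new picker.

package main

import (
    "context"
    "fmt"
    "sync/atomic"

    "google.golang.org/grpc/stats"
)

// countingHandler is a hypothetical stats.Handler that counts pick attempts
// that had to wait for a picker update.
type countingHandler struct {
    blockedPicks int64
}

func (h *countingHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
    return ctx
}

func (h *countingHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
    if _, ok := s.(*stats.PickerUpdated); ok {
        atomic.AddInt64(&h.blockedPicks, 1)
    }
}

func (h *countingHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
    return ctx
}

func (h *countingHandler) HandleConn(context.Context, stats.ConnStats) {}

func main() {
    h := &countingHandler{}
    // Simulate the sh.HandleRPC call made in pick() above.
    h.HandleRPC(context.Background(), &stats.PickerUpdated{})
    fmt.Println("picks that blocked:", atomic.LoadInt64(&h.blockedPicks)) // 1
}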
@@ -66,7 +66,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error
 }

 func (s) TestBlockingPickTimeout(t *testing.T) {
-    bp := newPickerWrapper()
+    bp := newPickerWrapper(nil)
     ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
     defer cancel()
     if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
@@ -75,7 +75,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) {
 }

 func (s) TestBlockingPick(t *testing.T) {
-    bp := newPickerWrapper()
+    bp := newPickerWrapper(nil)
     // All goroutines should block because picker is nil in bp.
     var finishedCount uint64
     for i := goroutineCount; i > 0; i-- {
@@ -94,7 +94,7 @@ func (s) TestBlockingPick(t *testing.T) {
 }

 func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
-    bp := newPickerWrapper()
+    bp := newPickerWrapper(nil)
     var finishedCount uint64
     bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
     // All goroutines should block because picker returns no subConn available.
@@ -114,7 +114,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
 }

 func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
-    bp := newPickerWrapper()
+    bp := newPickerWrapper(nil)
     bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
     var finishedCount uint64
     // All goroutines should block because picker returns transientFailure and
@@ -135,7 +135,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
 }

 func (s) TestBlockingPickSCNotReady(t *testing.T) {
-    bp := newPickerWrapper()
+    bp := newPickerWrapper(nil)
     bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
     var finishedCount uint64
     // All goroutines should block because subConn is not ready.
@@ -99,9 +99,11 @@ func populateSpan(ctx context.Context, rs stats.RPCStats, ti *traceInfo) {
             trace.BoolAttribute("Client", rs.Client),
             trace.BoolAttribute("FailFast", rs.FailFast),
         )
+    case *stats.PickerUpdated:
+        span.Annotate(nil, "Delayed LB pick complete")
     case *stats.InPayload:
         // message id - "must be calculated as two different counters starting
-        // from 1 one for sent messages and one for received messages."
+        // from one for sent messages and one for received messages."
         mi := atomic.AddUint32(&ti.countRecvMsg, 1)
         span.AddMessageReceiveEvent(int64(mi), int64(rs.Length), int64(rs.CompressedLength))
     case *stats.OutPayload:
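The populateSpan hunk appears to belong to gRPC's OpenCensus trace integration (the trace.BoolAttribute and span calls suggest go.opencensus.io/trace); if so, a delayed pick shows up as a plain annotation on the RPC's span. An isolated sketch of that annotation call, outside of gRPC:

package main

import (
    "context"

    "go.opencensus.io/trace"
)

func main() {
    // In gRPC this would happen inside the stats handler when a
    // *stats.PickerUpdated arrives; here we annotate a hand-made span
    // just to show the API in isolation.
    _, span := trace.StartSpan(context.Background(), "example.Service.Method")
    defer span.End()

    span.Annotate(nil, "Delayed LB pick complete")
}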
@@ -59,6 +59,16 @@ func (s *Begin) IsClient() bool { return s.Client }

 func (s *Begin) isRPCStats() {}

+// PickerUpdated indicates that the LB policy provided a new picker while the
+// RPC was waiting for one.
+type PickerUpdated struct{}
+
+// IsClient indicates if the stats information is from client side. Only the
+// client side interfaces with a Picker, thus it always returns true.
+func (*PickerUpdated) IsClient() bool { return true }
+
+func (*PickerUpdated) isRPCStats() {}
+
 // InPayload contains the information for an incoming payload.
 type InPayload struct {
     // Client is true if this InPayload is from client side.
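A small illustration of what the two methods above provide (mine, not part of the diff): isRPCStats makes PickerUpdated satisfy the stats.RPCStats interface so HandleRPC can receive it, and IsClient is hard-wired to true because only the client side runs a picker.

package main

import (
    "fmt"

    "google.golang.org/grpc/stats"
)

// Compile-time check: *stats.PickerUpdated can be handed to a stats.Handler's
// HandleRPC like any other RPC-level event.
var _ stats.RPCStats = (*stats.PickerUpdated)(nil)

func main() {
    ev := &stats.PickerUpdated{}
    fmt.Println("client-side event:", ev.IsClient()) // always true
}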
@@ -23,6 +23,7 @@ import (
     "bytes"
     "context"
     "crypto/tls"
+    "encoding/json"
     "errors"
     "flag"
     "fmt"
@@ -44,6 +45,8 @@ import (
     "golang.org/x/net/http2"
     "golang.org/x/net/http2/hpack"
     "google.golang.org/grpc"
+    "google.golang.org/grpc/balancer"
+    "google.golang.org/grpc/balancer/roundrobin"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/connectivity"
     "google.golang.org/grpc/credentials"
@@ -62,6 +65,7 @@ import (
     "google.golang.org/grpc/peer"
     "google.golang.org/grpc/resolver"
     "google.golang.org/grpc/resolver/manual"
     "google.golang.org/grpc/serviceconfig"
+    "google.golang.org/grpc/stats"
     "google.golang.org/grpc/status"
     "google.golang.org/grpc/tap"
@@ -82,6 +86,7 @@ const defaultHealthService = "grpc.health.v1.Health"

 func init() {
     channelz.TurnOn()
+    balancer.Register(triggerRPCBlockPickerBalancerBuilder{})
 }

 type s struct {
@@ -6362,3 +6367,165 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
         t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events)
     }
 }
+
+type statsHandlerRecordEvents struct {
+    mu sync.Mutex
+    s  []stats.RPCStats
+}
+
+func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
+    return ctx
+}
+func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) {
+    h.mu.Lock()
+    defer h.mu.Unlock()
+    h.s = append(h.s, s)
+}
+func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
+    return ctx
+}
+func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {}
+
+type triggerRPCBlockPicker struct {
+    pickDone func()
+}
+
+func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
+    bp.pickDone()
+    return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+}
+
+const name = "triggerRPCBlockBalancer"
+
+type triggerRPCBlockPickerBalancerBuilder struct{}
+
+func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
+    b := &triggerRPCBlockBalancer{
+        blockingPickerDone: grpcsync.NewEvent(),
+        ClientConn:         cc,
+    }
+    // round_robin child to complete balancer tree with a usable leaf policy and
+    // have RPCs actually work.
+    builder := balancer.Get(roundrobin.Name)
+    rr := builder.Build(b, bOpts)
+    if rr == nil {
+        panic("round robin builder returned nil")
+    }
+    b.Balancer = rr
+    return b
+}
+
+func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+    return &bpbConfig{}, nil
+}
+
+func (triggerRPCBlockPickerBalancerBuilder) Name() string {
+    return name
+}
+
+type bpbConfig struct {
+    serviceconfig.LoadBalancingConfig
+}
+
+// triggerRPCBlockBalancer uses a child RR balancer, but blocks all UpdateState
+// calls until the first Pick call. That first Pick returns
+// ErrNoSubConnAvailable to make the RPC block and trigger the appropriate stats
+// handler callout. After the first Pick call, it will forward at least one
+// READY picker update from the child, causing RPCs to proceed as normal using a
+// round robin balancer's picker if it updates with a READY picker.
+type triggerRPCBlockBalancer struct {
+    stateMu    sync.Mutex
+    childState balancer.State
+
+    blockingPickerDone *grpcsync.Event
+    // embed a ClientConn to wrap only UpdateState() operation
+    balancer.ClientConn
+    // embed a Balancer to wrap only UpdateClientConnState() operation
+    balancer.Balancer
+}
+
+func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
+    err := bpb.Balancer.UpdateClientConnState(s)
+    bpb.ClientConn.UpdateState(balancer.State{
+        ConnectivityState: connectivity.Connecting,
+        Picker: &triggerRPCBlockPicker{
+            pickDone: func() {
+                bpb.stateMu.Lock()
+                defer bpb.stateMu.Unlock()
+                bpb.blockingPickerDone.Fire()
+                if bpb.childState.ConnectivityState == connectivity.Ready {
+                    bpb.ClientConn.UpdateState(bpb.childState)
+                }
+            },
+        },
+    })
+    return err
+}
+
+func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) {
+    bpb.stateMu.Lock()
+    defer bpb.stateMu.Unlock()
+    bpb.childState = state
+    if bpb.blockingPickerDone.HasFired() { // guard first one to get a picker sending ErrNoSubConnAvailable first
+        if state.ConnectivityState == connectivity.Ready {
+            bpb.ClientConn.UpdateState(state) // after the first rr picker update, only forward once READY for deterministic picker counts
+        }
+    }
+}
+
+// TestRPCBlockingOnPickerStatsCall tests the emission of a stats handler call
+// that represents that the RPC had to block waiting for a new picker due to
+// ErrNoSubConnAvailable being returned from the first picker call.
+func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
+    sh := &statsHandlerRecordEvents{}
+    ss := &stubserver.StubServer{
+        UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
+            return &testpb.SimpleResponse{}, nil
+        },
+    }
+
+    if err := ss.StartServer(); err != nil {
+        t.Fatalf("Error starting endpoint server: %v", err)
+    }
+    defer ss.Stop()
+
+    lbCfgJSON := `{
+        "loadBalancingConfig": [
+            {
+                "triggerRPCBlockBalancer": {}
+            }
+        ]
+    }`
+
+    sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
+    mr := manual.NewBuilderWithScheme("pickerupdatedbalancer")
+    defer mr.Close()
+    mr.InitialState(resolver.State{
+        Addresses: []resolver.Address{
+            {Addr: ss.Address},
+        },
+        ServiceConfig: sc,
+    })
+
+    cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials()))
+    if err != nil {
+        t.Fatalf("grpc.Dial() failed: %v", err)
+    }
+    defer cc.Close()
+    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+    defer cancel()
+    testServiceClient := testgrpc.NewTestServiceClient(cc)
+    if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
+        t.Fatalf("Unexpected error from UnaryCall: %v", err)
+    }
+
+    var pickerUpdatedCount uint
+    for _, stat := range sh.s {
+        if _, ok := stat.(*stats.PickerUpdated); ok {
+            pickerUpdatedCount++
+        }
+    }
+    if pickerUpdatedCount != 1 {
+        t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 1)
+    }
+}
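The test reaches for internal.ParseServiceConfig, which is only usable inside the repo. Outside of it, the closest user-level equivalent (a sketch under that assumption, with a hypothetical target and the built-in round_robin policy standing in for the test-only triggerRPCBlockBalancer) is to pass the same kind of JSON through grpc.WithDefaultServiceConfig and attach the stats handler with grpc.WithStatsHandler:

package main

import (
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

func main() {
    // Hypothetical target; the JSON plays the role of lbCfgJSON in the test.
    cc, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`),
        // grpc.WithStatsHandler(&statsHandlerRecordEvents{}) would attach a
        // recording handler like the one in the test in the same way.
    )
    if err != nil {
        log.Fatalf("grpc.Dial() failed: %v", err)
    }
    defer cc.Close()
}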
@@ -474,6 +474,10 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
     return ctx
 }
 func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
+    // These calls come in nondeterministically - so we can just ignore them.
+    if _, ok := s.(*stats.PickerUpdated); ok {
+        return
+    }
     h.mu.Lock()
     h.s = append(h.s, s)
     h.mu.Unlock()