From e10841f2d1589d60797b54f2806ead4ac0dc6670 Mon Sep 17 00:00:00 2001
From: zyguan
Date: Fri, 29 Jul 2022 11:44:04 +0800
Subject: [PATCH] support adding `ExecDetailsV2` to tracing (#559)

* support adding `ExecDetailsV2` to tracing

Signed-off-by: zyguan

* rename `TraceExecEnabled` to `TraceExecDetailsEnabled`

Signed-off-by: zyguan

* revert changes for #558

Signed-off-by: zyguan

* address comments

Signed-off-by: zyguan

* add unit test

Signed-off-by: zyguan
---
 internal/client/client.go              | 140 +++++++++++++++++++++-
 internal/client/client_test.go         | 160 +++++++++++++++++++++++++
 internal/mockstore/mocktikv/cluster.go |   2 +-
 util/execdetails.go                    |  15 +++
 4 files changed, 313 insertions(+), 4 deletions(-)

diff --git a/internal/client/client.go b/internal/client/client.go
index 0a510c80..eb54c565 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -42,6 +42,7 @@ import (
 	"math"
 	"runtime/trace"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -453,10 +454,11 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
 }
 
 func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (resp *tikvrpc.Response, err error) {
+	var spanRPC opentracing.Span
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
-		defer span1.Finish()
-		ctx = opentracing.ContextWithSpan(ctx, span1)
+		spanRPC = span.Tracer().StartSpan(fmt.Sprintf("rpcClient.SendRequest, region ID: %d, type: %s", req.RegionId, req.Type), opentracing.ChildOf(span.Context()))
+		defer spanRPC.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, spanRPC)
 	}
 
 	if atomic.CompareAndSwapUint32(&c.idleNotify, 1, 0) {
@@ -480,6 +482,12 @@ func (c *RPCClient) sendRequest(ctx context.Context, addr string, req *tikvrpc.R
 			atomic.AddInt64(&detail.WaitKVRespDuration, int64(time.Since(start)))
 		}
 		c.updateTiKVSendReqHistogram(req, resp, start, staleRead)
+
+		if spanRPC != nil && util.TraceExecDetailsEnabled(ctx) {
+			if si := buildSpanInfoFromResp(resp); si != nil {
+				si.addTo(spanRPC, start)
+			}
+		}
 	}()
 
 	// TiDB RPC server supports batch RPC, but batch connection will send heart beat, It's not necessary since
@@ -661,3 +669,129 @@ func (c *RPCClient) CloseAddr(addr string) error {
 	}
 	return nil
 }
+
+type spanInfo struct {
+	name     string
+	dur      uint64
+	async    bool
+	children []spanInfo
+}
+
+func (si *spanInfo) calcDur() uint64 {
+	if si.dur == 0 {
+		for _, child := range si.children {
+			if child.async {
+				// TODO: Here we just skip the duration of async process, however there might be a sync point before a
+				// specified span, in which case we should take the max(main_routine_duration, async_span_duration).
+				// It's OK for now, since only persist-log is marked as async, whose duration is typically smaller than
+				// commit-log.
+ continue + } + si.dur += child.calcDur() + } + } + return si.dur +} + +func (si *spanInfo) addTo(parent opentracing.Span, start time.Time) time.Time { + if parent == nil { + return start + } + dur := si.calcDur() + if dur == 0 { + return start + } + end := start.Add(time.Duration(dur) * time.Nanosecond) + tracer := parent.Tracer() + span := tracer.StartSpan(si.name, opentracing.ChildOf(parent.Context()), opentracing.StartTime(start)) + t := start + for _, child := range si.children { + t = child.addTo(span, t) + } + span.FinishWithOptions(opentracing.FinishOptions{FinishTime: end}) + if si.async { + span.SetTag("async", "true") + return start + } + return end +} + +func (si *spanInfo) printTo(out io.StringWriter) { + out.WriteString(si.name) + if si.async { + out.WriteString("'") + } + if si.dur > 0 { + out.WriteString("[") + out.WriteString(time.Duration(si.dur).String()) + out.WriteString("]") + } + if len(si.children) > 0 { + out.WriteString("{") + for _, child := range si.children { + out.WriteString(" ") + child.printTo(out) + } + out.WriteString(" }") + } +} + +func (si *spanInfo) String() string { + buf := new(strings.Builder) + si.printTo(buf) + return buf.String() +} + +func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo { + details := resp.GetExecDetailsV2() + if details == nil { + return nil + } + + td := details.TimeDetail + sd := details.ScanDetailV2 + wd := details.WriteDetail + + if td == nil { + return nil + } + + spanRPC := spanInfo{name: "tikv.RPC", dur: td.TotalRpcWallTimeNs} + spanWait := spanInfo{name: "tikv.Wait", dur: td.WaitWallTimeMs * uint64(time.Millisecond)} + spanProcess := spanInfo{name: "tikv.Process", dur: td.ProcessWallTimeMs * uint64(time.Millisecond)} + + if sd != nil { + spanWait.children = append(spanWait.children, spanInfo{name: "tikv.GetSnapshot", dur: sd.GetSnapshotNanos}) + if wd == nil { + spanProcess.children = append(spanProcess.children, spanInfo{name: "tikv.RocksDBBlockRead", dur: sd.RocksdbBlockReadNanos}) + } + } + + spanRPC.children = append(spanRPC.children, spanWait, spanProcess) + + if wd != nil { + spanAsyncWrite := spanInfo{ + name: "tikv.AsyncWrite", + children: []spanInfo{ + {name: "tikv.StoreBatchWait", dur: wd.StoreBatchWaitNanos}, + {name: "tikv.ProposeSendWait", dur: wd.ProposeSendWaitNanos}, + {name: "tikv.PersistLog", dur: wd.PersistLogNanos, async: true, children: []spanInfo{ + {name: "tikv.RaftDBWriteWait", dur: wd.RaftDbWriteLeaderWaitNanos}, // MutexLock + WriteLeader + {name: "tikv.RaftDBWriteWAL", dur: wd.RaftDbSyncLogNanos}, + {name: "tikv.RaftDBWriteMemtable", dur: wd.RaftDbWriteMemtableNanos}, + }}, + {name: "tikv.CommitLog", dur: wd.CommitLogNanos}, + {name: "tikv.ApplyBatchWait", dur: wd.ApplyBatchWaitNanos}, + {name: "tikv.ApplyLog", dur: wd.ApplyLogNanos, children: []spanInfo{ + {name: "tikv.ApplyMutexLock", dur: wd.ApplyMutexLockNanos}, + {name: "tikv.ApplyWriteLeaderWait", dur: wd.ApplyWriteLeaderWaitNanos}, + {name: "tikv.ApplyWriteWAL", dur: wd.ApplyWriteWalNanos}, + {name: "tikv.ApplyWriteMemtable", dur: wd.ApplyWriteMemtableNanos}, + }}, + }, + } + spanRPC.children = append(spanRPC.children, spanAsyncWrite) + } + + return &spanRPC +} diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 002acd2e..3379ceb2 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -37,11 +37,14 @@ package client import ( "context" "fmt" + "strconv" + "strings" "sync" "sync/atomic" "testing" "time" + "github.com/opentracing/opentracing-go/mocktracer" 
"github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -471,3 +474,160 @@ func TestBatchCommandsBuilder(t *testing.T) { assert.Equal(t, len(builder.forwardingReqs), 0) assert.NotEqual(t, builder.idAlloc, 0) } + +func TestTraceExecDetails(t *testing.T) { + + assert.Nil(t, buildSpanInfoFromResp(nil)) + assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{})) + assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}})) + assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ExecDetailsV2: &kvrpcpb.ExecDetailsV2{}}})) + + buildSpanInfo := func(details *kvrpcpb.ExecDetailsV2) *spanInfo { + return buildSpanInfoFromResp(&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{ExecDetailsV2: details}}) + } + fmtMockTracer := func(tracer *mocktracer.MockTracer) string { + buf := new(strings.Builder) + for i, span := range tracer.FinishedSpans() { + if i > 0 { + buf.WriteString("\n") + } + buf.WriteString("[") + buf.WriteString(span.StartTime.Format("05.000")) + buf.WriteString(",") + buf.WriteString(span.FinishTime.Format("05.000")) + buf.WriteString("] ") + buf.WriteString(span.OperationName) + if span.Tag("async") != nil { + buf.WriteString("'") + } + } + return buf.String() + } + baseTime := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC) + assert.Equal(t, baseTime, (&spanInfo{}).addTo(nil, baseTime)) + + for i, tt := range []struct { + details *kvrpcpb.ExecDetailsV2 + infoOut string + traceOut string + }{ + { + &kvrpcpb.ExecDetailsV2{ + TimeDetail: &kvrpcpb.TimeDetail{TotalRpcWallTimeNs: uint64(time.Second)}, + }, + "tikv.RPC[1s]{ tikv.Wait tikv.Process }", + "[00.000,01.000] tikv.RPC", + }, + { + &kvrpcpb.ExecDetailsV2{ + TimeDetail: &kvrpcpb.TimeDetail{ + TotalRpcWallTimeNs: uint64(time.Second), + WaitWallTimeMs: 100, + ProcessWallTimeMs: 500, + }, + }, + "tikv.RPC[1s]{ tikv.Wait[100ms] tikv.Process[500ms] }", + strings.Join([]string{ + "[00.000,00.100] tikv.Wait", + "[00.100,00.600] tikv.Process", + "[00.000,01.000] tikv.RPC", + }, "\n"), + }, + { + &kvrpcpb.ExecDetailsV2{ + TimeDetail: &kvrpcpb.TimeDetail{ + TotalRpcWallTimeNs: uint64(time.Second), + WaitWallTimeMs: 100, + ProcessWallTimeMs: 500, + }, + ScanDetailV2: &kvrpcpb.ScanDetailV2{ + GetSnapshotNanos: uint64(80 * time.Millisecond), + RocksdbBlockReadNanos: uint64(200 * time.Millisecond), + }, + }, + "tikv.RPC[1s]{ tikv.Wait[100ms]{ tikv.GetSnapshot[80ms] } tikv.Process[500ms]{ tikv.RocksDBBlockRead[200ms] } }", + strings.Join([]string{ + "[00.000,00.080] tikv.GetSnapshot", + "[00.000,00.100] tikv.Wait", + "[00.100,00.300] tikv.RocksDBBlockRead", + "[00.100,00.600] tikv.Process", + "[00.000,01.000] tikv.RPC", + }, "\n"), + }, + { + // WriteDetail hides RocksDBBlockRead + &kvrpcpb.ExecDetailsV2{ + TimeDetail: &kvrpcpb.TimeDetail{ + TotalRpcWallTimeNs: uint64(time.Second), + WaitWallTimeMs: 100, + ProcessWallTimeMs: 500, + }, + ScanDetailV2: &kvrpcpb.ScanDetailV2{ + GetSnapshotNanos: uint64(80 * time.Millisecond), + RocksdbBlockReadNanos: uint64(200 * time.Millisecond), + }, + WriteDetail: &kvrpcpb.WriteDetail{}, + }, + "tikv.RPC[1s]{ tikv.Wait[100ms]{ tikv.GetSnapshot[80ms] } tikv.Process[500ms] tikv.AsyncWrite{ tikv.StoreBatchWait tikv.ProposeSendWait tikv.PersistLog'{ tikv.RaftDBWriteWait tikv.RaftDBWriteWAL tikv.RaftDBWriteMemtable } tikv.CommitLog tikv.ApplyBatchWait tikv.ApplyLog{ tikv.ApplyMutexLock tikv.ApplyWriteLeaderWait tikv.ApplyWriteWAL tikv.ApplyWriteMemtable } } }", + strings.Join([]string{ + "[00.000,00.080] 
tikv.GetSnapshot", + "[00.000,00.100] tikv.Wait", + "[00.100,00.600] tikv.Process", + "[00.000,01.000] tikv.RPC", + }, "\n"), + }, + { + &kvrpcpb.ExecDetailsV2{ + TimeDetail: &kvrpcpb.TimeDetail{ + TotalRpcWallTimeNs: uint64(time.Second), + }, + ScanDetailV2: &kvrpcpb.ScanDetailV2{ + GetSnapshotNanos: uint64(80 * time.Millisecond), + }, + WriteDetail: &kvrpcpb.WriteDetail{ + StoreBatchWaitNanos: uint64(10 * time.Millisecond), + ProposeSendWaitNanos: uint64(10 * time.Millisecond), + PersistLogNanos: uint64(100 * time.Millisecond), + RaftDbWriteLeaderWaitNanos: uint64(20 * time.Millisecond), + RaftDbSyncLogNanos: uint64(30 * time.Millisecond), + RaftDbWriteMemtableNanos: uint64(30 * time.Millisecond), + CommitLogNanos: uint64(200 * time.Millisecond), + ApplyBatchWaitNanos: uint64(20 * time.Millisecond), + ApplyLogNanos: uint64(300 * time.Millisecond), + ApplyMutexLockNanos: uint64(10 * time.Millisecond), + ApplyWriteLeaderWaitNanos: uint64(10 * time.Millisecond), + ApplyWriteWalNanos: uint64(80 * time.Millisecond), + ApplyWriteMemtableNanos: uint64(50 * time.Millisecond), + }, + }, + "tikv.RPC[1s]{ tikv.Wait{ tikv.GetSnapshot[80ms] } tikv.Process tikv.AsyncWrite{ tikv.StoreBatchWait[10ms] tikv.ProposeSendWait[10ms] tikv.PersistLog'[100ms]{ tikv.RaftDBWriteWait[20ms] tikv.RaftDBWriteWAL[30ms] tikv.RaftDBWriteMemtable[30ms] } tikv.CommitLog[200ms] tikv.ApplyBatchWait[20ms] tikv.ApplyLog[300ms]{ tikv.ApplyMutexLock[10ms] tikv.ApplyWriteLeaderWait[10ms] tikv.ApplyWriteWAL[80ms] tikv.ApplyWriteMemtable[50ms] } } }", + strings.Join([]string{ + "[00.000,00.080] tikv.GetSnapshot", + "[00.000,00.080] tikv.Wait", + "[00.080,00.090] tikv.StoreBatchWait", + "[00.090,00.100] tikv.ProposeSendWait", + "[00.100,00.120] tikv.RaftDBWriteWait", + "[00.120,00.150] tikv.RaftDBWriteWAL", + "[00.150,00.180] tikv.RaftDBWriteMemtable", + "[00.100,00.200] tikv.PersistLog'", + "[00.100,00.300] tikv.CommitLog", + "[00.300,00.320] tikv.ApplyBatchWait", + "[00.320,00.330] tikv.ApplyMutexLock", + "[00.330,00.340] tikv.ApplyWriteLeaderWait", + "[00.340,00.420] tikv.ApplyWriteWAL", + "[00.420,00.470] tikv.ApplyWriteMemtable", + "[00.320,00.620] tikv.ApplyLog", + "[00.080,00.620] tikv.AsyncWrite", + "[00.000,01.000] tikv.RPC", + }, "\n"), + }, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + info := buildSpanInfo(tt.details) + assert.Equal(t, tt.infoOut, info.String()) + tracer := mocktracer.New() + info.addTo(tracer.StartSpan("root"), baseTime) + assert.Equal(t, tt.traceOut, fmtMockTracer(tracer)) + }) + } +} diff --git a/internal/mockstore/mocktikv/cluster.go b/internal/mockstore/mocktikv/cluster.go index 9154f66a..e90f043d 100644 --- a/internal/mockstore/mocktikv/cluster.go +++ b/internal/mockstore/mocktikv/cluster.go @@ -42,7 +42,7 @@ import ( "sync" "time" - "github.com/golang/protobuf/proto" //nolint + "github.com/golang/protobuf/proto" //nolint:staticcheck "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/client-go/v2/internal/mockstore/cluster" diff --git a/util/execdetails.go b/util/execdetails.go index 6c6d78f3..02e9d42f 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -36,6 +36,7 @@ package util import ( "bytes" + "context" "math" "strconv" "sync" @@ -48,6 +49,7 @@ import ( type commitDetailCtxKeyType struct{} type lockKeysDetailCtxKeyType struct{} type execDetailsCtxKeyType struct{} +type traceExecDetailsCtxKeyType struct{} var ( // CommitDetailCtxKey presents CommitDetail info key in context. 
@@ -58,8 +60,21 @@ var (
 	// ExecDetailsKey presents ExecDetail info key in context.
 	ExecDetailsKey = execDetailsCtxKeyType{}
+
+	// traceExecDetailsKey is a context key whose value indicates whether to add ExecDetails to trace.
+	traceExecDetailsKey = traceExecDetailsCtxKeyType{}
 )
 
+// ContextWithTraceExecDetails returns a context with trace-exec-details enabled.
+func ContextWithTraceExecDetails(ctx context.Context) context.Context {
+	return context.WithValue(ctx, traceExecDetailsKey, struct{}{})
+}
+
+// TraceExecDetailsEnabled checks whether trace-exec-details is enabled.
+func TraceExecDetailsEnabled(ctx context.Context) bool {
+	return ctx.Value(traceExecDetailsKey) != nil
+}
+
 // TiKVExecDetails is the detail execution information at TiKV side.
 type TiKVExecDetails struct {
 	TimeDetail *TimeDetail
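
Usage note (not part of the patch): with this change, exec details are attached to the trace only when the caller both carries an active opentracing span in the context and opts in via util.ContextWithTraceExecDetails. Below is a minimal sketch of the opt-in; the mocktracer setup and the "demo.Get" span name are illustrative assumptions, not part of this change:

package main

import (
	"context"
	"fmt"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/mocktracer"
	"github.com/tikv/client-go/v2/util"
)

func main() {
	// Any opentracing-compatible tracer works; mocktracer keeps the sketch self-contained.
	tracer := mocktracer.New()
	opentracing.SetGlobalTracer(tracer)

	root := tracer.StartSpan("demo.Get")
	ctx := opentracing.ContextWithSpan(context.Background(), root)

	// Opt in: without this, sendRequest skips buildSpanInfoFromResp even when
	// a span is present in the context.
	ctx = util.ContextWithTraceExecDetails(ctx)

	// ... issue reads/writes through a client-go KVStore/RawKV client with ctx here;
	// each RPC then emits tikv.Wait / tikv.Process / tikv.AsyncWrite child spans ...
	_ = ctx

	root.Finish()
	for _, s := range tracer.FinishedSpans() {
		fmt.Println(s.OperationName)
	}
}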