diff --git a/internal/client/client.go b/internal/client/client.go index 818ff6bd..1828fbde 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -743,3 +743,56 @@ func (m *rpcMetrics) get(cmd tikvrpc.CmdType, stale bool, internal bool) prometh } return lat.(prometheus.Observer) } + +type storeMetrics struct { + storeID uint64 + rpcLatHist *rpcMetrics + rpcSrcLatSum sync.Map + rpcNetLatExternal prometheus.Observer + rpcNetLatInternal prometheus.Observer +} + +func newStoreMetrics(storeID uint64) *storeMetrics { + store := strconv.FormatUint(storeID, 10) + m := &storeMetrics{ + storeID: storeID, + rpcLatHist: deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: store})), + rpcNetLatExternal: metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(store, "false"), + rpcNetLatInternal: metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(store, "true"), + } + return m +} + +func (m *storeMetrics) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) { + seconds := latency.Seconds() + stale := req.GetStaleRead() + source := req.GetRequestSource() + internal := util.IsInternalRequest(req.GetRequestSource()) + + m.rpcLatHist.get(req.Type, stale, internal).Observe(seconds) + + srcLatSum, ok := m.rpcSrcLatSum.Load(source) + if !ok { + srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqBySourceSummary.MustCurryWith( + prometheus.Labels{metrics.LblStore: strconv.FormatUint(m.storeID, 10), metrics.LblSource: source})) + m.rpcSrcLatSum.Store(source, srcLatSum) + } + srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds) + + if execDetail := resp.GetExecDetailsV2(); execDetail != nil { + var totalRpcWallTimeNs uint64 + if execDetail.TimeDetailV2 != nil { + totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs + } else if execDetail.TimeDetail != nil { + totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs + } + if totalRpcWallTimeNs > 0 { + lat := latency - time.Duration(totalRpcWallTimeNs) + if internal { + m.rpcNetLatInternal.Observe(lat.Seconds()) + } else { + m.rpcNetLatExternal.Observe(lat.Seconds()) + } + } + } +} diff --git a/internal/client/conn_pool.go b/internal/client/conn_pool.go index 2ac8f032..11c0a4b0 100644 --- a/internal/client/conn_pool.go +++ b/internal/client/conn_pool.go @@ -23,12 +23,9 @@ import ( grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" - "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/tikvrpc" - "github.com/tikv/client-go/v2/util" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" @@ -55,12 +52,7 @@ type connPool struct { monitor *connMonitor - metrics struct { - rpcLatHist *rpcMetrics - rpcSrcLatSum sync.Map - rpcNetLatExternal prometheus.Observer - rpcNetLatInternal prometheus.Observer - } + metrics atomic.Pointer[storeMetrics] } func newConnPool(maxSize uint, addr string, ver uint64, security config.Security, @@ -74,9 +66,6 @@ func newConnPool(maxSize uint, addr string, ver uint64, security config.Security dialTimeout: dialTimeout, monitor: m, } - a.metrics.rpcLatHist = deriveRPCMetrics(metrics.TiKVSendReqHistogram.MustCurryWith(prometheus.Labels{metrics.LblStore: addr})) - a.metrics.rpcNetLatExternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "false") - 
a.metrics.rpcNetLatInternal = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(addr, "true") if err := a.Init(addr, security, idleNotify, enableBatch, eventListener, opts...); err != nil { return nil, err } @@ -221,35 +210,15 @@ func (a *connPool) Close() { } func (a *connPool) updateRPCMetrics(req *tikvrpc.Request, resp *tikvrpc.Response, latency time.Duration) { - seconds := latency.Seconds() - stale := req.GetStaleRead() - source := req.GetRequestSource() - internal := util.IsInternalRequest(req.GetRequestSource()) - - a.metrics.rpcLatHist.get(req.Type, stale, internal).Observe(seconds) - - srcLatSum, ok := a.metrics.rpcSrcLatSum.Load(source) - if !ok { - srcLatSum = deriveRPCMetrics(metrics.TiKVSendReqSummary.MustCurryWith( - prometheus.Labels{metrics.LblStore: a.target, metrics.LblSource: source})) - a.metrics.rpcSrcLatSum.Store(source, srcLatSum) - } - srcLatSum.(*rpcMetrics).get(req.Type, stale, internal).Observe(seconds) - - if execDetail := resp.GetExecDetailsV2(); execDetail != nil { - var totalRpcWallTimeNs uint64 - if execDetail.TimeDetailV2 != nil { - totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs - } else if execDetail.TimeDetail != nil { - totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs - } - if totalRpcWallTimeNs > 0 { - lat := latency - time.Duration(totalRpcWallTimeNs) - if internal { - a.metrics.rpcNetLatInternal.Observe(lat.Seconds()) - } else { - a.metrics.rpcNetLatExternal.Observe(lat.Seconds()) - } - } + m := a.metrics.Load() + storeID := req.Context.GetPeer().GetStoreId() + if m == nil || m.storeID != storeID { + // The client selects a connPool by addr via RPCClient.getConnPool, so it's possible that the storeID of the + // selected connPool is not the same as the storeID in req.Context. We need to create a new storeMetrics for the + // new storeID. Note that connPool.metrics just works as a cache, the metric data is stored in corresponding + // MetricVec, so it's ok to overwrite it here. + m = newStoreMetrics(storeID) + a.metrics.Store(m) } + m.updateRPCMetrics(req, resp, latency) } diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index f05e87a7..931f7418 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -353,6 +353,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *router.Region) (*R if err != nil { return nil, err } + + if !exists { + updateStoreLivenessGauge(store) + } + // Filter out the peer on a tombstone or down store. if addr == "" || slices.ContainsFunc(pdRegion.DownPeers, func(dp *metapb.Peer) bool { return isSamePeer(dp, p) }) { continue @@ -749,47 +754,11 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache { // cache GC is incompatible with cache refresh c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval) } - c.bg.schedule( - func(ctx context.Context, _ time.Time) bool { - refreshFullStoreList(ctx, c.stores) - return false - }, refreshStoreListInterval, - ) + updater := &storeCacheUpdater{stores: c.stores} + c.bg.schedule(updater.tick, refreshStoreListInterval) return c } -// Try to refresh full store list. Errors are ignored. 
-func refreshFullStoreList(ctx context.Context, stores storeCache) { - storeList, err := stores.fetchAllStores(ctx) - if err != nil { - logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err)) - return - } - for _, store := range storeList { - _, exist := stores.get(store.GetId()) - if exist { - continue - } - // GetAllStores is supposed to return only Up and Offline stores. - // This check is being defensive and to make it consistent with store resolve code. - if store == nil || store.GetState() == metapb.StoreState_Tombstone { - continue - } - addr := store.GetAddress() - if addr == "" { - continue - } - s := stores.getOrInsertDefault(store.GetId()) - // TODO: maybe refactor this, together with other places initializing Store - s.addr = addr - s.peerAddr = store.GetPeerAddress() - s.saddr = store.GetStatusAddress() - s.storeType = tikvrpc.GetStoreTypeByMeta(store) - s.labels = store.GetLabels() - s.changeResolveStateTo(unresolved, resolved) - } -} - // only used fot test. func newTestRegionCache() *RegionCache { c := &RegionCache{} @@ -2760,6 +2729,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV const cleanCacheInterval = time.Second const cleanRegionNumPerRound = 50 const refreshStoreListInterval = 10 * time.Second +const cleanStoreMetricsInterval = time.Minute // gcScanItemHook is only used for testing var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)]) diff --git a/internal/locate/store_cache.go b/internal/locate/store_cache.go index 259ab884..2cd24226 100644 --- a/internal/locate/store_cache.go +++ b/internal/locate/store_cache.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/logutil" @@ -34,6 +35,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "golang.org/x/sync/singleflight" "google.golang.org/grpc" @@ -56,7 +58,7 @@ type testingKnobs interface { type storeRegistry interface { fetchStore(ctx context.Context, id uint64) (*metapb.Store, error) - fetchAllStores(ctx context.Context) ([]*metapb.Store, error) + fetchAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) } type storeCache interface { @@ -122,8 +124,8 @@ func (c *storeCacheImpl) fetchStore(ctx context.Context, id uint64) (*metapb.Sto return c.pdClient.GetStore(ctx, id) } -func (c *storeCacheImpl) fetchAllStores(ctx context.Context) ([]*metapb.Store, error) { - return c.pdClient.GetAllStores(ctx) +func (c *storeCacheImpl) fetchAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { + return c.pdClient.GetAllStores(ctx, opts...) } func (c *storeCacheImpl) get(id uint64) (store *Store, exists bool) { @@ -386,6 +388,36 @@ func (s resolveState) String() string { } } +// initByStoreMeta initializes the store fields by the given store meta. It should be protected by `resolveMutex`. +func (s *Store) initByStoreMeta(store *metapb.Store) error { + if store == nil || store.GetState() == metapb.StoreState_Tombstone { + // The store is a tombstone. 
+		s.setResolveState(tombstone)
+		return nil
+	}
+	addr := store.GetAddress()
+	if addr == "" {
+		return errors.Errorf("empty store(%d) address", s.storeID)
+	}
+	s.addr = addr
+	s.peerAddr = store.GetPeerAddress()
+	s.saddr = store.GetStatusAddress()
+	s.storeType = tikvrpc.GetStoreTypeByMeta(store)
+	s.labels = store.GetLabels()
+	// Shouldn't have other one changing its state concurrently, but we still use changeResolveStateTo for safety.
+	s.changeResolveStateTo(unresolved, resolved)
+
+	return nil
+}
+
+// initResolveLite is like initResolve, but it initializes the store directly from the given store meta.
+func (s *Store) initResolveLite(store *metapb.Store) error {
+	s.resolveMutex.Lock()
+	err := s.initByStoreMeta(store)
+	s.resolveMutex.Unlock()
+	return err
+}
+
 // initResolve resolves the address of the store that never resolved and returns an
 // empty string if it's a tombstone.
 func (s *Store) initResolve(bo *retry.Backoffer, c storeCache) (addr string, err error) {
@@ -419,22 +451,9 @@ func (s *Store) initResolve(bo *retry.Backoffer, c storeCache) (addr string, err
 			}
 			continue
 		}
-		// The store is a tombstone.
-		if store == nil || store.GetState() == metapb.StoreState_Tombstone {
-			s.setResolveState(tombstone)
-			return "", nil
+		if err := s.initByStoreMeta(store); err != nil {
+			return "", err
 		}
-		addr = store.GetAddress()
-		if addr == "" {
-			return "", errors.Errorf("empty store(%d) address", s.storeID)
-		}
-		s.addr = addr
-		s.peerAddr = store.GetPeerAddress()
-		s.saddr = store.GetStatusAddress()
-		s.storeType = tikvrpc.GetStoreTypeByMeta(store)
-		s.labels = store.GetLabels()
-		// Shouldn't have other one changing its state concurrently, but we still use changeResolveStateTo for safety.
-		s.changeResolveStateTo(unresolved, resolved)
 		return s.addr, nil
 	}
 }
@@ -615,6 +634,7 @@ func (s *Store) requestLivenessAndStartHealthCheckLoopIfNeeded(bo *retry.Backoff
 
 	// It may be already started by another thread.
 	if atomic.CompareAndSwapUint32(&s.livenessState, uint32(reachable), uint32(liveness)) {
+		updateStoreLivenessGauge(s)
 		s.unreachableSince = time.Now()
 		reResolveInterval := storeReResolveInterval
 		if val, err := util.EvalFailpoint("injectReResolveInterval"); err == nil {
@@ -652,6 +672,7 @@ func startHealthCheckLoop(scheduler *bgRunner, c storeCache, s *Store, liveness
 
 			liveness = requestLiveness(ctx, s, c)
 			atomic.StoreUint32(&s.livenessState, uint32(liveness))
+			updateStoreLivenessGauge(s)
 			if liveness == reachable {
 				logutil.BgLogger().Info("[health check] store became reachable", zap.Uint64("storeID", s.storeID))
 				return true
@@ -1098,3 +1119,90 @@ func (s *Store) resetReplicaFlowsStats(destType replicaFlowsType) {
 func (s *Store) recordReplicaFlowsStats(destType replicaFlowsType) {
 	atomic.AddUint64(&s.replicaFlowsStats[destType], 1)
 }
+
+func updateStoreLivenessGauge(store *Store) {
+	if store.storeType != tikvrpc.TiKV || store.getResolveState() != resolved {
+		return
+	}
+	metrics.TiKVStoreLivenessGauge.WithLabelValues(strconv.FormatUint(store.storeID, 10)).Set(float64(store.getLivenessState()))
+}
+
+type storeCacheUpdater struct {
+	stores storeCache
+
+	lastCleanUpTime  time.Time
+	nextCleanUpStore uint64
+}
+
+func (u *storeCacheUpdater) tick(ctx context.Context, now time.Time) bool {
+	storeList, err := u.stores.fetchAllStores(ctx, opt.WithExcludeTombstone())
+	if err != nil {
+		logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
+		return false
+	}
+	u.insertMissingStores(ctx, storeList)
+	u.cleanUpStaleStoreMetrics(ctx, storeList, now)
+	return false
+}
+
+// insertMissingStores adds the stores that are not in the cache.
+func (u *storeCacheUpdater) insertMissingStores(ctx context.Context, storeList []*metapb.Store) {
+	for _, store := range storeList {
+		// storeList is supposed to contain only Up and Offline stores.
+		// This check is defensive and keeps the behavior consistent with the store resolve code.
+ if store == nil || store.GetState() == metapb.StoreState_Tombstone { + continue + } + _, exist := u.stores.get(store.GetId()) + if exist { + continue + } + s := u.stores.getOrInsertDefault(store.GetId()) + if err := s.initResolveLite(store); err != nil { + logutil.Logger(ctx).Warn("init resolve store failed", zap.Uint64("storeID", store.GetId()), zap.Error(err)) + continue + } + updateStoreLivenessGauge(s) + } +} + +func (u *storeCacheUpdater) cleanUpStaleStoreMetrics(ctx context.Context, storeList []*metapb.Store, now time.Time) { + if now.Sub(u.lastCleanUpTime) < cleanStoreMetricsInterval { + return + } + u.lastCleanUpTime = now + + // find a stale store id + id := u.nextCleanUpStore + if id == 0 { + validStoreIDs := make(map[uint64]struct{}, len(storeList)) + for _, store := range storeList { + if store.GetId() != 0 && store.GetState() != metapb.StoreState_Tombstone { + validStoreIDs[store.GetId()] = struct{}{} + } + } + u.nextCleanUpStore = metrics.FindNextStaleStoreID(metrics.TiKVStoreLivenessGauge, validStoreIDs) + // delay cleaning up this store for one cleanStoreMetricsInterval + return + } else { + u.nextCleanUpStore = 0 + } + + // confirm that the store is indeed stale from pd + store, err := u.stores.fetchStore(ctx, id) + if err != nil && !isStoreNotFoundError(err) { + logutil.Logger(ctx).Info("cannot confirm store state", zap.Uint64("storeID", id), zap.Error(err)) + return + } + if store != nil && store.GetState() != metapb.StoreState_Tombstone { + logutil.Logger(ctx).Info("skip cleaning up metrics of a valid store", zap.Uint64("storeID", id), zap.String("state", store.GetState().String())) + return + } + + // clean up metrics which are associated with the stale store id + logutil.Logger(ctx).Info("clean up store metrics", zap.Uint64("storeID", id)) + filter := prometheus.Labels{metrics.LblStore: strconv.FormatUint(id, 10)} + for _, m := range metrics.GetStoreMetricVecList() { + m.DeletePartialMatch(filter) + } +} diff --git a/metrics/metrics.go b/metrics/metrics.go index e34d59ce..6e161d23 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -35,6 +35,9 @@ package metrics import ( + "strconv" + "sync/atomic" + "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" ) @@ -44,7 +47,7 @@ var ( TiKVTxnCmdHistogram *prometheus.HistogramVec TiKVBackoffHistogram *prometheus.HistogramVec TiKVSendReqHistogram *prometheus.HistogramVec - TiKVSendReqSummary *prometheus.SummaryVec + TiKVSendReqBySourceSummary *prometheus.SummaryVec TiKVRPCNetLatencyHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec @@ -104,6 +107,7 @@ var ( TiKVPrewriteAssertionUsageCounter *prometheus.CounterVec TiKVGrpcConnectionState *prometheus.GaugeVec TiKVAggressiveLockedKeysCounter *prometheus.CounterVec + TiKVStoreLivenessGauge *prometheus.GaugeVec TiKVStoreSlowScoreGauge *prometheus.GaugeVec TiKVFeedbackSlowScoreGauge *prometheus.GaugeVec TiKVHealthFeedbackOpsCounter *prometheus.CounterVec @@ -129,6 +133,7 @@ const ( LblType = "type" LblResult = "result" LblStore = "store" + LblTarget = "target" LblCommit = "commit" LblAbort = "abort" LblRollback = "rollback" @@ -149,7 +154,19 @@ const ( LblReason = "reason" ) +var storeMetricVecList atomic.Pointer[[]MetricVec] + +func GetStoreMetricVecList() []MetricVec { + lst := storeMetricVecList.Load() + if lst == nil { + return nil + } + return *lst +} + func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { + var storeMetrics 
[]MetricVec + TiKVTxnCmdHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, @@ -179,8 +196,9 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(0.0005, 2, 24), // 0.5ms ~ 1.2h ConstLabels: constLabels, }, []string{LblType, LblStore, LblStaleRead, LblScope}) + storeMetrics = append(storeMetrics, TiKVSendReqHistogram) - TiKVSendReqSummary = prometheus.NewSummaryVec( + TiKVSendReqBySourceSummary = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Namespace: namespace, Subsystem: subsystem, @@ -188,6 +206,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Summary of sending request with multi dimensions.", ConstLabels: constLabels, }, []string{LblType, LblStore, LblStaleRead, LblScope, LblSource}) + storeMetrics = append(storeMetrics, TiKVSendReqBySourceSummary) TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -198,6 +217,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms ~ 52s ConstLabels: constLabels, }, []string{LblStore, LblScope}) + storeMetrics = append(storeMetrics, TiKVRPCNetLatencyHistogram) TiKVLockResolverCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -216,6 +236,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Counter of region errors.", ConstLabels: constLabels, }, []string{LblType, LblStore}) + storeMetrics = append(storeMetrics, TiKVRegionErrorCounter) TiKVRPCErrorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -225,6 +246,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Counter of rpc errors.", ConstLabels: constLabels, }, []string{LblType, LblStore}) + storeMetrics = append(storeMetrics, TiKVRPCErrorCounter) TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -340,6 +362,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), // 0.5ms ~ 262s ConstLabels: constLabels, }, []string{LblStore}) + storeMetrics = append(storeMetrics, TiKVStatusDuration) TiKVStatusCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -358,7 +381,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(0.02, 2, 8), // 20ms ~ 2.56s Help: "batch send tail latency", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchSendLoopDuration = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -367,7 +390,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Name: "batch_send_loop_duration_seconds", Help: "batch send loop duration breakdown by steps", ConstLabels: constLabels, - }, []string{LblStore, "step"}) + }, []string{LblTarget, "step"}) TiKVBatchRecvTailLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -377,7 +400,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(0.02, 2, 8), // 20ms ~ 2.56s Help: "batch recv tail latency", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchRecvLoopDuration = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -386,7 +409,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { 
Name: "batch_recv_loop_duration_seconds", Help: "batch recv loop duration breakdown by steps", ConstLabels: constLabels, - }, []string{LblStore, "step"}) + }, []string{LblTarget, "step"}) TiKVBatchHeadArrivalInterval = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -395,7 +418,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Name: "batch_head_arrival_interval_seconds", Help: "arrival interval of the head request in batch", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchBestSize = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -404,7 +427,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Name: "batch_best_size", Help: "best batch size estimated by the batch client", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchMoreRequests = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -413,7 +436,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Name: "batch_more_requests_total", Help: "number of requests batched by extra fetch", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchWaitOverLoad = prometheus.NewCounter( prometheus.CounterOpts{ @@ -432,7 +455,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(1, 2, 11), // 1 ~ 1024 Help: "number of requests pending in the batch channel", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchRequests = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -442,7 +465,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Buckets: prometheus.ExponentialBuckets(1, 2, 11), // 1 ~ 1024 Help: "number of requests in one batch", ConstLabels: constLabels, - }, []string{LblStore}) + }, []string{LblTarget}) TiKVBatchRequestDuration = prometheus.NewSummaryVec( prometheus.SummaryOpts{ @@ -585,6 +608,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "store token is up to the limit, probably because one of the stores is the hotspot or unavailable", ConstLabels: constLabels, }, []string{LblAddress, LblStore}) + storeMetrics = append(storeMetrics, TiKVStoreLimitErrorCounter) TiKVGRPCConnTransientFailureCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -594,6 +618,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Counter of gRPC connection transient failure", ConstLabels: constLabels, }, []string{LblAddress, LblStore}) + storeMetrics = append(storeMetrics, TiKVGRPCConnTransientFailureCounter) TiKVPanicCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -631,6 +656,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Counter of tikv safe_ts being updated.", ConstLabels: constLabels, }, []string{LblResult, LblStore}) + storeMetrics = append(storeMetrics, TiKVSafeTSUpdateCounter) TiKVMinSafeTSGapSeconds = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -640,6 +666,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "The minimal (non-zero) SafeTS gap for each store.", ConstLabels: constLabels, }, []string{LblStore}) + storeMetrics = append(storeMetrics, TiKVMinSafeTSGapSeconds) TiKVReplicaSelectorFailureCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -735,6 +762,16 @@ func initMetrics(namespace, subsystem string, 
constLabels prometheus.Labels) { ConstLabels: constLabels, }, []string{LblType}) + TiKVStoreLivenessGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "store_liveness_state", + Help: "Liveness state of each tikv", + ConstLabels: constLabels, + }, []string{LblStore}) + storeMetrics = append(storeMetrics, TiKVStoreLivenessGauge) + TiKVStoreSlowScoreGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -743,6 +780,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Slow scores of each tikv node based on RPC timecosts", ConstLabels: constLabels, }, []string{LblStore}) + storeMetrics = append(storeMetrics, TiKVStoreSlowScoreGauge) TiKVFeedbackSlowScoreGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -752,6 +790,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Slow scores of each tikv node that is calculated by TiKV and sent to the client by health feedback", ConstLabels: constLabels, }, []string{LblStore}) + storeMetrics = append(storeMetrics, TiKVFeedbackSlowScoreGauge) TiKVHealthFeedbackOpsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -770,6 +809,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { Help: "Counter of flows under PreferLeader mode.", ConstLabels: constLabels, }, []string{LblType, LblStore}) + storeMetrics = append(storeMetrics, TiKVPreferLeaderFlowsGauge) TiKVStaleReadCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -899,6 +939,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) { }, []string{LblType, LblResult}) initShortcuts() + storeMetricVecList.Store(&storeMetrics) } func init() { @@ -921,7 +962,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVTxnCmdHistogram) prometheus.MustRegister(TiKVBackoffHistogram) prometheus.MustRegister(TiKVSendReqHistogram) - prometheus.MustRegister(TiKVSendReqSummary) + prometheus.MustRegister(TiKVSendReqBySourceSummary) prometheus.MustRegister(TiKVRPCNetLatencyHistogram) prometheus.MustRegister(TiKVLockResolverCounter) prometheus.MustRegister(TiKVRegionErrorCounter) @@ -980,6 +1021,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVPrewriteAssertionUsageCounter) prometheus.MustRegister(TiKVGrpcConnectionState) prometheus.MustRegister(TiKVAggressiveLockedKeysCounter) + prometheus.MustRegister(TiKVStoreLivenessGauge) prometheus.MustRegister(TiKVStoreSlowScoreGauge) prometheus.MustRegister(TiKVFeedbackSlowScoreGauge) prometheus.MustRegister(TiKVHealthFeedbackOpsCounter) @@ -1054,3 +1096,45 @@ func ObserveReadSLI(readKeys uint64, readTime float64, readSize float64) { } } } + +type LabelPair = dto.LabelPair + +type MetricVec interface { + prometheus.Collector + DeletePartialMatch(labels prometheus.Labels) int +} + +// FindNextStaleStoreID finds a stale store ID which is not in validStoreIDs but may still be tracked by the collector. 
+func FindNextStaleStoreID(collector prometheus.Collector, validStoreIDs map[uint64]struct{}) uint64 { + if collector == nil { + return 0 + } + ch := make(chan prometheus.Metric, 8) + go func() { + collector.Collect(ch) + close(ch) + }() + var ( + data dto.Metric + id uint64 + ) + for m := range ch { + if id != 0 { + continue + } + if err := m.Write(&data); err != nil { + continue + } + var thisID uint64 + for _, l := range data.Label { + if l.GetName() == LblStore { + thisID, _ = strconv.ParseUint(l.GetValue(), 10, 64) + break + } + } + if _, ok := validStoreIDs[thisID]; !ok && thisID != 0 { + id = thisID + } + } + return id +}
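
A rough, test-style sketch (not part of this diff) of how the new stale-store cleanup pieces are expected to fit together. Note that FindNextStaleStoreID keeps draining the channel after a match so the Collect goroutine can finish and close it. The sketch assumes the metrics package has been initialized by its init(), and that no other series with a stale store label have been registered on TiKVStoreLivenessGauge; the test name and store IDs are made up for illustration.

package metrics_test

import (
	"strconv"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/tikv/client-go/v2/metrics"
)

func TestStaleStoreMetricsCleanupSketch(t *testing.T) {
	// Simulate liveness samples for two stores: store 1 is still valid, store 42 is gone.
	metrics.TiKVStoreLivenessGauge.WithLabelValues("1").Set(0)
	metrics.TiKVStoreLivenessGauge.WithLabelValues("42").Set(0)

	// Only store 1 remains in the cluster, so 42 should be reported as stale.
	valid := map[uint64]struct{}{1: {}}
	stale := metrics.FindNextStaleStoreID(metrics.TiKVStoreLivenessGauge, valid)
	if stale != 42 {
		t.Fatalf("expected stale store 42, got %d", stale)
	}

	// Drop every store-labelled series for the stale ID, mirroring what
	// storeCacheUpdater.cleanUpStaleStoreMetrics does after confirming with PD.
	filter := prometheus.Labels{metrics.LblStore: strconv.FormatUint(stale, 10)}
	for _, m := range metrics.GetStoreMetricVecList() {
		m.DeletePartialMatch(filter)
	}
}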