mirror of https://github.com/tikv/client-go.git
add more log for diagnose (#915)
* add more log for diagnose Signed-off-by: crazycs520 <crazycs520@gmail.com> * fix Signed-off-by: crazycs520 <crazycs520@gmail.com> * add more log for diagnose Signed-off-by: crazycs520 <crazycs520@gmail.com> * add more log Signed-off-by: crazycs520 <crazycs520@gmail.com> * address comment Signed-off-by: crazycs520 <crazycs520@gmail.com> --------- Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
parent
76d6d93e98
commit
9f94221c2d
|
|
@ -2473,6 +2473,24 @@ const (
|
||||||
tombstone
|
tombstone
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// String implements fmt.Stringer interface.
|
||||||
|
func (s resolveState) String() string {
|
||||||
|
switch s {
|
||||||
|
case unresolved:
|
||||||
|
return "unresolved"
|
||||||
|
case resolved:
|
||||||
|
return "resolved"
|
||||||
|
case needCheck:
|
||||||
|
return "needCheck"
|
||||||
|
case deleted:
|
||||||
|
return "deleted"
|
||||||
|
case tombstone:
|
||||||
|
return "tombstone"
|
||||||
|
default:
|
||||||
|
return fmt.Sprintf("unknown-%v", uint64(s))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// IsTiFlash returns true if the storeType is TiFlash
|
// IsTiFlash returns true if the storeType is TiFlash
|
||||||
func (s *Store) IsTiFlash() bool {
|
func (s *Store) IsTiFlash() bool {
|
||||||
return s.storeType == tikvrpc.TiFlash
|
return s.storeType == tikvrpc.TiFlash
|
||||||
|
|
@ -2612,6 +2630,12 @@ func (s *Store) changeResolveStateTo(from, to resolveState) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
|
if atomic.CompareAndSwapUint64(&s.state, uint64(from), uint64(to)) {
|
||||||
|
logutil.BgLogger().Info("change store resolve state",
|
||||||
|
zap.Uint64("store", s.storeID),
|
||||||
|
zap.String("addr", s.addr),
|
||||||
|
zap.String("from", from.String()),
|
||||||
|
zap.String("to", to.String()),
|
||||||
|
zap.String("liveness-state", s.getLivenessState().String()))
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -2712,6 +2736,20 @@ const (
|
||||||
unknown
|
unknown
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// String implements fmt.Stringer interface.
|
||||||
|
func (s livenessState) String() string {
|
||||||
|
switch s {
|
||||||
|
case unreachable:
|
||||||
|
return "unreachable"
|
||||||
|
case reachable:
|
||||||
|
return "reachable"
|
||||||
|
case unknown:
|
||||||
|
return "unknown"
|
||||||
|
default:
|
||||||
|
return fmt.Sprintf("unknown-%v", uint32(s))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
|
func (s *Store) startHealthCheckLoopIfNeeded(c *RegionCache, liveness livenessState) {
|
||||||
// This mechanism doesn't support non-TiKV stores currently.
|
// This mechanism doesn't support non-TiKV stores currently.
|
||||||
if s.storeType != tikvrpc.TiKV {
|
if s.storeType != tikvrpc.TiKV {
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@
|
||||||
package locate
|
package locate
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
|
@ -608,14 +609,15 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
|
||||||
}
|
}
|
||||||
// If there is no candidate, fallback to the leader.
|
// If there is no candidate, fallback to the leader.
|
||||||
if selector.targetIdx < 0 {
|
if selector.targetIdx < 0 {
|
||||||
if len(state.option.labels) > 0 {
|
|
||||||
logutil.BgLogger().Warn(
|
|
||||||
"unable to find stores with given labels",
|
|
||||||
zap.Any("labels", state.option.labels),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
leader := selector.replicas[state.leaderIdx]
|
leader := selector.replicas[state.leaderIdx]
|
||||||
if leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1)) {
|
leaderInvalid := leader.isEpochStale() || (!state.option.leaderOnly && leader.isExhausted(1))
|
||||||
|
if len(state.option.labels) > 0 {
|
||||||
|
logutil.Logger(bo.GetCtx()).Warn("unable to find stores with given labels",
|
||||||
|
zap.Uint64("region", selector.region.GetID()),
|
||||||
|
zap.Bool("leader-invalid", leaderInvalid),
|
||||||
|
zap.Any("labels", state.option.labels))
|
||||||
|
}
|
||||||
|
if leaderInvalid {
|
||||||
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
|
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
|
||||||
selector.invalidateRegion()
|
selector.invalidateRegion()
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
@ -1189,6 +1191,7 @@ func (s *RegionRequestSender) SendReqCtx(
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
totalErrors := make(map[string]int)
|
||||||
for {
|
for {
|
||||||
if retryTimes > 0 {
|
if retryTimes > 0 {
|
||||||
req.IsRetryRequest = true
|
req.IsRetryRequest = true
|
||||||
|
|
@ -1221,10 +1224,7 @@ func (s *RegionRequestSender) SendReqCtx(
|
||||||
|
|
||||||
// TODO: Change the returned error to something like "region missing in cache",
|
// TODO: Change the returned error to something like "region missing in cache",
|
||||||
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
|
// and handle this error like EpochNotMatch, which means to re-split the request and retry.
|
||||||
logutil.Logger(bo.GetCtx()).Debug(
|
s.logSendReqError(bo, "throwing pseudo region error due to no replica available", regionID, retryTimes, req, totalErrors)
|
||||||
"throwing pseudo region error due to region not found in cache",
|
|
||||||
zap.Stringer("region", ®ionID),
|
|
||||||
)
|
|
||||||
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
|
resp, err = tikvrpc.GenRegionErrorResp(req, &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}})
|
||||||
return resp, nil, retryTimes, err
|
return resp, nil, retryTimes, err
|
||||||
}
|
}
|
||||||
|
|
@ -1250,6 +1250,8 @@ func (s *RegionRequestSender) SendReqCtx(
|
||||||
var retry bool
|
var retry bool
|
||||||
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
|
resp, retry, err = s.sendReqToRegion(bo, rpcCtx, req, timeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
msg := fmt.Sprintf("send request failed, err: %v", err.Error())
|
||||||
|
s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
|
||||||
return nil, nil, retryTimes, err
|
return nil, nil, retryTimes, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1281,14 +1283,19 @@ func (s *RegionRequestSender) SendReqCtx(
|
||||||
return nil, nil, retryTimes, err
|
return nil, nil, retryTimes, err
|
||||||
}
|
}
|
||||||
if regionErr != nil {
|
if regionErr != nil {
|
||||||
|
regionErrLabel := regionErrorToLabel(regionErr)
|
||||||
|
totalErrors[regionErrLabel]++
|
||||||
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
|
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
msg := fmt.Sprintf("send request on region error failed, err: %v", err.Error())
|
||||||
|
s.logSendReqError(bo, msg, regionID, retryTimes, req, totalErrors)
|
||||||
return nil, nil, retryTimes, err
|
return nil, nil, retryTimes, err
|
||||||
}
|
}
|
||||||
if retry {
|
if retry {
|
||||||
retryTimes++
|
retryTimes++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
s.logSendReqError(bo, "send request meet region error without retry", regionID, retryTimes, req, totalErrors)
|
||||||
} else {
|
} else {
|
||||||
if s.replicaSelector != nil {
|
if s.replicaSelector != nil {
|
||||||
s.replicaSelector.onSendSuccess()
|
s.replicaSelector.onSendSuccess()
|
||||||
|
|
@ -1301,6 +1308,75 @@ func (s *RegionRequestSender) SendReqCtx(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *RegionRequestSender) logSendReqError(bo *retry.Backoffer, msg string, regionID RegionVerID, retryTimes int, req *tikvrpc.Request, totalErrors map[string]int) {
|
||||||
|
var replicaStatus []string
|
||||||
|
replicaSelectorState := "nil"
|
||||||
|
cacheRegionIsValid := "unknown"
|
||||||
|
if s.replicaSelector != nil {
|
||||||
|
switch s.replicaSelector.state.(type) {
|
||||||
|
case *accessKnownLeader:
|
||||||
|
replicaSelectorState = "accessKnownLeader"
|
||||||
|
case *accessFollower:
|
||||||
|
replicaSelectorState = "accessFollower"
|
||||||
|
case *accessByKnownProxy:
|
||||||
|
replicaSelectorState = "accessByKnownProxy"
|
||||||
|
case *tryFollower:
|
||||||
|
replicaSelectorState = "tryFollower"
|
||||||
|
case *tryNewProxy:
|
||||||
|
replicaSelectorState = "tryNewProxy"
|
||||||
|
case *invalidLeader:
|
||||||
|
replicaSelectorState = "invalidLeader"
|
||||||
|
case *invalidStore:
|
||||||
|
replicaSelectorState = "invalidStore"
|
||||||
|
case *stateBase:
|
||||||
|
replicaSelectorState = "stateBase"
|
||||||
|
case nil:
|
||||||
|
replicaSelectorState = "nil"
|
||||||
|
}
|
||||||
|
if s.replicaSelector.region != nil {
|
||||||
|
if s.replicaSelector.region.isValid() {
|
||||||
|
cacheRegionIsValid = "true"
|
||||||
|
} else {
|
||||||
|
cacheRegionIsValid = "false"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, replica := range s.replicaSelector.replicas {
|
||||||
|
replicaStatus = append(replicaStatus, fmt.Sprintf("peer: %v, store: %v, isEpochStale: %v, attempts: %v, replica-epoch: %v, store-epoch: %v, store-state: %v, store-liveness-state: %v",
|
||||||
|
replica.peer.GetId(),
|
||||||
|
replica.store.storeID,
|
||||||
|
replica.isEpochStale(),
|
||||||
|
replica.attempts,
|
||||||
|
replica.epoch,
|
||||||
|
atomic.LoadUint32(&replica.store.epoch),
|
||||||
|
replica.store.getResolveState(),
|
||||||
|
replica.store.getLivenessState(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var totalErrorStr bytes.Buffer
|
||||||
|
for err, cnt := range totalErrors {
|
||||||
|
if totalErrorStr.Len() > 0 {
|
||||||
|
totalErrorStr.WriteString(", ")
|
||||||
|
}
|
||||||
|
totalErrorStr.WriteString(err)
|
||||||
|
totalErrorStr.WriteString(":")
|
||||||
|
totalErrorStr.WriteString(strconv.Itoa(cnt))
|
||||||
|
}
|
||||||
|
logutil.Logger(bo.GetCtx()).Info(msg,
|
||||||
|
zap.Uint64("req-ts", req.GetStartTS()),
|
||||||
|
zap.String("req-type", req.Type.String()),
|
||||||
|
zap.String("region", regionID.String()),
|
||||||
|
zap.String("region-is-valid", cacheRegionIsValid),
|
||||||
|
zap.Int("retry-times", retryTimes),
|
||||||
|
zap.String("replica-read-type", req.ReplicaReadType.String()),
|
||||||
|
zap.String("replica-selector-state", replicaSelectorState),
|
||||||
|
zap.Bool("stale-read", req.StaleRead),
|
||||||
|
zap.String("replica-status", strings.Join(replicaStatus, "; ")),
|
||||||
|
zap.Int("total-backoff-ms", bo.GetTotalSleep()),
|
||||||
|
zap.Int("total-backoff-times", bo.GetTotalBackoffTimes()),
|
||||||
|
zap.String("total-region-errors", totalErrorStr.String()))
|
||||||
|
}
|
||||||
|
|
||||||
// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx.
|
// RPCCancellerCtxKey is context key attach rpc send cancelFunc collector to ctx.
|
||||||
type RPCCancellerCtxKey struct{}
|
type RPCCancellerCtxKey struct{}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,9 +35,11 @@
|
||||||
package retry
|
package retry
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -150,6 +152,18 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
|
||||||
errMsg += "\n" + err.Error()
|
errMsg += "\n" + err.Error()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
var backoffDetail bytes.Buffer
|
||||||
|
totalTimes := 0
|
||||||
|
for name, times := range b.backoffTimes {
|
||||||
|
totalTimes += times
|
||||||
|
if backoffDetail.Len() > 0 {
|
||||||
|
backoffDetail.WriteString(", ")
|
||||||
|
}
|
||||||
|
backoffDetail.WriteString(name)
|
||||||
|
backoffDetail.WriteString(":")
|
||||||
|
backoffDetail.WriteString(strconv.Itoa(times))
|
||||||
|
}
|
||||||
|
errMsg += fmt.Sprintf("\ntotal-backoff-times: %v, backoff-detail: %v", totalTimes, backoffDetail.String())
|
||||||
returnedErr := err
|
returnedErr := err
|
||||||
if longestSleepCfg != nil {
|
if longestSleepCfg != nil {
|
||||||
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
|
errMsg += fmt.Sprintf("\nlongest sleep type: %s, time: %dms", longestSleepCfg.String(), longestSleepTime)
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,8 @@
|
||||||
package kv
|
package kv
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -72,3 +74,21 @@ const (
|
||||||
func (r ReplicaReadType) IsFollowerRead() bool {
|
func (r ReplicaReadType) IsFollowerRead() bool {
|
||||||
return r != ReplicaReadLeader
|
return r != ReplicaReadLeader
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// String implements fmt.Stringer interface.
|
||||||
|
func (r ReplicaReadType) String() string {
|
||||||
|
switch r {
|
||||||
|
case ReplicaReadLeader:
|
||||||
|
return "leader"
|
||||||
|
case ReplicaReadFollower:
|
||||||
|
return "follower"
|
||||||
|
case ReplicaReadMixed:
|
||||||
|
return "mixed"
|
||||||
|
case ReplicaReadLearner:
|
||||||
|
return "learner"
|
||||||
|
case ReplicaReadPreferLeader:
|
||||||
|
return "prefer-leader"
|
||||||
|
default:
|
||||||
|
return fmt.Sprintf("unknown-%v", byte(r))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue