client-go/oracle/oracles/pd.go

787 lines
31 KiB
Go

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/oracle/oracles/pd.go
//
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package oracles
import (
"context"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/tso"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)
var _ oracle.Oracle = &pdOracle{}
const slowDist = 30 * time.Millisecond
type adaptiveUpdateTSIntervalState int
var EnableTSValidation atomic.Bool
const (
adaptiveUpdateTSIntervalStateNone adaptiveUpdateTSIntervalState = iota
// adaptiveUpdateTSIntervalStateNormal represents the state that the adaptive update ts interval is synced with the
// configuration without performing any automatic adjustment.
adaptiveUpdateTSIntervalStateNormal
// adaptiveUpdateTSIntervalStateAdapting represents the state that as there are recently some stale read / snapshot
// read operations requesting a short staleness (now - readTS is nearly or exceeds the current update interval),
// so that we automatically shrink the update interval. Otherwise, read operations may don't have low resolution ts
// that is new enough for checking the legality of the read ts, causing them have to fetch the latest ts from PD,
// which is time-consuming.
adaptiveUpdateTSIntervalStateAdapting
// adaptiveUpdateTSIntervalStateRecovering represents the state that the update ts interval have once been shrunk,
// to adapt to reads with short staleness, but there isn't any such read operations for a while, so that we
// gradually recover the update interval to the configured value.
adaptiveUpdateTSIntervalStateRecovering
// adaptiveUpdateTSIntervalStateUnadjustable represents the state that the user has configured a very short update
// interval, so that we don't have any space to automatically adjust it.
adaptiveUpdateTSIntervalStateUnadjustable
)
func (s adaptiveUpdateTSIntervalState) String() string {
switch s {
case adaptiveUpdateTSIntervalStateNormal:
return "normal"
case adaptiveUpdateTSIntervalStateAdapting:
return "adapting"
case adaptiveUpdateTSIntervalStateRecovering:
return "recovering"
case adaptiveUpdateTSIntervalStateUnadjustable:
return "unadjustable"
default:
return fmt.Sprintf("unknown(%v)", int(s))
}
}
const (
// minAllowedAdaptiveUpdateTSInterval is the lower bound of the adaptive update ts interval for avoiding an abnormal
// read operation causing the update interval to be too short.
minAllowedAdaptiveUpdateTSInterval = 500 * time.Millisecond
// adaptiveUpdateTSIntervalShrinkingPreserve is the duration that we additionally shrinks when adapting to a read
// operation that requires a short staleness.
adaptiveUpdateTSIntervalShrinkingPreserve = 100 * time.Millisecond
// adaptiveUpdateTSIntervalBlockRecoverThreshold is the threshold of the difference between the current update
// interval and the staleness the read operation request to prevent the update interval from recovering back to
// normal.
adaptiveUpdateTSIntervalBlockRecoverThreshold = 200 * time.Millisecond
// adaptiveUpdateTSIntervalRecoverPerSecond is the duration that the update interval should grow per second when
// recovering to normal state from adapting state.
adaptiveUpdateTSIntervalRecoverPerSecond = 20 * time.Millisecond
// adaptiveUpdateTSIntervalDelayBeforeRecovering is the duration that we should hold the current adaptive update
// interval before turning back to normal state.
adaptiveUpdateTSIntervalDelayBeforeRecovering = 5 * time.Minute
)
// pdOracle is an Oracle that uses a placement driver client as source.
type pdOracle struct {
c pd.Client
// txn_scope (string) -> lastTSPointer (*atomic.Pointer[lastTSO])
lastTSMap sync.Map
quit chan struct{}
// The configured interval to update the low resolution ts. Set by SetLowResolutionTimestampUpdateInterval.
// For TiDB, this is directly controlled by the system variable `tidb_low_resolution_tso_update_interval`.
lastTSUpdateInterval atomic.Int64
// The actual interval to update the low resolution ts. If the configured one is too large to satisfy the
// requirement of the stale read or snapshot read, the actual interval can be automatically set to a shorter
// value than lastTSUpdateInterval.
// This value is also possible to be updated by SetLowResolutionTimestampUpdateInterval, which may happen when
// user adjusting the update interval manually.
adaptiveLastTSUpdateInterval atomic.Int64
adaptiveUpdateIntervalState struct {
// The mutex to avoid racing between updateTS goroutine and SetLowResolutionTimestampUpdateInterval.
mu sync.Mutex
// The most recent time that a stale read / snapshot read requests a timestamp that is close enough to
// the current adaptive update interval. If there is such a request recently, the adaptive interval
// should avoid falling back to the original (configured) value.
// Stored in unix microseconds to make it able to be accessed atomically.
lastShortStalenessReadTime atomic.Int64
// When someone requests need shrinking the update interval immediately, it sends the duration it expects to
// this channel.
shrinkIntervalCh chan time.Duration
// Only accessed in updateTS goroutine. No need to use atomic value.
lastTick time.Time
// Represents a description about the current state.
state adaptiveUpdateTSIntervalState
}
// When the low resolution ts is not new enough and there are many concurrent stane read / snapshot read
// operations that needs to validate the read ts, we can use this to avoid too many concurrent GetTS calls by
// reusing a result for different `ValidateReadTS` calls. This can be done because that
// we don't require the ts for validation to be strictly the latest one.
// Note that the result can't be reused for different txnScopes. The txnScope is used as the key.
tsForValidation singleflight.Group
}
// lastTSO stores the last timestamp oracle gets from PD server and the local time when the TSO is fetched.
type lastTSO struct {
tso uint64
arrival time.Time
}
type PDOracleOptions struct {
// The duration to update the last ts, i.e., the low resolution ts.
UpdateInterval time.Duration
// Disable the background periodic update of the last ts. This is for test purposes only.
NoUpdateTS bool
}
// NewPdOracle create an Oracle that uses a pd client source.
// Refer https://github.com/tikv/pd/blob/master/client/client.go for more details.
// PdOracle maintains `lastTS` to store the last timestamp got from PD server. If
// `GetTimestamp()` is not called after `lastTSUpdateInterval`, it will be called by
// itself to keep up with the timestamp on PD server.
func NewPdOracle(pdClient pd.Client, options *PDOracleOptions) (oracle.Oracle, error) {
if options.UpdateInterval <= 0 {
return nil, fmt.Errorf("updateInterval must be > 0")
}
o := &pdOracle{
c: pdClient.WithCallerComponent("oracle"),
quit: make(chan struct{}),
lastTSUpdateInterval: atomic.Int64{},
}
o.adaptiveUpdateIntervalState.shrinkIntervalCh = make(chan time.Duration, 1)
o.lastTSUpdateInterval.Store(int64(options.UpdateInterval))
o.adaptiveLastTSUpdateInterval.Store(int64(options.UpdateInterval))
o.adaptiveUpdateIntervalState.lastTick = time.Now()
ctx := context.TODO()
if !options.NoUpdateTS {
go o.updateTS(ctx)
}
// Initialize the timestamp of the global txnScope by Get.
_, err := o.GetTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
if err != nil {
o.Close()
return nil, err
}
return o, nil
}
// IsExpired returns whether lockTS+TTL is expired, both are ms. It uses `lastTS`
// to compare, may return false negative result temporarily.
func (o *pdOracle) IsExpired(lockTS, TTL uint64, opt *oracle.Option) bool {
lastTS, exist := o.getLastTS(opt.TxnScope)
if !exist {
return true
}
return oracle.ExtractPhysical(lastTS) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
}
// GetTimestamp gets a new increasing time.
func (o *pdOracle) GetTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) {
ts, err := o.getTimestamp(ctx, opt.TxnScope)
if err != nil {
return 0, err
}
o.setLastTS(ts, opt.TxnScope)
return ts, nil
}
// GetAllTSOKeyspaceGroupMinTS gets a minimum timestamp from all TSO keyspace groups.
func (o *pdOracle) GetAllTSOKeyspaceGroupMinTS(ctx context.Context) (uint64, error) {
return o.getMinTimestampInAllTSOGroup(ctx)
}
type tsFuture struct {
tso.TSFuture
o *pdOracle
txnScope string
}
// Wait implements the oracle.Future interface.
func (f *tsFuture) Wait() (uint64, error) {
now := time.Now()
physical, logical, err := f.TSFuture.Wait()
metrics.TiKVTSFutureWaitDuration.Observe(time.Since(now).Seconds())
if err != nil {
return 0, errors.WithStack(err)
}
ts := oracle.ComposeTS(physical, logical)
f.o.setLastTS(ts, f.txnScope)
return ts, nil
}
func (o *pdOracle) GetTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future {
return &tsFuture{o.c.GetTSAsync(ctx), o, opt.TxnScope}
}
func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, error) {
now := time.Now()
physical, logical, err := o.c.GetTS(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
dist := time.Since(now)
if dist > slowDist {
logutil.Logger(ctx).Warn("get timestamp too slow",
zap.Duration("cost time", dist))
}
return oracle.ComposeTS(physical, logical), nil
}
func (o *pdOracle) getMinTimestampInAllTSOGroup(ctx context.Context) (uint64, error) {
now := time.Now()
physical, logical, err := o.c.GetMinTS(ctx)
if err != nil {
return 0, errors.WithStack(err)
}
dist := time.Since(now)
if dist > slowDist {
logutil.Logger(ctx).Warn("get minimum timestamp too slow",
zap.Duration("cost time", dist))
}
return oracle.ComposeTS(physical, logical), nil
}
func (o *pdOracle) setLastTS(ts uint64, txnScope string) {
if txnScope == "" {
txnScope = oracle.GlobalTxnScope
}
current := &lastTSO{
tso: ts,
arrival: time.Now(),
}
lastTSInterface, ok := o.lastTSMap.Load(txnScope)
if !ok {
pointer := &atomic.Pointer[lastTSO]{}
pointer.Store(current)
// do not handle the stored case, because it only runs once.
lastTSInterface, _ = o.lastTSMap.LoadOrStore(txnScope, pointer)
}
lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO])
for {
last := lastTSPointer.Load()
if current.tso <= last.tso {
return
}
if last.arrival.After(current.arrival) {
current.arrival = last.arrival
}
if lastTSPointer.CompareAndSwap(last, current) {
return
}
}
}
func (o *pdOracle) getLastTS(txnScope string) (uint64, bool) {
last, exist := o.getLastTSWithArrivalTS(txnScope)
if !exist {
return 0, false
}
return last.tso, true
}
func (o *pdOracle) getLastTSWithArrivalTS(txnScope string) (*lastTSO, bool) {
if txnScope == "" {
txnScope = oracle.GlobalTxnScope
}
lastTSInterface, ok := o.lastTSMap.Load(txnScope)
if !ok {
return nil, false
}
lastTSPointer := lastTSInterface.(*atomic.Pointer[lastTSO])
last := lastTSPointer.Load()
if last == nil {
return nil, false
}
return last, true
}
func (o *pdOracle) nextUpdateInterval(now time.Time, requiredStaleness time.Duration) time.Duration {
o.adaptiveUpdateIntervalState.mu.Lock()
defer o.adaptiveUpdateIntervalState.mu.Unlock()
configuredInterval := time.Duration(o.lastTSUpdateInterval.Load())
prevAdaptiveUpdateInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load())
lastReachDropThresholdTime := time.UnixMilli(o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.Load())
currentAdaptiveUpdateInterval := prevAdaptiveUpdateInterval
// Shortcut
const none = adaptiveUpdateTSIntervalStateNone
// The following `checkX` functions checks whether it should transit to the X state. Returns
// a tuple representing (state, newInterval).
// When `checkX` returns a valid state, it means that the current situation matches the state. In this case, it
// also returns the new interval that should be used next.
// When it returns `none`, we need to check if it should transit to other states. For each call to
// nextUpdateInterval, if all attempts to `checkX` function returns false, it keeps the previous state unchanged.
checkUnadjustable := func() (adaptiveUpdateTSIntervalState, time.Duration) {
// If the user has configured a very short interval, we don't have any space to adjust it. Just use
// the user's configured value directly.
if configuredInterval <= minAllowedAdaptiveUpdateTSInterval {
return adaptiveUpdateTSIntervalStateUnadjustable, configuredInterval
}
return none, 0
}
checkNormal := func() (adaptiveUpdateTSIntervalState, time.Duration) {
// If the current actual update interval is synced with the configured value, and it's not unadjustable state,
// then it's the normal state.
if configuredInterval > minAllowedAdaptiveUpdateTSInterval && currentAdaptiveUpdateInterval == configuredInterval {
return adaptiveUpdateTSIntervalStateNormal, currentAdaptiveUpdateInterval
}
return none, 0
}
checkAdapting := func() (adaptiveUpdateTSIntervalState, time.Duration) {
if requiredStaleness != 0 && requiredStaleness < currentAdaptiveUpdateInterval && currentAdaptiveUpdateInterval > minAllowedAdaptiveUpdateTSInterval {
// If we are calculating the interval because of a request that requires a shorter staleness, we shrink the
// update interval immediately to adapt to it.
// We shrink the update interval to a value slightly lower than the requested staleness to avoid potential
// frequent shrinking operations. But there's a lower bound to prevent loading ts too frequently.
newInterval := max(requiredStaleness-adaptiveUpdateTSIntervalShrinkingPreserve, minAllowedAdaptiveUpdateTSInterval)
return adaptiveUpdateTSIntervalStateAdapting, newInterval
}
if currentAdaptiveUpdateInterval != configuredInterval && now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering {
// There is a recent request that requires a short staleness. Keep the current adaptive interval.
// If it's not adapting state, it's possible that it's previously in recovering state, and it stops recovering
// as there is a new read operation requesting a short staleness.
return adaptiveUpdateTSIntervalStateAdapting, currentAdaptiveUpdateInterval
}
return none, 0
}
checkRecovering := func() (adaptiveUpdateTSIntervalState, time.Duration) {
if currentAdaptiveUpdateInterval == configuredInterval || now.Sub(lastReachDropThresholdTime) < adaptiveUpdateTSIntervalDelayBeforeRecovering {
return none, 0
}
timeSinceLastTick := now.Sub(o.adaptiveUpdateIntervalState.lastTick)
newInterval := currentAdaptiveUpdateInterval + time.Duration(timeSinceLastTick.Seconds()*float64(adaptiveUpdateTSIntervalRecoverPerSecond))
if newInterval > configuredInterval {
newInterval = configuredInterval
}
return adaptiveUpdateTSIntervalStateRecovering, newInterval
}
// Check the specified states in order, until the state becomes determined.
// If it's still undetermined after all checks, keep the previous state.
nextState := func(checkFuncs ...func() (adaptiveUpdateTSIntervalState, time.Duration)) time.Duration {
for _, f := range checkFuncs {
state, newInterval := f()
if state == none {
continue
}
currentAdaptiveUpdateInterval = newInterval
// If the final state is the recovering state, do an additional step to check whether it can go back to
// normal state immediately.
if state == adaptiveUpdateTSIntervalStateRecovering {
var nextState adaptiveUpdateTSIntervalState
nextState, newInterval = checkNormal()
if nextState != none {
state = nextState
currentAdaptiveUpdateInterval = newInterval
}
}
o.adaptiveLastTSUpdateInterval.Store(int64(currentAdaptiveUpdateInterval))
if o.adaptiveUpdateIntervalState.state != state {
logutil.BgLogger().Info("adaptive update ts interval state transition",
zap.Duration("configuredInterval", configuredInterval),
zap.Duration("prevAdaptiveUpdateInterval", prevAdaptiveUpdateInterval),
zap.Duration("newAdaptiveUpdateInterval", currentAdaptiveUpdateInterval),
zap.Duration("requiredStaleness", requiredStaleness),
zap.Stringer("prevState", o.adaptiveUpdateIntervalState.state),
zap.Stringer("newState", state))
o.adaptiveUpdateIntervalState.state = state
}
return currentAdaptiveUpdateInterval
}
return currentAdaptiveUpdateInterval
}
var newInterval time.Duration
if requiredStaleness != 0 {
newInterval = nextState(checkUnadjustable, checkAdapting)
} else {
newInterval = nextState(checkUnadjustable, checkAdapting, checkNormal, checkRecovering)
}
metrics.TiKVLowResolutionTSOUpdateIntervalSecondsGauge.Set(newInterval.Seconds())
return newInterval
}
func (o *pdOracle) updateTS(ctx context.Context) {
currentInterval := time.Duration(o.lastTSUpdateInterval.Load())
ticker := time.NewTicker(currentInterval)
defer ticker.Stop()
// Note that as `doUpdate` updates last tick time while `nextUpdateInterval` may perform calculation depending on the
// last tick time, `doUpdate` should be called after finishing calculating the next interval.
doUpdate := func(now time.Time) {
// Update the timestamp for each txnScope
o.lastTSMap.Range(func(key, _ interface{}) bool {
txnScope := key.(string)
ts, err := o.getTimestamp(ctx, txnScope)
if err != nil {
logutil.Logger(ctx).Error("updateTS error", zap.String("txnScope", txnScope), zap.Error(err))
return true
}
o.setLastTS(ts, txnScope)
return true
})
o.adaptiveUpdateIntervalState.lastTick = now
}
for {
select {
case now := <-ticker.C:
// nextUpdateInterval has calculation that depends on the time of the last tick. Calculate next interval
// before `doUpdate` as `doUpdate` is responsible for updating the time of the last tick.
newInterval := o.nextUpdateInterval(now, 0)
doUpdate(now)
if newInterval != currentInterval {
currentInterval = newInterval
ticker.Reset(currentInterval)
}
case requiredStaleness := <-o.adaptiveUpdateIntervalState.shrinkIntervalCh:
now := time.Now()
newInterval := o.nextUpdateInterval(now, requiredStaleness)
if newInterval != currentInterval {
currentInterval = newInterval
if time.Since(o.adaptiveUpdateIntervalState.lastTick) >= currentInterval {
doUpdate(time.Now())
}
ticker.Reset(currentInterval)
}
case <-o.quit:
return
}
}
}
// UntilExpired implement oracle.Oracle interface.
func (o *pdOracle) UntilExpired(lockTS uint64, TTL uint64, opt *oracle.Option) int64 {
lastTS, ok := o.getLastTS(opt.TxnScope)
if !ok {
return 0
}
return oracle.ExtractPhysical(lockTS) + int64(TTL) - oracle.ExtractPhysical(lastTS)
}
func (o *pdOracle) Close() {
close(o.quit)
}
// A future that resolves immediately to a low resolution timestamp.
type lowResolutionTsFuture struct {
ts uint64
err error
}
// Wait implements the oracle.Future interface.
func (f lowResolutionTsFuture) Wait() (uint64, error) {
return f.ts, f.err
}
// SetLowResolutionTimestampUpdateInterval sets the refresh interval for low resolution timestamps. Note this will take
// effect up to the previous update interval amount of time after being called.
// This setting may not be strictly followed. If Stale Read requests too new data to be available, the low resolution
// ts may be actually updated in a shorter interval than the configured one.
func (o *pdOracle) SetLowResolutionTimestampUpdateInterval(newUpdateInterval time.Duration) error {
if newUpdateInterval <= 0 {
return fmt.Errorf("updateInterval must be > 0")
}
o.adaptiveUpdateIntervalState.mu.Lock()
defer o.adaptiveUpdateIntervalState.mu.Unlock()
prevConfigured := o.lastTSUpdateInterval.Swap(int64(newUpdateInterval))
adaptiveUpdateInterval := o.adaptiveLastTSUpdateInterval.Load()
if newUpdateInterval == time.Duration(prevConfigured) {
// The config is unchanged. Do nothing.
return nil
}
var adaptiveUpdateIntervalUpdated bool
if adaptiveUpdateInterval == prevConfigured || newUpdateInterval < time.Duration(adaptiveUpdateInterval) {
// If the adaptive update interval is the same as the configured one, treat it as the adaptive adjusting
// mechanism not taking effect. So update it immediately.
// If the new configured interval is short so that it's smaller than the current adaptive interval, also shrink
// the adaptive interval immediately.
o.adaptiveLastTSUpdateInterval.Store(int64(newUpdateInterval))
adaptiveUpdateIntervalUpdated = true
}
logutil.Logger(context.Background()).Info("updated low resolution ts update interval",
zap.Duration("previous", time.Duration(prevConfigured)),
zap.Duration("new", newUpdateInterval),
zap.Duration("prevAdaptiveUpdateInterval", time.Duration(adaptiveUpdateInterval)),
zap.Bool("adaptiveUpdateIntervalUpdated", adaptiveUpdateIntervalUpdated))
return nil
}
// GetLowResolutionTimestamp gets a new increasing time.
func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context, opt *oracle.Option) (uint64, error) {
lastTS, ok := o.getLastTS(opt.TxnScope)
if !ok {
return 0, errors.Errorf("get low resolution timestamp fail, invalid txnScope = %s", opt.TxnScope)
}
return lastTS, nil
}
func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *oracle.Option) oracle.Future {
lastTS, ok := o.getLastTS(opt.TxnScope)
if !ok {
return lowResolutionTsFuture{
ts: 0,
err: errors.Errorf("get low resolution timestamp async fail, invalid txnScope = %s", opt.TxnScope),
}
}
return lowResolutionTsFuture{
ts: lastTS,
err: nil,
}
}
func (o *pdOracle) getStaleTimestamp(txnScope string, prevSecond uint64) (uint64, error) {
last, ok := o.getLastTSWithArrivalTS(txnScope)
if !ok {
return 0, errors.Errorf("get stale timestamp fail, txnScope: %s", txnScope)
}
return o.getStaleTimestampWithLastTS(last, prevSecond)
}
func (o *pdOracle) getStaleTimestampWithLastTS(last *lastTSO, prevSecond uint64) (uint64, error) {
ts, arrivalTime := last.tso, last.arrival
physicalTime := oracle.GetTimeFromTS(ts)
if uint64(physicalTime.Unix()) <= prevSecond {
return 0, errors.Errorf("invalid prevSecond %v", prevSecond)
}
staleTime := physicalTime.Add(time.Now().Add(-time.Duration(prevSecond) * time.Second).Sub(arrivalTime))
return oracle.GoTimeToTS(staleTime), nil
}
// GetStaleTimestamp generate a TSO which represents for the TSO prevSecond secs ago.
func (o *pdOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
ts, err = o.getStaleTimestamp(txnScope, prevSecond)
if err != nil {
if !strings.HasPrefix(err.Error(), "invalid prevSecond") {
// If any error happened, we will try to fetch tso and set it as last ts.
_, tErr := o.GetTimestamp(ctx, &oracle.Option{TxnScope: txnScope})
if tErr != nil {
return 0, tErr
}
}
return 0, err
}
return ts, nil
}
func (o *pdOracle) SetExternalTimestamp(ctx context.Context, ts uint64) error {
return o.c.SetExternalTimestamp(ctx, ts)
}
func (o *pdOracle) GetExternalTimestamp(ctx context.Context) (uint64, error) {
return o.c.GetExternalTimestamp(ctx)
}
func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Option) (uint64, error) {
ch := o.tsForValidation.DoChan(opt.TxnScope, func() (interface{}, error) {
metrics.TiKVValidateReadTSFromPDCount.Inc()
// If the call that triggers the execution of this function is canceled by the context, other calls that are
// waiting for reusing the same result should not be canceled. So pass context.Background() instead of the
// current ctx.
res, err := o.GetTimestamp(context.Background(), opt)
_, _ = util.EvalFailpoint("getCurrentTSForValidationBeforeReturn")
return res, err
})
select {
case <-ctx.Done():
return 0, errors.WithStack(ctx.Err())
case res := <-ch:
if res.Err != nil {
return 0, errors.WithStack(res.Err)
}
return res.Val.(uint64), nil
}
}
// ValidateReadTSForTidbSnapshot is a flag in context, indicating whether the read ts is for tidb_snapshot.
// This is a special approach for release branches to minimize code changes to reduce risks.
type ValidateReadTSForTidbSnapshot struct{}
func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
if !EnableTSValidation.Load() {
return nil
}
// For a mistake we've seen
if readTS >= math.MaxInt64 && readTS < math.MaxUint64 {
return errors.Errorf("MaxInt64 <= readTS < MaxUint64, readTS=%v", readTS)
}
if readTS == math.MaxUint64 {
if isStaleRead {
return oracle.ErrLatestStaleRead{}
}
return nil
}
retrying := false
for {
latestTSInfo, exists := o.getLastTSWithArrivalTS(opt.TxnScope)
// If we fail to get latestTSInfo or the readTS exceeds it, get a timestamp from PD to double-check.
// But we don't need to strictly fetch the latest TS. So if there are already concurrent calls to this function
// loading the latest TS, we can just reuse the same result to avoid too many concurrent GetTS calls.
if !exists || readTS > latestTSInfo.tso {
currentTS, err := o.getCurrentTSForValidation(ctx, opt)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if isStaleRead && !retrying {
// Trigger the adjustment at most once in a single invocation.
o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, currentTS, time.Now())
}
if readTS > currentTS {
// It's possible that the caller is checking a ts that's legal but not fetched from the current oracle
// object. In this case, it's possible that:
// * The ts is not be cached by the low resolution ts (so that readTS > latestTSInfo.TSO);
// * ... and then the getCurrentTSForValidation (which uses a singleflight internally) reuse a
// previously-started call and returns an older ts
// so that it may cause the check false-positive.
// To handle this case, we do not fail immediately when the check doesn't at once; instead, retry one
// more time. In the retry:
// * Considering that there can already be some other concurrent GetTimestamp operation that may have updated
// the low resolution ts, so check it again. If it passes, then no need to get the next ts from PD,
// which is slow.
// * Then, call getCurrentTSForValidation and check again. As the current GetTimestamp operation
// inside getCurrentTSForValidation must be started after finishing the previous one (while the
// latter is finished after starting this invocation to ValidateReadTS), then we can conclude that
// the next ts returned by getCurrentTSForValidation must be greater than any ts allocated by PD
// before the current invocation to ValidateReadTS.
skipRetry := false
if val, err1 := util.EvalFailpoint("validateReadTSRetryGetTS"); err1 == nil {
if str, ok := val.(string); ok {
if str == "skip" {
skipRetry = true
}
}
}
if !retrying && !skipRetry {
retrying = true
continue
}
return oracle.ErrFutureTSRead{
ReadTS: readTS,
CurrentTS: currentTS,
}
}
} else if !retrying && isStaleRead {
// Trigger the adjustment at most once in a single invocation.
estimatedCurrentTS, err := o.getStaleTimestampWithLastTS(latestTSInfo, 0)
if err != nil {
logutil.Logger(ctx).Warn("failed to estimate current ts by getSlateTimestamp for auto-adjusting update low resolution ts interval",
zap.Error(err), zap.Uint64("readTS", readTS), zap.String("txnScope", opt.TxnScope))
} else {
o.adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS, estimatedCurrentTS, time.Now())
}
}
return nil
}
}
// adjustUpdateLowResolutionTSIntervalWithRequestedStaleness triggers adjustments the update interval of low resolution
// ts, if necessary, to suite the usage of stale read.
// This method is not supposed to be called when performing non-stale-read operations.
func (o *pdOracle) adjustUpdateLowResolutionTSIntervalWithRequestedStaleness(readTS uint64, currentTS uint64, now time.Time) {
requiredStaleness := oracle.GetTimeFromTS(currentTS).Sub(oracle.GetTimeFromTS(readTS))
// Do not acquire the mutex, as here we only needs a rough check.
// So it's possible that we get inconsistent values from these two atomic fields, but it won't cause any problem.
currentUpdateInterval := time.Duration(o.adaptiveLastTSUpdateInterval.Load())
if requiredStaleness <= currentUpdateInterval+adaptiveUpdateTSIntervalBlockRecoverThreshold {
// Record the most recent time when there's a read operation requesting the staleness close enough to the
// current update interval.
nowMillis := now.UnixMilli()
last := o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.Load()
if last < nowMillis {
// Do not retry if the CAS fails (which may happen when there are other goroutines updating it
// concurrently), as we don't actually need to set it strictly.
o.adaptiveUpdateIntervalState.lastShortStalenessReadTime.CompareAndSwap(last, nowMillis)
}
}
if requiredStaleness <= currentUpdateInterval && currentUpdateInterval > minAllowedAdaptiveUpdateTSInterval {
// Considering system time / PD time drifts, it's possible that we get a non-positive value from the
// calculation. Make sure it's always positive before passing it to the updateTS goroutine.
// Note that `nextUpdateInterval` method expects the requiredStaleness is always non-zero when triggerred
// by this path.
requiredStaleness = max(requiredStaleness, time.Millisecond)
// Try to non-blocking send a signal to notify it to change the interval immediately. But if the channel is
// busy, it means that there's another concurrent call trying to update it. Just skip it in this case.
select {
case o.adaptiveUpdateIntervalState.shrinkIntervalCh <- requiredStaleness:
default:
}
}
}