mirror of https://github.com/tikv/client-go.git
Validate ts only for stale read (#1607)
ref pingcap/tidb#59402 Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
parent
34130b733a
commit
1d2500631f
|
@ -962,7 +962,8 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS()
|
|||
testImpl(getTS, true, nil)
|
||||
testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, false, nil)
|
||||
testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, true, nil)
|
||||
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, oracle.ErrFutureTSRead{})
|
||||
// check is skipped for normal read
|
||||
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, nil)
|
||||
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, true, oracle.ErrFutureTSRead{})
|
||||
testImpl(func() uint64 { return math.MaxUint64 }, false, nil)
|
||||
testImpl(func() uint64 { return math.MaxUint64 }, true, oracle.ErrLatestStaleRead{})
|
||||
|
|
|
@ -36,11 +36,9 @@ package oracles
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
)
|
||||
|
||||
|
@ -152,22 +150,6 @@ func (l *localOracle) GetExternalTimestamp(ctx context.Context) (uint64, error)
|
|||
}
|
||||
|
||||
func (l *localOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
|
||||
if readTS == math.MaxUint64 {
|
||||
if isStaleRead {
|
||||
return oracle.ErrLatestStaleRead{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
currentTS, err := l.GetTimestamp(ctx, opt)
|
||||
if err != nil {
|
||||
return errors.Errorf("fail to validate read timestamp: %v", err)
|
||||
}
|
||||
if currentTS < readTS {
|
||||
return oracle.ErrFutureTSRead{
|
||||
ReadTS: readTS,
|
||||
CurrentTS: currentTS,
|
||||
}
|
||||
}
|
||||
// local oracle is not supposed to be used
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -36,7 +36,6 @@ package oracles
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -139,23 +138,6 @@ func (o *MockOracle) SetLowResolutionTimestampUpdateInterval(time.Duration) erro
|
|||
}
|
||||
|
||||
func (o *MockOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
|
||||
if readTS == math.MaxUint64 {
|
||||
if isStaleRead {
|
||||
return oracle.ErrLatestStaleRead{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
currentTS, err := o.GetTimestamp(ctx, opt)
|
||||
if err != nil {
|
||||
return errors.Errorf("fail to validate read timestamp: %v", err)
|
||||
}
|
||||
if currentTS < readTS {
|
||||
return oracle.ErrFutureTSRead{
|
||||
ReadTS: readTS,
|
||||
CurrentTS: currentTS,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -662,7 +662,22 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op
|
|||
}
|
||||
}
|
||||
|
||||
// ValidateReadTSForTidbSnapshot is a flag in context, indicating whether the read ts is for tidb_snapshot.
|
||||
// This is a special approach for release branches to minimize code changes to reduce risks.
|
||||
type ValidateReadTSForTidbSnapshot struct{}
|
||||
|
||||
func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
|
||||
// For a mistake we've seen
|
||||
if readTS >= math.MaxInt64 && readTS < math.MaxUint64 {
|
||||
return errors.Errorf("MaxInt64 <= readTS < MaxUint64, readTS=%v", readTS)
|
||||
}
|
||||
|
||||
// only check stale reads and reads using `tidb_snapshot`
|
||||
forTidbSnapshot := ctx.Value(ValidateReadTSForTidbSnapshot{}) != nil
|
||||
if !forTidbSnapshot && !isStaleRead {
|
||||
return nil
|
||||
}
|
||||
|
||||
if readTS == math.MaxUint64 {
|
||||
if isStaleRead {
|
||||
return oracle.ErrLatestStaleRead{}
|
||||
|
|
|
@ -403,7 +403,6 @@ func TestValidateReadTS(t *testing.T) {
|
|||
}
|
||||
|
||||
testImpl(true)
|
||||
testImpl(false)
|
||||
}
|
||||
|
||||
type MockPDClientWithPause struct {
|
||||
|
@ -572,8 +571,9 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) {
|
|||
mustNoNotify()
|
||||
|
||||
// It loads `ts + 3` and `ts + 4` from the mock PD, and the check cannot pass.
|
||||
// Updated: 2025-03-12, the non-stale read check is temporarily skipped.
|
||||
err = o.ValidateReadTS(ctx, ts+5, false, opt)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
mustNoNotify()
|
||||
|
||||
// Do the check again. It loads `ts + 5` from the mock PD, and the check passes.
|
||||
|
@ -618,6 +618,9 @@ func TestSetLastTSAlwaysPushTS(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestValidateReadTSFromDifferentSource(t *testing.T) {
|
||||
// Updated: 2025-03-12, the non-stale read check is temporarily skipped.
|
||||
t.Skip()
|
||||
|
||||
// If a ts is fetched from a different client to the same cluster, the ts might not be cached by the low resolution
|
||||
// ts. In this case, the validation should not be false positive.
|
||||
util.EnableFailpoints()
|
||||
|
|
Loading…
Reference in New Issue