enable ts validation for normal read (#1619)

Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
ekexium 2025-04-25 16:56:04 +08:00 committed by GitHub
parent 7cca6713e9
commit 8645f93e75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 18 additions and 12 deletions

View File

@ -920,6 +920,8 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestStats() {
}
func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS() {
oracles.EnableTSValidation.Store(true)
defer oracles.EnableTSValidation.Store(false)
o, err := oracles.NewPdOracle(s.pdCli, &oracles.PDOracleOptions{
UpdateInterval: time.Second * 2,
})
@ -962,8 +964,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRegionRequestValidateReadTS()
testImpl(getTS, true, nil)
testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, false, nil)
testImpl(func() uint64 { return addTS(getTS(), -time.Minute) }, true, nil)
// check is skipped for normal read
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, nil)
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, false, oracle.ErrFutureTSRead{})
testImpl(func() uint64 { return addTS(getTS(), +time.Minute) }, true, oracle.ErrFutureTSRead{})
testImpl(func() uint64 { return math.MaxUint64 }, false, nil)
testImpl(func() uint64 { return math.MaxUint64 }, true, oracle.ErrLatestStaleRead{})

View File

@ -60,6 +60,8 @@ const slowDist = 30 * time.Millisecond
type adaptiveUpdateTSIntervalState int
var EnableTSValidation atomic.Bool
const (
adaptiveUpdateTSIntervalStateNone adaptiveUpdateTSIntervalState = iota
// adaptiveUpdateTSIntervalStateNormal represents the state that the adaptive update ts interval is synced with the
@ -667,17 +669,15 @@ func (o *pdOracle) getCurrentTSForValidation(ctx context.Context, opt *oracle.Op
type ValidateReadTSForTidbSnapshot struct{}
func (o *pdOracle) ValidateReadTS(ctx context.Context, readTS uint64, isStaleRead bool, opt *oracle.Option) error {
if !EnableTSValidation.Load() {
return nil
}
// For a mistake we've seen
if readTS >= math.MaxInt64 && readTS < math.MaxUint64 {
return errors.Errorf("MaxInt64 <= readTS < MaxUint64, readTS=%v", readTS)
}
// only check stale reads and reads using `tidb_snapshot`
forTidbSnapshot := ctx.Value(ValidateReadTSForTidbSnapshot{}) != nil
if !forTidbSnapshot && !isStaleRead {
return nil
}
if readTS == math.MaxUint64 {
if isStaleRead {
return oracle.ErrLatestStaleRead{}

View File

@ -352,6 +352,8 @@ func TestAdaptiveUpdateTSInterval(t *testing.T) {
}
func TestValidateReadTS(t *testing.T) {
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
testImpl := func(staleRead bool) {
pdClient := MockPdClient{}
o, err := NewPdOracle(&pdClient, &PDOracleOptions{
@ -429,6 +431,8 @@ func (c *MockPDClientWithPause) WithCallerComponent(component caller.Component)
}
func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) {
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
util.EnableFailpoints()
pdClient := &MockPDClientWithPause{}
@ -536,6 +540,8 @@ func TestValidateReadTSForStaleReadReusingGetTSResult(t *testing.T) {
}
func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) {
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
oracleInterface, err := NewPdOracle(&MockPdClient{}, &PDOracleOptions{
UpdateInterval: time.Second * 2,
NoUpdateTS: true,
@ -573,7 +579,7 @@ func TestValidateReadTSForNormalReadDoNotAffectUpdateInterval(t *testing.T) {
// It loads `ts + 3` and `ts + 4` from the mock PD, and the check cannot pass.
// Updated: 2025-03-12, the non-stale read check is temporarily skipped.
err = o.ValidateReadTS(ctx, ts+5, false, opt)
assert.NoError(t, err)
assert.Error(t, err)
mustNoNotify()
// Do the check again. It loads `ts + 5` from the mock PD, and the check passes.
@ -618,9 +624,8 @@ func TestSetLastTSAlwaysPushTS(t *testing.T) {
}
func TestValidateReadTSFromDifferentSource(t *testing.T) {
// Updated: 2025-03-12, the non-stale read check is temporarily skipped.
t.Skip()
EnableTSValidation.Store(true)
defer EnableTSValidation.Store(false)
// If a ts is fetched from a different client to the same cluster, the ts might not be cached by the low resolution
// ts. In this case, the validation should not be false positive.
util.EnableFailpoints()