feat: Broadcast min_commit_ts for pipelined transactions (#1458)

Signed-off-by: ekexium <eke@fastmail.com>
ekexium 2024-09-25 09:34:14 +08:00 committed by GitHub
parent 7d0f0bc93b
commit 527f80a186
15 changed files with 458 additions and 31 deletions

2
go.mod
View File

@ -15,7 +15,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0

4
go.sum
View File

@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@ -6,7 +6,7 @@ require (
github.com/ninedraft/israce v0.0.3
github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f
github.com/pingcap/failpoint v0.0.0-20240527053858-9b3b6e34194a
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d
github.com/pingcap/tidb v1.1.0-beta.0.20240703042657-230bbc2ef5ef
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.9.0

View File

@ -357,8 +357,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc=
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=

View File

@ -726,9 +726,47 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
// cache GC is incompatible with cache refresh
c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
}
c.bg.schedule(
func(ctx context.Context, _ time.Time) bool {
refreshFullStoreList(ctx, c.stores)
return false
}, refreshStoreListInterval,
)
return c
}
// refreshFullStoreList refreshes the full store list; errors are ignored.
func refreshFullStoreList(ctx context.Context, stores storeCache) {
storeList, err := stores.fetchAllStores(ctx)
if err != nil {
logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
return
}
for _, store := range storeList {
_, exist := stores.get(store.GetId())
if exist {
continue
}
// GetAllStores is supposed to return only Up and Offline stores.
// This check is defensive and keeps the behavior consistent with the store-resolve code.
if store == nil || store.GetState() == metapb.StoreState_Tombstone {
continue
}
addr := store.GetAddress()
if addr == "" {
continue
}
s := stores.getOrInsertDefault(store.GetId())
// TODO: maybe refactor this, together with other places initializing Store
s.addr = addr
s.peerAddr = store.GetPeerAddress()
s.saddr = store.GetStatusAddress()
s.storeType = tikvrpc.GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
s.changeResolveStateTo(unresolved, resolved)
}
}
// only used for tests.
func newTestRegionCache() *RegionCache {
c := &RegionCache{}
@ -2649,6 +2687,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
const cleanCacheInterval = time.Second
const cleanRegionNumPerRound = 50
const refreshStoreListInterval = 10 * time.Second
// gcScanItemHook is only used for testing
var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
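As a reference for the scheduling change above, the sketch below shows the same ticker-driven refresh loop in isolation: fetch the full store list on an interval, skip stores the cache already knows, and ignore errors. The Store type and the fetchAll/known/insert callbacks are assumed stand-ins for client-go internals (storeCache, bg.schedule), not part of this change.

package example

import (
	"context"
	"log"
	"time"
)

// Store is a stand-in for the metapb.Store descriptor used above.
type Store struct {
	ID   uint64
	Addr string
}

// refreshLoop periodically fetches the full store list and inserts stores the
// cache has not seen yet; fetch errors are logged and ignored, mirroring
// refreshFullStoreList.
func refreshLoop(
	ctx context.Context,
	interval time.Duration,
	fetchAll func(context.Context) ([]Store, error),
	known func(id uint64) bool,
	insert func(Store),
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			stores, err := fetchAll(ctx)
			if err != nil {
				log.Printf("refresh full store list failed: %v", err)
				continue
			}
			for _, s := range stores {
				if s.Addr == "" || known(s.ID) {
					continue // skip unaddressable or already-cached stores
				}
				insert(s)
			}
		}
	}
}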

View File

@ -582,6 +582,10 @@ func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvr
return nil, errors.New("unreachable")
}
func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) {
return nil, errors.New("unreachable")
}
func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
// prepare a mock tikv grpc server
addr := "localhost:56341"

View File

@ -309,7 +309,7 @@ const unsafeDestroyRangeTimeout = 5 * time.Minute
// multiple times on a single range.
func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKey []byte) error {
// Get all stores every time a region is deleted, so the store list is less likely to be stale.
stores, err := s.listStoresForUnsafeDestory(ctx)
stores, err := s.listStoresForUnsafeDestroy(ctx)
if err != nil {
metrics.TiKVUnsafeDestroyRangeFailuresCounterVec.WithLabelValues("get_stores").Inc()
return err
@ -366,7 +366,7 @@ func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKe
return nil
}
func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Store, error) {
func (s *KVStore) listStoresForUnsafeDestroy(ctx context.Context) ([]*metapb.Store, error) {
stores, err := s.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.WithStack(err)

View File

@ -51,7 +51,6 @@ type testKVSuite struct {
func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) {
return 0, nil, nil
})

View File

@ -100,6 +100,7 @@ const (
CmdLockWaitInfo
CmdGetHealthFeedback
CmdBroadcastTxnStatus
CmdCop CmdType = 512 + iota
CmdCopStream
@ -221,6 +222,8 @@ func (t CmdType) String() string {
return "LockWaitInfo"
case CmdGetHealthFeedback:
return "GetHealthFeedback"
case CmdBroadcastTxnStatus:
return "BroadcastTxnStatus"
case CmdFlashbackToVersion:
return "FlashbackToVersion"
case CmdPrepareFlashbackToVersion:
@ -568,6 +571,10 @@ func (req *Request) GetHealthFeedback() *kvrpcpb.GetHealthFeedbackRequest {
return req.Req.(*kvrpcpb.GetHealthFeedbackRequest)
}
func (req *Request) BroadcastTxnStatus() *kvrpcpb.BroadcastTxnStatusRequest {
return req.Req.(*kvrpcpb.BroadcastTxnStatusRequest)
}
// FlashbackToVersion returns FlashbackToVersionRequest in request.
func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest {
return req.Req.(*kvrpcpb.FlashbackToVersionRequest)
@ -653,6 +660,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}}
case CmdGetHealthFeedback:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_GetHealthFeedback{GetHealthFeedback: req.GetHealthFeedback()}}
case CmdBroadcastTxnStatus:
return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BroadcastTxnStatus{BroadcastTxnStatus: req.BroadcastTxnStatus()}}
}
return nil
}
@ -730,6 +739,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
return &Response{Resp: res.BufferBatchGet}, nil
case *tikvpb.BatchCommandsResponse_Response_GetHealthFeedback:
return &Response{Resp: res.GetHealthFeedback}, nil
case *tikvpb.BatchCommandsResponse_Response_BroadcastTxnStatus:
return &Response{Resp: res.BroadcastTxnStatus}, nil
}
panic("unreachable")
}
@ -1143,6 +1154,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet())
case CmdGetHealthFeedback:
resp.Resp, err = client.GetHealthFeedback(ctx, req.GetHealthFeedback())
case CmdBroadcastTxnStatus:
resp.Resp, err = client.BroadcastTxnStatus(ctx, req.BroadcastTxnStatus())
default:
return nil, errors.Errorf("invalid request type: %v", req.Type)
}
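For illustration, a minimal sketch of issuing the new command through the plumbing above, assuming an already-established tikvpb.TikvClient and the client-go v2 import paths; the single-status payload is only an example, not part of this change.

package example

import (
	"context"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// broadcastOnce wraps one BroadcastTxnStatus call; CallRPC dispatches on the
// request type and ends up calling client.BroadcastTxnStatus.
func broadcastOnce(ctx context.Context, client tikvpb.TikvClient, startTS, minCommitTS uint64) error {
	req := tikvrpc.NewRequest(tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{
		TxnStatus: []*kvrpcpb.TxnStatus{{
			StartTs:     startTS,
			MinCommitTs: minCommitTS,
		}},
	})
	_, err := tikvrpc.CallRPC(ctx, client, req)
	return err
}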

View File

@ -164,7 +164,7 @@ type twoPhaseCommitter struct {
}
useAsyncCommit uint32
minCommitTS uint64
minCommitTSMgr *minCommitTsManager
maxCommitTS uint64
prewriteStarted bool
prewriteCancelled uint32
@ -477,6 +477,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
binlog: txn.binlog,
diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull,
resourceGroupName: txn.resourceGroupName,
minCommitTSMgr: newMinCommitTsManager(),
}
return committer, nil
}
@ -1137,6 +1138,69 @@ const (
stateClosed
)
// WriteAccessLevel represents the level of write access required to modify the value
type WriteAccessLevel int
const (
ttlAccess WriteAccessLevel = 1
twoPCAccess WriteAccessLevel = 2
)
// minCommitTsManager manages a minimum commit timestamp with different write access levels.
type minCommitTsManager struct {
mutex sync.Mutex
value uint64
requiredWriteAccess WriteAccessLevel
}
// newMinCommitTsManager creates and returns a new minCommitTsManager.
func newMinCommitTsManager() *minCommitTsManager {
return &minCommitTsManager{requiredWriteAccess: ttlAccess}
}
// tryUpdate updates the value if the provided write access level is sufficient and
// the new value is greater.
func (m *minCommitTsManager) tryUpdate(newValue uint64, writeAccess WriteAccessLevel) {
m.mutex.Lock()
defer m.mutex.Unlock()
if writeAccess < m.requiredWriteAccess {
return
}
if newValue > m.value {
m.value = newValue
}
}
// elevateWriteAccess elevates the required write access level.
// It returns the current value.
func (m *minCommitTsManager) elevateWriteAccess(newLevel WriteAccessLevel) uint64 {
m.mutex.Lock()
defer m.mutex.Unlock()
if newLevel > m.requiredWriteAccess {
m.requiredWriteAccess = newLevel
}
return m.value
}
// get returns the current value. This is a read operation and doesn't require write access.
func (m *minCommitTsManager) get() uint64 {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.value
}
// getRequiredWriteAccess returns the current required write access level.
func (m *minCommitTsManager) getRequiredWriteAccess() WriteAccessLevel {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.requiredWriteAccess
}
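For clarity, a short usage sketch of the gating above: TTL-level updates are accepted until the 2PC path elevates the required access level, after which only 2PC-level callers can move the value. The helper is illustrative and relies only on the types defined above.

func exampleMinCommitTsUsage() uint64 {
	m := newMinCommitTsManager()
	m.tryUpdate(100, ttlAccess)       // accepted: required level is still ttlAccess
	m.elevateWriteAccess(twoPCAccess) // 2PC takes ownership of min_commit_ts
	m.tryUpdate(200, ttlAccess)       // ignored: ttlAccess < twoPCAccess
	m.tryUpdate(300, twoPCAccess)     // accepted: value becomes 300
	return m.get()                    // returns 300
}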
type ttlManager struct {
state ttlManagerState
ch chan struct{}
@ -1175,7 +1239,11 @@ func (tm *ttlManager) reset() {
const keepAliveMaxBackoff = 20000
const pessimisticLockMaxBackoff = 20000
const maxConsecutiveFailure = 10
const broadcastGracePeriod = 5 * time.Second
const broadcastMaxBackoff = 10000
// keepAlive keeps sending heartbeats to update the primary key's TTL.
// For pipelined transactions, it also updates min_commit_ts and broadcasts it to all TiKV stores.
func keepAlive(
c *twoPhaseCommitter, closeCh chan struct{}, tm *ttlManager, primaryKey []byte,
lockCtx *kv.LockCtx, isPipelinedTxn bool,
@ -1234,6 +1302,14 @@ func keepAlive(
return
}
// update minCommitTS if this is a non-async-commit pipelined transaction
if isPipelinedTxn &&
!c.isOnePC() &&
!c.isAsyncCommit() &&
c.minCommitTSMgr.getRequiredWriteAccess() <= ttlAccess {
c.minCommitTSMgr.tryUpdate(now, ttlAccess)
}
newTTL := uptime + atomic.LoadUint64(&ManagedLockTTL)
logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat",
zap.Uint64("startTS", c.startTS),
@ -1241,7 +1317,9 @@ func keepAlive(
zap.Bool("isPipelinedTxn", isPipelinedTxn),
)
startTime := time.Now()
_, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL)
_, stopHeartBeat, err := sendTxnHeartBeat(
bo, c.store, primaryKey, c.startTS, newTTL, c.minCommitTSMgr.get(),
)
if err != nil {
keepFail++
metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
@ -1265,19 +1343,122 @@ func keepAlive(
}
return
}
continue
}
} else {
keepFail = 0
metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
}
// broadcast to all stores
if isPipelinedTxn {
broadcastToAllStores(
c.txn,
c.store,
retry.NewBackofferWithVars(
context.Background(),
broadcastMaxBackoff,
c.txn.vars,
),
&kvrpcpb.TxnStatus{
StartTs: c.startTS,
MinCommitTs: c.minCommitTSMgr.get(),
CommitTs: 0,
RolledBack: false,
IsCompleted: false,
},
c.resourceGroupName,
c.resourceGroupTag,
)
}
}
}
}
func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
const broadcastRpcTimeout = time.Second * 5
const broadcastMaxConcurrency = 10
// broadcastToAllStores asynchronously broadcasts the transaction status to all stores.
// Errors are ignored.
func broadcastToAllStores(
txn *KVTxn,
store kvstore,
bo *retry.Backoffer,
status *kvrpcpb.TxnStatus,
resourceGroupName string,
resourceGroupTag []byte,
) {
broadcastFunc := func() {
stores := store.GetRegionCache().GetStoresByType(tikvrpc.TiKV)
concurrency := min(broadcastMaxConcurrency, len(stores))
rateLimit := make(chan struct{}, concurrency)
var wg sync.WaitGroup
for _, s := range stores {
rateLimit <- struct{}{}
wg.Add(1)
target := s
err := txn.spawnWithStorePool(func() {
defer wg.Done()
defer func() { <-rateLimit }()
req := tikvrpc.NewRequest(
tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{
TxnStatus: []*kvrpcpb.TxnStatus{status},
},
)
req.Context.ClusterId = store.GetClusterID()
req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{
ResourceGroupName: resourceGroupName,
}
req.Context.ResourceGroupTag = resourceGroupTag
_, err := store.GetTiKVClient().SendRequest(
bo.GetCtx(),
target.GetAddr(),
req,
broadcastRpcTimeout,
)
if err != nil {
logutil.Logger(store.Ctx()).Info(
"broadcast txn status failed",
zap.Uint64("storeID", target.StoreID()),
zap.String("storeAddr", target.GetAddr()),
zap.Stringer("status", status),
zap.Error(err),
)
}
})
if err != nil {
// If spawning the goroutine fails, release the slot and mark done
<-rateLimit
wg.Done()
logutil.Logger(store.Ctx()).Error("failed to spawn worker goroutine", zap.Error(err))
}
}
wg.Wait()
}
if err := txn.spawnWithStorePool(broadcastFunc); err != nil {
logutil.Logger(store.Ctx()).Error("failed to spawn goroutine for broadcasting txn status",
zap.Error(err))
}
}
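The broadcast above bounds its fan-out with a buffered channel used as a semaphore plus a WaitGroup. A self-contained sketch of that pattern, with the RPC replaced by a hypothetical send callback:

package example

import "sync"

// fanOut calls send for every address with at most maxInFlight calls running
// concurrently; errors are dropped, matching the best-effort broadcast above.
func fanOut(addrs []string, maxInFlight int, send func(addr string) error) {
	if maxInFlight < 1 {
		maxInFlight = 1
	}
	sem := make(chan struct{}, maxInFlight) // buffered channel as a semaphore
	var wg sync.WaitGroup
	for _, addr := range addrs {
		sem <- struct{}{} // acquire a slot before spawning the worker
		wg.Add(1)
		go func(target string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			_ = send(target)         // best effort: the error is ignored
		}(addr)
	}
	wg.Wait()
}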
func sendTxnHeartBeat(
bo *retry.Backoffer,
store kvstore,
primary []byte,
startTS, ttl uint64,
minCommitTS uint64,
) (newTTL uint64, stopHeartBeat bool, err error) {
req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &kvrpcpb.TxnHeartBeatRequest{
PrimaryLock: primary,
StartVersion: startTS,
AdviseLockTtl: ttl,
MinCommitTs: minCommitTS,
})
for {
loc, err := store.GetRegionCache().LocateKey(bo, primary)
@ -1424,6 +1605,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
var err error
if c.txn.IsPipelined() {
// TODO: cleanup pipelined txn
// TODO: broadcast txn status
} else if !c.isOnePC() {
err = c.cleanupMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
} else if c.isPessimistic {
@ -1444,6 +1626,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
c.minCommitTSMgr.elevateWriteAccess(twoPCAccess)
var binlogSkipped bool
defer func() {
if c.isOnePC() {
@ -1547,7 +1730,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
commitDetail.GetLatestTsTime = time.Since(start)
// Plus 1 to avoid producing the same commit TS as previously committed transactions
c.minCommitTS = latestTS + 1
c.minCommitTSMgr.tryUpdate(latestTS+1, twoPCAccess)
}
// Calculate maxCommitTS if necessary
if commitTSMayBeCalculated {
@ -1666,10 +1849,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
}
if c.isAsyncCommit() {
if c.minCommitTS == 0 {
if c.minCommitTSMgr.get() == 0 {
return errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS)
}
commitTS = c.minCommitTS
commitTS = c.minCommitTSMgr.get()
} else {
start = time.Now()
logutil.Event(ctx, "start get commit ts")

View File

@ -0,0 +1,124 @@
// Copyright 2024 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/snapshot_test.go
//
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transaction
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMinCommitTsManager(t *testing.T) {
t.Run(
"Initial state", func(t *testing.T) {
manager := newMinCommitTsManager()
assert.Equal(t, uint64(0), manager.get(), "Initial value should be 0")
assert.Equal(
t,
ttlAccess,
manager.getRequiredWriteAccess(),
"Initial write access should be ttlAccess",
)
},
)
t.Run(
"TTL updates", func(t *testing.T) {
manager := newMinCommitTsManager()
manager.tryUpdate(10, ttlAccess)
assert.Equal(t, uint64(10), manager.get(), "Value should be 10")
manager.tryUpdate(5, ttlAccess)
assert.Equal(t, uint64(10), manager.get(), "Value should remain 10")
},
)
t.Run(
"Elevate write access", func(t *testing.T) {
manager := newMinCommitTsManager()
manager.tryUpdate(10, ttlAccess)
currentValue := manager.elevateWriteAccess(twoPCAccess)
assert.Equal(t, uint64(10), currentValue, "Current value should be 10")
assert.Equal(
t,
twoPCAccess,
manager.getRequiredWriteAccess(),
"Required write access should be twoPCAccess",
)
},
)
t.Run(
"Updates after elevation", func(t *testing.T) {
manager := newMinCommitTsManager()
manager.tryUpdate(10, ttlAccess)
manager.elevateWriteAccess(twoPCAccess)
manager.tryUpdate(20, ttlAccess)
assert.Equal(t, uint64(10), manager.get(), "Value should remain 10")
manager.tryUpdate(30, twoPCAccess)
assert.Equal(t, uint64(30), manager.get(), "Value should be 30")
},
)
t.Run(
"Concurrent updates", func(t *testing.T) {
manager := newMinCommitTsManager()
done := make(chan bool)
go func() {
for i := 0; i < 1000; i++ {
manager.tryUpdate(uint64(i), ttlAccess)
}
done <- true
}()
go func() {
for i := 0; i < 1000; i++ {
manager.tryUpdate(uint64(1000+i), ttlAccess)
}
done <- true
}()
<-done
<-done
assert.Equal(t, manager.get(), uint64(1999))
},
)
}

View File

@ -330,14 +330,32 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error {
if err = c.commitMutations(bo, &primaryMutation); err != nil {
return errors.Trace(err)
}
c.mu.RLock()
c.mu.Lock()
c.mu.committed = true
c.mu.RUnlock()
c.mu.Unlock()
logutil.Logger(bo.GetCtx()).Info(
"[pipelined dml] transaction is committed",
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", commitTS),
)
broadcastToAllStores(
c.txn,
c.store,
retry.NewBackofferWithVars(
bo.GetCtx(),
broadcastMaxBackoff,
c.txn.vars,
),
&kvrpcpb.TxnStatus{
StartTs: c.startTS,
MinCommitTs: c.minCommitTSMgr.get(),
CommitTs: commitTS,
RolledBack: false,
IsCompleted: false,
},
c.resourceGroupName,
c.resourceGroupTag,
)
if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil {
return nil
@ -439,13 +457,17 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
const RESOLVE_CONCURRENCY = 8
var resolved atomic.Uint64
handler, err := c.buildPipelinedResolveHandler(commit, &resolved)
commitTs := uint64(0)
if commit {
commitTs = atomic.LoadUint64(&c.commitTS)
}
if err != nil {
logutil.Logger(bo.GetCtx()).Error(
"[pipelined dml] build buildPipelinedResolveHandler error",
zap.Error(err),
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("commitTS", commitTs),
)
return
}
@ -470,7 +492,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
zap.String("txn-status", status),
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("commitTS", commitTs),
zap.Uint64("session", c.sessionID),
zap.Error(err),
)
@ -479,9 +501,33 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end
zap.String("txn-status", status),
zap.Uint64("resolved regions", resolved.Load()),
zap.Uint64("startTS", c.startTS),
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)),
zap.Uint64("commitTS", commitTs),
zap.Uint64("session", c.sessionID),
)
// wait a while before notifying txn_status_cache to evict the txn,
// which tolerates slow followers and avoids evicting the txn before
// the followers catch up.
time.Sleep(broadcastGracePeriod)
broadcastToAllStores(
c.txn,
c.store,
retry.NewBackofferWithVars(
bo.GetCtx(),
broadcastMaxBackoff,
c.txn.vars,
),
&kvrpcpb.TxnStatus{
StartTs: c.startTS,
MinCommitTs: 0,
CommitTs: commitTs,
RolledBack: !commit,
IsCompleted: true,
},
c.resourceGroupName,
c.resourceGroupTag,
)
}
}()
}
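Combined with the keepAlive changes in the committer above, a pipelined transaction now broadcasts its status at three points. The sketch below spells out the payload at each step for the committed case; the helper is illustrative and the field values mirror the call sites in this change (for a rolled-back transaction, RolledBack is set and CommitTs stays 0).

package example

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// pipelinedTxnStatusLifecycle returns the three broadcast payloads used for a
// committed pipelined transaction: heartbeat, commit, and final resolution.
func pipelinedTxnStatusLifecycle(startTS, minCommitTS, commitTS uint64) []*kvrpcpb.TxnStatus {
	return []*kvrpcpb.TxnStatus{
		// 1. While the txn runs, keepAlive advances and broadcasts min_commit_ts;
		//    the commit_ts is not known yet.
		{StartTs: startTS, MinCommitTs: minCommitTS},
		// 2. Right after the primary key is committed, commit_ts is broadcast,
		//    but flushed locks may still exist, so IsCompleted stays false.
		{StartTs: startTS, MinCommitTs: minCommitTS, CommitTs: commitTS},
		// 3. After resolveFlushedLocks finishes (and broadcastGracePeriod has
		//    passed), the txn is marked completed so stores can evict it.
		{StartTs: startTS, CommitTs: commitTS, IsCompleted: true},
	}
}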

View File

@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
}
}
c.mu.Lock()
minCommitTS := c.minCommitTS
minCommitTS := c.minCommitTSMgr.get()
c.mu.Unlock()
if c.forUpdateTS > 0 && c.forUpdateTS >= minCommitTS {
minCommitTS = c.forUpdateTS + 1
@ -387,7 +387,7 @@ func (action actionPrewrite) handleSingleBatch(
c.setOnePC(false)
c.setAsyncCommit(false)
} else {
// For 1PC, there's no racing to access to access `onePCCommmitTS` so it's safe
// For 1PC, there's no racing to access `onePCCommitTS` so it's safe
// not to lock the mutex.
if c.onePCCommitTS != 0 {
logutil.Logger(bo.GetCtx()).Fatal(
@ -419,8 +419,8 @@ func (action actionPrewrite) handleSingleBatch(
c.setAsyncCommit(false)
} else {
c.mu.Lock()
if prewriteResp.MinCommitTs > c.minCommitTS {
c.minCommitTS = prewriteResp.MinCommitTs
if prewriteResp.MinCommitTs > c.minCommitTSMgr.get() {
c.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess)
}
c.mu.Unlock()
}

View File

@ -194,12 +194,12 @@ func (c CommitterProbe) GetCommitTS() uint64 {
// GetMinCommitTS returns the minimal commit ts that can be used.
func (c CommitterProbe) GetMinCommitTS() uint64 {
return c.minCommitTS
return c.minCommitTSMgr.get()
}
// SetMinCommitTS sets the minimal commit ts that can be used.
func (c CommitterProbe) SetMinCommitTS(ts uint64) {
c.minCommitTS = ts
c.minCommitTSMgr.tryUpdate(ts, twoPCAccess)
}
// SetMaxCommitTS sets the max commit ts can be used.
@ -381,7 +381,7 @@ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []by
// SendTxnHeartBeat renews a txn's ttl.
func SendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
return sendTxnHeartBeat(bo, store, primary, startTS, ttl)
return sendTxnHeartBeat(bo, store, primary, startTS, ttl, 0)
}
// ConfigProbe exposes configurations and global variables for testing purpose.

View File

@ -841,7 +841,26 @@ func (txn *KVTxn) Rollback() error {
txn.committer.ttlManager.close()
// no need to clean up locks when no flush was triggered.
pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd
if len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 {
needCleanUpLocks := len(pipelinedStart) != 0 && len(pipelinedEnd) != 0
broadcastToAllStores(
txn,
txn.committer.store,
retry.NewBackofferWithVars(
txn.store.Ctx(),
broadcastMaxBackoff,
txn.committer.txn.vars,
),
&kvrpcpb.TxnStatus{
StartTs: txn.startTS,
MinCommitTs: txn.committer.minCommitTSMgr.get(),
CommitTs: 0,
RolledBack: true,
IsCompleted: !needCleanUpLocks,
},
txn.resourceGroupName,
txn.resourceGroupTag,
)
if needCleanUpLocks {
rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars)
txn.committer.resolveFlushedLocks(rollbackBo, pipelinedStart, pipelinedEnd, false)
}
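For a pipelined transaction, Rollback's broadcast payload depends on whether any flush happened. A small illustrative helper (not part of this change, assuming the kvrpcpb import as above) that mirrors the logic:

func rollbackStatus(startTS, minCommitTS uint64, needCleanUpLocks bool) *kvrpcpb.TxnStatus {
	return &kvrpcpb.TxnStatus{
		StartTs:     startTS,
		MinCommitTs: minCommitTS,
		RolledBack:  true,
		// If nothing was flushed there are no locks to resolve, so this
		// rollback broadcast is already final; otherwise resolveFlushedLocks
		// broadcasts the completed status after the locks are cleaned up.
		IsCompleted: !needCleanUpLocks,
	}
}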