tikv: stop async commit goroutines before store close (#175)

Signed-off-by: disksing <i@disksing.com>
disksing authored on 2021-06-28 13:58:17 +08:00; committed by GitHub
parent be29f7ecef
commit dd88a925ec
2 changed files with 32 additions and 11 deletions
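
The change applies a standard Go lifecycle pattern: the store owns a `context.Context`/`context.CancelFunc` pair plus a `sync.WaitGroup`, every background goroutine is registered with the WaitGroup before it is spawned and exits when the context is cancelled, and `Close` cancels the context and then waits. The sketch below is a minimal, self-contained illustration of that pattern with made-up names (`store`, `newStore`, `worker`); it is not the client-go API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// store is an illustrative stand-in for KVStore: it owns a context that is
// cancelled on Close and a WaitGroup that tracks every background goroutine.
type store struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newStore() *store {
	ctx, cancel := context.WithCancel(context.Background())
	s := &store{ctx: ctx, cancel: cancel}
	s.wg.Add(1) // register the worker before spawning it
	go s.worker()
	return s
}

// worker loops until the store context is cancelled.
func (s *store) worker() {
	defer s.wg.Done()
	t := time.NewTicker(100 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-s.ctx.Done():
			return // store is closing; exit promptly
		case <-t.C:
			// periodic background work would go here
		}
	}
}

// Close cancels the store context and blocks until every tracked
// goroutine has exited, so nothing outlives the store.
func (s *store) Close() {
	s.cancel()
	s.wg.Wait()
}

func main() {
	s := newStore()
	time.Sleep(300 * time.Millisecond)
	s.Close()
	fmt.Println("all background goroutines stopped before close")
}
```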

View File

@@ -133,6 +133,9 @@ type twoPhaseCommitter struct {
     binlog           BinlogExecutor
     resourceGroupTag []byte
+    storeWg          *sync.WaitGroup
+    storeCtx         context.Context
 }
 type memBufferMutations struct {
@@ -330,6 +333,8 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
         },
         isPessimistic: txn.IsPessimistic(),
         binlog:        txn.binlog,
+        storeWg:       &txn.store.wg,
+        storeCtx:      txn.store.ctx,
     }, nil
 }
@@ -671,8 +676,10 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPh
     }
     // Already spawned a goroutine for async commit transaction.
     if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
-        secondaryBo := retry.NewBackofferWithVars(context.Background(), CommitSecondaryMaxBackoff, c.txn.vars)
+        secondaryBo := retry.NewBackofferWithVars(c.storeCtx, CommitSecondaryMaxBackoff, c.txn.vars)
+        c.storeWg.Add(1)
         go func() {
+            defer c.storeWg.Done()
             if c.sessionID > 0 {
                 if v, err := util.EvalFailpoint("beforeCommitSecondaries"); err == nil {
                     if s, ok := v.(string); !ok {
@@ -978,7 +985,9 @@ var VeryLongMaxBackoff = uint64(600000) // 10mins
 func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
     c.cleanWg.Add(1)
+    c.storeWg.Add(1)
     go func() {
+        defer c.storeWg.Done()
         if _, err := util.EvalFailpoint("commitFailedSkipCleanup"); err == nil {
             logutil.Logger(ctx).Info("[failpoint] injected skip cleanup secondaries on failure",
                 zap.Uint64("txnStartTS", c.startTS))
@@ -986,7 +995,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
             return
         }
-        cleanupKeysCtx := context.WithValue(context.Background(), retry.TxnStartKey, ctx.Value(retry.TxnStartKey))
+        cleanupKeysCtx := context.WithValue(c.storeCtx, retry.TxnStartKey, ctx.Value(retry.TxnStartKey))
         var err error
         if !c.isOnePC() {
             err = c.cleanupMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations)
@@ -1275,11 +1284,13 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
             logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
                 zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
                 zap.Uint64("sessionID", c.sessionID))
+            c.storeWg.Add(1)
             go func() {
+                defer c.storeWg.Done()
                 if _, err := util.EvalFailpoint("asyncCommitDoNothing"); err == nil {
                     return
                 }
-                commitBo := retry.NewBackofferWithVars(ctx, CommitSecondaryMaxBackoff, c.txn.vars)
+                commitBo := retry.NewBackofferWithVars(c.storeCtx, CommitSecondaryMaxBackoff, c.txn.vars)
                 err := c.commitMutations(commitBo, c.mutations)
                 if err != nil {
                     logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("sessionID", c.sessionID),
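
In this file, every detached committer goroutine (asynchronously committing secondaries, cleaning up on failure) now derives its backoffer from the store context and registers with the store WaitGroup, with `Add(1)` placed before the `go` statement so a concurrent `Wait` in `Close` cannot miss it. A hypothetical helper capturing that ordering might look like the sketch below; the `runTracked` name is illustrative, and the commit inlines the equivalent `Add`/`Done` calls at each call site.

```go
package main

import (
	"fmt"
	"sync"
)

// runTracked is a hypothetical helper showing the shape of the change: the
// WaitGroup is incremented before the goroutine starts (never inside it, so
// a concurrent Wait cannot return before registration), and Done is deferred
// inside the goroutine.
func runTracked(wg *sync.WaitGroup, f func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		f()
	}()
}

func main() {
	var wg sync.WaitGroup
	runTracked(&wg, func() { fmt.Println("commit secondaries asynchronously") })
	runTracked(&wg, func() { fmt.Println("clean up on failure") })
	wg.Wait() // what KVStore.Close does after cancelling the store context
}
```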

View File

@@ -108,8 +108,7 @@ type KVStore struct {
     kv        SafePointKV
     safePoint uint64
     spTime    time.Time
-    spMutex   sync.RWMutex  // this is used to update safePoint and spTime
-    closed    chan struct{} // this is used to notify when the store is closed
+    spMutex   sync.RWMutex // this is used to update safePoint and spTime
     // storeID -> safeTS, stored as map[uint64]uint64
     // safeTS here will be used during the Stale Read process,
@@ -117,6 +116,10 @@ type KVStore struct {
     safeTSMap sync.Map
     replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled
+    ctx    context.Context
+    cancel context.CancelFunc
+    wg     sync.WaitGroup
 }
 // UpdateSPCache updates cached safepoint.
@@ -157,6 +160,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
     if err != nil {
         return nil, errors.Trace(err)
     }
+    ctx, cancel := context.WithCancel(context.Background())
     store := &KVStore{
         clusterID:       pdClient.GetClusterID(context.TODO()),
         uuid:            uuid,
@@ -166,12 +170,14 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
         kv:              spkv,
         safePoint:       0,
         spTime:          time.Now(),
-        closed:          make(chan struct{}),
         replicaReadSeed: rand.Uint32(),
+        ctx:             ctx,
+        cancel:          cancel,
     }
     store.clientMu.client = client.NewReqCollapse(tikvclient)
     store.lockResolver = newLockResolver(store)
+    store.wg.Add(2)
     go store.runSafePointChecker()
     go store.safeTSUpdater()
@@ -246,6 +252,7 @@ func (s *KVStore) IsLatchEnabled() bool {
 }
 func (s *KVStore) runSafePointChecker() {
+    defer s.wg.Done()
     d := gcSafePointUpdateInterval
     for {
         select {
@@ -260,7 +267,7 @@
                 logutil.BgLogger().Error("fail to load safepoint from pd", zap.Error(err))
                 d = gcSafePointQuickRepeatInterval
             }
-        case <-s.Closed():
+        case <-s.ctx.Done():
             return
         }
     }
@@ -285,10 +292,12 @@ func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot {
 // Close store
 func (s *KVStore) Close() error {
+    s.cancel()
+    s.wg.Wait()
     s.oracle.Close()
     s.pdClient.Close()
-    close(s.closed)
     if err := s.GetTiKVClient().Close(); err != nil {
         return errors.Trace(err)
     }
@@ -385,7 +394,7 @@ func (s *KVStore) GetLockResolver() *LockResolver {
 // Closed returns a channel that indicates if the store is closed.
 func (s *KVStore) Closed() <-chan struct{} {
-    return s.closed
+    return s.ctx.Done()
 }
 // GetSafePointKV returns the kv store that used for safepoint.
@@ -466,13 +475,14 @@ func (s *KVStore) getMinSafeTSByStores(stores []*locate.Store) uint64 {
 }
 func (s *KVStore) safeTSUpdater() {
+    defer s.wg.Done()
     t := time.NewTicker(time.Second * 2)
     defer t.Stop()
-    ctx, cancel := context.WithCancel(context.Background())
+    ctx, cancel := context.WithCancel(s.ctx)
     defer cancel()
     for {
         select {
-        case <-s.Closed():
+        case <-s.ctx.Done():
             return
         case <-t.C:
             s.updateSafeTS(ctx)
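
Replacing the dedicated `closed` channel with the store context also keeps the exported `Closed()` accessor working unchanged, since cancelling a context closes its `Done()` channel. A small standalone sketch of that equivalence (the `closed` helper below is illustrative, not part of the API):

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Stand-in for KVStore.Closed(): the context's Done channel doubles as
	// the "store is closed" notification channel.
	closed := func() <-chan struct{} { return ctx.Done() }

	cancel() // stand-in for KVStore.Close cancelling the store context

	<-closed() // unblocks immediately because the context is cancelled
	fmt.Println("store reported closed via ctx.Done()")
}
```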