diff --git a/internal/unionstore/memdb_norace_test.go b/internal/unionstore/memdb_norace_test.go index ed040563..289f803f 100644 --- a/internal/unionstore/memdb_norace_test.go +++ b/internal/unionstore/memdb_norace_test.go @@ -32,6 +32,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !race // +build !race package unionstore diff --git a/tikv/kv.go b/tikv/kv.go index caf84592..5b5c1291 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -68,6 +68,7 @@ import ( "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/clientv3" + atomicutil "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -126,6 +127,7 @@ type KVStore struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup + close atomicutil.Bool } // UpdateSPCache updates cached safepoint. @@ -295,6 +297,7 @@ func (s *KVStore) GetSnapshot(ts uint64) *txnsnapshot.KVSnapshot { // Close store func (s *KVStore) Close() error { + s.close.Store(true) s.cancel() s.wg.Wait() @@ -456,6 +459,11 @@ func (s *KVStore) Ctx() context.Context { return s.ctx } +// IsClose checks whether the store is closed. +func (s *KVStore) IsClose() bool { + return s.close.Load() +} + // WaitGroup returns wg func (s *KVStore) WaitGroup() *sync.WaitGroup { return &s.wg diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index a2b9128e..b6c2cb1d 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -112,6 +112,8 @@ type kvstore interface { // TxnLatches returns txnLatches. TxnLatches() *latch.LatchesScheduler GetClusterID() uint64 + // IsClose checks whether the store is closed. + IsClose() bool } // twoPhaseCommitter executes a two-phase commit protocol. @@ -781,6 +783,12 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action // Already spawned a goroutine for async commit transaction. if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() { secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars) + if c.store.IsClose() { + logutil.Logger(bo.GetCtx()).Warn("the store is closed", + zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), + zap.Uint64("sessionID", c.sessionID)) + return nil + } c.store.WaitGroup().Add(1) go func() { defer c.store.WaitGroup().Done() @@ -1106,6 +1114,12 @@ const ( ) func (c *twoPhaseCommitter) cleanup(ctx context.Context) { + if c.store.IsClose() { + logutil.Logger(ctx).Warn("twoPhaseCommitter fail to cleanup because the store exited", + zap.Uint64("txnStartTS", c.startTS), zap.Bool("isPessimistic", c.isPessimistic), + zap.Bool("isOnePC", c.isOnePC())) + return + } c.cleanWg.Add(1) c.store.WaitGroup().Add(1) go func() { @@ -1407,6 +1421,12 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn", zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Uint64("sessionID", c.sessionID)) + if c.store.IsClose() { + logutil.Logger(ctx).Warn("2PC will use async commit protocol to commit this txn but the store is closed", + zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), + zap.Uint64("sessionID", c.sessionID)) + return nil + } c.store.WaitGroup().Add(1) go func() { defer c.store.WaitGroup().Done()