mirror of https://github.com/tikv/client-go.git
fix data race and panic when exiting (#396)
* fix the data race when exiting Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com> * remove error and change log level Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com> * add warn log Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com> Co-authored-by: Yilin Chen <sticnarf@gmail.com>
This commit is contained in:
parent
e9de5625c4
commit
300275dee6
|
|
@ -32,6 +32,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
|
//go:build !race
|
||||||
// +build !race
|
// +build !race
|
||||||
|
|
||||||
package unionstore
|
package unionstore
|
||||||
|
|
|
||||||
|
|
@ -68,6 +68,7 @@ import (
|
||||||
"github.com/tikv/client-go/v2/util"
|
"github.com/tikv/client-go/v2/util"
|
||||||
pd "github.com/tikv/pd/client"
|
pd "github.com/tikv/pd/client"
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
|
atomicutil "go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
@ -126,6 +127,7 @@ type KVStore struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
close atomicutil.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateSPCache updates cached safepoint.
|
// UpdateSPCache updates cached safepoint.
|
||||||
|
|
@ -295,6 +297,7 @@ func (s *KVStore) GetSnapshot(ts uint64) *txnsnapshot.KVSnapshot {
|
||||||
|
|
||||||
// Close store
|
// Close store
|
||||||
func (s *KVStore) Close() error {
|
func (s *KVStore) Close() error {
|
||||||
|
s.close.Store(true)
|
||||||
s.cancel()
|
s.cancel()
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
|
||||||
|
|
@ -456,6 +459,11 @@ func (s *KVStore) Ctx() context.Context {
|
||||||
return s.ctx
|
return s.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsClose checks whether the store is closed.
|
||||||
|
func (s *KVStore) IsClose() bool {
|
||||||
|
return s.close.Load()
|
||||||
|
}
|
||||||
|
|
||||||
// WaitGroup returns wg
|
// WaitGroup returns wg
|
||||||
func (s *KVStore) WaitGroup() *sync.WaitGroup {
|
func (s *KVStore) WaitGroup() *sync.WaitGroup {
|
||||||
return &s.wg
|
return &s.wg
|
||||||
|
|
|
||||||
|
|
@ -112,6 +112,8 @@ type kvstore interface {
|
||||||
// TxnLatches returns txnLatches.
|
// TxnLatches returns txnLatches.
|
||||||
TxnLatches() *latch.LatchesScheduler
|
TxnLatches() *latch.LatchesScheduler
|
||||||
GetClusterID() uint64
|
GetClusterID() uint64
|
||||||
|
// IsClose checks whether the store is closed.
|
||||||
|
IsClose() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// twoPhaseCommitter executes a two-phase commit protocol.
|
// twoPhaseCommitter executes a two-phase commit protocol.
|
||||||
|
|
@ -781,6 +783,12 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
|
||||||
// Already spawned a goroutine for async commit transaction.
|
// Already spawned a goroutine for async commit transaction.
|
||||||
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
|
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
|
||||||
secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
|
secondaryBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
|
||||||
|
if c.store.IsClose() {
|
||||||
|
logutil.Logger(bo.GetCtx()).Warn("the store is closed",
|
||||||
|
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
|
||||||
|
zap.Uint64("sessionID", c.sessionID))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
c.store.WaitGroup().Add(1)
|
c.store.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer c.store.WaitGroup().Done()
|
defer c.store.WaitGroup().Done()
|
||||||
|
|
@ -1106,6 +1114,12 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
|
func (c *twoPhaseCommitter) cleanup(ctx context.Context) {
|
||||||
|
if c.store.IsClose() {
|
||||||
|
logutil.Logger(ctx).Warn("twoPhaseCommitter fail to cleanup because the store exited",
|
||||||
|
zap.Uint64("txnStartTS", c.startTS), zap.Bool("isPessimistic", c.isPessimistic),
|
||||||
|
zap.Bool("isOnePC", c.isOnePC()))
|
||||||
|
return
|
||||||
|
}
|
||||||
c.cleanWg.Add(1)
|
c.cleanWg.Add(1)
|
||||||
c.store.WaitGroup().Add(1)
|
c.store.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
@ -1407,6 +1421,12 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
|
||||||
logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
|
logutil.Logger(ctx).Debug("2PC will use async commit protocol to commit this txn",
|
||||||
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
|
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
|
||||||
zap.Uint64("sessionID", c.sessionID))
|
zap.Uint64("sessionID", c.sessionID))
|
||||||
|
if c.store.IsClose() {
|
||||||
|
logutil.Logger(ctx).Warn("2PC will use async commit protocol to commit this txn but the store is closed",
|
||||||
|
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS),
|
||||||
|
zap.Uint64("sessionID", c.sessionID))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
c.store.WaitGroup().Add(1)
|
c.store.WaitGroup().Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer c.store.WaitGroup().Done()
|
defer c.store.WaitGroup().Done()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue