bind the goroutine pool with store and close on Close (#645)

Signed-off-by: tiancaiamao <tiancaiamao@gmail.com>
This commit is contained in:
tiancaiamao 2022-12-26 16:01:48 +08:00 committed by GitHub
parent fe3536dd59
commit 018c59dbd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 19 additions and 8 deletions

2
go.mod
View File

@ -20,7 +20,7 @@ require (
github.com/prometheus/client_model v0.2.0
github.com/stathat/consistent v1.0.0
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/twmb/murmur3 v1.1.3
go.etcd.io/etcd/api/v3 v3.5.2

4
go.sum
View File

@ -201,8 +201,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=

View File

@ -69,7 +69,7 @@ require (
github.com/shirou/gopsutil/v3 v3.22.7 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 // indirect
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twmb/murmur3 v1.1.3 // indirect

View File

@ -507,6 +507,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1 h1:iffZXeHZTd35tTOS3nJ2OyMUmn40eNkLHCeQXMs6KYI=
github.com/tiancaiamao/gp v0.0.0-20221214071713-abacb15f16f1/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b h1:4RNtqw1/tW67qP9fFgfQpTVd7DrfkaAWu4vsC18QmBo=
github.com/tiancaiamao/gp v0.0.0-20221221095600-1a473d1f9b4b/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/gjson v1.14.1 h1:iymTbGkQBhveq21bEvAQ81I0LEBork8BFe1CUZXdyuo=
github.com/tidwall/gjson v1.14.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=

View File

@ -48,6 +48,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/tiancaiamao/gp"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
@ -130,6 +131,12 @@ type KVStore struct {
cancel context.CancelFunc
wg sync.WaitGroup
close atomicutil.Bool
gP *gp.Pool
}
// Go run the function in a separate goroutine.
func (s *KVStore) Go(f func()) {
s.gP.Go(f)
}
// UpdateSPCache updates cached safepoint.
@ -183,6 +190,7 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl
replicaReadSeed: rand.Uint32(),
ctx: ctx,
cancel: cancel,
gP: gp.New(128, 10*time.Second),
}
store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient))
store.lockResolver = txnlock.NewLockResolver(store)
@ -302,6 +310,7 @@ func (s *KVStore) GetSnapshot(ts uint64) *txnsnapshot.KVSnapshot {
// Close store
func (s *KVStore) Close() error {
defer s.gP.Close()
s.close.Store(true)
s.cancel()
s.wg.Wait()

View File

@ -49,7 +49,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/tiancaiamao/gp"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/client"
@ -89,8 +88,6 @@ var (
CommitMaxBackoff = uint64(40000)
)
var gP = gp.New(128, 10*time.Second)
type kvstore interface {
// GetRegionCache gets the RegionCache.
GetRegionCache() *locate.RegionCache
@ -118,6 +115,8 @@ type kvstore interface {
GetClusterID() uint64
// IsClose checks whether the store is closed.
IsClose() bool
// Go run the function in a separate goroutine.
Go(f func())
}
// twoPhaseCommitter executes a two-phase commit protocol.
@ -990,7 +989,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
return nil
}
c.store.WaitGroup().Add(1)
gP.Go(func() {
c.store.Go(func() {
defer c.store.WaitGroup().Done()
if c.sessionID > 0 {
if v, err := util.EvalFailpoint("beforeCommitSecondaries"); err == nil {
@ -1015,6 +1014,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
metrics.SecondaryLockCleanupFailureCounterCommit.Inc()
}
})
} else {
err = c.doActionOnBatches(bo, action, batchBuilder.allBatches())
}