From 40a82457ebaafd489ea56a9d11c4e78d56669363 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 27 Feb 2023 11:23:58 +0800 Subject: [PATCH] tikv: configurable pool (#714) Signed-off-by: Weizhen Wang --- tikv/kv.go | 29 ++++++++++++++++++++------ tikv/pool.go | 45 ++++++++++++++++++++++++++++++++++++++++ txnkv/transaction/2pc.go | 13 +++++++++--- 3 files changed, 78 insertions(+), 9 deletions(-) create mode 100644 tikv/pool.go diff --git a/tikv/kv.go b/tikv/kv.go index a1a2a233..12f8ac64 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -48,7 +48,6 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pkg/errors" - "github.com/tiancaiamao/gp" "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/internal/client" @@ -132,12 +131,12 @@ type KVStore struct { cancel context.CancelFunc wg sync.WaitGroup close atomicutil.Bool - gP *gp.Pool + gP Pool } // Go run the function in a separate goroutine. -func (s *KVStore) Go(f func()) { - s.gP.Go(f) +func (s *KVStore) Go(f func()) error { + return s.gP.Run(f) } // UpdateSPCache updates cached safepoint. @@ -172,8 +171,25 @@ func (s *KVStore) CheckVisibility(startTime uint64) error { return nil } +// Option is the option for pool. +type Option func(*KVStore) + +// WithPool set the pool +func WithPool(gp Pool) Option { + return func(o *KVStore) { + o.gP = gp + } +} + +// loadOption load KVStore option into KVStore. +func loadOption(store *KVStore, opt ...Option) { + for _, f := range opt { + f(store) + } +} + // NewKVStore creates a new TiKV store instance. -func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client) (*KVStore, error) { +func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Client, opt ...Option) (*KVStore, error) { o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond) if err != nil { return nil, err @@ -191,10 +207,11 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, tikvclient Cl replicaReadSeed: rand.Uint32(), ctx: ctx, cancel: cancel, - gP: gp.New(128, 10*time.Second), + gP: NewSpool(128, 10*time.Second), } store.clientMu.client = client.NewReqCollapse(client.NewInterceptedClient(tikvclient)) store.lockResolver = txnlock.NewLockResolver(store) + loadOption(store, opt...) store.wg.Add(2) go store.runSafePointChecker() diff --git a/tikv/pool.go b/tikv/pool.go new file mode 100644 index 00000000..ca2799b5 --- /dev/null +++ b/tikv/pool.go @@ -0,0 +1,45 @@ +// Copyright 2023 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tikv + +import ( + "time" + + "github.com/tiancaiamao/gp" +) + +// Pool is a simple interface for goroutine pool. +type Pool interface { + Run(func()) error + Close() +} + +// Spool is a simple implementation of Pool. +type Spool struct { + gp.Pool +} + +// NewSpool creates a new Spool. +func NewSpool(n int, dur time.Duration) *Spool { + return &Spool{ + *gp.New(n, dur), + } +} + +// Run implements Pool.Run. +func (p *Spool) Run(fn func()) error { + p.Go(fn) + return nil +} diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 7af1e6ed..cd32deb4 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -116,7 +116,7 @@ type kvstore interface { // IsClose checks whether the store is closed. IsClose() bool // Go run the function in a separate goroutine. - Go(f func()) + Go(f func()) error } // twoPhaseCommitter executes a two-phase commit protocol. @@ -988,7 +988,7 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action return nil } c.store.WaitGroup().Add(1) - c.store.Go(func() { + err = c.store.Go(func() { defer c.store.WaitGroup().Done() if c.sessionID > 0 { if v, err := util.EvalFailpoint("beforeCommitSecondaries"); err == nil { @@ -1013,7 +1013,14 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action metrics.SecondaryLockCleanupFailureCounterCommit.Inc() } }) - + if err != nil { + c.store.WaitGroup().Done() + logutil.BgLogger().Error("fail to create goroutine", + zap.Uint64("session", c.sessionID), + zap.Stringer("action type", action), + zap.Error(err)) + return err + } } else { err = c.doActionOnBatches(bo, action, batchBuilder.allBatches()) }