diff --git a/internal/client/client_fail_test.go b/internal/client/client_fail_test.go index c63c7227..6da27594 100644 --- a/internal/client/client_fail_test.go +++ b/internal/client/client_fail_test.go @@ -46,7 +46,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" - "github.com/tikv/client-go/v2/internal/client/mock_server" + "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -54,7 +54,7 @@ func TestPanicInRecvLoop(t *testing.T) { require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`)) require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`)) - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() @@ -81,7 +81,7 @@ func TestPanicInRecvLoop(t *testing.T) { } func TestRecvErrorInMultipleRecvLoops(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := server.Addr() diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 2934c124..e107bb90 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -55,7 +55,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/config" - "github.com/tikv/client-go/v2/internal/client/mock_server" + "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/tikvrpc" "go.uber.org/zap" @@ -119,7 +119,7 @@ func TestCancelTimeoutRetErr(t *testing.T) { } func TestSendWhenReconnect(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) rpcClient := NewRPCClient() @@ -243,7 +243,7 @@ func TestCollapseResolveLock(t *testing.T) { } func TestForwardMetadataByUnaryCall(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := fmt.Sprintf("%s:%d", "127.0.0.1", port) @@ -311,7 +311,7 @@ func TestForwardMetadataByUnaryCall(t *testing.T) { } func TestForwardMetadataByBatchCommands(t *testing.T) { - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) defer server.Stop() addr := server.Addr() @@ -650,7 +650,7 @@ func TestBatchClientRecoverAfterServerRestart(t *testing.T) { conf.TiKVClient.MaxBatchSize = 128 })() - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() require.True(t, port > 0) require.True(t, server.IsRunning()) addr := server.Addr() diff --git a/internal/client/mock_server/mock_tikv_service.go b/internal/client/mockserver/mock_tikv_service.go similarity index 89% rename from internal/client/mock_server/mock_tikv_service.go rename to internal/client/mockserver/mock_tikv_service.go index 392d3a5f..90c1b535 100644 --- a/internal/client/mock_server/mock_tikv_service.go +++ b/internal/client/mockserver/mock_tikv_service.go @@ -18,7 +18,7 @@ // https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/mock_tikv_service_test.go // -package mock_server +package mockserver import ( "context" @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc" ) +// MockServer is a mock tikv server for testing purpose. type MockServer struct { tikvpb.TikvServer grpcServer *grpc.Server @@ -49,6 +50,7 @@ type MockServer struct { } } +// KvGet implements the TikvServer interface. func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb.GetResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err @@ -56,6 +58,7 @@ func (s *MockServer) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpc return &kvrpcpb.GetResponse{}, nil } +// KvScan implements the TikvServer interface. func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) { if err := s.checkMetadata(ctx); err != nil { return nil, err @@ -63,6 +66,7 @@ func (s *MockServer) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteReques return &kvrpcpb.PrewriteResponse{}, nil } +// KvCommit implements the TikvServer interface. func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_CoprocessorStreamServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err @@ -70,6 +74,7 @@ func (s *MockServer) CoprocessorStream(req *coprocessor.Request, ss tikvpb.Tikv_ return ss.Send(&coprocessor.Response{}) } +// KvBatchGet implements the TikvServer interface. func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { if err := s.checkMetadata(ss.Context()); err != nil { return err @@ -101,6 +106,7 @@ func (s *MockServer) BatchCommands(ss tikvpb.Tikv_BatchCommandsServer) error { } } +// SetMetaChecker set the meta checker for mock server. func (s *MockServer) SetMetaChecker(check func(context.Context) error) { s.metaChecker.Lock() s.metaChecker.check = check @@ -116,19 +122,23 @@ func (s *MockServer) checkMetadata(ctx context.Context) error { return nil } +// IsRunning returns true is the mock server is running. func (s *MockServer) IsRunning() bool { return atomic.LoadInt64(&s.running) == 1 } +// Addr returns the address of the mock server. func (s *MockServer) Addr() string { return s.addr } +// Stop stops the mock server. func (s *MockServer) Stop() { s.grpcServer.Stop() atomic.StoreInt64(&s.running, 0) } +// Start starts the mock server. func (s *MockServer) Start(addr string) int { if addr == "" { addr = fmt.Sprintf("%s:%d", "127.0.0.1", 0) @@ -159,7 +169,7 @@ func (s *MockServer) Start(addr string) int { return port } -// StartMockTikvService try to start a gRPC server and retrun the server instance and binded port. +// StartMockTikvService try to start a gRPC server and return the server instance and binded port. func StartMockTikvService() (*MockServer, int) { server := &MockServer{} port := server.Start("") diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 74158d7c..c40d91b3 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -58,7 +58,7 @@ import ( "github.com/tikv/client-go/v2/config/retry" "github.com/tikv/client-go/v2/internal/apicodec" "github.com/tikv/client-go/v2/internal/client" - "github.com/tikv/client-go/v2/internal/client/mock_server" + "github.com/tikv/client-go/v2/internal/client/mockserver" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/tikvrpc" "google.golang.org/grpc" @@ -707,7 +707,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestKVReadTimeoutWithDisableBatchC conf.TiKVClient.MaxBatchSize = 0 })() - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() s.True(port > 0) server.SetMetaChecker(func(ctx context.Context) error { return context.DeadlineExceeded @@ -742,7 +742,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestBatchClientSendLoopPanic() { conf.TiKVClient.MaxBatchSize = 128 })() - server, port := mock_server.StartMockTikvService() + server, port := mockserver.StartMockTikvService() s.True(port > 0) rpcClient := client.NewRPCClient() fnClient := &fnClient{fn: func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { diff --git a/tikv/gc.go b/tikv/gc.go index ae4d3eb7..90fb3bf8 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -35,7 +35,7 @@ import ( zap "go.uber.org/zap" ) -// We don't want gc to sweep out the cached info belong to other processes, like coprocessor. +// GCScanLockLimit We don't want gc to sweep out the cached info belong to other processes, like coprocessor. const GCScanLockLimit = txnlock.ResolvedCacheSize / 2 // GC does garbage collection (GC) of the TiKV cluster. @@ -98,11 +98,13 @@ func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrenc return nil } +// BaseRegionLockResolver is a base implementation of RegionLockResolver. type BaseRegionLockResolver struct { identifier string store Storage } +// NewRegionLockResolver creates a new BaseRegionLockResolver. func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockResolver { return &BaseRegionLockResolver{ identifier: identifier, @@ -110,18 +112,22 @@ func NewRegionLockResolver(identifier string, store Storage) *BaseRegionLockReso } } +// Identifier represents the name of this resolver. func (l *BaseRegionLockResolver) Identifier() string { return l.identifier } +// ResolveLocksInOneRegion tries to resolve expired locks for one region. func (l *BaseRegionLockResolver) ResolveLocksInOneRegion(bo *Backoffer, locks []*txnlock.Lock, loc *locate.KeyLocation) (*locate.KeyLocation, error) { return batchResolveLocksInOneRegion(bo, l.GetStore(), locks, loc) } +// ScanLocksInOneRegion return locks and location with given start key in a region. func (l *BaseRegionLockResolver) ScanLocksInOneRegion(bo *Backoffer, key []byte, maxVersion uint64, scanLimit uint32) ([]*txnlock.Lock, *locate.KeyLocation, error) { return scanLocksInOneRegionWithStartKey(bo, l.GetStore(), key, maxVersion, scanLimit) } +// GetStore is used to get store to GetRegionCache and SendReq for this lock resolver. func (l *BaseRegionLockResolver) GetStore() Storage { return l.store }