From 806e1891f28ee9ead7aa58c6b7242dd837f07f00 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 17 Jun 2025 21:13:51 +0800 Subject: [PATCH] ci: add next-gen integration tests (#1661) Signed-off-by: ekexium --- .github/workflows/integration.yml | 80 ++++++- integration_tests/health_feedback_test.go | 2 + integration_tests/lock_test.go | 271 ++++++---------------- integration_tests/pd_api_test.go | 3 + integration_tests/pd_next_gen.toml | 2 + integration_tests/pipelined_memdb_test.go | 2 + integration_tests/scan_test.go | 5 + integration_tests/snapshot_fail_test.go | 50 ++-- integration_tests/snapshot_test.go | 46 ++-- integration_tests/tikv_next_gen.toml | 18 ++ integration_tests/util_test.go | 8 +- 11 files changed, 226 insertions(+), 261 deletions(-) create mode 100644 integration_tests/pd_next_gen.toml create mode 100644 integration_tests/tikv_next_gen.toml diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 5f279491..4b89a0a1 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -19,8 +19,11 @@ jobs: with: go-version: 1.24.1 + - name: Install gotestsum + run: go install gotest.tools/gotestsum@latest + - name: Test - run: go test ./... + run: gotestsum --format short-verbose -- ./... working-directory: integration_tests integration-local-race: @@ -35,8 +38,11 @@ jobs: with: go-version: 1.24.1 + - name: Install gotestsum + run: go install gotest.tools/gotestsum@latest + - name: Test - run: go test ./... -race + run: gotestsum --format short-verbose -- ./... -race working-directory: integration_tests integration-tikv: @@ -51,6 +57,9 @@ jobs: with: go-version: 1.24.1 + - name: Install gotestsum + run: go install gotest.tools/gotestsum@latest + - name: Fetch PD uses: shrink/actions-docker-extract@v1 id: extract-pd @@ -76,7 +85,7 @@ jobs: working-directory: integration_tests - name: Test - run: go test --with-tikv + run: gotestsum --format short-verbose -- --with-tikv working-directory: integration_tests integration-raw-tikv: @@ -94,6 +103,9 @@ jobs: with: go-version: 1.24.1 + - name: Install gotestsum + run: go install gotest.tools/gotestsum@latest + - name: Fetch PD uses: shrink/actions-docker-extract@v1 id: extract-pd @@ -119,5 +131,65 @@ jobs: working-directory: integration_tests/raw - name: Test - run: go test --with-tikv + run: gotestsum --format short-verbose -- --with-tikv working-directory: integration_tests/raw + + integration-next-gen-tikv: + if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip-integration-tests') }} + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: 1.24.1 + + - name: Install gotestsum + run: go install gotest.tools/gotestsum@latest + + - name: Fetch PD + uses: shrink/actions-docker-extract@v1 + id: extract-pd + with: + image: pingcap/pd:nightly + path: /pd-server + + - name: Fetch Next-Gen TiKV + uses: shrink/actions-docker-extract@v1 + id: extract-tikv + with: + image: gcr.io/pingcap-public/dbaas/tikv:dedicated-next-gen + path: /tikv-server + + - name: Run MinIO, PD & Next-Gen TiKV + run: | + mkdir -p tikv-data + # Install and run minio for S3 backend + curl -Lo minio https://dl.min.io/server/minio/release/linux-amd64/minio + chmod +x minio + mkdir -p $HOME/serverless_s3/cse-test + MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin ./minio server $HOME/serverless_s3 > minio.log 2>&1 & + sleep 5 + # Move binaries and run servers + mv ../${{steps.extract-pd.outputs.destination}}/pd-server . + mv ../${{steps.extract-tikv.outputs.destination}}/tikv-server . + ./pd-server --config pd_next_gen.toml > pd.log 2>&1 & + sleep 5 + ./tikv-server -C tikv_next_gen.toml --pd-endpoints="127.0.0.1:2379" --addr="127.0.0.1:20160" --data-dir=${{ github.workspace }}/integration_tests/tikv-data > tikv.log 2>&1 & + + echo "Waiting for servers to start..." + sleep 15 + + echo "--- MinIO Log ---" + tail minio.log || echo "minio.log not found" + echo "--- PD Log ---" + tail pd.log || echo "pd.log not found" + echo "--- TiKV Log ---" + tail tikv.log || echo "tikv.log not found" + working-directory: integration_tests + + - name: Test + run: gotestsum --format short-verbose -- -parallel 1 -tags=nextgen --with-tikv --keyspace-name='keyspace1' + working-directory: integration_tests diff --git a/integration_tests/health_feedback_test.go b/integration_tests/health_feedback_test.go index 286c9a5f..9ddaf021 100644 --- a/integration_tests/health_feedback_test.go +++ b/integration_tests/health_feedback_test.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !nextgen + package tikv_test import ( diff --git a/integration_tests/lock_test.go b/integration_tests/lock_test.go index 00190278..7dacc56e 100644 --- a/integration_tests/lock_test.go +++ b/integration_tests/lock_test.go @@ -37,12 +37,9 @@ package tikv_test import ( "bytes" "context" - "encoding/json" stderrs "errors" "fmt" - "io" "math" - "net/http" "sync" "sync/atomic" "testing" @@ -51,9 +48,7 @@ import ( "github.com/pingcap/failpoint" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/metapb" "github.com/pkg/errors" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/config/retry" @@ -1105,127 +1100,10 @@ func (s *testLockWithTiKVSuite) checkIsKeyLocked(key []byte, expectedLocked bool s.NoError(err) } -func (s *testLockWithTiKVSuite) trySetTiKVConfig(name string, value interface{}) func() { - stores, err := s.store.GetPDClient().GetAllStores(context.Background()) - s.NoError(err) - - type configItem struct { - url string - name string - value interface{} - } - - var recoverConfigs []configItem - - httpScheme := "http" - if c, err := config.GetGlobalConfig().Security.ToTLSConfig(); err == nil && c != nil { - httpScheme = "https" - } - - t := s.Suite.T() - - setCfg := func(url, name string, value interface{}) error { - postBody, err := json.Marshal(map[string]interface{}{name: value}) - if err != nil { - return err - } - resp, err := http.Post(url, "text/json", bytes.NewReader(postBody)) - if err != nil { - return err - } - s.NoError(resp.Body.Close()) - if resp.StatusCode != 200 { - return errors.Errorf("post config got unexpected status code: %v, request body: %s", resp.StatusCode, postBody) - } - t.Logf("set config for tikv at %s finished: %s", url, string(postBody)) - return nil - } - -storeIter: - for _, store := range stores { - if store.State != metapb.StoreState_Up { - continue - } - for _, label := range store.Labels { - if label.Key == "engine" && label.Value != "tikv" { - continue storeIter - } - } - - err := func() (err error) { - defer func() { - if r := recover(); r != nil { - err = errors.Errorf("set config for store at %v panicked: %v", store.StatusAddress, r) - } - }() - - url := fmt.Sprintf("%s://%s/config", httpScheme, store.StatusAddress) - resp, err := http.Get(url) - if err != nil { - return err - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return errors.Errorf("unexpected response status: %v", resp.Status) - } - oldCfgRaw, err := io.ReadAll(resp.Body) - if err != nil { - return err - } - - oldCfg := make(map[string]interface{}) - err = json.Unmarshal(oldCfgRaw, &oldCfg) - if err != nil { - return err - } - - oldValue := oldCfg["pessimistic-txn"].(map[string]interface{})["in-memory"] - if assert.ObjectsAreEqual(oldValue, value) { - return nil - } - - err = setCfg(url, name, value) - if err != nil { - return err - } - - recoverConfigs = append(recoverConfigs, configItem{ - url: url, - name: name, - value: oldValue, - }) - - return nil - }() - - if err != nil { - t.Logf("failed to set config for store at %s: %v", store.StatusAddress, err) - } - } - - // Prevent goleak from complaining about its internal connections. - http.DefaultClient.CloseIdleConnections() - - if len(recoverConfigs) > 0 { - // Sleep for a while to ensure the new configs are applied. - time.Sleep(time.Second) - } - - return func() { - for _, item := range recoverConfigs { - err = setCfg(item.url, item.name, item.value) - if err != nil { - t.Logf("failed to recover config for store at %s: %v", item.url, err) - } - } - - // Prevent goleak from complaining about its internal connections. - http.DefaultClient.CloseIdleConnections() - } -} - func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() { + if config.NextGen { + s.T().Skip("NextGen does not support fair locking") + } test := func(asyncCommit bool, onePC bool, causalConsistency bool) { k1 := []byte("k1") k2 := []byte("k2") @@ -1481,11 +1359,6 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() { } func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { - if *withTiKV { - recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", false) - defer recoverFunc() - } - s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`)) s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`)) s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`)) @@ -1542,6 +1415,9 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() { // k4 has txn2's stale primary pessimistic lock now. currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + // sleep a while for pipelined pessimistic locks + time.Sleep(time.Millisecond * 100) + remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS) s.NoError(err) @@ -1621,84 +1497,75 @@ func (s *testLockWithTiKVSuite) TestPessimisticRollbackWithRead() { s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL")) s.NoError(failpoint.Disable("tikvclient/twoPCShortLockTTL")) }() - test := func(inMemoryLock bool) { - if *withTiKV { - recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", inMemoryLock) - defer recoverFunc() - } + // Init, cleanup possible left locks. + bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) + ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) + s.cleanupLocks() - // Init, cleanup possible left locks. - bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil) - ctx := context.WithValue(context.Background(), util.SessionID, uint64(1)) - s.cleanupLocks() + // Basic case, three keys could be rolled back within one pessimistic rollback request. + k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3") + txn1, err := s.store.Begin() + s.NoError(err) + startTS := txn1.StartTS() + txn1.SetPessimistic(true) + lockCtx := kv.NewLockCtx(startTS, 200, time.Now()) + err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3) + s.NoError(err) + txn1.GetCommitter().CloseTTLManager() - // Basic case, three keys could be rolled back within one pessimistic rollback request. - k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3") - txn1, err := s.store.Begin() - s.NoError(err) - startTS := txn1.StartTS() - txn1.SetPessimistic(true) - lockCtx := kv.NewLockCtx(startTS, 200, time.Now()) - err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3) - s.NoError(err) - txn1.GetCommitter().CloseTTLManager() + time.Sleep(time.Millisecond * 100) + s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l")) + locks := []*txnlock.Lock{ + s.makeLock(startTS, startTS, k3, k1), + } + s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l")) - time.Sleep(time.Millisecond * 100) - s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l")) - locks := []*txnlock.Lock{ - s.makeLock(startTS, startTS, k3, k1), - } - s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l")) + time.Sleep(time.Millisecond * 100) + s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l")) - time.Sleep(time.Millisecond * 100) - s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l")) - - // Acquire pessimistic locks for more than 256(RESOLVE_LOCK_BATCH_SIZE) keys. - formatKey := func(prefix rune, i int) []byte { - return []byte(fmt.Sprintf("%c%04d", prefix, i)) - } - numKeys := 1000 - prewriteKeys := make([][]byte, 0, numKeys/2) - pessimisticLockKeys := make([][]byte, 0, numKeys/2) - for i := 0; i < numKeys; i++ { - key := formatKey('k', i) - if i%2 == 0 { - err = txn1.LockKeys(ctx, lockCtx, key) - pessimisticLockKeys = append(pessimisticLockKeys, key) - } else { - err = txn1.Set(key, []byte("val")) - s.NoError(err) - prewriteKeys = append(prewriteKeys, key) - } + // Acquire pessimistic locks for more than 256(RESOLVE_LOCK_BATCH_SIZE) keys. + formatKey := func(prefix rune, i int) []byte { + return []byte(fmt.Sprintf("%c%04d", prefix, i)) + } + numKeys := 1000 + prewriteKeys := make([][]byte, 0, numKeys/2) + pessimisticLockKeys := make([][]byte, 0, numKeys/2) + for i := 0; i < numKeys; i++ { + key := formatKey('k', i) + if i%2 == 0 { + err = txn1.LockKeys(ctx, lockCtx, key) + pessimisticLockKeys = append(pessimisticLockKeys, key) + } else { + err = txn1.Set(key, []byte("val")) s.NoError(err) + prewriteKeys = append(prewriteKeys, key) } - committer, err := txn1.NewCommitter(1) - s.NoError(err) - mutations := committer.MutationsOfKeys(prewriteKeys) - err = committer.PrewriteMutations(ctx, mutations) - s.NoError(err) - - // All the pessimistic locks belonging to the same transaction are pessimistic - // rolled back within one request. - time.Sleep(time.Millisecond * 100) - pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0]) - _, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{ - CallerStartTS: startTS + 1, - Locks: []*txnlock.Lock{pessimisticLock}, - Lite: false, - ForRead: false, - Detail: nil, - PessimisticRegionResolve: true, - }) - s.NoError(err) - - time.Sleep(time.Millisecond * 100) - s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l")) - - // Cleanup. - err = txn1.Rollback() s.NoError(err) } - test(false) - test(true) + committer, err := txn1.NewCommitter(1) + s.NoError(err) + mutations := committer.MutationsOfKeys(prewriteKeys) + err = committer.PrewriteMutations(ctx, mutations) + s.NoError(err) + + // All the pessimistic locks belonging to the same transaction are pessimistic + // rolled back within one request. + time.Sleep(time.Millisecond * 100) + pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0]) + _, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{ + CallerStartTS: startTS + 1, + Locks: []*txnlock.Lock{pessimisticLock}, + Lite: false, + ForRead: false, + Detail: nil, + PessimisticRegionResolve: true, + }) + s.NoError(err) + + time.Sleep(time.Millisecond * 100) + s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l")) + + // Cleanup. + err = txn1.Rollback() + s.NoError(err) } diff --git a/integration_tests/pd_api_test.go b/integration_tests/pd_api_test.go index 0266f1aa..dcfeea33 100644 --- a/integration_tests/pd_api_test.go +++ b/integration_tests/pd_api_test.go @@ -101,6 +101,9 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error { } func (s *apiTestSuite) TestGetStoresMinResolvedTS() { + if config.NextGen { + s.T().Skip("NextGen does not support resolved ts yet") + } util.EnableFailpoints() require := s.Require() mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient()) diff --git a/integration_tests/pd_next_gen.toml b/integration_tests/pd_next_gen.toml new file mode 100644 index 00000000..e607cae0 --- /dev/null +++ b/integration_tests/pd_next_gen.toml @@ -0,0 +1,2 @@ +[keyspace] +pre-alloc = ["keyspace1"] \ No newline at end of file diff --git a/integration_tests/pipelined_memdb_test.go b/integration_tests/pipelined_memdb_test.go index ac778012..1f8dee3f 100644 --- a/integration_tests/pipelined_memdb_test.go +++ b/integration_tests/pipelined_memdb_test.go @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !nextgen + package tikv_test import ( diff --git a/integration_tests/scan_test.go b/integration_tests/scan_test.go index fba8885a..c7ee8544 100644 --- a/integration_tests/scan_test.go +++ b/integration_tests/scan_test.go @@ -40,6 +40,7 @@ import ( "testing" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv" "github.com/tikv/client-go/v2/txnkv/transaction" @@ -183,6 +184,10 @@ func (s *testScanSuite) TestScan() { err = committer4.PrewriteAllMutations(context.Background()) s.Nil(err) txn5 := s.beginTxn() + if config.NextGen { + // NextGen doesn't support RC yet, skip this rest part + return + } txn5.GetSnapshot().SetIsolationLevel(txnsnapshot.RC) var meetLocks []*txnkv.Lock resolver := tikv.NewLockResolverProb(s.store.GetLockResolver()) diff --git a/integration_tests/snapshot_fail_test.go b/integration_tests/snapshot_fail_test.go index 17ac9436..620b89d2 100644 --- a/integration_tests/snapshot_fail_test.go +++ b/integration_tests/snapshot_fail_test.go @@ -36,6 +36,7 @@ package tikv_test import ( "context" + "fmt" "math" "testing" "time" @@ -43,6 +44,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/stretchr/testify/suite" + "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" @@ -57,30 +59,18 @@ func TestSnapshotFail(t *testing.T) { type testSnapshotFailSuite struct { suite.Suite - store tikv.StoreProbe + store tikv.StoreProbe + prefix string } -func (s *testSnapshotFailSuite) SetupSuite() { +func (s *testSnapshotFailSuite) SetupTest() { + s.prefix = fmt.Sprintf("test_snapshot_fail_%d", time.Now().Unix()) store := NewTestUniStore(s.T()) s.store = tikv.StoreProbe{KVStore: store} } -func (s *testSnapshotFailSuite) TearDownSuite() { - s.store.Close() -} - func (s *testSnapshotFailSuite) TearDownTest() { - txn, err := s.store.Begin() - s.Require().Nil(err) - iter, err := txn.Iter([]byte(""), []byte("")) - s.Require().Nil(err) - for iter.Valid() { - err = txn.Delete(iter.Key()) - s.Require().Nil(err) - err = iter.Next() - s.Require().Nil(err) - } - s.Require().Nil(txn.Commit(context.TODO())) + s.store.Close() } func (s *testSnapshotFailSuite) TestBatchGetResponseKeyError() { @@ -110,11 +100,14 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError() { // Put two KV pairs txn, err := s.store.Begin() s.Require().Nil(err) - err = txn.Set([]byte("k1"), []byte("v1")) + k1 := encodeKey(s.prefix, "k1") + k2 := encodeKey(s.prefix, "k2") + k3 := encodeKey(s.prefix, "k3") + err = txn.Set(k1, []byte("v1")) s.Nil(err) - err = txn.Set([]byte("k2"), []byte("v2")) + err = txn.Set(k2, []byte("v2")) s.Nil(err) - err = txn.Set([]byte("k3"), []byte("v3")) + err = txn.Set(k3, []byte("v3")) s.Nil(err) err = txn.Commit(context.Background()) s.Nil(err) @@ -122,15 +115,15 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError() { s.Require().Nil(failpoint.Enable("tikvclient/rpcScanResult", `1*return("keyError")`)) txn, err = s.store.Begin() s.Require().Nil(err) - iter, err := txn.Iter([]byte("a"), []byte("z")) + iter, err := txn.Iter(encodeKey(s.prefix, "a"), encodeKey(s.prefix, "z")) s.Nil(err) - s.Equal(iter.Key(), []byte("k1")) + s.Equal(iter.Key(), k1) s.Equal(iter.Value(), []byte("v1")) s.Nil(iter.Next()) - s.Equal(iter.Key(), []byte("k2")) + s.Equal(iter.Key(), k2) s.Equal(iter.Value(), []byte("v2")) s.Nil(iter.Next()) - s.Equal(iter.Key(), []byte("k3")) + s.Equal(iter.Key(), k3) s.Equal(iter.Value(), []byte("v3")) s.Nil(iter.Next()) s.False(iter.Valid()) @@ -139,12 +132,12 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError() { s.Require().Nil(failpoint.Enable("tikvclient/rpcScanResult", `1*return("keyError")`)) txn, err = s.store.Begin() s.Require().Nil(err) - iter, err = txn.Iter([]byte("k2"), []byte("k4")) + iter, err = txn.Iter(k2, encodeKey(s.prefix, "k4")) s.Nil(err) - s.Equal(iter.Key(), []byte("k2")) + s.Equal(iter.Key(), k2) s.Equal(iter.Value(), []byte("v2")) s.Nil(iter.Next()) - s.Equal(iter.Key(), []byte("k3")) + s.Equal(iter.Key(), k3) s.Equal(iter.Value(), []byte("v3")) s.Nil(iter.Next()) s.False(iter.Valid()) @@ -320,6 +313,9 @@ func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock { } func (s *testSnapshotFailSuite) TestSnapshotUseResolveForRead() { + if config.NextGen { + s.T().Skip("NextGen does not support read through locks") + } s.Nil(failpoint.Enable("tikvclient/resolveLock", "sleep(500)")) s.Nil(failpoint.Enable("tikvclient/resolveAsyncResolveData", "sleep(500)")) defer func() { diff --git a/integration_tests/snapshot_test.go b/integration_tests/snapshot_test.go index 3f0ef971..2d52f7ef 100644 --- a/integration_tests/snapshot_test.go +++ b/integration_tests/snapshot_test.go @@ -36,6 +36,7 @@ package tikv_test import ( "context" + "encoding/hex" "fmt" "math" "sync" @@ -66,27 +67,14 @@ type testSnapshotSuite struct { rowNums []int } -func (s *testSnapshotSuite) SetupSuite() { - s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())} - s.prefix = fmt.Sprintf("snapshot_%d", time.Now().Unix()) - s.rowNums = append(s.rowNums, 1, 100, 191) +func (s *testSnapshotSuite) SetupTest() { + s.prefix = fmt.Sprintf("test_snapshot_%d", time.Now().Unix()) + store := NewTestUniStore(s.T()) + s.store = tikv.StoreProbe{KVStore: store} } -func (s *testSnapshotSuite) TearDownSuite() { - txn := s.beginTxn() - scanner, err := txn.Iter(encodeKey(s.prefix, ""), nil) - s.Nil(err) - s.NotNil(scanner) - for scanner.Valid() { - k := scanner.Key() - err = txn.Delete(k) - s.Nil(err) - scanner.Next() - } - err = txn.Commit(context.Background()) - s.Nil(err) - err = s.store.Close() - s.Nil(err) +func (s *testSnapshotSuite) TearDownTest() { + s.store.Close() } func (s *testSnapshotSuite) beginTxn() transaction.TxnProbe { @@ -214,23 +202,29 @@ func makeKeys(rowNum int, prefix string) [][]byte { } func (s *testSnapshotSuite) TestSkipLargeTxnLock() { - x := []byte("x_key_TestSkipLargeTxnLock") - y := []byte("y_key_TestSkipLargeTxnLock") + x, y := encodeKey(s.prefix, "x_TestSkipLargeTxnLock"), encodeKey(s.prefix, "y_TestSkipLargeTxnLock") txn := s.beginTxn() s.Nil(txn.Set(x, []byte("x"))) s.Nil(txn.Set(y, []byte("y"))) ctx := context.Background() - committer, err := txn.NewCommitter(0) + committer, err := txn.NewCommitter(1) s.Nil(err) committer.SetLockTTL(3000) + s.False(committer.IsAsyncCommit()) s.Nil(committer.PrewriteAllMutations(ctx)) txn1 := s.beginTxn() // txn1 is not blocked by txn in the large txn protocol. - _, err = txn1.Get(ctx, x) + r, err := txn1.Get(ctx, x) + if err != nil { + println(err.Error()) + } else { + println(hex.EncodeToString(r)) + } s.True(error.IsErrNotFound(err)) - res, err := toTiDBTxn(&txn1).BatchGet(ctx, toTiDBKeys([][]byte{x, y, []byte("z")})) + testKeys := [][]byte{encodeKey(s.prefix, "z")} + res, err := toTiDBTxn(&txn1).BatchGet(ctx, toTiDBKeys(testKeys)) s.Nil(err) s.Len(res, 0) @@ -357,6 +351,8 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() { } func (s *testSnapshotSuite) TestRCRead() { + // next-gen doesn't support RC yet, skip this test + s.T().Skip("next-gen doesn't support RC yet, skip this test") for _, rowNum := range s.rowNums { s.T().Logf("test RC Read, length=%v", rowNum) txn := s.beginTxn() @@ -388,7 +384,7 @@ func (s *testSnapshotSuite) TestRCRead() { // get v, err := snapshot.Get(context.Background(), key0) s.Nil(err) - s.Equal(len(meetLocks), 0) + s.Equal(0, len(meetLocks)) s.Equal(v, valueBytes(0)) // batch get m, err := snapshot.BatchGet(context.Background(), keys) diff --git a/integration_tests/tikv_next_gen.toml b/integration_tests/tikv_next_gen.toml new file mode 100644 index 00000000..da711417 --- /dev/null +++ b/integration_tests/tikv_next_gen.toml @@ -0,0 +1,18 @@ +[storage] +reserve-space = "1MB" +api-version = 2 +enable-ttl = true + +[dfs] +prefix = "tikv" +s3-endpoint = "127.0.0.1:9000" +s3-key-id = "minioadmin" +s3-secret-key = "minioadmin" +s3-bucket = "cse-test" +s3-region = "local" + +[rocksdb] +max-open-files = 10000 + +[raftdb] +max-open-files = 10000 diff --git a/integration_tests/util_test.go b/integration_tests/util_test.go index 6ca49a89..eb328846 100644 --- a/integration_tests/util_test.go +++ b/integration_tests/util_test.go @@ -60,8 +60,9 @@ import ( ) var ( - withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)") - pdAddrs = flag.String("pd-addrs", "http://127.0.0.1:2379", "pd addrs") + withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)") + keyspaceName = flag.String("keyspace-name", "", "keyspace name") + pdAddrs = flag.String("pd-addrs", "http://127.0.0.1:2379", "pd addrs") ) // NewTestStore creates a KVStore for testing purpose. @@ -110,7 +111,8 @@ func newTiKVStore(t *testing.T) *tikv.KVStore { pdClient = tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) opt = tikv.WithCodec(tikv.NewCodecV1(tikv.ModeTxn)) case kvrpcpb.APIVersion_V2: - codecCli, err := tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, tikv.DefaultKeyspaceName) + // The default keyspace cannot be used in API v2 :(. Must specify a valid keyspace + codecCli, err := tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, *keyspaceName) pdClient = codecCli re.Nil(err) opt = tikv.WithCodec(codecCli.GetCodec())