mirror of https://github.com/tikv/client-go.git
ci: add next-gen integration tests (#1661)
Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
parent
f75488ba72
commit
806e1891f2
|
|
@ -19,8 +19,11 @@ jobs:
|
|||
with:
|
||||
go-version: 1.24.1
|
||||
|
||||
- name: Install gotestsum
|
||||
run: go install gotest.tools/gotestsum@latest
|
||||
|
||||
- name: Test
|
||||
run: go test ./...
|
||||
run: gotestsum --format short-verbose -- ./...
|
||||
working-directory: integration_tests
|
||||
|
||||
integration-local-race:
|
||||
|
|
@ -35,8 +38,11 @@ jobs:
|
|||
with:
|
||||
go-version: 1.24.1
|
||||
|
||||
- name: Install gotestsum
|
||||
run: go install gotest.tools/gotestsum@latest
|
||||
|
||||
- name: Test
|
||||
run: go test ./... -race
|
||||
run: gotestsum --format short-verbose -- ./... -race
|
||||
working-directory: integration_tests
|
||||
|
||||
integration-tikv:
|
||||
|
|
@ -51,6 +57,9 @@ jobs:
|
|||
with:
|
||||
go-version: 1.24.1
|
||||
|
||||
- name: Install gotestsum
|
||||
run: go install gotest.tools/gotestsum@latest
|
||||
|
||||
- name: Fetch PD
|
||||
uses: shrink/actions-docker-extract@v1
|
||||
id: extract-pd
|
||||
|
|
@ -76,7 +85,7 @@ jobs:
|
|||
working-directory: integration_tests
|
||||
|
||||
- name: Test
|
||||
run: go test --with-tikv
|
||||
run: gotestsum --format short-verbose -- --with-tikv
|
||||
working-directory: integration_tests
|
||||
|
||||
integration-raw-tikv:
|
||||
|
|
@ -94,6 +103,9 @@ jobs:
|
|||
with:
|
||||
go-version: 1.24.1
|
||||
|
||||
- name: Install gotestsum
|
||||
run: go install gotest.tools/gotestsum@latest
|
||||
|
||||
- name: Fetch PD
|
||||
uses: shrink/actions-docker-extract@v1
|
||||
id: extract-pd
|
||||
|
|
@ -119,5 +131,65 @@ jobs:
|
|||
working-directory: integration_tests/raw
|
||||
|
||||
- name: Test
|
||||
run: go test --with-tikv
|
||||
run: gotestsum --format short-verbose -- --with-tikv
|
||||
working-directory: integration_tests/raw
|
||||
|
||||
integration-next-gen-tikv:
|
||||
if: ${{ !contains(github.event.pull_request.labels.*.name, 'skip-integration-tests') }}
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: 1.24.1
|
||||
|
||||
- name: Install gotestsum
|
||||
run: go install gotest.tools/gotestsum@latest
|
||||
|
||||
- name: Fetch PD
|
||||
uses: shrink/actions-docker-extract@v1
|
||||
id: extract-pd
|
||||
with:
|
||||
image: pingcap/pd:nightly
|
||||
path: /pd-server
|
||||
|
||||
- name: Fetch Next-Gen TiKV
|
||||
uses: shrink/actions-docker-extract@v1
|
||||
id: extract-tikv
|
||||
with:
|
||||
image: gcr.io/pingcap-public/dbaas/tikv:dedicated-next-gen
|
||||
path: /tikv-server
|
||||
|
||||
- name: Run MinIO, PD & Next-Gen TiKV
|
||||
run: |
|
||||
mkdir -p tikv-data
|
||||
# Install and run minio for S3 backend
|
||||
curl -Lo minio https://dl.min.io/server/minio/release/linux-amd64/minio
|
||||
chmod +x minio
|
||||
mkdir -p $HOME/serverless_s3/cse-test
|
||||
MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin ./minio server $HOME/serverless_s3 > minio.log 2>&1 &
|
||||
sleep 5
|
||||
# Move binaries and run servers
|
||||
mv ../${{steps.extract-pd.outputs.destination}}/pd-server .
|
||||
mv ../${{steps.extract-tikv.outputs.destination}}/tikv-server .
|
||||
./pd-server --config pd_next_gen.toml > pd.log 2>&1 &
|
||||
sleep 5
|
||||
./tikv-server -C tikv_next_gen.toml --pd-endpoints="127.0.0.1:2379" --addr="127.0.0.1:20160" --data-dir=${{ github.workspace }}/integration_tests/tikv-data > tikv.log 2>&1 &
|
||||
|
||||
echo "Waiting for servers to start..."
|
||||
sleep 15
|
||||
|
||||
echo "--- MinIO Log ---"
|
||||
tail minio.log || echo "minio.log not found"
|
||||
echo "--- PD Log ---"
|
||||
tail pd.log || echo "pd.log not found"
|
||||
echo "--- TiKV Log ---"
|
||||
tail tikv.log || echo "tikv.log not found"
|
||||
working-directory: integration_tests
|
||||
|
||||
- name: Test
|
||||
run: gotestsum --format short-verbose -- -parallel 1 -tags=nextgen --with-tikv --keyspace-name='keyspace1'
|
||||
working-directory: integration_tests
|
||||
|
|
|
|||
|
|
@ -12,6 +12,8 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !nextgen
|
||||
|
||||
package tikv_test
|
||||
|
||||
import (
|
||||
|
|
|
|||
|
|
@ -37,12 +37,9 @@ package tikv_test
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
stderrs "errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
|
@ -51,9 +48,7 @@ import (
|
|||
"github.com/pingcap/failpoint"
|
||||
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/kvproto/pkg/metapb"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/config/retry"
|
||||
|
|
@ -1105,127 +1100,10 @@ func (s *testLockWithTiKVSuite) checkIsKeyLocked(key []byte, expectedLocked bool
|
|||
s.NoError(err)
|
||||
}
|
||||
|
||||
func (s *testLockWithTiKVSuite) trySetTiKVConfig(name string, value interface{}) func() {
|
||||
stores, err := s.store.GetPDClient().GetAllStores(context.Background())
|
||||
s.NoError(err)
|
||||
|
||||
type configItem struct {
|
||||
url string
|
||||
name string
|
||||
value interface{}
|
||||
}
|
||||
|
||||
var recoverConfigs []configItem
|
||||
|
||||
httpScheme := "http"
|
||||
if c, err := config.GetGlobalConfig().Security.ToTLSConfig(); err == nil && c != nil {
|
||||
httpScheme = "https"
|
||||
}
|
||||
|
||||
t := s.Suite.T()
|
||||
|
||||
setCfg := func(url, name string, value interface{}) error {
|
||||
postBody, err := json.Marshal(map[string]interface{}{name: value})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := http.Post(url, "text/json", bytes.NewReader(postBody))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.NoError(resp.Body.Close())
|
||||
if resp.StatusCode != 200 {
|
||||
return errors.Errorf("post config got unexpected status code: %v, request body: %s", resp.StatusCode, postBody)
|
||||
}
|
||||
t.Logf("set config for tikv at %s finished: %s", url, string(postBody))
|
||||
return nil
|
||||
}
|
||||
|
||||
storeIter:
|
||||
for _, store := range stores {
|
||||
if store.State != metapb.StoreState_Up {
|
||||
continue
|
||||
}
|
||||
for _, label := range store.Labels {
|
||||
if label.Key == "engine" && label.Value != "tikv" {
|
||||
continue storeIter
|
||||
}
|
||||
}
|
||||
|
||||
err := func() (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = errors.Errorf("set config for store at %v panicked: %v", store.StatusAddress, r)
|
||||
}
|
||||
}()
|
||||
|
||||
url := fmt.Sprintf("%s://%s/config", httpScheme, store.StatusAddress)
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
return errors.Errorf("unexpected response status: %v", resp.Status)
|
||||
}
|
||||
oldCfgRaw, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
oldCfg := make(map[string]interface{})
|
||||
err = json.Unmarshal(oldCfgRaw, &oldCfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
oldValue := oldCfg["pessimistic-txn"].(map[string]interface{})["in-memory"]
|
||||
if assert.ObjectsAreEqual(oldValue, value) {
|
||||
return nil
|
||||
}
|
||||
|
||||
err = setCfg(url, name, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
recoverConfigs = append(recoverConfigs, configItem{
|
||||
url: url,
|
||||
name: name,
|
||||
value: oldValue,
|
||||
})
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
t.Logf("failed to set config for store at %s: %v", store.StatusAddress, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent goleak from complaining about its internal connections.
|
||||
http.DefaultClient.CloseIdleConnections()
|
||||
|
||||
if len(recoverConfigs) > 0 {
|
||||
// Sleep for a while to ensure the new configs are applied.
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
return func() {
|
||||
for _, item := range recoverConfigs {
|
||||
err = setCfg(item.url, item.name, item.value)
|
||||
if err != nil {
|
||||
t.Logf("failed to recover config for store at %s: %v", item.url, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent goleak from complaining about its internal connections.
|
||||
http.DefaultClient.CloseIdleConnections()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *testLockWithTiKVSuite) TestPrewriteCheckForUpdateTS() {
|
||||
if config.NextGen {
|
||||
s.T().Skip("NextGen does not support fair locking")
|
||||
}
|
||||
test := func(asyncCommit bool, onePC bool, causalConsistency bool) {
|
||||
k1 := []byte("k1")
|
||||
k2 := []byte("k2")
|
||||
|
|
@ -1481,11 +1359,6 @@ func (s *testLockWithTiKVSuite) TestCheckTxnStatusSentToSecondary() {
|
|||
}
|
||||
|
||||
func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
|
||||
if *withTiKV {
|
||||
recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", false)
|
||||
defer recoverFunc()
|
||||
}
|
||||
|
||||
s.NoError(failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
|
||||
s.NoError(failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
|
||||
s.NoError(failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
|
||||
|
|
@ -1542,6 +1415,9 @@ func (s *testLockWithTiKVSuite) TestBatchResolveLocks() {
|
|||
// k4 has txn2's stale primary pessimistic lock now.
|
||||
currentTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope)
|
||||
|
||||
// sleep a while for pipelined pessimistic locks
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
|
||||
remainingLocks, err := s.store.ScanLocks(ctx, []byte("k"), []byte("l"), currentTS)
|
||||
s.NoError(err)
|
||||
|
||||
|
|
@ -1621,84 +1497,75 @@ func (s *testLockWithTiKVSuite) TestPessimisticRollbackWithRead() {
|
|||
s.NoError(failpoint.Disable("tikvclient/shortPessimisticLockTTL"))
|
||||
s.NoError(failpoint.Disable("tikvclient/twoPCShortLockTTL"))
|
||||
}()
|
||||
test := func(inMemoryLock bool) {
|
||||
if *withTiKV {
|
||||
recoverFunc := s.trySetTiKVConfig("pessimistic-txn.in-memory", inMemoryLock)
|
||||
defer recoverFunc()
|
||||
}
|
||||
// Init, cleanup possible left locks.
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
|
||||
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
|
||||
s.cleanupLocks()
|
||||
|
||||
// Init, cleanup possible left locks.
|
||||
bo := tikv.NewBackofferWithVars(context.Background(), int(transaction.PrewriteMaxBackoff.Load()), nil)
|
||||
ctx := context.WithValue(context.Background(), util.SessionID, uint64(1))
|
||||
s.cleanupLocks()
|
||||
// Basic case, three keys could be rolled back within one pessimistic rollback request.
|
||||
k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
|
||||
txn1, err := s.store.Begin()
|
||||
s.NoError(err)
|
||||
startTS := txn1.StartTS()
|
||||
txn1.SetPessimistic(true)
|
||||
lockCtx := kv.NewLockCtx(startTS, 200, time.Now())
|
||||
err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3)
|
||||
s.NoError(err)
|
||||
txn1.GetCommitter().CloseTTLManager()
|
||||
|
||||
// Basic case, three keys could be rolled back within one pessimistic rollback request.
|
||||
k1, k2, k3 := []byte("k1"), []byte("k2"), []byte("k3")
|
||||
txn1, err := s.store.Begin()
|
||||
s.NoError(err)
|
||||
startTS := txn1.StartTS()
|
||||
txn1.SetPessimistic(true)
|
||||
lockCtx := kv.NewLockCtx(startTS, 200, time.Now())
|
||||
err = txn1.LockKeys(ctx, lockCtx, k1, k2, k3)
|
||||
s.NoError(err)
|
||||
txn1.GetCommitter().CloseTTLManager()
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l"))
|
||||
locks := []*txnlock.Lock{
|
||||
s.makeLock(startTS, startTS, k3, k1),
|
||||
}
|
||||
s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l"))
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
s.mustLockNum(ctx, 3, startTS+1, []byte("k"), []byte("l"))
|
||||
locks := []*txnlock.Lock{
|
||||
s.makeLock(startTS, startTS, k3, k1),
|
||||
}
|
||||
s.mustResolve(ctx, bo, locks, startTS+1, []byte("k"), []byte("l"))
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l"))
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
s.mustLockNum(ctx, 0, startTS+1, []byte("k"), []byte("l"))
|
||||
|
||||
// Acquire pessimistic locks for more than 256(RESOLVE_LOCK_BATCH_SIZE) keys.
|
||||
formatKey := func(prefix rune, i int) []byte {
|
||||
return []byte(fmt.Sprintf("%c%04d", prefix, i))
|
||||
}
|
||||
numKeys := 1000
|
||||
prewriteKeys := make([][]byte, 0, numKeys/2)
|
||||
pessimisticLockKeys := make([][]byte, 0, numKeys/2)
|
||||
for i := 0; i < numKeys; i++ {
|
||||
key := formatKey('k', i)
|
||||
if i%2 == 0 {
|
||||
err = txn1.LockKeys(ctx, lockCtx, key)
|
||||
pessimisticLockKeys = append(pessimisticLockKeys, key)
|
||||
} else {
|
||||
err = txn1.Set(key, []byte("val"))
|
||||
s.NoError(err)
|
||||
prewriteKeys = append(prewriteKeys, key)
|
||||
}
|
||||
// Acquire pessimistic locks for more than 256(RESOLVE_LOCK_BATCH_SIZE) keys.
|
||||
formatKey := func(prefix rune, i int) []byte {
|
||||
return []byte(fmt.Sprintf("%c%04d", prefix, i))
|
||||
}
|
||||
numKeys := 1000
|
||||
prewriteKeys := make([][]byte, 0, numKeys/2)
|
||||
pessimisticLockKeys := make([][]byte, 0, numKeys/2)
|
||||
for i := 0; i < numKeys; i++ {
|
||||
key := formatKey('k', i)
|
||||
if i%2 == 0 {
|
||||
err = txn1.LockKeys(ctx, lockCtx, key)
|
||||
pessimisticLockKeys = append(pessimisticLockKeys, key)
|
||||
} else {
|
||||
err = txn1.Set(key, []byte("val"))
|
||||
s.NoError(err)
|
||||
prewriteKeys = append(prewriteKeys, key)
|
||||
}
|
||||
committer, err := txn1.NewCommitter(1)
|
||||
s.NoError(err)
|
||||
mutations := committer.MutationsOfKeys(prewriteKeys)
|
||||
err = committer.PrewriteMutations(ctx, mutations)
|
||||
s.NoError(err)
|
||||
|
||||
// All the pessimistic locks belonging to the same transaction are pessimistic
|
||||
// rolled back within one request.
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0])
|
||||
_, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
|
||||
CallerStartTS: startTS + 1,
|
||||
Locks: []*txnlock.Lock{pessimisticLock},
|
||||
Lite: false,
|
||||
ForRead: false,
|
||||
Detail: nil,
|
||||
PessimisticRegionResolve: true,
|
||||
})
|
||||
s.NoError(err)
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l"))
|
||||
|
||||
// Cleanup.
|
||||
err = txn1.Rollback()
|
||||
s.NoError(err)
|
||||
}
|
||||
test(false)
|
||||
test(true)
|
||||
committer, err := txn1.NewCommitter(1)
|
||||
s.NoError(err)
|
||||
mutations := committer.MutationsOfKeys(prewriteKeys)
|
||||
err = committer.PrewriteMutations(ctx, mutations)
|
||||
s.NoError(err)
|
||||
|
||||
// All the pessimistic locks belonging to the same transaction are pessimistic
|
||||
// rolled back within one request.
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
pessimisticLock := s.makeLock(startTS, startTS, pessimisticLockKeys[1], pessimisticLockKeys[0])
|
||||
_, err = s.store.GetLockResolver().ResolveLocksWithOpts(bo, txnlock.ResolveLocksOptions{
|
||||
CallerStartTS: startTS + 1,
|
||||
Locks: []*txnlock.Lock{pessimisticLock},
|
||||
Lite: false,
|
||||
ForRead: false,
|
||||
Detail: nil,
|
||||
PessimisticRegionResolve: true,
|
||||
})
|
||||
s.NoError(err)
|
||||
|
||||
time.Sleep(time.Millisecond * 100)
|
||||
s.mustLockNum(ctx, numKeys/2, startTS+1, []byte("k"), []byte("l"))
|
||||
|
||||
// Cleanup.
|
||||
err = txn1.Rollback()
|
||||
s.NoError(err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,6 +101,9 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
|
|||
}
|
||||
|
||||
func (s *apiTestSuite) TestGetStoresMinResolvedTS() {
|
||||
if config.NextGen {
|
||||
s.T().Skip("NextGen does not support resolved ts yet")
|
||||
}
|
||||
util.EnableFailpoints()
|
||||
require := s.Require()
|
||||
mockClient := newStoreSafeTsMockClient(s.store.GetTiKVClient())
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
[keyspace]
|
||||
pre-alloc = ["keyspace1"]
|
||||
|
|
@ -12,6 +12,8 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//go:build !nextgen
|
||||
|
||||
package tikv_test
|
||||
|
||||
import (
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
"github.com/tikv/client-go/v2/txnkv"
|
||||
"github.com/tikv/client-go/v2/txnkv/transaction"
|
||||
|
|
@ -183,6 +184,10 @@ func (s *testScanSuite) TestScan() {
|
|||
err = committer4.PrewriteAllMutations(context.Background())
|
||||
s.Nil(err)
|
||||
txn5 := s.beginTxn()
|
||||
if config.NextGen {
|
||||
// NextGen doesn't support RC yet, skip this rest part
|
||||
return
|
||||
}
|
||||
txn5.GetSnapshot().SetIsolationLevel(txnsnapshot.RC)
|
||||
var meetLocks []*txnkv.Lock
|
||||
resolver := tikv.NewLockResolverProb(s.store.GetLockResolver())
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ package tikv_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -43,6 +44,7 @@ import (
|
|||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/tikv/client-go/v2/config"
|
||||
tikverr "github.com/tikv/client-go/v2/error"
|
||||
"github.com/tikv/client-go/v2/oracle"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
|
|
@ -57,30 +59,18 @@ func TestSnapshotFail(t *testing.T) {
|
|||
|
||||
type testSnapshotFailSuite struct {
|
||||
suite.Suite
|
||||
store tikv.StoreProbe
|
||||
store tikv.StoreProbe
|
||||
prefix string
|
||||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) SetupSuite() {
|
||||
func (s *testSnapshotFailSuite) SetupTest() {
|
||||
s.prefix = fmt.Sprintf("test_snapshot_fail_%d", time.Now().Unix())
|
||||
store := NewTestUniStore(s.T())
|
||||
s.store = tikv.StoreProbe{KVStore: store}
|
||||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) TearDownSuite() {
|
||||
s.store.Close()
|
||||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) TearDownTest() {
|
||||
txn, err := s.store.Begin()
|
||||
s.Require().Nil(err)
|
||||
iter, err := txn.Iter([]byte(""), []byte(""))
|
||||
s.Require().Nil(err)
|
||||
for iter.Valid() {
|
||||
err = txn.Delete(iter.Key())
|
||||
s.Require().Nil(err)
|
||||
err = iter.Next()
|
||||
s.Require().Nil(err)
|
||||
}
|
||||
s.Require().Nil(txn.Commit(context.TODO()))
|
||||
s.store.Close()
|
||||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) TestBatchGetResponseKeyError() {
|
||||
|
|
@ -110,11 +100,14 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError() {
|
|||
// Put two KV pairs
|
||||
txn, err := s.store.Begin()
|
||||
s.Require().Nil(err)
|
||||
err = txn.Set([]byte("k1"), []byte("v1"))
|
||||
k1 := encodeKey(s.prefix, "k1")
|
||||
k2 := encodeKey(s.prefix, "k2")
|
||||
k3 := encodeKey(s.prefix, "k3")
|
||||
err = txn.Set(k1, []byte("v1"))
|
||||
s.Nil(err)
|
||||
err = txn.Set([]byte("k2"), []byte("v2"))
|
||||
err = txn.Set(k2, []byte("v2"))
|
||||
s.Nil(err)
|
||||
err = txn.Set([]byte("k3"), []byte("v3"))
|
||||
err = txn.Set(k3, []byte("v3"))
|
||||
s.Nil(err)
|
||||
err = txn.Commit(context.Background())
|
||||
s.Nil(err)
|
||||
|
|
@ -122,15 +115,15 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError() {
|
|||
s.Require().Nil(failpoint.Enable("tikvclient/rpcScanResult", `1*return("keyError")`))
|
||||
txn, err = s.store.Begin()
|
||||
s.Require().Nil(err)
|
||||
iter, err := txn.Iter([]byte("a"), []byte("z"))
|
||||
iter, err := txn.Iter(encodeKey(s.prefix, "a"), encodeKey(s.prefix, "z"))
|
||||
s.Nil(err)
|
||||
s.Equal(iter.Key(), []byte("k1"))
|
||||
s.Equal(iter.Key(), k1)
|
||||
s.Equal(iter.Value(), []byte("v1"))
|
||||
s.Nil(iter.Next())
|
||||
s.Equal(iter.Key(), []byte("k2"))
|
||||
s.Equal(iter.Key(), k2)
|
||||
s.Equal(iter.Value(), []byte("v2"))
|
||||
s.Nil(iter.Next())
|
||||
s.Equal(iter.Key(), []byte("k3"))
|
||||
s.Equal(iter.Key(), k3)
|
||||
s.Equal(iter.Value(), []byte("v3"))
|
||||
s.Nil(iter.Next())
|
||||
s.False(iter.Valid())
|
||||
|
|
@ -139,12 +132,12 @@ func (s *testSnapshotFailSuite) TestScanResponseKeyError() {
|
|||
s.Require().Nil(failpoint.Enable("tikvclient/rpcScanResult", `1*return("keyError")`))
|
||||
txn, err = s.store.Begin()
|
||||
s.Require().Nil(err)
|
||||
iter, err = txn.Iter([]byte("k2"), []byte("k4"))
|
||||
iter, err = txn.Iter(k2, encodeKey(s.prefix, "k4"))
|
||||
s.Nil(err)
|
||||
s.Equal(iter.Key(), []byte("k2"))
|
||||
s.Equal(iter.Key(), k2)
|
||||
s.Equal(iter.Value(), []byte("v2"))
|
||||
s.Nil(iter.Next())
|
||||
s.Equal(iter.Key(), []byte("k3"))
|
||||
s.Equal(iter.Key(), k3)
|
||||
s.Equal(iter.Value(), []byte("v3"))
|
||||
s.Nil(iter.Next())
|
||||
s.False(iter.Valid())
|
||||
|
|
@ -320,6 +313,9 @@ func (s *testSnapshotFailSuite) getLock(key []byte) *txnkv.Lock {
|
|||
}
|
||||
|
||||
func (s *testSnapshotFailSuite) TestSnapshotUseResolveForRead() {
|
||||
if config.NextGen {
|
||||
s.T().Skip("NextGen does not support read through locks")
|
||||
}
|
||||
s.Nil(failpoint.Enable("tikvclient/resolveLock", "sleep(500)"))
|
||||
s.Nil(failpoint.Enable("tikvclient/resolveAsyncResolveData", "sleep(500)"))
|
||||
defer func() {
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ package tikv_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
|
@ -66,27 +67,14 @@ type testSnapshotSuite struct {
|
|||
rowNums []int
|
||||
}
|
||||
|
||||
func (s *testSnapshotSuite) SetupSuite() {
|
||||
s.store = tikv.StoreProbe{KVStore: NewTestStore(s.T())}
|
||||
s.prefix = fmt.Sprintf("snapshot_%d", time.Now().Unix())
|
||||
s.rowNums = append(s.rowNums, 1, 100, 191)
|
||||
func (s *testSnapshotSuite) SetupTest() {
|
||||
s.prefix = fmt.Sprintf("test_snapshot_%d", time.Now().Unix())
|
||||
store := NewTestUniStore(s.T())
|
||||
s.store = tikv.StoreProbe{KVStore: store}
|
||||
}
|
||||
|
||||
func (s *testSnapshotSuite) TearDownSuite() {
|
||||
txn := s.beginTxn()
|
||||
scanner, err := txn.Iter(encodeKey(s.prefix, ""), nil)
|
||||
s.Nil(err)
|
||||
s.NotNil(scanner)
|
||||
for scanner.Valid() {
|
||||
k := scanner.Key()
|
||||
err = txn.Delete(k)
|
||||
s.Nil(err)
|
||||
scanner.Next()
|
||||
}
|
||||
err = txn.Commit(context.Background())
|
||||
s.Nil(err)
|
||||
err = s.store.Close()
|
||||
s.Nil(err)
|
||||
func (s *testSnapshotSuite) TearDownTest() {
|
||||
s.store.Close()
|
||||
}
|
||||
|
||||
func (s *testSnapshotSuite) beginTxn() transaction.TxnProbe {
|
||||
|
|
@ -214,23 +202,29 @@ func makeKeys(rowNum int, prefix string) [][]byte {
|
|||
}
|
||||
|
||||
func (s *testSnapshotSuite) TestSkipLargeTxnLock() {
|
||||
x := []byte("x_key_TestSkipLargeTxnLock")
|
||||
y := []byte("y_key_TestSkipLargeTxnLock")
|
||||
x, y := encodeKey(s.prefix, "x_TestSkipLargeTxnLock"), encodeKey(s.prefix, "y_TestSkipLargeTxnLock")
|
||||
txn := s.beginTxn()
|
||||
s.Nil(txn.Set(x, []byte("x")))
|
||||
s.Nil(txn.Set(y, []byte("y")))
|
||||
ctx := context.Background()
|
||||
committer, err := txn.NewCommitter(0)
|
||||
committer, err := txn.NewCommitter(1)
|
||||
s.Nil(err)
|
||||
committer.SetLockTTL(3000)
|
||||
s.False(committer.IsAsyncCommit())
|
||||
s.Nil(committer.PrewriteAllMutations(ctx))
|
||||
|
||||
txn1 := s.beginTxn()
|
||||
// txn1 is not blocked by txn in the large txn protocol.
|
||||
_, err = txn1.Get(ctx, x)
|
||||
r, err := txn1.Get(ctx, x)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
} else {
|
||||
println(hex.EncodeToString(r))
|
||||
}
|
||||
s.True(error.IsErrNotFound(err))
|
||||
|
||||
res, err := toTiDBTxn(&txn1).BatchGet(ctx, toTiDBKeys([][]byte{x, y, []byte("z")}))
|
||||
testKeys := [][]byte{encodeKey(s.prefix, "z")}
|
||||
res, err := toTiDBTxn(&txn1).BatchGet(ctx, toTiDBKeys(testKeys))
|
||||
s.Nil(err)
|
||||
s.Len(res, 0)
|
||||
|
||||
|
|
@ -357,6 +351,8 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats() {
|
|||
}
|
||||
|
||||
func (s *testSnapshotSuite) TestRCRead() {
|
||||
// next-gen doesn't support RC yet, skip this test
|
||||
s.T().Skip("next-gen doesn't support RC yet, skip this test")
|
||||
for _, rowNum := range s.rowNums {
|
||||
s.T().Logf("test RC Read, length=%v", rowNum)
|
||||
txn := s.beginTxn()
|
||||
|
|
@ -388,7 +384,7 @@ func (s *testSnapshotSuite) TestRCRead() {
|
|||
// get
|
||||
v, err := snapshot.Get(context.Background(), key0)
|
||||
s.Nil(err)
|
||||
s.Equal(len(meetLocks), 0)
|
||||
s.Equal(0, len(meetLocks))
|
||||
s.Equal(v, valueBytes(0))
|
||||
// batch get
|
||||
m, err := snapshot.BatchGet(context.Background(), keys)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
[storage]
|
||||
reserve-space = "1MB"
|
||||
api-version = 2
|
||||
enable-ttl = true
|
||||
|
||||
[dfs]
|
||||
prefix = "tikv"
|
||||
s3-endpoint = "127.0.0.1:9000"
|
||||
s3-key-id = "minioadmin"
|
||||
s3-secret-key = "minioadmin"
|
||||
s3-bucket = "cse-test"
|
||||
s3-region = "local"
|
||||
|
||||
[rocksdb]
|
||||
max-open-files = 10000
|
||||
|
||||
[raftdb]
|
||||
max-open-files = 10000
|
||||
|
|
@ -60,8 +60,9 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)")
|
||||
pdAddrs = flag.String("pd-addrs", "http://127.0.0.1:2379", "pd addrs")
|
||||
withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)")
|
||||
keyspaceName = flag.String("keyspace-name", "", "keyspace name")
|
||||
pdAddrs = flag.String("pd-addrs", "http://127.0.0.1:2379", "pd addrs")
|
||||
)
|
||||
|
||||
// NewTestStore creates a KVStore for testing purpose.
|
||||
|
|
@ -110,7 +111,8 @@ func newTiKVStore(t *testing.T) *tikv.KVStore {
|
|||
pdClient = tikv.NewCodecPDClient(tikv.ModeTxn, pdClient)
|
||||
opt = tikv.WithCodec(tikv.NewCodecV1(tikv.ModeTxn))
|
||||
case kvrpcpb.APIVersion_V2:
|
||||
codecCli, err := tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, tikv.DefaultKeyspaceName)
|
||||
// The default keyspace cannot be used in API v2 :(. Must specify a valid keyspace
|
||||
codecCli, err := tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdClient, *keyspaceName)
|
||||
pdClient = codecCli
|
||||
re.Nil(err)
|
||||
opt = tikv.WithCodec(codecCli.GetCodec())
|
||||
|
|
|
|||
Loading…
Reference in New Issue