mirror of https://github.com/tikv/client-go.git
165 lines
6.2 KiB
Go
165 lines
6.2 KiB
Go
// Copyright 2021 TiKV Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// NOTE: The code in this file is based on code from the
|
|
// TiDB project, licensed under the Apache License v 2.0
|
|
//
|
|
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/client_fail_test.go
|
|
//
|
|
|
|
// Copyright 2019 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
|
"github.com/pingcap/kvproto/pkg/tikvpb"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/tikv/client-go/v2/config"
|
|
"github.com/tikv/client-go/v2/internal/client/mockserver"
|
|
"github.com/tikv/client-go/v2/tikvrpc"
|
|
)
|
|
|
|
func TestPanicInRecvLoop(t *testing.T) {
|
|
require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`))
|
|
require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`))
|
|
|
|
server, port := mockserver.StartMockTikvService()
|
|
require.True(t, port > 0)
|
|
defer server.Stop()
|
|
|
|
addr := server.Addr()
|
|
rpcClient := NewRPCClient()
|
|
defer rpcClient.Close()
|
|
rpcClient.option.dialTimeout = time.Second / 3
|
|
|
|
// Start batchRecvLoop, and it should panic in `failPendingRequests`.
|
|
_, err := rpcClient.getConnArray(addr, true, func(cfg *config.TiKVClient) { cfg.GrpcConnectionCount = 1 })
|
|
assert.Nil(t, err, "cannot establish local connection due to env problems(e.g. heavy load in test machine), please retry again")
|
|
|
|
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
|
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second/2)
|
|
assert.NotNil(t, err)
|
|
|
|
require.Nil(t, failpoint.Disable("tikvclient/gotErrorInRecvLoop"))
|
|
require.Nil(t, failpoint.Disable("tikvclient/panicInFailPendingRequests"))
|
|
time.Sleep(time.Second * 2)
|
|
|
|
req = tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
|
|
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second*4)
|
|
assert.Nil(t, err)
|
|
}
|
|
|
|
func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
|
|
server, port := mockserver.StartMockTikvService()
|
|
require.True(t, port > 0)
|
|
defer server.Stop()
|
|
addr := server.Addr()
|
|
|
|
// Enable batch and limit the connection count to 1 so that
|
|
// there is only one BatchCommands stream for each host or forwarded host.
|
|
defer config.UpdateGlobal(func(conf *config.Config) {
|
|
conf.TiKVClient.MaxBatchSize = 128
|
|
conf.TiKVClient.GrpcConnectionCount = 1
|
|
})()
|
|
rpcClient := NewRPCClient()
|
|
defer rpcClient.Close()
|
|
|
|
// Create 4 BatchCommands streams.
|
|
prewriteReq := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
|
|
forwardedHosts := []string{"", "127.0.0.1:6666", "127.0.0.1:7777", "127.0.0.1:8888"}
|
|
for _, forwardedHost := range forwardedHosts {
|
|
prewriteReq.ForwardedHost = forwardedHost
|
|
_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
|
|
assert.Nil(t, err)
|
|
}
|
|
connArray, err := rpcClient.getConnArray(addr, true)
|
|
assert.NotNil(t, connArray)
|
|
assert.Nil(t, err)
|
|
batchConn := connArray.batchConn
|
|
assert.NotNil(t, batchConn)
|
|
assert.Equal(t, len(batchConn.batchCommandsClients), 1)
|
|
batchClient := batchConn.batchCommandsClients[0]
|
|
assert.NotNil(t, batchClient.client)
|
|
assert.Equal(t, batchClient.client.forwardedHost, "")
|
|
assert.Equal(t, len(batchClient.forwardedClients), 3)
|
|
for _, forwardedHosts := range forwardedHosts[1:] {
|
|
assert.Equal(t, batchClient.forwardedClients[forwardedHosts].forwardedHost, forwardedHosts)
|
|
}
|
|
|
|
// Save all streams
|
|
clientSave := batchClient.client.Tikv_BatchCommandsClient
|
|
forwardedClientsSave := make(map[string]tikvpb.Tikv_BatchCommandsClient)
|
|
for host, client := range batchClient.forwardedClients {
|
|
forwardedClientsSave[host] = client.Tikv_BatchCommandsClient
|
|
}
|
|
epoch := atomic.LoadUint64(&batchClient.epoch)
|
|
|
|
fp := "tikvclient/gotErrorInRecvLoop"
|
|
// Send a request to each stream to trigger reconnection.
|
|
for _, forwardedHost := range forwardedHosts {
|
|
require.Nil(t, failpoint.Enable(fp, `1*return("0")`))
|
|
prewriteReq.ForwardedHost = forwardedHost
|
|
_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
|
|
assert.Nil(t, err)
|
|
time.Sleep(100 * time.Millisecond)
|
|
assert.Nil(t, failpoint.Disable(fp))
|
|
}
|
|
|
|
// Wait for finishing reconnection.
|
|
for {
|
|
batchClient.lockForRecreate()
|
|
if atomic.LoadUint64(&batchClient.epoch) != epoch {
|
|
batchClient.unlockForRecreate()
|
|
break
|
|
}
|
|
batchClient.unlockForRecreate()
|
|
time.Sleep(time.Millisecond * 100)
|
|
}
|
|
|
|
// send request after reconnection.
|
|
for _, forwardedHost := range forwardedHosts {
|
|
prewriteReq.ForwardedHost = forwardedHost
|
|
_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
|
|
assert.Nil(t, err)
|
|
}
|
|
// Should only reconnect once.
|
|
assert.Equal(t, atomic.LoadUint64(&batchClient.epoch), epoch+1)
|
|
// All streams are refreshed.
|
|
assert.NotEqual(t, batchClient.client.Tikv_BatchCommandsClient, clientSave)
|
|
assert.Equal(t, len(batchClient.forwardedClients), len(forwardedClientsSave))
|
|
for host, clientSave := range forwardedClientsSave {
|
|
assert.NotEqual(t, batchClient.forwardedClients[host].Tikv_BatchCommandsClient, clientSave)
|
|
}
|
|
}
|