// Source: client-go/internal/client/client_fail_test.go (165 lines, 6.2 KiB, Go)

// Copyright 2021 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: The code in this file is based on code from the
// TiDB project, licensed under the Apache License v 2.0
//
// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/client/client_fail_test.go
//
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/internal/client/mockserver"
"github.com/tikv/client-go/v2/tikvrpc"
)
func TestPanicInRecvLoop(t *testing.T) {
require.Nil(t, failpoint.Enable("tikvclient/panicInFailPendingRequests", `panic`))
require.Nil(t, failpoint.Enable("tikvclient/gotErrorInRecvLoop", `return("0")`))
server, port := mockserver.StartMockTikvService()
require.True(t, port > 0)
defer server.Stop()
addr := server.Addr()
rpcClient := NewRPCClient()
defer rpcClient.Close()
rpcClient.option.dialTimeout = time.Second / 3
// Start batchRecvLoop, and it should panic in `failPendingRequests`.
_, err := rpcClient.getConnArray(addr, true, func(cfg *config.TiKVClient) { cfg.GrpcConnectionCount = 1 })
assert.Nil(t, err, "cannot establish local connection due to env problems(e.g. heavy load in test machine), please retry again")
req := tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second/2)
assert.NotNil(t, err)
require.Nil(t, failpoint.Disable("tikvclient/gotErrorInRecvLoop"))
require.Nil(t, failpoint.Disable("tikvclient/panicInFailPendingRequests"))
time.Sleep(time.Second * 2)
req = tikvrpc.NewRequest(tikvrpc.CmdEmpty, &tikvpb.BatchCommandsEmptyRequest{})
_, err = rpcClient.SendRequest(context.Background(), addr, req, time.Second*4)
assert.Nil(t, err)
}
// TestRecvErrorInMultipleRecvLoops opens one direct and three forwarded
// BatchCommands streams on a single connection, injects a receive error while
// sending on each of them, and checks that the batch client's epoch advances
// exactly once and that every saved stream has been replaced by a new one.
func TestRecvErrorInMultipleRecvLoops(t *testing.T) {
	// Upper bound for the reconnection wait below, so that a client that never
	// reconnects fails this test with a clear message instead of hanging until
	// the global `go test` timeout.
	const reconnectTimeout = 30 * time.Second

	server, port := mockserver.StartMockTikvService()
	require.True(t, port > 0)
	defer server.Stop()
	addr := server.Addr()

	// Enable batch and limit the connection count to 1 so that
	// there is only one BatchCommands stream for each host or forwarded host.
	defer config.UpdateGlobal(func(conf *config.Config) {
		conf.TiKVClient.MaxBatchSize = 128
		conf.TiKVClient.GrpcConnectionCount = 1
	})()
	rpcClient := NewRPCClient()
	defer rpcClient.Close()

	// Create 4 BatchCommands streams.
	prewriteReq := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{})
	forwardedHosts := []string{"", "127.0.0.1:6666", "127.0.0.1:7777", "127.0.0.1:8888"}
	for _, forwardedHost := range forwardedHosts {
		prewriteReq.ForwardedHost = forwardedHost
		_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
		assert.Nil(t, err)
	}

	// Everything below dereferences these values, so a broken precondition
	// must stop the test here rather than surface later as a nil-pointer or
	// index-out-of-range panic.
	connArray, err := rpcClient.getConnArray(addr, true)
	require.Nil(t, err)
	require.NotNil(t, connArray)
	batchConn := connArray.batchConn
	require.NotNil(t, batchConn)
	require.Len(t, batchConn.batchCommandsClients, 1)
	batchClient := batchConn.batchCommandsClients[0]
	require.NotNil(t, batchClient.client)
	assert.Equal(t, "", batchClient.client.forwardedHost)
	assert.Len(t, batchClient.forwardedClients, 3)
	for _, host := range forwardedHosts[1:] {
		stream := batchClient.forwardedClients[host]
		require.NotNil(t, stream, "missing forwarded stream for host %q", host)
		assert.Equal(t, host, stream.forwardedHost)
	}

	// Save all streams
	clientSave := batchClient.client.Tikv_BatchCommandsClient
	forwardedClientsSave := make(map[string]tikvpb.Tikv_BatchCommandsClient)
	for host, client := range batchClient.forwardedClients {
		forwardedClientsSave[host] = client.Tikv_BatchCommandsClient
	}
	epoch := atomic.LoadUint64(&batchClient.epoch)

	fp := "tikvclient/gotErrorInRecvLoop"
	// Send a request to each stream to trigger reconnection.
	for _, forwardedHost := range forwardedHosts {
		// The `1*` prefix limits the injected error to a single trigger.
		require.Nil(t, failpoint.Enable(fp, `1*return("0")`))
		prewriteReq.ForwardedHost = forwardedHost
		_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
		assert.Nil(t, err)
		time.Sleep(100 * time.Millisecond)
		assert.Nil(t, failpoint.Disable(fp))
	}

	// Wait for finishing reconnection.
	deadline := time.Now().Add(reconnectTimeout)
	for {
		batchClient.lockForRecreate()
		recreated := atomic.LoadUint64(&batchClient.epoch) != epoch
		batchClient.unlockForRecreate()
		if recreated {
			break
		}
		if time.Now().After(deadline) {
			t.Fatalf("batch client epoch did not change within %v after injecting receive errors", reconnectTimeout)
		}
		time.Sleep(time.Millisecond * 100)
	}

	// send request after reconnection.
	for _, forwardedHost := range forwardedHosts {
		prewriteReq.ForwardedHost = forwardedHost
		_, err := rpcClient.SendRequest(context.Background(), addr, prewriteReq, 10*time.Second)
		assert.Nil(t, err)
	}

	// Should only reconnect once.
	assert.Equal(t, epoch+1, atomic.LoadUint64(&batchClient.epoch))

	// All streams are refreshed.
	assert.NotEqual(t, clientSave, batchClient.client.Tikv_BatchCommandsClient)
	assert.Len(t, batchClient.forwardedClients, len(forwardedClientsSave))
	for host, saved := range forwardedClientsSave {
		stream := batchClient.forwardedClients[host]
		// Guard the dereference: a missing entry is reported as a failure
		// instead of panicking on a nil stream.
		if assert.NotNil(t, stream, "missing forwarded stream for host %q after reconnection", host) {
			assert.NotEqual(t, saved, stream.Tikv_BatchCommandsClient)
		}
	}
}