mirror of https://github.com/tikv/client-go.git
*: sync updates from pingcap/tidb (#81)
Signed-off-by: disksing <i@disksing.com>
This commit is contained in:
parent
a34c866361
commit
9b0bb22130
|
|
@ -864,6 +864,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector(c *C) {
|
||||||
|
|
||||||
region.lastAccess = time.Now().Unix()
|
region.lastAccess = time.Now().Unix()
|
||||||
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region)
|
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
c.Assert(replicaSelector, NotNil)
|
c.Assert(replicaSelector, NotNil)
|
||||||
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
|
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
|
||||||
return reachable
|
return reachable
|
||||||
|
|
@ -945,6 +946,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector(c *C) {
|
||||||
replicaSelector, _ = newReplicaSelector(cache, regionLoc.Region)
|
replicaSelector, _ = newReplicaSelector(cache, regionLoc.Region)
|
||||||
replicaSelector.next(s.bo)
|
replicaSelector.next(s.bo)
|
||||||
rpcCtx, err = replicaSelector.next(s.bo)
|
rpcCtx, err = replicaSelector.next(s.bo)
|
||||||
|
c.Assert(err, IsNil)
|
||||||
replicaSelector.OnSendSuccess()
|
replicaSelector.OnSendSuccess()
|
||||||
// Verify the regionStore is updated and the workTiKVIdx points to the leader.
|
// Verify the regionStore is updated and the workTiKVIdx points to the leader.
|
||||||
leaderStore, leaderPeer, _, _ = region.WorkStorePeer(region.getStore())
|
leaderStore, leaderPeer, _, _ = region.WorkStorePeer(region.getStore())
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
|
||||||
|
|
||||||
"github.com/opentracing/opentracing-go"
|
"github.com/opentracing/opentracing-go"
|
||||||
"github.com/pingcap/errors"
|
"github.com/pingcap/errors"
|
||||||
|
|
@ -151,6 +150,7 @@ func (s *KVSnapshot) SetSnapshotTS(ts uint64) {
|
||||||
|
|
||||||
// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
|
// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
|
||||||
// The map will not contain nonexistent keys.
|
// The map will not contain nonexistent keys.
|
||||||
|
// NOTE: Don't modify keys. Some codes rely on the order of keys.
|
||||||
func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
|
func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]byte, error) {
|
||||||
// Check the cached value first.
|
// Check the cached value first.
|
||||||
m := make(map[string][]byte)
|
m := make(map[string][]byte)
|
||||||
|
|
@ -175,14 +175,12 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][]
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We want [][]byte instead of []kv.Key, use some magic to save memory.
|
|
||||||
bytesKeys := *(*[][]byte)(unsafe.Pointer(&keys))
|
|
||||||
ctx = context.WithValue(ctx, retry.TxnStartKey, s.version)
|
ctx = context.WithValue(ctx, retry.TxnStartKey, s.version)
|
||||||
bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars)
|
bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars)
|
||||||
|
|
||||||
// Create a map to collect key-values from region servers.
|
// Create a map to collect key-values from region servers.
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
err := s.batchGetKeysByRegions(bo, bytesKeys, func(k, v []byte) {
|
err := s.batchGetKeysByRegions(bo, keys, func(k, v []byte) {
|
||||||
if len(v) == 0 {
|
if len(v) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -236,14 +234,16 @@ type batchKeys struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *batchKeys) relocate(bo *Backoffer, c *RegionCache) (bool, error) {
|
func (b *batchKeys) relocate(bo *Backoffer, c *RegionCache) (bool, error) {
|
||||||
begin, end := b.keys[0], b.keys[len(b.keys)-1]
|
loc, err := c.LocateKey(bo, b.keys[0])
|
||||||
loc, err := c.LocateKey(bo, begin)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Trace(err)
|
return false, errors.Trace(err)
|
||||||
}
|
}
|
||||||
if !loc.Contains(end) {
|
// keys is not in order, so we have to iterate all keys.
|
||||||
|
for i := 1; i < len(b.keys); i++ {
|
||||||
|
if !loc.Contains(b.keys[i]) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
}
|
||||||
b.region = loc.Region
|
b.region = loc.Region
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue