Refactory class model (#18)

This commit is contained in:
Soup 2019-01-04 16:45:35 +08:00 committed by birdstorm
parent 24719494ea
commit 2f0d9997d1
21 changed files with 341 additions and 844 deletions

View File

@ -29,24 +29,21 @@ import org.tikv.common.policy.RetryMaxMs.Builder;
import org.tikv.common.policy.RetryPolicy;
import org.tikv.common.streaming.StreamingResponse;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
public abstract class AbstractGRPCClient<
BlockingStubT extends AbstractStub<BlockingStubT>, StubT extends AbstractStub<StubT>>
implements AutoCloseable {
protected final Logger logger = Logger.getLogger(this.getClass());
protected TiSession session;
protected TiConfiguration conf;
protected final TiConfiguration conf;
protected final ChannelFactory channelFactory;
protected AbstractGRPCClient(TiSession session) {
this.session = session;
this.conf = session.getConf();
protected AbstractGRPCClient(TiConfiguration conf, ChannelFactory channelFactory) {
this.conf = conf;
this.channelFactory = channelFactory;
}
public TiSession getSession() {
return session;
}
public TiConfiguration getConf() {
protected TiConfiguration getConf() {
return conf;
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataOutput;
import org.tikv.common.exception.GrpcException;
@ -41,6 +42,7 @@ import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.PDErrorHandler;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.FutureObserver;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.PDGrpc;
@ -48,6 +50,7 @@ import org.tikv.kvproto.PDGrpc.PDBlockingStub;
import org.tikv.kvproto.PDGrpc.PDStub;
import org.tikv.kvproto.Pdpb.*;
/** PDClient is thread-safe and suggested to be shared threads */
public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
implements ReadOnlyPDClient {
private RequestHeader header;
@ -73,7 +76,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
@Override
public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
Supplier<GetRegionRequest> request;
if (conf.getKvMode().equalsIgnoreCase("RAW")) {
if (conf.getKvMode() == KVMode.RAW) {
request = () -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();
} else {
CodecDataOutput cdo = new CodecDataOutput();
@ -195,8 +198,8 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
}
public static ReadOnlyPDClient create(TiSession session) {
return createRaw(session);
public static ReadOnlyPDClient create(TiConfiguration conf, ChannelFactory channelFactory) {
return createRaw(conf, channelFactory);
}
@VisibleForTesting
@ -247,7 +250,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
public GetMembersResponse getMembers(HostAndPort url) {
try {
ManagedChannel probChan = session.getChannel(url.getHostText() + ":" + url.getPort());
ManagedChannel probChan = channelFactory.getChannel(url.getHostText() + ":" + url.getPort());
PDGrpc.PDBlockingStub stub = PDGrpc.newBlockingStub(probChan);
GetMembersRequest request =
GetMembersRequest.newBuilder().setHeader(RequestHeader.getDefaultInstance()).build();
@ -279,7 +282,7 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
}
// create new Leader
ManagedChannel clientChannel = session.getChannel(leaderUrlStr);
ManagedChannel clientChannel = channelFactory.getChannel(leaderUrlStr);
leaderWrapper =
new LeaderWrapper(
leaderUrlStr,
@ -330,13 +333,13 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit());
}
private PDClient(TiSession session) {
super(session);
private PDClient(TiConfiguration conf, ChannelFactory channelFactory) {
super(conf, channelFactory);
}
private void initCluster() {
GetMembersResponse resp = null;
List<HostAndPort> pdAddrs = getSession().getConf().getPdAddrs();
List<HostAndPort> pdAddrs = getConf().getPdAddrs();
for (HostAndPort u : pdAddrs) {
resp = getMembers(u);
if (resp != null) {
@ -366,10 +369,10 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
TimeUnit.MINUTES);
}
static PDClient createRaw(TiSession session) {
static PDClient createRaw(TiConfiguration conf, ChannelFactory channelFactory) {
PDClient client = null;
try {
client = new PDClient(session);
client = new PDClient(conf, channelFactory);
client.initCluster();
} catch (Exception e) {
if (client != null) {

View File

@ -60,7 +60,4 @@ public interface ReadOnlyPDClient {
Store getStore(BackOffer backOffer, long storeId);
Future<Store> getStoreAsync(BackOffer backOffer, long storeId);
/** Get associated session * @return the session associated to client */
TiSession getSession();
}

View File

@ -1,108 +0,0 @@
/*
* Copyright 2017 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common;
import static org.tikv.common.util.KeyRangeUtils.makeRange;
import com.google.common.collect.Range;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.iterator.ConcreteScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import org.tikv.kvproto.Metapb.Store;
public class Snapshot {
private final TiTimestamp timestamp;
private final TiSession session;
private final TiConfiguration conf;
public Snapshot(TiTimestamp timestamp, TiSession session) {
this.timestamp = timestamp;
this.session = session;
this.conf = session.getConf();
}
public TiSession getSession() {
return session;
}
public long getVersion() {
return timestamp.getVersion();
}
public TiTimestamp getTimestamp() {
return timestamp;
}
public byte[] get(byte[] key) {
ByteString keyString = ByteString.copyFrom(key);
ByteString value = get(keyString);
return value.toByteArray();
}
public ByteString get(ByteString key) {
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(key);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, getSession());
// TODO: Need to deal with lock error after grpc stable
return client.get(ConcreteBackOffer.newGetBackOff(), key, timestamp.getVersion());
}
public Iterator<KvPair> scan(ByteString startKey) {
return new ConcreteScanIterator(startKey, session, timestamp.getVersion());
}
// TODO: Need faster implementation, say concurrent version
// Assume keys sorted
public List<KvPair> batchGet(List<ByteString> keys) {
TiRegion curRegion = null;
Range<Key> curKeyRange = null;
Pair<TiRegion, Store> lastPair;
List<ByteString> keyBuffer = new ArrayList<>();
List<KvPair> result = new ArrayList<>(keys.size());
BackOffer backOffer = ConcreteBackOffer.newBatchGetMaxBackOff();
for (ByteString key : keys) {
if (curRegion == null || !curKeyRange.contains(Key.toRawKey(key))) {
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(key);
lastPair = pair;
curRegion = pair.first;
curKeyRange = makeRange(curRegion.getStartKey(), curRegion.getEndKey());
try (RegionStoreClient client =
RegionStoreClient.create(lastPair.first, lastPair.second, getSession())) {
List<KvPair> partialResult =
client.batchGet(backOffer, keyBuffer, timestamp.getVersion());
// TODO: Add lock check
result.addAll(partialResult);
} catch (Exception e) {
throw new TiClientInternalException("Error Closing Store client.", e);
}
keyBuffer = new ArrayList<>();
keyBuffer.add(key);
}
}
return result;
}
}

View File

@ -44,7 +44,7 @@ public class TiConfiguration implements Serializable {
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.RC;
private static final boolean DEF_SHOW_ROWID = false;
private static final String DEF_DB_PREFIX = "";
private static final String DEF_KV_MODE = "KV";
private static final KVMode DEF_KV_MODE = KVMode.TXN;
private static final int DEF_RAW_CLIENT_CONCURRENCY = 200;
private int timeout = DEF_TIMEOUT;
@ -63,9 +63,14 @@ public class TiConfiguration implements Serializable {
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
private boolean showRowId = DEF_SHOW_ROWID;
private String dbPrefix = DEF_DB_PREFIX;
private String kvMode = DEF_KV_MODE;
private KVMode kvMode = DEF_KV_MODE;
private int rawClientConcurrency = DEF_RAW_CLIENT_CONCURRENCY;
public enum KVMode {
TXN,
RAW
}
public static TiConfiguration createDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
TiConfiguration conf = new TiConfiguration();
@ -77,7 +82,7 @@ public class TiConfiguration implements Serializable {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
TiConfiguration conf = new TiConfiguration();
conf.pdAddrs = strToHostAndPort(pdAddrsStr);
conf.kvMode = "RAW";
conf.kvMode = KVMode.RAW;
return conf;
}
@ -229,12 +234,12 @@ public class TiConfiguration implements Serializable {
this.dbPrefix = dbPrefix;
}
public String getKvMode() {
public KVMode getKvMode() {
return kvMode;
}
public void setKvMode(String kvMode) {
this.kvMode = kvMode;
this.kvMode = KVMode.valueOf(kvMode);
}
public int getRawClientConcurrency() {

View File

@ -15,84 +15,57 @@
package org.tikv.common;
import com.google.common.net.HostAndPort;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.util.ChannelFactory;
import org.tikv.raw.RawKVClient;
/**
* TiSession is the holder for PD Client, Store pdClient and PD Cache All sessions share common
* region store connection pool but separated PD conn and cache for better concurrency TiSession is
* thread-safe but it's also recommended to have multiple session avoiding lock contention
*/
public class TiSession implements AutoCloseable {
private static final Map<String, ManagedChannel> connPool = new HashMap<>();
private final TiConfiguration conf;
// below object creation is either heavy or making connection (pd), pending for lazy loading
private volatile RegionManager regionManager;
private volatile PDClient client;
private final PDClient pdClient;
private final ChannelFactory channelFactory;
public TiSession(TiConfiguration conf) {
this.conf = conf;
this.channelFactory = new ChannelFactory(conf.getMaxFrameSize());
this.pdClient = PDClient.createRaw(conf, channelFactory);
}
public TiConfiguration getConf() {
return conf;
}
public PDClient getPDClient() {
PDClient res = client;
if (res == null) {
synchronized (this) {
if (client == null) {
client = PDClient.createRaw(this);
}
res = client;
}
}
return res;
}
public synchronized RegionManager getRegionManager() {
RegionManager res = regionManager;
if (res == null) {
synchronized (this) {
if (regionManager == null) {
regionManager = new RegionManager(getPDClient());
}
res = regionManager;
}
}
return res;
}
public synchronized ManagedChannel getChannel(String addressStr) {
ManagedChannel channel = connPool.get(addressStr);
if (channel == null) {
HostAndPort address;
try {
address = HostAndPort.fromString(addressStr);
} catch (Exception e) {
throw new IllegalArgumentException("failed to form address");
}
// Channel should be lazy without actual connection until first call
// So a coarse grain lock is ok here
channel =
ManagedChannelBuilder.forAddress(address.getHostText(), address.getPort())
.maxInboundMessageSize(conf.getMaxFrameSize())
.usePlaintext(true)
.idleTimeout(60, TimeUnit.SECONDS)
.build();
connPool.put(addressStr, channel);
}
return channel;
}
public static TiSession create(TiConfiguration conf) {
return new TiSession(conf);
}
public RawKVClient createRawClient() {
// Create new Region Manager avoiding thread contentions
RegionManager regionMgr = new RegionManager(pdClient);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(conf, channelFactory, regionMgr);
return new RawKVClient(conf, builder);
}
@VisibleForTesting
public PDClient getPDClient() {
return pdClient;
}
@VisibleForTesting
public ChannelFactory getChannelFactory() {
return channelFactory;
}
@Override
public void close() {
getPDClient().close();
pdClient.close();
channelFactory.close();
}
}

View File

@ -16,28 +16,27 @@
package org.tikv.common.operation.iterator;
import com.google.protobuf.ByteString;
import org.tikv.common.TiSession;
import org.tikv.common.TiConfiguration;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
public class ConcreteScanIterator extends ScanIterator {
private final long version;
public ConcreteScanIterator(ByteString startKey, TiSession session, long version) {
public ConcreteScanIterator(
TiConfiguration conf, RegionStoreClientBuilder builder, ByteString startKey, long version) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
super(startKey, ByteString.EMPTY, Integer.MAX_VALUE, session);
super(conf, builder, startKey, ByteString.EMPTY, Integer.MAX_VALUE);
this.version = version;
}
TiRegion loadCurrentRegionToCache() throws Exception {
Pair<TiRegion, Metapb.Store> pair = regionCache.getRegionStorePairByKey(startKey);
TiRegion region = pair.first;
Metapb.Store store = pair.second;
try (RegionStoreClient client = RegionStoreClient.create(region, store, session)) {
TiRegion region;
try (RegionStoreClient client = builder.build(startKey)) {
region = client.getRegion();
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);
return region;

View File

@ -16,28 +16,31 @@
package org.tikv.common.operation.iterator;
import com.google.protobuf.ByteString;
import org.tikv.common.TiSession;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Metapb;
public class RawScanIterator extends ScanIterator {
public RawScanIterator(ByteString startKey, ByteString endKey, int limit, TiSession session) {
super(startKey, endKey, limit, session);
public RawScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit) {
super(conf, builder, startKey, endKey, limit);
}
TiRegion loadCurrentRegionToCache() throws Exception {
Pair<TiRegion, Metapb.Store> pair = regionCache.getRegionStorePairByKey(startKey);
TiRegion region = pair.first;
Metapb.Store store = pair.second;
try (RegionStoreClient client = RegionStoreClient.create(region, store, session)) {
TiRegion region;
try (RegionStoreClient client = builder.build(startKey)) {
region = client.getRegion();
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
if (limit <= 0) {
currentCache = null;

View File

@ -20,17 +20,16 @@ import static java.util.Objects.requireNonNull;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.List;
import org.tikv.common.TiSession;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Kvrpcpb;
public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected final TiSession session;
protected final RegionManager regionCache;
protected final TiConfiguration conf;
protected final RegionStoreClientBuilder builder;
protected List<Kvrpcpb.KvPair> currentCache;
protected ByteString startKey;
protected int index = -1;
@ -41,7 +40,12 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected boolean hasEndKey;
protected boolean lastBatch = false;
ScanIterator(ByteString startKey, ByteString endKey, int limit, TiSession session) {
ScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit) {
this.startKey = requireNonNull(startKey, "start key is null");
if (startKey.isEmpty()) {
throw new IllegalArgumentException("start key cannot be empty");
@ -49,8 +53,8 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.equals(ByteString.EMPTY);
this.limit = limit;
this.session = session;
this.regionCache = session.getRegionManager();
this.conf = conf;
this.builder = builder;
}
abstract TiRegion loadCurrentRegionToCache() throws Exception;
@ -80,7 +84,7 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
// Session should be single-threaded itself
// so that we don't worry about conf change in the middle
// of a transaction. Otherwise below code might lose data
if (currentCache.size() < session.getConf().getScanBatchSize()) {
if (currentCache.size() < conf.getScanBatchSize()) {
startKey = curRegionEndKey;
} else {
// Start new scan from exact next key in current region

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiSession;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.key.Key;
@ -42,13 +41,11 @@ import org.tikv.kvproto.Metapb.StoreState;
public class RegionManager {
private static final Logger logger = Logger.getLogger(RegionManager.class);
private RegionCache cache;
private final ReadOnlyPDClient pdClient;
// To avoid double retrieval, we used the async version of grpc
// When rpc not returned, instead of call again, it wait for previous one done
public RegionManager(ReadOnlyPDClient pdClient) {
this.cache = new RegionCache(pdClient);
this.pdClient = pdClient;
}
public static class RegionCache {
@ -167,10 +164,6 @@ public class RegionManager {
}
}
public TiSession getSession() {
return pdClient.getSession();
}
public TiRegion getRegionByKey(ByteString key) {
return cache.getRegionByKey(key);
}

View File

@ -27,16 +27,35 @@ import io.grpc.ManagedChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.log4j.Logger;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiSession;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Kvrpcpb.*;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
import org.tikv.kvproto.Kvrpcpb.GetRequest;
import org.tikv.kvproto.Kvrpcpb.GetResponse;
import org.tikv.kvproto.Kvrpcpb.KvPair;
import org.tikv.kvproto.Kvrpcpb.RawBatchPutRequest;
import org.tikv.kvproto.Kvrpcpb.RawBatchPutResponse;
import org.tikv.kvproto.Kvrpcpb.RawDeleteRequest;
import org.tikv.kvproto.Kvrpcpb.RawDeleteResponse;
import org.tikv.kvproto.Kvrpcpb.RawGetRequest;
import org.tikv.kvproto.Kvrpcpb.RawGetResponse;
import org.tikv.kvproto.Kvrpcpb.RawPutRequest;
import org.tikv.kvproto.Kvrpcpb.RawPutResponse;
import org.tikv.kvproto.Kvrpcpb.RawScanRequest;
import org.tikv.kvproto.Kvrpcpb.RawScanResponse;
import org.tikv.kvproto.Kvrpcpb.ScanRequest;
import org.tikv.kvproto.Kvrpcpb.ScanResponse;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
@ -61,8 +80,11 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
private TikvBlockingStub blockingStub;
private TikvStub asyncStub;
// APIs for KV Scan/Put/Get/Delete
public TiRegion getRegion() {
return region;
}
// APIs for KV Scan/Put/Get/Delete
public ByteString get(BackOffer backOffer, ByteString key, long version) {
while (true) {
// we should refresh region
@ -410,32 +432,71 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
return resp.getKvsList();
}
public static RegionStoreClient create(TiRegion region, Store store, TiSession session) {
RegionStoreClient client;
String addressStr = store.getAddress();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
public static class RegionStoreClientBuilder {
private final TiConfiguration conf;
private final ChannelFactory channelFactory;
private final RegionManager regionManager;
public RegionStoreClientBuilder(
TiConfiguration conf, ChannelFactory channelFactory, RegionManager regionManager) {
Objects.requireNonNull(conf, "conf is null");
Objects.requireNonNull(channelFactory, "channelFactory is null");
Objects.requireNonNull(regionManager, "regionManager is null");
this.conf = conf;
this.channelFactory = channelFactory;
this.regionManager = regionManager;
}
ManagedChannel channel = session.getChannel(addressStr);
TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel);
public RegionStoreClient build(TiRegion region, Store store) {
Objects.requireNonNull(region, "region is null");
Objects.requireNonNull(store, "store is null");
TikvStub asyncStub = TikvGrpc.newStub(channel);
client = new RegionStoreClient(region, session, blockingStub, asyncStub);
return client;
String addressStr = store.getAddress();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Create region store client on address %s", addressStr));
}
ManagedChannel channel = channelFactory.getChannel(addressStr);
TikvBlockingStub blockingStub = TikvGrpc.newBlockingStub(channel);
TikvStub asyncStub = TikvGrpc.newStub(channel);
return new RegionStoreClient(
conf, region, channelFactory, blockingStub, asyncStub, regionManager);
}
public RegionStoreClient build(ByteString key) {
Pair<TiRegion, Store> pair = regionManager.getRegionStorePairByKey(key);
return build(pair.first, pair.second);
}
public RegionStoreClient build(TiRegion region) {
Store store = regionManager.getStoreById(region.getLeader().getStoreId());
return build(region, store);
}
public RegionManager getRegionManager() {
return regionManager;
}
}
private RegionStoreClient(
TiRegion region, TiSession session, TikvBlockingStub blockingStub, TikvStub asyncStub) {
super(session);
TiConfiguration conf,
TiRegion region,
ChannelFactory channelFactory,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
RegionManager regionManager) {
super(conf, channelFactory);
checkNotNull(region, "Region is empty");
checkNotNull(region.getLeader(), "Leader Peer is null");
checkArgument(region.getLeader() != null, "Leader Peer is null");
this.regionManager = session.getRegionManager();
this.regionManager = regionManager;
this.region = region;
this.blockingStub = blockingStub;
this.asyncStub = asyncStub;
this.lockResolverClient = new LockResolverClient(session, this.blockingStub, this.asyncStub);
this.lockResolverClient =
new LockResolverClient(
conf, this.blockingStub, this.asyncStub, channelFactory, regionManager);
}
@Override
@ -471,7 +532,7 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
}
region = cachedRegion;
String addressStr = regionManager.getStoreById(region.getLeader().getStoreId()).getAddress();
ManagedChannel channel = getSession().getChannel(addressStr);
ManagedChannel channel = channelFactory.getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
return true;
@ -480,7 +541,7 @@ public class RegionStoreClient extends AbstractGRPCClient<TikvBlockingStub, Tikv
@Override
public void onStoreNotMatch(Store store) {
String addressStr = store.getAddress();
ManagedChannel channel = getSession().getChannel(addressStr);
ManagedChannel channel = channelFactory.getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
if (logger.isDebugEnabled() && region.getLeader().getStoreId() != store.getId()) {

View File

@ -21,6 +21,7 @@ import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.codec.Codec.BytesCodec;
import org.tikv.common.codec.CodecDataInput;
import org.tikv.common.codec.KeyUtils;
@ -44,9 +45,9 @@ public class TiRegion implements Serializable {
Peer peer,
IsolationLevel isolationLevel,
Kvrpcpb.CommandPri commandPri,
String kvMode) {
KVMode kvMode) {
Objects.requireNonNull(meta, "meta is null");
this.meta = decodeRegion(meta, kvMode.equalsIgnoreCase("RAW"));
this.meta = decodeRegion(meta, kvMode == KVMode.RAW);
if (peer == null || peer.getId() == 0) {
if (meta.getPeersCount() == 0) {
throw new TiClientInternalException("Empty peer list for region " + meta.getId());

View File

@ -0,0 +1,59 @@
/*
* Copyright 2017 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.util;
import com.google.common.net.HostAndPort;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class ChannelFactory implements AutoCloseable {
private final int maxFrameSize;
private final Map<String, ManagedChannel> connPool = new ConcurrentHashMap<>();
public ChannelFactory(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
public ManagedChannel getChannel(String addressStr) {
return connPool.computeIfAbsent(
addressStr,
key -> {
HostAndPort address;
try {
address = HostAndPort.fromString(key);
} catch (Exception e) {
throw new IllegalArgumentException("failed to form address");
}
// Channel should be lazy without actual connection until first call
// So a coarse grain lock is ok here
return ManagedChannelBuilder.forAddress(address.getHostText(), address.getPort())
.maxInboundMessageSize(maxFrameSize)
.usePlaintext(true)
.idleTimeout(60, TimeUnit.SECONDS)
.build();
});
}
public void close() {
for (ManagedChannel ch : connPool.values()) {
ch.shutdown();
}
connPool.clear();
}
}

View File

@ -16,57 +16,51 @@
package org.tikv.raw;
import com.google.protobuf.ByteString;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.operation.iterator.RawScanIterator;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
public class RawKVClient implements AutoCloseable {
private static final String DEFAULT_PD_ADDRESS = "127.0.0.1:2379";
private final TiSession session;
private final RegionManager regionManager;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorCompletionService<Object> completionService;
private static final Logger logger = Logger.getLogger(RawKVClient.class);
private static final int RAW_BATCH_PUT_SIZE = 16 * 1024;
private RawKVClient(String addresses) {
session = TiSession.create(TiConfiguration.createRawDefault(addresses));
regionManager = session.getRegionManager();
ExecutorService executors =
Executors.newFixedThreadPool(session.getConf().getRawClientConcurrency());
completionService = new ExecutorCompletionService<>(executors);
}
private RawKVClient() {
this(DEFAULT_PD_ADDRESS);
}
public static RawKVClient create() {
return new RawKVClient();
}
public static RawKVClient create(String address) {
return new RawKVClient(address);
public RawKVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
Objects.requireNonNull(conf, "conf is null");
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = conf;
this.clientBuilder = clientBuilder;
ExecutorService executors = Executors.newFixedThreadPool(conf.getRawClientConcurrency());
this.completionService = new ExecutorCompletionService<>(executors);
}
@Override
public void close() {
session.close();
}
public void close() {}
/**
* Put a raw key-value pair to TiKV
@ -77,8 +71,7 @@ public class RawKVClient implements AutoCloseable {
public void put(ByteString key, ByteString value) {
BackOffer backOffer = defaultBackOff();
while (true) {
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawPut(backOffer, key, value);
return;
@ -126,8 +119,7 @@ public class RawKVClient implements AutoCloseable {
public ByteString get(ByteString key) {
BackOffer backOffer = defaultBackOff();
while (true) {
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
RegionStoreClient client = clientBuilder.build(key);
try {
return client.rawGet(defaultBackOff(), key);
} catch (final TiKVException e) {
@ -144,7 +136,7 @@ public class RawKVClient implements AutoCloseable {
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey) {
Iterator<Kvrpcpb.KvPair> iterator = rawScanIterator(startKey, endKey);
Iterator<Kvrpcpb.KvPair> iterator = rawScanIterator(conf, clientBuilder, startKey, endKey);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
@ -158,7 +150,7 @@ public class RawKVClient implements AutoCloseable {
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, int limit) {
Iterator<Kvrpcpb.KvPair> iterator = rawScanIterator(startKey, limit);
Iterator<Kvrpcpb.KvPair> iterator = rawScanIterator(conf, clientBuilder, startKey, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
@ -172,8 +164,7 @@ public class RawKVClient implements AutoCloseable {
public void delete(ByteString key) {
BackOffer backOffer = defaultBackOff();
while (true) {
Pair<TiRegion, Metapb.Store> pair = regionManager.getRegionStorePairByKey(key);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
RegionStoreClient client = clientBuilder.build(key);
try {
client.rawDelete(defaultBackOff(), key);
return;
@ -238,14 +229,14 @@ public class RawKVClient implements AutoCloseable {
TiRegion lastRegion = null;
for (ByteString key : keys) {
if (lastRegion == null || !lastRegion.contains(key)) {
lastRegion = regionManager.getRegionByKey(key);
lastRegion = clientBuilder.getRegionManager().getRegionByKey(key);
}
groups.computeIfAbsent(lastRegion, k -> new ArrayList<>()).add(key);
}
return groups;
}
static Map<ByteString, ByteString> mapKeysToValues(
private static Map<ByteString, ByteString> mapKeysToValues(
List<ByteString> keys, List<ByteString> values) {
Map<ByteString, ByteString> map = new HashMap<>();
for (int i = 0; i < keys.size(); i++) {
@ -264,12 +255,8 @@ public class RawKVClient implements AutoCloseable {
for (Batch batch : batches) {
completionService.submit(
() -> {
RegionStoreClient client = clientBuilder.build(batch.region);
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
RegionStoreClient client =
RegionStoreClient.create(
batch.region,
regionManager.getStoreById(batch.region.getLeader().getStoreId()),
session);
List<Kvrpcpb.KvPair> kvPairs = new ArrayList<>();
for (int i = 0; i < batch.keys.size(); i++) {
kvPairs.add(
@ -304,12 +291,17 @@ public class RawKVClient implements AutoCloseable {
}
}
private Iterator<Kvrpcpb.KvPair> rawScanIterator(ByteString startKey, ByteString endKey) {
return new RawScanIterator(startKey, endKey, Integer.MAX_VALUE, session);
private Iterator<Kvrpcpb.KvPair> rawScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey) {
return new RawScanIterator(conf, builder, startKey, endKey, Integer.MAX_VALUE);
}
private Iterator<Kvrpcpb.KvPair> rawScanIterator(ByteString startKey, int limit) {
return new RawScanIterator(startKey, ByteString.EMPTY, limit, session);
private Iterator<Kvrpcpb.KvPair> rawScanIterator(
TiConfiguration conf, RegionStoreClientBuilder builder, ByteString startKey, int limit) {
return new RawScanIterator(conf, builder, startKey, ByteString.EMPTY, limit);
}
private BackOffer defaultBackOff() {

View File

@ -21,20 +21,29 @@ import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoRegionMiss;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.log4j.Logger;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiSession;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.region.RegionErrorReceiver;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiRegion.RegionVerID;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.TsoUtils;
import org.tikv.kvproto.Kvrpcpb.CleanupRequest;
import org.tikv.kvproto.Kvrpcpb.CleanupResponse;
@ -71,13 +80,20 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
private TikvBlockingStub blockingStub;
private TikvStub asyncStub;
private TiRegion region;
private final RegionManager regionManager;
public LockResolverClient(TiSession session, TikvBlockingStub blockingStub, TikvStub asyncStub) {
super(session);
public LockResolverClient(
TiConfiguration conf,
TikvBlockingStub blockingStub,
TikvStub asyncStub,
ChannelFactory channelFactory,
RegionManager regionManager) {
super(conf, channelFactory);
resolved = new HashMap<>();
recentResolved = new LinkedList<>();
readWriteLock = new ReentrantReadWriteLock();
this.blockingStub = blockingStub;
this.regionManager = regionManager;
this.asyncStub = asyncStub;
}
@ -117,7 +133,7 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
while (true) {
// refresh region
region = session.getRegionManager().getRegionByKey(primary);
region = regionManager.getRegionByKey(primary);
Supplier<CleanupRequest> factory =
() ->
@ -128,7 +144,7 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
.build();
KVErrorHandler<CleanupResponse> handler =
new KVErrorHandler<>(
session.getRegionManager(),
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
@ -197,7 +213,7 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
private void resolveLock(BackOffer bo, Lock lock, long txnStatus, Set<RegionVerID> cleanRegion) {
while (true) {
region = session.getRegionManager().getRegionByKey(lock.getKey());
region = regionManager.getRegionByKey(lock.getKey());
if (cleanRegion.contains(region.getVerID())) {
return;
@ -225,7 +241,7 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
KVErrorHandler<ResolveLockResponse> handler =
new KVErrorHandler<>(
session.getRegionManager(),
regionManager,
this,
region,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
@ -273,7 +289,7 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
if (logger.isDebugEnabled()) {
logger.debug(region + ", new leader = " + newStore.getId());
}
TiRegion cachedRegion = getSession().getRegionManager().getRegionById(region.getId());
TiRegion cachedRegion = regionManager.getRegionById(region.getId());
// When switch leader fails or the region changed its key range,
// it would be necessary to re-split task's key range for new region.
if (!region.getStartKey().equals(cachedRegion.getStartKey())
@ -281,9 +297,8 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
return false;
}
region = cachedRegion;
String addressStr =
getSession().getRegionManager().getStoreById(region.getLeader().getStoreId()).getAddress();
ManagedChannel channel = getSession().getChannel(addressStr);
String addressStr = newStore.getAddress();
ManagedChannel channel = channelFactory.getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
return true;
@ -292,7 +307,7 @@ public class LockResolverClient extends AbstractGRPCClient<TikvBlockingStub, Tik
@Override
public void onStoreNotMatch(Store store) {
String addressStr = store.getAddress();
ManagedChannel channel = getSession().getChannel(addressStr);
ManagedChannel channel = channelFactory.getChannel(addressStr);
blockingStub = TikvGrpc.newBlockingStub(channel);
asyncStub = TikvGrpc.newStub(channel);
}

View File

@ -1,11 +1,10 @@
package org.tikv.common;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.junit.After;
import org.junit.Before;
import org.tikv.common.TiConfiguration.KVMode;
import org.tikv.common.region.TiRegion;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.Pdpb;
@ -40,7 +39,8 @@ public class MockServerTest {
.build();
region =
new TiRegion(r, r.getPeers(0), Kvrpcpb.IsolationLevel.RC, Kvrpcpb.CommandPri.Low, "KV");
new TiRegion(
r, r.getPeers(0), Kvrpcpb.IsolationLevel.RC, Kvrpcpb.CommandPri.Low, KVMode.TXN);
pdServer.addGetRegionResp(Pdpb.GetRegionResponse.newBuilder().setRegion(r).build());
server = new KVMockServer();
port = server.start(region);
@ -52,10 +52,6 @@ public class MockServerTest {
@After
public void tearDown() throws Exception {
server.stop();
}
@VisibleForTesting
protected static Coprocessor.KeyRange createByteStringRange(ByteString sKey, ByteString eKey) {
return Coprocessor.KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build();
session.close();
}
}

View File

@ -56,6 +56,7 @@ public class PDClientTest {
@After
public void tearDown() {
session.close();
server.stop();
}

View File

@ -34,6 +34,7 @@ public class RegionManagerTest {
private static final long CLUSTER_ID = 1024;
private static final String LOCAL_ADDR = "127.0.0.1";
private RegionManager mgr;
private TiSession session;
@Before
public void setup() throws IOException {
@ -47,13 +48,14 @@ public class RegionManagerTest {
GrpcUtils.makeMember(2, "http://" + LOCAL_ADDR + ":" + (server.port + 2))));
TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:" + server.port);
TiSession session = TiSession.create(conf);
mgr = session.getRegionManager();
session = TiSession.create(conf);
mgr = new RegionManager(session.getPDClient());
}
@After
public void tearDown() {
server.stop();
session.close();
}
@Test
@ -86,7 +88,7 @@ public class RegionManagerTest {
try {
mgr.getRegionByKey(searchKeyNotExists);
fail();
} catch (Exception e) {
} catch (Exception ignored) {
}
}
@ -190,7 +192,7 @@ public class RegionManagerTest {
try {
mgr.getStoreById(storeId);
fail();
} catch (Exception e) {
} catch (Exception ignored) {
}
}
}

View File

@ -21,7 +21,9 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import org.junit.Test;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.Kvrpcpb;
@ -37,7 +39,13 @@ public class RegionStoreClientTest extends MockServerTest {
.setState(Metapb.StoreState.Up)
.build();
return RegionStoreClient.create(region, store, session);
RegionStoreClientBuilder builder =
new RegionStoreClientBuilder(
session.getConf(),
session.getChannelFactory(),
new RegionManager(session.getPDClient()));
return builder.build(region, store);
}
@Test

View File

@ -1,13 +1,17 @@
package org.tikv.raw;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
@ -15,6 +19,7 @@ import org.tikv.common.util.FastByteComparisons;
import org.tikv.kvproto.Kvrpcpb;
public class RawKVClientTest {
private static final String DEFAULT_PD_ADDRESS = "127.0.0.1:2379";
private static final String RAW_PREFIX = "raw_\\u0001_";
private static final int KEY_POOL_SIZE = 1000000;
private static final int TEST_CASES = 10000;
@ -33,6 +38,7 @@ public class RawKVClientTest {
private final ExecutorCompletionService<Object> completionService =
new ExecutorCompletionService<>(executors);
private static final Logger logger = Logger.getLogger(RawKVClientTest.class);
private TiSession session;
static {
orderedKeys = new ArrayList<>();
@ -58,11 +64,12 @@ public class RawKVClientTest {
}
@Before
public void setClient() {
public void setup() throws IOException {
try {
session = TiSession.create(TiConfiguration.createDefault(DEFAULT_PD_ADDRESS));
initialized = false;
if (client == null) {
client = RawKVClient.create();
client = session.createRawClient();
}
data = new TreeMap<>(bsc);
initialized = true;
@ -71,6 +78,11 @@ public class RawKVClientTest {
}
}
@After
public void tearDown() {
session.close();
}
@Test
public void simpleTest() {
if (!initialized) return;

View File

@ -1,516 +0,0 @@
/*
* Copyright 2017 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.txn;
import static junit.framework.TestCase.*;
import static org.tikv.common.util.BackOffFunction.BackOffFuncType.BoTxnLock;
import com.google.protobuf.ByteString;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test;
import org.tikv.common.PDClient;
import org.tikv.common.ReadOnlyPDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.KeyException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.common.operation.KVErrorHandler;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Kvrpcpb.*;
import org.tikv.kvproto.Metapb.Store;
import org.tikv.kvproto.TikvGrpc;
public class LockResolverTest {
private TiSession session;
private static final int DefaultTTL = 10;
private boolean init = false;
private BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(1000);
private ReadOnlyPDClient pdClient;
private void putKV(String key, String value, long startTS, long commitTS) {
Mutation m =
Mutation.newBuilder()
.setKey(ByteString.copyFromUtf8(key))
.setOp(Op.Put)
.setValue(ByteString.copyFromUtf8(value))
.build();
boolean res = prewrite(Arrays.asList(m), startTS, m);
assertTrue(res);
res = commit(startTS, commitTS, Arrays.asList(ByteString.copyFromUtf8(key)));
assertTrue(res);
}
private boolean prewrite(List<Mutation> mutations, long startTS, Mutation primary) {
if (mutations.size() == 0) return true;
for (Mutation m : mutations) {
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(m.getKey());
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
Supplier<PrewriteRequest> factory =
() ->
PrewriteRequest.newBuilder()
.addAllMutations(Arrays.asList(m))
.setPrimaryLock(primary.getKey())
.setStartVersion(startTS)
.setLockTtl(DefaultTTL)
.setContext(pair.first.getContext())
.build();
KVErrorHandler<PrewriteResponse> handler =
new KVErrorHandler<>(
session.getRegionManager(),
client,
pair.first,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
PrewriteResponse resp =
client.callWithRetry(backOffer, TikvGrpc.METHOD_KV_PREWRITE, factory, handler);
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
if (resp.getErrorsCount() == 0) {
continue;
}
List<Lock> locks = new ArrayList<>();
for (KeyError err : resp.getErrorsList()) {
if (err.hasLocked()) {
Lock lock = new Lock(err.getLocked());
locks.add(lock);
} else {
throw new KeyException(err);
}
}
LockResolverClient resolver = null;
try {
Field field = RegionStoreClient.class.getDeclaredField("lockResolverClient");
assert (field != null);
field.setAccessible(true);
resolver = (LockResolverClient) (field.get(client));
} catch (Exception e) {
fail();
}
assertNotNull(resolver);
if (!resolver.resolveLocks(backOffer, locks)) {
backOffer.doBackOff(BoTxnLock, new KeyException(resp.getErrorsList().get(0)));
}
prewrite(Arrays.asList(m), startTS, primary);
}
return true;
}
private boolean lockKey(
String key,
String value,
String primaryKey,
String primaryValue,
boolean commitPrimary,
long startTs,
long commitTS) {
List<Mutation> mutations = new ArrayList<>();
mutations.add(
Mutation.newBuilder()
.setKey(ByteString.copyFromUtf8(primaryKey))
.setValue(ByteString.copyFromUtf8(primaryValue))
.setOp(Op.Put)
.build());
if (!key.equals(primaryKey)) {
mutations.add(
Mutation.newBuilder()
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8(value))
.setOp(Op.Put)
.build());
}
if (!prewrite(mutations, startTs, mutations.get(0))) return false;
if (commitPrimary) {
if (!key.equals(primaryKey)) {
if (!commit(
startTs,
commitTS,
Arrays.asList(ByteString.copyFromUtf8(primaryKey), ByteString.copyFromUtf8(key)))) {
return false;
}
} else {
if (!commit(startTs, commitTS, Arrays.asList(ByteString.copyFromUtf8(primaryKey)))) {
return false;
}
}
}
return true;
}
private boolean commit(long startTS, long commitTS, List<ByteString> keys) {
if (keys.size() == 0) return true;
for (ByteString k : keys) {
Pair<TiRegion, Store> pair = session.getRegionManager().getRegionStorePairByKey(k);
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
Supplier<CommitRequest> factory =
() ->
CommitRequest.newBuilder()
.setStartVersion(startTS)
.setCommitVersion(commitTS)
.addAllKeys(Arrays.asList(k))
.setContext(pair.first.getContext())
.build();
KVErrorHandler<CommitResponse> handler =
new KVErrorHandler<>(
session.getRegionManager(),
client,
pair.first,
resp -> resp.hasRegionError() ? resp.getRegionError() : null);
CommitResponse resp =
client.callWithRetry(backOffer, TikvGrpc.METHOD_KV_COMMIT, factory, handler);
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
if (resp.hasError()) {
throw new KeyException(resp.getError());
}
}
return true;
}
private void putAlphabet() {
for (int i = 0; i < 26; i++) {
long startTs = pdClient.getTimestamp(backOffer).getVersion();
long endTs = pdClient.getTimestamp(backOffer).getVersion();
while (startTs == endTs) {
endTs = pdClient.getTimestamp(backOffer).getVersion();
}
putKV(String.valueOf((char) ('a' + i)), String.valueOf((char) ('a' + i)), startTs, endTs);
}
for (int i = 0; i < 26; i++) {
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
ByteString v =
client.get(
backOffer,
ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))),
pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), String.valueOf((char) ('a' + i)));
}
}
private void prepareAlphabetLocks() {
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
while (startTs == endTs) {
endTs = pdClient.getTimestamp(backOffer);
}
putKV("c", "cc", startTs.getVersion(), endTs.getVersion());
startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
while (startTs == endTs) {
endTs = pdClient.getTimestamp(backOffer);
}
assertTrue(lockKey("c", "c", "z1", "z1", true, startTs.getVersion(), endTs.getVersion()));
startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
while (startTs == endTs) {
endTs = pdClient.getTimestamp(backOffer);
}
assertTrue(lockKey("d", "dd", "z2", "z2", false, startTs.getVersion(), endTs.getVersion()));
}
private BackOffer defaultBackOff() {
return ConcreteBackOffer.newCustomBackOff(1000);
}
private class RetryException extends RuntimeException {
public RetryException() {}
}
@Before
public void setUp() throws Exception {
TiConfiguration conf = TiConfiguration.createDefault("127.0.0.1:2379");
session = TiSession.create(conf);
try {
pdClient = PDClient.create(session);
} catch (Exception e) {
init = false;
}
}
@Test
public void getSITest() throws Exception {
if (!init) {
System.out.println("PD client not initialized. Test skipped");
return;
}
session.getConf().setIsolationLevel(IsolationLevel.SI);
putAlphabet();
prepareAlphabetLocks();
for (int i = 0; i < 26; i++) {
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
ByteString v =
client.get(
backOffer,
ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))),
pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), String.valueOf((char) ('a' + i)));
}
session.getConf().setIsolationLevel(IsolationLevel.RC);
}
@Test
public void getRCTest() {
if (!init) {
System.out.println("PD client not initialized. Test skipped");
return;
}
session.getConf().setIsolationLevel(IsolationLevel.RC);
putAlphabet();
prepareAlphabetLocks();
for (int i = 0; i < 26; i++) {
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
ByteString v =
client.get(
backOffer,
ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))),
pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), String.valueOf((char) ('a' + i)));
}
}
@Test
public void cleanLockTest() {
if (!init) {
System.out.println("PD client not initialized. Test skipped");
return;
}
session.getConf().setIsolationLevel(IsolationLevel.SI);
for (int i = 0; i < 26; i++) {
String k = String.valueOf((char) ('a' + i));
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
lockKey(k, k, k, k, false, startTs.getVersion(), endTs.getVersion());
}
List<Mutation> mutations = new ArrayList<>();
List<ByteString> keys = new ArrayList<>();
for (int i = 0; i < 26; i++) {
String k = String.valueOf((char) ('a' + i));
String v = String.valueOf((char) ('a' + i + 1));
Mutation m =
Mutation.newBuilder()
.setKey(ByteString.copyFromUtf8(k))
.setOp(Op.Put)
.setValue(ByteString.copyFromUtf8(v))
.build();
mutations.add(m);
keys.add(ByteString.copyFromUtf8(k));
}
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
boolean res = prewrite(mutations, startTs.getVersion(), mutations.get(0));
assertTrue(res);
res = commit(startTs.getVersion(), endTs.getVersion(), keys);
assertTrue(res);
for (int i = 0; i < 26; i++) {
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
ByteString v =
client.get(
backOffer,
ByteString.copyFromUtf8(String.valueOf((char) ('a' + i))),
pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), String.valueOf((char) ('a' + i + 1)));
}
session.getConf().setIsolationLevel(IsolationLevel.RC);
}
@Test
public void txnStatusTest() {
if (!init) {
System.out.println("PD client not initialized. Test skipped");
return;
}
session.getConf().setIsolationLevel(IsolationLevel.SI);
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
putKV("a", "a", startTs.getVersion(), endTs.getVersion());
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
long status =
client.lockResolverClient.getTxnStatus(
backOffer, startTs.getVersion(), ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
assertEquals(status, endTs.getVersion());
startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
lockKey("a", "a", "a", "a", true, startTs.getVersion(), endTs.getVersion());
pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
client = RegionStoreClient.create(pair.first, pair.second, session);
status =
client.lockResolverClient.getTxnStatus(
backOffer, startTs.getVersion(), ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
assertEquals(status, endTs.getVersion());
startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
lockKey("a", "a", "a", "a", false, startTs.getVersion(), endTs.getVersion());
pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
client = RegionStoreClient.create(pair.first, pair.second, session);
status =
client.lockResolverClient.getTxnStatus(
backOffer, startTs.getVersion(), ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
assertNotSame(status, endTs.getVersion());
session.getConf().setIsolationLevel(IsolationLevel.RC);
}
@Test
public void SITest() {
if (!init) {
System.out.println("PD client not initialized. Test skipped");
return;
}
session.getConf().setIsolationLevel(IsolationLevel.SI);
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
putKV("a", "a", startTs.getVersion(), endTs.getVersion());
startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion());
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
ByteString v =
client.get(
backOffer,
ByteString.copyFromUtf8(String.valueOf((char) ('a'))),
pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), String.valueOf((char) ('a')));
try {
commit(startTs.getVersion(), endTs.getVersion(), Arrays.asList(ByteString.copyFromUtf8("a")));
fail();
} catch (KeyException e) {
assertNotNull(e.getKeyErr().getRetryable());
}
session.getConf().setIsolationLevel(IsolationLevel.RC);
}
@Test
public void RCTest() {
if (!init) {
System.out.println("PD client not initialized. Test skipped");
return;
}
session.getConf().setIsolationLevel(IsolationLevel.RC);
TiTimestamp startTs = pdClient.getTimestamp(backOffer);
TiTimestamp endTs = pdClient.getTimestamp(backOffer);
putKV("a", "a", startTs.getVersion(), endTs.getVersion());
startTs = pdClient.getTimestamp(backOffer);
endTs = pdClient.getTimestamp(backOffer);
lockKey("a", "aa", "a", "aa", false, startTs.getVersion(), endTs.getVersion());
Pair<TiRegion, Store> pair =
session
.getRegionManager()
.getRegionStorePairByKey(ByteString.copyFromUtf8(String.valueOf((char) ('a'))));
RegionStoreClient client = RegionStoreClient.create(pair.first, pair.second, session);
ByteString v =
client.get(
backOffer,
ByteString.copyFromUtf8(String.valueOf((char) ('a'))),
pdClient.getTimestamp(backOffer).getVersion());
assertEquals(v.toStringUtf8(), String.valueOf((char) ('a')));
try {
commit(startTs.getVersion(), endTs.getVersion(), Arrays.asList(ByteString.copyFromUtf8("a")));
} catch (KeyException e) {
fail();
}
}
private static Coprocessor.KeyRange createByteStringRange(ByteString sKey, ByteString eKey) {
return Coprocessor.KeyRange.newBuilder().setStart(sKey).setEnd(eKey).build();
}
}