mirror of https://github.com/tikv/client-java.git
do not use TiSession.getInstance (#117)
This commit is contained in:
parent
66fd0690a9
commit
f58a530948
|
|
@ -73,7 +73,7 @@ public class Main {
|
||||||
public static void main() {
|
public static void main() {
|
||||||
// You MUST create a raw configuration if you are using RawKVClient.
|
// You MUST create a raw configuration if you are using RawKVClient.
|
||||||
TiConfiguration conf = TiConfiguration.createRawDefault(YOUR_PD_ADDRESSES);
|
TiConfiguration conf = TiConfiguration.createRawDefault(YOUR_PD_ADDRESSES);
|
||||||
TiSession session = TiSession.getInstance(conf);
|
TiSession session = TiSession.create(conf);
|
||||||
RawKVClient = session.createRawKVClient();
|
RawKVClient = session.createRawKVClient();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,12 +40,10 @@ import org.tikv.kvproto.Kvrpcpb.KvPair;
|
||||||
public class Snapshot {
|
public class Snapshot {
|
||||||
private final TiTimestamp timestamp;
|
private final TiTimestamp timestamp;
|
||||||
private final TiSession session;
|
private final TiSession session;
|
||||||
private final TiConfiguration conf;
|
|
||||||
|
|
||||||
public Snapshot(@Nonnull TiTimestamp timestamp, TiConfiguration conf) {
|
public Snapshot(@Nonnull TiTimestamp timestamp, TiSession session) {
|
||||||
this.timestamp = timestamp;
|
this.timestamp = timestamp;
|
||||||
this.conf = conf;
|
this.session = session;
|
||||||
this.session = TiSession.getInstance(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TiSession getSession() {
|
public TiSession getSession() {
|
||||||
|
|
@ -175,6 +173,6 @@ public class Snapshot {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TiConfiguration getConf() {
|
public TiConfiguration getConf() {
|
||||||
return conf;
|
return this.session.getConf();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -75,6 +75,7 @@ public class TiSession implements AutoCloseable {
|
||||||
return new TiSession(conf);
|
return new TiSession(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public static TiSession getInstance(TiConfiguration conf) {
|
public static TiSession getInstance(TiConfiguration conf) {
|
||||||
synchronized (sessionCachedMap) {
|
synchronized (sessionCachedMap) {
|
||||||
String key = conf.getPdAddrsString();
|
String key = conf.getPdAddrsString();
|
||||||
|
|
@ -132,11 +133,11 @@ public class TiSession implements AutoCloseable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Snapshot createSnapshot() {
|
public Snapshot createSnapshot() {
|
||||||
return new Snapshot(getTimestamp(), this.conf);
|
return new Snapshot(getTimestamp(), this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Snapshot createSnapshot(TiTimestamp ts) {
|
public Snapshot createSnapshot(TiTimestamp ts) {
|
||||||
return new Snapshot(ts, conf);
|
return new Snapshot(ts, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
public PDClient getPDClient() {
|
public PDClient getPDClient() {
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,6 @@ import java.io.Serializable;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import org.tikv.common.Snapshot;
|
import org.tikv.common.Snapshot;
|
||||||
import org.tikv.common.TiConfiguration;
|
|
||||||
import org.tikv.common.TiSession;
|
import org.tikv.common.TiSession;
|
||||||
import org.tikv.common.codec.CodecDataInput;
|
import org.tikv.common.codec.CodecDataInput;
|
||||||
import org.tikv.common.codec.CodecDataOutput;
|
import org.tikv.common.codec.CodecDataOutput;
|
||||||
|
|
@ -41,15 +40,15 @@ import org.tikv.txn.TwoPhaseCommitter;
|
||||||
public final class RowIDAllocator implements Serializable {
|
public final class RowIDAllocator implements Serializable {
|
||||||
private final long maxShardRowIDBits;
|
private final long maxShardRowIDBits;
|
||||||
private final long dbId;
|
private final long dbId;
|
||||||
private final TiConfiguration conf;
|
private final TiSession session;
|
||||||
private final long step;
|
private final long step;
|
||||||
private long end;
|
private long end;
|
||||||
|
|
||||||
private RowIDAllocator(long maxShardRowIDBits, long dbId, long step, TiConfiguration conf) {
|
private RowIDAllocator(long maxShardRowIDBits, long dbId, long step, TiSession session) {
|
||||||
this.maxShardRowIDBits = maxShardRowIDBits;
|
this.maxShardRowIDBits = maxShardRowIDBits;
|
||||||
this.dbId = dbId;
|
this.dbId = dbId;
|
||||||
this.step = step;
|
this.step = step;
|
||||||
this.conf = conf;
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -72,18 +71,13 @@ public final class RowIDAllocator implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RowIDAllocator create(
|
public static RowIDAllocator create(
|
||||||
long dbId, TiTableInfo table, TiConfiguration conf, boolean unsigned, long step) {
|
long dbId, TiTableInfo table, TiSession session, boolean unsigned, long step) {
|
||||||
RowIDAllocator allocator = new RowIDAllocator(table.getMaxShardRowIDBits(), dbId, step, conf);
|
RowIDAllocator allocator =
|
||||||
|
new RowIDAllocator(table.getMaxShardRowIDBits(), dbId, step, session);
|
||||||
if (unsigned) {
|
if (unsigned) {
|
||||||
allocator.initUnsigned(
|
allocator.initUnsigned(session.createSnapshot(), table.getId(), table.getMaxShardRowIDBits());
|
||||||
TiSession.getInstance(conf).createSnapshot(),
|
|
||||||
table.getId(),
|
|
||||||
table.getMaxShardRowIDBits());
|
|
||||||
} else {
|
} else {
|
||||||
allocator.initSigned(
|
allocator.initSigned(session.createSnapshot(), table.getId(), table.getMaxShardRowIDBits());
|
||||||
TiSession.getInstance(conf).createSnapshot(),
|
|
||||||
table.getId(),
|
|
||||||
table.getMaxShardRowIDBits());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return allocator;
|
return allocator;
|
||||||
|
|
@ -99,9 +93,8 @@ public final class RowIDAllocator implements Serializable {
|
||||||
|
|
||||||
// set key value pair to tikv via two phase committer protocol.
|
// set key value pair to tikv via two phase committer protocol.
|
||||||
private void set(ByteString key, byte[] value) {
|
private void set(ByteString key, byte[] value) {
|
||||||
TiSession session = TiSession.getInstance(conf);
|
|
||||||
TwoPhaseCommitter twoPhaseCommitter =
|
TwoPhaseCommitter twoPhaseCommitter =
|
||||||
new TwoPhaseCommitter(conf, session.getTimestamp().getVersion());
|
new TwoPhaseCommitter(session, session.getTimestamp().getVersion());
|
||||||
|
|
||||||
twoPhaseCommitter.prewritePrimaryKey(
|
twoPhaseCommitter.prewritePrimaryKey(
|
||||||
ConcreteBackOffer.newCustomBackOff(BackOffer.PREWRITE_MAX_BACKOFF),
|
ConcreteBackOffer.newCustomBackOff(BackOffer.PREWRITE_MAX_BACKOFF),
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.tikv.common.TiConfiguration;
|
|
||||||
import org.tikv.common.TiSession;
|
import org.tikv.common.TiSession;
|
||||||
import org.tikv.common.codec.KeyUtils;
|
import org.tikv.common.codec.KeyUtils;
|
||||||
import org.tikv.common.exception.GrpcException;
|
import org.tikv.common.exception.GrpcException;
|
||||||
|
|
@ -64,12 +63,12 @@ public class TTLManager {
|
||||||
private final ScheduledExecutorService scheduler;
|
private final ScheduledExecutorService scheduler;
|
||||||
private final AtomicInteger state;
|
private final AtomicInteger state;
|
||||||
|
|
||||||
public TTLManager(TiConfiguration conf, long startTS, byte[] primaryKey) {
|
public TTLManager(TiSession session, long startTS, byte[] primaryKey) {
|
||||||
this.startTS = startTS;
|
this.startTS = startTS;
|
||||||
this.primaryLock = ByteString.copyFrom(primaryKey);
|
this.primaryLock = ByteString.copyFrom(primaryKey);
|
||||||
this.state = new AtomicInteger(STATE_UNINITIALIZED);
|
this.state = new AtomicInteger(STATE_UNINITIALIZED);
|
||||||
|
|
||||||
this.kvClient = TiSession.getInstance(conf).createTxnClient();
|
this.kvClient = session.createTxnClient();
|
||||||
this.regionManager = kvClient.getRegionManager();
|
this.regionManager = kvClient.getRegionManager();
|
||||||
|
|
||||||
scheduler =
|
scheduler =
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,6 @@ import java.util.concurrent.Executors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.tikv.common.BytePairWrapper;
|
import org.tikv.common.BytePairWrapper;
|
||||||
import org.tikv.common.TiConfiguration;
|
|
||||||
import org.tikv.common.TiSession;
|
import org.tikv.common.TiSession;
|
||||||
import org.tikv.common.codec.KeyUtils;
|
import org.tikv.common.codec.KeyUtils;
|
||||||
import org.tikv.common.exception.GrpcException;
|
import org.tikv.common.exception.GrpcException;
|
||||||
|
|
@ -82,8 +81,8 @@ public class TwoPhaseCommitter {
|
||||||
private final int prewriteMaxRetryTimes;
|
private final int prewriteMaxRetryTimes;
|
||||||
private final ExecutorService executorService;
|
private final ExecutorService executorService;
|
||||||
|
|
||||||
public TwoPhaseCommitter(TiConfiguration conf, long startTime) {
|
public TwoPhaseCommitter(TiSession session, long startTime) {
|
||||||
this.kvClient = TiSession.getInstance(conf).createTxnClient();
|
this.kvClient = session.createTxnClient();
|
||||||
this.regionManager = kvClient.getRegionManager();
|
this.regionManager = kvClient.getRegionManager();
|
||||||
this.startTs = startTime;
|
this.startTs = startTime;
|
||||||
this.lockTTL = DEFAULT_BATCH_WRITE_LOCK_TTL;
|
this.lockTTL = DEFAULT_BATCH_WRITE_LOCK_TTL;
|
||||||
|
|
@ -97,7 +96,7 @@ public class TwoPhaseCommitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
public TwoPhaseCommitter(
|
public TwoPhaseCommitter(
|
||||||
TiConfiguration conf,
|
TiSession session,
|
||||||
long startTime,
|
long startTime,
|
||||||
long lockTTL,
|
long lockTTL,
|
||||||
long txnPrewriteBatchSize,
|
long txnPrewriteBatchSize,
|
||||||
|
|
@ -106,7 +105,7 @@ public class TwoPhaseCommitter {
|
||||||
int writeThreadPerTask,
|
int writeThreadPerTask,
|
||||||
boolean retryCommitSecondaryKeys,
|
boolean retryCommitSecondaryKeys,
|
||||||
int prewriteMaxRetryTimes) {
|
int prewriteMaxRetryTimes) {
|
||||||
this.kvClient = TiSession.getInstance(conf).createTxnClient();
|
this.kvClient = session.createTxnClient();
|
||||||
this.regionManager = kvClient.getRegionManager();
|
this.regionManager = kvClient.getRegionManager();
|
||||||
this.startTs = startTime;
|
this.startTs = startTime;
|
||||||
this.lockTTL = lockTTL;
|
this.lockTTL = lockTTL;
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,8 @@ public class RawKVClientTest {
|
||||||
data = new TreeMap<>(bsc);
|
data = new TreeMap<>(bsc);
|
||||||
initialized = true;
|
initialized = true;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.warn("Cannot initialize raw client, please check whether TiKV is running. Test skipped.", e);
|
logger.warn(
|
||||||
|
"Cannot initialize raw client, please check whether TiKV is running. Test skipped.", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue