mirror of https://github.com/tikv/client-java.git
This commit is contained in:
parent
97983823cc
commit
d354ffc99a
|
@ -840,4 +840,12 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
|
|||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public long getClusterId() {
|
||||
return header.getClusterId();
|
||||
}
|
||||
|
||||
public List<URI> getPdAddrs() {
|
||||
return pdAddrs;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,7 +17,11 @@
|
|||
|
||||
package org.tikv.common.log;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import java.util.Map;
|
||||
|
||||
public interface SlowLog {
|
||||
|
||||
SlowLogSpan start(String name);
|
||||
|
||||
long getTraceId();
|
||||
|
@ -26,5 +30,11 @@ public interface SlowLog {
|
|||
|
||||
void setError(Throwable err);
|
||||
|
||||
SlowLog withFields(Map<String, Object> fields);
|
||||
|
||||
default SlowLog withField(String key, Object value) {
|
||||
return withFields(ImmutableMap.of(key, value));
|
||||
}
|
||||
|
||||
void log();
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.tikv.common.log;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class SlowLogEmptyImpl implements SlowLog {
|
||||
public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl();
|
||||
|
||||
|
@ -40,6 +42,11 @@ public class SlowLogEmptyImpl implements SlowLog {
|
|||
@Override
|
||||
public void setError(Throwable err) {}
|
||||
|
||||
@Override
|
||||
public SlowLog withFields(Map<String, Object> fields) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log() {}
|
||||
}
|
||||
|
|
|
@ -22,12 +22,16 @@ import com.google.gson.JsonObject;
|
|||
import java.math.BigInteger;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class SlowLogImpl implements SlowLog {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);
|
||||
|
||||
private static final int MAX_SPAN_SIZE = 1024;
|
||||
|
@ -35,6 +39,7 @@ public class SlowLogImpl implements SlowLog {
|
|||
private static final Random random = new Random();
|
||||
|
||||
private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();
|
||||
private final HashMap<String, Object> fields = new HashMap<>();
|
||||
private Throwable error = null;
|
||||
|
||||
private final long startMS;
|
||||
|
@ -81,6 +86,12 @@ public class SlowLogImpl implements SlowLog {
|
|||
this.error = err;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SlowLog withFields(Map<String, Object> fields) {
|
||||
this.fields.putAll(fields);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void log() {
|
||||
recordTime();
|
||||
|
@ -120,6 +131,25 @@ public class SlowLogImpl implements SlowLog {
|
|||
}
|
||||
jsonObject.add("spans", jsonArray);
|
||||
|
||||
for (Entry<String, Object> entry : fields.entrySet()) {
|
||||
Object value = entry.getValue();
|
||||
if (value instanceof List) {
|
||||
JsonArray field = new JsonArray();
|
||||
for (Object o : (List<?>) value) {
|
||||
field.add(o.toString());
|
||||
}
|
||||
jsonObject.add(entry.getKey(), field);
|
||||
} else if (value instanceof Map) {
|
||||
JsonObject field = new JsonObject();
|
||||
for (Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
|
||||
field.addProperty(e.getKey().toString(), e.getValue().toString());
|
||||
}
|
||||
jsonObject.add(entry.getKey(), field);
|
||||
} else {
|
||||
jsonObject.addProperty(entry.getKey(), value.toString());
|
||||
}
|
||||
}
|
||||
|
||||
return jsonObject;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,13 +17,31 @@
|
|||
|
||||
package org.tikv.raw;
|
||||
|
||||
import static org.tikv.common.util.ClientUtils.*;
|
||||
import static org.tikv.common.util.ClientUtils.appendBatches;
|
||||
import static org.tikv.common.util.ClientUtils.genUUID;
|
||||
import static org.tikv.common.util.ClientUtils.getBatches;
|
||||
import static org.tikv.common.util.ClientUtils.getTasks;
|
||||
import static org.tikv.common.util.ClientUtils.getTasksWithOutput;
|
||||
import static org.tikv.common.util.ClientUtils.groupKeysByRegion;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.Histogram;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -44,10 +62,19 @@ import org.tikv.common.operation.iterator.RawScanIterator;
|
|||
import org.tikv.common.region.RegionStoreClient;
|
||||
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
|
||||
import org.tikv.common.region.TiRegion;
|
||||
import org.tikv.common.util.*;
|
||||
import org.tikv.common.util.BackOffFunction;
|
||||
import org.tikv.common.util.BackOffer;
|
||||
import org.tikv.common.util.Batch;
|
||||
import org.tikv.common.util.ConcreteBackOffer;
|
||||
import org.tikv.common.util.DeleteRange;
|
||||
import org.tikv.common.util.HistogramUtils;
|
||||
import org.tikv.common.util.Pair;
|
||||
import org.tikv.common.util.ScanOption;
|
||||
import org.tikv.kvproto.Kvrpcpb.KvPair;
|
||||
|
||||
public class RawKVClient implements RawKVClientBase {
|
||||
private final long clusterId;
|
||||
private final List<URI> pdAddresses;
|
||||
private final TiSession tiSession;
|
||||
private final RegionStoreClientBuilder clientBuilder;
|
||||
private final TiConfiguration conf;
|
||||
|
@ -95,6 +122,12 @@ public class RawKVClient implements RawKVClientBase {
|
|||
this.batchScanThreadPool = session.getThreadPoolForBatchScan();
|
||||
this.deleteRangeThreadPool = session.getThreadPoolForDeleteRange();
|
||||
this.atomicForCAS = conf.isEnableAtomicForCAS();
|
||||
this.clusterId = session.getPDClient().getClusterId();
|
||||
this.pdAddresses = session.getPDClient().getPdAddrs();
|
||||
}
|
||||
|
||||
private SlowLog withClusterInfo(SlowLog logger) {
|
||||
return logger.withField("cluster_id", clusterId).withField("pd_addresses", pdAddresses);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -110,7 +143,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
String label = "client_raw_put";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("put");
|
||||
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
|
||||
|
||||
|
@ -172,7 +205,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
String label = "client_raw_compare_and_set";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("putIfAbsent");
|
||||
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
|
||||
|
||||
|
@ -211,7 +244,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
String label = "client_raw_batch_put";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("batchPut");
|
||||
span.addProperty("keySize", String.valueOf(kvPairs.size()));
|
||||
|
||||
|
@ -237,7 +270,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
String label = "client_raw_get";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("get");
|
||||
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
|
||||
|
||||
|
@ -270,7 +303,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
public List<KvPair> batchGet(List<ByteString> keys) {
|
||||
String label = "client_raw_batch_get";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchReadSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("batchGet");
|
||||
span.addProperty("keySize", String.valueOf(keys.size()));
|
||||
ConcreteBackOffer backOffer =
|
||||
|
@ -295,7 +328,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
public void batchDelete(List<ByteString> keys) {
|
||||
String label = "client_raw_batch_delete";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVBatchWriteSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("batchDelete");
|
||||
span.addProperty("keySize", String.valueOf(keys.size()));
|
||||
ConcreteBackOffer backOffer =
|
||||
|
@ -320,7 +353,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
public Optional<Long> getKeyTTL(ByteString key) {
|
||||
String label = "client_raw_get_key_ttl";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVReadSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVReadSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("getKeyTTL");
|
||||
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
|
||||
ConcreteBackOffer backOffer =
|
||||
|
@ -428,7 +461,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
|
||||
String label = "client_raw_scan";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("scan");
|
||||
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
|
||||
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
|
||||
|
@ -473,7 +506,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
|
||||
String label = "client_raw_scan_without_limit";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVScanSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVScanSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("scan");
|
||||
span.addProperty("startKey", KeyUtils.formatBytesUTF8(startKey));
|
||||
span.addProperty("endKey", KeyUtils.formatBytesUTF8(endKey));
|
||||
|
@ -539,7 +572,7 @@ public class RawKVClient implements RawKVClientBase {
|
|||
public void delete(ByteString key) {
|
||||
String label = "client_raw_delete";
|
||||
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
|
||||
SlowLog slowLog = new SlowLogImpl(conf.getRawKVWriteSlowLogInMS());
|
||||
SlowLog slowLog = withClusterInfo(new SlowLogImpl(conf.getRawKVWriteSlowLogInMS()));
|
||||
SlowLogSpan span = slowLog.start("delete");
|
||||
span.addProperty("key", KeyUtils.formatBytesUTF8(key));
|
||||
ConcreteBackOffer backOffer =
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
|
||||
package org.tikv.common.log;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonObject;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -63,4 +66,24 @@ public class SlowLogImplTest {
|
|||
Assert.assertEquals("18446744073709551615", SlowLogImpl.toUnsignedBigInteger(-1L).toString());
|
||||
Assert.assertEquals("18446744073709551614", SlowLogImpl.toUnsignedBigInteger(-2L).toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithFields() throws InterruptedException {
|
||||
SlowLogImpl slowLog = new SlowLogImpl(1);
|
||||
slowLog
|
||||
.withField("key0", "value0")
|
||||
.withField("key1", ImmutableList.of("value0", "value1"))
|
||||
.withField("key2", ImmutableMap.of("key3", "value3"));
|
||||
|
||||
JsonObject object = slowLog.getSlowLogJson();
|
||||
Assert.assertEquals("value0", object.get("key0").getAsString());
|
||||
|
||||
AtomicInteger i = new AtomicInteger();
|
||||
object
|
||||
.get("key1")
|
||||
.getAsJsonArray()
|
||||
.forEach(e -> Assert.assertEquals("value" + (i.getAndIncrement()), e.getAsString()));
|
||||
|
||||
Assert.assertEquals("value3", object.get("key2").getAsJsonObject().get("key3").getAsString());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue