add slow log (#328)

This commit is contained in:
Liangliang Gu 2021-11-19 16:47:43 +08:00 committed by GitHub
parent 1e74211269
commit a881a6585b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 571 additions and 41 deletions

View File

@ -90,7 +90,8 @@ public abstract class AbstractGRPCClient<
return ClientCalls.blockingUnaryCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get());
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
if (logger.isTraceEnabled()) {
logger.trace(String.format("leaving %s...", method.getFullMethodName()));
@ -118,7 +119,8 @@ public abstract class AbstractGRPCClient<
responseObserver);
return null;
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
}
@ -139,7 +141,8 @@ public abstract class AbstractGRPCClient<
return asyncBidiStreamingCall(
stub.getChannel().newCall(method, stub.getCallOptions()), responseObserver);
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return observer;
}
@ -162,7 +165,8 @@ public abstract class AbstractGRPCClient<
blockingServerStreamingCall(
stub.getChannel(), method, stub.getCallOptions(), requestFactory.get()));
},
method.getFullMethodName());
method.getFullMethodName(),
backOffer);
logger.debug(String.format("leaving %s...", method.getFullMethodName()));
return response;
}

View File

@ -67,6 +67,13 @@ public class ConfigUtils {
public static final String TIKV_RAWKV_SCAN_TIMEOUT_IN_MS = "tikv.rawkv.scan_timeout_in_ms";
public static final String TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = "tikv.rawkv.clean_timeout_in_ms";
public static final String TIKV_BO_REGION_MISS_BASE_IN_MS = "tikv.bo_region_miss_base_in_ms";
public static final String TIKV_RAWKV_READ_SLOWLOG_IN_MS = "tikv.rawkv.read_slowlog_in_ms";
public static final String TIKV_RAWKV_WRITE_SLOWLOG_IN_MS = "tikv.rawkv.write_slowlog_in_ms";
public static final String TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS =
"tikv.rawkv.batch_read_slowlog_in_ms";
public static final String TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS =
"tikv.rawkv.batch_write_slowlog_in_ms";
public static final String TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "tikv.rawkv.scan_slowlog_in_ms";
public static final String DEF_PD_ADDRESSES = "127.0.0.1:2379";
public static final String DEF_TIMEOUT = "200ms";
@ -110,6 +117,7 @@ public class ConfigUtils {
public static final int DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS = 600000;
public static final int DEF_TIKV_BO_REGION_MISS_BASE_IN_MS = 20;
public static final String DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS = "5000";
public static final String NORMAL_COMMAND_PRIORITY = "NORMAL";
public static final String LOW_COMMAND_PRIORITY = "LOW";

View File

@ -119,6 +119,7 @@ public class TiConfiguration implements Serializable {
setIfMissing(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS, DEF_TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
setIfMissing(TIKV_BO_REGION_MISS_BASE_IN_MS, DEF_TIKV_BO_REGION_MISS_BASE_IN_MS);
setIfMissing(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS, DEF_TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
}
public static void listAll() {
@ -169,6 +170,10 @@ public class TiConfiguration implements Serializable {
return Integer.parseInt(get(key));
}
public static Optional<Integer> getIntOption(String key) {
return getOption(key).map(Integer::parseInt);
}
private static int getInt(String key, int defaultValue) {
try {
return getOption(key).map(Integer::parseInt).orElse(defaultValue);
@ -315,6 +320,13 @@ public class TiConfiguration implements Serializable {
private int rawKVBatchWriteTimeoutInMS = getInt(TIKV_RAWKV_BATCH_WRITE_TIMEOUT_IN_MS);
private int rawKVScanTimeoutInMS = getInt(TIKV_RAWKV_SCAN_TIMEOUT_IN_MS);
private int rawKVCleanTimeoutInMS = getInt(TIKV_RAWKV_CLEAN_TIMEOUT_IN_MS);
private Optional<Integer> rawKVReadSlowLogInMS = getIntOption(TIKV_RAWKV_READ_SLOWLOG_IN_MS);
private Optional<Integer> rawKVWriteSlowLogInMS = getIntOption(TIKV_RAWKV_WRITE_SLOWLOG_IN_MS);
private Optional<Integer> rawKVBatchReadSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_READ_SLOWLOG_IN_MS);
private Optional<Integer> rawKVBatchWriteSlowLogInMS =
getIntOption(TIKV_RAWKV_BATCH_WRITE_SLOWLOG_IN_MS);
private int rawKVScanSlowLogInMS = getInt(TIKV_RAWKV_SCAN_SLOWLOG_IN_MS);
public enum KVMode {
TXN,
@ -677,4 +689,44 @@ public class TiConfiguration implements Serializable {
public void setRawKVCleanTimeoutInMS(int rawKVCleanTimeoutInMS) {
this.rawKVCleanTimeoutInMS = rawKVCleanTimeoutInMS;
}
public Integer getRawKVReadSlowLogInMS() {
return rawKVReadSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVReadSlowLogInMS(Integer rawKVReadSlowLogInMS) {
this.rawKVReadSlowLogInMS = Optional.of(rawKVReadSlowLogInMS);
}
public Integer getRawKVWriteSlowLogInMS() {
return rawKVWriteSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVWriteSlowLogInMS(Integer rawKVWriteSlowLogInMS) {
this.rawKVWriteSlowLogInMS = Optional.of(rawKVWriteSlowLogInMS);
}
public Integer getRawKVBatchReadSlowLogInMS() {
return rawKVBatchReadSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVBatchReadSlowLogInMS(Integer rawKVBatchReadSlowLogInMS) {
this.rawKVBatchReadSlowLogInMS = Optional.of(rawKVBatchReadSlowLogInMS);
}
public Integer getRawKVBatchWriteSlowLogInMS() {
return rawKVBatchWriteSlowLogInMS.orElse((int) (getTimeout() * 2));
}
public void setRawKVBatchWriteSlowLogInMS(Integer rawKVBatchWriteSlowLogInMS) {
this.rawKVBatchWriteSlowLogInMS = Optional.of(rawKVBatchWriteSlowLogInMS);
}
public int getRawKVScanSlowLogInMS() {
return rawKVScanSlowLogInMS;
}
public void setRawKVScanSlowLogInMS(int rawKVScanSlowLogInMS) {
this.rawKVScanSlowLogInMS = rawKVScanSlowLogInMS;
}
}

View File

@ -0,0 +1,26 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
public interface SlowLog {
void addProperty(String key, String value);
SlowLogSpan start(String name);
void log();
}

View File

@ -0,0 +1,35 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
public class SlowLogEmptyImpl implements SlowLog {
public static final SlowLogEmptyImpl INSTANCE = new SlowLogEmptyImpl();
private SlowLogEmptyImpl() {}
@Override
public void addProperty(String key, String value) {}
@Override
public SlowLogSpan start(String name) {
return SlowLogSpanEmptyImpl.INSTANCE;
}
@Override
public void log() {}
}

View File

@ -0,0 +1,93 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SlowLogImpl implements SlowLog {
private static final Logger logger = LoggerFactory.getLogger(SlowLogImpl.class);
private static final int MAX_SPAN_SIZE = 1024;
public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("HH:mm:ss.SSS");
private final List<SlowLogSpan> slowLogSpans = new ArrayList<>();
private final long startMS;
private final long slowThresholdMS;
/** Key-Value pairs which will be logged, e.g. function name, key, region, etc. */
private final Map<String, String> properties;
public SlowLogImpl(long slowThresholdMS, Map<String, String> properties) {
this.startMS = System.currentTimeMillis();
this.slowThresholdMS = slowThresholdMS;
this.properties = new HashMap<>(properties);
}
@Override
public void addProperty(String key, String value) {
this.properties.put(key, value);
}
@Override
public synchronized SlowLogSpan start(String name) {
SlowLogSpan slowLogSpan = new SlowLogSpanImpl(name);
if (slowLogSpans.size() < MAX_SPAN_SIZE) {
slowLogSpans.add(slowLogSpan);
}
slowLogSpan.start();
return slowLogSpan;
}
@Override
public void log() {
long currentMS = System.currentTimeMillis();
if (slowThresholdMS >= 0 && currentMS - startMS > slowThresholdMS) {
logger.warn("SlowLog:" + getSlowLogString(currentMS));
}
}
private String getSlowLogString(long currentMS) {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("start", DATE_FORMAT.format(startMS));
jsonObject.addProperty("end", DATE_FORMAT.format(currentMS));
jsonObject.addProperty("duration", (currentMS - startMS) + "ms");
for (Map.Entry<String, String> entry : properties.entrySet()) {
jsonObject.addProperty(entry.getKey(), entry.getValue());
}
JsonArray jsonArray = new JsonArray();
for (SlowLogSpan slowLogSpan : slowLogSpans) {
jsonArray.add(slowLogSpan.toJsonElement());
}
jsonObject.add("spans", jsonArray);
return jsonObject.toString();
}
}

View File

@ -0,0 +1,28 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import com.google.gson.JsonElement;
public interface SlowLogSpan {
void start();
void end();
JsonElement toJsonElement();
}

View File

@ -0,0 +1,39 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
public class SlowLogSpanEmptyImpl implements SlowLogSpan {
public static final SlowLogSpanEmptyImpl INSTANCE = new SlowLogSpanEmptyImpl();
private SlowLogSpanEmptyImpl() {}
@Override
public void start() {}
@Override
public void end() {}
@Override
public JsonElement toJsonElement() {
return new JsonObject();
}
}

View File

@ -0,0 +1,77 @@
/*
*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common.log;
import static org.tikv.common.log.SlowLogImpl.DATE_FORMAT;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
public class SlowLogSpanImpl implements SlowLogSpan {
private final String name;
private long startMS;
private long endMS;
public SlowLogSpanImpl(String name) {
this.name = name;
this.startMS = 0;
this.endMS = 0;
}
@Override
public void start() {
this.startMS = System.currentTimeMillis();
}
@Override
public void end() {
this.endMS = System.currentTimeMillis();
}
@Override
public JsonElement toJsonElement() {
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("name", name);
jsonObject.addProperty("start", getStartString());
jsonObject.addProperty("end", getEndString());
jsonObject.addProperty("duration", getDurationString());
return jsonObject;
}
private String getStartString() {
if (startMS == 0) {
return "N/A";
}
return DATE_FORMAT.format(startMS);
}
private String getEndString() {
if (endMS == 0) {
return "N/A";
}
return DATE_FORMAT.format(endMS);
}
private String getDurationString() {
if (startMS == 0 || endMS == 0) {
return "N/A";
}
return (endMS - startMS) + "ms";
}
}

View File

@ -217,7 +217,7 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
@Override
public boolean handleRequestError(BackOffer backOffer, Exception e) {
if (recv.onStoreUnreachable()) {
if (recv.onStoreUnreachable(backOffer.getSlowLog())) {
if (!backOffer.canRetryAfterSleep(BackOffFunction.BackOffFuncType.BoTiKVRPC)) {
regionManager.onRequestFail(recv.getRegion());
throw new GrpcException("retry is exhausted.", e);

View File

@ -21,6 +21,7 @@ import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import java.util.concurrent.Callable;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.operation.ErrorHandler;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
@ -67,8 +68,10 @@ public abstract class RetryPolicy<RespT> {
}
}
public RespT callWithRetry(Callable<RespT> proc, String methodName) {
public RespT callWithRetry(Callable<RespT> proc, String methodName, BackOffer backOffer) {
Histogram.Timer callWithRetryTimer = CALL_WITH_RETRY_DURATION.labels(methodName).startTimer();
SlowLogSpan callWithRetrySlowLogSpan =
backOffer.getSlowLog().start("callWithRetry " + methodName);
try {
while (true) {
RespT result = null;
@ -76,9 +79,11 @@ public abstract class RetryPolicy<RespT> {
// add single request duration histogram
Histogram.Timer requestTimer =
GRPC_SINGLE_REQUEST_LATENCY.labels(methodName).startTimer();
SlowLogSpan slowLogSpan = backOffer.getSlowLog().start("gRPC " + methodName);
try {
result = proc.call();
} finally {
slowLogSpan.end();
requestTimer.observeDuration();
}
} catch (Exception e) {
@ -105,6 +110,7 @@ public abstract class RetryPolicy<RespT> {
}
} finally {
callWithRetryTimer.observeDuration();
callWithRetrySlowLogSpan.end();
}
}

View File

@ -35,6 +35,9 @@ import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.Pair;
import org.tikv.kvproto.Kvrpcpb;
@ -80,7 +83,9 @@ public abstract class AbstractRegionStoreClient
if (this.store.getProxyStore() != null) {
this.timeout = conf.getForwardTimeout();
} else if (!this.store.isReachable()) {
onStoreUnreachable();
// cannot get Deadline or SlowLog instance here
// use SlowLogEmptyImpl instead to skip slow log record
onStoreUnreachable(SlowLogEmptyImpl.INSTANCE);
}
}
@ -130,7 +135,7 @@ public abstract class AbstractRegionStoreClient
}
@Override
public boolean onStoreUnreachable() {
public boolean onStoreUnreachable(SlowLog slowLog) {
if (!store.isValid()) {
logger.warn(String.format("store [%d] has been invalid", store.getId()));
store = regionManager.getStoreById(store.getId());
@ -148,13 +153,13 @@ public abstract class AbstractRegionStoreClient
}
// seek an available leader store to send request
Boolean result = seekLeaderStore();
Boolean result = seekLeaderStore(slowLog);
if (result != null) {
return result;
}
if (conf.getEnableGrpcForward()) {
// seek an available proxy store to forward request
return seekProxyStore();
return seekProxyStore(slowLog);
}
return false;
}
@ -187,8 +192,9 @@ public abstract class AbstractRegionStoreClient
}
}
private Boolean seekLeaderStore() {
private Boolean seekLeaderStore(SlowLog slowLog) {
Histogram.Timer switchLeaderDurationTimer = SEEK_LEADER_STORE_DURATION.startTimer();
SlowLogSpan slowLogSpan = slowLog.start("seekLeaderStore");
try {
List<Metapb.Peer> peers = region.getFollowerList();
if (peers.isEmpty()) {
@ -237,11 +243,13 @@ public abstract class AbstractRegionStoreClient
}
} finally {
switchLeaderDurationTimer.observeDuration();
slowLogSpan.end();
}
return null;
}
private boolean seekProxyStore() {
private boolean seekProxyStore(SlowLog slowLog) {
SlowLogSpan slowLogSpan = slowLog.start("seekProxyStore");
Histogram.Timer grpcForwardDurationTimer = SEEK_PROXY_STORE_DURATION.startTimer();
try {
logger.info(String.format("try grpc forward: region[%d]", region.getId()));
@ -259,6 +267,7 @@ public abstract class AbstractRegionStoreClient
return true;
} finally {
grpcForwardDurationTimer.observeDuration();
slowLogSpan.end();
}
}

View File

@ -17,11 +17,13 @@
package org.tikv.common.region;
import org.tikv.common.log.SlowLog;
public interface RegionErrorReceiver {
boolean onNotLeader(TiRegion region);
/// return whether we need to retry this request.
boolean onStoreUnreachable();
boolean onStoreUnreachable(SlowLog slowLog);
TiRegion getRegion();
}

View File

@ -17,6 +17,8 @@
package org.tikv.common.util;
import org.tikv.common.log.SlowLog;
public interface BackOffer {
// Back off types.
int seconds = 1000;
@ -58,4 +60,6 @@ public interface BackOffer {
// DecorrJitter increases the maximum jitter based on the last random value.
DecorrJitter
}
SlowLog getSlowLog();
}

View File

@ -29,6 +29,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogSpan;
public class ConcreteBackOffer implements BackOffer {
private static final Logger logger = LoggerFactory.getLogger(ConcreteBackOffer.class);
@ -37,6 +40,7 @@ public class ConcreteBackOffer implements BackOffer {
private final List<Exception> errors;
private int totalSleep;
private final long deadline;
private final SlowLog slowLog;
public static final Histogram BACKOFF_DURATION =
Histogram.build()
@ -45,7 +49,7 @@ public class ConcreteBackOffer implements BackOffer {
.labelNames("type")
.register();
private ConcreteBackOffer(int maxSleep, long deadline) {
private ConcreteBackOffer(int maxSleep, long deadline, SlowLog slowLog) {
Preconditions.checkArgument(
maxSleep == 0 || deadline == 0, "Max sleep time should be 0 or Deadline should be 0.");
Preconditions.checkArgument(maxSleep >= 0, "Max sleep time cannot be less than 0.");
@ -54,6 +58,7 @@ public class ConcreteBackOffer implements BackOffer {
this.errors = new ArrayList<>();
this.backOffFunctionMap = new HashMap<>();
this.deadline = deadline;
this.slowLog = slowLog;
}
private ConcreteBackOffer(ConcreteBackOffer source) {
@ -62,39 +67,40 @@ public class ConcreteBackOffer implements BackOffer {
this.errors = source.errors;
this.backOffFunctionMap = source.backOffFunctionMap;
this.deadline = source.deadline;
this.slowLog = source.slowLog;
}
public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs) {
public static ConcreteBackOffer newDeadlineBackOff(int timeoutInMs, SlowLog slowLog) {
long deadline = System.currentTimeMillis() + timeoutInMs;
return new ConcreteBackOffer(0, deadline);
return new ConcreteBackOffer(0, deadline, slowLog);
}
public static ConcreteBackOffer newCustomBackOff(int maxSleep) {
return new ConcreteBackOffer(maxSleep, 0);
return new ConcreteBackOffer(maxSleep, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newScannerNextMaxBackOff() {
return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0);
return new ConcreteBackOffer(SCANNER_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newBatchGetMaxBackOff() {
return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0);
return new ConcreteBackOffer(BATCH_GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newCopNextMaxBackOff() {
return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0);
return new ConcreteBackOffer(COP_NEXT_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newGetBackOff() {
return new ConcreteBackOffer(GET_MAX_BACKOFF, 0);
return new ConcreteBackOffer(GET_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newRawKVBackOff() {
return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0);
return new ConcreteBackOffer(RAWKV_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer newTsoBackOff() {
return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0);
return new ConcreteBackOffer(TSO_MAX_BACKOFF, 0, SlowLogEmptyImpl.INSTANCE);
}
public static ConcreteBackOffer create(BackOffer source) {
@ -151,6 +157,7 @@ public class ConcreteBackOffer implements BackOffer {
}
public boolean canRetryAfterSleep(BackOffFunction.BackOffFuncType funcType, long maxSleepMs) {
SlowLogSpan slowLogSpan = getSlowLog().start("backoff " + funcType.name());
Histogram.Timer backOffTimer = BACKOFF_DURATION.labels(funcType.name()).startTimer();
BackOffFunction backOffFunction =
backOffFunctionMap.computeIfAbsent(funcType, this::createBackOffFunc);
@ -171,8 +178,10 @@ public class ConcreteBackOffer implements BackOffer {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new GrpcException(e);
} finally {
slowLogSpan.end();
backOffTimer.observeDuration();
}
backOffTimer.observeDuration();
if (maxSleep > 0 && totalSleep >= maxSleep) {
logger.warn(String.format("BackOffer.maxSleep %dms is exceeded, errors:", maxSleep));
return false;
@ -206,4 +215,9 @@ public class ConcreteBackOffer implements BackOffer {
// Use the last backoff type to generate an exception
throw new GrpcException("retry is exhausted.", err);
}
@Override
public SlowLog getSlowLog() {
return slowLog;
}
}

View File

@ -27,8 +27,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.log.SlowLog;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.log.SlowLogImpl;
import org.tikv.common.operation.iterator.RawScanIterator;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.RegionStoreClient.RegionStoreClientBuilder;
@ -129,10 +133,21 @@ public class RawKVClient implements AutoCloseable {
private void put(ByteString key, ByteString value, long ttl, boolean atomic) {
String label = "client_raw_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "put");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS());
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
client.rawPut(backOffer, key, value, ttl, atomic);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -147,6 +162,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -174,10 +190,21 @@ public class RawKVClient implements AutoCloseable {
public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put_if_absent";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "putIfAbsent");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS());
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -192,6 +219,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -236,9 +264,18 @@ public class RawKVClient implements AutoCloseable {
private void batchPut(Map<ByteString, ByteString> kvPairs, long ttl, boolean atomic) {
String label = "client_raw_batch_put";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "batchPut");
put("keySize", String.valueOf(kvPairs.size()));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS());
long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
doSendBatchPut(backOffer, kvPairs, ttl, atomic, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -247,6 +284,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -259,10 +297,22 @@ public class RawKVClient implements AutoCloseable {
public ByteString get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVReadSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "get");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS());
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
ByteString result = client.rawGet(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -277,6 +327,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -289,9 +340,18 @@ public class RawKVClient implements AutoCloseable {
public List<KvPair> batchGet(List<ByteString> keys) {
String label = "client_raw_batch_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchReadSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "batchGet");
put("keySize", String.valueOf(keys.size()));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS(), slowLog);
try {
BackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchReadTimeoutInMS());
long deadline = System.currentTimeMillis() + conf.getRawKVBatchReadTimeoutInMS();
List<KvPair> result = doSendBatchGet(backOffer, keys, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -301,6 +361,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -325,9 +386,18 @@ public class RawKVClient implements AutoCloseable {
private void batchDelete(List<ByteString> keys, boolean atomic) {
String label = "client_raw_batch_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVBatchWriteSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "batchDelete");
put("keySize", String.valueOf(keys.size()));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVBatchWriteTimeoutInMS());
long deadline = System.currentTimeMillis() + conf.getRawKVBatchWriteTimeoutInMS();
doSendBatchDelete(backOffer, keys, atomic, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -337,6 +407,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -350,10 +421,21 @@ public class RawKVClient implements AutoCloseable {
public Long getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVReadSlowLogInMS(),
new HashMap<String, String>(2) {
{
put("func", "getKeyTTL");
put("key", KeyUtils.formatBytesUTF8(key));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVReadTimeoutInMS());
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
Long result = client.rawGetKeyTTL(backOffer, key);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -368,6 +450,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -445,8 +528,21 @@ public class RawKVClient implements AutoCloseable {
public List<KvPair> scan(ByteString startKey, ByteString endKey, int limit, boolean keyOnly) {
String label = "client_raw_scan";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVScanSlowLogInMS(),
new HashMap<String, String>(5) {
{
put("func", "scan");
put("startKey", KeyUtils.formatBytesUTF8(startKey));
put("endKey", KeyUtils.formatBytesUTF8(endKey));
put("limit", String.valueOf(limit));
put("keyOnly", String.valueOf(keyOnly));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS());
Iterator<KvPair> iterator =
rawScanIterator(conf, clientBuilder, startKey, endKey, limit, keyOnly, backOffer);
List<KvPair> result = new ArrayList<>();
@ -458,6 +554,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -506,18 +603,37 @@ public class RawKVClient implements AutoCloseable {
public List<KvPair> scan(ByteString startKey, ByteString endKey, boolean keyOnly) {
String label = "client_raw_scan_without_limit";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVScanSlowLogInMS(),
new HashMap<String, String>(4) {
{
put("func", "scan");
put("startKey", KeyUtils.formatBytesUTF8(startKey));
put("endKey", KeyUtils.formatBytesUTF8(endKey));
put("keyOnly", String.valueOf(keyOnly));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVScanTimeoutInMS());
ByteString newStartKey = startKey;
List<KvPair> result = new ArrayList<>();
while (true) {
Iterator<KvPair> iterator =
rawScanIterator(
conf, clientBuilder, startKey, endKey, conf.getScanBatchSize(), keyOnly, backOffer);
conf,
clientBuilder,
newStartKey,
endKey,
conf.getScanBatchSize(),
keyOnly,
backOffer);
if (!iterator.hasNext()) {
break;
}
iterator.forEachRemaining(result::add);
startKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
newStartKey = Key.toRawKey(result.get(result.size() - 1).getKey()).next().toByteString();
}
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
@ -526,6 +642,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -578,10 +695,22 @@ public class RawKVClient implements AutoCloseable {
private void delete(ByteString key, boolean atomic) {
String label = "client_raw_delete";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
SlowLog slowLog =
new SlowLogImpl(
conf.getRawKVWriteSlowLogInMS(),
new HashMap<String, String>(3) {
{
put("func", "delete");
put("key", KeyUtils.formatBytesUTF8(key));
put("atomic", String.valueOf(atomic));
}
});
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS(), slowLog);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVWriteTimeoutInMS());
while (true) {
RegionStoreClient client = clientBuilder.build(key, backOffer);
slowLog.addProperty("region", client.getRegion().toString());
try {
client.rawDelete(backOffer, key, atomic);
RAW_REQUEST_SUCCESS.labels(label).inc();
@ -596,6 +725,7 @@ public class RawKVClient implements AutoCloseable {
throw e;
} finally {
requestTimer.observeDuration();
slowLog.log();
}
}
@ -611,8 +741,10 @@ public class RawKVClient implements AutoCloseable {
public synchronized void deleteRange(ByteString startKey, ByteString endKey) {
String label = "client_raw_delete_range";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
ConcreteBackOffer backOffer =
ConcreteBackOffer.newDeadlineBackOff(
conf.getRawKVCleanTimeoutInMS(), SlowLogEmptyImpl.INSTANCE);
try {
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(conf.getRawKVCleanTimeoutInMS());
long deadline = System.currentTimeMillis() + conf.getRawKVCleanTimeoutInMS();
doSendDeleteRange(backOffer, startKey, endKey, deadline);
RAW_REQUEST_SUCCESS.labels(label).inc();

View File

@ -19,6 +19,7 @@ import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.log.SlowLogEmptyImpl;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffer;
import org.tikv.common.util.ConcreteBackOffer;
@ -187,7 +188,7 @@ public class RawKVClientTest {
public void testDeadlineBackOff() {
int timeout = 2000;
int sleep = 150;
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout);
BackOffer backOffer = ConcreteBackOffer.newDeadlineBackOff(timeout, SlowLogEmptyImpl.INSTANCE);
long s = System.currentTimeMillis();
try {
while (true) {