[close #699] Batch pick from release-3.1.1.11 (#708)

* [close #626] remove nix-shell (#627)

Signed-off-by: shiyuhang <1136742008@qq.com>

* fix wrong backoffer for pd client

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: shiyuhang <1136742008@qq.com>

* add TsoBatchUsedUp region error handler

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: shiyuhang <1136742008@qq.com>

* fix license header

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: shiyuhang <1136742008@qq.com>

* upgrade grpc-netty to 1.48.0

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: shiyuhang <1136742008@qq.com>

* Update pom.xml

Upgrade commons-codec as well

Signed-off-by: shiyuhang <1136742008@qq.com>

* upgrade protobuf

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: shiyuhang <1136742008@qq.com>

* upgrade protobuf to 3.19.6

Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Signed-off-by: shiyuhang <1136742008@qq.com>

Signed-off-by: shiyuhang <1136742008@qq.com>
Signed-off-by: iosmanthus <myosmanthustree@gmail.com>
Co-authored-by: iosmanthus <dengliming@pingcap.com>
Co-authored-by: iosmanthus <myosmanthustree@gmail.com>
Co-authored-by: Xiaoguang Sun <sunxiaoguang@users.noreply.github.com>
This commit is contained in:
shi yuhang 2022-12-30 20:20:09 +08:00 committed by GitHub
parent 30930e22e4
commit 6b21aa6902
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 163 additions and 74 deletions

11
pom.xml
View File

@ -57,7 +57,7 @@
<protobuf.version>3.5.1</protobuf.version>
<log4j.version>1.2.17</log4j.version>
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.38.0</grpc.version>
<grpc.version>1.48.0</grpc.version>
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
<gson.version>2.8.9</gson.version>
<powermock.version>1.6.6</powermock.version>
@ -75,12 +75,12 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.16.1</version>
<version>3.19.6</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.16.1</version>
<version>3.19.6</version>
</dependency>
<dependency>
<groupId>io.perfmark</groupId>
@ -232,6 +232,11 @@
<version>3.9</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>

View File

@ -1,14 +0,0 @@
{ pkgs ? import <nixpkgs> {} }:
(
pkgs.buildFHSUserEnv {
name = "client-java-shell";
targetPkgs = pkgs: with pkgs;[ git maven openjdk8 ];
runScript = ''
env \
GIT_SSL_CAINFO=/etc/ssl/certs/ca-certificates.crt \
JAVA_HOME=${pkgs.openjdk8}/lib/openjdk \
bash
'';
}
).env

View File

@ -24,6 +24,7 @@ import static io.grpc.Contexts.statusFromCancelled;
import static io.grpc.Status.DEADLINE_EXCEEDED;
import static io.grpc.internal.GrpcUtil.CONTENT_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
import static java.lang.Math.max;
@ -33,6 +34,7 @@ import com.google.common.base.MoreObjects;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.CompressorRegistry;
@ -166,6 +168,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
DecompressorRegistry decompressorRegistry,
Compressor compressor,
boolean fullStreamDecompression) {
headers.discardAll(CONTENT_LENGTH_KEY);
headers.discardAll(MESSAGE_ENCODING_KEY);
if (compressor != Codec.Identity.NONE) {
headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
@ -260,10 +263,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
} else {
ClientStreamTracer[] tracers =
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
stream =
new FailingClientStream(
DEADLINE_EXCEEDED.withDescription(
"ClientCall started after deadline exceeded: " + effectiveDeadline));
"ClientCall started after deadline exceeded: " + effectiveDeadline),
tracers);
}
if (callExecutorIsDirect) {
@ -363,12 +369,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
StringBuilder builder =
new StringBuilder(
String.format(
"Call timeout set to '%d' ns, due to context deadline.", effectiveTimeout));
Locale.US,
"Call timeout set to '%d' ns, due to context deadline.",
effectiveTimeout));
if (callDeadline == null) {
builder.append(" Explicit call timeout was not set.");
} else {
long callTimeout = callDeadline.timeRemaining(TimeUnit.NANOSECONDS);
builder.append(String.format(" Explicit call timeout was '%d' ns.", callTimeout));
builder.append(String.format(Locale.US, " Explicit call timeout was '%d' ns.", callTimeout));
}
log.fine(builder.toString());
@ -562,6 +570,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
@Override
public boolean isReady() {
if (halfCloseCalled) {
return false;
}
return stream.isReady();
}
@ -711,11 +722,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}
}
@Override
public void closed(Status status, Metadata trailers) {
closed(status, RpcProgress.PROCESSED, trailers);
}
@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
PerfMark.startTask("ClientStreamListener.closed", tag);

View File

@ -57,7 +57,6 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
@ -97,7 +96,6 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
super(pageSize, pageShifts, chunkSize, cacheAlignment);
this.parent = parent;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
numSmallSubpagePools = nSubpages;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
@ -183,9 +181,9 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
return;
}
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link
* PoolChunk#free(long)} may modify the doubly linked list as well.
/*
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
final PoolSubpage<T> head = smallSubpagePools[sizeIdx];
final boolean needsNormalAllocation;
@ -193,7 +191,13 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
final PoolSubpage<T> s = head.next;
needsNormalAllocation = s == head;
if (!needsNormalAllocation) {
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx);
assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx)
: "doNotDestroy="
+ s.doNotDestroy
+ ", elemSize="
+ s.elemSize
+ ", sizeIdx="
+ sizeIdx;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
@ -221,7 +225,7 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
}
}
// Method must be called inside synchronized(this) { ... } block
// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(
PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {
if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache)
@ -272,7 +276,7 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
}
}
private SizeClass sizeClass(long handle) {
private static SizeClass sizeClass(long handle) {
return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;
}
@ -499,6 +503,25 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
return max(0, val);
}
/**
* Return the number of bytes that are currently pinned to buffer instances, by the arena. The
* pinned memory is not accessible for use by any other allocation, until the buffers using have
* all been released.
*/
public long numPinnedBytes() {
long val =
activeBytesHuge
.value(); // Huge chunks are exact-sized for the buffers they were allocated to.
synchronized (this) {
for (int i = 0; i < chunkListMetrics.size(); i++) {
for (PoolChunkMetric m : chunkListMetrics.get(i)) {
val += ((PoolChunk<?>) m).pinnedBytes();
}
}
}
return max(0, val);
}
protected abstract PoolChunk<T> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
@ -588,13 +611,8 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
static final class HeapArena extends PoolArena<byte[]> {
HeapArena(
PooledByteBufAllocator parent,
int pageSize,
int pageShifts,
int chunkSize,
int directMemoryCacheAlignment) {
super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment);
HeapArena(PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize) {
super(parent, pageSize, pageShifts, chunkSize, 0);
}
private static byte[] newByteArray(int size) {
@ -610,12 +628,12 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
protected PoolChunk<byte[]> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(
this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
this, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);
}
@Override
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
return new PoolChunk<byte[]>(this, null, newByteArray(capacity), capacity);
}
@Override
@ -656,40 +674,33 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
return true;
}
// mark as package-private, only for unit test
int offsetCacheLine(ByteBuffer memory) {
// We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...)
// will
// throw an NPE.
int remainder =
HAS_UNSAFE
? (int)
(PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
: 0;
// offset = alignment - address & (alignment - 1)
return directMemoryCacheAlignment - remainder;
}
@Override
protected PoolChunk<ByteBuffer> newChunk(
int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {
if (directMemoryCacheAlignment == 0) {
ByteBuffer memory = allocateDirect(chunkSize);
return new PoolChunk<ByteBuffer>(
this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0);
this, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment);
final ByteBuffer base = allocateDirect(chunkSize + directMemoryCacheAlignment);
final ByteBuffer memory =
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(
this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory));
this, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
if (directMemoryCacheAlignment == 0) {
return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity, 0);
ByteBuffer memory = allocateDirect(capacity);
return new PoolChunk<ByteBuffer>(this, memory, memory, capacity);
}
final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, memory, capacity, offsetCacheLine(memory));
final ByteBuffer base = allocateDirect(capacity + directMemoryCacheAlignment);
final ByteBuffer memory =
PlatformDependent.alignDirectBuffer(base, directMemoryCacheAlignment);
return new PoolChunk<ByteBuffer>(this, base, memory, capacity);
}
private static ByteBuffer allocateDirect(int capacity) {
@ -701,9 +712,9 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
PlatformDependent.freeDirectNoCleaner((ByteBuffer) chunk.base);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
PlatformDependent.freeDirectBuffer((ByteBuffer) chunk.base);
}
}

View File

@ -193,13 +193,17 @@ public abstract class AbstractGRPCClient<
HealthCheckResponse resp = stub.check(req);
return resp.getStatus() == HealthCheckResponse.ServingStatus.SERVING;
} catch (Exception e) {
logger.warn("check health failed.", e);
logger.warn("check health failed, addr: {}, caused by: {}", addressStr, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoCheckHealth, e);
}
}
}
protected boolean checkHealth(BackOffer backOffer, String addressStr, HostMapping hostMapping) {
return doCheckHealth(backOffer, addressStr, hostMapping);
try {
return doCheckHealth(backOffer, addressStr, hostMapping);
} catch (Exception e) {
return false;
}
}
}

View File

@ -462,14 +462,19 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
}
return resp;
} catch (Exception e) {
logger.warn("failed to get member from pd server.", e);
logger.warn(
"failed to get member from pd server from {}, caused by: {}", uri, e.getMessage());
backOffer.doBackOff(BackOffFuncType.BoPDRPC, e);
}
}
}
private GetMembersResponse getMembers(BackOffer backOffer, URI uri) {
return doGetMembers(backOffer, uri);
try {
return doGetMembers(backOffer, uri);
} catch (Exception e) {
return null;
}
}
// return whether the leader has changed to target address `leaderUrlStr`.
@ -524,13 +529,16 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
public void tryUpdateLeaderOrForwardFollower() {
if (updateLeaderNotify.compareAndSet(false, true)) {
try {
BackOffer backOffer = defaultBackOffer();
updateLeaderService.submit(
() -> {
try {
updateLeaderOrForwardFollower(backOffer);
updateLeaderOrForwardFollower();
} catch (Exception e) {
logger.info("update leader or forward follower failed", e);
throw e;
} finally {
updateLeaderNotify.set(false);
logger.info("updating leader finish");
}
});
} catch (RejectedExecutionException e) {
@ -540,11 +548,13 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
}
}
private synchronized void updateLeaderOrForwardFollower(BackOffer backOffer) {
private synchronized void updateLeaderOrForwardFollower() {
logger.warn("updating leader or forward follower");
if (System.currentTimeMillis() - lastUpdateLeaderTime < MIN_TRY_UPDATE_DURATION) {
return;
}
for (URI url : this.pdAddrs) {
BackOffer backOffer = this.probeBackOffer();
// since resp is null, we need update leader's address by walking through all pd server.
GetMembersResponse resp = getMembers(backOffer, url);
if (resp == null) {
@ -602,8 +612,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
}
public void tryUpdateLeader() {
logger.info("try update leader");
for (URI url : this.pdAddrs) {
BackOffer backOffer = defaultBackOffer();
BackOffer backOffer = this.probeBackOffer();
// since resp is null, we need update leader's address by walking through all pd server.
GetMembersResponse resp = getMembers(backOffer, url);
if (resp == null) {
@ -856,4 +867,9 @@ public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDFutureStub>
private static BackOffer defaultBackOffer() {
return ConcreteBackOffer.newCustomBackOff(BackOffer.PD_INFO_BACKOFF);
}
private BackOffer probeBackOffer() {
int maxSleep = (int) getTimeout() * 2;
return ConcreteBackOffer.newCustomBackOff(maxSleep);
}
}

View File

@ -31,6 +31,7 @@ import org.tikv.common.region.RegionErrorReceiver;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiRegion;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.BackOffFunction.BackOffFuncType;
import org.tikv.common.util.BackOffer;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.Metapb;
@ -168,6 +169,12 @@ public class RegionErrorHandler<RespT> implements ErrorHandler<RespT> {
regionManager.clearRegionCache();
throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString()));
}
// The tso cache is used up in TiKV servers, we should backoff and wait its cache is renewed.
else if (error.getMessage().contains("TsoBatchUsedUp")) {
logger.warn(String.format("tso batch used up for region [%s]", recv.getRegion()));
backOffer.doBackOff(BackOffFuncType.BoTsoBatchUsedUp, new GrpcException(error.getMessage()));
return true;
}
logger.warn(String.format("Unknown error %s for region [%s]", error, recv.getRegion()));
// For other errors, we only drop cache here.

View File

@ -81,6 +81,7 @@ public class BackOffFunction {
BoServerBusy,
BoTxnNotFound,
BoCheckTimeout,
BoCheckHealth
BoCheckHealth,
BoTsoBatchUsedUp
}
}

View File

@ -174,6 +174,13 @@ public class ConcreteBackOffer implements BackOffer {
case BoCheckHealth:
backOffFunction = BackOffFunction.create(100, 600, BackOffStrategy.EqualJitter);
break;
case BoTsoBatchUsedUp:
backOffFunction =
BackOffFunction.create(
TiConfiguration.getInt(TIKV_BO_REGION_MISS_BASE_IN_MS),
500,
BackOffStrategy.NoJitter);
break;
}
return backOffFunction;
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.tikv.common;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.tikv.kvproto.Errorpb.Error;
import org.tikv.raw.RawKVClient;
public class TsoBatchUsedUpTest extends MockThreeStoresTest {
RawKVClient createClient() {
return session.createRawClient();
}
@Test
public void testTsoBatchUsedUp() {
ByteString key = ByteString.copyFromUtf8("tso");
servers.get(0).putError("tso", () -> Error.newBuilder().setMessage("TsoBatchUsedUp"));
try (RawKVClient client = createClient()) {
try {
client.put(key, ByteString.EMPTY);
Assert.fail();
} catch (Exception ignore) {
}
pdServers.get(0).addGetRegionListener(request -> null);
// Will not clean region cache
Assert.assertNotNull(session.getRegionManager().getRegionByKey(key));
}
}
}