mirror of https://github.com/tikv/client-java.git
fix KeyException no retry & no detail error msg(#778)
This commit is contained in:
parent
aedfd66b88
commit
2be0907330
12
pom.xml
12
pom.xml
|
@ -265,6 +265,18 @@
|
||||||
<artifactId>simpleclient_pushgateway</artifactId>
|
<artifactId>simpleclient_pushgateway</artifactId>
|
||||||
<version>0.10.0</version>
|
<version>0.10.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.powermock</groupId>
|
||||||
|
<artifactId>powermock-module-junit4</artifactId>
|
||||||
|
<version>2.0.2</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.powermock</groupId>
|
||||||
|
<artifactId>powermock-api-mockito2</artifactId>
|
||||||
|
<version>2.0.2</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
<reporting>
|
<reporting>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
|
|
@ -31,7 +31,7 @@ public class KeyException extends TiKVException {
|
||||||
}
|
}
|
||||||
|
|
||||||
public KeyException(Kvrpcpb.KeyError keyErr) {
|
public KeyException(Kvrpcpb.KeyError keyErr) {
|
||||||
super("Key exception occurred");
|
super("Key exception occurred " + keyErr.toString());
|
||||||
this.keyErr = keyErr;
|
this.keyErr = keyErr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -519,7 +519,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
|
||||||
Lock lock = new Lock(err.getLocked(), codec);
|
Lock lock = new Lock(err.getLocked(), codec);
|
||||||
locks.add(lock);
|
locks.add(lock);
|
||||||
} else {
|
} else {
|
||||||
throw new KeyException(err.toString());
|
throw new KeyException(err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isSuccess) {
|
if (isSuccess) {
|
||||||
|
|
|
@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.tikv.common.ReadOnlyPDClient;
|
import org.tikv.common.ReadOnlyPDClient;
|
||||||
import org.tikv.common.TiConfiguration;
|
import org.tikv.common.TiConfiguration;
|
||||||
import org.tikv.common.exception.GrpcException;
|
import org.tikv.common.exception.GrpcException;
|
||||||
import org.tikv.common.exception.KeyException;
|
|
||||||
import org.tikv.common.exception.RegionException;
|
import org.tikv.common.exception.RegionException;
|
||||||
import org.tikv.common.exception.TiClientInternalException;
|
import org.tikv.common.exception.TiClientInternalException;
|
||||||
import org.tikv.common.exception.TiKVException;
|
import org.tikv.common.exception.TiKVException;
|
||||||
|
@ -181,7 +180,6 @@ public class TxnKVClient implements AutoCloseable {
|
||||||
// TODO: check this logic to see are we satisfied?
|
// TODO: check this logic to see are we satisfied?
|
||||||
private boolean retryableException(Exception e) {
|
private boolean retryableException(Exception e) {
|
||||||
return e instanceof TiClientInternalException
|
return e instanceof TiClientInternalException
|
||||||
|| e instanceof KeyException
|
|
||||||
|| e instanceof RegionException
|
|| e instanceof RegionException
|
||||||
|| e instanceof StatusRuntimeException;
|
|| e instanceof StatusRuntimeException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,17 +20,40 @@ package org.tikv.txn;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.powermock.api.mockito.PowerMockito;
|
||||||
|
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||||
|
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||||
|
import org.powermock.modules.junit4.PowerMockRunner;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
import org.tikv.BaseTxnKVTest;
|
import org.tikv.BaseTxnKVTest;
|
||||||
|
import org.tikv.common.BytePairWrapper;
|
||||||
import org.tikv.common.TiConfiguration;
|
import org.tikv.common.TiConfiguration;
|
||||||
import org.tikv.common.TiSession;
|
import org.tikv.common.TiSession;
|
||||||
|
import org.tikv.common.exception.KeyException;
|
||||||
|
import org.tikv.common.util.BackOffer;
|
||||||
|
import org.tikv.common.util.ConcreteBackOffer;
|
||||||
|
|
||||||
|
@RunWith(PowerMockRunner.class)
|
||||||
|
@PrepareForTest({TiSession.class, TxnKVClient.class, TwoPhaseCommitter.class})
|
||||||
|
@PowerMockIgnore({"javax.net.ssl.*"})
|
||||||
public class TwoPhaseCommitterTest extends BaseTxnKVTest {
|
public class TwoPhaseCommitterTest extends BaseTxnKVTest {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(TwoPhaseCommitterTest.class);
|
||||||
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
|
private static final int WRITE_BUFFER_SIZE = 32 * 1024;
|
||||||
private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024;
|
private static final int TXN_COMMIT_BATCH_SIZE = 768 * 1024;
|
||||||
private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000;
|
private static final long DEFAULT_BATCH_WRITE_LOCK_TTL = 3600000;
|
||||||
|
@ -76,4 +99,170 @@ public class TwoPhaseCommitterTest extends BaseTxnKVTest {
|
||||||
executorService)) {}
|
executorService)) {}
|
||||||
Assert.assertTrue(executorService.isShutdown());
|
Assert.assertTrue(executorService.isShutdown());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void prewriteWriteConflictFastFailTest() throws Exception {
|
||||||
|
|
||||||
|
int WRITE_BUFFER_SIZE = 32;
|
||||||
|
String primaryKey = RandomStringUtils.randomAlphabetic(3);
|
||||||
|
AtomicLong failCount = new AtomicLong();
|
||||||
|
ExecutorService executorService =
|
||||||
|
Executors.newFixedThreadPool(
|
||||||
|
WRITE_BUFFER_SIZE,
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
|
||||||
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;
|
||||||
|
new Thread(
|
||||||
|
() -> {
|
||||||
|
long startTS = session.getTimestamp().getVersion();
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
TwoPhaseCommitter twoPhaseCommitter =
|
||||||
|
new TwoPhaseCommitter(
|
||||||
|
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
|
||||||
|
List<BytePairWrapper> pairs =
|
||||||
|
Arrays.asList(
|
||||||
|
new BytePairWrapper(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8),
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
twoPhaseCommitter.prewriteSecondaryKeys(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
|
||||||
|
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
|
||||||
|
|
||||||
|
long commitTs = session.getTimestamp().getVersion();
|
||||||
|
|
||||||
|
twoPhaseCommitter.commitPrimaryKey(
|
||||||
|
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e.getMessage(), e);
|
||||||
|
failCount.incrementAndGet();
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.start();
|
||||||
|
|
||||||
|
Thread.sleep(10);
|
||||||
|
new Thread(
|
||||||
|
() -> {
|
||||||
|
long startTS = session.getTimestamp().getVersion();
|
||||||
|
try {
|
||||||
|
TwoPhaseCommitter twoPhaseCommitter =
|
||||||
|
new TwoPhaseCommitter(
|
||||||
|
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
|
||||||
|
List<BytePairWrapper> pairs =
|
||||||
|
Arrays.asList(
|
||||||
|
new BytePairWrapper(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8),
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
twoPhaseCommitter.prewriteSecondaryKeys(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
|
||||||
|
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
|
||||||
|
|
||||||
|
long commitTs = session.getTimestamp().getVersion();
|
||||||
|
|
||||||
|
twoPhaseCommitter.commitPrimaryKey(
|
||||||
|
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e.getMessage(), e);
|
||||||
|
failCount.incrementAndGet();
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.start();
|
||||||
|
latch.await();
|
||||||
|
Assert.assertEquals(1, failCount.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void prewriteWriteConflictLongNoFailTest() throws Exception {
|
||||||
|
|
||||||
|
int WRITE_BUFFER_SIZE = 32;
|
||||||
|
String primaryKey = RandomStringUtils.randomAlphabetic(3);
|
||||||
|
AtomicLong failCount = new AtomicLong();
|
||||||
|
ExecutorService executorService =
|
||||||
|
Executors.newFixedThreadPool(
|
||||||
|
WRITE_BUFFER_SIZE,
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("2pc-pool-%d").setDaemon(true).build());
|
||||||
|
CountDownLatch latch = new CountDownLatch(2);
|
||||||
|
int DEFAULT_BATCH_WRITE_LOCK_TTL = 10000;
|
||||||
|
|
||||||
|
new Thread(
|
||||||
|
() -> {
|
||||||
|
long startTS = session.getTimestamp().getVersion();
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
session = PowerMockito.spy(session);
|
||||||
|
TxnKVClient kvClient = PowerMockito.spy(session.createTxnClient());
|
||||||
|
PowerMockito.when(kvClient, "retryableException", Mockito.any(KeyException.class))
|
||||||
|
.thenReturn(true);
|
||||||
|
PowerMockito.doReturn(kvClient).when(session).createTxnClient();
|
||||||
|
|
||||||
|
TwoPhaseCommitter twoPhaseCommitter =
|
||||||
|
new TwoPhaseCommitter(
|
||||||
|
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
|
||||||
|
List<BytePairWrapper> pairs =
|
||||||
|
Arrays.asList(
|
||||||
|
new BytePairWrapper(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8),
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
twoPhaseCommitter.prewriteSecondaryKeys(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
|
||||||
|
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
|
||||||
|
|
||||||
|
long commitTs = session.getTimestamp().getVersion();
|
||||||
|
|
||||||
|
twoPhaseCommitter.commitPrimaryKey(
|
||||||
|
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e.getMessage(), e);
|
||||||
|
failCount.incrementAndGet();
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.start();
|
||||||
|
|
||||||
|
Thread.sleep(10);
|
||||||
|
new Thread(
|
||||||
|
() -> {
|
||||||
|
long startTS = session.getTimestamp().getVersion();
|
||||||
|
try {
|
||||||
|
TwoPhaseCommitter twoPhaseCommitter =
|
||||||
|
new TwoPhaseCommitter(
|
||||||
|
session, startTS, DEFAULT_BATCH_WRITE_LOCK_TTL, executorService);
|
||||||
|
List<BytePairWrapper> pairs =
|
||||||
|
Arrays.asList(
|
||||||
|
new BytePairWrapper(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8),
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8)));
|
||||||
|
twoPhaseCommitter.prewriteSecondaryKeys(
|
||||||
|
primaryKey.getBytes(StandardCharsets.UTF_8), pairs.iterator(), 20000);
|
||||||
|
BackOffer backOffer = ConcreteBackOffer.newCustomBackOff(20000);
|
||||||
|
|
||||||
|
long commitTs = session.getTimestamp().getVersion();
|
||||||
|
|
||||||
|
twoPhaseCommitter.commitPrimaryKey(
|
||||||
|
backOffer, primaryKey.getBytes(StandardCharsets.UTF_8), commitTs);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e.getMessage(), e);
|
||||||
|
failCount.incrementAndGet();
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.start();
|
||||||
|
latch.await();
|
||||||
|
Assert.assertEquals(1, failCount.get());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue