Add CompareAndSet for RawClient and make Get returns Optional (#192)

* Add CompareAndSet for RawClient and make Get returns Optional

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Apply suggestions from code review

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

Co-authored-by: Liangliang Gu <marsishandsome@gmail.com>

* Format code

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Add putIfAbsent

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Rename sth

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Remove .vscode

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Remove .settings

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Delete .classpath

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Fix deadloop

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Enable TTL and CAS test

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* Fix test

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

* rebase #202

Signed-off-by: Andy Lok <andylokandy@hotmail.com>

Co-authored-by: Liangliang Gu <marsishandsome@gmail.com>
Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
This commit is contained in:
Andy Lok 2021-06-18 14:00:05 +08:00 committed by GitHub
parent c6d7f0478c
commit 8308d796e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 190 additions and 75 deletions

View File

@ -1,8 +1,8 @@
def call(ghprbActualCommit, ghprbPullId, ghprbPullTitle, ghprbPullLink, ghprbPullDescription, credentialsId) {
def TIDB_BRANCH = "release-4.0"
def TIKV_BRANCH = "release-4.0"
def PD_BRANCH = "release-4.0"
def TIDB_BRANCH = "release-5.0"
def TIKV_BRANCH = "release-5.0"
def PD_BRANCH = "release-5.0"
// parse tidb branch
def m1 = ghprbCommentBody =~ /tidb\s*=\s*([^\s\\]+)(\s|\\|$)/

1
.gitignore vendored
View File

@ -5,6 +5,7 @@ pub.sh
# ignore compiled classes
target
.classpath
# ignore version info
src/main/java/com/pingcap/tikv/TiVersion.java

View File

@ -3,3 +3,6 @@
[raftstore]
# set store capacity, if no set, use disk capacity.
capacity = "8G"
[storage]
enable-ttl = true

View File

@ -57,6 +57,8 @@
</scm>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<protobuf.version>3.5.1</protobuf.version>
@ -64,7 +66,7 @@
<slf4j.version>1.7.16</slf4j.version>
<grpc.version>1.24.0</grpc.version>
<powermock.version>1.6.6</powermock.version>
<jackson.version>2.10.0</jackson.version>
<jackson.version>2.12.3</jackson.version>
<trove4j.version>3.0.1</trove4j.version>
<jetcd.version>0.4.1</jetcd.version>
<joda-time.version>2.9.9</joda-time.version>
@ -386,7 +388,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<version>3.0.1</version>
<configuration>
<skip>${javadoc.skip}</skip>
</configuration>

View File

@ -0,0 +1,50 @@
/*
* Copyright 2021 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.tikv.common.exception;
import com.google.protobuf.ByteString;
import java.util.Optional;
import org.tikv.common.codec.KeyUtils;
public class RawCASConflictException extends RuntimeException {
private final ByteString key;
private final Optional<ByteString> expectedPrevValue;
private final Optional<ByteString> prevValue;
public RawCASConflictException(
ByteString key, Optional<ByteString> expectedPrevValue, Optional<ByteString> prevValue) {
super(
String.format(
"key=%s expectedPrevValue=%s prevValue=%s",
KeyUtils.formatBytes(key), expectedPrevValue, prevValue));
this.key = key;
this.expectedPrevValue = expectedPrevValue;
this.prevValue = prevValue;
}
public ByteString getKey() {
return this.key;
}
public Optional<ByteString> getExpectedPrevValue() {
return this.expectedPrevValue;
}
public Optional<ByteString> getPrevValue() {
return this.prevValue;
}
}

View File

@ -797,7 +797,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
// APIs for Raw Scan/Put/Get/Delete
public ByteString rawGet(BackOffer backOffer, ByteString key) {
public Optional<ByteString> rawGet(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get").startTimer();
try {
@ -817,7 +817,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
private ByteString rawGetHelper(RawGetResponse resp) {
private Optional<ByteString> rawGetHelper(RawGetResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawGetResponse failed without a cause");
@ -829,10 +829,14 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
return resp.getValue();
if (resp.getNotFound()) {
return Optional.empty();
} else {
return Optional.of(resp.getValue());
}
}
public Long rawGetKeyTTL(BackOffer backOffer, ByteString key) {
public Optional<Long> rawGetKeyTTL(BackOffer backOffer, ByteString key) {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_get_key_ttl").startTimer();
try {
@ -853,7 +857,7 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
private Long rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
private Optional<Long> rawGetKeyTTLHelper(RawGetKeyTTLResponse resp) {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawGetResponse failed without a cause");
@ -866,9 +870,9 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
throw new RegionException(resp.getRegionError());
}
if (resp.getNotFound()) {
return null;
return Optional.empty();
}
return resp.getTtl();
return Optional.of(resp.getTtl());
}
public void rawDelete(BackOffer backOffer, ByteString key) {
@ -944,8 +948,13 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
}
}
public ByteString rawPutIfAbsent(
BackOffer backOffer, ByteString key, ByteString value, long ttl) {
public void rawCompareAndSet(
BackOffer backOffer,
ByteString key,
Optional<ByteString> prevValue,
ByteString value,
long ttl)
throws RawCASConflictException {
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_put_if_absent").startTimer();
try {
@ -955,7 +964,8 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
.setContext(region.getReplicaContext(storeType))
.setKey(key)
.setValue(value)
.setPreviousNotExist(true)
.setPreviousValue(prevValue.orElse(ByteString.EMPTY))
.setPreviousNotExist(!prevValue.isPresent())
.setTtl(ttl)
.build();
@ -964,13 +974,15 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
regionManager, this, resp -> resp.hasRegionError() ? resp.getRegionError() : null);
RawCASResponse resp =
callWithRetry(backOffer, TikvGrpc.getRawCompareAndSwapMethod(), factory, handler);
return rawPutIfAbsentHelper(resp);
rawCompareAndSetHelper(key, prevValue, resp);
} finally {
requestTimer.observeDuration();
}
}
private ByteString rawPutIfAbsentHelper(RawCASResponse resp) {
private void rawCompareAndSetHelper(
ByteString key, Optional<ByteString> expectedPrevValue, RawCASResponse resp)
throws RawCASConflictException {
if (resp == null) {
this.regionManager.onRequestFail(region);
throw new TiClientInternalException("RawPutResponse failed without a cause");
@ -982,10 +994,14 @@ public class RegionStoreClient extends AbstractRegionStoreClient {
if (resp.hasRegionError()) {
throw new RegionException(resp.getRegionError());
}
if (resp.getSucceed()) {
return ByteString.EMPTY;
if (!resp.getSucceed()) {
if (resp.getPreviousNotExist()) {
throw new RawCASConflictException(key, expectedPrevValue, Optional.empty());
} else {
throw new RawCASConflictException(
key, expectedPrevValue, Optional.of(resp.getPreviousValue()));
}
}
return resp.getPreviousValue();
}
public List<KvPair> rawBatchGet(BackOffer backoffer, List<ByteString> keys) {

View File

@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.exception.RawCASConflictException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.operation.iterator.RawScanIterator;
@ -139,10 +140,10 @@ public class RawKVClient implements AutoCloseable {
*
* @param key key
* @param value value
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
* @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
* previous key if the value already exists, and does not write to TiKV.
*/
public ByteString putIfAbsent(ByteString key, ByteString value) {
public Optional<ByteString> putIfAbsent(ByteString key, ByteString value) {
return putIfAbsent(key, value, 0L);
}
@ -152,20 +153,49 @@ public class RawKVClient implements AutoCloseable {
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
* @return a ByteString. returns ByteString.EMPTY if the value is written successfully. returns
* the previous key if the value already exists, and does not write to TiKV.
* @return a ByteString. returns Optional.EMPTY if the value is written successfully. returns the
* previous key if the value already exists, and does not write to TiKV.
*/
public ByteString putIfAbsent(ByteString key, ByteString value, long ttl) {
String label = "client_raw_put_if_absent";
public Optional<ByteString> putIfAbsent(ByteString key, ByteString value, long ttl) {
try {
compareAndSet(key, Optional.empty(), value, ttl);
return Optional.empty();
} catch (RawCASConflictException e) {
return e.getPrevValue();
}
}
/**
* Put a key-value pair if the prevValue matched the value in TiKV. This API is atomic.
*
* @param key key
* @param value value
*/
public void compareAndSet(ByteString key, Optional<ByteString> prevValue, ByteString value)
throws RawCASConflictException {
compareAndSet(key, prevValue, value, 0L);
}
/**
* pair if the prevValue matched the value in TiKV. This API is atomic.
*
* @param key key
* @param value value
* @param ttl TTL of key (in seconds), 0 means the key will never be outdated.
*/
public void compareAndSet(
ByteString key, Optional<ByteString> prevValue, ByteString value, long ttl)
throws RawCASConflictException {
String label = "client_raw_compare_and_set";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
BackOffer backOffer = defaultBackOff();
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
ByteString result = client.rawPutIfAbsent(backOffer, key, value, ttl);
client.rawCompareAndSet(backOffer, key, prevValue, value, ttl);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
return;
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
@ -236,7 +266,7 @@ public class RawKVClient implements AutoCloseable {
* @param key raw key
* @return a ByteString value if key exists, ByteString.EMPTY if key does not exist
*/
public ByteString get(ByteString key) {
public Optional<ByteString> get(ByteString key) {
String label = "client_raw_get";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
@ -244,7 +274,7 @@ public class RawKVClient implements AutoCloseable {
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
ByteString result = client.rawGet(defaultBackOff(), key);
Optional<ByteString> result = client.rawGet(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {
@ -322,7 +352,7 @@ public class RawKVClient implements AutoCloseable {
* @return a Long indicating the TTL of key ttl is a non-null long value indicating TTL if key
* exists. - ttl=0 if the key will never be outdated. - ttl=null if the key does not exist
*/
public Long getKeyTTL(ByteString key) {
public Optional<Long> getKeyTTL(ByteString key) {
String label = "client_raw_get_key_ttl";
Histogram.Timer requestTimer = RAW_REQUEST_LATENCY.labels(label).startTimer();
try {
@ -330,7 +360,7 @@ public class RawKVClient implements AutoCloseable {
while (true) {
RegionStoreClient client = clientBuilder.build(key);
try {
Long result = client.rawGetKeyTTL(defaultBackOff(), key);
Optional<Long> result = client.rawGetKeyTTL(defaultBackOff(), key);
RAW_REQUEST_SUCCESS.labels(label).inc();
return result;
} catch (final TiKVException e) {

View File

@ -20,6 +20,7 @@ import static org.junit.Assert.*;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.RegionStoreClient;
@ -65,13 +66,13 @@ public class RegionStoreClientTest extends MockServerTest {
public void doRawGetTest(RegionStoreClient client) throws Exception {
server.put("key1", "value1");
ByteString value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value);
Optional<ByteString> value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value.get());
server.putError("error1", KVMockServer.NOT_LEADER);
// since not_leader is retryable, so the result should be correct.
value = client.rawGet(defaultBackOff(), ByteString.copyFromUtf8("key1"));
assertEquals(ByteString.copyFromUtf8("value1"), value);
assertEquals(ByteString.copyFromUtf8("value1"), value.get());
server.putError("failure", KVMockServer.STALE_EPOCH);
try {

View File

@ -9,14 +9,15 @@ import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.TiConfiguration;
import org.tikv.common.TiSession;
import org.tikv.common.codec.KeyUtils;
import org.tikv.common.exception.RawCASConflictException;
import org.tikv.common.exception.TiKVException;
import org.tikv.common.key.Key;
import org.tikv.common.util.FastByteComparisons;
@ -95,30 +96,45 @@ public class RawKVClientTest {
}
}
// tikv-4.0 does not support atomic api
@Ignore
public void atomicAPITest() {
@Test
public void rawCASTest() {
if (!initialized) return;
ByteString key = ByteString.copyFromUtf8("key_atomic");
ByteString value = ByteString.copyFromUtf8("value");
ByteString value2 = ByteString.copyFromUtf8("value2");
client.delete(key);
client.compareAndSet(key, Optional.empty(), value);
Assert.assertEquals(value, client.get(key).get());
try {
client.compareAndSet(key, Optional.empty(), value2);
Assert.fail("compareAndSet should fail.");
} catch (RawCASConflictException err) {
Assert.assertEquals(value, err.getPrevValue().get());
}
}
@Test
public void rawPutIfAbsentTest() {
if (!initialized) return;
long ttl = 10;
ByteString key = ByteString.copyFromUtf8("key_atomic");
ByteString value = ByteString.copyFromUtf8("value");
ByteString value2 = ByteString.copyFromUtf8("value2");
client.delete(key);
ByteString res1 = client.putIfAbsent(key, value, ttl);
assertTrue(res1.isEmpty());
ByteString res2 = client.putIfAbsent(key, value2, ttl);
assertEquals(value, res2);
Optional<ByteString> res1 = client.putIfAbsent(key, value, ttl);
assertFalse(res1.isPresent());
Optional<ByteString> res2 = client.putIfAbsent(key, value2, ttl);
assertEquals(res2.get(), value);
try {
Thread.sleep(ttl * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
ByteString res3 = client.putIfAbsent(key, value, ttl);
assertTrue(res3.isEmpty());
Optional<ByteString> res3 = client.putIfAbsent(key, value, ttl);
assertFalse(res3.isPresent());
}
// tikv-4.0 doest not support ttl
@Ignore
@Test
public void getKeyTTLTest() {
if (!initialized) return;
long ttl = 10;
@ -126,19 +142,19 @@ public class RawKVClientTest {
ByteString value = ByteString.copyFromUtf8("value");
client.put(key, value, ttl);
for (int i = 0; i < 9; i++) {
Long t = client.getKeyTTL(key);
logger.info("current ttl of key is " + t);
Optional<Long> t = client.getKeyTTL(key);
logger.info("current ttl of key is " + t.orElse(null));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
Long t = client.getKeyTTL(key);
if (t == null) {
logger.info("key outdated.");
Optional<Long> t = client.getKeyTTL(key);
if (t.isPresent()) {
logger.info("key not outdated: " + t.get());
} else {
logger.info("key not outdated: " + t);
logger.info("key outdated.");
}
}
@ -275,10 +291,10 @@ public class RawKVClientTest {
try {
checkDeleteRange(ByteString.EMPTY, ByteString.EMPTY);
checkEmpty(kv);
checkEmpty(kv1);
checkEmpty(kv2);
checkEmpty(kv3);
checkNotExist(key);
checkNotExist(key1);
checkNotExist(key2);
checkNotExist(key3);
checkPut(kv);
checkPut(kv1);
checkPut(kv2);
@ -532,7 +548,7 @@ public class RawKVClientTest {
} else {
int i = 0;
for (Map.Entry<ByteString, ByteString> pair : data.entrySet()) {
assertEquals(pair.getValue(), client.get(pair.getKey()));
assertEquals(client.get(pair.getKey()), Optional.of(pair.getValue()));
i++;
if (i >= getCases) {
break;
@ -795,13 +811,13 @@ public class RawKVClientTest {
private void checkPut(ByteString key, ByteString value) {
client.put(key, value);
assertEquals(value, client.get(key));
assertEquals(client.get(key).orElse(null), value);
}
private void checkBatchPut(Map<ByteString, ByteString> kvPairs) {
client.batchPut(kvPairs);
for (Map.Entry<ByteString, ByteString> kvPair : kvPairs.entrySet()) {
assertEquals(kvPair.getValue(), client.get(kvPair.getKey()));
assertEquals(client.get(kvPair.getKey()).orElse(null), kvPair.getValue());
}
}
@ -863,7 +879,7 @@ public class RawKVClientTest {
private void checkDelete(ByteString key) {
client.delete(key);
checkEmpty(key);
checkNotExist(key);
}
private void checkDeleteRange(ByteString startKey, ByteString endKey) {
@ -876,30 +892,26 @@ public class RawKVClientTest {
private void checkPutTTL(ByteString key, ByteString value, long ttl) {
client.put(key, value, ttl);
assertEquals(value, client.get(key));
assert client.get(key).orElse(null).equals(value);
}
private void checkGetKeyTTL(ByteString key, long ttl) {
Long t = client.getKeyTTL(key);
assertNotNull(t);
assertTrue(t <= ttl && t > 0);
Optional<Long> t = client.getKeyTTL(key);
assertTrue(t.isPresent());
assertTrue(t.get() <= ttl && t.get() > 0);
}
private void checkGetTTLTimeOut(ByteString key) {
assertTrue(client.get(key).isEmpty());
assertFalse(client.get(key).isPresent());
}
private void checkGetKeyTTLTimeOut(ByteString key) {
Long t = client.getKeyTTL(key);
assertNull(t);
Optional<Long> t = client.getKeyTTL(key);
assertFalse(t.isPresent());
}
private void checkEmpty(Kvrpcpb.KvPair kv) {
checkEmpty(kv.getKey());
}
private void checkEmpty(ByteString key) {
assertTrue(client.get(key).isEmpty());
private void checkNotExist(ByteString key) {
assertFalse(client.get(key).isPresent());
}
private static ByteString rawKey(String key) {