mirror of https://github.com/etcd-io/jetcd.git
Compare commits
63 Commits
jetcd-0.8.
...
main
Author | SHA1 | Date |
---|---|---|
|
8e72ab3da0 | |
|
3be6978e23 | |
|
9b4c9dd2ce | |
|
03a3b994e6 | |
|
10a335bf5f | |
|
fd240de17e | |
|
602feaf479 | |
|
4489b3f8da | |
|
40425c35d5 | |
|
990a5ea690 | |
|
6843e47a96 | |
|
5a83bdb482 | |
|
5f0ee26f3b | |
|
80b72b0b2a | |
|
98307934fe | |
|
a3f85475a2 | |
|
579e05b634 | |
|
43f0f23122 | |
|
81ea44b082 | |
|
c41c3fdc68 | |
|
e852bd921d | |
|
8dd9d3504e | |
|
c468ab4dd8 | |
|
08f3c274cc | |
|
a2c1db0279 | |
|
a65e765e36 | |
|
280a445773 | |
|
d498294fca | |
|
81ebd3a3fe | |
|
9a0d2f4076 | |
|
61a21a2d7c | |
|
69c2ec3d03 | |
|
59ff5091cc | |
|
098cf0f8bc | |
|
e68613fb6b | |
|
59337b388b | |
|
9f1a90cc6a | |
|
2e34d10d24 | |
|
11f45ec3fc | |
|
3a7740fa92 | |
|
b7293499a4 | |
|
08def01ee1 | |
|
05f92e0443 | |
|
a240dc6a74 | |
|
8ee9c1573e | |
|
0dad061111 | |
|
dab6c4cb2d | |
|
befcdc538d | |
|
8a6c463341 | |
|
01a2014b54 | |
|
acbfa6f434 | |
|
d2005ada3f | |
|
327a37bbc9 | |
|
8f182d4b8e | |
|
2da6f79b9d | |
|
05777ffaca | |
|
503bdd0f5a | |
|
346827957c | |
|
8e7ed6aaa6 | |
|
fcfe099aab | |
|
d1d21b1cdb | |
|
066d92cf95 | |
|
676bad3dd5 |
|
@ -42,8 +42,8 @@ jobs:
|
|||
- 17
|
||||
- 21
|
||||
etcd:
|
||||
- quay.io/coreos/etcd:v3.5.14
|
||||
- quay.io/coreos/etcd:v3.4.33
|
||||
- quay.io/coreos/etcd:v3.5.21
|
||||
- quay.io/coreos/etcd:v3.6.0
|
||||
uses: ./.github/workflows/build.yml
|
||||
with:
|
||||
javaVersion: "${{ matrix.java-version }}"
|
||||
|
|
|
@ -43,8 +43,8 @@ jobs:
|
|||
- 17
|
||||
- 21
|
||||
etcd:
|
||||
- quay.io/coreos/etcd:v3.5.14
|
||||
- quay.io/coreos/etcd:v3.4.33
|
||||
- quay.io/coreos/etcd:v3.5.21
|
||||
- quay.io/coreos/etcd:v3.6.0
|
||||
uses: ./.github/workflows/build.yml
|
||||
with:
|
||||
javaVersion: "${{ matrix.java-version }}"
|
||||
|
|
3
OWNERS
3
OWNERS
|
@ -3,5 +3,4 @@
|
|||
approvers:
|
||||
- fanminshi # Fanmin Shi <fanmin.shi@coreos.com>
|
||||
- lburgazzoli # Luca Burgazzoli <lburgazzoli@gmail.com>
|
||||
- xiang90 # Xiang Li <xiang.li@coreos.com>
|
||||
- heyitsanthony # Anthony Romano <anthony.romano@coreos.com>
|
||||
- vorburger # Michael Vorburger <mike@vorburger.ch>
|
||||
|
|
|
@ -111,7 +111,7 @@ The project is tested against a three node `etcd` setup started with the Launche
|
|||
|
||||
```sh
|
||||
$ ./gradlew test
|
||||
````
|
||||
```
|
||||
|
||||
### Troubleshooting
|
||||
|
||||
|
@ -129,3 +129,4 @@ See [CONTRIBUTING](https://github.com/etcd-io/jetcd/blob/master/CONTRIBUTING.md)
|
|||
|
||||
jetcd is under the Apache 2.0 license. See the [LICENSE](https://github.com/etcd-io/jetcd/blob/master/LICENSE) file for details.
|
||||
|
||||
|
||||
|
|
|
@ -76,6 +76,13 @@ subprojects {
|
|||
maxFailures = 5
|
||||
}
|
||||
}
|
||||
testing {
|
||||
suites {
|
||||
test {
|
||||
useJUnitJupiter()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
testlogger {
|
||||
theme 'mocha-parallel'
|
||||
|
|
|
@ -1,33 +1,33 @@
|
|||
[versions]
|
||||
grpc = "1.66.0"
|
||||
log4j = "2.23.1"
|
||||
mockito = "5.12.0"
|
||||
slf4j = "2.0.16"
|
||||
guava = "33.2.1-jre"
|
||||
assertj = "3.26.3"
|
||||
junit = "5.11.0"
|
||||
testcontainers = "1.20.1"
|
||||
grpc = "1.74.0"
|
||||
log4j = "2.25.1"
|
||||
mockito = "5.18.0"
|
||||
slf4j = "2.0.17"
|
||||
guava = "33.4.8-jre"
|
||||
assertj = "3.27.3"
|
||||
junit = "5.13.4"
|
||||
testcontainers = "1.21.3"
|
||||
protoc = "3.25.1"
|
||||
failsafe = "3.3.2"
|
||||
awaitility = "4.2.2"
|
||||
commonsIo = "2.16.1"
|
||||
commonCompress = "1.27.0"
|
||||
awaitility = "4.3.0"
|
||||
commonsIo = "2.20.0"
|
||||
commonCompress = "1.28.0"
|
||||
autoService = "1.1.1"
|
||||
errorprone = "2.30.0"
|
||||
vertx = "4.5.9"
|
||||
picocli = "4.7.6"
|
||||
restAssured = "5.5.0"
|
||||
vertx = "5.0.1"
|
||||
picocli = "4.7.7"
|
||||
restAssured = "5.5.5"
|
||||
javaxAnnotation = "1.3.2"
|
||||
|
||||
versionsPlugin = "0.51.0"
|
||||
versionsPlugin = "0.52.0"
|
||||
errorPronePlugin = "4.0.1"
|
||||
spotlessPlugin = "6.25.0"
|
||||
shadowPlugin = "8.1.1"
|
||||
testLoggerPlugin = "4.0.0"
|
||||
protobufPlugin = "0.9.4"
|
||||
protobufPlugin = "0.9.5"
|
||||
nexusPublishPlugin = "2.0.0"
|
||||
axionReleasePlugin = "1.18.3"
|
||||
testRetryPlugin = "1.5.10"
|
||||
axionReleasePlugin = "1.18.17"
|
||||
testRetryPlugin = "1.6.2"
|
||||
|
||||
|
||||
[libraries]
|
||||
|
|
Binary file not shown.
|
@ -1,6 +1,6 @@
|
|||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
|
||||
networkTimeout=10000
|
||||
validateDistributionUrl=true
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
|
|
|
@ -86,8 +86,7 @@ done
|
|||
# shellcheck disable=SC2034
|
||||
APP_BASE_NAME=${0##*/}
|
||||
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
|
||||
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
|
||||
' "$PWD" ) || exit
|
||||
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
|
||||
|
||||
# Use the maximum available, or set MAX_FD != -1 to use that value.
|
||||
MAX_FD=maximum
|
||||
|
@ -115,7 +114,7 @@ case "$( uname )" in #(
|
|||
NONSTOP* ) nonstop=true ;;
|
||||
esac
|
||||
|
||||
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
|
||||
CLASSPATH="\\\"\\\""
|
||||
|
||||
|
||||
# Determine the Java command to use to start the JVM.
|
||||
|
@ -206,7 +205,7 @@ fi
|
|||
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
|
||||
|
||||
# Collect all arguments for the java command:
|
||||
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
|
||||
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
|
||||
# and any embedded shellness will be escaped.
|
||||
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
|
||||
# treated as '${Hostname}' itself on the command line.
|
||||
|
@ -214,7 +213,7 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
|
|||
set -- \
|
||||
"-Dorg.gradle.appname=$APP_BASE_NAME" \
|
||||
-classpath "$CLASSPATH" \
|
||||
org.gradle.wrapper.GradleWrapperMain \
|
||||
-jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
|
||||
"$@"
|
||||
|
||||
# Stop when "xargs" is not available.
|
||||
|
|
|
@ -70,11 +70,11 @@ goto fail
|
|||
:execute
|
||||
@rem Setup the command line
|
||||
|
||||
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
|
||||
set CLASSPATH=
|
||||
|
||||
|
||||
@rem Execute Gradle
|
||||
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
|
||||
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
|
||||
|
||||
:end
|
||||
@rem End local scope for the variables with windows NT shell
|
||||
|
|
|
@ -37,6 +37,7 @@ import io.grpc.Metadata;
|
|||
import io.grpc.netty.GrpcSslContexts;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.vertx.core.Vertx;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
|
@ -68,6 +69,7 @@ public final class ClientBuilder implements Cloneable {
|
|||
private Duration retryMaxDuration;
|
||||
private Duration connectTimeout;
|
||||
private boolean waitForReady = true;
|
||||
private Vertx vertx;
|
||||
|
||||
ClientBuilder() {
|
||||
}
|
||||
|
@ -707,6 +709,30 @@ public final class ClientBuilder implements Cloneable {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the Vertx instance.
|
||||
*
|
||||
* @return the vertx instance.
|
||||
*/
|
||||
public Vertx vertx() {
|
||||
return vertx;
|
||||
}
|
||||
|
||||
/**
|
||||
* configure Vertx instance.
|
||||
*
|
||||
* @param vertx Vertx instance to use.
|
||||
* @return this builder to train
|
||||
* @throws IllegalArgumentException if vertx is null
|
||||
*/
|
||||
public ClientBuilder vertx(Vertx vertx) {
|
||||
Preconditions.checkArgument(vertx != null, "vertx can't be null");
|
||||
|
||||
this.vertx = vertx;
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* build a new Client.
|
||||
*
|
||||
|
|
|
@ -26,6 +26,7 @@ import io.etcd.jetcd.options.CompactOption;
|
|||
import io.etcd.jetcd.options.DeleteOption;
|
||||
import io.etcd.jetcd.options.GetOption;
|
||||
import io.etcd.jetcd.options.PutOption;
|
||||
import io.etcd.jetcd.options.TxnOption;
|
||||
import io.etcd.jetcd.support.CloseableClient;
|
||||
|
||||
/**
|
||||
|
@ -115,4 +116,12 @@ public interface KV extends CloseableClient {
|
|||
* @return a Txn
|
||||
*/
|
||||
Txn txn();
|
||||
|
||||
/**
|
||||
* creates a transaction.
|
||||
*
|
||||
* @param option TxnOption
|
||||
* @return a Txn
|
||||
*/
|
||||
Txn txn(TxnOption option);
|
||||
}
|
||||
|
|
|
@ -61,6 +61,10 @@ final class ClientConnectionManager {
|
|||
} else {
|
||||
this.executorService = builder.executorService();
|
||||
}
|
||||
|
||||
if (builder.vertx() != null) {
|
||||
this.vertx = builder.vertx();
|
||||
}
|
||||
}
|
||||
|
||||
ManagedChannel getChannel() {
|
||||
|
|
|
@ -88,7 +88,7 @@ final class ElectionImpl extends Impl implements Election {
|
|||
execute(
|
||||
() -> stubWithLeader().campaign(request),
|
||||
CampaignResponse::new,
|
||||
Errors::isRetryable));
|
||||
Errors::isRetryableForNoSafeRedoOp));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -111,7 +111,7 @@ final class ElectionImpl extends Impl implements Election {
|
|||
execute(
|
||||
() -> stubWithLeader().proclaim(request),
|
||||
ProclaimResponse::new,
|
||||
Errors::isRetryable));
|
||||
Errors::isRetryableForNoSafeRedoOp));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -126,7 +126,7 @@ final class ElectionImpl extends Impl implements Election {
|
|||
execute(
|
||||
() -> stubWithLeader().leader(request),
|
||||
response -> new LeaderResponse(response, namespace),
|
||||
Errors::isRetryable));
|
||||
Errors::isRetryableForNoSafeRedoOp));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -135,7 +135,7 @@ final class ElectionImpl extends Impl implements Election {
|
|||
requireNonNull(listener, "listener should not be null");
|
||||
|
||||
LeaderRequest request = LeaderRequest.newBuilder()
|
||||
.setName(ByteString.copyFrom(electionName.getBytes()))
|
||||
.setName(Util.prefixNamespace(electionName, namespace))
|
||||
.build();
|
||||
|
||||
stubWithLeader().observeWithHandler(request,
|
||||
|
@ -162,7 +162,7 @@ final class ElectionImpl extends Impl implements Election {
|
|||
execute(
|
||||
() -> stubWithLeader().resign(request),
|
||||
ResignResponse::new,
|
||||
Errors::isRetryable));
|
||||
Errors::isRetryableForNoSafeRedoOp));
|
||||
}
|
||||
|
||||
private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {
|
||||
|
|
|
@ -89,9 +89,11 @@ abstract class Impl {
|
|||
*/
|
||||
protected <S, T> CompletableFuture<T> execute(
|
||||
Supplier<Future<S>> supplier,
|
||||
Function<S, T> resultConvert) {
|
||||
Function<S, T> resultConvert,
|
||||
boolean autoRetry) {
|
||||
|
||||
return execute(supplier, resultConvert, Errors::isRetryable);
|
||||
return execute(supplier, resultConvert,
|
||||
autoRetry ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -33,6 +33,7 @@ import io.etcd.jetcd.options.CompactOption;
|
|||
import io.etcd.jetcd.options.DeleteOption;
|
||||
import io.etcd.jetcd.options.GetOption;
|
||||
import io.etcd.jetcd.options.PutOption;
|
||||
import io.etcd.jetcd.options.TxnOption;
|
||||
import io.etcd.jetcd.support.Errors;
|
||||
import io.etcd.jetcd.support.Requests;
|
||||
|
||||
|
@ -65,7 +66,7 @@ final class KVImpl extends Impl implements KV {
|
|||
return execute(
|
||||
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
|
||||
response -> new PutResponse(response, namespace),
|
||||
Errors::isRetryable);
|
||||
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -81,7 +82,7 @@ final class KVImpl extends Impl implements KV {
|
|||
return execute(
|
||||
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
|
||||
response -> new GetResponse(response, namespace),
|
||||
Errors::isRetryable);
|
||||
Errors::isRetryableForSafeRedoOp);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -97,7 +98,7 @@ final class KVImpl extends Impl implements KV {
|
|||
return execute(
|
||||
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
|
||||
response -> new DeleteResponse(response, namespace),
|
||||
Errors::isRetryable);
|
||||
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -116,15 +117,21 @@ final class KVImpl extends Impl implements KV {
|
|||
return execute(
|
||||
() -> stub.compact(request),
|
||||
CompactResponse::new,
|
||||
Errors::isRetryable);
|
||||
Errors::isRetryableForSafeRedoOp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Txn txn() {
|
||||
return txn(TxnOption.DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Txn txn(TxnOption option) {
|
||||
return TxnImpl.newTxn(
|
||||
request -> execute(
|
||||
() -> stub.txn(request),
|
||||
response -> new TxnResponse(response, namespace), Errors::isRetryable),
|
||||
response -> new TxnResponse(response, namespace),
|
||||
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp),
|
||||
namespace);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,7 +84,8 @@ final class LeaseImpl extends Impl implements Lease {
|
|||
LeaseGrantRequest.newBuilder()
|
||||
.setTTL(ttl)
|
||||
.build()),
|
||||
LeaseGrantResponse::new);
|
||||
LeaseGrantResponse::new,
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -94,7 +95,8 @@ final class LeaseImpl extends Impl implements Lease {
|
|||
LeaseGrantRequest.newBuilder()
|
||||
.setTTL(ttl)
|
||||
.build()),
|
||||
LeaseGrantResponse::new);
|
||||
LeaseGrantResponse::new,
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -104,7 +106,8 @@ final class LeaseImpl extends Impl implements Lease {
|
|||
LeaseRevokeRequest.newBuilder()
|
||||
.setID(leaseId)
|
||||
.build()),
|
||||
LeaseRevokeResponse::new);
|
||||
LeaseRevokeResponse::new,
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -118,7 +121,8 @@ final class LeaseImpl extends Impl implements Lease {
|
|||
|
||||
return execute(
|
||||
() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest),
|
||||
LeaseTimeToLiveResponse::new);
|
||||
LeaseTimeToLiveResponse::new,
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -69,7 +69,7 @@ final class LockImpl extends Impl implements Lock {
|
|||
return execute(
|
||||
() -> stubWithLeader().lock(request),
|
||||
response -> new LockResponse(response, namespace),
|
||||
Errors::isRetryable);
|
||||
Errors::isRetryableForSafeRedoOp);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -83,6 +83,6 @@ final class LockImpl extends Impl implements Lock {
|
|||
return execute(
|
||||
() -> stubWithLeader().unlock(request),
|
||||
UnlockResponse::new,
|
||||
Errors::isRetryable);
|
||||
Errors::isRetryableForSafeRedoOp);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -357,7 +356,7 @@ final class WatchImpl extends Impl implements Watch {
|
|||
private void reschedule() {
|
||||
Futures.addCallback(executor.schedule(this::resume, 500, TimeUnit.MILLISECONDS), new FutureCallback<Object>() {
|
||||
@Override
|
||||
public void onFailure(@NonNull Throwable t) {
|
||||
public void onFailure(Throwable t) {
|
||||
LOG.warn("scheduled resume failed", t);
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,11 @@ public class GetResponse extends AbstractResponse<RangeResponse> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the number of keys within the range when requested.
|
||||
* Returns the number of keys within the range requested.
|
||||
* Note this value is never affected by filtering options (limit, min or max created or modified revisions)
|
||||
* Count is the count for keys on the range part of a request.
|
||||
* Filters for limit and created or modified revision ranges restrict the
|
||||
* returned KVs, but not the count.
|
||||
*
|
||||
* @return count.
|
||||
*/
|
||||
|
|
|
@ -29,11 +29,13 @@ public final class DeleteOption {
|
|||
private final ByteSequence endKey;
|
||||
private final boolean prevKV;
|
||||
private final boolean prefix;
|
||||
private final boolean autoRetry;
|
||||
|
||||
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix) {
|
||||
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix, final boolean autoRetry) {
|
||||
this.endKey = endKey;
|
||||
this.prevKV = prevKV;
|
||||
this.prefix = prefix;
|
||||
this.autoRetry = autoRetry;
|
||||
}
|
||||
|
||||
public Optional<ByteSequence> getEndKey() {
|
||||
|
@ -49,10 +51,25 @@ public final class DeleteOption {
|
|||
return prevKV;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to treat this deletion as deletion by prefix
|
||||
*
|
||||
* @return true if deletion by prefix.
|
||||
*/
|
||||
public boolean isPrefix() {
|
||||
return prefix;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to treat a delete operation as idempotent from the point of view of automated retries.
|
||||
* Note under failure scenarios this may mean a single delete is attempted more than once.
|
||||
*
|
||||
* @return true if automated retries should happen.
|
||||
*/
|
||||
public boolean isAutoRetry() {
|
||||
return autoRetry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the builder.
|
||||
*
|
||||
|
@ -65,6 +82,11 @@ public final class DeleteOption {
|
|||
return builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the builder.
|
||||
*
|
||||
* @return the builder
|
||||
*/
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
@ -73,6 +95,7 @@ public final class DeleteOption {
|
|||
private ByteSequence endKey;
|
||||
private boolean prevKV = false;
|
||||
private boolean prefix = false;
|
||||
private boolean autoRetry = false;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
@ -144,8 +167,22 @@ public final class DeleteOption {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* When autoRetry is set, the delete operation is treated as idempotent from the point of view of automated retries.
|
||||
* Note under some failure scenarios true may make a delete operation be attempted more than once, where
|
||||
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
|
||||
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
|
||||
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
|
||||
*
|
||||
* @return builder
|
||||
*/
|
||||
public Builder withAutoRetry() {
|
||||
this.autoRetry = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DeleteOption build() {
|
||||
return new DeleteOption(endKey, prevKV, prefix);
|
||||
return new DeleteOption(endKey, prevKV, prefix, autoRetry);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import java.util.Optional;
|
|||
|
||||
import io.etcd.jetcd.ByteSequence;
|
||||
import io.etcd.jetcd.KV;
|
||||
import io.etcd.jetcd.kv.GetResponse;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
|
@ -76,56 +77,158 @@ public final class GetOption {
|
|||
/**
|
||||
* Get the maximum number of keys to return for a get request.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* </p>
|
||||
*
|
||||
*
|
||||
* @return the maximum number of keys to return.
|
||||
*/
|
||||
public long getLimit() {
|
||||
return this.limit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the end key for a range request
|
||||
*
|
||||
* @return the end key for a range request
|
||||
*/
|
||||
public Optional<ByteSequence> getEndKey() {
|
||||
return Optional.ofNullable(this.endKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the revision for the request
|
||||
*
|
||||
* @return the revision for the request
|
||||
*/
|
||||
public long getRevision() {
|
||||
return revision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the sort order for the request
|
||||
*
|
||||
* @return the sort order for the request
|
||||
*/
|
||||
public SortOrder getSortOrder() {
|
||||
return sortOrder;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the sort field for the request
|
||||
*
|
||||
* @return the sort field for the request
|
||||
*/
|
||||
public SortTarget getSortField() {
|
||||
return sortTarget;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return if the consistency level for this request is "serializable".
|
||||
* Note serializable is a lower than default consistency, and implies
|
||||
* the possibility of getting stale data.
|
||||
*
|
||||
* @return true if this request is only serializable consistency
|
||||
*/
|
||||
public boolean isSerializable() {
|
||||
return serializable;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if this request should get only keys in a range and there is no
|
||||
* need to retrieve the values.
|
||||
*
|
||||
* @return true if only get keys
|
||||
*/
|
||||
public boolean isKeysOnly() {
|
||||
return keysOnly;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if this request should only populate the count of keys matching
|
||||
* a range, and no other data.
|
||||
*
|
||||
* @return true if only get the count of keys
|
||||
*/
|
||||
public boolean isCountOnly() {
|
||||
return countOnly;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only populate results for keys that match a
|
||||
* minimum value for a created revision.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting a min create revision option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @return minimum created revision to match, or zero for any.
|
||||
*/
|
||||
public long getMinCreateRevision() {
|
||||
return this.minCreateRevision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only populate results for keys that match a
|
||||
* maximum value for a created revision.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting a max create revision option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @return maximum created revision to match, or zero for any.
|
||||
*/
|
||||
public long getMaxCreateRevision() {
|
||||
return this.maxCreateRevision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only populate results for keys that match a
|
||||
* minimum value for a modified revision.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting a min mod revision option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @return minimum modified revision to match, or zero for any.
|
||||
*/
|
||||
public long getMinModRevision() {
|
||||
return this.minModRevision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only populate results for keys that match a
|
||||
* maximum value for a modified revision.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting a max mod revision option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @return maximum modified revision to match, or zero for any.
|
||||
*/
|
||||
public long getMaxModRevision() {
|
||||
return this.maxModRevision;
|
||||
}
|
||||
|
||||
/**
|
||||
* True if this Get request should do prefix match
|
||||
*
|
||||
* @return true if this Get request should do prefix match
|
||||
*/
|
||||
public boolean isPrefix() {
|
||||
return prefix;
|
||||
}
|
||||
|
@ -176,6 +279,13 @@ public final class GetOption {
|
|||
/**
|
||||
* Limit the number of keys to return for a get request. By default is 0 - no limitation.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting this option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @param limit the maximum number of keys to return for a get request.
|
||||
* @return builder
|
||||
*/
|
||||
|
@ -229,7 +339,9 @@ public final class GetOption {
|
|||
* <p>
|
||||
* Get requests are linearizable by
|
||||
* default. For better performance, a serializable get request is served locally without needing
|
||||
* to reach consensus with other nodes in the cluster.
|
||||
* to reach consensus with other nodes in the cluster. Note this is a tradeoff with strict
|
||||
* consistency so it should be used with care in situations where reading stale
|
||||
* is acceptable.
|
||||
*
|
||||
* @param serializable is the get request a serializable get request.
|
||||
* @return builder
|
||||
|
@ -316,10 +428,17 @@ public final class GetOption {
|
|||
}
|
||||
|
||||
/**
|
||||
* Limit returned keys to those with create revision greater than the provided value.
|
||||
* Limit returned keys to those with create revision greater or equal than the provided value.
|
||||
* min_create_revision is the lower bound for returned key create revisions; all keys with
|
||||
* lesser create revisions will be filtered away.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting this option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @param createRevision create revision
|
||||
* @return builder
|
||||
*/
|
||||
|
@ -329,10 +448,17 @@ public final class GetOption {
|
|||
}
|
||||
|
||||
/**
|
||||
* Limit returned keys to those with create revision less than the provided value.
|
||||
* Limit returned keys to those with create revision less or equal than the provided value.
|
||||
* max_create_revision is the upper bound for returned key create revisions; all keys with
|
||||
* greater create revisions will be filtered away.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting this option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @param createRevision create revision
|
||||
* @return builder
|
||||
*/
|
||||
|
@ -342,10 +468,17 @@ public final class GetOption {
|
|||
}
|
||||
|
||||
/**
|
||||
* Limit returned keys to those with mod revision greater than the provided value.
|
||||
* Limit returned keys to those with mod revision greater or equal than the provided value.
|
||||
* min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod
|
||||
* revisions will be filtered away.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting this option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @param modRevision mod revision
|
||||
* @return this builder instance
|
||||
*/
|
||||
|
@ -355,10 +488,17 @@ public final class GetOption {
|
|||
}
|
||||
|
||||
/**
|
||||
* Limit returned keys to those with mod revision less than the provided value. max_mod_revision
|
||||
* Limit returned keys to those with mod revision less or equal than the provided value. max_mod_revision
|
||||
* is the upper bound for returned key mod revisions; all keys with greater mod revisions will
|
||||
* be filtered away.
|
||||
*
|
||||
* <p>
|
||||
* Note this filter does not affect the count field in GetResponse.
|
||||
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||
* For the same reason, it would be meaningless to mix setting this option
|
||||
* with the count only option.
|
||||
* </p>
|
||||
*
|
||||
* @param modRevision mod revision
|
||||
* @return this builder instance
|
||||
*/
|
||||
|
|
|
@ -26,10 +26,12 @@ public final class PutOption {
|
|||
|
||||
private final long leaseId;
|
||||
private final boolean prevKV;
|
||||
private final boolean autoRetry;
|
||||
|
||||
private PutOption(long leaseId, boolean prevKV) {
|
||||
private PutOption(long leaseId, boolean prevKV, boolean autoRetry) {
|
||||
this.leaseId = leaseId;
|
||||
this.prevKV = prevKV;
|
||||
this.autoRetry = autoRetry;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -50,6 +52,16 @@ public final class PutOption {
|
|||
return this.prevKV;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to treat a put operation as idempotent from the point of view of automated retries.
|
||||
* Note under failure scenarios this may mean a single put executes more than once.
|
||||
*
|
||||
* @return true if automated retries should happen.
|
||||
*/
|
||||
public boolean isAutoRetry() {
|
||||
return autoRetry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the builder.
|
||||
*
|
||||
|
@ -73,6 +85,7 @@ public final class PutOption {
|
|||
|
||||
private long leaseId = 0L;
|
||||
private boolean prevKV = false;
|
||||
private boolean autoRetry = false;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
@ -100,13 +113,28 @@ public final class PutOption {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* When autoRetry is set, treat this put as idempotent from the point of view of automated retries.
|
||||
* Note under some failure scenarios autoRetry=true may make a put operation execute more than once, where
|
||||
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
|
||||
* the client won't retry since it is not safe to assume on such a failure that the operation did not happen
|
||||
* in the server.
|
||||
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
|
||||
*
|
||||
* @return builder
|
||||
*/
|
||||
public Builder withAutoRetry() {
|
||||
this.autoRetry = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* build the put option.
|
||||
*
|
||||
* @return the put option
|
||||
*/
|
||||
public PutOption build() {
|
||||
return new PutOption(this.leaseId, this.prevKV);
|
||||
return new PutOption(this.leaseId, this.prevKV, this.autoRetry);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
package io.etcd.jetcd.options;
|
||||
|
||||
public final class TxnOption {
|
||||
public static final TxnOption DEFAULT = builder().build();
|
||||
|
||||
private final boolean autoRetry;
|
||||
|
||||
private TxnOption(final boolean autoRetry) {
|
||||
this.autoRetry = autoRetry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether to treat a txn operation as idempotent from the point of view of automated retries.
|
||||
*
|
||||
* @return true if automated retries should happen.
|
||||
*/
|
||||
public boolean isAutoRetry() {
|
||||
return autoRetry;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the builder.
|
||||
*
|
||||
* @return the builder
|
||||
*/
|
||||
public static TxnOption.Builder builder() {
|
||||
return new TxnOption.Builder();
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
private boolean autoRetry = false;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
/**
|
||||
* When autoRetry is set, the txn operation is treated as idempotent from the point of view of automated retries.
|
||||
* Note under some failure scenarios true may make a txn operation be attempted and/or execute more than once, where
|
||||
* a first attempt executed but its result status did not reach the client; by default (autoRetry=false),
|
||||
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
|
||||
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
|
||||
*
|
||||
* @return builder
|
||||
*/
|
||||
public Builder withAutoRetry() {
|
||||
this.autoRetry = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TxnOption build() {
|
||||
return new TxnOption(autoRetry);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ package io.etcd.jetcd.resolver;
|
|||
import java.net.URI;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -30,8 +29,6 @@ import io.grpc.Attributes;
|
|||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.SharedResourceHolder;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
@ -47,7 +44,6 @@ public abstract class AbstractNameResolver extends NameResolver {
|
|||
private volatile boolean shutdown;
|
||||
private volatile boolean resolving;
|
||||
|
||||
private Executor executor;
|
||||
private Listener listener;
|
||||
|
||||
public AbstractNameResolver(URI targetUri) {
|
||||
|
@ -69,7 +65,6 @@ public abstract class AbstractNameResolver extends NameResolver {
|
|||
public void start(Listener listener) {
|
||||
synchronized (lock) {
|
||||
Preconditions.checkState(this.listener == null, "already started");
|
||||
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
|
||||
this.listener = Objects.requireNonNull(listener, "listener");
|
||||
resolve();
|
||||
}
|
||||
|
@ -86,21 +81,13 @@ public abstract class AbstractNameResolver extends NameResolver {
|
|||
return;
|
||||
}
|
||||
shutdown = true;
|
||||
|
||||
synchronized (lock) {
|
||||
if (executor != null) {
|
||||
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void resolve() {
|
||||
if (resolving || shutdown) {
|
||||
return;
|
||||
}
|
||||
synchronized (lock) {
|
||||
executor.execute(this::doResolve);
|
||||
}
|
||||
doResolve();
|
||||
}
|
||||
|
||||
private void doResolve() {
|
||||
|
|
|
@ -26,9 +26,18 @@ public final class Errors {
|
|||
private Errors() {
|
||||
}
|
||||
|
||||
public static boolean isRetryable(Status status) {
|
||||
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isInvalidTokenError(status)
|
||||
|| isAuthStoreExpired(status);
|
||||
// isRetryable implementation for idempotent operations.
|
||||
public static boolean isRetryableForSafeRedoOp(Status status) {
|
||||
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isAlwaysSafeToRetry(status);
|
||||
}
|
||||
|
||||
// isRetryable implementation for non-idempotent operations
|
||||
public static boolean isRetryableForNoSafeRedoOp(Status status) {
|
||||
return isAlwaysSafeToRetry(status);
|
||||
}
|
||||
|
||||
public static boolean isAlwaysSafeToRetry(Status status) {
|
||||
return isInvalidTokenError(status) || isAuthStoreExpired(status);
|
||||
}
|
||||
|
||||
public static boolean isInvalidTokenError(Throwable e) {
|
||||
|
|
|
@ -50,6 +50,11 @@ public class ClientBuilderTest {
|
|||
assertThatThrownBy(() -> Client.builder().endpoints((URI) null)).isInstanceOf(NullPointerException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVertx_Null() {
|
||||
assertThatThrownBy(() -> Client.builder().vertx(null)).isInstanceOf(IllegalArgumentException.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndPoints_Verify_Empty() {
|
||||
assertThatThrownBy(() -> Client.builder().endpoints(new URI(""))).isInstanceOf(IllegalArgumentException.class);
|
||||
|
|
|
@ -316,7 +316,7 @@ public class KVTest {
|
|||
for (int i = 0; i < putCount; ++i) {
|
||||
ByteSequence value = ByteSequence
|
||||
.from(Integer.toString(i), StandardCharsets.UTF_8);
|
||||
customClient.getKVClient().put(key, value).join();
|
||||
customClient.getKVClient().put(key, value, PutOption.builder().withAutoRetry().build()).join();
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -39,16 +39,18 @@ class UtilTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testAuthErrorIsNotRetryable() {
|
||||
public void testAuthErrorIsRetryable() {
|
||||
Status authErrorStatus = Status.UNAUTHENTICATED
|
||||
.withDescription("etcdserver: invalid auth token");
|
||||
Status status = Status.fromThrowable(new StatusException(authErrorStatus));
|
||||
assertThat(Errors.isRetryable(status)).isTrue();
|
||||
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isTrue();
|
||||
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnavailableErrorIsRetryable() {
|
||||
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
|
||||
assertThat(Errors.isRetryable(status)).isTrue();
|
||||
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isFalse();
|
||||
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.net.URI;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -31,8 +30,6 @@ import io.grpc.Attributes;
|
|||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.NameResolver;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.SharedResourceHolder;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
@ -47,7 +44,6 @@ public class EtcdClusterNameResolver extends NameResolver {
|
|||
private volatile boolean shutdown;
|
||||
private volatile boolean resolving;
|
||||
|
||||
private Executor executor;
|
||||
private Listener listener;
|
||||
|
||||
public EtcdClusterNameResolver(URI targetUri) {
|
||||
|
@ -65,7 +61,6 @@ public class EtcdClusterNameResolver extends NameResolver {
|
|||
public void start(Listener listener) {
|
||||
synchronized (lock) {
|
||||
Preconditions.checkState(this.listener == null, "already started");
|
||||
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
|
||||
this.listener = Objects.requireNonNull(listener, "listener");
|
||||
resolve();
|
||||
}
|
||||
|
@ -82,21 +77,14 @@ public class EtcdClusterNameResolver extends NameResolver {
|
|||
return;
|
||||
}
|
||||
shutdown = true;
|
||||
|
||||
synchronized (lock) {
|
||||
if (executor != null) {
|
||||
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void resolve() {
|
||||
if (resolving || shutdown) {
|
||||
return;
|
||||
}
|
||||
synchronized (lock) {
|
||||
executor.execute(this::doResolve);
|
||||
}
|
||||
|
||||
doResolve();
|
||||
}
|
||||
|
||||
private void doResolve() {
|
||||
|
|
Loading…
Reference in New Issue