Compare commits

..

No commits in common. "main" and "jetcd-0.8.4-alpha" have entirely different histories.

30 changed files with 98 additions and 409 deletions

View File

@ -42,8 +42,8 @@ jobs:
- 17 - 17
- 21 - 21
etcd: etcd:
- quay.io/coreos/etcd:v3.5.21 - quay.io/coreos/etcd:v3.5.14
- quay.io/coreos/etcd:v3.6.0 - quay.io/coreos/etcd:v3.4.33
uses: ./.github/workflows/build.yml uses: ./.github/workflows/build.yml
with: with:
javaVersion: "${{ matrix.java-version }}" javaVersion: "${{ matrix.java-version }}"

View File

@ -43,8 +43,8 @@ jobs:
- 17 - 17
- 21 - 21
etcd: etcd:
- quay.io/coreos/etcd:v3.5.21 - quay.io/coreos/etcd:v3.5.14
- quay.io/coreos/etcd:v3.6.0 - quay.io/coreos/etcd:v3.4.33
uses: ./.github/workflows/build.yml uses: ./.github/workflows/build.yml
with: with:
javaVersion: "${{ matrix.java-version }}" javaVersion: "${{ matrix.java-version }}"

3
OWNERS
View File

@ -3,4 +3,5 @@
approvers: approvers:
- fanminshi # Fanmin Shi <fanmin.shi@coreos.com> - fanminshi # Fanmin Shi <fanmin.shi@coreos.com>
- lburgazzoli # Luca Burgazzoli <lburgazzoli@gmail.com> - lburgazzoli # Luca Burgazzoli <lburgazzoli@gmail.com>
- vorburger # Michael Vorburger <mike@vorburger.ch> - xiang90 # Xiang Li <xiang.li@coreos.com>
- heyitsanthony # Anthony Romano <anthony.romano@coreos.com>

View File

@ -111,7 +111,7 @@ The project is tested against a three node `etcd` setup started with the Launche
```sh ```sh
$ ./gradlew test $ ./gradlew test
``` ````
### Troubleshooting ### Troubleshooting
@ -129,4 +129,3 @@ See [CONTRIBUTING](https://github.com/etcd-io/jetcd/blob/master/CONTRIBUTING.md)
jetcd is under the Apache 2.0 license. See the [LICENSE](https://github.com/etcd-io/jetcd/blob/master/LICENSE) file for details. jetcd is under the Apache 2.0 license. See the [LICENSE](https://github.com/etcd-io/jetcd/blob/master/LICENSE) file for details.

View File

@ -76,13 +76,6 @@ subprojects {
maxFailures = 5 maxFailures = 5
} }
} }
testing {
suites {
test {
useJUnitJupiter()
}
}
}
testlogger { testlogger {
theme 'mocha-parallel' theme 'mocha-parallel'

View File

@ -1,33 +1,33 @@
[versions] [versions]
grpc = "1.74.0" grpc = "1.66.0"
log4j = "2.25.1" log4j = "2.23.1"
mockito = "5.18.0" mockito = "5.12.0"
slf4j = "2.0.17" slf4j = "2.0.16"
guava = "33.4.8-jre" guava = "33.2.1-jre"
assertj = "3.27.3" assertj = "3.26.3"
junit = "5.13.4" junit = "5.11.0"
testcontainers = "1.21.3" testcontainers = "1.20.1"
protoc = "3.25.1" protoc = "3.25.1"
failsafe = "3.3.2" failsafe = "3.3.2"
awaitility = "4.3.0" awaitility = "4.2.2"
commonsIo = "2.20.0" commonsIo = "2.16.1"
commonCompress = "1.28.0" commonCompress = "1.27.0"
autoService = "1.1.1" autoService = "1.1.1"
errorprone = "2.30.0" errorprone = "2.30.0"
vertx = "5.0.1" vertx = "4.5.9"
picocli = "4.7.7" picocli = "4.7.6"
restAssured = "5.5.5" restAssured = "5.5.0"
javaxAnnotation = "1.3.2" javaxAnnotation = "1.3.2"
versionsPlugin = "0.52.0" versionsPlugin = "0.51.0"
errorPronePlugin = "4.0.1" errorPronePlugin = "4.0.1"
spotlessPlugin = "6.25.0" spotlessPlugin = "6.25.0"
shadowPlugin = "8.1.1" shadowPlugin = "8.1.1"
testLoggerPlugin = "4.0.0" testLoggerPlugin = "4.0.0"
protobufPlugin = "0.9.5" protobufPlugin = "0.9.4"
nexusPublishPlugin = "2.0.0" nexusPublishPlugin = "2.0.0"
axionReleasePlugin = "1.18.17" axionReleasePlugin = "1.18.3"
testRetryPlugin = "1.6.2" testRetryPlugin = "1.5.10"
[libraries] [libraries]

Binary file not shown.

View File

@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip
networkTimeout=10000 networkTimeout=10000
validateDistributionUrl=true validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME

9
gradlew vendored
View File

@ -86,7 +86,8 @@ done
# shellcheck disable=SC2034 # shellcheck disable=SC2034
APP_BASE_NAME=${0##*/} APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) # Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
' "$PWD" ) || exit
# Use the maximum available, or set MAX_FD != -1 to use that value. # Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum MAX_FD=maximum
@ -114,7 +115,7 @@ case "$( uname )" in #(
NONSTOP* ) nonstop=true ;; NONSTOP* ) nonstop=true ;;
esac esac
CLASSPATH="\\\"\\\"" CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM. # Determine the Java command to use to start the JVM.
@ -205,7 +206,7 @@ fi
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Collect all arguments for the java command: # Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, # * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped. # and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be # * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line. # treated as '${Hostname}' itself on the command line.
@ -213,7 +214,7 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
set -- \ set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \ "-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \ -classpath "$CLASSPATH" \
-jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ org.gradle.wrapper.GradleWrapperMain \
"$@" "$@"
# Stop when "xargs" is not available. # Stop when "xargs" is not available.

4
gradlew.bat vendored
View File

@ -70,11 +70,11 @@ goto fail
:execute :execute
@rem Setup the command line @rem Setup the command line
set CLASSPATH= set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle @rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end :end
@rem End local scope for the variables with windows NT shell @rem End local scope for the variables with windows NT shell

View File

@ -37,7 +37,6 @@ import io.grpc.Metadata;
import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import io.vertx.core.Vertx;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@ -69,7 +68,6 @@ public final class ClientBuilder implements Cloneable {
private Duration retryMaxDuration; private Duration retryMaxDuration;
private Duration connectTimeout; private Duration connectTimeout;
private boolean waitForReady = true; private boolean waitForReady = true;
private Vertx vertx;
ClientBuilder() { ClientBuilder() {
} }
@ -709,30 +707,6 @@ public final class ClientBuilder implements Cloneable {
return this; return this;
} }
/**
* Gets the Vertx instance.
*
* @return the vertx instance.
*/
public Vertx vertx() {
return vertx;
}
/**
* configure Vertx instance.
*
* @param vertx Vertx instance to use.
* @return this builder to train
* @throws IllegalArgumentException if vertx is null
*/
public ClientBuilder vertx(Vertx vertx) {
Preconditions.checkArgument(vertx != null, "vertx can't be null");
this.vertx = vertx;
return this;
}
/** /**
* build a new Client. * build a new Client.
* *

View File

@ -26,7 +26,6 @@ import io.etcd.jetcd.options.CompactOption;
import io.etcd.jetcd.options.DeleteOption; import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption; import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.TxnOption;
import io.etcd.jetcd.support.CloseableClient; import io.etcd.jetcd.support.CloseableClient;
/** /**
@ -116,12 +115,4 @@ public interface KV extends CloseableClient {
* @return a Txn * @return a Txn
*/ */
Txn txn(); Txn txn();
/**
* creates a transaction.
*
* @param option TxnOption
* @return a Txn
*/
Txn txn(TxnOption option);
} }

View File

@ -61,10 +61,6 @@ final class ClientConnectionManager {
} else { } else {
this.executorService = builder.executorService(); this.executorService = builder.executorService();
} }
if (builder.vertx() != null) {
this.vertx = builder.vertx();
}
} }
ManagedChannel getChannel() { ManagedChannel getChannel() {

View File

@ -88,7 +88,7 @@ final class ElectionImpl extends Impl implements Election {
execute( execute(
() -> stubWithLeader().campaign(request), () -> stubWithLeader().campaign(request),
CampaignResponse::new, CampaignResponse::new,
Errors::isRetryableForNoSafeRedoOp)); Errors::isRetryable));
} }
@Override @Override
@ -111,7 +111,7 @@ final class ElectionImpl extends Impl implements Election {
execute( execute(
() -> stubWithLeader().proclaim(request), () -> stubWithLeader().proclaim(request),
ProclaimResponse::new, ProclaimResponse::new,
Errors::isRetryableForNoSafeRedoOp)); Errors::isRetryable));
} }
@Override @Override
@ -126,7 +126,7 @@ final class ElectionImpl extends Impl implements Election {
execute( execute(
() -> stubWithLeader().leader(request), () -> stubWithLeader().leader(request),
response -> new LeaderResponse(response, namespace), response -> new LeaderResponse(response, namespace),
Errors::isRetryableForNoSafeRedoOp)); Errors::isRetryable));
} }
@Override @Override
@ -135,7 +135,7 @@ final class ElectionImpl extends Impl implements Election {
requireNonNull(listener, "listener should not be null"); requireNonNull(listener, "listener should not be null");
LeaderRequest request = LeaderRequest.newBuilder() LeaderRequest request = LeaderRequest.newBuilder()
.setName(Util.prefixNamespace(electionName, namespace)) .setName(ByteString.copyFrom(electionName.getBytes()))
.build(); .build();
stubWithLeader().observeWithHandler(request, stubWithLeader().observeWithHandler(request,
@ -162,7 +162,7 @@ final class ElectionImpl extends Impl implements Election {
execute( execute(
() -> stubWithLeader().resign(request), () -> stubWithLeader().resign(request),
ResignResponse::new, ResignResponse::new,
Errors::isRetryableForNoSafeRedoOp)); Errors::isRetryable));
} }
private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) { private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {

View File

@ -89,11 +89,9 @@ abstract class Impl {
*/ */
protected <S, T> CompletableFuture<T> execute( protected <S, T> CompletableFuture<T> execute(
Supplier<Future<S>> supplier, Supplier<Future<S>> supplier,
Function<S, T> resultConvert, Function<S, T> resultConvert) {
boolean autoRetry) {
return execute(supplier, resultConvert, return execute(supplier, resultConvert, Errors::isRetryable);
autoRetry ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
} }
/** /**

View File

@ -33,7 +33,6 @@ import io.etcd.jetcd.options.CompactOption;
import io.etcd.jetcd.options.DeleteOption; import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption; import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.TxnOption;
import io.etcd.jetcd.support.Errors; import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Requests; import io.etcd.jetcd.support.Requests;
@ -66,7 +65,7 @@ final class KVImpl extends Impl implements KV {
return execute( return execute(
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)), () -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
response -> new PutResponse(response, namespace), response -> new PutResponse(response, namespace),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp); Errors::isRetryable);
} }
@Override @Override
@ -82,7 +81,7 @@ final class KVImpl extends Impl implements KV {
return execute( return execute(
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)), () -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
response -> new GetResponse(response, namespace), response -> new GetResponse(response, namespace),
Errors::isRetryableForSafeRedoOp); Errors::isRetryable);
} }
@Override @Override
@ -98,7 +97,7 @@ final class KVImpl extends Impl implements KV {
return execute( return execute(
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)), () -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
response -> new DeleteResponse(response, namespace), response -> new DeleteResponse(response, namespace),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp); Errors::isRetryable);
} }
@Override @Override
@ -117,21 +116,15 @@ final class KVImpl extends Impl implements KV {
return execute( return execute(
() -> stub.compact(request), () -> stub.compact(request),
CompactResponse::new, CompactResponse::new,
Errors::isRetryableForSafeRedoOp); Errors::isRetryable);
} }
@Override @Override
public Txn txn() { public Txn txn() {
return txn(TxnOption.DEFAULT);
}
@Override
public Txn txn(TxnOption option) {
return TxnImpl.newTxn( return TxnImpl.newTxn(
request -> execute( request -> execute(
() -> stub.txn(request), () -> stub.txn(request),
response -> new TxnResponse(response, namespace), response -> new TxnResponse(response, namespace), Errors::isRetryable),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp),
namespace); namespace);
} }
} }

View File

@ -84,8 +84,7 @@ final class LeaseImpl extends Impl implements Lease {
LeaseGrantRequest.newBuilder() LeaseGrantRequest.newBuilder()
.setTTL(ttl) .setTTL(ttl)
.build()), .build()),
LeaseGrantResponse::new, LeaseGrantResponse::new);
true);
} }
@Override @Override
@ -95,8 +94,7 @@ final class LeaseImpl extends Impl implements Lease {
LeaseGrantRequest.newBuilder() LeaseGrantRequest.newBuilder()
.setTTL(ttl) .setTTL(ttl)
.build()), .build()),
LeaseGrantResponse::new, LeaseGrantResponse::new);
true);
} }
@Override @Override
@ -106,8 +104,7 @@ final class LeaseImpl extends Impl implements Lease {
LeaseRevokeRequest.newBuilder() LeaseRevokeRequest.newBuilder()
.setID(leaseId) .setID(leaseId)
.build()), .build()),
LeaseRevokeResponse::new, LeaseRevokeResponse::new);
true);
} }
@Override @Override
@ -121,8 +118,7 @@ final class LeaseImpl extends Impl implements Lease {
return execute( return execute(
() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest), () -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest),
LeaseTimeToLiveResponse::new, LeaseTimeToLiveResponse::new);
true);
} }
@Override @Override

View File

@ -69,7 +69,7 @@ final class LockImpl extends Impl implements Lock {
return execute( return execute(
() -> stubWithLeader().lock(request), () -> stubWithLeader().lock(request),
response -> new LockResponse(response, namespace), response -> new LockResponse(response, namespace),
Errors::isRetryableForSafeRedoOp); Errors::isRetryable);
} }
@Override @Override
@ -83,6 +83,6 @@ final class LockImpl extends Impl implements Lock {
return execute( return execute(
() -> stubWithLeader().unlock(request), () -> stubWithLeader().unlock(request),
UnlockResponse::new, UnlockResponse::new,
Errors::isRetryableForSafeRedoOp); Errors::isRetryable);
} }
} }

View File

@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -356,7 +357,7 @@ final class WatchImpl extends Impl implements Watch {
private void reschedule() { private void reschedule() {
Futures.addCallback(executor.schedule(this::resume, 500, TimeUnit.MILLISECONDS), new FutureCallback<Object>() { Futures.addCallback(executor.schedule(this::resume, 500, TimeUnit.MILLISECONDS), new FutureCallback<Object>() {
@Override @Override
public void onFailure(Throwable t) { public void onFailure(@NonNull Throwable t) {
LOG.warn("scheduled resume failed", t); LOG.warn("scheduled resume failed", t);
} }

View File

@ -58,11 +58,7 @@ public class GetResponse extends AbstractResponse<RangeResponse> {
} }
/** /**
* Returns the number of keys within the range requested. * Returns the number of keys within the range when requested.
* Note this value is never affected by filtering options (limit, min or max created or modified revisions)
* Count is the count for keys on the range part of a request.
* Filters for limit and created or modified revision ranges restrict the
* returned KVs, but not the count.
* *
* @return count. * @return count.
*/ */

View File

@ -29,13 +29,11 @@ public final class DeleteOption {
private final ByteSequence endKey; private final ByteSequence endKey;
private final boolean prevKV; private final boolean prevKV;
private final boolean prefix; private final boolean prefix;
private final boolean autoRetry;
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix, final boolean autoRetry) { private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix) {
this.endKey = endKey; this.endKey = endKey;
this.prevKV = prevKV; this.prevKV = prevKV;
this.prefix = prefix; this.prefix = prefix;
this.autoRetry = autoRetry;
} }
public Optional<ByteSequence> getEndKey() { public Optional<ByteSequence> getEndKey() {
@ -51,25 +49,10 @@ public final class DeleteOption {
return prevKV; return prevKV;
} }
/**
* Whether to treat this deletion as deletion by prefix
*
* @return true if deletion by prefix.
*/
public boolean isPrefix() { public boolean isPrefix() {
return prefix; return prefix;
} }
/**
* Whether to treat a delete operation as idempotent from the point of view of automated retries.
* Note under failure scenarios this may mean a single delete is attempted more than once.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}
/** /**
* Returns the builder. * Returns the builder.
* *
@ -82,11 +65,6 @@ public final class DeleteOption {
return builder(); return builder();
} }
/**
* Returns the builder.
*
* @return the builder
*/
public static Builder builder() { public static Builder builder() {
return new Builder(); return new Builder();
} }
@ -95,7 +73,6 @@ public final class DeleteOption {
private ByteSequence endKey; private ByteSequence endKey;
private boolean prevKV = false; private boolean prevKV = false;
private boolean prefix = false; private boolean prefix = false;
private boolean autoRetry = false;
private Builder() { private Builder() {
} }
@ -167,22 +144,8 @@ public final class DeleteOption {
return this; return this;
} }
/**
* When autoRetry is set, the delete operation is treated as idempotent from the point of view of automated retries.
* Note under some failure scenarios true may make a delete operation be attempted more than once, where
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}
public DeleteOption build() { public DeleteOption build() {
return new DeleteOption(endKey, prevKV, prefix, autoRetry); return new DeleteOption(endKey, prevKV, prefix);
} }
} }

View File

@ -20,7 +20,6 @@ import java.util.Optional;
import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.KV; import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
@ -77,158 +76,56 @@ public final class GetOption {
/** /**
* Get the maximum number of keys to return for a get request. * Get the maximum number of keys to return for a get request.
* *
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* </p>
*
*
* @return the maximum number of keys to return. * @return the maximum number of keys to return.
*/ */
public long getLimit() { public long getLimit() {
return this.limit; return this.limit;
} }
/**
* Get the end key for a range request
*
* @return the end key for a range request
*/
public Optional<ByteSequence> getEndKey() { public Optional<ByteSequence> getEndKey() {
return Optional.ofNullable(this.endKey); return Optional.ofNullable(this.endKey);
} }
/**
* Get the revision for the request
*
* @return the revision for the request
*/
public long getRevision() { public long getRevision() {
return revision; return revision;
} }
/**
* Get the sort order for the request
*
* @return the sort order for the request
*/
public SortOrder getSortOrder() { public SortOrder getSortOrder() {
return sortOrder; return sortOrder;
} }
/**
* Get the sort field for the request
*
* @return the sort field for the request
*/
public SortTarget getSortField() { public SortTarget getSortField() {
return sortTarget; return sortTarget;
} }
/**
* Return if the consistency level for this request is "serializable".
* Note serializable is a lower than default consistency, and implies
* the possibility of getting stale data.
*
* @return true if this request is only serializable consistency
*/
public boolean isSerializable() { public boolean isSerializable() {
return serializable; return serializable;
} }
/**
* True if this request should get only keys in a range and there is no
* need to retrieve the values.
*
* @return true if only get keys
*/
public boolean isKeysOnly() { public boolean isKeysOnly() {
return keysOnly; return keysOnly;
} }
/**
* True if this request should only populate the count of keys matching
* a range, and no other data.
*
* @return true if only get the count of keys
*/
public boolean isCountOnly() { public boolean isCountOnly() {
return countOnly; return countOnly;
} }
/**
* Only populate results for keys that match a
* minimum value for a created revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a min create revision option
* with the count only option.
* </p>
*
* @return minimum created revision to match, or zero for any.
*/
public long getMinCreateRevision() { public long getMinCreateRevision() {
return this.minCreateRevision; return this.minCreateRevision;
} }
/**
* Only populate results for keys that match a
* maximum value for a created revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a max create revision option
* with the count only option.
* </p>
*
* @return maximum created revision to match, or zero for any.
*/
public long getMaxCreateRevision() { public long getMaxCreateRevision() {
return this.maxCreateRevision; return this.maxCreateRevision;
} }
/**
* Only populate results for keys that match a
* minimum value for a modified revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a min mod revision option
* with the count only option.
* </p>
*
* @return minimum modified revision to match, or zero for any.
*/
public long getMinModRevision() { public long getMinModRevision() {
return this.minModRevision; return this.minModRevision;
} }
/**
* Only populate results for keys that match a
* maximum value for a modified revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a max mod revision option
* with the count only option.
* </p>
*
* @return maximum modified revision to match, or zero for any.
*/
public long getMaxModRevision() { public long getMaxModRevision() {
return this.maxModRevision; return this.maxModRevision;
} }
/**
* True if this Get request should do prefix match
*
* @return true if this Get request should do prefix match
*/
public boolean isPrefix() { public boolean isPrefix() {
return prefix; return prefix;
} }
@ -279,13 +176,6 @@ public final class GetOption {
/** /**
* Limit the number of keys to return for a get request. By default is 0 - no limitation. * Limit the number of keys to return for a get request. By default is 0 - no limitation.
* *
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param limit the maximum number of keys to return for a get request. * @param limit the maximum number of keys to return for a get request.
* @return builder * @return builder
*/ */
@ -339,9 +229,7 @@ public final class GetOption {
* <p> * <p>
* Get requests are linearizable by * Get requests are linearizable by
* default. For better performance, a serializable get request is served locally without needing * default. For better performance, a serializable get request is served locally without needing
* to reach consensus with other nodes in the cluster. Note this is a tradeoff with strict * to reach consensus with other nodes in the cluster.
* consistency so it should be used with care in situations where reading stale
* is acceptable.
* *
* @param serializable is the get request a serializable get request. * @param serializable is the get request a serializable get request.
* @return builder * @return builder
@ -428,17 +316,10 @@ public final class GetOption {
} }
/** /**
* Limit returned keys to those with create revision greater or equal than the provided value. * Limit returned keys to those with create revision greater than the provided value.
* min_create_revision is the lower bound for returned key create revisions; all keys with * min_create_revision is the lower bound for returned key create revisions; all keys with
* lesser create revisions will be filtered away. * lesser create revisions will be filtered away.
* *
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param createRevision create revision * @param createRevision create revision
* @return builder * @return builder
*/ */
@ -448,17 +329,10 @@ public final class GetOption {
} }
/** /**
* Limit returned keys to those with create revision less or equal than the provided value. * Limit returned keys to those with create revision less than the provided value.
* max_create_revision is the upper bound for returned key create revisions; all keys with * max_create_revision is the upper bound for returned key create revisions; all keys with
* greater create revisions will be filtered away. * greater create revisions will be filtered away.
* *
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param createRevision create revision * @param createRevision create revision
* @return builder * @return builder
*/ */
@ -468,17 +342,10 @@ public final class GetOption {
} }
/** /**
* Limit returned keys to those with mod revision greater or equal than the provided value. * Limit returned keys to those with mod revision greater than the provided value.
* min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod * min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod
* revisions will be filtered away. * revisions will be filtered away.
* *
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param modRevision mod revision * @param modRevision mod revision
* @return this builder instance * @return this builder instance
*/ */
@ -488,17 +355,10 @@ public final class GetOption {
} }
/** /**
* Limit returned keys to those with mod revision less or equal than the provided value. max_mod_revision * Limit returned keys to those with mod revision less than the provided value. max_mod_revision
* is the upper bound for returned key mod revisions; all keys with greater mod revisions will * is the upper bound for returned key mod revisions; all keys with greater mod revisions will
* be filtered away. * be filtered away.
* *
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param modRevision mod revision * @param modRevision mod revision
* @return this builder instance * @return this builder instance
*/ */

View File

@ -26,12 +26,10 @@ public final class PutOption {
private final long leaseId; private final long leaseId;
private final boolean prevKV; private final boolean prevKV;
private final boolean autoRetry;
private PutOption(long leaseId, boolean prevKV, boolean autoRetry) { private PutOption(long leaseId, boolean prevKV) {
this.leaseId = leaseId; this.leaseId = leaseId;
this.prevKV = prevKV; this.prevKV = prevKV;
this.autoRetry = autoRetry;
} }
/** /**
@ -52,16 +50,6 @@ public final class PutOption {
return this.prevKV; return this.prevKV;
} }
/**
* Whether to treat a put operation as idempotent from the point of view of automated retries.
* Note under failure scenarios this may mean a single put executes more than once.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}
/** /**
* Returns the builder. * Returns the builder.
* *
@ -85,7 +73,6 @@ public final class PutOption {
private long leaseId = 0L; private long leaseId = 0L;
private boolean prevKV = false; private boolean prevKV = false;
private boolean autoRetry = false;
private Builder() { private Builder() {
} }
@ -113,28 +100,13 @@ public final class PutOption {
return this; return this;
} }
/**
* When autoRetry is set, treat this put as idempotent from the point of view of automated retries.
* Note under some failure scenarios autoRetry=true may make a put operation execute more than once, where
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure that the operation did not happen
* in the server.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}
/** /**
* build the put option. * build the put option.
* *
* @return the put option * @return the put option
*/ */
public PutOption build() { public PutOption build() {
return new PutOption(this.leaseId, this.prevKV, this.autoRetry); return new PutOption(this.leaseId, this.prevKV);
} }
} }

View File

@ -1,54 +0,0 @@
package io.etcd.jetcd.options;
public final class TxnOption {
public static final TxnOption DEFAULT = builder().build();
private final boolean autoRetry;
private TxnOption(final boolean autoRetry) {
this.autoRetry = autoRetry;
}
/**
* Whether to treat a txn operation as idempotent from the point of view of automated retries.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}
/**
* Returns the builder.
*
* @return the builder
*/
public static TxnOption.Builder builder() {
return new TxnOption.Builder();
}
public static final class Builder {
private boolean autoRetry = false;
private Builder() {
}
/**
* When autoRetry is set, the txn operation is treated as idempotent from the point of view of automated retries.
* Note under some failure scenarios true may make a txn operation be attempted and/or execute more than once, where
* a first attempt executed but its result status did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}
public TxnOption build() {
return new TxnOption(autoRetry);
}
}
}

View File

@ -19,6 +19,7 @@ package io.etcd.jetcd.resolver;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -29,6 +30,8 @@ import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -44,6 +47,7 @@ public abstract class AbstractNameResolver extends NameResolver {
private volatile boolean shutdown; private volatile boolean shutdown;
private volatile boolean resolving; private volatile boolean resolving;
private Executor executor;
private Listener listener; private Listener listener;
public AbstractNameResolver(URI targetUri) { public AbstractNameResolver(URI targetUri) {
@ -65,6 +69,7 @@ public abstract class AbstractNameResolver extends NameResolver {
public void start(Listener listener) { public void start(Listener listener) {
synchronized (lock) { synchronized (lock) {
Preconditions.checkState(this.listener == null, "already started"); Preconditions.checkState(this.listener == null, "already started");
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
this.listener = Objects.requireNonNull(listener, "listener"); this.listener = Objects.requireNonNull(listener, "listener");
resolve(); resolve();
} }
@ -81,13 +86,21 @@ public abstract class AbstractNameResolver extends NameResolver {
return; return;
} }
shutdown = true; shutdown = true;
synchronized (lock) {
if (executor != null) {
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
}
}
} }
private void resolve() { private void resolve() {
if (resolving || shutdown) { if (resolving || shutdown) {
return; return;
} }
doResolve(); synchronized (lock) {
executor.execute(this::doResolve);
}
} }
private void doResolve() { private void doResolve() {

View File

@ -26,18 +26,9 @@ public final class Errors {
private Errors() { private Errors() {
} }
// isRetryable implementation for idempotent operations. public static boolean isRetryable(Status status) {
public static boolean isRetryableForSafeRedoOp(Status status) { return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isInvalidTokenError(status)
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isAlwaysSafeToRetry(status); || isAuthStoreExpired(status);
}
// isRetryable implementation for non-idempotent operations
public static boolean isRetryableForNoSafeRedoOp(Status status) {
return isAlwaysSafeToRetry(status);
}
public static boolean isAlwaysSafeToRetry(Status status) {
return isInvalidTokenError(status) || isAuthStoreExpired(status);
} }
public static boolean isInvalidTokenError(Throwable e) { public static boolean isInvalidTokenError(Throwable e) {

View File

@ -50,11 +50,6 @@ public class ClientBuilderTest {
assertThatThrownBy(() -> Client.builder().endpoints((URI) null)).isInstanceOf(NullPointerException.class); assertThatThrownBy(() -> Client.builder().endpoints((URI) null)).isInstanceOf(NullPointerException.class);
} }
@Test
public void testVertx_Null() {
assertThatThrownBy(() -> Client.builder().vertx(null)).isInstanceOf(IllegalArgumentException.class);
}
@Test @Test
public void testEndPoints_Verify_Empty() { public void testEndPoints_Verify_Empty() {
assertThatThrownBy(() -> Client.builder().endpoints(new URI(""))).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> Client.builder().endpoints(new URI(""))).isInstanceOf(IllegalArgumentException.class);

View File

@ -316,7 +316,7 @@ public class KVTest {
for (int i = 0; i < putCount; ++i) { for (int i = 0; i < putCount; ++i) {
ByteSequence value = ByteSequence ByteSequence value = ByteSequence
.from(Integer.toString(i), StandardCharsets.UTF_8); .from(Integer.toString(i), StandardCharsets.UTF_8);
customClient.getKVClient().put(key, value, PutOption.builder().withAutoRetry().build()).join(); customClient.getKVClient().put(key, value).join();
} }
}); });

View File

@ -39,18 +39,16 @@ class UtilTest {
} }
@Test @Test
public void testAuthErrorIsRetryable() { public void testAuthErrorIsNotRetryable() {
Status authErrorStatus = Status.UNAUTHENTICATED Status authErrorStatus = Status.UNAUTHENTICATED
.withDescription("etcdserver: invalid auth token"); .withDescription("etcdserver: invalid auth token");
Status status = Status.fromThrowable(new StatusException(authErrorStatus)); Status status = Status.fromThrowable(new StatusException(authErrorStatus));
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isTrue(); assertThat(Errors.isRetryable(status)).isTrue();
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
} }
@Test @Test
public void testUnavailableErrorIsRetryable() { public void testUnavailableErrorIsRetryable() {
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE)); Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isFalse(); assertThat(Errors.isRetryable(status)).isTrue();
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
} }
} }

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,6 +31,8 @@ import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver; import io.grpc.NameResolver;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -44,6 +47,7 @@ public class EtcdClusterNameResolver extends NameResolver {
private volatile boolean shutdown; private volatile boolean shutdown;
private volatile boolean resolving; private volatile boolean resolving;
private Executor executor;
private Listener listener; private Listener listener;
public EtcdClusterNameResolver(URI targetUri) { public EtcdClusterNameResolver(URI targetUri) {
@ -61,6 +65,7 @@ public class EtcdClusterNameResolver extends NameResolver {
public void start(Listener listener) { public void start(Listener listener) {
synchronized (lock) { synchronized (lock) {
Preconditions.checkState(this.listener == null, "already started"); Preconditions.checkState(this.listener == null, "already started");
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
this.listener = Objects.requireNonNull(listener, "listener"); this.listener = Objects.requireNonNull(listener, "listener");
resolve(); resolve();
} }
@ -77,14 +82,21 @@ public class EtcdClusterNameResolver extends NameResolver {
return; return;
} }
shutdown = true; shutdown = true;
synchronized (lock) {
if (executor != null) {
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
}
}
} }
private void resolve() { private void resolve() {
if (resolving || shutdown) { if (resolving || shutdown) {
return; return;
} }
synchronized (lock) {
doResolve(); executor.execute(this::doResolve);
}
} }
private void doResolve() { private void doResolve() {