Compare commits

..

No commits in common. "main" and "jetcd-0.8.1" have entirely different histories.

37 changed files with 125 additions and 479 deletions

View File

@ -15,8 +15,8 @@ assignees: ''
A clear and concise description of what the bug is.
**To Reproduce**
Steps or reproducer to reproduce the behavior in a form of a unit test.
This section *must* be provided, if not, the issue may not get attention since the maintainers have very limited capacity.
Steps or reproducer to reproduce the behavior.
This section *must* be provided, if not, the issue may not get attention.
**Expected behavior**
A clear and concise description of what you expected to happen.

View File

@ -42,8 +42,8 @@ jobs:
- 17
- 21
etcd:
- quay.io/coreos/etcd:v3.5.21
- quay.io/coreos/etcd:v3.6.0
- gcr.io/etcd-development/etcd:v3.5.10
- gcr.io/etcd-development/etcd:v3.4.27
uses: ./.github/workflows/build.yml
with:
javaVersion: "${{ matrix.java-version }}"

View File

@ -43,8 +43,8 @@ jobs:
- 17
- 21
etcd:
- quay.io/coreos/etcd:v3.5.21
- quay.io/coreos/etcd:v3.6.0
- gcr.io/etcd-development/etcd:v3.5.10
- gcr.io/etcd-development/etcd:v3.4.27
uses: ./.github/workflows/build.yml
with:
javaVersion: "${{ matrix.java-version }}"

View File

@ -29,11 +29,10 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
days-before-stale: 60
days-before-close: 7
only-labels: 'waiting-for-feedbacks'
stale-issue-label: stale
exempt-issue-labels: never-stale
exempt-issue-label: never-stale
stale-pr-label: stale
exempt-pr-labels: never-stale
exempt-pr-label: never-stale
stale-issue-message: |
This issue is stale because it has been open 60 days with no activity.
Remove stale label or comment or this will be closed in 7 days.

3
OWNERS
View File

@ -3,4 +3,5 @@
approvers:
- fanminshi # Fanmin Shi <fanmin.shi@coreos.com>
- lburgazzoli # Luca Burgazzoli <lburgazzoli@gmail.com>
- vorburger # Michael Vorburger <mike@vorburger.ch>
- xiang90 # Xiang Li <xiang.li@coreos.com>
- heyitsanthony # Anthony Romano <anthony.romano@coreos.com>

View File

@ -111,7 +111,7 @@ The project is tested against a three node `etcd` setup started with the Launche
```sh
$ ./gradlew test
```
````
### Troubleshooting
@ -129,4 +129,3 @@ See [CONTRIBUTING](https://github.com/etcd-io/jetcd/blob/master/CONTRIBUTING.md)
jetcd is under the Apache 2.0 license. See the [LICENSE](https://github.com/etcd-io/jetcd/blob/master/LICENSE) file for details.

View File

@ -76,13 +76,6 @@ subprojects {
maxFailures = 5
}
}
testing {
suites {
test {
useJUnitJupiter()
}
}
}
testlogger {
theme 'mocha-parallel'
@ -90,11 +83,6 @@ subprojects {
}
tasks.register('allDeps', DependencyReportTask)
tasks.withType(AbstractArchiveTask).configureEach {
preserveFileTimestamps = false
reproducibleFileOrder = true
}
}

View File

@ -1,33 +1,33 @@
[versions]
grpc = "1.74.0"
log4j = "2.25.1"
mockito = "5.18.0"
slf4j = "2.0.17"
guava = "33.4.8-jre"
assertj = "3.27.3"
junit = "5.13.4"
testcontainers = "1.21.3"
grpc = "1.64.0"
log4j = "2.23.1"
mockito = "5.12.0"
slf4j = "2.0.13"
guava = "33.2.0-jre"
assertj = "3.26.0"
junit = "5.10.2"
testcontainers = "1.19.8"
protoc = "3.25.1"
failsafe = "3.3.2"
awaitility = "4.3.0"
commonsIo = "2.20.0"
commonCompress = "1.28.0"
awaitility = "4.2.1"
commonsIo = "2.16.1"
commonCompress = "1.26.2"
autoService = "1.1.1"
errorprone = "2.30.0"
vertx = "5.0.1"
picocli = "4.7.7"
restAssured = "5.5.5"
errorprone = "2.27.1"
vertx = "4.5.8"
picocli = "4.7.6"
restAssured = "5.4.0"
javaxAnnotation = "1.3.2"
versionsPlugin = "0.52.0"
errorPronePlugin = "4.0.1"
spotlessPlugin = "6.25.0"
versionsPlugin = "0.50.0"
errorPronePlugin = "3.1.0"
spotlessPlugin = "6.23.3"
shadowPlugin = "8.1.1"
testLoggerPlugin = "4.0.0"
protobufPlugin = "0.9.5"
nexusPublishPlugin = "2.0.0"
axionReleasePlugin = "1.18.17"
testRetryPlugin = "1.6.2"
protobufPlugin = "0.9.4"
nexusPublishPlugin = "1.3.0"
axionReleasePlugin = "1.16.1"
testRetryPlugin = "1.5.8"
[libraries]

Binary file not shown.

View File

@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME

12
gradlew vendored
View File

@ -15,8 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#
##############################################################################
#
@ -57,7 +55,7 @@
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
@ -86,7 +84,7 @@ done
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
@ -114,7 +112,7 @@ case "$( uname )" in #(
NONSTOP* ) nonstop=true ;;
esac
CLASSPATH="\\\"\\\""
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
@ -205,7 +203,7 @@ fi
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.
@ -213,7 +211,7 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
-jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
org.gradle.wrapper.GradleWrapperMain \
"$@"
# Stop when "xargs" is not available.

6
gradlew.bat vendored
View File

@ -13,8 +13,6 @@
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@rem SPDX-License-Identifier: Apache-2.0
@rem
@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@ -70,11 +68,11 @@ goto fail
:execute
@rem Setup the command line
set CLASSPATH=
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell

View File

@ -37,7 +37,6 @@ import io.grpc.Metadata;
import io.grpc.netty.GrpcSslContexts;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.vertx.core.Vertx;
import com.google.common.base.Strings;
@ -69,7 +68,6 @@ public final class ClientBuilder implements Cloneable {
private Duration retryMaxDuration;
private Duration connectTimeout;
private boolean waitForReady = true;
private Vertx vertx;
ClientBuilder() {
}
@ -709,30 +707,6 @@ public final class ClientBuilder implements Cloneable {
return this;
}
/**
* Gets the Vertx instance.
*
* @return the vertx instance.
*/
public Vertx vertx() {
return vertx;
}
/**
* configure Vertx instance.
*
* @param vertx Vertx instance to use.
* @return this builder to train
* @throws IllegalArgumentException if vertx is null
*/
public ClientBuilder vertx(Vertx vertx) {
Preconditions.checkArgument(vertx != null, "vertx can't be null");
this.vertx = vertx;
return this;
}
/**
* build a new Client.
*

View File

@ -26,7 +26,6 @@ import io.etcd.jetcd.options.CompactOption;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.TxnOption;
import io.etcd.jetcd.support.CloseableClient;
/**
@ -116,12 +115,4 @@ public interface KV extends CloseableClient {
* @return a Txn
*/
Txn txn();
/**
* creates a transaction.
*
* @param option TxnOption
* @return a Txn
*/
Txn txn(TxnOption option);
}

View File

@ -61,10 +61,6 @@ final class ClientConnectionManager {
} else {
this.executorService = builder.executorService();
}
if (builder.vertx() != null) {
this.vertx = builder.vertx();
}
}
ManagedChannel getChannel() {

View File

@ -88,7 +88,7 @@ final class ElectionImpl extends Impl implements Election {
execute(
() -> stubWithLeader().campaign(request),
CampaignResponse::new,
Errors::isRetryableForNoSafeRedoOp));
Errors::isRetryable));
}
@Override
@ -111,7 +111,7 @@ final class ElectionImpl extends Impl implements Election {
execute(
() -> stubWithLeader().proclaim(request),
ProclaimResponse::new,
Errors::isRetryableForNoSafeRedoOp));
Errors::isRetryable));
}
@Override
@ -126,7 +126,7 @@ final class ElectionImpl extends Impl implements Election {
execute(
() -> stubWithLeader().leader(request),
response -> new LeaderResponse(response, namespace),
Errors::isRetryableForNoSafeRedoOp));
Errors::isRetryable));
}
@Override
@ -135,7 +135,7 @@ final class ElectionImpl extends Impl implements Election {
requireNonNull(listener, "listener should not be null");
LeaderRequest request = LeaderRequest.newBuilder()
.setName(Util.prefixNamespace(electionName, namespace))
.setName(ByteString.copyFrom(electionName.getBytes()))
.build();
stubWithLeader().observeWithHandler(request,
@ -162,7 +162,7 @@ final class ElectionImpl extends Impl implements Election {
execute(
() -> stubWithLeader().resign(request),
ResignResponse::new,
Errors::isRetryableForNoSafeRedoOp));
Errors::isRetryable));
}
private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {

View File

@ -89,11 +89,9 @@ abstract class Impl {
*/
protected <S, T> CompletableFuture<T> execute(
Supplier<Future<S>> supplier,
Function<S, T> resultConvert,
boolean autoRetry) {
Function<S, T> resultConvert) {
return execute(supplier, resultConvert,
autoRetry ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
return execute(supplier, resultConvert, Errors::isRetryable);
}
/**

View File

@ -33,7 +33,6 @@ import io.etcd.jetcd.options.CompactOption;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.TxnOption;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Requests;
@ -66,7 +65,7 @@ final class KVImpl extends Impl implements KV {
return execute(
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
response -> new PutResponse(response, namespace),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
Errors::isRetryable);
}
@Override
@ -82,7 +81,7 @@ final class KVImpl extends Impl implements KV {
return execute(
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
response -> new GetResponse(response, namespace),
Errors::isRetryableForSafeRedoOp);
Errors::isRetryable);
}
@Override
@ -98,7 +97,7 @@ final class KVImpl extends Impl implements KV {
return execute(
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
response -> new DeleteResponse(response, namespace),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
Errors::isRetryable);
}
@Override
@ -117,21 +116,15 @@ final class KVImpl extends Impl implements KV {
return execute(
() -> stub.compact(request),
CompactResponse::new,
Errors::isRetryableForSafeRedoOp);
Errors::isRetryable);
}
@Override
public Txn txn() {
return txn(TxnOption.DEFAULT);
}
@Override
public Txn txn(TxnOption option) {
return TxnImpl.newTxn(
request -> execute(
() -> stub.txn(request),
response -> new TxnResponse(response, namespace),
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp),
response -> new TxnResponse(response, namespace), Errors::isRetryable),
namespace);
}
}

View File

@ -84,8 +84,7 @@ final class LeaseImpl extends Impl implements Lease {
LeaseGrantRequest.newBuilder()
.setTTL(ttl)
.build()),
LeaseGrantResponse::new,
true);
LeaseGrantResponse::new);
}
@Override
@ -95,8 +94,7 @@ final class LeaseImpl extends Impl implements Lease {
LeaseGrantRequest.newBuilder()
.setTTL(ttl)
.build()),
LeaseGrantResponse::new,
true);
LeaseGrantResponse::new);
}
@Override
@ -106,8 +104,7 @@ final class LeaseImpl extends Impl implements Lease {
LeaseRevokeRequest.newBuilder()
.setID(leaseId)
.build()),
LeaseRevokeResponse::new,
true);
LeaseRevokeResponse::new);
}
@Override
@ -121,8 +118,7 @@ final class LeaseImpl extends Impl implements Lease {
return execute(
() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest),
LeaseTimeToLiveResponse::new,
true);
LeaseTimeToLiveResponse::new);
}
@Override

View File

@ -69,7 +69,7 @@ final class LockImpl extends Impl implements Lock {
return execute(
() -> stubWithLeader().lock(request),
response -> new LockResponse(response, namespace),
Errors::isRetryableForSafeRedoOp);
Errors::isRetryable);
}
@Override
@ -83,6 +83,6 @@ final class LockImpl extends Impl implements Lock {
return execute(
() -> stubWithLeader().unlock(request),
UnlockResponse::new,
Errors::isRetryableForSafeRedoOp);
Errors::isRetryable);
}
}

View File

@ -23,17 +23,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.api.VertxWatchGrpc;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchProgressRequest;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.api.*;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.options.OptionsUtil;
@ -41,6 +37,7 @@ import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.Errors;
import io.etcd.jetcd.support.Util;
import io.grpc.Status;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import com.google.common.base.Strings;
@ -49,10 +46,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newClosedWatchClientException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newCompactedException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.*;
/**
* watch Implementation.
@ -123,8 +117,10 @@ final class WatchImpl extends Impl implements Watch {
private final Listener listener;
private final AtomicBoolean closed;
//private StreamObserver<WatchRequest> stream;
private final AtomicReference<WriteStream<WatchRequest>> wstream;
private final AtomicBoolean started;
private ReadStream<WatchResponse> rstream;
private long revision;
private long id;
@ -136,6 +132,7 @@ final class WatchImpl extends Impl implements Watch {
this.started = new AtomicBoolean();
this.wstream = new AtomicReference<>();
this.rstream = null;
this.id = -1;
this.revision = this.option.getRevision();
}
@ -157,7 +154,7 @@ final class WatchImpl extends Impl implements Watch {
}
if (started.compareAndSet(false, true)) {
// id is not really useful today, but it may be in etcd 3.4
// id is not really useful today but it may be in etcd 3.4
id = -1;
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
@ -169,7 +166,7 @@ final class WatchImpl extends Impl implements Watch {
.map(endKey -> Util.prefixNamespaceToRangeEnd(endKey, namespace))
.ifPresent(builder::setRangeEnd);
if (option.getEndKey().isEmpty() && option.isPrefix()) {
if (!option.getEndKey().isPresent() && option.isPrefix()) {
ByteSequence endKey = OptionsUtil.prefixEndOf(key);
builder.setRangeEnd(Util.prefixNamespaceToRangeEnd(endKey, namespace));
}
@ -182,7 +179,7 @@ final class WatchImpl extends Impl implements Watch {
builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
}
var ignored = Util.applyRequireLeader(option.withRequireLeader(), stub)
rstream = Util.applyRequireLeader(option.withRequireLeader(), stub)
.watchWithHandler(
stream -> {
wstream.set(stream);
@ -356,7 +353,7 @@ final class WatchImpl extends Impl implements Watch {
private void reschedule() {
Futures.addCallback(executor.schedule(this::resume, 500, TimeUnit.MILLISECONDS), new FutureCallback<Object>() {
@Override
public void onFailure(Throwable t) {
public void onFailure(@NonNull Throwable t) {
LOG.warn("scheduled resume failed", t);
}

View File

@ -58,11 +58,7 @@ public class GetResponse extends AbstractResponse<RangeResponse> {
}
/**
* Returns the number of keys within the range requested.
* Note this value is never affected by filtering options (limit, min or max created or modified revisions)
* Count is the count for keys on the range part of a request.
* Filters for limit and created or modified revision ranges restrict the
* returned KVs, but not the count.
* Returns the number of keys within the range when requested.
*
* @return count.
*/

View File

@ -29,13 +29,11 @@ public final class DeleteOption {
private final ByteSequence endKey;
private final boolean prevKV;
private final boolean prefix;
private final boolean autoRetry;
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix, final boolean autoRetry) {
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix) {
this.endKey = endKey;
this.prevKV = prevKV;
this.prefix = prefix;
this.autoRetry = autoRetry;
}
public Optional<ByteSequence> getEndKey() {
@ -51,25 +49,10 @@ public final class DeleteOption {
return prevKV;
}
/**
* Whether to treat this deletion as deletion by prefix
*
* @return true if deletion by prefix.
*/
public boolean isPrefix() {
return prefix;
}
/**
* Whether to treat a delete operation as idempotent from the point of view of automated retries.
* Note under failure scenarios this may mean a single delete is attempted more than once.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}
/**
* Returns the builder.
*
@ -82,11 +65,6 @@ public final class DeleteOption {
return builder();
}
/**
* Returns the builder.
*
* @return the builder
*/
public static Builder builder() {
return new Builder();
}
@ -95,7 +73,6 @@ public final class DeleteOption {
private ByteSequence endKey;
private boolean prevKV = false;
private boolean prefix = false;
private boolean autoRetry = false;
private Builder() {
}
@ -167,22 +144,8 @@ public final class DeleteOption {
return this;
}
/**
* When autoRetry is set, the delete operation is treated as idempotent from the point of view of automated retries.
* Note under some failure scenarios true may make a delete operation be attempted more than once, where
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}
public DeleteOption build() {
return new DeleteOption(endKey, prevKV, prefix, autoRetry);
return new DeleteOption(endKey, prevKV, prefix);
}
}

View File

@ -20,7 +20,6 @@ import java.util.Optional;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import static java.util.Objects.requireNonNull;
@ -77,158 +76,56 @@ public final class GetOption {
/**
* Get the maximum number of keys to return for a get request.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* </p>
*
*
* @return the maximum number of keys to return.
*/
public long getLimit() {
return this.limit;
}
/**
* Get the end key for a range request
*
* @return the end key for a range request
*/
public Optional<ByteSequence> getEndKey() {
return Optional.ofNullable(this.endKey);
}
/**
* Get the revision for the request
*
* @return the revision for the request
*/
public long getRevision() {
return revision;
}
/**
* Get the sort order for the request
*
* @return the sort order for the request
*/
public SortOrder getSortOrder() {
return sortOrder;
}
/**
* Get the sort field for the request
*
* @return the sort field for the request
*/
public SortTarget getSortField() {
return sortTarget;
}
/**
* Return if the consistency level for this request is "serializable".
* Note serializable is a lower than default consistency, and implies
* the possibility of getting stale data.
*
* @return true if this request is only serializable consistency
*/
public boolean isSerializable() {
return serializable;
}
/**
* True if this request should get only keys in a range and there is no
* need to retrieve the values.
*
* @return true if only get keys
*/
public boolean isKeysOnly() {
return keysOnly;
}
/**
* True if this request should only populate the count of keys matching
* a range, and no other data.
*
* @return true if only get the count of keys
*/
public boolean isCountOnly() {
return countOnly;
}
/**
* Only populate results for keys that match a
* minimum value for a created revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a min create revision option
* with the count only option.
* </p>
*
* @return minimum created revision to match, or zero for any.
*/
public long getMinCreateRevision() {
return this.minCreateRevision;
}
/**
* Only populate results for keys that match a
* maximum value for a created revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a max create revision option
* with the count only option.
* </p>
*
* @return maximum created revision to match, or zero for any.
*/
public long getMaxCreateRevision() {
return this.maxCreateRevision;
}
/**
* Only populate results for keys that match a
* minimum value for a modified revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a min mod revision option
* with the count only option.
* </p>
*
* @return minimum modified revision to match, or zero for any.
*/
public long getMinModRevision() {
return this.minModRevision;
}
/**
* Only populate results for keys that match a
* maximum value for a modified revision.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting a max mod revision option
* with the count only option.
* </p>
*
* @return maximum modified revision to match, or zero for any.
*/
public long getMaxModRevision() {
return this.maxModRevision;
}
/**
* True if this Get request should do prefix match
*
* @return true if this Get request should do prefix match
*/
public boolean isPrefix() {
return prefix;
}
@ -279,13 +176,6 @@ public final class GetOption {
/**
* Limit the number of keys to return for a get request. By default is 0 - no limitation.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param limit the maximum number of keys to return for a get request.
* @return builder
*/
@ -339,9 +229,7 @@ public final class GetOption {
* <p>
* Get requests are linearizable by
* default. For better performance, a serializable get request is served locally without needing
* to reach consensus with other nodes in the cluster. Note this is a tradeoff with strict
* consistency so it should be used with care in situations where reading stale
* is acceptable.
* to reach consensus with other nodes in the cluster.
*
* @param serializable is the get request a serializable get request.
* @return builder
@ -428,17 +316,10 @@ public final class GetOption {
}
/**
* Limit returned keys to those with create revision greater or equal than the provided value.
* Limit returned keys to those with create revision greater than the provided value.
* min_create_revision is the lower bound for returned key create revisions; all keys with
* lesser create revisions will be filtered away.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param createRevision create revision
* @return builder
*/
@ -448,17 +329,10 @@ public final class GetOption {
}
/**
* Limit returned keys to those with create revision less or equal than the provided value.
* Limit returned keys to those with create revision less than the provided value.
* max_create_revision is the upper bound for returned key create revisions; all keys with
* greater create revisions will be filtered away.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param createRevision create revision
* @return builder
*/
@ -468,17 +342,10 @@ public final class GetOption {
}
/**
* Limit returned keys to those with mod revision greater or equal than the provided value.
* Limit returned keys to those with mod revision greater than the provided value.
* min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod
* revisions will be filtered away.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param modRevision mod revision
* @return this builder instance
*/
@ -488,17 +355,10 @@ public final class GetOption {
}
/**
* Limit returned keys to those with mod revision less or equal than the provided value. max_mod_revision
* Limit returned keys to those with mod revision less than the provided value. max_mod_revision
* is the upper bound for returned key mod revisions; all keys with greater mod revisions will
* be filtered away.
*
* <p>
* Note this filter does not affect the count field in GetResponse.
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
* For the same reason, it would be meaningless to mix setting this option
* with the count only option.
* </p>
*
* @param modRevision mod revision
* @return this builder instance
*/

View File

@ -26,12 +26,10 @@ public final class PutOption {
private final long leaseId;
private final boolean prevKV;
private final boolean autoRetry;
private PutOption(long leaseId, boolean prevKV, boolean autoRetry) {
private PutOption(long leaseId, boolean prevKV) {
this.leaseId = leaseId;
this.prevKV = prevKV;
this.autoRetry = autoRetry;
}
/**
@ -52,16 +50,6 @@ public final class PutOption {
return this.prevKV;
}
/**
* Whether to treat a put operation as idempotent from the point of view of automated retries.
* Note under failure scenarios this may mean a single put executes more than once.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}
/**
* Returns the builder.
*
@ -85,7 +73,6 @@ public final class PutOption {
private long leaseId = 0L;
private boolean prevKV = false;
private boolean autoRetry = false;
private Builder() {
}
@ -113,28 +100,13 @@ public final class PutOption {
return this;
}
/**
* When autoRetry is set, treat this put as idempotent from the point of view of automated retries.
* Note under some failure scenarios autoRetry=true may make a put operation execute more than once, where
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure that the operation did not happen
* in the server.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}
/**
* build the put option.
*
* @return the put option
*/
public PutOption build() {
return new PutOption(this.leaseId, this.prevKV, this.autoRetry);
return new PutOption(this.leaseId, this.prevKV);
}
}

View File

@ -1,54 +0,0 @@
package io.etcd.jetcd.options;
public final class TxnOption {
public static final TxnOption DEFAULT = builder().build();
private final boolean autoRetry;
private TxnOption(final boolean autoRetry) {
this.autoRetry = autoRetry;
}
/**
* Whether to treat a txn operation as idempotent from the point of view of automated retries.
*
* @return true if automated retries should happen.
*/
public boolean isAutoRetry() {
return autoRetry;
}
/**
* Returns the builder.
*
* @return the builder
*/
public static TxnOption.Builder builder() {
return new TxnOption.Builder();
}
public static final class Builder {
private boolean autoRetry = false;
private Builder() {
}
/**
* When autoRetry is set, the txn operation is treated as idempotent from the point of view of automated retries.
* Note under some failure scenarios true may make a txn operation be attempted and/or execute more than once, where
* a first attempt executed but its result status did not reach the client; by default (autoRetry=false),
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
*
* @return builder
*/
public Builder withAutoRetry() {
this.autoRetry = true;
return this;
}
public TxnOption build() {
return new TxnOption(autoRetry);
}
}
}

View File

@ -19,6 +19,7 @@ package io.etcd.jetcd.resolver;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,6 +30,8 @@ import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import com.google.common.base.Preconditions;
@ -44,6 +47,7 @@ public abstract class AbstractNameResolver extends NameResolver {
private volatile boolean shutdown;
private volatile boolean resolving;
private Executor executor;
private Listener listener;
public AbstractNameResolver(URI targetUri) {
@ -65,6 +69,7 @@ public abstract class AbstractNameResolver extends NameResolver {
public void start(Listener listener) {
synchronized (lock) {
Preconditions.checkState(this.listener == null, "already started");
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
this.listener = Objects.requireNonNull(listener, "listener");
resolve();
}
@ -81,13 +86,21 @@ public abstract class AbstractNameResolver extends NameResolver {
return;
}
shutdown = true;
synchronized (lock) {
if (executor != null) {
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
}
}
}
private void resolve() {
if (resolving || shutdown) {
return;
}
doResolve();
synchronized (lock) {
executor.execute(this::doResolve);
}
}
private void doResolve() {

View File

@ -26,18 +26,9 @@ public final class Errors {
private Errors() {
}
// isRetryable implementation for idempotent operations.
public static boolean isRetryableForSafeRedoOp(Status status) {
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isAlwaysSafeToRetry(status);
}
// isRetryable implementation for non-idempotent operations
public static boolean isRetryableForNoSafeRedoOp(Status status) {
return isAlwaysSafeToRetry(status);
}
public static boolean isAlwaysSafeToRetry(Status status) {
return isInvalidTokenError(status) || isAuthStoreExpired(status);
public static boolean isRetryable(Status status) {
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isInvalidTokenError(status)
|| isAuthStoreExpired(status);
}
public static boolean isInvalidTokenError(Throwable e) {

View File

@ -50,11 +50,6 @@ public class ClientBuilderTest {
assertThatThrownBy(() -> Client.builder().endpoints((URI) null)).isInstanceOf(NullPointerException.class);
}
@Test
public void testVertx_Null() {
assertThatThrownBy(() -> Client.builder().vertx(null)).isInstanceOf(IllegalArgumentException.class);
}
@Test
public void testEndPoints_Verify_Empty() {
assertThatThrownBy(() -> Client.builder().endpoints(new URI(""))).isInstanceOf(IllegalArgumentException.class);

View File

@ -316,7 +316,7 @@ public class KVTest {
for (int i = 0; i < putCount; ++i) {
ByteSequence value = ByteSequence
.from(Integer.toString(i), StandardCharsets.UTF_8);
customClient.getKVClient().put(key, value, PutOption.builder().withAutoRetry().build()).join();
customClient.getKVClient().put(key, value).join();
}
});

View File

@ -39,18 +39,16 @@ class UtilTest {
}
@Test
public void testAuthErrorIsRetryable() {
public void testAuthErrorIsNotRetryable() {
Status authErrorStatus = Status.UNAUTHENTICATED
.withDescription("etcdserver: invalid auth token");
Status status = Status.fromThrowable(new StatusException(authErrorStatus));
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isTrue();
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
assertThat(Errors.isRetryable(status)).isTrue();
}
@Test
public void testUnavailableErrorIsRetryable() {
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isFalse();
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
assertThat(Errors.isRetryable(status)).isTrue();
}
}

View File

@ -28,7 +28,7 @@ import org.testcontainers.containers.Network;
import com.google.common.base.Strings;
public final class Etcd {
public static final String CONTAINER_IMAGE = "quay.io/coreos/etcd:v3.5.14";
public static final String CONTAINER_IMAGE = "gcr.io/etcd-development/etcd:v3.5.10";
public static final int ETCD_CLIENT_PORT = 2379;
public static final int ETCD_PEER_PORT = 2380;
public static final String ETCD_DATA_DIR = "/data.etcd";
@ -58,7 +58,6 @@ public final class Etcd {
private List<String> additionalArgs;
private Network network;
private boolean shouldMountDataDirectory = true;
private String user;
public Builder withClusterName(String clusterName) {
this.clusterName = clusterName;
@ -115,18 +114,12 @@ public final class Etcd {
debug,
additionalArgs,
network != null ? network : Network.SHARED,
shouldMountDataDirectory,
user);
shouldMountDataDirectory);
}
public Builder withMountedDataDirectory(boolean shouldMountDataDirectory) {
this.shouldMountDataDirectory = shouldMountDataDirectory;
return this;
}
public Builder withUser(String user) {
this.user = user;
return this;
}
}
}

View File

@ -43,8 +43,7 @@ public class EtcdClusterImpl implements EtcdCluster {
boolean debug,
Collection<String> additionalArgs,
Network network,
boolean shouldMountDataDirectory,
String user) {
boolean shouldMountDataDirectory) {
this.clusterName = clusterName;
this.endpoints = IntStream.range(0, nodes)
@ -58,8 +57,7 @@ public class EtcdClusterImpl implements EtcdCluster {
.withDebug(debug)
.withAdditionalArgs(additionalArgs)
.withNetwork(network)
.withShouldMountDataDirectory(shouldMountDataDirectory)
.withUser(user))
.withShouldMountDataDirectory(shouldMountDataDirectory))
.collect(toList());
}

View File

@ -63,7 +63,6 @@ public class EtcdContainer extends GenericContainer<EtcdContainer> {
private Path dataDirectory;
private Collection<String> additionalArgs;
private boolean shouldMountDataDirectory = true;
private String user;
public EtcdContainer(String image, String node, Collection<String> nodes) {
super(image);
@ -103,18 +102,6 @@ public class EtcdContainer extends GenericContainer<EtcdContainer> {
return self();
}
/**
* Optional values are {@code [ user | user:group | uid | uid:gid | user:gid | uid:group ]}.
* See <a href="https://docs.docker.com/engine/reference/run/#user">User</a> .
*
* @param user Refer to {@link com.github.dockerjava.api.command.CreateContainerCmd#withUser(String)}
* @return self container.
*/
public EtcdContainer withUser(String user) {
this.user = user;
return self();
}
@Override
protected void configure() {
if (!configured.compareAndSet(false, true)) {
@ -133,13 +120,9 @@ public class EtcdContainer extends GenericContainer<EtcdContainer> {
withEnv("ETCD_LOG_LEVEL", this.debug ? "debug" : "info");
withEnv("ETCD_LOGGER", "zap");
String tempUser = this.user;
if (tempUser == null) {
tempUser = System.getenv("TC_USER");
}
if (tempUser != null) {
String finalUser = tempUser;
withCreateContainerCmdModifier(c -> c.withUser(finalUser));
String user = System.getenv("TC_USER");
if (user != null) {
withCreateContainerCmdModifier(c -> c.withUser(user));
}
if (ssl) {

View File

@ -27,6 +27,9 @@
<Loggers>
<!-- package loggers -->
<Logger name="org.testcontainers" level="INFO"/>
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.3]" level="WARN"/>
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.4]" level="WARN"/>
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.4.7]" level="WARN"/>
<Logger name="io.etcd.jetcd.internal.infrastructure" level="INFO"/>
<Logger name="io.etcd.jetcd.launcher.EtcdCluster" level="WARN"/>

View File

@ -168,11 +168,6 @@ public class EtcdClusterExtension implements BeforeAllCallback, BeforeEachCallba
return this;
}
public Builder withUser(String user) {
builder.withUser(user);
return this;
}
public EtcdClusterExtension build() {
return new EtcdClusterExtension(builder.build());
}

View File

@ -20,6 +20,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,6 +31,8 @@ import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import com.google.common.base.Preconditions;
@ -44,6 +47,7 @@ public class EtcdClusterNameResolver extends NameResolver {
private volatile boolean shutdown;
private volatile boolean resolving;
private Executor executor;
private Listener listener;
public EtcdClusterNameResolver(URI targetUri) {
@ -61,6 +65,7 @@ public class EtcdClusterNameResolver extends NameResolver {
public void start(Listener listener) {
synchronized (lock) {
Preconditions.checkState(this.listener == null, "already started");
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
this.listener = Objects.requireNonNull(listener, "listener");
resolve();
}
@ -77,14 +82,21 @@ public class EtcdClusterNameResolver extends NameResolver {
return;
}
shutdown = true;
synchronized (lock) {
if (executor != null) {
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
}
}
}
private void resolve() {
if (resolving || shutdown) {
return;
}
doResolve();
synchronized (lock) {
executor.execute(this::doResolve);
}
}
private void doResolve() {