mirror of https://github.com/etcd-io/jetcd.git
Compare commits
93 Commits
jetcd-0.8.
...
main
Author | SHA1 | Date |
---|---|---|
|
8e72ab3da0 | |
|
3be6978e23 | |
|
9b4c9dd2ce | |
|
03a3b994e6 | |
|
10a335bf5f | |
|
fd240de17e | |
|
602feaf479 | |
|
4489b3f8da | |
|
40425c35d5 | |
|
990a5ea690 | |
|
6843e47a96 | |
|
5a83bdb482 | |
|
5f0ee26f3b | |
|
80b72b0b2a | |
|
98307934fe | |
|
a3f85475a2 | |
|
579e05b634 | |
|
43f0f23122 | |
|
81ea44b082 | |
|
c41c3fdc68 | |
|
e852bd921d | |
|
8dd9d3504e | |
|
c468ab4dd8 | |
|
08f3c274cc | |
|
a2c1db0279 | |
|
a65e765e36 | |
|
280a445773 | |
|
d498294fca | |
|
81ebd3a3fe | |
|
9a0d2f4076 | |
|
61a21a2d7c | |
|
69c2ec3d03 | |
|
59ff5091cc | |
|
098cf0f8bc | |
|
e68613fb6b | |
|
59337b388b | |
|
9f1a90cc6a | |
|
2e34d10d24 | |
|
11f45ec3fc | |
|
3a7740fa92 | |
|
b7293499a4 | |
|
08def01ee1 | |
|
05f92e0443 | |
|
a240dc6a74 | |
|
8ee9c1573e | |
|
0dad061111 | |
|
dab6c4cb2d | |
|
befcdc538d | |
|
8a6c463341 | |
|
01a2014b54 | |
|
acbfa6f434 | |
|
d2005ada3f | |
|
327a37bbc9 | |
|
8f182d4b8e | |
|
2da6f79b9d | |
|
05777ffaca | |
|
503bdd0f5a | |
|
346827957c | |
|
8e7ed6aaa6 | |
|
fcfe099aab | |
|
d1d21b1cdb | |
|
066d92cf95 | |
|
676bad3dd5 | |
|
a8f4ab8f20 | |
|
195032ee2d | |
|
958ed92b92 | |
|
9362dc8db3 | |
|
a5e1de0a88 | |
|
7c6fbcc8ce | |
|
4bbc3bb65f | |
|
e78aa2bad2 | |
|
3bc741013a | |
|
d70df1c279 | |
|
612ecbb4dd | |
|
2e7fdbf264 | |
|
7fc1e0078b | |
|
b51258e4a7 | |
|
05d2fda634 | |
|
639bb27d21 | |
|
43bc2614c2 | |
|
7ffe8f6c37 | |
|
548b810883 | |
|
6bbb9836a0 | |
|
278d3afb16 | |
|
a2e3015e6e | |
|
ae02549362 | |
|
c008ba5603 | |
|
9cf296c654 | |
|
36a5738f8d | |
|
4d93e3b8fa | |
|
185ee008e0 | |
|
e878e1d4b1 | |
|
58c80dbbb1 |
|
@ -15,8 +15,8 @@ assignees: ''
|
||||||
A clear and concise description of what the bug is.
|
A clear and concise description of what the bug is.
|
||||||
|
|
||||||
**To Reproduce**
|
**To Reproduce**
|
||||||
Steps or reproducer to reproduce the behavior.
|
Steps or reproducer to reproduce the behavior in a form of a unit test.
|
||||||
This section *must* be provided, if not, the issue may not get attention.
|
This section *must* be provided, if not, the issue may not get attention since the maintainers have very limited capacity.
|
||||||
|
|
||||||
**Expected behavior**
|
**Expected behavior**
|
||||||
A clear and concise description of what you expected to happen.
|
A clear and concise description of what you expected to happen.
|
||||||
|
|
|
@ -42,8 +42,8 @@ jobs:
|
||||||
- 17
|
- 17
|
||||||
- 21
|
- 21
|
||||||
etcd:
|
etcd:
|
||||||
- gcr.io/etcd-development/etcd:v3.5.10
|
- quay.io/coreos/etcd:v3.5.21
|
||||||
- gcr.io/etcd-development/etcd:v3.4.27
|
- quay.io/coreos/etcd:v3.6.0
|
||||||
uses: ./.github/workflows/build.yml
|
uses: ./.github/workflows/build.yml
|
||||||
with:
|
with:
|
||||||
javaVersion: "${{ matrix.java-version }}"
|
javaVersion: "${{ matrix.java-version }}"
|
||||||
|
|
|
@ -43,8 +43,8 @@ jobs:
|
||||||
- 17
|
- 17
|
||||||
- 21
|
- 21
|
||||||
etcd:
|
etcd:
|
||||||
- gcr.io/etcd-development/etcd:v3.5.10
|
- quay.io/coreos/etcd:v3.5.21
|
||||||
- gcr.io/etcd-development/etcd:v3.4.27
|
- quay.io/coreos/etcd:v3.6.0
|
||||||
uses: ./.github/workflows/build.yml
|
uses: ./.github/workflows/build.yml
|
||||||
with:
|
with:
|
||||||
javaVersion: "${{ matrix.java-version }}"
|
javaVersion: "${{ matrix.java-version }}"
|
||||||
|
|
|
@ -29,10 +29,11 @@ jobs:
|
||||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||||
days-before-stale: 60
|
days-before-stale: 60
|
||||||
days-before-close: 7
|
days-before-close: 7
|
||||||
|
only-labels: 'waiting-for-feedbacks'
|
||||||
stale-issue-label: stale
|
stale-issue-label: stale
|
||||||
exempt-issue-label: never-stale
|
exempt-issue-labels: never-stale
|
||||||
stale-pr-label: stale
|
stale-pr-label: stale
|
||||||
exempt-pr-label: never-stale
|
exempt-pr-labels: never-stale
|
||||||
stale-issue-message: |
|
stale-issue-message: |
|
||||||
This issue is stale because it has been open 60 days with no activity.
|
This issue is stale because it has been open 60 days with no activity.
|
||||||
Remove stale label or comment or this will be closed in 7 days.
|
Remove stale label or comment or this will be closed in 7 days.
|
||||||
|
|
3
OWNERS
3
OWNERS
|
@ -3,5 +3,4 @@
|
||||||
approvers:
|
approvers:
|
||||||
- fanminshi # Fanmin Shi <fanmin.shi@coreos.com>
|
- fanminshi # Fanmin Shi <fanmin.shi@coreos.com>
|
||||||
- lburgazzoli # Luca Burgazzoli <lburgazzoli@gmail.com>
|
- lburgazzoli # Luca Burgazzoli <lburgazzoli@gmail.com>
|
||||||
- xiang90 # Xiang Li <xiang.li@coreos.com>
|
- vorburger # Michael Vorburger <mike@vorburger.ch>
|
||||||
- heyitsanthony # Anthony Romano <anthony.romano@coreos.com>
|
|
||||||
|
|
|
@ -111,7 +111,7 @@ The project is tested against a three node `etcd` setup started with the Launche
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
$ ./gradlew test
|
$ ./gradlew test
|
||||||
````
|
```
|
||||||
|
|
||||||
### Troubleshooting
|
### Troubleshooting
|
||||||
|
|
||||||
|
@ -129,3 +129,4 @@ See [CONTRIBUTING](https://github.com/etcd-io/jetcd/blob/master/CONTRIBUTING.md)
|
||||||
|
|
||||||
jetcd is under the Apache 2.0 license. See the [LICENSE](https://github.com/etcd-io/jetcd/blob/master/LICENSE) file for details.
|
jetcd is under the Apache 2.0 license. See the [LICENSE](https://github.com/etcd-io/jetcd/blob/master/LICENSE) file for details.
|
||||||
|
|
||||||
|
|
||||||
|
|
12
build.gradle
12
build.gradle
|
@ -76,6 +76,13 @@ subprojects {
|
||||||
maxFailures = 5
|
maxFailures = 5
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
testing {
|
||||||
|
suites {
|
||||||
|
test {
|
||||||
|
useJUnitJupiter()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
testlogger {
|
testlogger {
|
||||||
theme 'mocha-parallel'
|
theme 'mocha-parallel'
|
||||||
|
@ -83,6 +90,11 @@ subprojects {
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks.register('allDeps', DependencyReportTask)
|
tasks.register('allDeps', DependencyReportTask)
|
||||||
|
|
||||||
|
tasks.withType(AbstractArchiveTask).configureEach {
|
||||||
|
preserveFileTimestamps = false
|
||||||
|
reproducibleFileOrder = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,33 +1,33 @@
|
||||||
[versions]
|
[versions]
|
||||||
grpc = "1.64.0"
|
grpc = "1.74.0"
|
||||||
log4j = "2.23.1"
|
log4j = "2.25.1"
|
||||||
mockito = "5.12.0"
|
mockito = "5.18.0"
|
||||||
slf4j = "2.0.13"
|
slf4j = "2.0.17"
|
||||||
guava = "33.2.0-jre"
|
guava = "33.4.8-jre"
|
||||||
assertj = "3.26.0"
|
assertj = "3.27.3"
|
||||||
junit = "5.10.2"
|
junit = "5.13.4"
|
||||||
testcontainers = "1.19.8"
|
testcontainers = "1.21.3"
|
||||||
protoc = "3.25.1"
|
protoc = "3.25.1"
|
||||||
failsafe = "3.3.2"
|
failsafe = "3.3.2"
|
||||||
awaitility = "4.2.1"
|
awaitility = "4.3.0"
|
||||||
commonsIo = "2.16.1"
|
commonsIo = "2.20.0"
|
||||||
commonCompress = "1.26.2"
|
commonCompress = "1.28.0"
|
||||||
autoService = "1.1.1"
|
autoService = "1.1.1"
|
||||||
errorprone = "2.27.1"
|
errorprone = "2.30.0"
|
||||||
vertx = "4.5.8"
|
vertx = "5.0.1"
|
||||||
picocli = "4.7.6"
|
picocli = "4.7.7"
|
||||||
restAssured = "5.4.0"
|
restAssured = "5.5.5"
|
||||||
javaxAnnotation = "1.3.2"
|
javaxAnnotation = "1.3.2"
|
||||||
|
|
||||||
versionsPlugin = "0.50.0"
|
versionsPlugin = "0.52.0"
|
||||||
errorPronePlugin = "3.1.0"
|
errorPronePlugin = "4.0.1"
|
||||||
spotlessPlugin = "6.23.3"
|
spotlessPlugin = "6.25.0"
|
||||||
shadowPlugin = "8.1.1"
|
shadowPlugin = "8.1.1"
|
||||||
testLoggerPlugin = "4.0.0"
|
testLoggerPlugin = "4.0.0"
|
||||||
protobufPlugin = "0.9.4"
|
protobufPlugin = "0.9.5"
|
||||||
nexusPublishPlugin = "1.3.0"
|
nexusPublishPlugin = "2.0.0"
|
||||||
axionReleasePlugin = "1.16.1"
|
axionReleasePlugin = "1.18.17"
|
||||||
testRetryPlugin = "1.5.8"
|
testRetryPlugin = "1.6.2"
|
||||||
|
|
||||||
|
|
||||||
[libraries]
|
[libraries]
|
||||||
|
|
Binary file not shown.
|
@ -1,6 +1,6 @@
|
||||||
distributionBase=GRADLE_USER_HOME
|
distributionBase=GRADLE_USER_HOME
|
||||||
distributionPath=wrapper/dists
|
distributionPath=wrapper/dists
|
||||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
|
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
|
||||||
networkTimeout=10000
|
networkTimeout=10000
|
||||||
validateDistributionUrl=true
|
validateDistributionUrl=true
|
||||||
zipStoreBase=GRADLE_USER_HOME
|
zipStoreBase=GRADLE_USER_HOME
|
||||||
|
|
|
@ -15,6 +15,8 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
|
#
|
||||||
|
|
||||||
##############################################################################
|
##############################################################################
|
||||||
#
|
#
|
||||||
|
@ -55,7 +57,7 @@
|
||||||
# Darwin, MinGW, and NonStop.
|
# Darwin, MinGW, and NonStop.
|
||||||
#
|
#
|
||||||
# (3) This script is generated from the Groovy template
|
# (3) This script is generated from the Groovy template
|
||||||
# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
|
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
|
||||||
# within the Gradle project.
|
# within the Gradle project.
|
||||||
#
|
#
|
||||||
# You can find Gradle at https://github.com/gradle/gradle/.
|
# You can find Gradle at https://github.com/gradle/gradle/.
|
||||||
|
@ -84,7 +86,7 @@ done
|
||||||
# shellcheck disable=SC2034
|
# shellcheck disable=SC2034
|
||||||
APP_BASE_NAME=${0##*/}
|
APP_BASE_NAME=${0##*/}
|
||||||
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
|
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
|
||||||
APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
|
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s\n' "$PWD" ) || exit
|
||||||
|
|
||||||
# Use the maximum available, or set MAX_FD != -1 to use that value.
|
# Use the maximum available, or set MAX_FD != -1 to use that value.
|
||||||
MAX_FD=maximum
|
MAX_FD=maximum
|
||||||
|
@ -112,7 +114,7 @@ case "$( uname )" in #(
|
||||||
NONSTOP* ) nonstop=true ;;
|
NONSTOP* ) nonstop=true ;;
|
||||||
esac
|
esac
|
||||||
|
|
||||||
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
|
CLASSPATH="\\\"\\\""
|
||||||
|
|
||||||
|
|
||||||
# Determine the Java command to use to start the JVM.
|
# Determine the Java command to use to start the JVM.
|
||||||
|
@ -203,7 +205,7 @@ fi
|
||||||
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
|
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
|
||||||
|
|
||||||
# Collect all arguments for the java command:
|
# Collect all arguments for the java command:
|
||||||
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
|
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
|
||||||
# and any embedded shellness will be escaped.
|
# and any embedded shellness will be escaped.
|
||||||
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
|
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
|
||||||
# treated as '${Hostname}' itself on the command line.
|
# treated as '${Hostname}' itself on the command line.
|
||||||
|
@ -211,7 +213,7 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
|
||||||
set -- \
|
set -- \
|
||||||
"-Dorg.gradle.appname=$APP_BASE_NAME" \
|
"-Dorg.gradle.appname=$APP_BASE_NAME" \
|
||||||
-classpath "$CLASSPATH" \
|
-classpath "$CLASSPATH" \
|
||||||
org.gradle.wrapper.GradleWrapperMain \
|
-jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
|
||||||
"$@"
|
"$@"
|
||||||
|
|
||||||
# Stop when "xargs" is not available.
|
# Stop when "xargs" is not available.
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
@rem See the License for the specific language governing permissions and
|
@rem See the License for the specific language governing permissions and
|
||||||
@rem limitations under the License.
|
@rem limitations under the License.
|
||||||
@rem
|
@rem
|
||||||
|
@rem SPDX-License-Identifier: Apache-2.0
|
||||||
|
@rem
|
||||||
|
|
||||||
@if "%DEBUG%"=="" @echo off
|
@if "%DEBUG%"=="" @echo off
|
||||||
@rem ##########################################################################
|
@rem ##########################################################################
|
||||||
|
@ -68,11 +70,11 @@ goto fail
|
||||||
:execute
|
:execute
|
||||||
@rem Setup the command line
|
@rem Setup the command line
|
||||||
|
|
||||||
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
|
set CLASSPATH=
|
||||||
|
|
||||||
|
|
||||||
@rem Execute Gradle
|
@rem Execute Gradle
|
||||||
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
|
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
|
||||||
|
|
||||||
:end
|
:end
|
||||||
@rem End local scope for the variables with windows NT shell
|
@rem End local scope for the variables with windows NT shell
|
||||||
|
|
|
@ -37,6 +37,7 @@ import io.grpc.Metadata;
|
||||||
import io.grpc.netty.GrpcSslContexts;
|
import io.grpc.netty.GrpcSslContexts;
|
||||||
import io.netty.handler.ssl.SslContext;
|
import io.netty.handler.ssl.SslContext;
|
||||||
import io.netty.handler.ssl.SslContextBuilder;
|
import io.netty.handler.ssl.SslContextBuilder;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
|
||||||
|
@ -68,6 +69,7 @@ public final class ClientBuilder implements Cloneable {
|
||||||
private Duration retryMaxDuration;
|
private Duration retryMaxDuration;
|
||||||
private Duration connectTimeout;
|
private Duration connectTimeout;
|
||||||
private boolean waitForReady = true;
|
private boolean waitForReady = true;
|
||||||
|
private Vertx vertx;
|
||||||
|
|
||||||
ClientBuilder() {
|
ClientBuilder() {
|
||||||
}
|
}
|
||||||
|
@ -707,6 +709,30 @@ public final class ClientBuilder implements Cloneable {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets the Vertx instance.
|
||||||
|
*
|
||||||
|
* @return the vertx instance.
|
||||||
|
*/
|
||||||
|
public Vertx vertx() {
|
||||||
|
return vertx;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* configure Vertx instance.
|
||||||
|
*
|
||||||
|
* @param vertx Vertx instance to use.
|
||||||
|
* @return this builder to train
|
||||||
|
* @throws IllegalArgumentException if vertx is null
|
||||||
|
*/
|
||||||
|
public ClientBuilder vertx(Vertx vertx) {
|
||||||
|
Preconditions.checkArgument(vertx != null, "vertx can't be null");
|
||||||
|
|
||||||
|
this.vertx = vertx;
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* build a new Client.
|
* build a new Client.
|
||||||
*
|
*
|
||||||
|
|
|
@ -26,6 +26,7 @@ import io.etcd.jetcd.options.CompactOption;
|
||||||
import io.etcd.jetcd.options.DeleteOption;
|
import io.etcd.jetcd.options.DeleteOption;
|
||||||
import io.etcd.jetcd.options.GetOption;
|
import io.etcd.jetcd.options.GetOption;
|
||||||
import io.etcd.jetcd.options.PutOption;
|
import io.etcd.jetcd.options.PutOption;
|
||||||
|
import io.etcd.jetcd.options.TxnOption;
|
||||||
import io.etcd.jetcd.support.CloseableClient;
|
import io.etcd.jetcd.support.CloseableClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,4 +116,12 @@ public interface KV extends CloseableClient {
|
||||||
* @return a Txn
|
* @return a Txn
|
||||||
*/
|
*/
|
||||||
Txn txn();
|
Txn txn();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* creates a transaction.
|
||||||
|
*
|
||||||
|
* @param option TxnOption
|
||||||
|
* @return a Txn
|
||||||
|
*/
|
||||||
|
Txn txn(TxnOption option);
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,10 @@ final class ClientConnectionManager {
|
||||||
} else {
|
} else {
|
||||||
this.executorService = builder.executorService();
|
this.executorService = builder.executorService();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (builder.vertx() != null) {
|
||||||
|
this.vertx = builder.vertx();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ManagedChannel getChannel() {
|
ManagedChannel getChannel() {
|
||||||
|
|
|
@ -88,7 +88,7 @@ final class ElectionImpl extends Impl implements Election {
|
||||||
execute(
|
execute(
|
||||||
() -> stubWithLeader().campaign(request),
|
() -> stubWithLeader().campaign(request),
|
||||||
CampaignResponse::new,
|
CampaignResponse::new,
|
||||||
Errors::isRetryable));
|
Errors::isRetryableForNoSafeRedoOp));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -111,7 +111,7 @@ final class ElectionImpl extends Impl implements Election {
|
||||||
execute(
|
execute(
|
||||||
() -> stubWithLeader().proclaim(request),
|
() -> stubWithLeader().proclaim(request),
|
||||||
ProclaimResponse::new,
|
ProclaimResponse::new,
|
||||||
Errors::isRetryable));
|
Errors::isRetryableForNoSafeRedoOp));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -126,7 +126,7 @@ final class ElectionImpl extends Impl implements Election {
|
||||||
execute(
|
execute(
|
||||||
() -> stubWithLeader().leader(request),
|
() -> stubWithLeader().leader(request),
|
||||||
response -> new LeaderResponse(response, namespace),
|
response -> new LeaderResponse(response, namespace),
|
||||||
Errors::isRetryable));
|
Errors::isRetryableForNoSafeRedoOp));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -135,7 +135,7 @@ final class ElectionImpl extends Impl implements Election {
|
||||||
requireNonNull(listener, "listener should not be null");
|
requireNonNull(listener, "listener should not be null");
|
||||||
|
|
||||||
LeaderRequest request = LeaderRequest.newBuilder()
|
LeaderRequest request = LeaderRequest.newBuilder()
|
||||||
.setName(ByteString.copyFrom(electionName.getBytes()))
|
.setName(Util.prefixNamespace(electionName, namespace))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
stubWithLeader().observeWithHandler(request,
|
stubWithLeader().observeWithHandler(request,
|
||||||
|
@ -162,7 +162,7 @@ final class ElectionImpl extends Impl implements Election {
|
||||||
execute(
|
execute(
|
||||||
() -> stubWithLeader().resign(request),
|
() -> stubWithLeader().resign(request),
|
||||||
ResignResponse::new,
|
ResignResponse::new,
|
||||||
Errors::isRetryable));
|
Errors::isRetryableForNoSafeRedoOp));
|
||||||
}
|
}
|
||||||
|
|
||||||
private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {
|
private <S> CompletableFuture<S> wrapConvertException(CompletableFuture<S> future) {
|
||||||
|
|
|
@ -89,9 +89,11 @@ abstract class Impl {
|
||||||
*/
|
*/
|
||||||
protected <S, T> CompletableFuture<T> execute(
|
protected <S, T> CompletableFuture<T> execute(
|
||||||
Supplier<Future<S>> supplier,
|
Supplier<Future<S>> supplier,
|
||||||
Function<S, T> resultConvert) {
|
Function<S, T> resultConvert,
|
||||||
|
boolean autoRetry) {
|
||||||
|
|
||||||
return execute(supplier, resultConvert, Errors::isRetryable);
|
return execute(supplier, resultConvert,
|
||||||
|
autoRetry ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -33,6 +33,7 @@ import io.etcd.jetcd.options.CompactOption;
|
||||||
import io.etcd.jetcd.options.DeleteOption;
|
import io.etcd.jetcd.options.DeleteOption;
|
||||||
import io.etcd.jetcd.options.GetOption;
|
import io.etcd.jetcd.options.GetOption;
|
||||||
import io.etcd.jetcd.options.PutOption;
|
import io.etcd.jetcd.options.PutOption;
|
||||||
|
import io.etcd.jetcd.options.TxnOption;
|
||||||
import io.etcd.jetcd.support.Errors;
|
import io.etcd.jetcd.support.Errors;
|
||||||
import io.etcd.jetcd.support.Requests;
|
import io.etcd.jetcd.support.Requests;
|
||||||
|
|
||||||
|
@ -65,7 +66,7 @@ final class KVImpl extends Impl implements KV {
|
||||||
return execute(
|
return execute(
|
||||||
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
|
() -> stub.put(Requests.mapPutRequest(key, value, option, namespace)),
|
||||||
response -> new PutResponse(response, namespace),
|
response -> new PutResponse(response, namespace),
|
||||||
Errors::isRetryable);
|
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -81,7 +82,7 @@ final class KVImpl extends Impl implements KV {
|
||||||
return execute(
|
return execute(
|
||||||
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
|
() -> stub.range(Requests.mapRangeRequest(key, option, namespace)),
|
||||||
response -> new GetResponse(response, namespace),
|
response -> new GetResponse(response, namespace),
|
||||||
Errors::isRetryable);
|
Errors::isRetryableForSafeRedoOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -97,7 +98,7 @@ final class KVImpl extends Impl implements KV {
|
||||||
return execute(
|
return execute(
|
||||||
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
|
() -> stub.deleteRange(Requests.mapDeleteRequest(key, option, namespace)),
|
||||||
response -> new DeleteResponse(response, namespace),
|
response -> new DeleteResponse(response, namespace),
|
||||||
Errors::isRetryable);
|
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -116,15 +117,21 @@ final class KVImpl extends Impl implements KV {
|
||||||
return execute(
|
return execute(
|
||||||
() -> stub.compact(request),
|
() -> stub.compact(request),
|
||||||
CompactResponse::new,
|
CompactResponse::new,
|
||||||
Errors::isRetryable);
|
Errors::isRetryableForSafeRedoOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Txn txn() {
|
public Txn txn() {
|
||||||
|
return txn(TxnOption.DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Txn txn(TxnOption option) {
|
||||||
return TxnImpl.newTxn(
|
return TxnImpl.newTxn(
|
||||||
request -> execute(
|
request -> execute(
|
||||||
() -> stub.txn(request),
|
() -> stub.txn(request),
|
||||||
response -> new TxnResponse(response, namespace), Errors::isRetryable),
|
response -> new TxnResponse(response, namespace),
|
||||||
|
option.isAutoRetry() ? Errors::isRetryableForSafeRedoOp : Errors::isRetryableForNoSafeRedoOp),
|
||||||
namespace);
|
namespace);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,8 @@ final class LeaseImpl extends Impl implements Lease {
|
||||||
LeaseGrantRequest.newBuilder()
|
LeaseGrantRequest.newBuilder()
|
||||||
.setTTL(ttl)
|
.setTTL(ttl)
|
||||||
.build()),
|
.build()),
|
||||||
LeaseGrantResponse::new);
|
LeaseGrantResponse::new,
|
||||||
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -94,7 +95,8 @@ final class LeaseImpl extends Impl implements Lease {
|
||||||
LeaseGrantRequest.newBuilder()
|
LeaseGrantRequest.newBuilder()
|
||||||
.setTTL(ttl)
|
.setTTL(ttl)
|
||||||
.build()),
|
.build()),
|
||||||
LeaseGrantResponse::new);
|
LeaseGrantResponse::new,
|
||||||
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -104,7 +106,8 @@ final class LeaseImpl extends Impl implements Lease {
|
||||||
LeaseRevokeRequest.newBuilder()
|
LeaseRevokeRequest.newBuilder()
|
||||||
.setID(leaseId)
|
.setID(leaseId)
|
||||||
.build()),
|
.build()),
|
||||||
LeaseRevokeResponse::new);
|
LeaseRevokeResponse::new,
|
||||||
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -118,7 +121,8 @@ final class LeaseImpl extends Impl implements Lease {
|
||||||
|
|
||||||
return execute(
|
return execute(
|
||||||
() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest),
|
() -> this.stub.leaseTimeToLive(leaseTimeToLiveRequest),
|
||||||
LeaseTimeToLiveResponse::new);
|
LeaseTimeToLiveResponse::new,
|
||||||
|
true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -69,7 +69,7 @@ final class LockImpl extends Impl implements Lock {
|
||||||
return execute(
|
return execute(
|
||||||
() -> stubWithLeader().lock(request),
|
() -> stubWithLeader().lock(request),
|
||||||
response -> new LockResponse(response, namespace),
|
response -> new LockResponse(response, namespace),
|
||||||
Errors::isRetryable);
|
Errors::isRetryableForSafeRedoOp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -83,6 +83,6 @@ final class LockImpl extends Impl implements Lock {
|
||||||
return execute(
|
return execute(
|
||||||
() -> stubWithLeader().unlock(request),
|
() -> stubWithLeader().unlock(request),
|
||||||
UnlockResponse::new,
|
UnlockResponse::new,
|
||||||
Errors::isRetryable);
|
Errors::isRetryableForSafeRedoOp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,13 +23,17 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.checkerframework.checker.nullness.qual.NonNull;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import io.etcd.jetcd.ByteSequence;
|
import io.etcd.jetcd.ByteSequence;
|
||||||
import io.etcd.jetcd.Watch;
|
import io.etcd.jetcd.Watch;
|
||||||
import io.etcd.jetcd.api.*;
|
import io.etcd.jetcd.api.VertxWatchGrpc;
|
||||||
|
import io.etcd.jetcd.api.WatchCancelRequest;
|
||||||
|
import io.etcd.jetcd.api.WatchCreateRequest;
|
||||||
|
import io.etcd.jetcd.api.WatchProgressRequest;
|
||||||
|
import io.etcd.jetcd.api.WatchRequest;
|
||||||
|
import io.etcd.jetcd.api.WatchResponse;
|
||||||
import io.etcd.jetcd.common.exception.ErrorCode;
|
import io.etcd.jetcd.common.exception.ErrorCode;
|
||||||
import io.etcd.jetcd.common.exception.EtcdException;
|
import io.etcd.jetcd.common.exception.EtcdException;
|
||||||
import io.etcd.jetcd.options.OptionsUtil;
|
import io.etcd.jetcd.options.OptionsUtil;
|
||||||
|
@ -37,7 +41,6 @@ import io.etcd.jetcd.options.WatchOption;
|
||||||
import io.etcd.jetcd.support.Errors;
|
import io.etcd.jetcd.support.Errors;
|
||||||
import io.etcd.jetcd.support.Util;
|
import io.etcd.jetcd.support.Util;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.vertx.core.streams.ReadStream;
|
|
||||||
import io.vertx.core.streams.WriteStream;
|
import io.vertx.core.streams.WriteStream;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
@ -46,7 +49,10 @@ import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
|
||||||
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.*;
|
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newClosedWatchClientException;
|
||||||
|
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newCompactedException;
|
||||||
|
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException;
|
||||||
|
import static io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* watch Implementation.
|
* watch Implementation.
|
||||||
|
@ -117,10 +123,8 @@ final class WatchImpl extends Impl implements Watch {
|
||||||
private final Listener listener;
|
private final Listener listener;
|
||||||
private final AtomicBoolean closed;
|
private final AtomicBoolean closed;
|
||||||
|
|
||||||
//private StreamObserver<WatchRequest> stream;
|
|
||||||
private final AtomicReference<WriteStream<WatchRequest>> wstream;
|
private final AtomicReference<WriteStream<WatchRequest>> wstream;
|
||||||
private final AtomicBoolean started;
|
private final AtomicBoolean started;
|
||||||
private ReadStream<WatchResponse> rstream;
|
|
||||||
private long revision;
|
private long revision;
|
||||||
private long id;
|
private long id;
|
||||||
|
|
||||||
|
@ -132,7 +136,6 @@ final class WatchImpl extends Impl implements Watch {
|
||||||
|
|
||||||
this.started = new AtomicBoolean();
|
this.started = new AtomicBoolean();
|
||||||
this.wstream = new AtomicReference<>();
|
this.wstream = new AtomicReference<>();
|
||||||
this.rstream = null;
|
|
||||||
this.id = -1;
|
this.id = -1;
|
||||||
this.revision = this.option.getRevision();
|
this.revision = this.option.getRevision();
|
||||||
}
|
}
|
||||||
|
@ -154,7 +157,7 @@ final class WatchImpl extends Impl implements Watch {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (started.compareAndSet(false, true)) {
|
if (started.compareAndSet(false, true)) {
|
||||||
// id is not really useful today but it may be in etcd 3.4
|
// id is not really useful today, but it may be in etcd 3.4
|
||||||
id = -1;
|
id = -1;
|
||||||
|
|
||||||
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
|
WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
|
||||||
|
@ -166,7 +169,7 @@ final class WatchImpl extends Impl implements Watch {
|
||||||
.map(endKey -> Util.prefixNamespaceToRangeEnd(endKey, namespace))
|
.map(endKey -> Util.prefixNamespaceToRangeEnd(endKey, namespace))
|
||||||
.ifPresent(builder::setRangeEnd);
|
.ifPresent(builder::setRangeEnd);
|
||||||
|
|
||||||
if (!option.getEndKey().isPresent() && option.isPrefix()) {
|
if (option.getEndKey().isEmpty() && option.isPrefix()) {
|
||||||
ByteSequence endKey = OptionsUtil.prefixEndOf(key);
|
ByteSequence endKey = OptionsUtil.prefixEndOf(key);
|
||||||
builder.setRangeEnd(Util.prefixNamespaceToRangeEnd(endKey, namespace));
|
builder.setRangeEnd(Util.prefixNamespaceToRangeEnd(endKey, namespace));
|
||||||
}
|
}
|
||||||
|
@ -179,7 +182,7 @@ final class WatchImpl extends Impl implements Watch {
|
||||||
builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
|
builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
rstream = Util.applyRequireLeader(option.withRequireLeader(), stub)
|
var ignored = Util.applyRequireLeader(option.withRequireLeader(), stub)
|
||||||
.watchWithHandler(
|
.watchWithHandler(
|
||||||
stream -> {
|
stream -> {
|
||||||
wstream.set(stream);
|
wstream.set(stream);
|
||||||
|
@ -353,7 +356,7 @@ final class WatchImpl extends Impl implements Watch {
|
||||||
private void reschedule() {
|
private void reschedule() {
|
||||||
Futures.addCallback(executor.schedule(this::resume, 500, TimeUnit.MILLISECONDS), new FutureCallback<Object>() {
|
Futures.addCallback(executor.schedule(this::resume, 500, TimeUnit.MILLISECONDS), new FutureCallback<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(@NonNull Throwable t) {
|
public void onFailure(Throwable t) {
|
||||||
LOG.warn("scheduled resume failed", t);
|
LOG.warn("scheduled resume failed", t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,11 @@ public class GetResponse extends AbstractResponse<RangeResponse> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of keys within the range when requested.
|
* Returns the number of keys within the range requested.
|
||||||
|
* Note this value is never affected by filtering options (limit, min or max created or modified revisions)
|
||||||
|
* Count is the count for keys on the range part of a request.
|
||||||
|
* Filters for limit and created or modified revision ranges restrict the
|
||||||
|
* returned KVs, but not the count.
|
||||||
*
|
*
|
||||||
* @return count.
|
* @return count.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -29,11 +29,13 @@ public final class DeleteOption {
|
||||||
private final ByteSequence endKey;
|
private final ByteSequence endKey;
|
||||||
private final boolean prevKV;
|
private final boolean prevKV;
|
||||||
private final boolean prefix;
|
private final boolean prefix;
|
||||||
|
private final boolean autoRetry;
|
||||||
|
|
||||||
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix) {
|
private DeleteOption(ByteSequence endKey, boolean prevKV, boolean prefix, final boolean autoRetry) {
|
||||||
this.endKey = endKey;
|
this.endKey = endKey;
|
||||||
this.prevKV = prevKV;
|
this.prevKV = prevKV;
|
||||||
this.prefix = prefix;
|
this.prefix = prefix;
|
||||||
|
this.autoRetry = autoRetry;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Optional<ByteSequence> getEndKey() {
|
public Optional<ByteSequence> getEndKey() {
|
||||||
|
@ -49,10 +51,25 @@ public final class DeleteOption {
|
||||||
return prevKV;
|
return prevKV;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to treat this deletion as deletion by prefix
|
||||||
|
*
|
||||||
|
* @return true if deletion by prefix.
|
||||||
|
*/
|
||||||
public boolean isPrefix() {
|
public boolean isPrefix() {
|
||||||
return prefix;
|
return prefix;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to treat a delete operation as idempotent from the point of view of automated retries.
|
||||||
|
* Note under failure scenarios this may mean a single delete is attempted more than once.
|
||||||
|
*
|
||||||
|
* @return true if automated retries should happen.
|
||||||
|
*/
|
||||||
|
public boolean isAutoRetry() {
|
||||||
|
return autoRetry;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the builder.
|
* Returns the builder.
|
||||||
*
|
*
|
||||||
|
@ -65,6 +82,11 @@ public final class DeleteOption {
|
||||||
return builder();
|
return builder();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the builder.
|
||||||
|
*
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
public static Builder builder() {
|
public static Builder builder() {
|
||||||
return new Builder();
|
return new Builder();
|
||||||
}
|
}
|
||||||
|
@ -73,6 +95,7 @@ public final class DeleteOption {
|
||||||
private ByteSequence endKey;
|
private ByteSequence endKey;
|
||||||
private boolean prevKV = false;
|
private boolean prevKV = false;
|
||||||
private boolean prefix = false;
|
private boolean prefix = false;
|
||||||
|
private boolean autoRetry = false;
|
||||||
|
|
||||||
private Builder() {
|
private Builder() {
|
||||||
}
|
}
|
||||||
|
@ -144,8 +167,22 @@ public final class DeleteOption {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When autoRetry is set, the delete operation is treated as idempotent from the point of view of automated retries.
|
||||||
|
* Note under some failure scenarios true may make a delete operation be attempted more than once, where
|
||||||
|
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
|
||||||
|
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
|
||||||
|
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
|
||||||
|
*
|
||||||
|
* @return builder
|
||||||
|
*/
|
||||||
|
public Builder withAutoRetry() {
|
||||||
|
this.autoRetry = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public DeleteOption build() {
|
public DeleteOption build() {
|
||||||
return new DeleteOption(endKey, prevKV, prefix);
|
return new DeleteOption(endKey, prevKV, prefix, autoRetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import java.util.Optional;
|
||||||
|
|
||||||
import io.etcd.jetcd.ByteSequence;
|
import io.etcd.jetcd.ByteSequence;
|
||||||
import io.etcd.jetcd.KV;
|
import io.etcd.jetcd.KV;
|
||||||
|
import io.etcd.jetcd.kv.GetResponse;
|
||||||
|
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
|
@ -76,56 +77,158 @@ public final class GetOption {
|
||||||
/**
|
/**
|
||||||
* Get the maximum number of keys to return for a get request.
|
* Get the maximum number of keys to return for a get request.
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
*
|
||||||
* @return the maximum number of keys to return.
|
* @return the maximum number of keys to return.
|
||||||
*/
|
*/
|
||||||
public long getLimit() {
|
public long getLimit() {
|
||||||
return this.limit;
|
return this.limit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the end key for a range request
|
||||||
|
*
|
||||||
|
* @return the end key for a range request
|
||||||
|
*/
|
||||||
public Optional<ByteSequence> getEndKey() {
|
public Optional<ByteSequence> getEndKey() {
|
||||||
return Optional.ofNullable(this.endKey);
|
return Optional.ofNullable(this.endKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the revision for the request
|
||||||
|
*
|
||||||
|
* @return the revision for the request
|
||||||
|
*/
|
||||||
public long getRevision() {
|
public long getRevision() {
|
||||||
return revision;
|
return revision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the sort order for the request
|
||||||
|
*
|
||||||
|
* @return the sort order for the request
|
||||||
|
*/
|
||||||
public SortOrder getSortOrder() {
|
public SortOrder getSortOrder() {
|
||||||
return sortOrder;
|
return sortOrder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the sort field for the request
|
||||||
|
*
|
||||||
|
* @return the sort field for the request
|
||||||
|
*/
|
||||||
public SortTarget getSortField() {
|
public SortTarget getSortField() {
|
||||||
return sortTarget;
|
return sortTarget;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return if the consistency level for this request is "serializable".
|
||||||
|
* Note serializable is a lower than default consistency, and implies
|
||||||
|
* the possibility of getting stale data.
|
||||||
|
*
|
||||||
|
* @return true if this request is only serializable consistency
|
||||||
|
*/
|
||||||
public boolean isSerializable() {
|
public boolean isSerializable() {
|
||||||
return serializable;
|
return serializable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* True if this request should get only keys in a range and there is no
|
||||||
|
* need to retrieve the values.
|
||||||
|
*
|
||||||
|
* @return true if only get keys
|
||||||
|
*/
|
||||||
public boolean isKeysOnly() {
|
public boolean isKeysOnly() {
|
||||||
return keysOnly;
|
return keysOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* True if this request should only populate the count of keys matching
|
||||||
|
* a range, and no other data.
|
||||||
|
*
|
||||||
|
* @return true if only get the count of keys
|
||||||
|
*/
|
||||||
public boolean isCountOnly() {
|
public boolean isCountOnly() {
|
||||||
return countOnly;
|
return countOnly;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only populate results for keys that match a
|
||||||
|
* minimum value for a created revision.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting a min create revision option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return minimum created revision to match, or zero for any.
|
||||||
|
*/
|
||||||
public long getMinCreateRevision() {
|
public long getMinCreateRevision() {
|
||||||
return this.minCreateRevision;
|
return this.minCreateRevision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only populate results for keys that match a
|
||||||
|
* maximum value for a created revision.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting a max create revision option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return maximum created revision to match, or zero for any.
|
||||||
|
*/
|
||||||
public long getMaxCreateRevision() {
|
public long getMaxCreateRevision() {
|
||||||
return this.maxCreateRevision;
|
return this.maxCreateRevision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only populate results for keys that match a
|
||||||
|
* minimum value for a modified revision.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting a min mod revision option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return minimum modified revision to match, or zero for any.
|
||||||
|
*/
|
||||||
public long getMinModRevision() {
|
public long getMinModRevision() {
|
||||||
return this.minModRevision;
|
return this.minModRevision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only populate results for keys that match a
|
||||||
|
* maximum value for a modified revision.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting a max mod revision option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return maximum modified revision to match, or zero for any.
|
||||||
|
*/
|
||||||
public long getMaxModRevision() {
|
public long getMaxModRevision() {
|
||||||
return this.maxModRevision;
|
return this.maxModRevision;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* True if this Get request should do prefix match
|
||||||
|
*
|
||||||
|
* @return true if this Get request should do prefix match
|
||||||
|
*/
|
||||||
public boolean isPrefix() {
|
public boolean isPrefix() {
|
||||||
return prefix;
|
return prefix;
|
||||||
}
|
}
|
||||||
|
@ -176,6 +279,13 @@ public final class GetOption {
|
||||||
/**
|
/**
|
||||||
* Limit the number of keys to return for a get request. By default is 0 - no limitation.
|
* Limit the number of keys to return for a get request. By default is 0 - no limitation.
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting this option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
* @param limit the maximum number of keys to return for a get request.
|
* @param limit the maximum number of keys to return for a get request.
|
||||||
* @return builder
|
* @return builder
|
||||||
*/
|
*/
|
||||||
|
@ -229,7 +339,9 @@ public final class GetOption {
|
||||||
* <p>
|
* <p>
|
||||||
* Get requests are linearizable by
|
* Get requests are linearizable by
|
||||||
* default. For better performance, a serializable get request is served locally without needing
|
* default. For better performance, a serializable get request is served locally without needing
|
||||||
* to reach consensus with other nodes in the cluster.
|
* to reach consensus with other nodes in the cluster. Note this is a tradeoff with strict
|
||||||
|
* consistency so it should be used with care in situations where reading stale
|
||||||
|
* is acceptable.
|
||||||
*
|
*
|
||||||
* @param serializable is the get request a serializable get request.
|
* @param serializable is the get request a serializable get request.
|
||||||
* @return builder
|
* @return builder
|
||||||
|
@ -316,10 +428,17 @@ public final class GetOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Limit returned keys to those with create revision greater than the provided value.
|
* Limit returned keys to those with create revision greater or equal than the provided value.
|
||||||
* min_create_revision is the lower bound for returned key create revisions; all keys with
|
* min_create_revision is the lower bound for returned key create revisions; all keys with
|
||||||
* lesser create revisions will be filtered away.
|
* lesser create revisions will be filtered away.
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting this option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
* @param createRevision create revision
|
* @param createRevision create revision
|
||||||
* @return builder
|
* @return builder
|
||||||
*/
|
*/
|
||||||
|
@ -329,10 +448,17 @@ public final class GetOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Limit returned keys to those with create revision less than the provided value.
|
* Limit returned keys to those with create revision less or equal than the provided value.
|
||||||
* max_create_revision is the upper bound for returned key create revisions; all keys with
|
* max_create_revision is the upper bound for returned key create revisions; all keys with
|
||||||
* greater create revisions will be filtered away.
|
* greater create revisions will be filtered away.
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting this option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
* @param createRevision create revision
|
* @param createRevision create revision
|
||||||
* @return builder
|
* @return builder
|
||||||
*/
|
*/
|
||||||
|
@ -342,10 +468,17 @@ public final class GetOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Limit returned keys to those with mod revision greater than the provided value.
|
* Limit returned keys to those with mod revision greater or equal than the provided value.
|
||||||
* min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod
|
* min_mod_revision is the lower bound for returned key mod revisions; all keys with lesser mod
|
||||||
* revisions will be filtered away.
|
* revisions will be filtered away.
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting this option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
* @param modRevision mod revision
|
* @param modRevision mod revision
|
||||||
* @return this builder instance
|
* @return this builder instance
|
||||||
*/
|
*/
|
||||||
|
@ -355,10 +488,17 @@ public final class GetOption {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Limit returned keys to those with mod revision less than the provided value. max_mod_revision
|
* Limit returned keys to those with mod revision less or equal than the provided value. max_mod_revision
|
||||||
* is the upper bound for returned key mod revisions; all keys with greater mod revisions will
|
* is the upper bound for returned key mod revisions; all keys with greater mod revisions will
|
||||||
* be filtered away.
|
* be filtered away.
|
||||||
*
|
*
|
||||||
|
* <p>
|
||||||
|
* Note this filter does not affect the count field in GetResponse.
|
||||||
|
* {@link GetResponse#getCount()} always counts the number of keys matched on a range, independent of filters.
|
||||||
|
* For the same reason, it would be meaningless to mix setting this option
|
||||||
|
* with the count only option.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
* @param modRevision mod revision
|
* @param modRevision mod revision
|
||||||
* @return this builder instance
|
* @return this builder instance
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -26,10 +26,12 @@ public final class PutOption {
|
||||||
|
|
||||||
private final long leaseId;
|
private final long leaseId;
|
||||||
private final boolean prevKV;
|
private final boolean prevKV;
|
||||||
|
private final boolean autoRetry;
|
||||||
|
|
||||||
private PutOption(long leaseId, boolean prevKV) {
|
private PutOption(long leaseId, boolean prevKV, boolean autoRetry) {
|
||||||
this.leaseId = leaseId;
|
this.leaseId = leaseId;
|
||||||
this.prevKV = prevKV;
|
this.prevKV = prevKV;
|
||||||
|
this.autoRetry = autoRetry;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -50,6 +52,16 @@ public final class PutOption {
|
||||||
return this.prevKV;
|
return this.prevKV;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to treat a put operation as idempotent from the point of view of automated retries.
|
||||||
|
* Note under failure scenarios this may mean a single put executes more than once.
|
||||||
|
*
|
||||||
|
* @return true if automated retries should happen.
|
||||||
|
*/
|
||||||
|
public boolean isAutoRetry() {
|
||||||
|
return autoRetry;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the builder.
|
* Returns the builder.
|
||||||
*
|
*
|
||||||
|
@ -73,6 +85,7 @@ public final class PutOption {
|
||||||
|
|
||||||
private long leaseId = 0L;
|
private long leaseId = 0L;
|
||||||
private boolean prevKV = false;
|
private boolean prevKV = false;
|
||||||
|
private boolean autoRetry = false;
|
||||||
|
|
||||||
private Builder() {
|
private Builder() {
|
||||||
}
|
}
|
||||||
|
@ -100,13 +113,28 @@ public final class PutOption {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When autoRetry is set, treat this put as idempotent from the point of view of automated retries.
|
||||||
|
* Note under some failure scenarios autoRetry=true may make a put operation execute more than once, where
|
||||||
|
* a first attempt succeeded but its result did not reach the client; by default (autoRetry=false),
|
||||||
|
* the client won't retry since it is not safe to assume on such a failure that the operation did not happen
|
||||||
|
* in the server.
|
||||||
|
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
|
||||||
|
*
|
||||||
|
* @return builder
|
||||||
|
*/
|
||||||
|
public Builder withAutoRetry() {
|
||||||
|
this.autoRetry = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* build the put option.
|
* build the put option.
|
||||||
*
|
*
|
||||||
* @return the put option
|
* @return the put option
|
||||||
*/
|
*/
|
||||||
public PutOption build() {
|
public PutOption build() {
|
||||||
return new PutOption(this.leaseId, this.prevKV);
|
return new PutOption(this.leaseId, this.prevKV, this.autoRetry);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
package io.etcd.jetcd.options;
|
||||||
|
|
||||||
|
public final class TxnOption {
|
||||||
|
public static final TxnOption DEFAULT = builder().build();
|
||||||
|
|
||||||
|
private final boolean autoRetry;
|
||||||
|
|
||||||
|
private TxnOption(final boolean autoRetry) {
|
||||||
|
this.autoRetry = autoRetry;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether to treat a txn operation as idempotent from the point of view of automated retries.
|
||||||
|
*
|
||||||
|
* @return true if automated retries should happen.
|
||||||
|
*/
|
||||||
|
public boolean isAutoRetry() {
|
||||||
|
return autoRetry;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the builder.
|
||||||
|
*
|
||||||
|
* @return the builder
|
||||||
|
*/
|
||||||
|
public static TxnOption.Builder builder() {
|
||||||
|
return new TxnOption.Builder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class Builder {
|
||||||
|
private boolean autoRetry = false;
|
||||||
|
|
||||||
|
private Builder() {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* When autoRetry is set, the txn operation is treated as idempotent from the point of view of automated retries.
|
||||||
|
* Note under some failure scenarios true may make a txn operation be attempted and/or execute more than once, where
|
||||||
|
* a first attempt executed but its result status did not reach the client; by default (autoRetry=false),
|
||||||
|
* the client won't retry since it is not safe to assume on such a failure the operation did not happen.
|
||||||
|
* Requesting withAutoRetry means the client is explicitly asking for retry nevertheless.
|
||||||
|
*
|
||||||
|
* @return builder
|
||||||
|
*/
|
||||||
|
public Builder withAutoRetry() {
|
||||||
|
this.autoRetry = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TxnOption build() {
|
||||||
|
return new TxnOption(autoRetry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,7 +19,6 @@ package io.etcd.jetcd.resolver;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -30,8 +29,6 @@ import io.grpc.Attributes;
|
||||||
import io.grpc.EquivalentAddressGroup;
|
import io.grpc.EquivalentAddressGroup;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.internal.GrpcUtil;
|
|
||||||
import io.grpc.internal.SharedResourceHolder;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
@ -47,7 +44,6 @@ public abstract class AbstractNameResolver extends NameResolver {
|
||||||
private volatile boolean shutdown;
|
private volatile boolean shutdown;
|
||||||
private volatile boolean resolving;
|
private volatile boolean resolving;
|
||||||
|
|
||||||
private Executor executor;
|
|
||||||
private Listener listener;
|
private Listener listener;
|
||||||
|
|
||||||
public AbstractNameResolver(URI targetUri) {
|
public AbstractNameResolver(URI targetUri) {
|
||||||
|
@ -69,7 +65,6 @@ public abstract class AbstractNameResolver extends NameResolver {
|
||||||
public void start(Listener listener) {
|
public void start(Listener listener) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
Preconditions.checkState(this.listener == null, "already started");
|
Preconditions.checkState(this.listener == null, "already started");
|
||||||
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
|
|
||||||
this.listener = Objects.requireNonNull(listener, "listener");
|
this.listener = Objects.requireNonNull(listener, "listener");
|
||||||
resolve();
|
resolve();
|
||||||
}
|
}
|
||||||
|
@ -86,21 +81,13 @@ public abstract class AbstractNameResolver extends NameResolver {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
shutdown = true;
|
shutdown = true;
|
||||||
|
|
||||||
synchronized (lock) {
|
|
||||||
if (executor != null) {
|
|
||||||
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resolve() {
|
private void resolve() {
|
||||||
if (resolving || shutdown) {
|
if (resolving || shutdown) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
synchronized (lock) {
|
doResolve();
|
||||||
executor.execute(this::doResolve);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doResolve() {
|
private void doResolve() {
|
||||||
|
|
|
@ -26,9 +26,18 @@ public final class Errors {
|
||||||
private Errors() {
|
private Errors() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isRetryable(Status status) {
|
// isRetryable implementation for idempotent operations.
|
||||||
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isInvalidTokenError(status)
|
public static boolean isRetryableForSafeRedoOp(Status status) {
|
||||||
|| isAuthStoreExpired(status);
|
return Status.UNAVAILABLE.getCode().equals(status.getCode()) || isAlwaysSafeToRetry(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
// isRetryable implementation for non-idempotent operations
|
||||||
|
public static boolean isRetryableForNoSafeRedoOp(Status status) {
|
||||||
|
return isAlwaysSafeToRetry(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isAlwaysSafeToRetry(Status status) {
|
||||||
|
return isInvalidTokenError(status) || isAuthStoreExpired(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isInvalidTokenError(Throwable e) {
|
public static boolean isInvalidTokenError(Throwable e) {
|
||||||
|
|
|
@ -50,6 +50,11 @@ public class ClientBuilderTest {
|
||||||
assertThatThrownBy(() -> Client.builder().endpoints((URI) null)).isInstanceOf(NullPointerException.class);
|
assertThatThrownBy(() -> Client.builder().endpoints((URI) null)).isInstanceOf(NullPointerException.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVertx_Null() {
|
||||||
|
assertThatThrownBy(() -> Client.builder().vertx(null)).isInstanceOf(IllegalArgumentException.class);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEndPoints_Verify_Empty() {
|
public void testEndPoints_Verify_Empty() {
|
||||||
assertThatThrownBy(() -> Client.builder().endpoints(new URI(""))).isInstanceOf(IllegalArgumentException.class);
|
assertThatThrownBy(() -> Client.builder().endpoints(new URI(""))).isInstanceOf(IllegalArgumentException.class);
|
||||||
|
|
|
@ -316,7 +316,7 @@ public class KVTest {
|
||||||
for (int i = 0; i < putCount; ++i) {
|
for (int i = 0; i < putCount; ++i) {
|
||||||
ByteSequence value = ByteSequence
|
ByteSequence value = ByteSequence
|
||||||
.from(Integer.toString(i), StandardCharsets.UTF_8);
|
.from(Integer.toString(i), StandardCharsets.UTF_8);
|
||||||
customClient.getKVClient().put(key, value).join();
|
customClient.getKVClient().put(key, value, PutOption.builder().withAutoRetry().build()).join();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -39,16 +39,18 @@ class UtilTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAuthErrorIsNotRetryable() {
|
public void testAuthErrorIsRetryable() {
|
||||||
Status authErrorStatus = Status.UNAUTHENTICATED
|
Status authErrorStatus = Status.UNAUTHENTICATED
|
||||||
.withDescription("etcdserver: invalid auth token");
|
.withDescription("etcdserver: invalid auth token");
|
||||||
Status status = Status.fromThrowable(new StatusException(authErrorStatus));
|
Status status = Status.fromThrowable(new StatusException(authErrorStatus));
|
||||||
assertThat(Errors.isRetryable(status)).isTrue();
|
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isTrue();
|
||||||
|
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnavailableErrorIsRetryable() {
|
public void testUnavailableErrorIsRetryable() {
|
||||||
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
|
Status status = Status.fromThrowable(new StatusException(Status.UNAVAILABLE));
|
||||||
assertThat(Errors.isRetryable(status)).isTrue();
|
assertThat(Errors.isRetryableForNoSafeRedoOp(status)).isFalse();
|
||||||
|
assertThat(Errors.isRetryableForSafeRedoOp(status)).isTrue();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.testcontainers.containers.Network;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
|
||||||
public final class Etcd {
|
public final class Etcd {
|
||||||
public static final String CONTAINER_IMAGE = "gcr.io/etcd-development/etcd:v3.5.10";
|
public static final String CONTAINER_IMAGE = "quay.io/coreos/etcd:v3.5.14";
|
||||||
public static final int ETCD_CLIENT_PORT = 2379;
|
public static final int ETCD_CLIENT_PORT = 2379;
|
||||||
public static final int ETCD_PEER_PORT = 2380;
|
public static final int ETCD_PEER_PORT = 2380;
|
||||||
public static final String ETCD_DATA_DIR = "/data.etcd";
|
public static final String ETCD_DATA_DIR = "/data.etcd";
|
||||||
|
@ -58,6 +58,7 @@ public final class Etcd {
|
||||||
private List<String> additionalArgs;
|
private List<String> additionalArgs;
|
||||||
private Network network;
|
private Network network;
|
||||||
private boolean shouldMountDataDirectory = true;
|
private boolean shouldMountDataDirectory = true;
|
||||||
|
private String user;
|
||||||
|
|
||||||
public Builder withClusterName(String clusterName) {
|
public Builder withClusterName(String clusterName) {
|
||||||
this.clusterName = clusterName;
|
this.clusterName = clusterName;
|
||||||
|
@ -114,12 +115,18 @@ public final class Etcd {
|
||||||
debug,
|
debug,
|
||||||
additionalArgs,
|
additionalArgs,
|
||||||
network != null ? network : Network.SHARED,
|
network != null ? network : Network.SHARED,
|
||||||
shouldMountDataDirectory);
|
shouldMountDataDirectory,
|
||||||
|
user);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder withMountedDataDirectory(boolean shouldMountDataDirectory) {
|
public Builder withMountedDataDirectory(boolean shouldMountDataDirectory) {
|
||||||
this.shouldMountDataDirectory = shouldMountDataDirectory;
|
this.shouldMountDataDirectory = shouldMountDataDirectory;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withUser(String user) {
|
||||||
|
this.user = user;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,8 @@ public class EtcdClusterImpl implements EtcdCluster {
|
||||||
boolean debug,
|
boolean debug,
|
||||||
Collection<String> additionalArgs,
|
Collection<String> additionalArgs,
|
||||||
Network network,
|
Network network,
|
||||||
boolean shouldMountDataDirectory) {
|
boolean shouldMountDataDirectory,
|
||||||
|
String user) {
|
||||||
|
|
||||||
this.clusterName = clusterName;
|
this.clusterName = clusterName;
|
||||||
this.endpoints = IntStream.range(0, nodes)
|
this.endpoints = IntStream.range(0, nodes)
|
||||||
|
@ -57,7 +58,8 @@ public class EtcdClusterImpl implements EtcdCluster {
|
||||||
.withDebug(debug)
|
.withDebug(debug)
|
||||||
.withAdditionalArgs(additionalArgs)
|
.withAdditionalArgs(additionalArgs)
|
||||||
.withNetwork(network)
|
.withNetwork(network)
|
||||||
.withShouldMountDataDirectory(shouldMountDataDirectory))
|
.withShouldMountDataDirectory(shouldMountDataDirectory)
|
||||||
|
.withUser(user))
|
||||||
.collect(toList());
|
.collect(toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,7 @@ public class EtcdContainer extends GenericContainer<EtcdContainer> {
|
||||||
private Path dataDirectory;
|
private Path dataDirectory;
|
||||||
private Collection<String> additionalArgs;
|
private Collection<String> additionalArgs;
|
||||||
private boolean shouldMountDataDirectory = true;
|
private boolean shouldMountDataDirectory = true;
|
||||||
|
private String user;
|
||||||
|
|
||||||
public EtcdContainer(String image, String node, Collection<String> nodes) {
|
public EtcdContainer(String image, String node, Collection<String> nodes) {
|
||||||
super(image);
|
super(image);
|
||||||
|
@ -102,6 +103,18 @@ public class EtcdContainer extends GenericContainer<EtcdContainer> {
|
||||||
return self();
|
return self();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Optional values are {@code [ user | user:group | uid | uid:gid | user:gid | uid:group ]}.
|
||||||
|
* See <a href="https://docs.docker.com/engine/reference/run/#user">User</a> .
|
||||||
|
*
|
||||||
|
* @param user Refer to {@link com.github.dockerjava.api.command.CreateContainerCmd#withUser(String)}
|
||||||
|
* @return self container.
|
||||||
|
*/
|
||||||
|
public EtcdContainer withUser(String user) {
|
||||||
|
this.user = user;
|
||||||
|
return self();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
if (!configured.compareAndSet(false, true)) {
|
if (!configured.compareAndSet(false, true)) {
|
||||||
|
@ -120,9 +133,13 @@ public class EtcdContainer extends GenericContainer<EtcdContainer> {
|
||||||
withEnv("ETCD_LOG_LEVEL", this.debug ? "debug" : "info");
|
withEnv("ETCD_LOG_LEVEL", this.debug ? "debug" : "info");
|
||||||
withEnv("ETCD_LOGGER", "zap");
|
withEnv("ETCD_LOGGER", "zap");
|
||||||
|
|
||||||
String user = System.getenv("TC_USER");
|
String tempUser = this.user;
|
||||||
if (user != null) {
|
if (tempUser == null) {
|
||||||
withCreateContainerCmdModifier(c -> c.withUser(user));
|
tempUser = System.getenv("TC_USER");
|
||||||
|
}
|
||||||
|
if (tempUser != null) {
|
||||||
|
String finalUser = tempUser;
|
||||||
|
withCreateContainerCmdModifier(c -> c.withUser(finalUser));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ssl) {
|
if (ssl) {
|
||||||
|
|
|
@ -27,9 +27,6 @@
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<!-- package loggers -->
|
<!-- package loggers -->
|
||||||
<Logger name="org.testcontainers" level="INFO"/>
|
<Logger name="org.testcontainers" level="INFO"/>
|
||||||
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.3]" level="WARN"/>
|
|
||||||
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.4]" level="WARN"/>
|
|
||||||
<Logger name="🐳 [gcr.io/etcd-development/etcd:v3.4.7]" level="WARN"/>
|
|
||||||
<Logger name="io.etcd.jetcd.internal.infrastructure" level="INFO"/>
|
<Logger name="io.etcd.jetcd.internal.infrastructure" level="INFO"/>
|
||||||
<Logger name="io.etcd.jetcd.launcher.EtcdCluster" level="WARN"/>
|
<Logger name="io.etcd.jetcd.launcher.EtcdCluster" level="WARN"/>
|
||||||
|
|
||||||
|
|
|
@ -168,6 +168,11 @@ public class EtcdClusterExtension implements BeforeAllCallback, BeforeEachCallba
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withUser(String user) {
|
||||||
|
builder.withUser(user);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public EtcdClusterExtension build() {
|
public EtcdClusterExtension build() {
|
||||||
return new EtcdClusterExtension(builder.build());
|
return new EtcdClusterExtension(builder.build());
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.Executor;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -31,8 +30,6 @@ import io.grpc.Attributes;
|
||||||
import io.grpc.EquivalentAddressGroup;
|
import io.grpc.EquivalentAddressGroup;
|
||||||
import io.grpc.NameResolver;
|
import io.grpc.NameResolver;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.internal.GrpcUtil;
|
|
||||||
import io.grpc.internal.SharedResourceHolder;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
@ -47,7 +44,6 @@ public class EtcdClusterNameResolver extends NameResolver {
|
||||||
private volatile boolean shutdown;
|
private volatile boolean shutdown;
|
||||||
private volatile boolean resolving;
|
private volatile boolean resolving;
|
||||||
|
|
||||||
private Executor executor;
|
|
||||||
private Listener listener;
|
private Listener listener;
|
||||||
|
|
||||||
public EtcdClusterNameResolver(URI targetUri) {
|
public EtcdClusterNameResolver(URI targetUri) {
|
||||||
|
@ -65,7 +61,6 @@ public class EtcdClusterNameResolver extends NameResolver {
|
||||||
public void start(Listener listener) {
|
public void start(Listener listener) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
Preconditions.checkState(this.listener == null, "already started");
|
Preconditions.checkState(this.listener == null, "already started");
|
||||||
this.executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
|
|
||||||
this.listener = Objects.requireNonNull(listener, "listener");
|
this.listener = Objects.requireNonNull(listener, "listener");
|
||||||
resolve();
|
resolve();
|
||||||
}
|
}
|
||||||
|
@ -82,21 +77,14 @@ public class EtcdClusterNameResolver extends NameResolver {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
shutdown = true;
|
shutdown = true;
|
||||||
|
|
||||||
synchronized (lock) {
|
|
||||||
if (executor != null) {
|
|
||||||
executor = SharedResourceHolder.release(GrpcUtil.SHARED_CHANNEL_EXECUTOR, executor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void resolve() {
|
private void resolve() {
|
||||||
if (resolving || shutdown) {
|
if (resolving || shutdown) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
synchronized (lock) {
|
|
||||||
executor.execute(this::doResolve);
|
doResolve();
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doResolve() {
|
private void doResolve() {
|
||||||
|
|
Loading…
Reference in New Issue