Compare commits
21 Commits
Author | SHA1 | Date |
---|---|---|
|
5d702ed0d3 | |
|
1773b8f778 | |
|
a820cad474 | |
|
36156f4d2e | |
|
6a3094f18a | |
|
6f3a18a109 | |
|
83d5cd843d | |
|
45ea93180c | |
|
5aaf88fa0a | |
|
c3fc90f076 | |
|
97382709ba | |
|
65179dc9db | |
|
8c6089aa59 | |
|
8add72d056 | |
|
b2c64b3765 | |
|
81ef37bb55 | |
|
8a65c8cbcf | |
|
a6ff9a030c | |
|
01680d8df5 | |
|
358e039e26 | |
|
81a36172bb |
|
@ -5,22 +5,49 @@ set -e
|
|||
export MSYS_NO_PATHCONV=1 # for Git Bash on Windows
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
LYCHEE_CONFIG="$SCRIPT_DIR/../../.lychee.toml"
|
||||
DEPENDENCIES_DOCKERFILE="$SCRIPT_DIR/dependencies.dockerfile"
|
||||
ROOT_DIR="$SCRIPT_DIR/../.."
|
||||
DEPENDENCIES_DOCKERFILE="$SCRIPT_DIR/dependencies.Dockerfile"
|
||||
|
||||
# Parse command line arguments
|
||||
LOCAL_LINKS_ONLY=false
|
||||
TARGET=""
|
||||
|
||||
while [[ $# -gt 0 ]]; do
|
||||
case $1 in
|
||||
--local-links-only)
|
||||
LOCAL_LINKS_ONLY=true
|
||||
shift
|
||||
;;
|
||||
*)
|
||||
# Treat any other arguments as file paths
|
||||
TARGET="$TARGET $1"
|
||||
shift
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
# Extract lychee version from dependencies.dockerfile
|
||||
LYCHEE_VERSION=$(grep "FROM lycheeverse/lychee:" "$DEPENDENCIES_DOCKERFILE" | sed 's/.*FROM lycheeverse\/lychee:\([^ ]*\).*/\1/')
|
||||
|
||||
if [[ -z "$TARGET" ]]; then
|
||||
TARGET="."
|
||||
fi
|
||||
|
||||
# Build the lychee command with optional GitHub token
|
||||
CMD="lycheeverse/lychee:$LYCHEE_VERSION --verbose --config $(basename "$LYCHEE_CONFIG")"
|
||||
CMD="lycheeverse/lychee:$LYCHEE_VERSION --verbose --root-dir /data"
|
||||
|
||||
# Add GitHub token if available
|
||||
if [[ -n "$GITHUB_TOKEN" ]]; then
|
||||
CMD="$CMD --github-token $GITHUB_TOKEN"
|
||||
fi
|
||||
|
||||
# Add the target directory
|
||||
CMD="$CMD ."
|
||||
if [[ "$LOCAL_LINKS_ONLY" == "true" ]]; then
|
||||
CMD="$CMD --scheme file --include-fragments"
|
||||
else
|
||||
CMD="$CMD --config .github/scripts/lychee-config.toml"
|
||||
fi
|
||||
|
||||
CMD="$CMD $TARGET"
|
||||
|
||||
# Determine if we should allocate a TTY
|
||||
DOCKER_FLAGS="--rm --init"
|
||||
|
@ -32,4 +59,4 @@ fi
|
|||
|
||||
# Run lychee with proper signal handling
|
||||
# shellcheck disable=SC2086
|
||||
exec docker run $DOCKER_FLAGS -v "$(dirname "$LYCHEE_CONFIG")":/data -w /data $CMD
|
||||
exec docker run $DOCKER_FLAGS -v "$ROOT_DIR":/data -w /data $CMD
|
||||
|
|
|
@ -18,6 +18,6 @@ jobs:
|
|||
pull-requests: write # for assigning reviewers
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: open-telemetry/assign-reviewers-action@cb42e3ee14a59c01abccd401f126a0f4c3991cb3 # main
|
||||
- uses: open-telemetry/assign-reviewers-action@fcd27c5381c10288b23d423ab85473710a33389e # main
|
||||
with:
|
||||
config-file: .github/component_owners.yml
|
||||
|
|
|
@ -8,6 +8,9 @@ on:
|
|||
pull_request:
|
||||
merge_group:
|
||||
workflow_dispatch:
|
||||
schedule:
|
||||
# Run daily at 7:30 AM UTC
|
||||
- cron: '30 7 * * *'
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
@ -119,10 +122,13 @@ jobs:
|
|||
path: jmx-metrics/build/reports/tests/integrationTest
|
||||
|
||||
link-check:
|
||||
# merge group and push events are excluded to avoid unnecessary CI failures
|
||||
# (these failures will instead be captured by the daily scheduled run)
|
||||
#
|
||||
# release branches are excluded to avoid unnecessary maintenance if external links break
|
||||
# (and also because the README.md might need update on release branches before the release
|
||||
# download has been published)
|
||||
if: "!startsWith(github.ref_name, 'release/')"
|
||||
if: github.event_name != 'merge_group' && github.event_name != 'push' && !startsWith(github.ref_name, 'release/')
|
||||
uses: ./.github/workflows/reusable-link-check.yml
|
||||
|
||||
markdown-lint-check:
|
||||
|
@ -199,3 +205,31 @@ jobs:
|
|||
)
|
||||
)
|
||||
run: exit 1 # fail
|
||||
|
||||
workflow-notification:
|
||||
permissions:
|
||||
contents: read
|
||||
issues: write
|
||||
if: (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && always()
|
||||
needs:
|
||||
- build
|
||||
- test
|
||||
- integration-test
|
||||
- link-check
|
||||
- markdown-lint-check
|
||||
- misspell-check
|
||||
- shell-script-check
|
||||
- publish-snapshots
|
||||
uses: ./.github/workflows/reusable-workflow-notification.yml
|
||||
with:
|
||||
success: >-
|
||||
${{
|
||||
needs.build.result == 'success' &&
|
||||
needs.test.result == 'success' &&
|
||||
needs.integration-test.result == 'success' &&
|
||||
needs.link-check.result == 'success' &&
|
||||
needs.markdown-lint-check.result == 'success' &&
|
||||
needs.misspell-check.result == 'success' &&
|
||||
needs.shell-script-check.result == 'success' &&
|
||||
needs.publish-snapshots.result == 'success'
|
||||
}}
|
||||
|
|
|
@ -23,8 +23,18 @@ jobs:
|
|||
with:
|
||||
persist-credentials: false
|
||||
|
||||
- uses: actions/create-github-app-token@df432ceedc7162793a195dd1713ff69aefc7379e # v2.0.6
|
||||
id: create-token
|
||||
with:
|
||||
# analyzing classic branch protections requires a token with admin read permissions
|
||||
# see https://github.com/ossf/scorecard-action/blob/main/docs/authentication/fine-grained-auth-token.md
|
||||
# and https://github.com/open-telemetry/community/issues/2769
|
||||
app-id: ${{ vars.OSSF_SCORECARD_APP_ID }}
|
||||
private-key: ${{ secrets.OSSF_SCORECARD_PRIVATE_KEY }}
|
||||
|
||||
- uses: ossf/scorecard-action@05b42c624433fc40578a4040d5cf5e36ddca8cde # v2.4.2
|
||||
with:
|
||||
repo_token: ${{ steps.create-token.outputs.token }}
|
||||
results_file: results.sarif
|
||||
results_format: sarif
|
||||
publish_results: true
|
||||
|
|
|
@ -11,8 +11,35 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
fetch-depth: 0 # needed for merge-base below
|
||||
|
||||
- name: Link check
|
||||
- name: Link check - relative links (all files)
|
||||
if: github.event_name == 'pull_request'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ github.token }}
|
||||
run: ./.github/scripts/link-check.sh --local-links-only
|
||||
|
||||
- name: Get modified files
|
||||
if: github.event_name == 'pull_request'
|
||||
id: modified-files
|
||||
run: |
|
||||
merge_base=$(git merge-base origin/${{ github.base_ref }} HEAD)
|
||||
# Using lychee's default extension filter here to match when it runs against all files
|
||||
modified_files=$(git diff --name-only $merge_base...${{ github.event.pull_request.head.sha }} \
|
||||
| grep -E '\.(md|mkd|mdx|mdown|mdwn|mkdn|mkdown|markdown|html|htm|txt)$' \
|
||||
| tr '\n' ' ' || true)
|
||||
echo "files=$modified_files" >> $GITHUB_OUTPUT
|
||||
echo "Modified files: $modified_files"
|
||||
|
||||
- name: Link check - all links (modified files only)
|
||||
if: github.event_name == 'pull_request' && steps.modified-files.outputs.files != ''
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ github.token }}
|
||||
run: ./.github/scripts/link-check.sh ${{ steps.modified-files.outputs.files }}
|
||||
|
||||
- name: Link check - all links (all files)
|
||||
if: github.event_name != 'pull_request'
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ github.token }}
|
||||
run: ./.github/scripts/link-check.sh
|
||||
|
|
15
CHANGELOG.md
15
CHANGELOG.md
|
@ -2,11 +2,26 @@
|
|||
|
||||
## Unreleased
|
||||
|
||||
## Version 1.47.0 (2025-07-04)
|
||||
|
||||
### Disk buffering
|
||||
|
||||
- Shared storage
|
||||
([#1912](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1912))
|
||||
- Implementing ExtendedLogRecordData
|
||||
([#1918](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1918))
|
||||
- Add missing EventName to disk-buffering LogRecordDataMapper
|
||||
([#1950](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1950))
|
||||
|
||||
### GCP authentication extension
|
||||
|
||||
- Update the internal implementation such that the required headers are retrieved
|
||||
from the Google Auth Library instead of manually constructing and passing them.
|
||||
([#1860](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1860))
|
||||
- Add metrics support to auth extension
|
||||
([#1891](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1891))
|
||||
- Update ConfigurableOptions to read from ConfigProperties
|
||||
([#1904](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1904))
|
||||
|
||||
### Inferred spans
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
plugins {
|
||||
`kotlin-dsl`
|
||||
// When updating, update below in dependencies too
|
||||
id("com.diffplug.spotless") version "7.0.4"
|
||||
id("com.diffplug.spotless") version "7.1.0"
|
||||
}
|
||||
|
||||
repositories {
|
||||
|
@ -12,7 +12,7 @@ repositories {
|
|||
|
||||
dependencies {
|
||||
// When updating, update above in plugins too
|
||||
implementation("com.diffplug.spotless:spotless-plugin-gradle:7.0.4")
|
||||
implementation("com.diffplug.spotless:spotless-plugin-gradle:7.1.0")
|
||||
implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.3.0")
|
||||
implementation("net.ltgt.gradle:gradle-nullaway-plugin:2.2.0")
|
||||
implementation("org.owasp:dependency-check-gradle:12.1.3")
|
||||
|
|
|
@ -135,7 +135,7 @@ testing {
|
|||
dependencies {
|
||||
implementation(project(project.path))
|
||||
|
||||
implementation(enforcedPlatform("org.junit:junit-bom:5.13.2"))
|
||||
implementation(enforcedPlatform("org.junit:junit-bom:5.13.3"))
|
||||
implementation(enforcedPlatform("org.testcontainers:testcontainers-bom:1.21.3"))
|
||||
implementation(enforcedPlatform("com.google.guava:guava-bom:33.4.8-jre"))
|
||||
implementation(enforcedPlatform("com.linecorp.armeria:armeria-bom:1.32.5"))
|
||||
|
|
|
@ -9,7 +9,7 @@ otelJava.moduleName.set("io.opentelemetry.contrib.compressor.zstd")
|
|||
dependencies {
|
||||
api("io.opentelemetry:opentelemetry-exporter-common")
|
||||
|
||||
implementation("com.github.luben:zstd-jni:1.5.7-3")
|
||||
implementation("com.github.luben:zstd-jni:1.5.7-4")
|
||||
|
||||
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
|
||||
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp")
|
||||
|
|
|
@ -2,7 +2,7 @@ plugins {
|
|||
`java-platform`
|
||||
}
|
||||
|
||||
val otelInstrumentationVersion = "2.17.0-alpha"
|
||||
val otelInstrumentationVersion = "2.17.1-alpha"
|
||||
val semconvVersion = "1.34.0"
|
||||
|
||||
javaPlatform {
|
||||
|
@ -16,6 +16,7 @@ dependencies {
|
|||
// as runtime dependencies if they are actually used as runtime dependencies)
|
||||
api(enforcedPlatform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:${otelInstrumentationVersion}"))
|
||||
api(enforcedPlatform("com.fasterxml.jackson:jackson-bom:2.19.1"))
|
||||
api(enforcedPlatform("com.google.protobuf:protobuf-bom:4.31.1"))
|
||||
|
||||
constraints {
|
||||
api("io.opentelemetry.semconv:opentelemetry-semconv:${semconvVersion}")
|
||||
|
@ -25,8 +26,8 @@ dependencies {
|
|||
api("com.google.auto.service:auto-service-annotations:1.1.1")
|
||||
api("com.google.auto.value:auto-value:1.11.0")
|
||||
api("com.google.auto.value:auto-value-annotations:1.11.0")
|
||||
api("com.google.errorprone:error_prone_annotations:2.39.0")
|
||||
api("com.google.errorprone:error_prone_core:2.39.0")
|
||||
api("com.google.errorprone:error_prone_annotations:2.40.0")
|
||||
api("com.google.errorprone:error_prone_core:2.40.0")
|
||||
api("io.github.netmikey.logunit:logunit-jul:2.0.0")
|
||||
api("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha")
|
||||
api("io.prometheus:simpleclient:0.16.0")
|
||||
|
@ -43,7 +44,7 @@ dependencies {
|
|||
|
||||
api("com.google.code.findbugs:annotations:3.0.1u2")
|
||||
api("com.google.code.findbugs:jsr305:3.0.2")
|
||||
api("com.squareup.okhttp3:okhttp:4.12.0")
|
||||
api("com.squareup.okhttp3:okhttp:5.1.0")
|
||||
api("com.uber.nullaway:nullaway:0.12.7")
|
||||
api("org.assertj:assertj-core:3.27.3")
|
||||
api("org.awaitility:awaitility:4.3.0")
|
||||
|
|
|
@ -7,7 +7,7 @@ plugins {
|
|||
id("com.github.johnrengelman.shadow")
|
||||
id("me.champeau.jmh") version "0.7.3"
|
||||
id("ru.vyarus.animalsniffer") version "2.0.1"
|
||||
id("com.squareup.wire") version "5.3.3"
|
||||
id("com.squareup.wire") version "5.3.5"
|
||||
}
|
||||
|
||||
description = "Exporter implementations that store signals on disk"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
distributionSha256Sum=7197a12f450794931532469d4ff21a59ea2c1cd59a3ec3f89c035c3c420a6999
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
|
||||
distributionSha256Sum=bd71102213493060956ec229d946beee57158dbd89d0e62b91bca0fa2c5f3531
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
|
||||
networkTimeout=10000
|
||||
validateDistributionUrl=true
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
|
|
|
@ -70,7 +70,7 @@ Supported values for `otel.jmx.target.system` and support for `otel.jmx.target.s
|
|||
| `kafka-consumer` | Apache Kafka consumer | [`kafka-consumer.yaml`](src/main/resources/kafka-consumer.yaml) | |
|
||||
| `kafka-producer` | Apache Kafka producer | [`kafka-producer.yaml`](src/main/resources/kafka-producer.yaml) | |
|
||||
| `solr` | Apache Solr | [`solr.yaml`](src/main/resources/solr.yaml) | |
|
||||
| `tomcat` | Apache Tomcat | [`tomcat.yaml`](src/main/resources/tomcat.yaml) | |
|
||||
| `tomcat` | Apache Tomcat | [`tomcat.yaml`](src/main/resources/tomcat.yaml) | [`tomcat.yaml`](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/tomcat.yaml) |
|
||||
| `wildfly` | Wildfly | [`wildfly.yaml`](src/main/resources/wildfly.yaml) | |
|
||||
|
||||
The source of metrics definitions is controlled by `otel.jmx.target.source`:
|
||||
|
|
|
@ -120,7 +120,7 @@ public class OpenTelemetrySdkServiceTest {
|
|||
} finally {
|
||||
System.clearProperty("otel.exporter.otlp.endpoint");
|
||||
System.clearProperty("otel.exporter.otlp.traces.endpoint");
|
||||
System.clearProperty("otel.exporter.otlp.traces.protocol");
|
||||
System.clearProperty("otel.traces.exporter");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,14 +20,14 @@ dependencies {
|
|||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
|
||||
testImplementation("io.micrometer:micrometer-core:1.15.1")
|
||||
testImplementation("io.micrometer:micrometer-core:1.15.2")
|
||||
}
|
||||
|
||||
testing {
|
||||
suites {
|
||||
val integrationTest by registering(JvmTestSuite::class) {
|
||||
dependencies {
|
||||
implementation("io.micrometer:micrometer-registry-prometheus:1.15.1")
|
||||
implementation("io.micrometer:micrometer-registry-prometheus:1.15.2")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import java.net.URL
|
|||
plugins {
|
||||
id("otel.java-conventions")
|
||||
id("de.undercouch.download") version "5.6.0"
|
||||
id("com.squareup.wire") version "5.3.3"
|
||||
id("com.squareup.wire") version "5.3.5"
|
||||
}
|
||||
|
||||
description = "Client implementation of the OpAMP spec."
|
||||
|
@ -16,6 +16,7 @@ dependencies {
|
|||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
testImplementation("org.mockito:mockito-inline")
|
||||
testImplementation("com.google.protobuf:protobuf-java-util")
|
||||
}
|
||||
|
||||
val opampProtos = tasks.register<DownloadOpampProtos>("opampProtoDownload", download)
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.WebSocketListener;
|
||||
import okio.ByteString;
|
||||
|
||||
public class OkHttpWebSocket implements WebSocket {
|
||||
private final String url;
|
||||
private final OkHttpClient client;
|
||||
private final AtomicReference<Status> status = new AtomicReference<>(Status.NOT_RUNNING);
|
||||
private final AtomicReference<okhttp3.WebSocket> webSocket = new AtomicReference<>();
|
||||
|
||||
public static OkHttpWebSocket create(String url) {
|
||||
return create(url, new OkHttpClient());
|
||||
}
|
||||
|
||||
public static OkHttpWebSocket create(String url, OkHttpClient client) {
|
||||
return new OkHttpWebSocket(url, client);
|
||||
}
|
||||
|
||||
private OkHttpWebSocket(String url, OkHttpClient client) {
|
||||
this.url = url;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(Listener listener) {
|
||||
if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) {
|
||||
okhttp3.Request request = new okhttp3.Request.Builder().url(url).build();
|
||||
webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean send(byte[] request) {
|
||||
if (status.get() != Status.RUNNING) {
|
||||
return false;
|
||||
}
|
||||
return getWebSocket().send(ByteString.of(request));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(int code, @Nullable String reason) {
|
||||
if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) {
|
||||
try {
|
||||
if (!getWebSocket().close(code, reason)) {
|
||||
status.set(Status.NOT_RUNNING);
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
status.set(Status.RUNNING);
|
||||
// Re-throwing as this error happens due to a caller error.
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private okhttp3.WebSocket getWebSocket() {
|
||||
return Objects.requireNonNull(webSocket.get());
|
||||
}
|
||||
|
||||
private class ListenerAdapter extends WebSocketListener {
|
||||
private final Listener delegate;
|
||||
|
||||
private ListenerAdapter(Listener delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) {
|
||||
status.set(Status.RUNNING);
|
||||
delegate.onOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
|
||||
status.set(Status.CLOSING);
|
||||
delegate.onClosing();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
|
||||
status.set(Status.NOT_RUNNING);
|
||||
delegate.onClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(
|
||||
@Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) {
|
||||
status.set(Status.NOT_RUNNING);
|
||||
delegate.onFailure(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) {
|
||||
delegate.onMessage(bytes.toByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
enum Status {
|
||||
NOT_RUNNING,
|
||||
STARTING,
|
||||
CLOSING,
|
||||
RUNNING
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public interface WebSocket {
|
||||
/**
|
||||
* Starts the websocket connection if it's not yet started or if it has been closed.
|
||||
*
|
||||
* @param listener Will receive events from the websocket connection.
|
||||
*/
|
||||
void open(Listener listener);
|
||||
|
||||
/**
|
||||
* Stops the websocket connection if running. Nothing will happen if it's already stopped.
|
||||
*
|
||||
* @param code Status code as defined by <a
|
||||
* href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a>
|
||||
* @param reason Reason for shutting down, as explained in <a
|
||||
* href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">Section 5.5.1 of RFC
|
||||
* 6455</a>
|
||||
*/
|
||||
void close(int code, @Nullable String reason);
|
||||
|
||||
/**
|
||||
* Sends a message via the websocket connection.
|
||||
*
|
||||
* @param request The message payload.
|
||||
* @return {@code false} If the message can't be dispatched for any reason, whether the websocket
|
||||
* isn't running, or the connection isn't established, or it's terminated. {@code true} if the
|
||||
* message can get sent. Returning {@code true} doesn't guarantee that the message will arrive
|
||||
* at the remote peer.
|
||||
*/
|
||||
boolean send(byte[] request);
|
||||
|
||||
interface Listener {
|
||||
|
||||
/**
|
||||
* Called when the websocket connection is successfully established with the remote peer. The
|
||||
* client may start sending messages after this method is called.
|
||||
*/
|
||||
void onOpen();
|
||||
|
||||
/**
|
||||
* Called when the closing handshake has started. No further messages will be sent after this
|
||||
* method call.
|
||||
*/
|
||||
void onClosing();
|
||||
|
||||
/** Called when the connection is terminated and no further messages can be transmitted. */
|
||||
void onClosed();
|
||||
|
||||
/**
|
||||
* Called when receiving a message from the remote peer.
|
||||
*
|
||||
* @param data The payload sent by the remote peer.
|
||||
*/
|
||||
void onMessage(byte[] data);
|
||||
|
||||
/**
|
||||
* Called when the connection is closed or cannot be established due to an error. No messages
|
||||
* can be transmitted after this method is called.
|
||||
*
|
||||
* @param t The error.
|
||||
*/
|
||||
void onFailure(Throwable t);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.request.service;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
final class DaemonThreadFactory implements ThreadFactory {
|
||||
private final ThreadFactory delegate = Executors.defaultThreadFactory();
|
||||
|
||||
@Override
|
||||
public Thread newThread(@Nonnull Runnable r) {
|
||||
Thread t = delegate.newThread(r);
|
||||
try {
|
||||
t.setDaemon(true);
|
||||
} catch (SecurityException e) {
|
||||
// Well, we tried.
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
|
@ -22,13 +22,11 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import opamp.proto.AgentToServer;
|
||||
import opamp.proto.ServerErrorResponse;
|
||||
|
@ -255,19 +253,4 @@ public final class HttpRequestService implements RequestService {
|
|||
return currentDelay.getNextDelay();
|
||||
}
|
||||
}
|
||||
|
||||
private static class DaemonThreadFactory implements ThreadFactory {
|
||||
private final ThreadFactory delegate = Executors.defaultThreadFactory();
|
||||
|
||||
@Override
|
||||
public Thread newThread(@Nonnull Runnable r) {
|
||||
Thread t = delegate.newThread(r);
|
||||
try {
|
||||
t.setDaemon(true);
|
||||
} catch (SecurityException e) {
|
||||
// Well, we tried.
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.request.service;
|
||||
|
||||
import com.squareup.wire.ProtoAdapter;
|
||||
import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket;
|
||||
import io.opentelemetry.opamp.client.internal.request.Request;
|
||||
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
|
||||
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
|
||||
import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError;
|
||||
import io.opentelemetry.opamp.client.internal.response.Response;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Supplier;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
import opamp.proto.ServerErrorResponse;
|
||||
import opamp.proto.ServerErrorResponseType;
|
||||
import opamp.proto.ServerToAgent;
|
||||
|
||||
public final class WebSocketRequestService implements RequestService, WebSocket.Listener {
|
||||
private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
|
||||
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
|
||||
|
||||
private final WebSocket webSocket;
|
||||
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
||||
private final AtomicBoolean hasStopped = new AtomicBoolean(false);
|
||||
private final ConnectionStatus connectionStatus;
|
||||
private final ScheduledExecutorService executorService;
|
||||
|
||||
/** Defined <a href="https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1">here</a>. */
|
||||
private static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000;
|
||||
|
||||
@GuardedBy("hasPendingRequestLock")
|
||||
private boolean hasPendingRequest = false;
|
||||
|
||||
private final Object hasPendingRequestLock = new Object();
|
||||
@Nullable private Callback callback;
|
||||
@Nullable private Supplier<Request> requestSupplier;
|
||||
|
||||
/**
|
||||
* Creates an {@link WebSocketRequestService}.
|
||||
*
|
||||
* @param webSocket The WebSocket implementation.
|
||||
*/
|
||||
public static WebSocketRequestService create(WebSocket webSocket) {
|
||||
return create(webSocket, DEFAULT_DELAY_BETWEEN_RETRIES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an {@link WebSocketRequestService}.
|
||||
*
|
||||
* @param webSocket The WebSocket implementation.
|
||||
* @param periodicRetryDelay The time to wait between retries.
|
||||
*/
|
||||
public static WebSocketRequestService create(
|
||||
WebSocket webSocket, PeriodicDelay periodicRetryDelay) {
|
||||
return new WebSocketRequestService(
|
||||
webSocket,
|
||||
periodicRetryDelay,
|
||||
Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory()));
|
||||
}
|
||||
|
||||
WebSocketRequestService(
|
||||
WebSocket webSocket,
|
||||
PeriodicDelay periodicRetryDelay,
|
||||
ScheduledExecutorService executorService) {
|
||||
this.webSocket = webSocket;
|
||||
this.executorService = executorService;
|
||||
this.connectionStatus = new ConnectionStatus(periodicRetryDelay);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(Callback callback, Supplier<Request> requestSupplier) {
|
||||
if (hasStopped.get()) {
|
||||
throw new IllegalStateException("This service is already stopped");
|
||||
}
|
||||
if (isRunning.compareAndSet(false, true)) {
|
||||
this.callback = callback;
|
||||
this.requestSupplier = requestSupplier;
|
||||
startConnection();
|
||||
} else {
|
||||
throw new IllegalStateException("The service has already started");
|
||||
}
|
||||
}
|
||||
|
||||
private void startConnection() {
|
||||
webSocket.open(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest() {
|
||||
if (!isRunning.get()) {
|
||||
throw new IllegalStateException("The service is not running");
|
||||
}
|
||||
if (hasStopped.get()) {
|
||||
throw new IllegalStateException("This service is already stopped");
|
||||
}
|
||||
|
||||
doSendRequest();
|
||||
}
|
||||
|
||||
private void doSendRequest() {
|
||||
try {
|
||||
synchronized (hasPendingRequestLock) {
|
||||
if (!trySendRequest()) {
|
||||
hasPendingRequest = true;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
getCallback().onRequestFailed(e);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean trySendRequest() throws IOException {
|
||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||
ProtoAdapter.UINT64.encode(outputStream, 0L);
|
||||
byte[] payload = getRequest().getAgentToServer().encode();
|
||||
outputStream.write(payload);
|
||||
return webSocket.send(outputStream.toByteArray());
|
||||
}
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Request getRequest() {
|
||||
return Objects.requireNonNull(requestSupplier).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (hasStopped.compareAndSet(false, true)) {
|
||||
/*
|
||||
Sending last message as explained in the spec:
|
||||
https://opentelemetry.io/docs/specs/opamp/#websocket-transport-opamp-client-initiated.
|
||||
The client implementation must ensure that the "agent_disconnect" field will be provided in the
|
||||
next supplied request body.
|
||||
*/
|
||||
doSendRequest();
|
||||
webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null);
|
||||
executorService.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen() {
|
||||
connectionStatus.success();
|
||||
getCallback().onConnectionSuccess();
|
||||
synchronized (hasPendingRequestLock) {
|
||||
if (hasPendingRequest) {
|
||||
hasPendingRequest = false;
|
||||
sendRequest();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(byte[] data) {
|
||||
try {
|
||||
ServerToAgent serverToAgent = readServerToAgent(data);
|
||||
|
||||
if (serverToAgent.error_response != null) {
|
||||
handleServerError(serverToAgent.error_response);
|
||||
getCallback()
|
||||
.onRequestFailed(
|
||||
new OpampServerResponseError(serverToAgent.error_response.error_message));
|
||||
return;
|
||||
}
|
||||
|
||||
getCallback().onRequestSuccess(Response.create(serverToAgent));
|
||||
} catch (IOException e) {
|
||||
getCallback().onRequestFailed(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static ServerToAgent readServerToAgent(byte[] data) throws IOException {
|
||||
int headerSize = ProtoAdapter.UINT64.encodedSize(ProtoAdapter.UINT64.decode(data));
|
||||
int payloadSize = data.length - headerSize;
|
||||
byte[] payload = new byte[payloadSize];
|
||||
System.arraycopy(data, headerSize, payload, 0, payloadSize);
|
||||
return ServerToAgent.ADAPTER.decode(payload);
|
||||
}
|
||||
|
||||
private void handleServerError(ServerErrorResponse errorResponse) {
|
||||
if (serverIsUnavailable(errorResponse)) {
|
||||
Duration retryAfter = null;
|
||||
|
||||
if (errorResponse.retry_info != null) {
|
||||
retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds);
|
||||
}
|
||||
|
||||
webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null);
|
||||
connectionStatus.retryAfter(retryAfter);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean serverIsUnavailable(ServerErrorResponse errorResponse) {
|
||||
return errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosing() {
|
||||
// Noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed() {
|
||||
// If this service isn't stopped, we should retry connecting.
|
||||
connectionStatus.retryAfter(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
getCallback().onConnectionFailed(t);
|
||||
connectionStatus.retryAfter(null);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private Callback getCallback() {
|
||||
return Objects.requireNonNull(callback);
|
||||
}
|
||||
|
||||
private class ConnectionStatus {
|
||||
private final PeriodicDelay periodicRetryDelay;
|
||||
private final AtomicBoolean retryingConnection = new AtomicBoolean(false);
|
||||
private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false);
|
||||
|
||||
ConnectionStatus(PeriodicDelay periodicRetryDelay) {
|
||||
this.periodicRetryDelay = periodicRetryDelay;
|
||||
}
|
||||
|
||||
void success() {
|
||||
retryingConnection.set(false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("FutureReturnValueIgnored")
|
||||
void retryAfter(@Nullable Duration retryAfter) {
|
||||
if (hasStopped.get()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (retryingConnection.compareAndSet(false, true)) {
|
||||
periodicRetryDelay.reset();
|
||||
if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
|
||||
((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter);
|
||||
}
|
||||
}
|
||||
|
||||
if (nextRetryScheduled.compareAndSet(false, true)) {
|
||||
executorService.schedule(
|
||||
this::retryConnection,
|
||||
periodicRetryDelay.getNextDelay().toNanos(),
|
||||
TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private void retryConnection() {
|
||||
nextRetryScheduled.set(false);
|
||||
startConnection();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.clearInvocations;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.WebSocketListener;
|
||||
import okio.ByteString;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class OkHttpWebSocketTest {
|
||||
@Mock private OkHttpClient client;
|
||||
@Mock private okhttp3.WebSocket okHttpWebSocket;
|
||||
@Mock private WebSocket.Listener listener;
|
||||
@Captor private ArgumentCaptor<Request> requestCaptor;
|
||||
@Captor private ArgumentCaptor<WebSocketListener> listenerCaptor;
|
||||
private static final String URL = "ws://some.server";
|
||||
private OkHttpWebSocket webSocket;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
webSocket = OkHttpWebSocket.create(URL, client);
|
||||
when(client.newWebSocket(any(), any())).thenReturn(okHttpWebSocket);
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateOpen() {
|
||||
// Assert websocket created
|
||||
openAndCaptureArguments();
|
||||
assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server");
|
||||
|
||||
// Assert further calls to open won't do anything
|
||||
webSocket.open(listener);
|
||||
verifyNoMoreInteractions(client);
|
||||
|
||||
// When connectivity succeeds, open calls won't do anything.
|
||||
callOnOpen();
|
||||
webSocket.open(listener);
|
||||
verifyNoMoreInteractions(client);
|
||||
|
||||
// When connectivity fails, allow future open calls
|
||||
clearInvocations(client);
|
||||
callOnFailure();
|
||||
openAndCaptureArguments();
|
||||
assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server");
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateSend() {
|
||||
byte[] payload = new byte[1];
|
||||
|
||||
// Before opening
|
||||
assertThat(webSocket.send(payload)).isFalse();
|
||||
|
||||
// After opening successfully
|
||||
when(okHttpWebSocket.send(any(ByteString.class))).thenReturn(true);
|
||||
openAndCaptureArguments();
|
||||
callOnOpen();
|
||||
assertThat(webSocket.send(payload)).isTrue();
|
||||
verify(okHttpWebSocket).send(ByteString.of(payload));
|
||||
|
||||
// After failing
|
||||
callOnFailure();
|
||||
assertThat(webSocket.send(payload)).isFalse();
|
||||
verifyNoMoreInteractions(okHttpWebSocket);
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateClose() {
|
||||
openAndCaptureArguments();
|
||||
|
||||
callOnOpen();
|
||||
webSocket.close(123, "something");
|
||||
verify(okHttpWebSocket).close(123, "something");
|
||||
|
||||
// Validate calling it again
|
||||
webSocket.close(1, null);
|
||||
verifyNoMoreInteractions(okHttpWebSocket);
|
||||
|
||||
// Once closed, it should be possible to reopen it.
|
||||
clearInvocations(client);
|
||||
callOnClosed();
|
||||
openAndCaptureArguments();
|
||||
assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server");
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateOnClosing() {
|
||||
openAndCaptureArguments();
|
||||
|
||||
callOnOpen();
|
||||
callOnClosing();
|
||||
|
||||
// Validate calling after onClosing
|
||||
webSocket.close(1, null);
|
||||
verifyNoInteractions(okHttpWebSocket);
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateOnMessage() {
|
||||
byte[] payload = new byte[1];
|
||||
openAndCaptureArguments();
|
||||
|
||||
listenerCaptor.getValue().onMessage(mock(), ByteString.of(payload));
|
||||
verify(listener).onMessage(payload);
|
||||
}
|
||||
|
||||
private void callOnOpen() {
|
||||
listenerCaptor.getValue().onOpen(mock(), mock());
|
||||
verify(listener).onOpen();
|
||||
}
|
||||
|
||||
private void callOnClosed() {
|
||||
listenerCaptor.getValue().onClosed(mock(), 0, "");
|
||||
verify(listener).onClosed();
|
||||
}
|
||||
|
||||
private void callOnClosing() {
|
||||
listenerCaptor.getValue().onClosing(mock(), 0, "");
|
||||
verify(listener).onClosing();
|
||||
}
|
||||
|
||||
private void callOnFailure() {
|
||||
Throwable t = mock();
|
||||
listenerCaptor.getValue().onFailure(mock(), t, mock());
|
||||
verify(listener).onFailure(t);
|
||||
}
|
||||
|
||||
private void openAndCaptureArguments() {
|
||||
webSocket.open(listener);
|
||||
verify(client).newWebSocket(requestCaptor.capture(), listenerCaptor.capture());
|
||||
}
|
||||
}
|
|
@ -8,10 +8,8 @@ package io.opentelemetry.opamp.client.internal.request.service;
|
|||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.clearInvocations;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -21,7 +19,6 @@ import io.opentelemetry.opamp.client.internal.connectivity.http.HttpErrorExcepti
|
|||
import io.opentelemetry.opamp.client.internal.connectivity.http.HttpSender;
|
||||
import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser;
|
||||
import io.opentelemetry.opamp.client.internal.request.Request;
|
||||
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
|
||||
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
|
||||
import io.opentelemetry.opamp.client.internal.response.Response;
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
@ -33,10 +30,7 @@ import java.util.List;
|
|||
import java.util.NoSuchElementException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import opamp.proto.AgentToServer;
|
||||
|
@ -44,7 +38,6 @@ import opamp.proto.RetryInfo;
|
|||
import opamp.proto.ServerErrorResponse;
|
||||
import opamp.proto.ServerErrorResponseType;
|
||||
import opamp.proto.ServerToAgent;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -61,8 +54,7 @@ class HttpRequestServiceTest {
|
|||
private static final Duration RETRY_DELAY = Duration.ofSeconds(5);
|
||||
|
||||
@Mock private RequestService.Callback callback;
|
||||
private final List<ScheduledTask> scheduledTasks = new ArrayList<>();
|
||||
private ScheduledExecutorService executorService;
|
||||
private TestScheduler scheduler;
|
||||
private TestHttpSender requestSender;
|
||||
private PeriodicDelay periodicRequestDelay;
|
||||
private PeriodicDelayWithSuggestion periodicRetryDelay;
|
||||
|
@ -74,42 +66,40 @@ class HttpRequestServiceTest {
|
|||
requestSender = new TestHttpSender();
|
||||
periodicRequestDelay = createPeriodicDelay(REGULAR_DELAY);
|
||||
periodicRetryDelay = createPeriodicDelayWithSuggestionSupport(RETRY_DELAY);
|
||||
executorService = createTestScheduleExecutorService();
|
||||
scheduler = new TestScheduler();
|
||||
httpRequestService =
|
||||
new HttpRequestService(
|
||||
requestSender,
|
||||
executorService,
|
||||
scheduler.getMockService(),
|
||||
periodicRequestDelay,
|
||||
periodicRetryDelay,
|
||||
RetryAfterParser.getInstance());
|
||||
httpRequestService.start(callback, this::createRequestSupplier);
|
||||
httpRequestService.start(callback, this::createRequest);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
httpRequestService.stop();
|
||||
scheduledTasks.clear();
|
||||
verify(executorService).shutdown();
|
||||
verify(scheduler.getMockService()).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
void verifyStart_scheduledFirstTask() {
|
||||
assertThat(scheduledTasks).hasSize(1);
|
||||
ScheduledTask firstTask = scheduledTasks.get(0);
|
||||
assertThat(firstTask.delay).isEqualTo(REGULAR_DELAY);
|
||||
TestScheduler.Task firstTask = assertAndGetSingleCurrentTask();
|
||||
assertThat(firstTask.getDelay()).isEqualTo(REGULAR_DELAY);
|
||||
|
||||
// Verify initial task creates next one
|
||||
scheduledTasks.clear();
|
||||
scheduler.clearTasks();
|
||||
requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build()));
|
||||
firstTask.run();
|
||||
|
||||
assertThat(scheduledTasks).hasSize(1);
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
|
||||
// Check on-demand requests don't create subsequent tasks
|
||||
requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build()));
|
||||
httpRequestService.sendRequest();
|
||||
|
||||
assertThat(scheduledTasks).hasSize(1);
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -128,14 +118,14 @@ class HttpRequestServiceTest {
|
|||
@Test
|
||||
void verifyWhenSendingOnDemandRequest_andDelayChanges() {
|
||||
// Initial state
|
||||
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY);
|
||||
assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY);
|
||||
|
||||
// Trigger delay strategy change
|
||||
requestSender.enqueueResponse(createFailedResponse(503));
|
||||
httpRequestService.sendRequest();
|
||||
|
||||
// Expected state
|
||||
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(RETRY_DELAY);
|
||||
assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(RETRY_DELAY);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -252,30 +242,30 @@ class HttpRequestServiceTest {
|
|||
private void verifyRetryDelayOnError(
|
||||
HttpSender.Response errorResponse, Duration expectedRetryDelay) {
|
||||
requestSender.enqueueResponse(errorResponse);
|
||||
ScheduledTask previousTask = assertAndGetSingleCurrentTask();
|
||||
TestScheduler.Task previousTask = assertAndGetSingleCurrentTask();
|
||||
|
||||
previousTask.run();
|
||||
|
||||
verifySingleRequestSent();
|
||||
verify(periodicRetryDelay).reset();
|
||||
verify(callback).onRequestFailed(any());
|
||||
ScheduledTask retryTask = assertAndGetSingleCurrentTask();
|
||||
assertThat(retryTask.delay).isEqualTo(expectedRetryDelay);
|
||||
TestScheduler.Task retryTask = assertAndGetSingleCurrentTask();
|
||||
assertThat(retryTask.getDelay()).isEqualTo(expectedRetryDelay);
|
||||
|
||||
// Retry with another error
|
||||
clearInvocations(callback);
|
||||
scheduledTasks.clear();
|
||||
scheduler.clearTasks();
|
||||
requestSender.enqueueResponse(createFailedResponse(500));
|
||||
retryTask.run();
|
||||
|
||||
verifySingleRequestSent();
|
||||
verify(callback).onRequestFailed(any());
|
||||
ScheduledTask retryTask2 = assertAndGetSingleCurrentTask();
|
||||
assertThat(retryTask2.delay).isEqualTo(expectedRetryDelay);
|
||||
TestScheduler.Task retryTask2 = assertAndGetSingleCurrentTask();
|
||||
assertThat(retryTask2.getDelay()).isEqualTo(expectedRetryDelay);
|
||||
|
||||
// Retry with a success
|
||||
clearInvocations(callback);
|
||||
scheduledTasks.clear();
|
||||
scheduler.clearTasks();
|
||||
ServerToAgent serverToAgent = new ServerToAgent.Builder().build();
|
||||
requestSender.enqueueResponse(createSuccessfulResponse(serverToAgent));
|
||||
retryTask2.run();
|
||||
|
@ -283,16 +273,17 @@ class HttpRequestServiceTest {
|
|||
verify(periodicRequestDelay).reset();
|
||||
verifySingleRequestSent();
|
||||
verifyRequestSuccessCallback(serverToAgent);
|
||||
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY);
|
||||
assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY);
|
||||
}
|
||||
|
||||
private Request createRequestSupplier() {
|
||||
private Request createRequest() {
|
||||
AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build();
|
||||
requestSize = agentToServer.encodeByteString().size();
|
||||
return Request.create(agentToServer);
|
||||
}
|
||||
|
||||
private ScheduledTask assertAndGetSingleCurrentTask() {
|
||||
private TestScheduler.Task assertAndGetSingleCurrentTask() {
|
||||
List<TestScheduler.Task> scheduledTasks = scheduler.getScheduledTasks();
|
||||
assertThat(scheduledTasks).hasSize(1);
|
||||
return scheduledTasks.get(0);
|
||||
}
|
||||
|
@ -313,33 +304,6 @@ class HttpRequestServiceTest {
|
|||
assertThat(captor.getValue().getMessage()).isEqualTo("Error message");
|
||||
}
|
||||
|
||||
private ScheduledExecutorService createTestScheduleExecutorService() {
|
||||
ScheduledExecutorService service = mock();
|
||||
|
||||
lenient()
|
||||
.doAnswer(
|
||||
invocation -> {
|
||||
Runnable runnable = invocation.getArgument(0);
|
||||
runnable.run();
|
||||
return null;
|
||||
})
|
||||
.when(service)
|
||||
.execute(any());
|
||||
|
||||
when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
|
||||
.thenAnswer(
|
||||
invocation -> {
|
||||
ScheduledTask task =
|
||||
new ScheduledTask(invocation.getArgument(0), invocation.getArgument(1));
|
||||
|
||||
scheduledTasks.add(task);
|
||||
|
||||
return task;
|
||||
});
|
||||
|
||||
return service;
|
||||
}
|
||||
|
||||
private static HttpSender.Response createSuccessfulResponse(ServerToAgent serverToAgent) {
|
||||
return createSuccessfulResponse(serverToAgent.encodeByteString().toByteArray());
|
||||
}
|
||||
|
@ -370,32 +334,6 @@ class HttpRequestServiceTest {
|
|||
return spy(new PeriodicDelayWithSuggestion(delay));
|
||||
}
|
||||
|
||||
private static class PeriodicDelayWithSuggestion
|
||||
implements PeriodicDelay, AcceptsDelaySuggestion {
|
||||
private final Duration initialDelay;
|
||||
private Duration currentDelay;
|
||||
|
||||
private PeriodicDelayWithSuggestion(Duration initialDelay) {
|
||||
this.initialDelay = initialDelay;
|
||||
currentDelay = initialDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suggestDelay(Duration delay) {
|
||||
currentDelay = delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getNextDelay() {
|
||||
return currentDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
currentDelay = initialDelay;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestHttpSender implements HttpSender {
|
||||
private final List<RequestParams> requests = new ArrayList<>();
|
||||
|
||||
|
@ -438,55 +376,4 @@ class HttpRequestServiceTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class ScheduledTask implements ScheduledFuture<Object> {
|
||||
private final Runnable runnable;
|
||||
private final Duration delay;
|
||||
|
||||
public void run() {
|
||||
get();
|
||||
}
|
||||
|
||||
private ScheduledTask(Runnable runnable, long timeNanos) {
|
||||
this.runnable = runnable;
|
||||
this.delay = Duration.ofNanos(timeNanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(@NotNull TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return scheduledTasks.remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get() {
|
||||
scheduledTasks.remove(this);
|
||||
runnable.run();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(long timeout, @NotNull TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull Delayed o) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.request.service;
|
||||
|
||||
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
|
||||
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
|
||||
import java.time.Duration;
|
||||
|
||||
public class PeriodicDelayWithSuggestion implements PeriodicDelay, AcceptsDelaySuggestion {
|
||||
private final Duration initialDelay;
|
||||
private Duration currentDelay;
|
||||
|
||||
public PeriodicDelayWithSuggestion(Duration initialDelay) {
|
||||
this.initialDelay = initialDelay;
|
||||
currentDelay = initialDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suggestDelay(Duration delay) {
|
||||
currentDelay = delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getNextDelay() {
|
||||
return currentDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
currentDelay = initialDelay;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.request.service;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
public final class TestScheduler {
|
||||
private final List<Task> tasks = new ArrayList<>();
|
||||
private final ScheduledExecutorService service = createTestScheduleExecutorService();
|
||||
|
||||
public ScheduledExecutorService getMockService() {
|
||||
return service;
|
||||
}
|
||||
|
||||
public List<Task> getScheduledTasks() {
|
||||
return Collections.unmodifiableList(tasks);
|
||||
}
|
||||
|
||||
public void clearTasks() {
|
||||
tasks.clear();
|
||||
}
|
||||
|
||||
private ScheduledExecutorService createTestScheduleExecutorService() {
|
||||
ScheduledExecutorService service = mock();
|
||||
|
||||
lenient()
|
||||
.doAnswer(
|
||||
invocation -> {
|
||||
Runnable runnable = invocation.getArgument(0);
|
||||
runnable.run();
|
||||
return null;
|
||||
})
|
||||
.when(service)
|
||||
.execute(any());
|
||||
|
||||
lenient()
|
||||
.when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
|
||||
.thenAnswer(
|
||||
invocation -> {
|
||||
Task task = new Task(invocation.getArgument(0), invocation.getArgument(1));
|
||||
|
||||
tasks.add(task);
|
||||
|
||||
return task;
|
||||
});
|
||||
|
||||
return service;
|
||||
}
|
||||
|
||||
public class Task implements ScheduledFuture<Object> {
|
||||
private final Runnable runnable;
|
||||
private final Duration delay;
|
||||
|
||||
public void run() {
|
||||
get();
|
||||
}
|
||||
|
||||
private Task(Runnable runnable, long timeNanos) {
|
||||
this.runnable = runnable;
|
||||
this.delay = Duration.ofNanos(timeNanos);
|
||||
}
|
||||
|
||||
public Duration getDelay() {
|
||||
return delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(@NotNull TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean cancel(boolean mayInterruptIfRunning) {
|
||||
return tasks.remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancelled() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDone() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get() {
|
||||
tasks.remove(this);
|
||||
runnable.run();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(long timeout, @NotNull TimeUnit unit) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull Delayed o) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,336 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.opamp.client.internal.request.service;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.clearInvocations;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket;
|
||||
import io.opentelemetry.opamp.client.internal.request.Request;
|
||||
import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError;
|
||||
import io.opentelemetry.opamp.client.internal.response.Response;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import opamp.proto.AgentToServer;
|
||||
import opamp.proto.RetryInfo;
|
||||
import opamp.proto.ServerErrorResponse;
|
||||
import opamp.proto.ServerErrorResponseType;
|
||||
import opamp.proto.ServerToAgent;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class WebSocketRequestServiceTest {
|
||||
@Mock private WebSocket webSocket;
|
||||
@Mock private RequestService.Callback callback;
|
||||
@Mock private PeriodicDelayWithSuggestion retryDelay;
|
||||
private Request request;
|
||||
private TestScheduler scheduler;
|
||||
private WebSocketRequestService requestService;
|
||||
private static final Duration INITIAL_RETRY_DELAY = Duration.ofSeconds(1);
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
lenient().when(retryDelay.getNextDelay()).thenReturn(INITIAL_RETRY_DELAY);
|
||||
scheduler = new TestScheduler();
|
||||
requestService = new WebSocketRequestService(webSocket, retryDelay, scheduler.getMockService());
|
||||
}
|
||||
|
||||
@Test
|
||||
void verifySuccessfulStart() {
|
||||
startService();
|
||||
verify(webSocket).open(requestService);
|
||||
|
||||
// When opening successfully, notify callback
|
||||
requestService.onOpen();
|
||||
verify(callback).onConnectionSuccess();
|
||||
verifyNoMoreInteractions(callback);
|
||||
|
||||
// It shouldn't allow starting again
|
||||
try {
|
||||
startService();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e).hasMessage("The service has already started");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void verifyFailedStart() {
|
||||
startService();
|
||||
verify(webSocket).open(requestService);
|
||||
|
||||
// When failing while opening, notify callback
|
||||
Throwable t = mock();
|
||||
requestService.onFailure(t);
|
||||
verify(retryDelay).reset();
|
||||
verify(callback).onConnectionFailed(t);
|
||||
verifyNoMoreInteractions(callback);
|
||||
|
||||
// Check connection retry is scheduled
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(INITIAL_RETRY_DELAY);
|
||||
|
||||
// It shouldn't allow starting again
|
||||
try {
|
||||
startService();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e).hasMessage("The service has already started");
|
||||
}
|
||||
|
||||
// It shouldn't schedule more than one retry at a time
|
||||
clearInvocations(retryDelay, callback);
|
||||
requestService.onFailure(t);
|
||||
verify(callback).onConnectionFailed(t);
|
||||
verifyNoInteractions(retryDelay);
|
||||
verifyNoMoreInteractions(callback);
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
|
||||
// Execute retry with new delay
|
||||
clearInvocations(webSocket, callback);
|
||||
when(retryDelay.getNextDelay()).thenReturn(Duration.ofSeconds(5));
|
||||
scheduler.getScheduledTasks().get(0).run();
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
verify(webSocket).open(requestService);
|
||||
|
||||
// Fail again
|
||||
requestService.onFailure(t);
|
||||
verify(retryDelay, never()).reset();
|
||||
verify(callback).onConnectionFailed(t);
|
||||
|
||||
// A new retry has been scheduled
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(Duration.ofSeconds(5));
|
||||
|
||||
// Execute retry again
|
||||
clearInvocations(webSocket, callback);
|
||||
scheduler.getScheduledTasks().get(0).run();
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
verify(webSocket).open(requestService);
|
||||
|
||||
// Succeed
|
||||
requestService.onOpen();
|
||||
verify(callback).onConnectionSuccess();
|
||||
verifyNoMoreInteractions(callback);
|
||||
|
||||
// Fail at some point
|
||||
clearInvocations(callback);
|
||||
requestService.onFailure(t);
|
||||
verify(callback).onConnectionFailed(t);
|
||||
verifyNoMoreInteractions(callback);
|
||||
verify(retryDelay).reset();
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void verifySendRequest() {
|
||||
// Validate when not running
|
||||
try {
|
||||
requestService.sendRequest();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e).hasMessage("The service is not running");
|
||||
}
|
||||
|
||||
startService();
|
||||
|
||||
// Successful send
|
||||
when(webSocket.send(any())).thenReturn(true);
|
||||
requestService.sendRequest();
|
||||
verify(webSocket).send(getExpectedOutgoingBytes());
|
||||
|
||||
// Check there are no pending requests
|
||||
clearInvocations(webSocket);
|
||||
requestService.onOpen();
|
||||
verifyNoInteractions(webSocket);
|
||||
|
||||
// Failed send
|
||||
when(webSocket.send(any())).thenReturn(false);
|
||||
requestService.sendRequest();
|
||||
clearInvocations(webSocket);
|
||||
|
||||
// Check pending request
|
||||
when(webSocket.send(any())).thenReturn(true);
|
||||
requestService.onOpen();
|
||||
verify(webSocket).send(getExpectedOutgoingBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
void verifyOnMessage() {
|
||||
startService();
|
||||
|
||||
// Successful message
|
||||
ServerToAgent serverToAgent = new ServerToAgent.Builder().build();
|
||||
requestService.onMessage(createServerToAgentPayload(serverToAgent));
|
||||
verify(callback).onRequestSuccess(Response.create(serverToAgent));
|
||||
verifyNoMoreInteractions(callback);
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
|
||||
// Regular error message
|
||||
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
|
||||
clearInvocations(callback);
|
||||
serverToAgent =
|
||||
new ServerToAgent.Builder()
|
||||
.error_response(new ServerErrorResponse.Builder().error_message("A message").build())
|
||||
.build();
|
||||
requestService.onMessage(createServerToAgentPayload(serverToAgent));
|
||||
verify(callback).onRequestFailed(throwableCaptor.capture());
|
||||
verifyNoMoreInteractions(callback);
|
||||
OpampServerResponseError error = (OpampServerResponseError) throwableCaptor.getValue();
|
||||
assertThat(error.getMessage()).isEqualTo("A message");
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
|
||||
// Error message with unavailable status
|
||||
clearInvocations(callback);
|
||||
serverToAgent =
|
||||
new ServerToAgent.Builder()
|
||||
.error_response(
|
||||
new ServerErrorResponse.Builder()
|
||||
.type(ServerErrorResponseType.ServerErrorResponseType_Unavailable)
|
||||
.error_message("Try later")
|
||||
.build())
|
||||
.build();
|
||||
requestService.onMessage(createServerToAgentPayload(serverToAgent));
|
||||
verify(callback).onRequestFailed(throwableCaptor.capture());
|
||||
verifyNoMoreInteractions(callback);
|
||||
OpampServerResponseError unavailableError =
|
||||
(OpampServerResponseError) throwableCaptor.getValue();
|
||||
assertThat(unavailableError.getMessage()).isEqualTo("Try later");
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
verify(retryDelay, never()).suggestDelay(any());
|
||||
|
||||
// Reset scheduled retry
|
||||
scheduler.getScheduledTasks().get(0).run();
|
||||
requestService.onOpen();
|
||||
|
||||
// Error message with unavailable status and suggested delay
|
||||
Duration suggestedDelay = Duration.ofSeconds(10);
|
||||
clearInvocations(callback, retryDelay);
|
||||
serverToAgent =
|
||||
new ServerToAgent.Builder()
|
||||
.error_response(
|
||||
new ServerErrorResponse.Builder()
|
||||
.type(ServerErrorResponseType.ServerErrorResponseType_Unavailable)
|
||||
.retry_info(
|
||||
new RetryInfo.Builder()
|
||||
.retry_after_nanoseconds(suggestedDelay.toNanos())
|
||||
.build())
|
||||
.build())
|
||||
.build();
|
||||
requestService.onMessage(createServerToAgentPayload(serverToAgent));
|
||||
verify(callback).onRequestFailed(throwableCaptor.capture());
|
||||
verifyNoMoreInteractions(callback);
|
||||
OpampServerResponseError unavailableErrorWithSuggestedDelay =
|
||||
(OpampServerResponseError) throwableCaptor.getValue();
|
||||
assertThat(unavailableErrorWithSuggestedDelay.getMessage()).isEmpty();
|
||||
assertThat(scheduler.getScheduledTasks()).hasSize(1);
|
||||
verify(retryDelay).suggestDelay(suggestedDelay);
|
||||
}
|
||||
|
||||
@Test
|
||||
void verifyStop() {
|
||||
startService();
|
||||
|
||||
requestService.stop();
|
||||
|
||||
InOrder inOrder = inOrder(webSocket);
|
||||
inOrder.verify(webSocket).send(getExpectedOutgoingBytes());
|
||||
inOrder.verify(webSocket).close(1000, null);
|
||||
verify(scheduler.getMockService()).shutdown();
|
||||
|
||||
// If something fails afterward, no retry should get scheduled.
|
||||
requestService.onFailure(mock());
|
||||
verifyNoInteractions(retryDelay);
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
|
||||
// If onClosed is called afterward, no retry should get scheduled.
|
||||
requestService.onClosed();
|
||||
verifyNoInteractions(retryDelay);
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
|
||||
// If a new message with a server unavailable error arrives afterward, no retry should get
|
||||
// scheduled.
|
||||
ServerToAgent serverToAgent =
|
||||
new ServerToAgent.Builder()
|
||||
.error_response(
|
||||
new ServerErrorResponse.Builder()
|
||||
.type(ServerErrorResponseType.ServerErrorResponseType_Unavailable)
|
||||
.build())
|
||||
.build();
|
||||
requestService.onMessage(createServerToAgentPayload(serverToAgent));
|
||||
verifyNoInteractions(retryDelay);
|
||||
assertThat(scheduler.getScheduledTasks()).isEmpty();
|
||||
|
||||
// Requests cannot get enqueued afterward.
|
||||
try {
|
||||
requestService.sendRequest();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e).hasMessage("This service is already stopped");
|
||||
}
|
||||
|
||||
// The service cannot get restarted afterward.
|
||||
try {
|
||||
startService();
|
||||
fail();
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e).hasMessage("This service is already stopped");
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] getExpectedOutgoingBytes() {
|
||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||
CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream);
|
||||
codedOutput.writeUInt64NoTag(0);
|
||||
byte[] payload = request.getAgentToServer().encode();
|
||||
codedOutput.writeRawBytes(payload);
|
||||
codedOutput.flush();
|
||||
return outputStream.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] createServerToAgentPayload(ServerToAgent serverToAgent) {
|
||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||
CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream);
|
||||
codedOutput.writeUInt64NoTag(0);
|
||||
codedOutput.writeRawBytes(serverToAgent.encode());
|
||||
codedOutput.flush();
|
||||
return outputStream.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void startService() {
|
||||
requestService.start(callback, this::createRequest);
|
||||
}
|
||||
|
||||
private Request createRequest() {
|
||||
AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build();
|
||||
request = Request.create(agentToServer);
|
||||
return request;
|
||||
}
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
val stableVersion = "1.47.0-SNAPSHOT"
|
||||
val alphaVersion = "1.47.0-alpha-SNAPSHOT"
|
||||
val stableVersion = "1.48.0-SNAPSHOT"
|
||||
val alphaVersion = "1.48.0-alpha-SNAPSHOT"
|
||||
|
||||
allprojects {
|
||||
if (findProperty("otel.stable") != "true") {
|
||||
|
|
Loading…
Reference in New Issue