Compare commits

...

21 Commits

Author SHA1 Message Date
renovate[bot] 5d702ed0d3
fix(deps): update all patch versions to v1.15.2 (#2026)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-14 06:42:15 +00:00
renovate[bot] 1773b8f778
fix(deps): update spotless packages to v7.1.0 (#2025)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-14 06:41:37 +00:00
renovate[bot] a820cad474
fix(deps): update dependency com.squareup.okhttp3:okhttp to v5.1.0 (#2023)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-14 06:06:37 +00:00
renovate[bot] 36156f4d2e
fix(deps): update errorprone packages to v2.40.0 (#2024)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-14 06:05:49 +00:00
Trask Stalnaker 6a3094f18a
More link check improvements (#2010) 2025-07-12 23:47:12 +00:00
César 6f3a18a109
OpAMP WebSocket service (#1969)
Co-authored-by: Lauri Tulmin <tulmin@gmail.com>
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
2025-07-10 13:21:23 +00:00
renovate[bot] 83d5cd843d
fix(deps): update all patch versions (#2017)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-10 05:55:16 +00:00
Trask Stalnaker 45ea93180c
Fix OSSF scorecard branch protection check (#2015) 2025-07-09 21:58:24 +00:00
renovate[bot] 5aaf88fa0a
chore(deps): update plugin com.squareup.wire to v5.3.4 (#2009)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-09 05:46:27 +00:00
renovate[bot] c3fc90f076
chore(deps): update open-telemetry/assign-reviewers-action digest to fcd27c5 (#2008)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-09 04:00:49 +00:00
Trask Stalnaker 97382709ba
link check small refactoring (#2006) 2025-07-08 19:06:33 +00:00
Trask Stalnaker 65179dc9db
Also open issue on workflow dispatch failure (#2005) 2025-07-08 05:36:12 +00:00
Trask Stalnaker 8c6089aa59
Stop lychee from causing so many PR failures (#2002) 2025-07-07 21:40:32 +00:00
renovate[bot] 8add72d056
fix(deps): update dependency com.squareup.okhttp3:okhttp to v5 (#2000)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-07 18:21:23 +00:00
SylvainJuge b2c64b3765
jmx scraper add link to instrumentation yaml for tomcat (#2003) 2025-07-07 14:56:41 +00:00
Trask Stalnaker 81ef37bb55
Add unreleased section (#2004) 2025-07-07 14:39:42 +00:00
renovate[bot] 8a65c8cbcf
fix(deps): update all patch versions (#1992)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-07-07 07:08:36 +00:00
Trask Stalnaker a6ff9a030c
Fix sporadically failing maven extension test (#2001) 2025-07-07 05:31:57 +00:00
otelbot[bot] 01680d8df5
Merge change log updates from release/v1.47.x (#1999)
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
2025-07-04 05:39:26 +00:00
otelbot[bot] 358e039e26
Update version to 1.48.0-SNAPSHOT (#1994)
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
Co-authored-by: jason plumb <75337021+breedx-splk@users.noreply.github.com>
2025-07-04 04:25:42 +00:00
Trask Stalnaker 81a36172bb
Update change log for upcoming release (#1990)
Co-authored-by: otelbot <197425009+otelbot@users.noreply.github.com>
2025-07-03 22:27:44 +00:00
29 changed files with 1291 additions and 180 deletions

View File

@ -5,22 +5,49 @@ set -e
export MSYS_NO_PATHCONV=1 # for Git Bash on Windows
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LYCHEE_CONFIG="$SCRIPT_DIR/../../.lychee.toml"
DEPENDENCIES_DOCKERFILE="$SCRIPT_DIR/dependencies.dockerfile"
ROOT_DIR="$SCRIPT_DIR/../.."
DEPENDENCIES_DOCKERFILE="$SCRIPT_DIR/dependencies.Dockerfile"
# Parse command line arguments
LOCAL_LINKS_ONLY=false
TARGET=""
while [[ $# -gt 0 ]]; do
case $1 in
--local-links-only)
LOCAL_LINKS_ONLY=true
shift
;;
*)
# Treat any other arguments as file paths
TARGET="$TARGET $1"
shift
;;
esac
done
# Extract lychee version from dependencies.dockerfile
LYCHEE_VERSION=$(grep "FROM lycheeverse/lychee:" "$DEPENDENCIES_DOCKERFILE" | sed 's/.*FROM lycheeverse\/lychee:\([^ ]*\).*/\1/')
if [[ -z "$TARGET" ]]; then
TARGET="."
fi
# Build the lychee command with optional GitHub token
CMD="lycheeverse/lychee:$LYCHEE_VERSION --verbose --config $(basename "$LYCHEE_CONFIG")"
CMD="lycheeverse/lychee:$LYCHEE_VERSION --verbose --root-dir /data"
# Add GitHub token if available
if [[ -n "$GITHUB_TOKEN" ]]; then
CMD="$CMD --github-token $GITHUB_TOKEN"
fi
# Add the target directory
CMD="$CMD ."
if [[ "$LOCAL_LINKS_ONLY" == "true" ]]; then
CMD="$CMD --scheme file --include-fragments"
else
CMD="$CMD --config .github/scripts/lychee-config.toml"
fi
CMD="$CMD $TARGET"
# Determine if we should allocate a TTY
DOCKER_FLAGS="--rm --init"
@ -32,4 +59,4 @@ fi
# Run lychee with proper signal handling
# shellcheck disable=SC2086
exec docker run $DOCKER_FLAGS -v "$(dirname "$LYCHEE_CONFIG")":/data -w /data $CMD
exec docker run $DOCKER_FLAGS -v "$ROOT_DIR":/data -w /data $CMD

View File

@ -18,6 +18,6 @@ jobs:
pull-requests: write # for assigning reviewers
runs-on: ubuntu-latest
steps:
- uses: open-telemetry/assign-reviewers-action@cb42e3ee14a59c01abccd401f126a0f4c3991cb3 # main
- uses: open-telemetry/assign-reviewers-action@fcd27c5381c10288b23d423ab85473710a33389e # main
with:
config-file: .github/component_owners.yml

View File

@ -8,6 +8,9 @@ on:
pull_request:
merge_group:
workflow_dispatch:
schedule:
# Run daily at 7:30 AM UTC
- cron: '30 7 * * *'
permissions:
contents: read
@ -119,10 +122,13 @@ jobs:
path: jmx-metrics/build/reports/tests/integrationTest
link-check:
# merge group and push events are excluded to avoid unnecessary CI failures
# (these failures will instead be captured by the daily scheduled run)
#
# release branches are excluded to avoid unnecessary maintenance if external links break
# (and also because the README.md might need update on release branches before the release
# download has been published)
if: "!startsWith(github.ref_name, 'release/')"
if: github.event_name != 'merge_group' && github.event_name != 'push' && !startsWith(github.ref_name, 'release/')
uses: ./.github/workflows/reusable-link-check.yml
markdown-lint-check:
@ -199,3 +205,31 @@ jobs:
)
)
run: exit 1 # fail
workflow-notification:
permissions:
contents: read
issues: write
if: (github.event_name == 'schedule' || github.event_name == 'workflow_dispatch') && always()
needs:
- build
- test
- integration-test
- link-check
- markdown-lint-check
- misspell-check
- shell-script-check
- publish-snapshots
uses: ./.github/workflows/reusable-workflow-notification.yml
with:
success: >-
${{
needs.build.result == 'success' &&
needs.test.result == 'success' &&
needs.integration-test.result == 'success' &&
needs.link-check.result == 'success' &&
needs.markdown-lint-check.result == 'success' &&
needs.misspell-check.result == 'success' &&
needs.shell-script-check.result == 'success' &&
needs.publish-snapshots.result == 'success'
}}

View File

@ -23,8 +23,18 @@ jobs:
with:
persist-credentials: false
- uses: actions/create-github-app-token@df432ceedc7162793a195dd1713ff69aefc7379e # v2.0.6
id: create-token
with:
# analyzing classic branch protections requires a token with admin read permissions
# see https://github.com/ossf/scorecard-action/blob/main/docs/authentication/fine-grained-auth-token.md
# and https://github.com/open-telemetry/community/issues/2769
app-id: ${{ vars.OSSF_SCORECARD_APP_ID }}
private-key: ${{ secrets.OSSF_SCORECARD_PRIVATE_KEY }}
- uses: ossf/scorecard-action@05b42c624433fc40578a4040d5cf5e36ddca8cde # v2.4.2
with:
repo_token: ${{ steps.create-token.outputs.token }}
results_file: results.sarif
results_format: sarif
publish_results: true

View File

@ -11,8 +11,35 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0 # needed for merge-base below
- name: Link check
- name: Link check - relative links (all files)
if: github.event_name == 'pull_request'
env:
GITHUB_TOKEN: ${{ github.token }}
run: ./.github/scripts/link-check.sh --local-links-only
- name: Get modified files
if: github.event_name == 'pull_request'
id: modified-files
run: |
merge_base=$(git merge-base origin/${{ github.base_ref }} HEAD)
# Using lychee's default extension filter here to match when it runs against all files
modified_files=$(git diff --name-only $merge_base...${{ github.event.pull_request.head.sha }} \
| grep -E '\.(md|mkd|mdx|mdown|mdwn|mkdn|mkdown|markdown|html|htm|txt)$' \
| tr '\n' ' ' || true)
echo "files=$modified_files" >> $GITHUB_OUTPUT
echo "Modified files: $modified_files"
- name: Link check - all links (modified files only)
if: github.event_name == 'pull_request' && steps.modified-files.outputs.files != ''
env:
GITHUB_TOKEN: ${{ github.token }}
run: ./.github/scripts/link-check.sh ${{ steps.modified-files.outputs.files }}
- name: Link check - all links (all files)
if: github.event_name != 'pull_request'
env:
GITHUB_TOKEN: ${{ github.token }}
run: ./.github/scripts/link-check.sh

View File

@ -2,11 +2,26 @@
## Unreleased
## Version 1.47.0 (2025-07-04)
### Disk buffering
- Shared storage
([#1912](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1912))
- Implementing ExtendedLogRecordData
([#1918](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1918))
- Add missing EventName to disk-buffering LogRecordDataMapper
([#1950](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1950))
### GCP authentication extension
- Update the internal implementation such that the required headers are retrieved
from the Google Auth Library instead of manually constructing and passing them.
([#1860](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1860))
- Add metrics support to auth extension
([#1891](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1891))
- Update ConfigurableOptions to read from ConfigProperties
([#1904](https://github.com/open-telemetry/opentelemetry-java-contrib/pull/1904))
### Inferred spans

View File

@ -1,7 +1,7 @@
plugins {
`kotlin-dsl`
// When updating, update below in dependencies too
id("com.diffplug.spotless") version "7.0.4"
id("com.diffplug.spotless") version "7.1.0"
}
repositories {
@ -12,7 +12,7 @@ repositories {
dependencies {
// When updating, update above in plugins too
implementation("com.diffplug.spotless:spotless-plugin-gradle:7.0.4")
implementation("com.diffplug.spotless:spotless-plugin-gradle:7.1.0")
implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.3.0")
implementation("net.ltgt.gradle:gradle-nullaway-plugin:2.2.0")
implementation("org.owasp:dependency-check-gradle:12.1.3")

View File

@ -135,7 +135,7 @@ testing {
dependencies {
implementation(project(project.path))
implementation(enforcedPlatform("org.junit:junit-bom:5.13.2"))
implementation(enforcedPlatform("org.junit:junit-bom:5.13.3"))
implementation(enforcedPlatform("org.testcontainers:testcontainers-bom:1.21.3"))
implementation(enforcedPlatform("com.google.guava:guava-bom:33.4.8-jre"))
implementation(enforcedPlatform("com.linecorp.armeria:armeria-bom:1.32.5"))

View File

@ -9,7 +9,7 @@ otelJava.moduleName.set("io.opentelemetry.contrib.compressor.zstd")
dependencies {
api("io.opentelemetry:opentelemetry-exporter-common")
implementation("com.github.luben:zstd-jni:1.5.7-3")
implementation("com.github.luben:zstd-jni:1.5.7-4")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-exporter-otlp")

View File

@ -2,7 +2,7 @@ plugins {
`java-platform`
}
val otelInstrumentationVersion = "2.17.0-alpha"
val otelInstrumentationVersion = "2.17.1-alpha"
val semconvVersion = "1.34.0"
javaPlatform {
@ -16,6 +16,7 @@ dependencies {
// as runtime dependencies if they are actually used as runtime dependencies)
api(enforcedPlatform("io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:${otelInstrumentationVersion}"))
api(enforcedPlatform("com.fasterxml.jackson:jackson-bom:2.19.1"))
api(enforcedPlatform("com.google.protobuf:protobuf-bom:4.31.1"))
constraints {
api("io.opentelemetry.semconv:opentelemetry-semconv:${semconvVersion}")
@ -25,8 +26,8 @@ dependencies {
api("com.google.auto.service:auto-service-annotations:1.1.1")
api("com.google.auto.value:auto-value:1.11.0")
api("com.google.auto.value:auto-value-annotations:1.11.0")
api("com.google.errorprone:error_prone_annotations:2.39.0")
api("com.google.errorprone:error_prone_core:2.39.0")
api("com.google.errorprone:error_prone_annotations:2.40.0")
api("com.google.errorprone:error_prone_core:2.40.0")
api("io.github.netmikey.logunit:logunit-jul:2.0.0")
api("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha")
api("io.prometheus:simpleclient:0.16.0")
@ -43,7 +44,7 @@ dependencies {
api("com.google.code.findbugs:annotations:3.0.1u2")
api("com.google.code.findbugs:jsr305:3.0.2")
api("com.squareup.okhttp3:okhttp:4.12.0")
api("com.squareup.okhttp3:okhttp:5.1.0")
api("com.uber.nullaway:nullaway:0.12.7")
api("org.assertj:assertj-core:3.27.3")
api("org.awaitility:awaitility:4.3.0")

View File

@ -7,7 +7,7 @@ plugins {
id("com.github.johnrengelman.shadow")
id("me.champeau.jmh") version "0.7.3"
id("ru.vyarus.animalsniffer") version "2.0.1"
id("com.squareup.wire") version "5.3.3"
id("com.squareup.wire") version "5.3.5"
}
description = "Exporter implementations that store signals on disk"

View File

@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=7197a12f450794931532469d4ff21a59ea2c1cd59a3ec3f89c035c3c420a6999
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
distributionSha256Sum=bd71102213493060956ec229d946beee57158dbd89d0e62b91bca0fa2c5f3531
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME

View File

@ -70,7 +70,7 @@ Supported values for `otel.jmx.target.system` and support for `otel.jmx.target.s
| `kafka-consumer` | Apache Kafka consumer | [`kafka-consumer.yaml`](src/main/resources/kafka-consumer.yaml) | |
| `kafka-producer` | Apache Kafka producer | [`kafka-producer.yaml`](src/main/resources/kafka-producer.yaml) | |
| `solr` | Apache Solr | [`solr.yaml`](src/main/resources/solr.yaml) | |
| `tomcat` | Apache Tomcat | [`tomcat.yaml`](src/main/resources/tomcat.yaml) | |
| `tomcat` | Apache Tomcat | [`tomcat.yaml`](src/main/resources/tomcat.yaml) | [`tomcat.yaml`](https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jmx-metrics/library/src/main/resources/jmx/rules/tomcat.yaml) |
| `wildfly` | Wildfly | [`wildfly.yaml`](src/main/resources/wildfly.yaml) | |
The source of metrics definitions is controlled by `otel.jmx.target.source`:

View File

@ -120,7 +120,7 @@ public class OpenTelemetrySdkServiceTest {
} finally {
System.clearProperty("otel.exporter.otlp.endpoint");
System.clearProperty("otel.exporter.otlp.traces.endpoint");
System.clearProperty("otel.exporter.otlp.traces.protocol");
System.clearProperty("otel.traces.exporter");
}
}

View File

@ -20,14 +20,14 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
testImplementation("io.micrometer:micrometer-core:1.15.1")
testImplementation("io.micrometer:micrometer-core:1.15.2")
}
testing {
suites {
val integrationTest by registering(JvmTestSuite::class) {
dependencies {
implementation("io.micrometer:micrometer-registry-prometheus:1.15.1")
implementation("io.micrometer:micrometer-registry-prometheus:1.15.2")
}
}
}

View File

@ -5,7 +5,7 @@ import java.net.URL
plugins {
id("otel.java-conventions")
id("de.undercouch.download") version "5.6.0"
id("com.squareup.wire") version "5.3.3"
id("com.squareup.wire") version "5.3.5"
}
description = "Client implementation of the OpAMP spec."
@ -16,6 +16,7 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
testImplementation("org.mockito:mockito-inline")
testImplementation("com.google.protobuf:protobuf-java-util")
}
val opampProtos = tasks.register<DownloadOpampProtos>("opampProtoDownload", download)

View File

@ -0,0 +1,115 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import okhttp3.WebSocketListener;
import okio.ByteString;
public class OkHttpWebSocket implements WebSocket {
private final String url;
private final OkHttpClient client;
private final AtomicReference<Status> status = new AtomicReference<>(Status.NOT_RUNNING);
private final AtomicReference<okhttp3.WebSocket> webSocket = new AtomicReference<>();
public static OkHttpWebSocket create(String url) {
return create(url, new OkHttpClient());
}
public static OkHttpWebSocket create(String url, OkHttpClient client) {
return new OkHttpWebSocket(url, client);
}
private OkHttpWebSocket(String url, OkHttpClient client) {
this.url = url;
this.client = client;
}
@Override
public void open(Listener listener) {
if (status.compareAndSet(Status.NOT_RUNNING, Status.STARTING)) {
okhttp3.Request request = new okhttp3.Request.Builder().url(url).build();
webSocket.set(client.newWebSocket(request, new ListenerAdapter(listener)));
}
}
@Override
public boolean send(byte[] request) {
if (status.get() != Status.RUNNING) {
return false;
}
return getWebSocket().send(ByteString.of(request));
}
@Override
public void close(int code, @Nullable String reason) {
if (status.compareAndSet(Status.RUNNING, Status.CLOSING)) {
try {
if (!getWebSocket().close(code, reason)) {
status.set(Status.NOT_RUNNING);
}
} catch (IllegalArgumentException e) {
status.set(Status.RUNNING);
// Re-throwing as this error happens due to a caller error.
throw e;
}
}
}
private okhttp3.WebSocket getWebSocket() {
return Objects.requireNonNull(webSocket.get());
}
private class ListenerAdapter extends WebSocketListener {
private final Listener delegate;
private ListenerAdapter(Listener delegate) {
this.delegate = delegate;
}
@Override
public void onOpen(@Nonnull okhttp3.WebSocket webSocket, @Nonnull Response response) {
status.set(Status.RUNNING);
delegate.onOpen();
}
@Override
public void onClosing(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
status.set(Status.CLOSING);
delegate.onClosing();
}
@Override
public void onClosed(@Nonnull okhttp3.WebSocket webSocket, int code, @Nonnull String reason) {
status.set(Status.NOT_RUNNING);
delegate.onClosed();
}
@Override
public void onFailure(
@Nonnull okhttp3.WebSocket webSocket, @Nonnull Throwable t, @Nullable Response response) {
status.set(Status.NOT_RUNNING);
delegate.onFailure(t);
}
@Override
public void onMessage(@Nonnull okhttp3.WebSocket webSocket, @Nonnull ByteString bytes) {
delegate.onMessage(bytes.toByteArray());
}
}
enum Status {
NOT_RUNNING,
STARTING,
CLOSING,
RUNNING
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
import javax.annotation.Nullable;
public interface WebSocket {
/**
* Starts the websocket connection if it's not yet started or if it has been closed.
*
* @param listener Will receive events from the websocket connection.
*/
void open(Listener listener);
/**
* Stops the websocket connection if running. Nothing will happen if it's already stopped.
*
* @param code Status code as defined by <a
* href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a>
* @param reason Reason for shutting down, as explained in <a
* href="https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1">Section 5.5.1 of RFC
* 6455</a>
*/
void close(int code, @Nullable String reason);
/**
* Sends a message via the websocket connection.
*
* @param request The message payload.
* @return {@code false} If the message can't be dispatched for any reason, whether the websocket
* isn't running, or the connection isn't established, or it's terminated. {@code true} if the
* message can get sent. Returning {@code true} doesn't guarantee that the message will arrive
* at the remote peer.
*/
boolean send(byte[] request);
interface Listener {
/**
* Called when the websocket connection is successfully established with the remote peer. The
* client may start sending messages after this method is called.
*/
void onOpen();
/**
* Called when the closing handshake has started. No further messages will be sent after this
* method call.
*/
void onClosing();
/** Called when the connection is terminated and no further messages can be transmitted. */
void onClosed();
/**
* Called when receiving a message from the remote peer.
*
* @param data The payload sent by the remote peer.
*/
void onMessage(byte[] data);
/**
* Called when the connection is closed or cannot be established due to an error. No messages
* can be transmitted after this method is called.
*
* @param t The error.
*/
void onFailure(Throwable t);
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.service;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
final class DaemonThreadFactory implements ThreadFactory {
private final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override
public Thread newThread(@Nonnull Runnable r) {
Thread t = delegate.newThread(r);
try {
t.setDaemon(true);
} catch (SecurityException e) {
// Well, we tried.
}
return t;
}
}

View File

@ -22,13 +22,11 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import opamp.proto.AgentToServer;
import opamp.proto.ServerErrorResponse;
@ -255,19 +253,4 @@ public final class HttpRequestService implements RequestService {
return currentDelay.getNextDelay();
}
}
private static class DaemonThreadFactory implements ThreadFactory {
private final ThreadFactory delegate = Executors.defaultThreadFactory();
@Override
public Thread newThread(@Nonnull Runnable r) {
Thread t = delegate.newThread(r);
try {
t.setDaemon(true);
} catch (SecurityException e) {
// Well, we tried.
}
return t;
}
}
}

View File

@ -0,0 +1,271 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.service;
import com.squareup.wire.ProtoAdapter;
import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket;
import io.opentelemetry.opamp.client.internal.request.Request;
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError;
import io.opentelemetry.opamp.client.internal.response.Response;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import opamp.proto.ServerErrorResponse;
import opamp.proto.ServerErrorResponseType;
import opamp.proto.ServerToAgent;
public final class WebSocketRequestService implements RequestService, WebSocket.Listener {
private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
private final WebSocket webSocket;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final AtomicBoolean hasStopped = new AtomicBoolean(false);
private final ConnectionStatus connectionStatus;
private final ScheduledExecutorService executorService;
/** Defined <a href="https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1">here</a>. */
private static final int WEBSOCKET_NORMAL_CLOSURE_CODE = 1000;
@GuardedBy("hasPendingRequestLock")
private boolean hasPendingRequest = false;
private final Object hasPendingRequestLock = new Object();
@Nullable private Callback callback;
@Nullable private Supplier<Request> requestSupplier;
/**
* Creates an {@link WebSocketRequestService}.
*
* @param webSocket The WebSocket implementation.
*/
public static WebSocketRequestService create(WebSocket webSocket) {
return create(webSocket, DEFAULT_DELAY_BETWEEN_RETRIES);
}
/**
* Creates an {@link WebSocketRequestService}.
*
* @param webSocket The WebSocket implementation.
* @param periodicRetryDelay The time to wait between retries.
*/
public static WebSocketRequestService create(
WebSocket webSocket, PeriodicDelay periodicRetryDelay) {
return new WebSocketRequestService(
webSocket,
periodicRetryDelay,
Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory()));
}
WebSocketRequestService(
WebSocket webSocket,
PeriodicDelay periodicRetryDelay,
ScheduledExecutorService executorService) {
this.webSocket = webSocket;
this.executorService = executorService;
this.connectionStatus = new ConnectionStatus(periodicRetryDelay);
}
@Override
public void start(Callback callback, Supplier<Request> requestSupplier) {
if (hasStopped.get()) {
throw new IllegalStateException("This service is already stopped");
}
if (isRunning.compareAndSet(false, true)) {
this.callback = callback;
this.requestSupplier = requestSupplier;
startConnection();
} else {
throw new IllegalStateException("The service has already started");
}
}
private void startConnection() {
webSocket.open(this);
}
@Override
public void sendRequest() {
if (!isRunning.get()) {
throw new IllegalStateException("The service is not running");
}
if (hasStopped.get()) {
throw new IllegalStateException("This service is already stopped");
}
doSendRequest();
}
private void doSendRequest() {
try {
synchronized (hasPendingRequestLock) {
if (!trySendRequest()) {
hasPendingRequest = true;
}
}
} catch (IOException e) {
getCallback().onRequestFailed(e);
}
}
private boolean trySendRequest() throws IOException {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
ProtoAdapter.UINT64.encode(outputStream, 0L);
byte[] payload = getRequest().getAgentToServer().encode();
outputStream.write(payload);
return webSocket.send(outputStream.toByteArray());
}
}
@Nonnull
private Request getRequest() {
return Objects.requireNonNull(requestSupplier).get();
}
@Override
public void stop() {
if (hasStopped.compareAndSet(false, true)) {
/*
Sending last message as explained in the spec:
https://opentelemetry.io/docs/specs/opamp/#websocket-transport-opamp-client-initiated.
The client implementation must ensure that the "agent_disconnect" field will be provided in the
next supplied request body.
*/
doSendRequest();
webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null);
executorService.shutdown();
}
}
@Override
public void onOpen() {
connectionStatus.success();
getCallback().onConnectionSuccess();
synchronized (hasPendingRequestLock) {
if (hasPendingRequest) {
hasPendingRequest = false;
sendRequest();
}
}
}
@Override
public void onMessage(byte[] data) {
try {
ServerToAgent serverToAgent = readServerToAgent(data);
if (serverToAgent.error_response != null) {
handleServerError(serverToAgent.error_response);
getCallback()
.onRequestFailed(
new OpampServerResponseError(serverToAgent.error_response.error_message));
return;
}
getCallback().onRequestSuccess(Response.create(serverToAgent));
} catch (IOException e) {
getCallback().onRequestFailed(e);
}
}
private static ServerToAgent readServerToAgent(byte[] data) throws IOException {
int headerSize = ProtoAdapter.UINT64.encodedSize(ProtoAdapter.UINT64.decode(data));
int payloadSize = data.length - headerSize;
byte[] payload = new byte[payloadSize];
System.arraycopy(data, headerSize, payload, 0, payloadSize);
return ServerToAgent.ADAPTER.decode(payload);
}
private void handleServerError(ServerErrorResponse errorResponse) {
if (serverIsUnavailable(errorResponse)) {
Duration retryAfter = null;
if (errorResponse.retry_info != null) {
retryAfter = Duration.ofNanos(errorResponse.retry_info.retry_after_nanoseconds);
}
webSocket.close(WEBSOCKET_NORMAL_CLOSURE_CODE, null);
connectionStatus.retryAfter(retryAfter);
}
}
private static boolean serverIsUnavailable(ServerErrorResponse errorResponse) {
return errorResponse.type.equals(ServerErrorResponseType.ServerErrorResponseType_Unavailable);
}
@Override
public void onClosing() {
// Noop
}
@Override
public void onClosed() {
// If this service isn't stopped, we should retry connecting.
connectionStatus.retryAfter(null);
}
@Override
public void onFailure(Throwable t) {
getCallback().onConnectionFailed(t);
connectionStatus.retryAfter(null);
}
@Nonnull
private Callback getCallback() {
return Objects.requireNonNull(callback);
}
private class ConnectionStatus {
private final PeriodicDelay periodicRetryDelay;
private final AtomicBoolean retryingConnection = new AtomicBoolean(false);
private final AtomicBoolean nextRetryScheduled = new AtomicBoolean(false);
ConnectionStatus(PeriodicDelay periodicRetryDelay) {
this.periodicRetryDelay = periodicRetryDelay;
}
void success() {
retryingConnection.set(false);
}
@SuppressWarnings("FutureReturnValueIgnored")
void retryAfter(@Nullable Duration retryAfter) {
if (hasStopped.get()) {
return;
}
if (retryingConnection.compareAndSet(false, true)) {
periodicRetryDelay.reset();
if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion) {
((AcceptsDelaySuggestion) periodicRetryDelay).suggestDelay(retryAfter);
}
}
if (nextRetryScheduled.compareAndSet(false, true)) {
executorService.schedule(
this::retryConnection,
periodicRetryDelay.getNextDelay().toNanos(),
TimeUnit.NANOSECONDS);
}
}
private void retryConnection() {
nextRetryScheduled.set(false);
startConnection();
}
}
}

View File

@ -0,0 +1,152 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.connectivity.websocket;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class OkHttpWebSocketTest {
@Mock private OkHttpClient client;
@Mock private okhttp3.WebSocket okHttpWebSocket;
@Mock private WebSocket.Listener listener;
@Captor private ArgumentCaptor<Request> requestCaptor;
@Captor private ArgumentCaptor<WebSocketListener> listenerCaptor;
private static final String URL = "ws://some.server";
private OkHttpWebSocket webSocket;
@BeforeEach
void setUp() {
webSocket = OkHttpWebSocket.create(URL, client);
when(client.newWebSocket(any(), any())).thenReturn(okHttpWebSocket);
}
@Test
void validateOpen() {
// Assert websocket created
openAndCaptureArguments();
assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server");
// Assert further calls to open won't do anything
webSocket.open(listener);
verifyNoMoreInteractions(client);
// When connectivity succeeds, open calls won't do anything.
callOnOpen();
webSocket.open(listener);
verifyNoMoreInteractions(client);
// When connectivity fails, allow future open calls
clearInvocations(client);
callOnFailure();
openAndCaptureArguments();
assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server");
}
@Test
void validateSend() {
byte[] payload = new byte[1];
// Before opening
assertThat(webSocket.send(payload)).isFalse();
// After opening successfully
when(okHttpWebSocket.send(any(ByteString.class))).thenReturn(true);
openAndCaptureArguments();
callOnOpen();
assertThat(webSocket.send(payload)).isTrue();
verify(okHttpWebSocket).send(ByteString.of(payload));
// After failing
callOnFailure();
assertThat(webSocket.send(payload)).isFalse();
verifyNoMoreInteractions(okHttpWebSocket);
}
@Test
void validateClose() {
openAndCaptureArguments();
callOnOpen();
webSocket.close(123, "something");
verify(okHttpWebSocket).close(123, "something");
// Validate calling it again
webSocket.close(1, null);
verifyNoMoreInteractions(okHttpWebSocket);
// Once closed, it should be possible to reopen it.
clearInvocations(client);
callOnClosed();
openAndCaptureArguments();
assertThat(requestCaptor.getValue().url().host()).isEqualTo("some.server");
}
@Test
void validateOnClosing() {
openAndCaptureArguments();
callOnOpen();
callOnClosing();
// Validate calling after onClosing
webSocket.close(1, null);
verifyNoInteractions(okHttpWebSocket);
}
@Test
void validateOnMessage() {
byte[] payload = new byte[1];
openAndCaptureArguments();
listenerCaptor.getValue().onMessage(mock(), ByteString.of(payload));
verify(listener).onMessage(payload);
}
private void callOnOpen() {
listenerCaptor.getValue().onOpen(mock(), mock());
verify(listener).onOpen();
}
private void callOnClosed() {
listenerCaptor.getValue().onClosed(mock(), 0, "");
verify(listener).onClosed();
}
private void callOnClosing() {
listenerCaptor.getValue().onClosing(mock(), 0, "");
verify(listener).onClosing();
}
private void callOnFailure() {
Throwable t = mock();
listenerCaptor.getValue().onFailure(mock(), t, mock());
verify(listener).onFailure(t);
}
private void openAndCaptureArguments() {
webSocket.open(listener);
verify(client).newWebSocket(requestCaptor.capture(), listenerCaptor.capture());
}
}

View File

@ -8,10 +8,8 @@ package io.opentelemetry.opamp.client.internal.request.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@ -21,7 +19,6 @@ import io.opentelemetry.opamp.client.internal.connectivity.http.HttpErrorExcepti
import io.opentelemetry.opamp.client.internal.connectivity.http.HttpSender;
import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser;
import io.opentelemetry.opamp.client.internal.request.Request;
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
import io.opentelemetry.opamp.client.internal.response.Response;
import java.io.ByteArrayInputStream;
@ -33,10 +30,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import opamp.proto.AgentToServer;
@ -44,7 +38,6 @@ import opamp.proto.RetryInfo;
import opamp.proto.ServerErrorResponse;
import opamp.proto.ServerErrorResponseType;
import opamp.proto.ServerToAgent;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -61,8 +54,7 @@ class HttpRequestServiceTest {
private static final Duration RETRY_DELAY = Duration.ofSeconds(5);
@Mock private RequestService.Callback callback;
private final List<ScheduledTask> scheduledTasks = new ArrayList<>();
private ScheduledExecutorService executorService;
private TestScheduler scheduler;
private TestHttpSender requestSender;
private PeriodicDelay periodicRequestDelay;
private PeriodicDelayWithSuggestion periodicRetryDelay;
@ -74,42 +66,40 @@ class HttpRequestServiceTest {
requestSender = new TestHttpSender();
periodicRequestDelay = createPeriodicDelay(REGULAR_DELAY);
periodicRetryDelay = createPeriodicDelayWithSuggestionSupport(RETRY_DELAY);
executorService = createTestScheduleExecutorService();
scheduler = new TestScheduler();
httpRequestService =
new HttpRequestService(
requestSender,
executorService,
scheduler.getMockService(),
periodicRequestDelay,
periodicRetryDelay,
RetryAfterParser.getInstance());
httpRequestService.start(callback, this::createRequestSupplier);
httpRequestService.start(callback, this::createRequest);
}
@AfterEach
void tearDown() {
httpRequestService.stop();
scheduledTasks.clear();
verify(executorService).shutdown();
verify(scheduler.getMockService()).shutdown();
}
@Test
void verifyStart_scheduledFirstTask() {
assertThat(scheduledTasks).hasSize(1);
ScheduledTask firstTask = scheduledTasks.get(0);
assertThat(firstTask.delay).isEqualTo(REGULAR_DELAY);
TestScheduler.Task firstTask = assertAndGetSingleCurrentTask();
assertThat(firstTask.getDelay()).isEqualTo(REGULAR_DELAY);
// Verify initial task creates next one
scheduledTasks.clear();
scheduler.clearTasks();
requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build()));
firstTask.run();
assertThat(scheduledTasks).hasSize(1);
assertThat(scheduler.getScheduledTasks()).hasSize(1);
// Check on-demand requests don't create subsequent tasks
requestSender.enqueueResponse(createSuccessfulResponse(new ServerToAgent.Builder().build()));
httpRequestService.sendRequest();
assertThat(scheduledTasks).hasSize(1);
assertThat(scheduler.getScheduledTasks()).hasSize(1);
}
@Test
@ -128,14 +118,14 @@ class HttpRequestServiceTest {
@Test
void verifyWhenSendingOnDemandRequest_andDelayChanges() {
// Initial state
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY);
assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY);
// Trigger delay strategy change
requestSender.enqueueResponse(createFailedResponse(503));
httpRequestService.sendRequest();
// Expected state
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(RETRY_DELAY);
assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(RETRY_DELAY);
}
@Test
@ -252,30 +242,30 @@ class HttpRequestServiceTest {
private void verifyRetryDelayOnError(
HttpSender.Response errorResponse, Duration expectedRetryDelay) {
requestSender.enqueueResponse(errorResponse);
ScheduledTask previousTask = assertAndGetSingleCurrentTask();
TestScheduler.Task previousTask = assertAndGetSingleCurrentTask();
previousTask.run();
verifySingleRequestSent();
verify(periodicRetryDelay).reset();
verify(callback).onRequestFailed(any());
ScheduledTask retryTask = assertAndGetSingleCurrentTask();
assertThat(retryTask.delay).isEqualTo(expectedRetryDelay);
TestScheduler.Task retryTask = assertAndGetSingleCurrentTask();
assertThat(retryTask.getDelay()).isEqualTo(expectedRetryDelay);
// Retry with another error
clearInvocations(callback);
scheduledTasks.clear();
scheduler.clearTasks();
requestSender.enqueueResponse(createFailedResponse(500));
retryTask.run();
verifySingleRequestSent();
verify(callback).onRequestFailed(any());
ScheduledTask retryTask2 = assertAndGetSingleCurrentTask();
assertThat(retryTask2.delay).isEqualTo(expectedRetryDelay);
TestScheduler.Task retryTask2 = assertAndGetSingleCurrentTask();
assertThat(retryTask2.getDelay()).isEqualTo(expectedRetryDelay);
// Retry with a success
clearInvocations(callback);
scheduledTasks.clear();
scheduler.clearTasks();
ServerToAgent serverToAgent = new ServerToAgent.Builder().build();
requestSender.enqueueResponse(createSuccessfulResponse(serverToAgent));
retryTask2.run();
@ -283,16 +273,17 @@ class HttpRequestServiceTest {
verify(periodicRequestDelay).reset();
verifySingleRequestSent();
verifyRequestSuccessCallback(serverToAgent);
assertThat(assertAndGetSingleCurrentTask().delay).isEqualTo(REGULAR_DELAY);
assertThat(assertAndGetSingleCurrentTask().getDelay()).isEqualTo(REGULAR_DELAY);
}
private Request createRequestSupplier() {
private Request createRequest() {
AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build();
requestSize = agentToServer.encodeByteString().size();
return Request.create(agentToServer);
}
private ScheduledTask assertAndGetSingleCurrentTask() {
private TestScheduler.Task assertAndGetSingleCurrentTask() {
List<TestScheduler.Task> scheduledTasks = scheduler.getScheduledTasks();
assertThat(scheduledTasks).hasSize(1);
return scheduledTasks.get(0);
}
@ -313,33 +304,6 @@ class HttpRequestServiceTest {
assertThat(captor.getValue().getMessage()).isEqualTo("Error message");
}
private ScheduledExecutorService createTestScheduleExecutorService() {
ScheduledExecutorService service = mock();
lenient()
.doAnswer(
invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
})
.when(service)
.execute(any());
when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
.thenAnswer(
invocation -> {
ScheduledTask task =
new ScheduledTask(invocation.getArgument(0), invocation.getArgument(1));
scheduledTasks.add(task);
return task;
});
return service;
}
private static HttpSender.Response createSuccessfulResponse(ServerToAgent serverToAgent) {
return createSuccessfulResponse(serverToAgent.encodeByteString().toByteArray());
}
@ -370,32 +334,6 @@ class HttpRequestServiceTest {
return spy(new PeriodicDelayWithSuggestion(delay));
}
private static class PeriodicDelayWithSuggestion
implements PeriodicDelay, AcceptsDelaySuggestion {
private final Duration initialDelay;
private Duration currentDelay;
private PeriodicDelayWithSuggestion(Duration initialDelay) {
this.initialDelay = initialDelay;
currentDelay = initialDelay;
}
@Override
public void suggestDelay(Duration delay) {
currentDelay = delay;
}
@Override
public Duration getNextDelay() {
return currentDelay;
}
@Override
public void reset() {
currentDelay = initialDelay;
}
}
private static class TestHttpSender implements HttpSender {
private final List<RequestParams> requests = new ArrayList<>();
@ -438,55 +376,4 @@ class HttpRequestServiceTest {
}
}
}
private class ScheduledTask implements ScheduledFuture<Object> {
private final Runnable runnable;
private final Duration delay;
public void run() {
get();
}
private ScheduledTask(Runnable runnable, long timeNanos) {
this.runnable = runnable;
this.delay = Duration.ofNanos(timeNanos);
}
@Override
public long getDelay(@NotNull TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return scheduledTasks.remove(this);
}
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
@Override
public boolean isDone() {
throw new UnsupportedOperationException();
}
@Override
public Object get() {
scheduledTasks.remove(this);
runnable.run();
return null;
}
@Override
public Object get(long timeout, @NotNull TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public int compareTo(@NotNull Delayed o) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.service;
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
import java.time.Duration;
public class PeriodicDelayWithSuggestion implements PeriodicDelay, AcceptsDelaySuggestion {
private final Duration initialDelay;
private Duration currentDelay;
public PeriodicDelayWithSuggestion(Duration initialDelay) {
this.initialDelay = initialDelay;
currentDelay = initialDelay;
}
@Override
public void suggestDelay(Duration delay) {
currentDelay = delay;
}
@Override
public Duration getNextDelay() {
return currentDelay;
}
@Override
public void reset() {
currentDelay = initialDelay;
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;
public final class TestScheduler {
private final List<Task> tasks = new ArrayList<>();
private final ScheduledExecutorService service = createTestScheduleExecutorService();
public ScheduledExecutorService getMockService() {
return service;
}
public List<Task> getScheduledTasks() {
return Collections.unmodifiableList(tasks);
}
public void clearTasks() {
tasks.clear();
}
private ScheduledExecutorService createTestScheduleExecutorService() {
ScheduledExecutorService service = mock();
lenient()
.doAnswer(
invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
})
.when(service)
.execute(any());
lenient()
.when(service.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class)))
.thenAnswer(
invocation -> {
Task task = new Task(invocation.getArgument(0), invocation.getArgument(1));
tasks.add(task);
return task;
});
return service;
}
public class Task implements ScheduledFuture<Object> {
private final Runnable runnable;
private final Duration delay;
public void run() {
get();
}
private Task(Runnable runnable, long timeNanos) {
this.runnable = runnable;
this.delay = Duration.ofNanos(timeNanos);
}
public Duration getDelay() {
return delay;
}
@Override
public long getDelay(@NotNull TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return tasks.remove(this);
}
@Override
public boolean isCancelled() {
throw new UnsupportedOperationException();
}
@Override
public boolean isDone() {
throw new UnsupportedOperationException();
}
@Override
public Object get() {
tasks.remove(this);
runnable.run();
return null;
}
@Override
public Object get(long timeout, @NotNull TimeUnit unit) {
throw new UnsupportedOperationException();
}
@Override
public int compareTo(@NotNull Delayed o) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,336 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.protobuf.CodedOutputStream;
import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket;
import io.opentelemetry.opamp.client.internal.request.Request;
import io.opentelemetry.opamp.client.internal.response.OpampServerResponseError;
import io.opentelemetry.opamp.client.internal.response.Response;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import opamp.proto.AgentToServer;
import opamp.proto.RetryInfo;
import opamp.proto.ServerErrorResponse;
import opamp.proto.ServerErrorResponseType;
import opamp.proto.ServerToAgent;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class WebSocketRequestServiceTest {
@Mock private WebSocket webSocket;
@Mock private RequestService.Callback callback;
@Mock private PeriodicDelayWithSuggestion retryDelay;
private Request request;
private TestScheduler scheduler;
private WebSocketRequestService requestService;
private static final Duration INITIAL_RETRY_DELAY = Duration.ofSeconds(1);
@BeforeEach
void setUp() {
lenient().when(retryDelay.getNextDelay()).thenReturn(INITIAL_RETRY_DELAY);
scheduler = new TestScheduler();
requestService = new WebSocketRequestService(webSocket, retryDelay, scheduler.getMockService());
}
@Test
void verifySuccessfulStart() {
startService();
verify(webSocket).open(requestService);
// When opening successfully, notify callback
requestService.onOpen();
verify(callback).onConnectionSuccess();
verifyNoMoreInteractions(callback);
// It shouldn't allow starting again
try {
startService();
fail();
} catch (IllegalStateException e) {
assertThat(e).hasMessage("The service has already started");
}
}
@Test
void verifyFailedStart() {
startService();
verify(webSocket).open(requestService);
// When failing while opening, notify callback
Throwable t = mock();
requestService.onFailure(t);
verify(retryDelay).reset();
verify(callback).onConnectionFailed(t);
verifyNoMoreInteractions(callback);
// Check connection retry is scheduled
assertThat(scheduler.getScheduledTasks()).hasSize(1);
assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(INITIAL_RETRY_DELAY);
// It shouldn't allow starting again
try {
startService();
fail();
} catch (IllegalStateException e) {
assertThat(e).hasMessage("The service has already started");
}
// It shouldn't schedule more than one retry at a time
clearInvocations(retryDelay, callback);
requestService.onFailure(t);
verify(callback).onConnectionFailed(t);
verifyNoInteractions(retryDelay);
verifyNoMoreInteractions(callback);
assertThat(scheduler.getScheduledTasks()).hasSize(1);
// Execute retry with new delay
clearInvocations(webSocket, callback);
when(retryDelay.getNextDelay()).thenReturn(Duration.ofSeconds(5));
scheduler.getScheduledTasks().get(0).run();
assertThat(scheduler.getScheduledTasks()).isEmpty();
verify(webSocket).open(requestService);
// Fail again
requestService.onFailure(t);
verify(retryDelay, never()).reset();
verify(callback).onConnectionFailed(t);
// A new retry has been scheduled
assertThat(scheduler.getScheduledTasks()).hasSize(1);
assertThat(scheduler.getScheduledTasks().get(0).getDelay()).isEqualTo(Duration.ofSeconds(5));
// Execute retry again
clearInvocations(webSocket, callback);
scheduler.getScheduledTasks().get(0).run();
assertThat(scheduler.getScheduledTasks()).isEmpty();
verify(webSocket).open(requestService);
// Succeed
requestService.onOpen();
verify(callback).onConnectionSuccess();
verifyNoMoreInteractions(callback);
// Fail at some point
clearInvocations(callback);
requestService.onFailure(t);
verify(callback).onConnectionFailed(t);
verifyNoMoreInteractions(callback);
verify(retryDelay).reset();
assertThat(scheduler.getScheduledTasks()).hasSize(1);
}
@Test
void verifySendRequest() {
// Validate when not running
try {
requestService.sendRequest();
fail();
} catch (IllegalStateException e) {
assertThat(e).hasMessage("The service is not running");
}
startService();
// Successful send
when(webSocket.send(any())).thenReturn(true);
requestService.sendRequest();
verify(webSocket).send(getExpectedOutgoingBytes());
// Check there are no pending requests
clearInvocations(webSocket);
requestService.onOpen();
verifyNoInteractions(webSocket);
// Failed send
when(webSocket.send(any())).thenReturn(false);
requestService.sendRequest();
clearInvocations(webSocket);
// Check pending request
when(webSocket.send(any())).thenReturn(true);
requestService.onOpen();
verify(webSocket).send(getExpectedOutgoingBytes());
}
@Test
void verifyOnMessage() {
startService();
// Successful message
ServerToAgent serverToAgent = new ServerToAgent.Builder().build();
requestService.onMessage(createServerToAgentPayload(serverToAgent));
verify(callback).onRequestSuccess(Response.create(serverToAgent));
verifyNoMoreInteractions(callback);
assertThat(scheduler.getScheduledTasks()).isEmpty();
// Regular error message
ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
clearInvocations(callback);
serverToAgent =
new ServerToAgent.Builder()
.error_response(new ServerErrorResponse.Builder().error_message("A message").build())
.build();
requestService.onMessage(createServerToAgentPayload(serverToAgent));
verify(callback).onRequestFailed(throwableCaptor.capture());
verifyNoMoreInteractions(callback);
OpampServerResponseError error = (OpampServerResponseError) throwableCaptor.getValue();
assertThat(error.getMessage()).isEqualTo("A message");
assertThat(scheduler.getScheduledTasks()).isEmpty();
// Error message with unavailable status
clearInvocations(callback);
serverToAgent =
new ServerToAgent.Builder()
.error_response(
new ServerErrorResponse.Builder()
.type(ServerErrorResponseType.ServerErrorResponseType_Unavailable)
.error_message("Try later")
.build())
.build();
requestService.onMessage(createServerToAgentPayload(serverToAgent));
verify(callback).onRequestFailed(throwableCaptor.capture());
verifyNoMoreInteractions(callback);
OpampServerResponseError unavailableError =
(OpampServerResponseError) throwableCaptor.getValue();
assertThat(unavailableError.getMessage()).isEqualTo("Try later");
assertThat(scheduler.getScheduledTasks()).hasSize(1);
verify(retryDelay, never()).suggestDelay(any());
// Reset scheduled retry
scheduler.getScheduledTasks().get(0).run();
requestService.onOpen();
// Error message with unavailable status and suggested delay
Duration suggestedDelay = Duration.ofSeconds(10);
clearInvocations(callback, retryDelay);
serverToAgent =
new ServerToAgent.Builder()
.error_response(
new ServerErrorResponse.Builder()
.type(ServerErrorResponseType.ServerErrorResponseType_Unavailable)
.retry_info(
new RetryInfo.Builder()
.retry_after_nanoseconds(suggestedDelay.toNanos())
.build())
.build())
.build();
requestService.onMessage(createServerToAgentPayload(serverToAgent));
verify(callback).onRequestFailed(throwableCaptor.capture());
verifyNoMoreInteractions(callback);
OpampServerResponseError unavailableErrorWithSuggestedDelay =
(OpampServerResponseError) throwableCaptor.getValue();
assertThat(unavailableErrorWithSuggestedDelay.getMessage()).isEmpty();
assertThat(scheduler.getScheduledTasks()).hasSize(1);
verify(retryDelay).suggestDelay(suggestedDelay);
}
@Test
void verifyStop() {
startService();
requestService.stop();
InOrder inOrder = inOrder(webSocket);
inOrder.verify(webSocket).send(getExpectedOutgoingBytes());
inOrder.verify(webSocket).close(1000, null);
verify(scheduler.getMockService()).shutdown();
// If something fails afterward, no retry should get scheduled.
requestService.onFailure(mock());
verifyNoInteractions(retryDelay);
assertThat(scheduler.getScheduledTasks()).isEmpty();
// If onClosed is called afterward, no retry should get scheduled.
requestService.onClosed();
verifyNoInteractions(retryDelay);
assertThat(scheduler.getScheduledTasks()).isEmpty();
// If a new message with a server unavailable error arrives afterward, no retry should get
// scheduled.
ServerToAgent serverToAgent =
new ServerToAgent.Builder()
.error_response(
new ServerErrorResponse.Builder()
.type(ServerErrorResponseType.ServerErrorResponseType_Unavailable)
.build())
.build();
requestService.onMessage(createServerToAgentPayload(serverToAgent));
verifyNoInteractions(retryDelay);
assertThat(scheduler.getScheduledTasks()).isEmpty();
// Requests cannot get enqueued afterward.
try {
requestService.sendRequest();
fail();
} catch (IllegalStateException e) {
assertThat(e).hasMessage("This service is already stopped");
}
// The service cannot get restarted afterward.
try {
startService();
fail();
} catch (IllegalStateException e) {
assertThat(e).hasMessage("This service is already stopped");
}
}
private byte[] getExpectedOutgoingBytes() {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream);
codedOutput.writeUInt64NoTag(0);
byte[] payload = request.getAgentToServer().encode();
codedOutput.writeRawBytes(payload);
codedOutput.flush();
return outputStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static byte[] createServerToAgentPayload(ServerToAgent serverToAgent) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
CodedOutputStream codedOutput = CodedOutputStream.newInstance(outputStream);
codedOutput.writeUInt64NoTag(0);
codedOutput.writeRawBytes(serverToAgent.encode());
codedOutput.flush();
return outputStream.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void startService() {
requestService.start(callback, this::createRequest);
}
private Request createRequest() {
AgentToServer agentToServer = new AgentToServer.Builder().sequence_num(10).build();
request = Request.create(agentToServer);
return request;
}
}

View File

@ -1,5 +1,5 @@
val stableVersion = "1.47.0-SNAPSHOT"
val alphaVersion = "1.47.0-alpha-SNAPSHOT"
val stableVersion = "1.48.0-SNAPSHOT"
val alphaVersion = "1.48.0-alpha-SNAPSHOT"
allprojects {
if (findProperty("otel.stable") != "true") {