Opamp first iteration completion (#2067)

This commit is contained in:
César 2025-08-19 23:48:25 +02:00 committed by GitHub
parent ad68129d98
commit 2765b97e89
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 269 additions and 115 deletions

View File

@ -16,6 +16,7 @@ dependencies {
implementation("net.ltgt.gradle:gradle-errorprone-plugin:4.3.0")
implementation("net.ltgt.gradle:gradle-nullaway-plugin:2.2.0")
implementation("org.owasp:dependency-check-gradle:12.1.3")
implementation("ru.vyarus.animalsniffer:ru.vyarus.animalsniffer.gradle.plugin:2.0.1")
}
spotless {

View File

@ -0,0 +1,19 @@
import ru.vyarus.gradle.plugin.animalsniffer.AnimalSniffer
plugins {
id("otel.java-conventions")
id("ru.vyarus.animalsniffer")
}
dependencies {
signature("com.toasttab.android:gummy-bears-api-21:0.12.0:coreLib@signature")
}
animalsniffer {
sourceSets = listOf(java.sourceSets.main.get())
}
// Always having declared output makes this task properly participate in tasks up-to-date checks
tasks.withType<AnimalSniffer> {
reports.text.required.set(true)
}

View File

@ -1,12 +1,11 @@
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
import ru.vyarus.gradle.plugin.animalsniffer.AnimalSniffer
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
id("otel.animalsniffer-conventions")
id("com.gradleup.shadow")
id("me.champeau.jmh") version "0.7.3"
id("ru.vyarus.animalsniffer") version "2.0.1"
id("com.squareup.wire") version "5.3.11"
}
@ -20,27 +19,12 @@ dependencies {
implementation("io.opentelemetry:opentelemetry-api-incubator")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
signature("com.toasttab.android:gummy-bears-api-21:0.12.0:coreLib@signature")
testImplementation("org.mockito:mockito-inline")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
protos("io.opentelemetry.proto:opentelemetry-proto:1.7.0-alpha@jar")
}
animalsniffer {
sourceSets = listOf(java.sourceSets.main.get())
}
// Always having declared output makes this task properly participate in tasks up-to-date checks
tasks.withType<AnimalSniffer> {
reports.text.required.set(true)
}
// Attaching animalsniffer check to the compilation process.
tasks.named("classes").configure {
finalizedBy("animalsnifferMain")
}
jmh {
warmupIterations.set(0)
fork.set(2)

View File

@ -3,6 +3,56 @@
Java implementation of the OpAMP
client [spec](https://github.com/open-telemetry/opamp-spec/blob/main/specification.md).
> [!WARNING]
> This is an incubating feature. Breaking changes can happen on a new release without previous
> notice and without backward compatibility guarantees.
## Usage
```java
// Initializing it
RequestService requestService = HttpRequestService.create(OkHttpSender.create("[OPAMP_SERVICE_URL]"));
// RequestService requestService = WebSocketRequestService.create(OkHttpWebSocket.create("[OPAMP_SERVICE_URL]")); // Use this instead to connect to the server via WebSocket.
OpampClient client =
OpampClient.builder()
.putIdentifyingAttribute("service.name", "My service name")
.enableRemoteConfig()
.setRequestService(requestService)
.build(
new OpampClient.Callbacks() {
@Override
public void onConnect() {}
@Override
public void onConnectFailed(@Nullable Throwable throwable) {}
@Override
public void onErrorResponse(ServerErrorResponse errorResponse) {}
@Override
public void onMessage(MessageData messageData) {
AgentRemoteConfig remoteConfig = messageData.getRemoteConfig();
if (remoteConfig != null) {
// A remote config was received
// After applying it...
client.setRemoteConfigStatus(
new RemoteConfigStatus.Builder()
.status(RemoteConfigStatuses.RemoteConfigStatuses_APPLIED)
.build());
}
}
});
// State update
client.setAgentDescription(new AgentDescription.Builder().build());
// App shutdown
client.close();
```
## Component owners
- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic

View File

@ -4,6 +4,8 @@ import java.net.URL
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
id("otel.animalsniffer-conventions")
id("de.undercouch.download") version "5.6.0"
id("com.squareup.wire") version "5.3.11"
}
@ -14,6 +16,7 @@ otelJava.moduleName.set("io.opentelemetry.contrib.opamp.client")
dependencies {
implementation("com.squareup.okhttp3:okhttp")
implementation("com.github.f4b6a3:uuid-creator")
implementation("io.opentelemetry:opentelemetry-api")
annotationProcessor("com.google.auto.value:auto-value")
compileOnly("com.google.auto.value:auto-value-annotations")
testImplementation("org.mockito:mockito-inline")

View File

@ -6,45 +6,23 @@
package io.opentelemetry.opamp.client.internal;
import io.opentelemetry.opamp.client.internal.response.MessageData;
import java.io.Closeable;
import javax.annotation.Nullable;
import opamp.proto.AgentDescription;
import opamp.proto.RemoteConfigStatus;
import opamp.proto.ServerErrorResponse;
public interface OpampClient {
public interface OpampClient extends Closeable {
static OpampClientBuilder builder() {
return new OpampClientBuilder();
}
/**
* Starts the client and begin attempts to connect to the Server. Once connection is established
* the client will attempt to maintain it by reconnecting if the connection is lost. All failed
* connection attempts will be reported via {@link Callbacks#onConnectFailed(Throwable)} callback.
*
* <p>This method does not wait until the connection to the Server is established and will likely
* return before the connection attempts are even made.
*
* <p>This method may be called only once.
*
* @param callbacks The Callback to which the Client will notify about any Server requests and
* responses.
*/
void start(Callbacks callbacks);
/**
* Stops the client. May be called only after {@link #start(Callbacks)}. May be called only once.
* After this call returns successfully it is guaranteed that no callbacks will be called. Once
* stopped, the client cannot be started again.
*/
void stop();
/**
* Sets attributes of the Agent. The attributes will be included in the next status report sent to
* the Server. When called after {@link #start(Callbacks)}, the attributes will be included in the
* next outgoing status report. This is typically used by Agents which allow their
* AgentDescription to change dynamically while the OpAMPClient is started. May be also called
* from {@link Callbacks#onMessage(MessageData)}.
* Sets attributes of the Agent. The attributes will be included in the next outgoing status
* report. This is typically used by Agents which allow their AgentDescription to change
* dynamically while the OpAMPClient is started. May be also called from {@link
* Callbacks#onMessage(MessageData)}.
*
* @param agentDescription The new agent description.
*/
@ -59,16 +37,14 @@ public interface OpampClient {
interface Callbacks {
/**
* Called when the connection is successfully established to the Server. May be called after
* {@link #start(Callbacks)} is called and every time a connection is established to the Server.
* For WebSocket clients this is called after the handshake is completed without any error. For
* HTTP clients this is called for any request if the response status is OK.
* Called when the connection is successfully established to the Server. For WebSocket clients
* this is called after the handshake is completed without any error. For HTTP clients this is
* called for any request if the response status is OK.
*/
void onConnect();
/**
* Called when the connection to the Server cannot be established. May be called after {@link
* #start(Callbacks)} is called and tries to connect to the Server. May also be called if the
* Called when the connection to the Server cannot be established. May also be called if the
* connection is lost and reconnection attempt fails.
*
* @param throwable The exception.

View File

@ -381,11 +381,7 @@ public final class OpampClientBuilder {
return this;
}
public OpampClient build() {
if (service == null) {
throw new IllegalStateException(
"The request service is not set. You must provide it by calling setRequestService()");
}
public OpampClient build(OpampClient.Callbacks callbacks) {
List<KeyValue> protoIdentifyingAttributes = new ArrayList<>();
List<KeyValue> protoNonIdentifyingAttributes = new ArrayList<>();
identifyingAttributes.forEach(
@ -411,7 +407,7 @@ public final class OpampClientBuilder {
new State.InstanceUid(instanceUid),
new State.Flags(0L),
effectiveConfigState);
return OpampClientImpl.create(service, state);
return OpampClientImpl.create(service, state, callbacks);
}
private static State.EffectiveConfig createEffectiveConfigNoop() {

View File

@ -5,6 +5,7 @@
package io.opentelemetry.opamp.client.internal.connectivity.http;
import io.opentelemetry.api.internal.InstrumentationUtil;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
@ -13,6 +14,7 @@ import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okio.BufferedSink;
@ -45,8 +47,16 @@ public final class OkHttpSender implements HttpSender {
RequestBody body = new RawRequestBody(writer, contentLength, MEDIA_TYPE);
builder.post(body);
// By suppressing instrumentations, we prevent automatic instrumentations for the okhttp request
// that polls the opamp server.
InstrumentationUtil.suppressInstrumentation(() -> doSendRequest(builder.build(), future));
return future;
}
private void doSendRequest(Request request, CompletableFuture<Response> future) {
client
.newCall(builder.build())
.newCall(request)
.enqueue(
new Callback() {
@Override
@ -59,8 +69,6 @@ public final class OkHttpSender implements HttpSender {
future.completeExceptionally(e);
}
});
return future;
}
private static class OkHttpResponse implements Response {

View File

@ -28,11 +28,9 @@ import io.opentelemetry.opamp.client.internal.state.State;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import okio.ByteString;
import opamp.proto.AgentDescription;
import opamp.proto.AgentToServer;
@ -51,9 +49,8 @@ public final class OpampClientImpl
private final AgentToServerAppenders appenders;
private final OpampClientState state;
private final RecipeManager recipeManager;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final AtomicBoolean hasStopped = new AtomicBoolean(false);
@Nullable private Callbacks callbacks;
private final Callbacks callbacks;
/** Fields that must always be sent. */
private static final List<Field> REQUIRED_FIELDS;
@ -84,7 +81,8 @@ public final class OpampClientImpl
COMPRESSABLE_FIELDS = Collections.unmodifiableList(compressableFields);
}
public static OpampClientImpl create(RequestService requestService, OpampClientState state) {
public static OpampClientImpl create(
RequestService requestService, OpampClientState state, Callbacks callbacks) {
AgentToServerAppenders appenders =
new AgentToServerAppenders(
AgentDescriptionAppender.create(state.agentDescription),
@ -95,41 +93,35 @@ public final class OpampClientImpl
InstanceUidAppender.create(state.instanceUid),
FlagsAppender.create(state.flags),
AgentDisconnectAppender.create());
return new OpampClientImpl(
requestService, appenders, state, RecipeManager.create(REQUIRED_FIELDS));
OpampClientImpl client =
new OpampClientImpl(
requestService, appenders, state, RecipeManager.create(REQUIRED_FIELDS), callbacks);
// Start
requestService.start(client, client);
client.disableCompression();
client.startObservingStateChange();
requestService.sendRequest();
return client;
}
private OpampClientImpl(
RequestService requestService,
AgentToServerAppenders appenders,
OpampClientState state,
RecipeManager recipeManager) {
RecipeManager recipeManager,
Callbacks callbacks) {
this.requestService = requestService;
this.appenders = appenders;
this.state = state;
this.recipeManager = recipeManager;
}
@Override
public void start(@Nonnull Callbacks callbacks) {
if (hasStopped.get()) {
throw new IllegalStateException("The client cannot start after it has been stopped.");
}
if (isRunning.compareAndSet(false, true)) {
this.callbacks = callbacks;
requestService.start(this, this);
disableCompression();
startObservingStateChange();
requestService.sendRequest();
} else {
throw new IllegalStateException("The client has already been started");
}
}
@Override
public void stop() {
if (isRunning.compareAndSet(true, false)) {
hasStopped.set(true);
public void close() {
if (hasStopped.compareAndSet(false, true)) {
stopObservingStateChange();
prepareDisconnectRequest();
requestService.stop();
@ -154,12 +146,12 @@ public final class OpampClientImpl
@Override
public void onConnectionSuccess() {
getCallbacks().onConnect();
callbacks.onConnect();
}
@Override
public void onConnectionFailed(Throwable throwable) {
getCallbacks().onConnectFailed(throwable);
callbacks.onConnectFailed(throwable);
}
@Override
@ -176,7 +168,7 @@ public final class OpampClientImpl
preserveFailedRequestRecipe();
if (throwable instanceof OpampServerResponseException) {
ServerErrorResponse errorResponse = ((OpampServerResponseException) throwable).errorResponse;
getCallbacks().onErrorResponse(errorResponse);
callbacks.onErrorResponse(errorResponse);
}
}
@ -203,7 +195,7 @@ public final class OpampClientImpl
}
if (notifyOnMessage) {
getCallbacks().onMessage(messageBuilder.build());
callbacks.onMessage(messageBuilder.build());
}
}
@ -224,11 +216,6 @@ public final class OpampClientImpl
recipeManager.next().addField(Field.AGENT_DISCONNECT);
}
@Nonnull
private Callbacks getCallbacks() {
return Objects.requireNonNull(callbacks);
}
@Override
public Request get() {
AgentToServer.Builder builder = new AgentToServer.Builder();

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.delay;
import java.time.Duration;
import javax.annotation.concurrent.GuardedBy;
public final class ExponentialBackoffPeriodicDelay implements PeriodicDelay {
private final Duration initialDelay;
private final Object delayNanosLock = new Object();
@GuardedBy("delayNanosLock")
private long delayNanos;
public ExponentialBackoffPeriodicDelay(Duration initialDelay) {
this.initialDelay = initialDelay;
delayNanos = initialDelay.toNanos();
}
@Override
public Duration getNextDelay() {
synchronized (delayNanosLock) {
long previousValue = delayNanos;
delayNanos = delayNanos * 2;
return Duration.ofNanos(previousValue);
}
}
@Override
public void reset() {
synchronized (delayNanosLock) {
delayNanos = initialDelay.toNanos();
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.delay;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
/** Defaults to an exponential backoff strategy, unless a delay is suggested. */
public final class RetryPeriodicDelay implements PeriodicDelay, AcceptsDelaySuggestion {
private final ExponentialBackoffPeriodicDelay exponentialBackoff;
private final AtomicReference<PeriodicDelay> currentDelay;
public static RetryPeriodicDelay create(Duration initialDelay) {
return new RetryPeriodicDelay(new ExponentialBackoffPeriodicDelay(initialDelay));
}
private RetryPeriodicDelay(ExponentialBackoffPeriodicDelay exponentialBackoff) {
this.exponentialBackoff = exponentialBackoff;
currentDelay = new AtomicReference<>(exponentialBackoff);
}
@Override
public void suggestDelay(Duration delay) {
currentDelay.set(PeriodicDelay.ofFixedDuration(delay));
}
@Override
public Duration getNextDelay() {
return Objects.requireNonNull(currentDelay.get()).getNextDelay();
}
@Override
public void reset() {
exponentialBackoff.reset();
currentDelay.set(exponentialBackoff);
}
}

View File

@ -11,6 +11,7 @@ import io.opentelemetry.opamp.client.internal.connectivity.http.RetryAfterParser
import io.opentelemetry.opamp.client.internal.request.Request;
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
import io.opentelemetry.opamp.client.internal.request.delay.RetryPeriodicDelay;
import io.opentelemetry.opamp.client.internal.response.OpampServerResponseException;
import io.opentelemetry.opamp.client.internal.response.Response;
import java.io.IOException;
@ -47,6 +48,8 @@ public final class HttpRequestService implements RequestService {
@Nullable private Supplier<Request> requestSupplier;
public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_REQUESTS =
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
public static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
RetryPeriodicDelay.create(Duration.ofSeconds(30));
/**
* Creates an {@link HttpRequestService}.
@ -54,7 +57,7 @@ public final class HttpRequestService implements RequestService {
* @param requestSender The HTTP sender implementation.
*/
public static HttpRequestService create(HttpSender requestSender) {
return create(requestSender, DEFAULT_DELAY_BETWEEN_REQUESTS, DEFAULT_DELAY_BETWEEN_REQUESTS);
return create(requestSender, DEFAULT_DELAY_BETWEEN_REQUESTS, DEFAULT_DELAY_BETWEEN_RETRIES);
}
/**

View File

@ -10,6 +10,7 @@ import io.opentelemetry.opamp.client.internal.connectivity.websocket.WebSocket;
import io.opentelemetry.opamp.client.internal.request.Request;
import io.opentelemetry.opamp.client.internal.request.delay.AcceptsDelaySuggestion;
import io.opentelemetry.opamp.client.internal.request.delay.PeriodicDelay;
import io.opentelemetry.opamp.client.internal.request.delay.RetryPeriodicDelay;
import io.opentelemetry.opamp.client.internal.response.OpampServerResponseException;
import io.opentelemetry.opamp.client.internal.response.Response;
import java.io.ByteArrayOutputStream;
@ -30,7 +31,7 @@ import opamp.proto.ServerToAgent;
public final class WebSocketRequestService implements RequestService, WebSocket.Listener {
private static final PeriodicDelay DEFAULT_DELAY_BETWEEN_RETRIES =
PeriodicDelay.ofFixedDuration(Duration.ofSeconds(30));
RetryPeriodicDelay.create(Duration.ofSeconds(30));
private final WebSocket webSocket;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

View File

@ -6,7 +6,6 @@
package io.opentelemetry.opamp.client.internal.impl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
@ -90,7 +89,7 @@ class OpampClientImplTest {
@AfterEach
void tearDown() {
client.stop();
client.close();
}
@Test
@ -178,23 +177,12 @@ class OpampClientImplTest {
initializeClient();
enqueueServerToAgentResponse(new ServerToAgent.Builder().build());
client.stop();
client.close();
AgentToServer agentToServerMessage = getAgentToServerMessage(takeRequest());
assertThat(agentToServerMessage.agent_disconnect).isNotNull();
}
@Test
void verifyStartOnlyOnce() {
initializeClient();
try {
client.start(callbacks);
fail("Should have thrown an exception");
} catch (IllegalStateException e) {
assertThat(e).hasMessage("The client has already been started");
}
}
@Test
void onSuccess_withChangesToReport_notifyCallbackOnMessage() {
initializeClient();
@ -403,13 +391,12 @@ class OpampClientImplTest {
}
private RecordedRequest initializeClient(ServerToAgent initialResponse) {
client = OpampClientImpl.create(requestService, state);
// Prepare first request on start
enqueueServerToAgentResponse(initialResponse);
callbacks = spy(new TestCallbacks());
client.start(callbacks);
client = OpampClientImpl.create(requestService, state, callbacks);
return takeRequest();
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.delay;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.junit.jupiter.api.Test;
class ExponentialBackoffPeriodicDelayTest {
@Test
void verifyDelayUpdates() {
ExponentialBackoffPeriodicDelay delay =
new ExponentialBackoffPeriodicDelay(Duration.ofSeconds(1));
assertThat(delay.getNextDelay()).isEqualTo(Duration.ofSeconds(1));
assertThat(delay.getNextDelay()).isEqualTo(Duration.ofSeconds(2));
assertThat(delay.getNextDelay()).isEqualTo(Duration.ofSeconds(4));
assertThat(delay.getNextDelay()).isEqualTo(Duration.ofSeconds(8));
assertThat(delay.getNextDelay()).isEqualTo(Duration.ofSeconds(16));
// Reset
delay.reset();
assertThat(delay.getNextDelay()).isEqualTo(Duration.ofSeconds(1));
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.opamp.client.internal.request.delay;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import org.junit.jupiter.api.Test;
class RetryPeriodicDelayTest {
@Test
public void verifyDelayBehavior() {
RetryPeriodicDelay retryPeriodicDelay = RetryPeriodicDelay.create(Duration.ofSeconds(1));
// Without suggested delay
assertThat(retryPeriodicDelay.getNextDelay()).isEqualTo(Duration.ofSeconds(1));
assertThat(retryPeriodicDelay.getNextDelay()).isEqualTo(Duration.ofSeconds(2));
assertThat(retryPeriodicDelay.getNextDelay()).isEqualTo(Duration.ofSeconds(4));
retryPeriodicDelay.reset();
assertThat(retryPeriodicDelay.getNextDelay()).isEqualTo(Duration.ofSeconds(1));
// With suggested delay
retryPeriodicDelay.suggestDelay(Duration.ofSeconds(5));
assertThat(retryPeriodicDelay.getNextDelay()).isEqualTo(Duration.ofSeconds(5));
retryPeriodicDelay.reset();
assertThat(retryPeriodicDelay.getNextDelay()).isEqualTo(Duration.ofSeconds(1));
}
}