diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index ad5bed5270..ae5a0c4301 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -15,7 +15,7 @@ import datadog.trace.common.sampling.PrioritySampler; import datadog.trace.common.sampling.Sampler; import datadog.trace.common.writer.DDAgentWriter; import datadog.trace.common.writer.Writer; -import datadog.trace.common.writer.ddagent.DDAgentApi; +import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import datadog.trace.context.ScopeListener; import io.opentracing.References; import io.opentracing.Scope; @@ -245,11 +245,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace injector = HttpCodec.createInjector(Config.get()); extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders); - if (this.writer instanceof DDAgentWriter) { - final DDAgentApi api = ((DDAgentWriter) this.writer).getApi(); - if (sampler instanceof DDAgentApi.ResponseListener) { - api.addResponseListener((DDAgentApi.ResponseListener) this.sampler); - } + if (this.writer instanceof DDAgentWriter && sampler instanceof DDAgentResponseListener) { + ((DDAgentWriter) this.writer).addResponseListener((DDAgentResponseListener) this.sampler); } log.info("New instance: {}", this); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java index d96c2f3592..b8191b1a8f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java @@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NumericNode; import datadog.opentracing.DDSpan; import datadog.trace.api.sampling.PrioritySampling; -import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener; +import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; *

The configuration of (serviceName,env)->rate is configured by the core agent. */ @Slf4j -public class RateByServiceSampler implements Sampler, PrioritySampler, ResponseListener { +public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentResponseListener { public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr"; /** Key for setting the default/baseline rate */ diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 76ea6b8df9..c63fe9ff8f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -8,6 +8,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; import datadog.trace.common.writer.ddagent.DDAgentApi; +import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import datadog.trace.common.writer.ddagent.Monitor; import datadog.trace.common.writer.ddagent.TraceConsumer; import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor; @@ -110,6 +111,10 @@ public class DDAgentWriter implements Writer { apiPhaser.register(); // Register on behalf of the scheduled executor thread. } + public void addResponseListener(final DDAgentResponseListener listener) { + api.addResponseListener(listener); + } + // Exposing some statistics for consumption by monitors public final long getDisruptorCapacity() { return disruptor.getDisruptorCapacity(); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index cc01adc62c..17a607c102 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -43,7 +43,7 @@ public class DDAgentApi { private static final String TRACES_ENDPOINT_V4 = "v0.4/traces"; private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5); - private final List responseListeners = new ArrayList<>(); + private final List responseListeners = new ArrayList<>(); private volatile long nextAllowedLogTime = 0; @@ -76,7 +76,7 @@ public class DDAgentApi { } } - public void addResponseListener(final ResponseListener listener) { + public void addResponseListener(final DDAgentResponseListener listener) { if (!responseListeners.contains(listener)) { responseListeners.add(listener); } @@ -186,7 +186,7 @@ public class DDAgentApi { final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString); final String endpoint = tracesUrl.toString(); - for (final ResponseListener listener : responseListeners) { + for (final DDAgentResponseListener listener : responseListeners) { listener.onResponse(endpoint, parsedResponse); } return Response.success(response.code(), parsedResponse); @@ -364,9 +364,4 @@ public class DDAgentApi { return exception; } } - - public interface ResponseListener { - /** Invoked after the api receives a response from the core agent. */ - void onResponse(String endpoint, JsonNode responseJson); - } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java new file mode 100644 index 0000000000..a0cbc72fda --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java @@ -0,0 +1,8 @@ +package datadog.trace.common.writer.ddagent; + +import com.fasterxml.jackson.databind.JsonNode; + +public interface DDAgentResponseListener { + /** Invoked after the api receives a response from the core agent. */ + void onResponse(String endpoint, JsonNode responseJson); +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy index 07bd326685..8d5ed0dcdd 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy @@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.JsonNode import datadog.opentracing.SpanFactory import datadog.trace.common.writer.ddagent.DDAgentApi -import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener +import datadog.trace.common.writer.ddagent.DDAgentResponseListener import datadog.trace.util.test.DDSpecification import java.util.concurrent.atomic.AtomicLong @@ -125,7 +125,7 @@ class DDAgentApiTest extends DDSpecification { def "Api ResponseListeners see 200 responses"() { setup: def agentResponse = new AtomicReference(null) - ResponseListener responseListener = { String endpoint, JsonNode responseJson -> + DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson -> agentResponse.set(responseJson.toString()) } def agent = httpServer { diff --git a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy index 1d5ee8297f..00db0a213d 100644 --- a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy +++ b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy @@ -6,6 +6,7 @@ import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.ddagent.DDAgentApi +import datadog.trace.common.writer.ddagent.DDAgentResponseListener import datadog.trace.util.test.DDSpecification import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy @@ -64,7 +65,7 @@ class DDApiIntegrationTest { def endpoint = new AtomicReference(null) def agentResponse = new AtomicReference(null) - DDAgentApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> + DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> endpoint.set(receivedEndpoint) agentResponse.set(responseJson.toString()) }