Extract DDAgentResponseListener from DDApi.
Reduce references to DDApi
This commit is contained in:
parent
24e2fe6da7
commit
0a89f2a57c
|
@ -15,7 +15,7 @@ import datadog.trace.common.sampling.PrioritySampler;
|
|||
import datadog.trace.common.sampling.Sampler;
|
||||
import datadog.trace.common.writer.DDAgentWriter;
|
||||
import datadog.trace.common.writer.Writer;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
|
||||
import datadog.trace.context.ScopeListener;
|
||||
import io.opentracing.References;
|
||||
import io.opentracing.Scope;
|
||||
|
@ -245,11 +245,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
|
|||
injector = HttpCodec.createInjector(Config.get());
|
||||
extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders);
|
||||
|
||||
if (this.writer instanceof DDAgentWriter) {
|
||||
final DDAgentApi api = ((DDAgentWriter) this.writer).getApi();
|
||||
if (sampler instanceof DDAgentApi.ResponseListener) {
|
||||
api.addResponseListener((DDAgentApi.ResponseListener) this.sampler);
|
||||
}
|
||||
if (this.writer instanceof DDAgentWriter && sampler instanceof DDAgentResponseListener) {
|
||||
((DDAgentWriter) this.writer).addResponseListener((DDAgentResponseListener) this.sampler);
|
||||
}
|
||||
|
||||
log.info("New instance: {}", this);
|
||||
|
|
|
@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||
import com.fasterxml.jackson.databind.node.NumericNode;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.api.sampling.PrioritySampling;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j;
|
|||
* <p>The configuration of (serviceName,env)->rate is configured by the core agent.
|
||||
*/
|
||||
@Slf4j
|
||||
public class RateByServiceSampler implements Sampler, PrioritySampler, ResponseListener {
|
||||
public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentResponseListener {
|
||||
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
|
||||
|
||||
/** Key for setting the default/baseline rate */
|
||||
|
|
|
@ -8,6 +8,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
|||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.common.util.DaemonThreadFactory;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
|
||||
import datadog.trace.common.writer.ddagent.Monitor;
|
||||
import datadog.trace.common.writer.ddagent.TraceConsumer;
|
||||
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
|
||||
|
@ -110,6 +111,10 @@ public class DDAgentWriter implements Writer {
|
|||
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
|
||||
}
|
||||
|
||||
public void addResponseListener(final DDAgentResponseListener listener) {
|
||||
api.addResponseListener(listener);
|
||||
}
|
||||
|
||||
// Exposing some statistics for consumption by monitors
|
||||
public final long getDisruptorCapacity() {
|
||||
return disruptor.getDisruptorCapacity();
|
||||
|
|
|
@ -43,7 +43,7 @@ public class DDAgentApi {
|
|||
private static final String TRACES_ENDPOINT_V4 = "v0.4/traces";
|
||||
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
|
||||
|
||||
private final List<ResponseListener> responseListeners = new ArrayList<>();
|
||||
private final List<DDAgentResponseListener> responseListeners = new ArrayList<>();
|
||||
|
||||
private volatile long nextAllowedLogTime = 0;
|
||||
|
||||
|
@ -76,7 +76,7 @@ public class DDAgentApi {
|
|||
}
|
||||
}
|
||||
|
||||
public void addResponseListener(final ResponseListener listener) {
|
||||
public void addResponseListener(final DDAgentResponseListener listener) {
|
||||
if (!responseListeners.contains(listener)) {
|
||||
responseListeners.add(listener);
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ public class DDAgentApi {
|
|||
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
|
||||
final String endpoint = tracesUrl.toString();
|
||||
|
||||
for (final ResponseListener listener : responseListeners) {
|
||||
for (final DDAgentResponseListener listener : responseListeners) {
|
||||
listener.onResponse(endpoint, parsedResponse);
|
||||
}
|
||||
return Response.success(response.code(), parsedResponse);
|
||||
|
@ -364,9 +364,4 @@ public class DDAgentApi {
|
|||
return exception;
|
||||
}
|
||||
}
|
||||
|
||||
public interface ResponseListener {
|
||||
/** Invoked after the api receives a response from the core agent. */
|
||||
void onResponse(String endpoint, JsonNode responseJson);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
public interface DDAgentResponseListener {
|
||||
/** Invoked after the api receives a response from the core agent. */
|
||||
void onResponse(String endpoint, JsonNode responseJson);
|
||||
}
|
|
@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference
|
|||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import datadog.opentracing.SpanFactory
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
@ -125,7 +125,7 @@ class DDAgentApiTest extends DDSpecification {
|
|||
def "Api ResponseListeners see 200 responses"() {
|
||||
setup:
|
||||
def agentResponse = new AtomicReference<String>(null)
|
||||
ResponseListener responseListener = { String endpoint, JsonNode responseJson ->
|
||||
DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson ->
|
||||
agentResponse.set(responseJson.toString())
|
||||
}
|
||||
def agent = httpServer {
|
||||
|
|
|
@ -6,6 +6,7 @@ import datadog.opentracing.PendingTrace
|
|||
import datadog.trace.api.sampling.PrioritySampling
|
||||
import datadog.trace.common.writer.ListWriter
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
import org.testcontainers.containers.GenericContainer
|
||||
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy
|
||||
|
@ -64,7 +65,7 @@ class DDApiIntegrationTest {
|
|||
def endpoint = new AtomicReference<String>(null)
|
||||
def agentResponse = new AtomicReference<String>(null)
|
||||
|
||||
DDAgentApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
|
||||
DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
|
||||
endpoint.set(receivedEndpoint)
|
||||
agentResponse.set(responseJson.toString())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue