diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java
index 85908feb28..ae5a0c4301 100644
--- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java
+++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java
@@ -14,8 +14,8 @@ import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.sampling.PrioritySampler;
import datadog.trace.common.sampling.Sampler;
import datadog.trace.common.writer.DDAgentWriter;
-import datadog.trace.common.writer.DDApi;
import datadog.trace.common.writer.Writer;
+import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import datadog.trace.context.ScopeListener;
import io.opentracing.References;
import io.opentracing.Scope;
@@ -245,11 +245,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
injector = HttpCodec.createInjector(Config.get());
extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders);
- if (this.writer instanceof DDAgentWriter) {
- final DDApi api = ((DDAgentWriter) this.writer).getApi();
- if (sampler instanceof DDApi.ResponseListener) {
- api.addResponseListener((DDApi.ResponseListener) this.sampler);
- }
+ if (this.writer instanceof DDAgentWriter && sampler instanceof DDAgentResponseListener) {
+ ((DDAgentWriter) this.writer).addResponseListener((DDAgentResponseListener) this.sampler);
}
log.info("New instance: {}", this);
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java
index c5b8242d76..b8191b1a8f 100644
--- a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java
@@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NumericNode;
import datadog.opentracing.DDSpan;
import datadog.trace.api.sampling.PrioritySampling;
-import datadog.trace.common.writer.DDApi.ResponseListener;
+import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j;
*
The configuration of (serviceName,env)->rate is configured by the core agent.
*/
@Slf4j
-public class RateByServiceSampler implements Sampler, PrioritySampler, ResponseListener {
+public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentResponseListener {
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
/** Key for setting the default/baseline rate */
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java
index a98baabf56..c63fe9ff8f 100644
--- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java
@@ -5,31 +5,19 @@ import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.EventTranslatorOneArg;
-import com.lmax.disruptor.SleepingWaitStrategy;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
-import com.timgroup.statsd.NonBlockingStatsDClient;
-import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
-import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.util.DaemonThreadFactory;
-import java.util.ArrayList;
+import datadog.trace.common.writer.ddagent.DDAgentApi;
+import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
+import datadog.trace.common.writer.ddagent.Monitor;
+import datadog.trace.common.writer.ddagent.TraceConsumer;
+import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
/**
@@ -43,57 +31,35 @@ import lombok.extern.slf4j.Slf4j;
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private static final int SENDER_QUEUE_SIZE = 16;
- private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
- private static final EventTranslatorOneArg>, List> TRANSLATOR =
- new EventTranslatorOneArg>, List>() {
- @Override
- public void translateTo(
- final Event> event, final long sequence, final List trace) {
- event.data = trace;
- }
- };
- private static final EventTranslator>> FLUSH_TRANSLATOR =
- new EventTranslator>>() {
- @Override
- public void translateTo(final Event> event, final long sequence) {
- event.shouldFlush = true;
- }
- };
-
- private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
- new DaemonThreadFactory("dd-trace-disruptor");
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-writer");
- private final Runnable flushTask = new FlushTask();
- private final DDApi api;
- private final int flushFrequencySeconds;
- private final Disruptor>> disruptor;
+ private final DDAgentApi api;
+ public final int flushFrequencySeconds;
+ public final TraceSerializingDisruptor disruptor;
- private final Semaphore senderSemaphore;
- private final ScheduledExecutorService scheduledWriterExecutor;
+ public final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
- private final AtomicReference> flushSchedule = new AtomicReference<>();
- private final Phaser apiPhaser;
- private volatile boolean running = false;
+ public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing;
- private final Monitor monitor;
+ public final Monitor monitor;
public DDAgentWriter() {
this(
- new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
- new NoopMonitor());
+ new DDAgentApi(
+ DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
+ new Monitor.Noop());
}
- public DDAgentWriter(final DDApi api, final Monitor monitor) {
+ public DDAgentWriter(final DDAgentApi api, final Monitor monitor) {
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
}
/** Old signature (pre-Monitor) used in tests */
- private DDAgentWriter(final DDApi api) {
- this(api, new NoopMonitor());
+ private DDAgentWriter(final DDAgentApi api) {
+ this(api, new Monitor.Noop());
}
/**
@@ -104,16 +70,16 @@ public class DDAgentWriter implements Writer {
* @param flushFrequencySeconds value < 1 disables scheduled flushes
*/
private DDAgentWriter(
- final DDApi api,
+ final DDAgentApi api,
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
- this(api, new NoopMonitor(), disruptorSize, senderQueueSize, flushFrequencySeconds);
+ this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(
- final DDApi api,
+ final DDAgentApi api,
final Monitor monitor,
final int disruptorSize,
final int flushFrequencySeconds) {
@@ -121,12 +87,13 @@ public class DDAgentWriter implements Writer {
}
// DQH - TODO - Update the tests & remove this
- private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
- this(api, new NoopMonitor(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
+ private DDAgentWriter(
+ final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
+ this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
private DDAgentWriter(
- final DDApi api,
+ final DDAgentApi api,
final Monitor monitor,
final int disruptorSize,
final int senderQueueSize,
@@ -135,25 +102,22 @@ public class DDAgentWriter implements Writer {
this.monitor = monitor;
disruptor =
- new Disruptor<>(
- new DisruptorEventFactory>(),
- Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
- DISRUPTOR_THREAD_FACTORY,
- ProducerType.MULTI,
- new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
- disruptor.handleEventsWith(new TraceConsumer());
+ new TraceSerializingDisruptor(
+ disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this));
this.flushFrequencySeconds = flushFrequencySeconds;
- senderSemaphore = new Semaphore(senderQueueSize);
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
- apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
}
+ public void addResponseListener(final DDAgentResponseListener listener) {
+ api.addResponseListener(listener);
+ }
+
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
- return disruptor.getRingBuffer().getBufferSize();
+ return disruptor.getDisruptorCapacity();
}
public final long getDisruptorUtilizedCapacity() {
@@ -161,14 +125,14 @@ public class DDAgentWriter implements Writer {
}
public final long getDisruptorRemainingCapacity() {
- return disruptor.getRingBuffer().remainingCapacity();
+ return disruptor.getDisruptorRemainingCapacity();
}
@Override
public void write(final List trace) {
// We can't add events after shutdown otherwise it will never complete shutting down.
- if (running) {
- final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
+ if (disruptor.running) {
+ final boolean published = disruptor.tryPublish(trace);
if (published) {
monitor.onPublish(DDAgentWriter.this, trace);
@@ -191,22 +155,19 @@ public class DDAgentWriter implements Writer {
traceCount.incrementAndGet();
}
- public DDApi getApi() {
+ public DDAgentApi getApi() {
return api;
}
@Override
public void start() {
disruptor.start();
- running = true;
- scheduleFlush();
monitor.onStart(this);
}
@Override
public void close() {
- running = false;
boolean flushSuccess = true;
@@ -221,34 +182,13 @@ public class DDAgentWriter implements Writer {
flushSuccess = false;
}
- flushSuccess |= flush();
- disruptor.shutdown();
+ flushSuccess |= disruptor.flush();
+
+ disruptor.close();
monitor.onShutdown(this, flushSuccess);
}
- /** This method will block until the flush is complete. */
- public boolean flush() {
- if (running) {
- log.info("Flushing any remaining traces.");
- // Register with the phaser so we can block until the flush completion.
- apiPhaser.register();
- disruptor.publishEvent(FLUSH_TRANSLATOR);
- try {
- // Allow thread to be interrupted.
- apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
-
- return true;
- } catch (final InterruptedException e) {
- log.warn("Waiting for flush interrupted.", e);
-
- return false;
- }
- } else {
- return false;
- }
- }
-
@Override
public String toString() {
// DQH - I don't particularly like the instanceof check,
@@ -257,396 +197,11 @@ public class DDAgentWriter implements Writer {
// if something is *probably* the NoopMonitor.
String str = "DDAgentWriter { api=" + api;
- if (!(monitor instanceof NoopMonitor)) {
+ if (!(monitor instanceof Monitor.Noop)) {
str += ", monitor=" + monitor;
}
str += " }";
return str;
}
-
- private void scheduleFlush() {
- if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) {
- final ScheduledFuture> previous =
- flushSchedule.getAndSet(
- scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS));
-
- final boolean previousIncomplete = (previous != null);
- if (previousIncomplete) {
- previous.cancel(true);
- }
-
- monitor.onScheduleFlush(this, previousIncomplete);
- }
- }
-
- private class FlushTask implements Runnable {
- @Override
- public void run() {
- // Don't call flush() because it would block the thread also used for sending the traces.
- disruptor.publishEvent(FLUSH_TRANSLATOR);
- }
- }
-
- /** This class is intentionally not threadsafe. */
- private class TraceConsumer implements EventHandler>> {
- private List serializedTraces = new ArrayList<>();
- private int payloadSize = 0;
-
- @Override
- public void onEvent(
- final Event> event, final long sequence, final boolean endOfBatch) {
- final List trace = event.data;
- event.data = null; // clear the event for reuse.
- if (trace != null) {
- traceCount.incrementAndGet();
- try {
- final byte[] serializedTrace = api.serializeTrace(trace);
- payloadSize += serializedTrace.length;
- serializedTraces.add(serializedTrace);
-
- monitor.onSerialize(DDAgentWriter.this, trace, serializedTrace);
- } catch (final JsonProcessingException e) {
- log.warn("Error serializing trace", e);
-
- monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
- } catch (final Throwable e) {
- log.debug("Error while serializing trace", e);
-
- monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
- }
- }
-
- if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
- final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
-
- reportTraces(early);
- event.shouldFlush = false;
- }
- }
-
- private void reportTraces(final boolean early) {
- try {
- if (serializedTraces.isEmpty()) {
- monitor.onFlush(DDAgentWriter.this, early);
-
- apiPhaser.arrive(); // Allow flush to return
- return;
- // scheduleFlush called in finally block.
- }
- if (scheduledWriterExecutor.isShutdown()) {
- monitor.onFailedSend(
- DDAgentWriter.this, traceCount.get(), payloadSize, DDApi.Response.failed(-1));
- apiPhaser.arrive(); // Allow flush to return
- return;
- }
- final List toSend = serializedTraces;
- serializedTraces = new ArrayList<>(toSend.size());
- // ^ Initialize with similar size to reduce arraycopy churn.
-
- final int representativeCount = traceCount.getAndSet(0);
- final int sizeInBytes = payloadSize;
-
- try {
- monitor.onFlush(DDAgentWriter.this, early);
-
- // Run the actual IO task on a different thread to avoid blocking the consumer.
- try {
- senderSemaphore.acquire();
- } catch (final InterruptedException e) {
- monitor.onFailedSend(
- DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e));
-
- // Finally, we'll schedule another flush
- // Any threads awaiting the flush will continue to wait
- return;
- }
- scheduledWriterExecutor.execute(
- new Runnable() {
- @Override
- public void run() {
- senderSemaphore.release();
-
- try {
- final DDApi.Response response =
- api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
-
- if (response.success()) {
- log.debug("Successfully sent {} traces to the API", toSend.size());
-
- monitor.onSend(
- DDAgentWriter.this, representativeCount, sizeInBytes, response);
- } else {
- log.debug(
- "Failed to send {} traces (representing {}) of size {} bytes to the API",
- toSend.size(),
- representativeCount,
- sizeInBytes);
-
- monitor.onFailedSend(
- DDAgentWriter.this, representativeCount, sizeInBytes, response);
- }
- } catch (final Throwable e) {
- log.debug("Failed to send traces to the API: {}", e.getMessage());
-
- // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
- // shouldn't occur.
- // However, just to be safe to start, create a failed Response to handle any
- // spurious Throwable-s.
- monitor.onFailedSend(
- DDAgentWriter.this,
- representativeCount,
- sizeInBytes,
- DDApi.Response.failed(e));
- } finally {
- apiPhaser.arrive(); // Flush completed.
- }
- }
- });
- } catch (final RejectedExecutionException ex) {
- monitor.onFailedSend(
- DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
- apiPhaser.arrive(); // Allow flush to return
- }
- } finally {
- payloadSize = 0;
- scheduleFlush();
- }
- }
- }
-
- private static class Event {
- private volatile boolean shouldFlush = false;
- private volatile T data = null;
- }
-
- private static class DisruptorEventFactory implements EventFactory> {
- @Override
- public Event newInstance() {
- return new Event<>();
- }
- }
-
- /**
- * Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
- * lifecycle events...
- *
- *
- * - start
- *
- shutdown
- *
- publishing to disruptor
- *
- serializing
- *
- sending to agent
- *
- */
- public interface Monitor {
- void onStart(final DDAgentWriter agentWriter);
-
- void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
-
- void onPublish(final DDAgentWriter agentWriter, final List trace);
-
- void onFailedPublish(final DDAgentWriter agentWriter, final List trace);
-
- void onFlush(final DDAgentWriter agentWriter, final boolean early);
-
- void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
-
- void onSerialize(
- final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace);
-
- void onFailedSerialize(
- final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause);
-
- void onSend(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response);
-
- void onFailedSend(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response);
- }
-
- public static final class NoopMonitor implements Monitor {
- @Override
- public void onStart(final DDAgentWriter agentWriter) {}
-
- @Override
- public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
-
- @Override
- public void onPublish(final DDAgentWriter agentWriter, final List trace) {}
-
- @Override
- public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) {}
-
- @Override
- public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
-
- @Override
- public void onScheduleFlush(
- final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
-
- @Override
- public void onSerialize(
- final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) {}
-
- @Override
- public void onFailedSerialize(
- final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) {}
-
- @Override
- public void onSend(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response) {}
-
- @Override
- public void onFailedSend(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response) {}
-
- @Override
- public String toString() {
- return "NoOp";
- }
- }
-
- public static final class StatsDMonitor implements Monitor {
- public static final String PREFIX = "datadog.tracer";
-
- public static final String LANG_TAG = "lang";
- public static final String LANG_VERSION_TAG = "lang_version";
- public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
- public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
- public static final String TRACER_VERSION_TAG = "tracer_version";
-
- private final String hostInfo;
- private final StatsDClient statsd;
-
- // DQH - Made a conscious choice to not take a Config object here.
- // Letting the creating of the Monitor take the Config,
- // so it can decide which Monitor variant to create.
- public StatsDMonitor(final String host, final int port) {
- hostInfo = host + ":" + port;
- statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
- }
-
- // Currently, intended for testing
- private StatsDMonitor(final StatsDClient statsd) {
- hostInfo = null;
- this.statsd = statsd;
- }
-
- protected static final String[] getDefaultTags() {
- return new String[] {
- tag(LANG_TAG, "java"),
- tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
- tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
- tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
- tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
- };
- }
-
- private static String tag(final String tagPrefix, final String tagValue) {
- return tagPrefix + ":" + tagValue;
- }
-
- @Override
- public void onStart(final DDAgentWriter agentWriter) {
- statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
- }
-
- @Override
- public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
-
- @Override
- public void onPublish(final DDAgentWriter agentWriter, final List trace) {
- statsd.incrementCounter("queue.accepted");
- statsd.count("queue.accepted_lengths", trace.size());
- }
-
- @Override
- public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) {
- statsd.incrementCounter("queue.dropped");
- }
-
- @Override
- public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
- // not recorded
- }
-
- @Override
- public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
-
- @Override
- public void onSerialize(
- final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) {
- // DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
- // map precisely
- statsd.count("queue.accepted_size", serializedTrace.length);
- }
-
- @Override
- public void onFailedSerialize(
- final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) {
- // TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
- // api.errors???
- }
-
- @Override
- public void onSend(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response) {
- onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
- }
-
- @Override
- public void onFailedSend(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response) {
- onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
- }
-
- private void onSendAttempt(
- final DDAgentWriter agentWriter,
- final int representativeCount,
- final int sizeInBytes,
- final DDApi.Response response) {
- statsd.incrementCounter("api.requests");
- statsd.recordGaugeValue("queue.length", representativeCount);
- // TODO: missing queue.spans (# of spans being sent)
- statsd.recordGaugeValue("queue.size", sizeInBytes);
-
- if (response.exception() != null) {
- // covers communication errors -- both not receiving a response or
- // receiving malformed response (even when otherwise successful)
- statsd.incrementCounter("api.errors");
- }
-
- if (response.status() != null) {
- statsd.incrementCounter("api.responses", "status: " + response.status());
- }
- }
-
- @Override
- public String toString() {
- if (hostInfo == null) {
- return "StatsD";
- } else {
- return "StatsD { host=" + hostInfo + " }";
- }
- }
- }
}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java
index a6d99ee749..b14cbfdd23 100644
--- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java
@@ -2,6 +2,8 @@ package datadog.trace.common.writer;
import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
+import datadog.trace.common.writer.ddagent.DDAgentApi;
+import datadog.trace.common.writer.ddagent.Monitor;
import java.io.Closeable;
import java.util.List;
import java.util.Properties;
@@ -65,14 +67,14 @@ public interface Writer extends Closeable {
return new DDAgentWriter(createApi(config), createMonitor(config));
}
- private static final DDApi createApi(final Config config) {
- return new DDApi(
+ private static DDAgentApi createApi(final Config config) {
+ return new DDAgentApi(
config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket());
}
- private static final DDAgentWriter.Monitor createMonitor(final Config config) {
+ private static Monitor createMonitor(final Config config) {
if (!config.isHealthMetricsEnabled()) {
- return new DDAgentWriter.NoopMonitor();
+ return new Monitor.Noop();
} else {
String host = config.getHealthMetricsStatsdHost();
if (host == null) {
@@ -87,7 +89,7 @@ public interface Writer extends Closeable {
port = config.getJmxFetchStatsdPort();
}
- return new DDAgentWriter.StatsDMonitor(host, port);
+ return new Monitor.StatsD(host, port);
}
}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java
similarity index 94%
rename from dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java
rename to dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java
index 164089f3c5..17a607c102 100644
--- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java
@@ -1,4 +1,4 @@
-package datadog.trace.common.writer;
+package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -21,7 +21,6 @@ import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
-import okhttp3.Response;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
@@ -29,7 +28,7 @@ import org.msgpack.jackson.dataformat.MessagePackFactory;
/** The API pointing to a DD agent */
@Slf4j
-public class DDApi {
+public class DDAgentApi {
private static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
@@ -44,7 +43,7 @@ public class DDApi {
private static final String TRACES_ENDPOINT_V4 = "v0.4/traces";
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
- private final List responseListeners = new ArrayList<>();
+ private final List responseListeners = new ArrayList<>();
private volatile long nextAllowedLogTime = 0;
@@ -54,7 +53,7 @@ public class DDApi {
private final OkHttpClient httpClient;
private final HttpUrl tracesUrl;
- public DDApi(final String host, final int port, final String unixDomainSocketPath) {
+ public DDAgentApi(final String host, final int port, final String unixDomainSocketPath) {
this(
host,
port,
@@ -62,7 +61,7 @@ public class DDApi {
unixDomainSocketPath);
}
- DDApi(
+ DDAgentApi(
final String host,
final int port,
final boolean v4EndpointsAvailable,
@@ -77,7 +76,7 @@ public class DDApi {
}
}
- public void addResponseListener(final ResponseListener listener) {
+ public void addResponseListener(final DDAgentResponseListener listener) {
if (!responseListeners.contains(listener)) {
responseListeners.add(listener);
}
@@ -90,7 +89,7 @@ public class DDApi {
* @return a Response object -- encapsulating success of communication, sending, and result
* parsing
*/
- public Response sendTraces(final List> traces) {
+ Response sendTraces(final List> traces) {
final List serializedTraces = new ArrayList<>(traces.size());
int sizeInBytes = 0;
for (final List trace : traces) {
@@ -187,7 +186,7 @@ public class DDApi {
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
final String endpoint = tracesUrl.toString();
- for (final ResponseListener listener : responseListeners) {
+ for (final DDAgentResponseListener listener : responseListeners) {
listener.onResponse(endpoint, parsedResponse);
}
return Response.success(response.code(), parsedResponse);
@@ -348,26 +347,21 @@ public class DDApi {
}
public final boolean success() {
- return this.success;
+ return success;
}
// TODO: DQH - In Java 8, switch to OptionalInteger
public final Integer status() {
- return this.status;
+ return status;
}
public final JsonNode json() {
- return this.json;
+ return json;
}
// TODO: DQH - In Java 8, switch to Optional?
public final Throwable exception() {
- return this.exception;
+ return exception;
}
}
-
- public interface ResponseListener {
- /** Invoked after the api receives a response from the core agent. */
- void onResponse(String endpoint, JsonNode responseJson);
- }
}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java
new file mode 100644
index 0000000000..a0cbc72fda
--- /dev/null
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java
@@ -0,0 +1,8 @@
+package datadog.trace.common.writer.ddagent;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public interface DDAgentResponseListener {
+ /** Invoked after the api receives a response from the core agent. */
+ void onResponse(String endpoint, JsonNode responseJson);
+}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java
new file mode 100644
index 0000000000..66ff7a499b
--- /dev/null
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java
@@ -0,0 +1,41 @@
+package datadog.trace.common.writer.ddagent;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.EventTranslatorOneArg;
+import datadog.opentracing.DDSpan;
+import java.util.List;
+
+class DisruptorEvent {
+ public volatile boolean shouldFlush = false;
+ public volatile T data = null;
+
+ static class Factory implements EventFactory> {
+ @Override
+ public DisruptorEvent newInstance() {
+ return new DisruptorEvent<>();
+ }
+ }
+
+ static class TraceTranslator
+ implements EventTranslatorOneArg>, List> {
+ static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
+ new DisruptorEvent.TraceTranslator();
+
+ @Override
+ public void translateTo(
+ final DisruptorEvent> event, final long sequence, final List trace) {
+ event.data = trace;
+ }
+ }
+
+ static class FlushTranslator implements EventTranslator>> {
+ static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
+ new DisruptorEvent.FlushTranslator();
+
+ @Override
+ public void translateTo(final DisruptorEvent> event, final long sequence) {
+ event.shouldFlush = true;
+ }
+ }
+}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java
new file mode 100644
index 0000000000..298fb94b93
--- /dev/null
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java
@@ -0,0 +1,231 @@
+package datadog.trace.common.writer.ddagent;
+
+import com.timgroup.statsd.NonBlockingStatsDClient;
+import com.timgroup.statsd.StatsDClient;
+import datadog.opentracing.DDSpan;
+import datadog.opentracing.DDTraceOTInfo;
+import datadog.trace.common.writer.DDAgentWriter;
+import java.util.List;
+
+/**
+ * Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
+ * lifecycle events...
+ *
+ *
+ * - start
+ *
- shutdown
+ *
- publishing to disruptor
+ *
- serializing
+ *
- sending to agent
+ *
+ */
+public interface Monitor {
+ void onStart(final DDAgentWriter agentWriter);
+
+ void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
+
+ void onPublish(final DDAgentWriter agentWriter, final List trace);
+
+ void onFailedPublish(final DDAgentWriter agentWriter, final List trace);
+
+ void onFlush(final DDAgentWriter agentWriter, final boolean early);
+
+ void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
+
+ void onSerialize(
+ final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace);
+
+ void onFailedSerialize(
+ final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause);
+
+ void onSend(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response);
+
+ void onFailedSend(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response);
+
+ final class StatsD implements Monitor {
+ public static final String PREFIX = "datadog.tracer";
+
+ public static final String LANG_TAG = "lang";
+ public static final String LANG_VERSION_TAG = "lang_version";
+ public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
+ public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
+ public static final String TRACER_VERSION_TAG = "tracer_version";
+
+ private final String hostInfo;
+ private final StatsDClient statsd;
+
+ // DQH - Made a conscious choice to not take a Config object here.
+ // Letting the creating of the Monitor take the Config,
+ // so it can decide which Monitor variant to create.
+ public StatsD(final String host, final int port) {
+ hostInfo = host + ":" + port;
+ statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
+ }
+
+ // Currently, intended for testing
+ private StatsD(final StatsDClient statsd) {
+ hostInfo = null;
+ this.statsd = statsd;
+ }
+
+ protected static final String[] getDefaultTags() {
+ return new String[] {
+ tag(LANG_TAG, "java"),
+ tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
+ tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
+ tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
+ tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
+ };
+ }
+
+ private static String tag(final String tagPrefix, final String tagValue) {
+ return tagPrefix + ":" + tagValue;
+ }
+
+ @Override
+ public void onStart(final DDAgentWriter agentWriter) {
+ statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
+ }
+
+ @Override
+ public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
+
+ @Override
+ public void onPublish(final DDAgentWriter agentWriter, final List trace) {
+ statsd.incrementCounter("queue.accepted");
+ statsd.count("queue.accepted_lengths", trace.size());
+ }
+
+ @Override
+ public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) {
+ statsd.incrementCounter("queue.dropped");
+ }
+
+ @Override
+ public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
+ // not recorded
+ }
+
+ @Override
+ public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
+
+ @Override
+ public void onSerialize(
+ final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) {
+ // DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
+ // map precisely
+ statsd.count("queue.accepted_size", serializedTrace.length);
+ }
+
+ @Override
+ public void onFailedSerialize(
+ final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) {
+ // TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
+ // api.errors???
+ }
+
+ @Override
+ public void onSend(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response) {
+ onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
+ }
+
+ @Override
+ public void onFailedSend(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response) {
+ onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
+ }
+
+ private void onSendAttempt(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response) {
+ statsd.incrementCounter("api.requests");
+ statsd.recordGaugeValue("queue.length", representativeCount);
+ // TODO: missing queue.spans (# of spans being sent)
+ statsd.recordGaugeValue("queue.size", sizeInBytes);
+
+ if (response.exception() != null) {
+ // covers communication errors -- both not receiving a response or
+ // receiving malformed response (even when otherwise successful)
+ statsd.incrementCounter("api.errors");
+ }
+
+ if (response.status() != null) {
+ statsd.incrementCounter("api.responses", "status: " + response.status());
+ }
+ }
+
+ @Override
+ public String toString() {
+ if (hostInfo == null) {
+ return "StatsD";
+ } else {
+ return "StatsD { host=" + hostInfo + " }";
+ }
+ }
+ }
+
+ final class Noop implements Monitor {
+ @Override
+ public void onStart(final DDAgentWriter agentWriter) {}
+
+ @Override
+ public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
+
+ @Override
+ public void onPublish(final DDAgentWriter agentWriter, final List trace) {}
+
+ @Override
+ public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) {}
+
+ @Override
+ public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
+
+ @Override
+ public void onScheduleFlush(
+ final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
+
+ @Override
+ public void onSerialize(
+ final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) {}
+
+ @Override
+ public void onFailedSerialize(
+ final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) {}
+
+ @Override
+ public void onSend(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response) {}
+
+ @Override
+ public void onFailedSend(
+ final DDAgentWriter agentWriter,
+ final int representativeCount,
+ final int sizeInBytes,
+ final DDAgentApi.Response response) {}
+
+ @Override
+ public String toString() {
+ return "NoOp";
+ }
+ }
+}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java
new file mode 100644
index 0000000000..1081f4352e
--- /dev/null
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java
@@ -0,0 +1,150 @@
+package datadog.trace.common.writer.ddagent;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.lmax.disruptor.EventHandler;
+import datadog.opentracing.DDSpan;
+import datadog.trace.common.writer.DDAgentWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+
+/** This class is intentionally not threadsafe. */
+@Slf4j
+public class TraceConsumer implements EventHandler>> {
+ private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
+
+ private final AtomicInteger traceCount;
+ private final Semaphore senderSemaphore;
+ private final DDAgentWriter writer;
+
+ private List serializedTraces = new ArrayList<>();
+ private int payloadSize = 0;
+
+ public TraceConsumer(
+ final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
+ this.traceCount = traceCount;
+ senderSemaphore = new Semaphore(senderQueueSize);
+ this.writer = writer;
+ }
+
+ @Override
+ public void onEvent(
+ final DisruptorEvent> event, final long sequence, final boolean endOfBatch) {
+ final List trace = event.data;
+ event.data = null; // clear the event for reuse.
+ if (trace != null) {
+ traceCount.incrementAndGet();
+ try {
+ final byte[] serializedTrace = writer.getApi().serializeTrace(trace);
+ payloadSize += serializedTrace.length;
+ serializedTraces.add(serializedTrace);
+
+ writer.monitor.onSerialize(writer, trace, serializedTrace);
+ } catch (final JsonProcessingException e) {
+ log.warn("Error serializing trace", e);
+
+ writer.monitor.onFailedSerialize(writer, trace, e);
+ } catch (final Throwable e) {
+ log.debug("Error while serializing trace", e);
+
+ writer.monitor.onFailedSerialize(writer, trace, e);
+ }
+ }
+
+ if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
+ final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
+
+ reportTraces(early);
+ event.shouldFlush = false;
+ }
+ }
+
+ private void reportTraces(final boolean early) {
+ try {
+ if (serializedTraces.isEmpty()) {
+ writer.monitor.onFlush(writer, early);
+
+ writer.apiPhaser.arrive(); // Allow flush to return
+ return;
+ // scheduleFlush called in finally block.
+ }
+ if (writer.scheduledWriterExecutor.isShutdown()) {
+ writer.monitor.onFailedSend(
+ writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1));
+ writer.apiPhaser.arrive(); // Allow flush to return
+ return;
+ }
+ final List toSend = serializedTraces;
+ serializedTraces = new ArrayList<>(toSend.size());
+ // ^ Initialize with similar size to reduce arraycopy churn.
+
+ final int representativeCount = traceCount.getAndSet(0);
+ final int sizeInBytes = payloadSize;
+
+ try {
+ writer.monitor.onFlush(writer, early);
+
+ // Run the actual IO task on a different thread to avoid blocking the consumer.
+ try {
+ senderSemaphore.acquire();
+ } catch (final InterruptedException e) {
+ writer.monitor.onFailedSend(
+ writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
+
+ // Finally, we'll schedule another flush
+ // Any threads awaiting the flush will continue to wait
+ return;
+ }
+ writer.scheduledWriterExecutor.execute(
+ new Runnable() {
+ @Override
+ public void run() {
+ senderSemaphore.release();
+
+ try {
+ final DDAgentApi.Response response =
+ writer
+ .getApi()
+ .sendSerializedTraces(representativeCount, sizeInBytes, toSend);
+
+ if (response.success()) {
+ log.debug("Successfully sent {} traces to the API", toSend.size());
+
+ writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
+ } else {
+ log.debug(
+ "Failed to send {} traces (representing {}) of size {} bytes to the API",
+ toSend.size(),
+ representativeCount,
+ sizeInBytes);
+
+ writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
+ }
+ } catch (final Throwable e) {
+ log.debug("Failed to send traces to the API: {}", e.getMessage());
+
+ // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
+ // shouldn't occur.
+ // However, just to be safe to start, create a failed Response to handle any
+ // spurious Throwable-s.
+ writer.monitor.onFailedSend(
+ writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
+ } finally {
+ writer.apiPhaser.arrive(); // Flush completed.
+ }
+ }
+ });
+ } catch (final RejectedExecutionException ex) {
+ writer.monitor.onFailedSend(
+ writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
+ writer.apiPhaser.arrive(); // Allow flush to return
+ }
+ } finally {
+ payloadSize = 0;
+ writer.disruptor.scheduleFlush();
+ }
+ }
+}
diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java
new file mode 100644
index 0000000000..f878cbb178
--- /dev/null
+++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java
@@ -0,0 +1,117 @@
+package datadog.trace.common.writer.ddagent;
+
+import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR;
+import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import com.lmax.disruptor.SleepingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import datadog.opentracing.DDSpan;
+import datadog.trace.common.util.DaemonThreadFactory;
+import datadog.trace.common.writer.DDAgentWriter;
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TraceSerializingDisruptor implements Closeable {
+ private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
+ new DaemonThreadFactory("dd-trace-disruptor");
+ private final FlushTask flushTask = new FlushTask();
+
+ private final Disruptor>> disruptor;
+ private final DDAgentWriter writer;
+
+ public volatile boolean running = false;
+
+ private final AtomicReference> flushSchedule = new AtomicReference<>();
+
+ public TraceSerializingDisruptor(
+ final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) {
+ disruptor =
+ new Disruptor<>(
+ new DisruptorEvent.Factory>(),
+ Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
+ DISRUPTOR_THREAD_FACTORY,
+ ProducerType.MULTI,
+ new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
+ this.writer = writer;
+ disruptor.handleEventsWith(handler);
+ }
+
+ public void start() {
+ disruptor.start();
+ running = true;
+ scheduleFlush();
+ }
+
+ @Override
+ public void close() {
+ running = false;
+ disruptor.shutdown();
+ }
+
+ public boolean tryPublish(final List trace) {
+ return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace);
+ }
+
+ /** This method will block until the flush is complete. */
+ public boolean flush() {
+ if (running) {
+ log.info("Flushing any remaining traces.");
+ // Register with the phaser so we can block until the flush completion.
+ writer.apiPhaser.register();
+ disruptor.publishEvent(FLUSH_TRANSLATOR);
+ try {
+ // Allow thread to be interrupted.
+ writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister());
+
+ return true;
+ } catch (final InterruptedException e) {
+ log.warn("Waiting for flush interrupted.", e);
+
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ public void scheduleFlush() {
+ if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) {
+ final ScheduledFuture> previous =
+ flushSchedule.getAndSet(
+ writer.scheduledWriterExecutor.schedule(
+ flushTask, writer.flushFrequencySeconds, SECONDS));
+
+ final boolean previousIncomplete = (previous != null);
+ if (previousIncomplete) {
+ previous.cancel(true);
+ }
+
+ writer.monitor.onScheduleFlush(writer, previousIncomplete);
+ }
+ }
+
+ private class FlushTask implements Runnable {
+ @Override
+ public void run() {
+ // Don't call flush() because it would block the thread also used for sending the traces.
+ disruptor.publishEvent(FLUSH_TRANSLATOR);
+ }
+ }
+
+ // Exposing some statistics for consumption by monitors
+ public final long getDisruptorCapacity() {
+ return disruptor.getRingBuffer().getBufferSize();
+ }
+
+ public final long getDisruptorRemainingCapacity() {
+ return disruptor.getRingBuffer().remainingCapacity();
+ }
+}
diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy
index 65ab4d4305..5a7ac705c5 100644
--- a/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy
+++ b/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy
@@ -14,6 +14,7 @@ import datadog.trace.common.sampling.Sampler
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
+import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.util.test.DDSpecification
import io.opentracing.propagation.TextMapInject
import org.junit.Rule
@@ -49,7 +50,7 @@ class DDTracerTest extends DDSpecification {
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.3/traces" ||
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.4/traces"
- tracer.writer.monitor instanceof DDAgentWriter.NoopMonitor
+ tracer.writer.monitor instanceof Monitor.Noop
tracer.spanContextDecorators.size() == 15
@@ -65,7 +66,7 @@ class DDTracerTest extends DDSpecification {
def tracer = new DDTracer(new Config())
then:
- tracer.writer.monitor instanceof DDAgentWriter.StatsDMonitor
+ tracer.writer.monitor instanceof Monitor.StatsD
tracer.writer.monitor.hostInfo == "localhost:8125"
}
diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy
similarity index 89%
rename from dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy
rename to dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy
index 17c8388cd6..8d5ed0dcdd 100644
--- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy
+++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy
@@ -3,8 +3,8 @@ package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import datadog.opentracing.SpanFactory
-import datadog.trace.common.writer.DDApi
-import datadog.trace.common.writer.DDApi.ResponseListener
+import datadog.trace.common.writer.ddagent.DDAgentApi
+import datadog.trace.common.writer.ddagent.DDAgentResponseListener
import datadog.trace.util.test.DDSpecification
import java.util.concurrent.atomic.AtomicLong
@@ -12,8 +12,8 @@ import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
-class DDApiTest extends DDSpecification {
- static mapper = DDApi.OBJECT_MAPPER
+class DDAgentApiTest extends DDSpecification {
+ static mapper = DDAgentApi.OBJECT_MAPPER
def "sending an empty list of traces returns no errors"() {
setup:
@@ -30,7 +30,7 @@ class DDApiTest extends DDSpecification {
}
}
}
- def client = new DDApi("localhost", agent.address.port, null)
+ def client = new DDAgentApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
@@ -51,7 +51,7 @@ class DDApiTest extends DDSpecification {
}
}
}
- def client = new DDApi("localhost", agent.address.port, null)
+ def client = new DDAgentApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.3/traces"
@@ -73,7 +73,7 @@ class DDApiTest extends DDSpecification {
}
}
}
- def client = new DDApi("localhost", agent.address.port, null)
+ def client = new DDAgentApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
@@ -125,7 +125,7 @@ class DDApiTest extends DDSpecification {
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference(null)
- ResponseListener responseListener = { String endpoint, JsonNode responseJson ->
+ DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson ->
agentResponse.set(responseJson.toString())
}
def agent = httpServer {
@@ -136,7 +136,7 @@ class DDApiTest extends DDSpecification {
}
}
}
- def client = new DDApi("localhost", agent.address.port, null)
+ def client = new DDAgentApi("localhost", agent.address.port, null)
client.addResponseListener(responseListener)
when:
@@ -162,7 +162,7 @@ class DDApiTest extends DDSpecification {
}
}
}
- def client = new DDApi("localhost", v3Agent.address.port, null)
+ def client = new DDAgentApi("localhost", v3Agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${v3Agent.address.port}/v0.3/traces"
@@ -189,7 +189,7 @@ class DDApiTest extends DDSpecification {
}
}
def port = badPort ? 999 : agent.address.port
- def client = new DDApi("localhost", port, null)
+ def client = new DDAgentApi("localhost", port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${port}/$endpointVersion/traces"
@@ -216,7 +216,7 @@ class DDApiTest extends DDSpecification {
}
}
}
- def client = new DDApi("localhost", agent.address.port, null)
+ def client = new DDAgentApi("localhost", agent.address.port, null)
when:
def success = client.sendTraces(traces).success()
diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy
index 0868e56ba5..37de61bf23 100644
--- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy
+++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy
@@ -7,7 +7,9 @@ import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
-import datadog.trace.common.writer.DDApi
+import datadog.trace.common.writer.ddagent.DDAgentApi
+import datadog.trace.common.writer.ddagent.Monitor
+import datadog.trace.common.writer.ddagent.TraceConsumer
import datadog.trace.util.test.DDSpecification
import spock.lang.Timeout
@@ -22,7 +24,7 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
@Timeout(20)
class DDAgentWriterTest extends DDSpecification {
- def api = Mock(DDApi)
+ def api = Mock(DDAgentApi)
def "test happy path"() {
setup:
@@ -32,7 +34,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(trace)
writer.write(trace)
- writer.flush()
+ writer.disruptor.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@@ -55,7 +57,7 @@ class DDAgentWriterTest extends DDSpecification {
(1..traceCount).each {
writer.write(trace)
}
- writer.flush()
+ writer.disruptor.flush()
then:
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
@@ -95,7 +97,7 @@ class DDAgentWriterTest extends DDSpecification {
writer.write(trace)
}
// Flush the remaining 2
- writer.flush()
+ writer.disruptor.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@@ -116,7 +118,7 @@ class DDAgentWriterTest extends DDSpecification {
def phaser = writer.apiPhaser
phaser.register()
writer.start()
- writer.flush()
+ writer.disruptor.flush()
when:
(1..5).each {
@@ -151,7 +153,7 @@ class DDAgentWriterTest extends DDSpecification {
// Busywait because we don't want to fill up the ring buffer
}
}
- writer.flush()
+ writer.disruptor.flush()
then:
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
@@ -178,8 +180,8 @@ class DDAgentWriterTest extends DDSpecification {
Mock(DDTracer))
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
- traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
- maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
+ traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
+ maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
def "check that are no interactions after close"() {
@@ -191,7 +193,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.close()
writer.write([])
- writer.flush()
+ writer.disruptor.flush()
then:
0 * _
@@ -206,7 +208,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write([])
- writer.flush()
+ writer.disruptor.flush()
then:
1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@@ -251,8 +253,8 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
- def api = new DDApi("localhost", agent.address.port, null)
- def monitor = Mock(DDAgentWriter.Monitor)
+ def api = new DDAgentApi("localhost", agent.address.port, null)
+ def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@@ -263,7 +265,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
- writer.flush()
+ writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@@ -300,8 +302,8 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
- def api = new DDApi("localhost", agent.address.port, null)
- def monitor = Mock(DDAgentWriter.Monitor)
+ def api = new DDAgentApi("localhost", agent.address.port, null)
+ def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@@ -312,7 +314,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
- writer.flush()
+ writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@@ -334,16 +336,16 @@ class DDAgentWriterTest extends DDSpecification {
setup:
def minimalTrace = createMinimalTrace()
- def api = new DDApi("localhost", 8192, null) {
- DDApi.Response sendSerializedTraces(
+ def api = new DDAgentApi("localhost", 8192, null) {
+ DDAgentApi.Response sendSerializedTraces(
int representativeCount,
Integer sizeInBytes,
List traces) {
// simulating a communication failure to a server
- return DDApi.Response.failed(new IOException("comm error"))
+ return DDAgentApi.Response.failed(new IOException("comm error"))
}
}
- def monitor = Mock(DDAgentWriter.Monitor)
+ def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@@ -354,7 +356,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
- writer.flush()
+ writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@@ -398,10 +400,10 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
- def api = new DDApi("localhost", agent.address.port, null)
+ def api = new DDAgentApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
- def monitor = Stub(DDAgentWriter.Monitor)
+ def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
@@ -436,7 +438,7 @@ class DDAgentWriterTest extends DDSpecification {
// sanity check coordination mechanism of test
// release to allow response to be generated
responseSemaphore.release()
- writer.flush()
+ writer.disruptor.flush()
// reacquire semaphore to stall further responses
responseSemaphore.acquire()
@@ -503,10 +505,10 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
- def api = new DDApi("localhost", agent.address.port, null)
+ def api = new DDAgentApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
- def monitor = Stub(DDAgentWriter.Monitor)
+ def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
@@ -536,7 +538,7 @@ class DDAgentWriterTest extends DDSpecification {
t1.join()
t2.join()
- writer.flush()
+ writer.disruptor.flush()
then:
def totalTraces = 100 + 100
@@ -564,7 +566,7 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
- def api = new DDApi("localhost", agent.address.port, null)
+ def api = new DDAgentApi("localhost", agent.address.port, null)
def statsd = Stub(StatsDClient)
statsd.incrementCounter("queue.accepted") >> { stat ->
@@ -577,13 +579,13 @@ class DDAgentWriterTest extends DDSpecification {
numResponses += 1
}
- def monitor = new DDAgentWriter.StatsDMonitor(statsd)
+ def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
writer.write(minimalTrace)
- writer.flush()
+ writer.disruptor.flush()
then:
numTracesAccepted == 1
@@ -604,13 +606,13 @@ class DDAgentWriterTest extends DDSpecification {
def minimalTrace = createMinimalTrace()
// DQH -- need to set-up a dummy agent for the final send callback to work
- def api = new DDApi("localhost", 8192, null) {
- DDApi.Response sendSerializedTraces(
+ def api = new DDAgentApi("localhost", 8192, null) {
+ DDAgentApi.Response sendSerializedTraces(
int representativeCount,
Integer sizeInBytes,
List traces) {
// simulating a communication failure to a server
- return DDApi.Response.failed(new IOException("comm error"))
+ return DDAgentApi.Response.failed(new IOException("comm error"))
}
}
@@ -625,13 +627,13 @@ class DDAgentWriterTest extends DDSpecification {
numErrors += 1
}
- def monitor = new DDAgentWriter.StatsDMonitor(statsd)
+ def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
writer.write(minimalTrace)
- writer.flush()
+ writer.disruptor.flush()
then:
numRequests == 1
diff --git a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy
index ef45512c5d..00db0a213d 100644
--- a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy
+++ b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy
@@ -4,8 +4,9 @@ import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
-import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.ListWriter
+import datadog.trace.common.writer.ddagent.DDAgentApi
+import datadog.trace.common.writer.ddagent.DDAgentResponseListener
import datadog.trace.util.test.DDSpecification
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy
@@ -20,7 +21,7 @@ class DDApiIntegrationTest {
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
- static class DDApiIntegrationV4Test extends DDSpecification {
+ static class DDAgentApiIntegrationV4Test extends DDSpecification {
static final WRITER = new ListWriter()
static final TRACER = new DDTracer(WRITER)
static final CONTEXT = new DDSpanContext(
@@ -64,7 +65,7 @@ class DDApiIntegrationTest {
def endpoint = new AtomicReference(null)
def agentResponse = new AtomicReference(null)
- DDApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
+ DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
endpoint.set(receivedEndpoint)
agentResponse.set(responseJson.toString())
}
@@ -109,10 +110,10 @@ class DDApiIntegrationTest {
}
def setup() {
- api = new DDApi(agentContainerHost, agentContainerPort, v4(), null)
+ api = new DDAgentApi(agentContainerHost, agentContainerPort, v4(), null)
api.addResponseListener(responseListener)
- unixDomainSocketApi = new DDApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
+ unixDomainSocketApi = new DDAgentApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
unixDomainSocketApi.addResponseListener(responseListener)
}
@@ -159,7 +160,7 @@ class DDApiIntegrationTest {
}
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
- static class DDApiIntegrationV3Test extends DDApiIntegrationV4Test {
+ static class DDAgentApiIntegrationV3Test extends DDAgentApiIntegrationV4Test {
boolean v4() {
return false
}