From 6e57041a6a3cd75bf651ccdcbf486fba1617c8c5 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 12:09:14 -0800 Subject: [PATCH 1/7] Extract Monitor classes from DDAgentWriter --- .../trace/common/writer/DDAgentWriter.java | 237 +----------------- .../datadog/trace/common/writer/Writer.java | 9 +- .../trace/common/writer/ddagent/Monitor.java | 232 +++++++++++++++++ .../groovy/datadog/trace/DDTracerTest.groovy | 5 +- .../trace/api/writer/DDAgentWriterTest.groovy | 15 +- 5 files changed, 254 insertions(+), 244 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index a98baabf56..4a8e230a3f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -13,11 +13,9 @@ import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import com.timgroup.statsd.NonBlockingStatsDClient; -import com.timgroup.statsd.StatsDClient; import datadog.opentracing.DDSpan; -import datadog.opentracing.DDTraceOTInfo; import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.ddagent.Monitor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; @@ -84,7 +82,7 @@ public class DDAgentWriter implements Writer { public DDAgentWriter() { this( new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), - new NoopMonitor()); + new Monitor.Noop()); } public DDAgentWriter(final DDApi api, final Monitor monitor) { @@ -93,7 +91,7 @@ public class DDAgentWriter implements Writer { /** Old signature (pre-Monitor) used in tests */ private DDAgentWriter(final DDApi api) { - this(api, new NoopMonitor()); + this(api, new Monitor.Noop()); } /** @@ -108,7 +106,7 @@ public class DDAgentWriter implements Writer { final int disruptorSize, final int senderQueueSize, final int flushFrequencySeconds) { - this(api, new NoopMonitor(), disruptorSize, senderQueueSize, flushFrequencySeconds); + this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds); } // DQH - TODO - Update the tests & remove this @@ -122,7 +120,7 @@ public class DDAgentWriter implements Writer { // DQH - TODO - Update the tests & remove this private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) { - this(api, new NoopMonitor(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); + this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); } private DDAgentWriter( @@ -257,7 +255,7 @@ public class DDAgentWriter implements Writer { // if something is *probably* the NoopMonitor. String str = "DDAgentWriter { api=" + api; - if (!(monitor instanceof NoopMonitor)) { + if (!(monitor instanceof Monitor.Noop)) { str += ", monitor=" + monitor; } str += " }"; @@ -426,227 +424,4 @@ public class DDAgentWriter implements Writer { return new Event<>(); } } - - /** - * Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major - * lifecycle events... - * - * - */ - public interface Monitor { - void onStart(final DDAgentWriter agentWriter); - - void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess); - - void onPublish(final DDAgentWriter agentWriter, final List trace); - - void onFailedPublish(final DDAgentWriter agentWriter, final List trace); - - void onFlush(final DDAgentWriter agentWriter, final boolean early); - - void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete); - - void onSerialize( - final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace); - - void onFailedSerialize( - final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause); - - void onSend( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response); - - void onFailedSend( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response); - } - - public static final class NoopMonitor implements Monitor { - @Override - public void onStart(final DDAgentWriter agentWriter) {} - - @Override - public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {} - - @Override - public void onPublish(final DDAgentWriter agentWriter, final List trace) {} - - @Override - public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) {} - - @Override - public void onFlush(final DDAgentWriter agentWriter, final boolean early) {} - - @Override - public void onScheduleFlush( - final DDAgentWriter agentWriter, final boolean previousIncomplete) {} - - @Override - public void onSerialize( - final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) {} - - @Override - public void onFailedSerialize( - final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) {} - - @Override - public void onSend( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response) {} - - @Override - public void onFailedSend( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response) {} - - @Override - public String toString() { - return "NoOp"; - } - } - - public static final class StatsDMonitor implements Monitor { - public static final String PREFIX = "datadog.tracer"; - - public static final String LANG_TAG = "lang"; - public static final String LANG_VERSION_TAG = "lang_version"; - public static final String LANG_INTERPRETER_TAG = "lang_interpreter"; - public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor"; - public static final String TRACER_VERSION_TAG = "tracer_version"; - - private final String hostInfo; - private final StatsDClient statsd; - - // DQH - Made a conscious choice to not take a Config object here. - // Letting the creating of the Monitor take the Config, - // so it can decide which Monitor variant to create. - public StatsDMonitor(final String host, final int port) { - hostInfo = host + ":" + port; - statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags()); - } - - // Currently, intended for testing - private StatsDMonitor(final StatsDClient statsd) { - hostInfo = null; - this.statsd = statsd; - } - - protected static final String[] getDefaultTags() { - return new String[] { - tag(LANG_TAG, "java"), - tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION), - tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME), - tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR), - tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION) - }; - } - - private static String tag(final String tagPrefix, final String tagValue) { - return tagPrefix + ":" + tagValue; - } - - @Override - public void onStart(final DDAgentWriter agentWriter) { - statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity()); - } - - @Override - public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {} - - @Override - public void onPublish(final DDAgentWriter agentWriter, final List trace) { - statsd.incrementCounter("queue.accepted"); - statsd.count("queue.accepted_lengths", trace.size()); - } - - @Override - public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) { - statsd.incrementCounter("queue.dropped"); - } - - @Override - public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) { - // not recorded - } - - @Override - public void onFlush(final DDAgentWriter agentWriter, final boolean early) {} - - @Override - public void onSerialize( - final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) { - // DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't - // map precisely - statsd.count("queue.accepted_size", serializedTrace.length); - } - - @Override - public void onFailedSerialize( - final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) { - // TODO - DQH - make a new stat for serialization failure -- or maybe count this towards - // api.errors??? - } - - @Override - public void onSend( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response) { - onSendAttempt(agentWriter, representativeCount, sizeInBytes, response); - } - - @Override - public void onFailedSend( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response) { - onSendAttempt(agentWriter, representativeCount, sizeInBytes, response); - } - - private void onSendAttempt( - final DDAgentWriter agentWriter, - final int representativeCount, - final int sizeInBytes, - final DDApi.Response response) { - statsd.incrementCounter("api.requests"); - statsd.recordGaugeValue("queue.length", representativeCount); - // TODO: missing queue.spans (# of spans being sent) - statsd.recordGaugeValue("queue.size", sizeInBytes); - - if (response.exception() != null) { - // covers communication errors -- both not receiving a response or - // receiving malformed response (even when otherwise successful) - statsd.incrementCounter("api.errors"); - } - - if (response.status() != null) { - statsd.incrementCounter("api.responses", "status: " + response.status()); - } - } - - @Override - public String toString() { - if (hostInfo == null) { - return "StatsD"; - } else { - return "StatsD { host=" + hostInfo + " }"; - } - } - } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java index a6d99ee749..669bcfd9fc 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java @@ -2,6 +2,7 @@ package datadog.trace.common.writer; import datadog.opentracing.DDSpan; import datadog.trace.api.Config; +import datadog.trace.common.writer.ddagent.Monitor; import java.io.Closeable; import java.util.List; import java.util.Properties; @@ -65,14 +66,14 @@ public interface Writer extends Closeable { return new DDAgentWriter(createApi(config), createMonitor(config)); } - private static final DDApi createApi(final Config config) { + private static DDApi createApi(final Config config) { return new DDApi( config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket()); } - private static final DDAgentWriter.Monitor createMonitor(final Config config) { + private static Monitor createMonitor(final Config config) { if (!config.isHealthMetricsEnabled()) { - return new DDAgentWriter.NoopMonitor(); + return new Monitor.Noop(); } else { String host = config.getHealthMetricsStatsdHost(); if (host == null) { @@ -87,7 +88,7 @@ public interface Writer extends Closeable { port = config.getJmxFetchStatsdPort(); } - return new DDAgentWriter.StatsDMonitor(host, port); + return new Monitor.StatsD(host, port); } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java new file mode 100644 index 0000000000..12afd9906c --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java @@ -0,0 +1,232 @@ +package datadog.trace.common.writer.ddagent; + +import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.StatsDClient; +import datadog.opentracing.DDSpan; +import datadog.opentracing.DDTraceOTInfo; +import datadog.trace.common.writer.DDAgentWriter; +import datadog.trace.common.writer.DDApi; +import java.util.List; + +/** + * Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major + * lifecycle events... + * + *
    + *
  • start + *
  • shutdown + *
  • publishing to disruptor + *
  • serializing + *
  • sending to agent + *
+ */ +public interface Monitor { + void onStart(final DDAgentWriter agentWriter); + + void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess); + + void onPublish(final DDAgentWriter agentWriter, final List trace); + + void onFailedPublish(final DDAgentWriter agentWriter, final List trace); + + void onFlush(final DDAgentWriter agentWriter, final boolean early); + + void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete); + + void onSerialize( + final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace); + + void onFailedSerialize( + final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause); + + void onSend( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response); + + void onFailedSend( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response); + + final class StatsD implements Monitor { + public static final String PREFIX = "datadog.tracer"; + + public static final String LANG_TAG = "lang"; + public static final String LANG_VERSION_TAG = "lang_version"; + public static final String LANG_INTERPRETER_TAG = "lang_interpreter"; + public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor"; + public static final String TRACER_VERSION_TAG = "tracer_version"; + + private final String hostInfo; + private final StatsDClient statsd; + + // DQH - Made a conscious choice to not take a Config object here. + // Letting the creating of the Monitor take the Config, + // so it can decide which Monitor variant to create. + public StatsD(final String host, final int port) { + hostInfo = host + ":" + port; + statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags()); + } + + // Currently, intended for testing + private StatsD(final StatsDClient statsd) { + hostInfo = null; + this.statsd = statsd; + } + + protected static final String[] getDefaultTags() { + return new String[] { + tag(LANG_TAG, "java"), + tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION), + tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME), + tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR), + tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION) + }; + } + + private static String tag(final String tagPrefix, final String tagValue) { + return tagPrefix + ":" + tagValue; + } + + @Override + public void onStart(final DDAgentWriter agentWriter) { + statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity()); + } + + @Override + public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {} + + @Override + public void onPublish(final DDAgentWriter agentWriter, final List trace) { + statsd.incrementCounter("queue.accepted"); + statsd.count("queue.accepted_lengths", trace.size()); + } + + @Override + public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) { + statsd.incrementCounter("queue.dropped"); + } + + @Override + public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) { + // not recorded + } + + @Override + public void onFlush(final DDAgentWriter agentWriter, final boolean early) {} + + @Override + public void onSerialize( + final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) { + // DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't + // map precisely + statsd.count("queue.accepted_size", serializedTrace.length); + } + + @Override + public void onFailedSerialize( + final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) { + // TODO - DQH - make a new stat for serialization failure -- or maybe count this towards + // api.errors??? + } + + @Override + public void onSend( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response) { + onSendAttempt(agentWriter, representativeCount, sizeInBytes, response); + } + + @Override + public void onFailedSend( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response) { + onSendAttempt(agentWriter, representativeCount, sizeInBytes, response); + } + + private void onSendAttempt( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response) { + statsd.incrementCounter("api.requests"); + statsd.recordGaugeValue("queue.length", representativeCount); + // TODO: missing queue.spans (# of spans being sent) + statsd.recordGaugeValue("queue.size", sizeInBytes); + + if (response.exception() != null) { + // covers communication errors -- both not receiving a response or + // receiving malformed response (even when otherwise successful) + statsd.incrementCounter("api.errors"); + } + + if (response.status() != null) { + statsd.incrementCounter("api.responses", "status: " + response.status()); + } + } + + @Override + public String toString() { + if (hostInfo == null) { + return "StatsD"; + } else { + return "StatsD { host=" + hostInfo + " }"; + } + } + } + + final class Noop implements Monitor { + @Override + public void onStart(final DDAgentWriter agentWriter) {} + + @Override + public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {} + + @Override + public void onPublish(final DDAgentWriter agentWriter, final List trace) {} + + @Override + public void onFailedPublish(final DDAgentWriter agentWriter, final List trace) {} + + @Override + public void onFlush(final DDAgentWriter agentWriter, final boolean early) {} + + @Override + public void onScheduleFlush( + final DDAgentWriter agentWriter, final boolean previousIncomplete) {} + + @Override + public void onSerialize( + final DDAgentWriter agentWriter, final List trace, final byte[] serializedTrace) {} + + @Override + public void onFailedSerialize( + final DDAgentWriter agentWriter, final List trace, final Throwable optionalCause) {} + + @Override + public void onSend( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response) {} + + @Override + public void onFailedSend( + final DDAgentWriter agentWriter, + final int representativeCount, + final int sizeInBytes, + final DDApi.Response response) {} + + @Override + public String toString() { + return "NoOp"; + } + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy index 65ab4d4305..5a7ac705c5 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy @@ -14,6 +14,7 @@ import datadog.trace.common.sampling.Sampler import datadog.trace.common.writer.DDAgentWriter import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.LoggingWriter +import datadog.trace.common.writer.ddagent.Monitor import datadog.trace.util.test.DDSpecification import io.opentracing.propagation.TextMapInject import org.junit.Rule @@ -49,7 +50,7 @@ class DDTracerTest extends DDSpecification { ((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126 ((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.3/traces" || ((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.4/traces" - tracer.writer.monitor instanceof DDAgentWriter.NoopMonitor + tracer.writer.monitor instanceof Monitor.Noop tracer.spanContextDecorators.size() == 15 @@ -65,7 +66,7 @@ class DDTracerTest extends DDSpecification { def tracer = new DDTracer(new Config()) then: - tracer.writer.monitor instanceof DDAgentWriter.StatsDMonitor + tracer.writer.monitor instanceof Monitor.StatsD tracer.writer.monitor.hostInfo == "localhost:8125" } diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 0868e56ba5..f61ffb6641 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -8,6 +8,7 @@ import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.DDAgentWriter import datadog.trace.common.writer.DDApi +import datadog.trace.common.writer.ddagent.Monitor import datadog.trace.util.test.DDSpecification import spock.lang.Timeout @@ -252,7 +253,7 @@ class DDAgentWriterTest extends DDSpecification { } } def api = new DDApi("localhost", agent.address.port, null) - def monitor = Mock(DDAgentWriter.Monitor) + def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) when: @@ -301,7 +302,7 @@ class DDAgentWriterTest extends DDSpecification { } } def api = new DDApi("localhost", agent.address.port, null) - def monitor = Mock(DDAgentWriter.Monitor) + def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) when: @@ -343,7 +344,7 @@ class DDAgentWriterTest extends DDSpecification { return DDApi.Response.failed(new IOException("comm error")) } } - def monitor = Mock(DDAgentWriter.Monitor) + def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) when: @@ -401,7 +402,7 @@ class DDAgentWriterTest extends DDSpecification { def api = new DDApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback - def monitor = Stub(DDAgentWriter.Monitor) + def monitor = Stub(Monitor) monitor.onPublish(_, _) >> { numPublished.incrementAndGet() } @@ -506,7 +507,7 @@ class DDAgentWriterTest extends DDSpecification { def api = new DDApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback - def monitor = Stub(DDAgentWriter.Monitor) + def monitor = Stub(Monitor) monitor.onPublish(_, _) >> { numPublished.incrementAndGet() } @@ -577,7 +578,7 @@ class DDAgentWriterTest extends DDSpecification { numResponses += 1 } - def monitor = new DDAgentWriter.StatsDMonitor(statsd) + def monitor = new Monitor.StatsD(statsd) def writer = new DDAgentWriter(api, monitor) writer.start() @@ -625,7 +626,7 @@ class DDAgentWriterTest extends DDSpecification { numErrors += 1 } - def monitor = new DDAgentWriter.StatsDMonitor(statsd) + def monitor = new Monitor.StatsD(statsd) def writer = new DDAgentWriter(api, monitor) writer.start() From 2ea76494f8bac76976ee05e9b078c8f3edd91acb Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 12:19:00 -0800 Subject: [PATCH 2/7] Extract DisruptorEvent and EventTranslator classes from DDAgentWriter --- .../trace/common/writer/DDAgentWriter.java | 43 ++++--------------- .../common/writer/ddagent/DisruptorEvent.java | 35 +++++++++++++++ 2 files changed, 44 insertions(+), 34 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 4a8e230a3f..fc0191fb09 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -6,15 +6,13 @@ import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; import static java.util.concurrent.TimeUnit.SECONDS; import com.fasterxml.jackson.core.JsonProcessingException; -import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.EventTranslator; -import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.ddagent.DisruptorEvent; import datadog.trace.common.writer.ddagent.Monitor; import java.util.ArrayList; import java.util.List; @@ -44,21 +42,10 @@ public class DDAgentWriter implements Writer { private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second - private static final EventTranslatorOneArg>, List> TRANSLATOR = - new EventTranslatorOneArg>, List>() { - @Override - public void translateTo( - final Event> event, final long sequence, final List trace) { - event.data = trace; - } - }; - private static final EventTranslator>> FLUSH_TRANSLATOR = - new EventTranslator>>() { - @Override - public void translateTo(final Event> event, final long sequence) { - event.shouldFlush = true; - } - }; + private static final DisruptorEvent.TraceTranslator TRANSLATOR = + new DisruptorEvent.TraceTranslator(); + private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR = + new DisruptorEvent.FlushTranslator(); private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-disruptor"); @@ -68,7 +55,7 @@ public class DDAgentWriter implements Writer { private final Runnable flushTask = new FlushTask(); private final DDApi api; private final int flushFrequencySeconds; - private final Disruptor>> disruptor; + private final Disruptor>> disruptor; private final Semaphore senderSemaphore; private final ScheduledExecutorService scheduledWriterExecutor; @@ -134,7 +121,7 @@ public class DDAgentWriter implements Writer { disruptor = new Disruptor<>( - new DisruptorEventFactory>(), + new DisruptorEvent.Factory>(), Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 DISRUPTOR_THREAD_FACTORY, ProducerType.MULTI, @@ -287,13 +274,13 @@ public class DDAgentWriter implements Writer { } /** This class is intentionally not threadsafe. */ - private class TraceConsumer implements EventHandler>> { + private class TraceConsumer implements EventHandler>> { private List serializedTraces = new ArrayList<>(); private int payloadSize = 0; @Override public void onEvent( - final Event> event, final long sequence, final boolean endOfBatch) { + final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { final List trace = event.data; event.data = null; // clear the event for reuse. if (trace != null) { @@ -412,16 +399,4 @@ public class DDAgentWriter implements Writer { } } } - - private static class Event { - private volatile boolean shouldFlush = false; - private volatile T data = null; - } - - private static class DisruptorEventFactory implements EventFactory> { - @Override - public Event newInstance() { - return new Event<>(); - } - } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java new file mode 100644 index 0000000000..30db86f21a --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java @@ -0,0 +1,35 @@ +package datadog.trace.common.writer.ddagent; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventTranslator; +import com.lmax.disruptor.EventTranslatorOneArg; +import datadog.opentracing.DDSpan; +import java.util.List; + +public class DisruptorEvent { + public volatile boolean shouldFlush = false; + public volatile T data = null; + + public static class Factory implements EventFactory> { + @Override + public DisruptorEvent newInstance() { + return new DisruptorEvent<>(); + } + } + + public static class TraceTranslator + implements EventTranslatorOneArg>, List> { + @Override + public void translateTo( + final DisruptorEvent> event, final long sequence, final List trace) { + event.data = trace; + } + } + + public static class FlushTranslator implements EventTranslator>> { + @Override + public void translateTo(final DisruptorEvent> event, final long sequence) { + event.shouldFlush = true; + } + } +} From 8fdd30d3edc22a35cf26821f2ad5167e2c8ee2c9 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 12:54:40 -0800 Subject: [PATCH 3/7] Make TraceConsumer a static class --- .../trace/common/writer/DDAgentWriter.java | 71 ++++++++++--------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index fc0191fb09..70851bd433 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -57,7 +57,6 @@ public class DDAgentWriter implements Writer { private final int flushFrequencySeconds; private final Disruptor>> disruptor; - private final Semaphore senderSemaphore; private final ScheduledExecutorService scheduledWriterExecutor; private final AtomicInteger traceCount = new AtomicInteger(0); private final AtomicReference> flushSchedule = new AtomicReference<>(); @@ -126,10 +125,9 @@ public class DDAgentWriter implements Writer { DISRUPTOR_THREAD_FACTORY, ProducerType.MULTI, new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); - disruptor.handleEventsWith(new TraceConsumer()); + disruptor.handleEventsWith(new TraceConsumer(traceCount, senderQueueSize, this)); this.flushFrequencySeconds = flushFrequencySeconds; - senderSemaphore = new Semaphore(senderQueueSize); scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY); apiPhaser = new Phaser(); // Ensure API calls are completed when flushing @@ -274,10 +272,21 @@ public class DDAgentWriter implements Writer { } /** This class is intentionally not threadsafe. */ - private class TraceConsumer implements EventHandler>> { + private static class TraceConsumer implements EventHandler>> { + private final AtomicInteger traceCount; + private final Semaphore senderSemaphore; + private final DDAgentWriter writer; + private List serializedTraces = new ArrayList<>(); private int payloadSize = 0; + private TraceConsumer( + final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) { + this.traceCount = traceCount; + senderSemaphore = new Semaphore(senderQueueSize); + this.writer = writer; + } + @Override public void onEvent( final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { @@ -286,19 +295,19 @@ public class DDAgentWriter implements Writer { if (trace != null) { traceCount.incrementAndGet(); try { - final byte[] serializedTrace = api.serializeTrace(trace); + final byte[] serializedTrace = writer.api.serializeTrace(trace); payloadSize += serializedTrace.length; serializedTraces.add(serializedTrace); - monitor.onSerialize(DDAgentWriter.this, trace, serializedTrace); + writer.monitor.onSerialize(writer, trace, serializedTrace); } catch (final JsonProcessingException e) { log.warn("Error serializing trace", e); - monitor.onFailedSerialize(DDAgentWriter.this, trace, e); + writer.monitor.onFailedSerialize(writer, trace, e); } catch (final Throwable e) { log.debug("Error while serializing trace", e); - monitor.onFailedSerialize(DDAgentWriter.this, trace, e); + writer.monitor.onFailedSerialize(writer, trace, e); } } @@ -313,16 +322,16 @@ public class DDAgentWriter implements Writer { private void reportTraces(final boolean early) { try { if (serializedTraces.isEmpty()) { - monitor.onFlush(DDAgentWriter.this, early); + writer.monitor.onFlush(writer, early); - apiPhaser.arrive(); // Allow flush to return + writer.apiPhaser.arrive(); // Allow flush to return return; // scheduleFlush called in finally block. } - if (scheduledWriterExecutor.isShutdown()) { - monitor.onFailedSend( - DDAgentWriter.this, traceCount.get(), payloadSize, DDApi.Response.failed(-1)); - apiPhaser.arrive(); // Allow flush to return + if (writer.scheduledWriterExecutor.isShutdown()) { + writer.monitor.onFailedSend( + writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1)); + writer.apiPhaser.arrive(); // Allow flush to return return; } final List toSend = serializedTraces; @@ -333,20 +342,20 @@ public class DDAgentWriter implements Writer { final int sizeInBytes = payloadSize; try { - monitor.onFlush(DDAgentWriter.this, early); + writer.monitor.onFlush(writer, early); // Run the actual IO task on a different thread to avoid blocking the consumer. try { senderSemaphore.acquire(); } catch (final InterruptedException e) { - monitor.onFailedSend( - DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e)); + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); // Finally, we'll schedule another flush // Any threads awaiting the flush will continue to wait return; } - scheduledWriterExecutor.execute( + writer.scheduledWriterExecutor.execute( new Runnable() { @Override public void run() { @@ -354,13 +363,12 @@ public class DDAgentWriter implements Writer { try { final DDApi.Response response = - api.sendSerializedTraces(representativeCount, sizeInBytes, toSend); + writer.api.sendSerializedTraces(representativeCount, sizeInBytes, toSend); if (response.success()) { log.debug("Successfully sent {} traces to the API", toSend.size()); - monitor.onSend( - DDAgentWriter.this, representativeCount, sizeInBytes, response); + writer.monitor.onSend(writer, representativeCount, sizeInBytes, response); } else { log.debug( "Failed to send {} traces (representing {}) of size {} bytes to the API", @@ -368,8 +376,8 @@ public class DDAgentWriter implements Writer { representativeCount, sizeInBytes); - monitor.onFailedSend( - DDAgentWriter.this, representativeCount, sizeInBytes, response); + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, response); } } catch (final Throwable e) { log.debug("Failed to send traces to the API: {}", e.getMessage()); @@ -378,24 +386,21 @@ public class DDAgentWriter implements Writer { // shouldn't occur. // However, just to be safe to start, create a failed Response to handle any // spurious Throwable-s. - monitor.onFailedSend( - DDAgentWriter.this, - representativeCount, - sizeInBytes, - DDApi.Response.failed(e)); + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); } finally { - apiPhaser.arrive(); // Flush completed. + writer.apiPhaser.arrive(); // Flush completed. } } }); } catch (final RejectedExecutionException ex) { - monitor.onFailedSend( - DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(ex)); - apiPhaser.arrive(); // Allow flush to return + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex)); + writer.apiPhaser.arrive(); // Allow flush to return } } finally { payloadSize = 0; - scheduleFlush(); + writer.scheduleFlush(); } } } From 97ed587547c201f785a3e24f75ce8af5f4ae4943 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 13:01:14 -0800 Subject: [PATCH 4/7] Extract TraceConsumer from DDAgentWriter Unfortunately this required making some things public that were previously private or package visible. I expect this to be temporary. --- .../trace/common/writer/DDAgentWriter.java | 149 +---------------- .../datadog/trace/common/writer/DDApi.java | 13 +- .../common/writer/ddagent/TraceConsumer.java | 151 ++++++++++++++++++ .../trace/api/writer/DDAgentWriterTest.groovy | 3 +- 4 files changed, 164 insertions(+), 152 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 70851bd433..1010b1043f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -5,8 +5,6 @@ import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; import static java.util.concurrent.TimeUnit.SECONDS; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; @@ -14,14 +12,12 @@ import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; import datadog.trace.common.writer.ddagent.DisruptorEvent; import datadog.trace.common.writer.ddagent.Monitor; -import java.util.ArrayList; +import datadog.trace.common.writer.ddagent.TraceConsumer; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +35,6 @@ import lombok.extern.slf4j.Slf4j; public class DDAgentWriter implements Writer { private static final int DISRUPTOR_BUFFER_SIZE = 1024; private static final int SENDER_QUEUE_SIZE = 16; - private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second private static final DisruptorEvent.TraceTranslator TRANSLATOR = @@ -57,13 +52,13 @@ public class DDAgentWriter implements Writer { private final int flushFrequencySeconds; private final Disruptor>> disruptor; - private final ScheduledExecutorService scheduledWriterExecutor; + public final ScheduledExecutorService scheduledWriterExecutor; private final AtomicInteger traceCount = new AtomicInteger(0); private final AtomicReference> flushSchedule = new AtomicReference<>(); - private final Phaser apiPhaser; + public final Phaser apiPhaser; private volatile boolean running = false; - private final Monitor monitor; + public final Monitor monitor; public DDAgentWriter() { this( @@ -248,7 +243,7 @@ public class DDAgentWriter implements Writer { return str; } - private void scheduleFlush() { + public void scheduleFlush() { if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) { final ScheduledFuture previous = flushSchedule.getAndSet( @@ -270,138 +265,4 @@ public class DDAgentWriter implements Writer { disruptor.publishEvent(FLUSH_TRANSLATOR); } } - - /** This class is intentionally not threadsafe. */ - private static class TraceConsumer implements EventHandler>> { - private final AtomicInteger traceCount; - private final Semaphore senderSemaphore; - private final DDAgentWriter writer; - - private List serializedTraces = new ArrayList<>(); - private int payloadSize = 0; - - private TraceConsumer( - final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) { - this.traceCount = traceCount; - senderSemaphore = new Semaphore(senderQueueSize); - this.writer = writer; - } - - @Override - public void onEvent( - final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { - final List trace = event.data; - event.data = null; // clear the event for reuse. - if (trace != null) { - traceCount.incrementAndGet(); - try { - final byte[] serializedTrace = writer.api.serializeTrace(trace); - payloadSize += serializedTrace.length; - serializedTraces.add(serializedTrace); - - writer.monitor.onSerialize(writer, trace, serializedTrace); - } catch (final JsonProcessingException e) { - log.warn("Error serializing trace", e); - - writer.monitor.onFailedSerialize(writer, trace, e); - } catch (final Throwable e) { - log.debug("Error while serializing trace", e); - - writer.monitor.onFailedSerialize(writer, trace, e); - } - } - - if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) { - final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES); - - reportTraces(early); - event.shouldFlush = false; - } - } - - private void reportTraces(final boolean early) { - try { - if (serializedTraces.isEmpty()) { - writer.monitor.onFlush(writer, early); - - writer.apiPhaser.arrive(); // Allow flush to return - return; - // scheduleFlush called in finally block. - } - if (writer.scheduledWriterExecutor.isShutdown()) { - writer.monitor.onFailedSend( - writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1)); - writer.apiPhaser.arrive(); // Allow flush to return - return; - } - final List toSend = serializedTraces; - serializedTraces = new ArrayList<>(toSend.size()); - // ^ Initialize with similar size to reduce arraycopy churn. - - final int representativeCount = traceCount.getAndSet(0); - final int sizeInBytes = payloadSize; - - try { - writer.monitor.onFlush(writer, early); - - // Run the actual IO task on a different thread to avoid blocking the consumer. - try { - senderSemaphore.acquire(); - } catch (final InterruptedException e) { - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); - - // Finally, we'll schedule another flush - // Any threads awaiting the flush will continue to wait - return; - } - writer.scheduledWriterExecutor.execute( - new Runnable() { - @Override - public void run() { - senderSemaphore.release(); - - try { - final DDApi.Response response = - writer.api.sendSerializedTraces(representativeCount, sizeInBytes, toSend); - - if (response.success()) { - log.debug("Successfully sent {} traces to the API", toSend.size()); - - writer.monitor.onSend(writer, representativeCount, sizeInBytes, response); - } else { - log.debug( - "Failed to send {} traces (representing {}) of size {} bytes to the API", - toSend.size(), - representativeCount, - sizeInBytes); - - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, response); - } - } catch (final Throwable e) { - log.debug("Failed to send traces to the API: {}", e.getMessage()); - - // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really - // shouldn't occur. - // However, just to be safe to start, create a failed Response to handle any - // spurious Throwable-s. - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); - } finally { - writer.apiPhaser.arrive(); // Flush completed. - } - } - }); - } catch (final RejectedExecutionException ex) { - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex)); - writer.apiPhaser.arrive(); // Allow flush to return - } - } finally { - payloadSize = 0; - writer.scheduleFlush(); - } - } - } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java index 164089f3c5..2e42f3194f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java @@ -21,7 +21,6 @@ import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; -import okhttp3.Response; import okio.BufferedSink; import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePacker; @@ -108,11 +107,11 @@ public class DDApi { return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces); } - byte[] serializeTrace(final List trace) throws JsonProcessingException { + public byte[] serializeTrace(final List trace) throws JsonProcessingException { return OBJECT_MAPPER.writeValueAsBytes(trace); } - Response sendSerializedTraces( + public Response sendSerializedTraces( final int representativeCount, final Integer sizeInBytes, final List traces) { try { final RequestBody body = @@ -348,21 +347,21 @@ public class DDApi { } public final boolean success() { - return this.success; + return success; } // TODO: DQH - In Java 8, switch to OptionalInteger public final Integer status() { - return this.status; + return status; } public final JsonNode json() { - return this.json; + return json; } // TODO: DQH - In Java 8, switch to Optional? public final Throwable exception() { - return this.exception; + return exception; } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java new file mode 100644 index 0000000000..fb63d1f93b --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java @@ -0,0 +1,151 @@ +package datadog.trace.common.writer.ddagent; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.lmax.disruptor.EventHandler; +import datadog.opentracing.DDSpan; +import datadog.trace.common.writer.DDAgentWriter; +import datadog.trace.common.writer.DDApi; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; + +/** This class is intentionally not threadsafe. */ +@Slf4j +public class TraceConsumer implements EventHandler>> { + private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB + + private final AtomicInteger traceCount; + private final Semaphore senderSemaphore; + private final DDAgentWriter writer; + + private List serializedTraces = new ArrayList<>(); + private int payloadSize = 0; + + public TraceConsumer( + final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) { + this.traceCount = traceCount; + senderSemaphore = new Semaphore(senderQueueSize); + this.writer = writer; + } + + @Override + public void onEvent( + final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { + final List trace = event.data; + event.data = null; // clear the event for reuse. + if (trace != null) { + traceCount.incrementAndGet(); + try { + final byte[] serializedTrace = writer.getApi().serializeTrace(trace); + payloadSize += serializedTrace.length; + serializedTraces.add(serializedTrace); + + writer.monitor.onSerialize(writer, trace, serializedTrace); + } catch (final JsonProcessingException e) { + log.warn("Error serializing trace", e); + + writer.monitor.onFailedSerialize(writer, trace, e); + } catch (final Throwable e) { + log.debug("Error while serializing trace", e); + + writer.monitor.onFailedSerialize(writer, trace, e); + } + } + + if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) { + final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES); + + reportTraces(early); + event.shouldFlush = false; + } + } + + private void reportTraces(final boolean early) { + try { + if (serializedTraces.isEmpty()) { + writer.monitor.onFlush(writer, early); + + writer.apiPhaser.arrive(); // Allow flush to return + return; + // scheduleFlush called in finally block. + } + if (writer.scheduledWriterExecutor.isShutdown()) { + writer.monitor.onFailedSend( + writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1)); + writer.apiPhaser.arrive(); // Allow flush to return + return; + } + final List toSend = serializedTraces; + serializedTraces = new ArrayList<>(toSend.size()); + // ^ Initialize with similar size to reduce arraycopy churn. + + final int representativeCount = traceCount.getAndSet(0); + final int sizeInBytes = payloadSize; + + try { + writer.monitor.onFlush(writer, early); + + // Run the actual IO task on a different thread to avoid blocking the consumer. + try { + senderSemaphore.acquire(); + } catch (final InterruptedException e) { + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); + + // Finally, we'll schedule another flush + // Any threads awaiting the flush will continue to wait + return; + } + writer.scheduledWriterExecutor.execute( + new Runnable() { + @Override + public void run() { + senderSemaphore.release(); + + try { + final DDApi.Response response = + writer + .getApi() + .sendSerializedTraces(representativeCount, sizeInBytes, toSend); + + if (response.success()) { + log.debug("Successfully sent {} traces to the API", toSend.size()); + + writer.monitor.onSend(writer, representativeCount, sizeInBytes, response); + } else { + log.debug( + "Failed to send {} traces (representing {}) of size {} bytes to the API", + toSend.size(), + representativeCount, + sizeInBytes); + + writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response); + } + } catch (final Throwable e) { + log.debug("Failed to send traces to the API: {}", e.getMessage()); + + // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really + // shouldn't occur. + // However, just to be safe to start, create a failed Response to handle any + // spurious Throwable-s. + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); + } finally { + writer.apiPhaser.arrive(); // Flush completed. + } + } + }); + } catch (final RejectedExecutionException ex) { + writer.monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex)); + writer.apiPhaser.arrive(); // Allow flush to return + } + } finally { + payloadSize = 0; + writer.scheduleFlush(); + } + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index f61ffb6641..07835abe5d 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -9,6 +9,7 @@ import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.DDAgentWriter import datadog.trace.common.writer.DDApi import datadog.trace.common.writer.ddagent.Monitor +import datadog.trace.common.writer.ddagent.TraceConsumer import datadog.trace.util.test.DDSpecification import spock.lang.Timeout @@ -180,7 +181,7 @@ class DDAgentWriterTest extends DDSpecification { minimalSpan = new DDSpan(0, minimalContext) minimalTrace = [minimalSpan] traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length - maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 + maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 } def "check that are no interactions after close"() { From 84f9d80258e457624f126e16a30b0716007167e8 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 13:36:16 -0800 Subject: [PATCH 5/7] Extract TraceSerializingDisruptor class from DDAgentWriter --- .../trace/common/writer/DDAgentWriter.java | 95 ++------------ .../common/writer/ddagent/TraceConsumer.java | 2 +- .../ddagent/TraceSerializingDisruptor.java | 119 ++++++++++++++++++ .../trace/api/writer/DDAgentWriterTest.groovy | 28 ++--- 4 files changed, 147 insertions(+), 97 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 1010b1043f..a822c540ae 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -5,23 +5,17 @@ import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; import static java.util.concurrent.TimeUnit.SECONDS; -import com.lmax.disruptor.SleepingWaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; -import datadog.trace.common.writer.ddagent.DisruptorEvent; import datadog.trace.common.writer.ddagent.Monitor; import datadog.trace.common.writer.ddagent.TraceConsumer; +import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; /** @@ -37,26 +31,16 @@ public class DDAgentWriter implements Writer { private static final int SENDER_QUEUE_SIZE = 16; private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second - private static final DisruptorEvent.TraceTranslator TRANSLATOR = - new DisruptorEvent.TraceTranslator(); - private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR = - new DisruptorEvent.FlushTranslator(); - - private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = - new DaemonThreadFactory("dd-trace-disruptor"); private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-writer"); - private final Runnable flushTask = new FlushTask(); private final DDApi api; - private final int flushFrequencySeconds; - private final Disruptor>> disruptor; + public final int flushFrequencySeconds; + public final TraceSerializingDisruptor disruptor; public final ScheduledExecutorService scheduledWriterExecutor; private final AtomicInteger traceCount = new AtomicInteger(0); - private final AtomicReference> flushSchedule = new AtomicReference<>(); - public final Phaser apiPhaser; - private volatile boolean running = false; + public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing; public final Monitor monitor; @@ -114,24 +98,18 @@ public class DDAgentWriter implements Writer { this.monitor = monitor; disruptor = - new Disruptor<>( - new DisruptorEvent.Factory>(), - Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 - DISRUPTOR_THREAD_FACTORY, - ProducerType.MULTI, - new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); - disruptor.handleEventsWith(new TraceConsumer(traceCount, senderQueueSize, this)); + new TraceSerializingDisruptor( + disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this)); this.flushFrequencySeconds = flushFrequencySeconds; scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY); - apiPhaser = new Phaser(); // Ensure API calls are completed when flushing apiPhaser.register(); // Register on behalf of the scheduled executor thread. } // Exposing some statistics for consumption by monitors public final long getDisruptorCapacity() { - return disruptor.getRingBuffer().getBufferSize(); + return disruptor.getDisruptorCapacity(); } public final long getDisruptorUtilizedCapacity() { @@ -139,14 +117,14 @@ public class DDAgentWriter implements Writer { } public final long getDisruptorRemainingCapacity() { - return disruptor.getRingBuffer().remainingCapacity(); + return disruptor.getDisruptorRemainingCapacity(); } @Override public void write(final List trace) { // We can't add events after shutdown otherwise it will never complete shutting down. - if (running) { - final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace); + if (disruptor.running) { + final boolean published = disruptor.tryPublish(trace); if (published) { monitor.onPublish(DDAgentWriter.this, trace); @@ -176,15 +154,12 @@ public class DDAgentWriter implements Writer { @Override public void start() { disruptor.start(); - running = true; - scheduleFlush(); monitor.onStart(this); } @Override public void close() { - running = false; boolean flushSuccess = true; @@ -199,34 +174,13 @@ public class DDAgentWriter implements Writer { flushSuccess = false; } - flushSuccess |= flush(); - disruptor.shutdown(); + flushSuccess |= disruptor.flush(); + + disruptor.close(); monitor.onShutdown(this, flushSuccess); } - /** This method will block until the flush is complete. */ - public boolean flush() { - if (running) { - log.info("Flushing any remaining traces."); - // Register with the phaser so we can block until the flush completion. - apiPhaser.register(); - disruptor.publishEvent(FLUSH_TRANSLATOR); - try { - // Allow thread to be interrupted. - apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister()); - - return true; - } catch (final InterruptedException e) { - log.warn("Waiting for flush interrupted.", e); - - return false; - } - } else { - return false; - } - } - @Override public String toString() { // DQH - I don't particularly like the instanceof check, @@ -242,27 +196,4 @@ public class DDAgentWriter implements Writer { return str; } - - public void scheduleFlush() { - if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) { - final ScheduledFuture previous = - flushSchedule.getAndSet( - scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS)); - - final boolean previousIncomplete = (previous != null); - if (previousIncomplete) { - previous.cancel(true); - } - - monitor.onScheduleFlush(this, previousIncomplete); - } - } - - private class FlushTask implements Runnable { - @Override - public void run() { - // Don't call flush() because it would block the thread also used for sending the traces. - disruptor.publishEvent(FLUSH_TRANSLATOR); - } - } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java index fb63d1f93b..921961eb02 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java @@ -145,7 +145,7 @@ public class TraceConsumer implements EventHandler>> } } finally { payloadSize = 0; - writer.scheduleFlush(); + writer.disruptor.scheduleFlush(); } } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java new file mode 100644 index 0000000000..6404f66891 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java @@ -0,0 +1,119 @@ +package datadog.trace.common.writer.ddagent; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import datadog.opentracing.DDSpan; +import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.DDAgentWriter; +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TraceSerializingDisruptor implements Closeable { + private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = + new DaemonThreadFactory("dd-trace-disruptor"); + private static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR = + new DisruptorEvent.TraceTranslator(); + private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR = + new DisruptorEvent.FlushTranslator(); + private final FlushTask flushTask = new FlushTask(); + + private final Disruptor>> disruptor; + private final DDAgentWriter writer; + + public volatile boolean running = false; + + private final AtomicReference> flushSchedule = new AtomicReference<>(); + + public TraceSerializingDisruptor( + final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) { + disruptor = + new Disruptor<>( + new DisruptorEvent.Factory>(), + Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 + DISRUPTOR_THREAD_FACTORY, + ProducerType.MULTI, + new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); + this.writer = writer; + disruptor.handleEventsWith(handler); + } + + public void start() { + disruptor.start(); + running = true; + scheduleFlush(); + } + + @Override + public void close() { + running = false; + disruptor.shutdown(); + } + + public boolean tryPublish(final List trace) { + return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace); + } + + /** This method will block until the flush is complete. */ + public boolean flush() { + if (running) { + log.info("Flushing any remaining traces."); + // Register with the phaser so we can block until the flush completion. + writer.apiPhaser.register(); + disruptor.publishEvent(FLUSH_TRANSLATOR); + try { + // Allow thread to be interrupted. + writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister()); + + return true; + } catch (final InterruptedException e) { + log.warn("Waiting for flush interrupted.", e); + + return false; + } + } else { + return false; + } + } + + public void scheduleFlush() { + if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) { + final ScheduledFuture previous = + flushSchedule.getAndSet( + writer.scheduledWriterExecutor.schedule( + flushTask, writer.flushFrequencySeconds, SECONDS)); + + final boolean previousIncomplete = (previous != null); + if (previousIncomplete) { + previous.cancel(true); + } + + writer.monitor.onScheduleFlush(writer, previousIncomplete); + } + } + + private class FlushTask implements Runnable { + @Override + public void run() { + // Don't call flush() because it would block the thread also used for sending the traces. + disruptor.publishEvent(FLUSH_TRANSLATOR); + } + } + + // Exposing some statistics for consumption by monitors + public final long getDisruptorCapacity() { + return disruptor.getRingBuffer().getBufferSize(); + } + + public final long getDisruptorRemainingCapacity() { + return disruptor.getRingBuffer().remainingCapacity(); + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 07835abe5d..c4b7c043be 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -34,7 +34,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(trace) writer.write(trace) - writer.flush() + writer.disruptor.flush() then: 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } @@ -57,7 +57,7 @@ class DDAgentWriterTest extends DDSpecification { (1..traceCount).each { writer.write(trace) } - writer.flush() + writer.disruptor.flush() then: _ * api.serializeTrace(_) >> { trace -> callRealMethod() } @@ -97,7 +97,7 @@ class DDAgentWriterTest extends DDSpecification { writer.write(trace) } // Flush the remaining 2 - writer.flush() + writer.disruptor.flush() then: 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } @@ -118,7 +118,7 @@ class DDAgentWriterTest extends DDSpecification { def phaser = writer.apiPhaser phaser.register() writer.start() - writer.flush() + writer.disruptor.flush() when: (1..5).each { @@ -153,7 +153,7 @@ class DDAgentWriterTest extends DDSpecification { // Busywait because we don't want to fill up the ring buffer } } - writer.flush() + writer.disruptor.flush() then: (maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() } @@ -193,7 +193,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.close() writer.write([]) - writer.flush() + writer.disruptor.flush() then: 0 * _ @@ -208,7 +208,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write([]) - writer.flush() + writer.disruptor.flush() then: 1 * api.serializeTrace(_) >> { trace -> callRealMethod() } @@ -265,7 +265,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.flush() + writer.disruptor.flush() then: 1 * monitor.onPublish(writer, minimalTrace) @@ -314,7 +314,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.flush() + writer.disruptor.flush() then: 1 * monitor.onPublish(writer, minimalTrace) @@ -356,7 +356,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.flush() + writer.disruptor.flush() then: 1 * monitor.onPublish(writer, minimalTrace) @@ -438,7 +438,7 @@ class DDAgentWriterTest extends DDSpecification { // sanity check coordination mechanism of test // release to allow response to be generated responseSemaphore.release() - writer.flush() + writer.disruptor.flush() // reacquire semaphore to stall further responses responseSemaphore.acquire() @@ -538,7 +538,7 @@ class DDAgentWriterTest extends DDSpecification { t1.join() t2.join() - writer.flush() + writer.disruptor.flush() then: def totalTraces = 100 + 100 @@ -585,7 +585,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.flush() + writer.disruptor.flush() then: numTracesAccepted == 1 @@ -633,7 +633,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.flush() + writer.disruptor.flush() then: numRequests == 1 From 24e2fe6da707c8fe2cc2e767a39fb6ae7f82ef48 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 14:36:45 -0800 Subject: [PATCH 6/7] Move DDApi and rename to DDAgentApi Reduce visibility back to what it was before refactoring. --- .../java/datadog/opentracing/DDTracer.java | 8 +++--- .../common/sampling/RateByServiceSampler.java | 2 +- .../trace/common/writer/DDAgentWriter.java | 21 ++++++++------ .../datadog/trace/common/writer/Writer.java | 5 ++-- .../{DDApi.java => ddagent/DDAgentApi.java} | 14 +++++----- .../common/writer/ddagent/DisruptorEvent.java | 14 +++++++--- .../trace/common/writer/ddagent/Monitor.java | 15 +++++----- .../common/writer/ddagent/TraceConsumer.java | 11 ++++---- .../ddagent/TraceSerializingDisruptor.java | 6 ++-- ...DDApiTest.groovy => DDAgentApiTest.groovy} | 22 +++++++-------- .../trace/api/writer/DDAgentWriterTest.groovy | 28 +++++++++---------- .../groovy/DDApiIntegrationTest.groovy | 12 ++++---- 12 files changed, 82 insertions(+), 76 deletions(-) rename dd-trace-ot/src/main/java/datadog/trace/common/writer/{DDApi.java => ddagent/DDAgentApi.java} (97%) rename dd-trace-ot/src/test/groovy/datadog/trace/api/writer/{DDApiTest.groovy => DDAgentApiTest.groovy} (90%) diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index 85908feb28..ad5bed5270 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -14,8 +14,8 @@ import datadog.trace.api.sampling.PrioritySampling; import datadog.trace.common.sampling.PrioritySampler; import datadog.trace.common.sampling.Sampler; import datadog.trace.common.writer.DDAgentWriter; -import datadog.trace.common.writer.DDApi; import datadog.trace.common.writer.Writer; +import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.context.ScopeListener; import io.opentracing.References; import io.opentracing.Scope; @@ -246,9 +246,9 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders); if (this.writer instanceof DDAgentWriter) { - final DDApi api = ((DDAgentWriter) this.writer).getApi(); - if (sampler instanceof DDApi.ResponseListener) { - api.addResponseListener((DDApi.ResponseListener) this.sampler); + final DDAgentApi api = ((DDAgentWriter) this.writer).getApi(); + if (sampler instanceof DDAgentApi.ResponseListener) { + api.addResponseListener((DDAgentApi.ResponseListener) this.sampler); } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java index c5b8242d76..d96c2f3592 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java @@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NumericNode; import datadog.opentracing.DDSpan; import datadog.trace.api.sampling.PrioritySampling; -import datadog.trace.common.writer.DDApi.ResponseListener; +import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener; import java.util.HashMap; import java.util.Iterator; import java.util.Map; diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index a822c540ae..76ea6b8df9 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -7,6 +7,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.common.writer.ddagent.Monitor; import datadog.trace.common.writer.ddagent.TraceConsumer; import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor; @@ -34,7 +35,7 @@ public class DDAgentWriter implements Writer { private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-writer"); - private final DDApi api; + private final DDAgentApi api; public final int flushFrequencySeconds; public final TraceSerializingDisruptor disruptor; @@ -46,16 +47,17 @@ public class DDAgentWriter implements Writer { public DDAgentWriter() { this( - new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), + new DDAgentApi( + DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), new Monitor.Noop()); } - public DDAgentWriter(final DDApi api, final Monitor monitor) { + public DDAgentWriter(final DDAgentApi api, final Monitor monitor) { this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY); } /** Old signature (pre-Monitor) used in tests */ - private DDAgentWriter(final DDApi api) { + private DDAgentWriter(final DDAgentApi api) { this(api, new Monitor.Noop()); } @@ -67,7 +69,7 @@ public class DDAgentWriter implements Writer { * @param flushFrequencySeconds value < 1 disables scheduled flushes */ private DDAgentWriter( - final DDApi api, + final DDAgentApi api, final int disruptorSize, final int senderQueueSize, final int flushFrequencySeconds) { @@ -76,7 +78,7 @@ public class DDAgentWriter implements Writer { // DQH - TODO - Update the tests & remove this private DDAgentWriter( - final DDApi api, + final DDAgentApi api, final Monitor monitor, final int disruptorSize, final int flushFrequencySeconds) { @@ -84,12 +86,13 @@ public class DDAgentWriter implements Writer { } // DQH - TODO - Update the tests & remove this - private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) { + private DDAgentWriter( + final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) { this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); } private DDAgentWriter( - final DDApi api, + final DDAgentApi api, final Monitor monitor, final int disruptorSize, final int senderQueueSize, @@ -147,7 +150,7 @@ public class DDAgentWriter implements Writer { traceCount.incrementAndGet(); } - public DDApi getApi() { + public DDAgentApi getApi() { return api; } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java index 669bcfd9fc..b14cbfdd23 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java @@ -2,6 +2,7 @@ package datadog.trace.common.writer; import datadog.opentracing.DDSpan; import datadog.trace.api.Config; +import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.common.writer.ddagent.Monitor; import java.io.Closeable; import java.util.List; @@ -66,8 +67,8 @@ public interface Writer extends Closeable { return new DDAgentWriter(createApi(config), createMonitor(config)); } - private static DDApi createApi(final Config config) { - return new DDApi( + private static DDAgentApi createApi(final Config config) { + return new DDAgentApi( config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket()); } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java similarity index 97% rename from dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java rename to dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index 2e42f3194f..cc01adc62c 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -1,4 +1,4 @@ -package datadog.trace.common.writer; +package datadog.trace.common.writer.ddagent; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; @@ -28,7 +28,7 @@ import org.msgpack.jackson.dataformat.MessagePackFactory; /** The API pointing to a DD agent */ @Slf4j -public class DDApi { +public class DDAgentApi { private static final String DATADOG_META_LANG = "Datadog-Meta-Lang"; private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version"; private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter"; @@ -53,7 +53,7 @@ public class DDApi { private final OkHttpClient httpClient; private final HttpUrl tracesUrl; - public DDApi(final String host, final int port, final String unixDomainSocketPath) { + public DDAgentApi(final String host, final int port, final String unixDomainSocketPath) { this( host, port, @@ -61,7 +61,7 @@ public class DDApi { unixDomainSocketPath); } - DDApi( + DDAgentApi( final String host, final int port, final boolean v4EndpointsAvailable, @@ -89,7 +89,7 @@ public class DDApi { * @return a Response object -- encapsulating success of communication, sending, and result * parsing */ - public Response sendTraces(final List> traces) { + Response sendTraces(final List> traces) { final List serializedTraces = new ArrayList<>(traces.size()); int sizeInBytes = 0; for (final List trace : traces) { @@ -107,11 +107,11 @@ public class DDApi { return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces); } - public byte[] serializeTrace(final List trace) throws JsonProcessingException { + byte[] serializeTrace(final List trace) throws JsonProcessingException { return OBJECT_MAPPER.writeValueAsBytes(trace); } - public Response sendSerializedTraces( + Response sendSerializedTraces( final int representativeCount, final Integer sizeInBytes, final List traces) { try { final RequestBody body = diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java index 30db86f21a..66ff7a499b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java @@ -6,19 +6,22 @@ import com.lmax.disruptor.EventTranslatorOneArg; import datadog.opentracing.DDSpan; import java.util.List; -public class DisruptorEvent { +class DisruptorEvent { public volatile boolean shouldFlush = false; public volatile T data = null; - public static class Factory implements EventFactory> { + static class Factory implements EventFactory> { @Override public DisruptorEvent newInstance() { return new DisruptorEvent<>(); } } - public static class TraceTranslator + static class TraceTranslator implements EventTranslatorOneArg>, List> { + static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR = + new DisruptorEvent.TraceTranslator(); + @Override public void translateTo( final DisruptorEvent> event, final long sequence, final List trace) { @@ -26,7 +29,10 @@ public class DisruptorEvent { } } - public static class FlushTranslator implements EventTranslator>> { + static class FlushTranslator implements EventTranslator>> { + static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR = + new DisruptorEvent.FlushTranslator(); + @Override public void translateTo(final DisruptorEvent> event, final long sequence) { event.shouldFlush = true; diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java index 12afd9906c..298fb94b93 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/Monitor.java @@ -5,7 +5,6 @@ import com.timgroup.statsd.StatsDClient; import datadog.opentracing.DDSpan; import datadog.opentracing.DDTraceOTInfo; import datadog.trace.common.writer.DDAgentWriter; -import datadog.trace.common.writer.DDApi; import java.util.List; /** @@ -43,13 +42,13 @@ public interface Monitor { final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response); + final DDAgentApi.Response response); void onFailedSend( final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response); + final DDAgentApi.Response response); final class StatsD implements Monitor { public static final String PREFIX = "datadog.tracer"; @@ -138,7 +137,7 @@ public interface Monitor { final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response) { + final DDAgentApi.Response response) { onSendAttempt(agentWriter, representativeCount, sizeInBytes, response); } @@ -147,7 +146,7 @@ public interface Monitor { final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response) { + final DDAgentApi.Response response) { onSendAttempt(agentWriter, representativeCount, sizeInBytes, response); } @@ -155,7 +154,7 @@ public interface Monitor { final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response) { + final DDAgentApi.Response response) { statsd.incrementCounter("api.requests"); statsd.recordGaugeValue("queue.length", representativeCount); // TODO: missing queue.spans (# of spans being sent) @@ -215,14 +214,14 @@ public interface Monitor { final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response) {} + final DDAgentApi.Response response) {} @Override public void onFailedSend( final DDAgentWriter agentWriter, final int representativeCount, final int sizeInBytes, - final DDApi.Response response) {} + final DDAgentApi.Response response) {} @Override public String toString() { diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java index 921961eb02..1081f4352e 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.lmax.disruptor.EventHandler; import datadog.opentracing.DDSpan; import datadog.trace.common.writer.DDAgentWriter; -import datadog.trace.common.writer.DDApi; import java.util.ArrayList; import java.util.List; import java.util.concurrent.RejectedExecutionException; @@ -74,7 +73,7 @@ public class TraceConsumer implements EventHandler>> } if (writer.scheduledWriterExecutor.isShutdown()) { writer.monitor.onFailedSend( - writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1)); + writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1)); writer.apiPhaser.arrive(); // Allow flush to return return; } @@ -93,7 +92,7 @@ public class TraceConsumer implements EventHandler>> senderSemaphore.acquire(); } catch (final InterruptedException e) { writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); + writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); // Finally, we'll schedule another flush // Any threads awaiting the flush will continue to wait @@ -106,7 +105,7 @@ public class TraceConsumer implements EventHandler>> senderSemaphore.release(); try { - final DDApi.Response response = + final DDAgentApi.Response response = writer .getApi() .sendSerializedTraces(representativeCount, sizeInBytes, toSend); @@ -132,7 +131,7 @@ public class TraceConsumer implements EventHandler>> // However, just to be safe to start, create a failed Response to handle any // spurious Throwable-s. writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDApi.Response.failed(e)); + writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); } finally { writer.apiPhaser.arrive(); // Flush completed. } @@ -140,7 +139,7 @@ public class TraceConsumer implements EventHandler>> }); } catch (final RejectedExecutionException ex) { writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex)); + writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex)); writer.apiPhaser.arrive(); // Allow flush to return } } finally { diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java index 6404f66891..f878cbb178 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java @@ -1,5 +1,7 @@ package datadog.trace.common.writer.ddagent; +import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR; +import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR; import static java.util.concurrent.TimeUnit.SECONDS; import com.lmax.disruptor.SleepingWaitStrategy; @@ -20,10 +22,6 @@ import lombok.extern.slf4j.Slf4j; public class TraceSerializingDisruptor implements Closeable { private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = new DaemonThreadFactory("dd-trace-disruptor"); - private static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR = - new DisruptorEvent.TraceTranslator(); - private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR = - new DisruptorEvent.FlushTranslator(); private final FlushTask flushTask = new FlushTask(); private final Disruptor>> disruptor; diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy similarity index 90% rename from dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy rename to dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy index 17c8388cd6..07bd326685 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy @@ -3,8 +3,8 @@ package datadog.trace.api.writer import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.JsonNode import datadog.opentracing.SpanFactory -import datadog.trace.common.writer.DDApi -import datadog.trace.common.writer.DDApi.ResponseListener +import datadog.trace.common.writer.ddagent.DDAgentApi +import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener import datadog.trace.util.test.DDSpecification import java.util.concurrent.atomic.AtomicLong @@ -12,8 +12,8 @@ import java.util.concurrent.atomic.AtomicReference import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer -class DDApiTest extends DDSpecification { - static mapper = DDApi.OBJECT_MAPPER +class DDAgentApiTest extends DDSpecification { + static mapper = DDAgentApi.OBJECT_MAPPER def "sending an empty list of traces returns no errors"() { setup: @@ -30,7 +30,7 @@ class DDApiTest extends DDSpecification { } } } - def client = new DDApi("localhost", agent.address.port, null) + def client = new DDAgentApi("localhost", agent.address.port, null) expect: client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces" @@ -51,7 +51,7 @@ class DDApiTest extends DDSpecification { } } } - def client = new DDApi("localhost", agent.address.port, null) + def client = new DDAgentApi("localhost", agent.address.port, null) expect: client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.3/traces" @@ -73,7 +73,7 @@ class DDApiTest extends DDSpecification { } } } - def client = new DDApi("localhost", agent.address.port, null) + def client = new DDAgentApi("localhost", agent.address.port, null) expect: client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces" @@ -136,7 +136,7 @@ class DDApiTest extends DDSpecification { } } } - def client = new DDApi("localhost", agent.address.port, null) + def client = new DDAgentApi("localhost", agent.address.port, null) client.addResponseListener(responseListener) when: @@ -162,7 +162,7 @@ class DDApiTest extends DDSpecification { } } } - def client = new DDApi("localhost", v3Agent.address.port, null) + def client = new DDAgentApi("localhost", v3Agent.address.port, null) expect: client.tracesUrl.toString() == "http://localhost:${v3Agent.address.port}/v0.3/traces" @@ -189,7 +189,7 @@ class DDApiTest extends DDSpecification { } } def port = badPort ? 999 : agent.address.port - def client = new DDApi("localhost", port, null) + def client = new DDAgentApi("localhost", port, null) expect: client.tracesUrl.toString() == "http://localhost:${port}/$endpointVersion/traces" @@ -216,7 +216,7 @@ class DDApiTest extends DDSpecification { } } } - def client = new DDApi("localhost", agent.address.port, null) + def client = new DDAgentApi("localhost", agent.address.port, null) when: def success = client.sendTraces(traces).success() diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index c4b7c043be..37de61bf23 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -7,7 +7,7 @@ import datadog.opentracing.DDTracer import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.DDAgentWriter -import datadog.trace.common.writer.DDApi +import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.Monitor import datadog.trace.common.writer.ddagent.TraceConsumer import datadog.trace.util.test.DDSpecification @@ -24,7 +24,7 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE @Timeout(20) class DDAgentWriterTest extends DDSpecification { - def api = Mock(DDApi) + def api = Mock(DDAgentApi) def "test happy path"() { setup: @@ -180,7 +180,7 @@ class DDAgentWriterTest extends DDSpecification { Mock(DDTracer)) minimalSpan = new DDSpan(0, minimalContext) minimalTrace = [minimalSpan] - traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length + traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 } @@ -253,7 +253,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDApi("localhost", agent.address.port, null) + def api = new DDAgentApi("localhost", agent.address.port, null) def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) @@ -302,7 +302,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDApi("localhost", agent.address.port, null) + def api = new DDAgentApi("localhost", agent.address.port, null) def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) @@ -336,13 +336,13 @@ class DDAgentWriterTest extends DDSpecification { setup: def minimalTrace = createMinimalTrace() - def api = new DDApi("localhost", 8192, null) { - DDApi.Response sendSerializedTraces( + def api = new DDAgentApi("localhost", 8192, null) { + DDAgentApi.Response sendSerializedTraces( int representativeCount, Integer sizeInBytes, List traces) { // simulating a communication failure to a server - return DDApi.Response.failed(new IOException("comm error")) + return DDAgentApi.Response.failed(new IOException("comm error")) } } def monitor = Mock(Monitor) @@ -400,7 +400,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDApi("localhost", agent.address.port, null) + def api = new DDAgentApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback def monitor = Stub(Monitor) @@ -505,7 +505,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDApi("localhost", agent.address.port, null) + def api = new DDAgentApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback def monitor = Stub(Monitor) @@ -566,7 +566,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDApi("localhost", agent.address.port, null) + def api = new DDAgentApi("localhost", agent.address.port, null) def statsd = Stub(StatsDClient) statsd.incrementCounter("queue.accepted") >> { stat -> @@ -606,13 +606,13 @@ class DDAgentWriterTest extends DDSpecification { def minimalTrace = createMinimalTrace() // DQH -- need to set-up a dummy agent for the final send callback to work - def api = new DDApi("localhost", 8192, null) { - DDApi.Response sendSerializedTraces( + def api = new DDAgentApi("localhost", 8192, null) { + DDAgentApi.Response sendSerializedTraces( int representativeCount, Integer sizeInBytes, List traces) { // simulating a communication failure to a server - return DDApi.Response.failed(new IOException("comm error")) + return DDAgentApi.Response.failed(new IOException("comm error")) } } diff --git a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy index ef45512c5d..1d5ee8297f 100644 --- a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy +++ b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy @@ -4,8 +4,8 @@ import datadog.opentracing.DDSpanContext import datadog.opentracing.DDTracer import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling -import datadog.trace.common.writer.DDApi import datadog.trace.common.writer.ListWriter +import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.util.test.DDSpecification import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy @@ -20,7 +20,7 @@ class DDApiIntegrationTest { // Do not run tests locally on Java7 since testcontainers are not compatible with Java7 // It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers @Requires({ "true" == System.getenv("CI") || jvm.java8Compatible }) - static class DDApiIntegrationV4Test extends DDSpecification { + static class DDAgentApiIntegrationV4Test extends DDSpecification { static final WRITER = new ListWriter() static final TRACER = new DDTracer(WRITER) static final CONTEXT = new DDSpanContext( @@ -64,7 +64,7 @@ class DDApiIntegrationTest { def endpoint = new AtomicReference(null) def agentResponse = new AtomicReference(null) - DDApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> + DDAgentApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> endpoint.set(receivedEndpoint) agentResponse.set(responseJson.toString()) } @@ -109,10 +109,10 @@ class DDApiIntegrationTest { } def setup() { - api = new DDApi(agentContainerHost, agentContainerPort, v4(), null) + api = new DDAgentApi(agentContainerHost, agentContainerPort, v4(), null) api.addResponseListener(responseListener) - unixDomainSocketApi = new DDApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString()) + unixDomainSocketApi = new DDAgentApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString()) unixDomainSocketApi.addResponseListener(responseListener) } @@ -159,7 +159,7 @@ class DDApiIntegrationTest { } @Requires({ "true" == System.getenv("CI") || jvm.java8Compatible }) - static class DDApiIntegrationV3Test extends DDApiIntegrationV4Test { + static class DDAgentApiIntegrationV3Test extends DDAgentApiIntegrationV4Test { boolean v4() { return false } From 0a89f2a57c6429bc0fa8b317793e9c938ae126ff Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Fri, 20 Dec 2019 14:49:31 -0800 Subject: [PATCH 7/7] Extract DDAgentResponseListener from DDApi. Reduce references to DDApi --- .../src/main/java/datadog/opentracing/DDTracer.java | 9 +++------ .../trace/common/sampling/RateByServiceSampler.java | 4 ++-- .../datadog/trace/common/writer/DDAgentWriter.java | 5 +++++ .../trace/common/writer/ddagent/DDAgentApi.java | 11 +++-------- .../writer/ddagent/DDAgentResponseListener.java | 8 ++++++++ .../datadog/trace/api/writer/DDAgentApiTest.groovy | 4 ++-- .../traceAgentTest/groovy/DDApiIntegrationTest.groovy | 3 ++- 7 files changed, 25 insertions(+), 19 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index ad5bed5270..ae5a0c4301 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -15,7 +15,7 @@ import datadog.trace.common.sampling.PrioritySampler; import datadog.trace.common.sampling.Sampler; import datadog.trace.common.writer.DDAgentWriter; import datadog.trace.common.writer.Writer; -import datadog.trace.common.writer.ddagent.DDAgentApi; +import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import datadog.trace.context.ScopeListener; import io.opentracing.References; import io.opentracing.Scope; @@ -245,11 +245,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace injector = HttpCodec.createInjector(Config.get()); extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders); - if (this.writer instanceof DDAgentWriter) { - final DDAgentApi api = ((DDAgentWriter) this.writer).getApi(); - if (sampler instanceof DDAgentApi.ResponseListener) { - api.addResponseListener((DDAgentApi.ResponseListener) this.sampler); - } + if (this.writer instanceof DDAgentWriter && sampler instanceof DDAgentResponseListener) { + ((DDAgentWriter) this.writer).addResponseListener((DDAgentResponseListener) this.sampler); } log.info("New instance: {}", this); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java index d96c2f3592..b8191b1a8f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java @@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.NumericNode; import datadog.opentracing.DDSpan; import datadog.trace.api.sampling.PrioritySampling; -import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener; +import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; *

The configuration of (serviceName,env)->rate is configured by the core agent. */ @Slf4j -public class RateByServiceSampler implements Sampler, PrioritySampler, ResponseListener { +public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentResponseListener { public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr"; /** Key for setting the default/baseline rate */ diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 76ea6b8df9..c63fe9ff8f 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -8,6 +8,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; import datadog.trace.common.writer.ddagent.DDAgentApi; +import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import datadog.trace.common.writer.ddagent.Monitor; import datadog.trace.common.writer.ddagent.TraceConsumer; import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor; @@ -110,6 +111,10 @@ public class DDAgentWriter implements Writer { apiPhaser.register(); // Register on behalf of the scheduled executor thread. } + public void addResponseListener(final DDAgentResponseListener listener) { + api.addResponseListener(listener); + } + // Exposing some statistics for consumption by monitors public final long getDisruptorCapacity() { return disruptor.getDisruptorCapacity(); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index cc01adc62c..17a607c102 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -43,7 +43,7 @@ public class DDAgentApi { private static final String TRACES_ENDPOINT_V4 = "v0.4/traces"; private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5); - private final List responseListeners = new ArrayList<>(); + private final List responseListeners = new ArrayList<>(); private volatile long nextAllowedLogTime = 0; @@ -76,7 +76,7 @@ public class DDAgentApi { } } - public void addResponseListener(final ResponseListener listener) { + public void addResponseListener(final DDAgentResponseListener listener) { if (!responseListeners.contains(listener)) { responseListeners.add(listener); } @@ -186,7 +186,7 @@ public class DDAgentApi { final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString); final String endpoint = tracesUrl.toString(); - for (final ResponseListener listener : responseListeners) { + for (final DDAgentResponseListener listener : responseListeners) { listener.onResponse(endpoint, parsedResponse); } return Response.success(response.code(), parsedResponse); @@ -364,9 +364,4 @@ public class DDAgentApi { return exception; } } - - public interface ResponseListener { - /** Invoked after the api receives a response from the core agent. */ - void onResponse(String endpoint, JsonNode responseJson); - } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java new file mode 100644 index 0000000000..a0cbc72fda --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java @@ -0,0 +1,8 @@ +package datadog.trace.common.writer.ddagent; + +import com.fasterxml.jackson.databind.JsonNode; + +public interface DDAgentResponseListener { + /** Invoked after the api receives a response from the core agent. */ + void onResponse(String endpoint, JsonNode responseJson); +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy index 07bd326685..8d5ed0dcdd 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy @@ -4,7 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.JsonNode import datadog.opentracing.SpanFactory import datadog.trace.common.writer.ddagent.DDAgentApi -import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener +import datadog.trace.common.writer.ddagent.DDAgentResponseListener import datadog.trace.util.test.DDSpecification import java.util.concurrent.atomic.AtomicLong @@ -125,7 +125,7 @@ class DDAgentApiTest extends DDSpecification { def "Api ResponseListeners see 200 responses"() { setup: def agentResponse = new AtomicReference(null) - ResponseListener responseListener = { String endpoint, JsonNode responseJson -> + DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson -> agentResponse.set(responseJson.toString()) } def agent = httpServer { diff --git a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy index 1d5ee8297f..00db0a213d 100644 --- a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy +++ b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy @@ -6,6 +6,7 @@ import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.ddagent.DDAgentApi +import datadog.trace.common.writer.ddagent.DDAgentResponseListener import datadog.trace.util.test.DDSpecification import org.testcontainers.containers.GenericContainer import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy @@ -64,7 +65,7 @@ class DDApiIntegrationTest { def endpoint = new AtomicReference(null) def agentResponse = new AtomicReference(null) - DDAgentApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> + DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> endpoint.set(receivedEndpoint) agentResponse.set(responseJson.toString()) }