From 451fba256a5068b3b40871dea630b5ede4213823 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 2 Jan 2020 08:19:50 -0800 Subject: [PATCH 1/5] Split TraceConsumer into two different disruptors First disruptor (TraceProcessingDisruptor) does processing, which is currently limited to serialization, but in the future can do other processing such as TraceInterceptor invocation. Second disruptor (BatchWritingDisruptor) takes serialized traces and batches them into groups and flushes them periodically based on size and time. --- dd-trace-ot/dd-trace-ot.gradle | 2 + .../trace/common/writer/DDAgentWriter.java | 157 ++++++---------- .../datadog/trace/common/writer/Writer.java | 1 + .../writer/ddagent/AbstractDisruptor.java | 89 +++++++++ .../writer/ddagent/BatchWritingDisruptor.java | 162 +++++++++++++++++ .../common/writer/ddagent/DisruptorEvent.java | 49 +++-- .../common/writer/ddagent/TraceConsumer.java | 150 --------------- .../ddagent/TraceProcessingDisruptor.java | 88 +++++++++ .../ddagent/TraceSerializingDisruptor.java | 117 ------------ .../trace/api/writer/DDAgentWriterTest.groovy | 172 ++++++++++-------- 10 files changed, 534 insertions(+), 453 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java delete mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java create mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java delete mode 100644 dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java diff --git a/dd-trace-ot/dd-trace-ot.gradle b/dd-trace-ot/dd-trace-ot.gradle index f943f74e64..96f9a4bd6d 100644 --- a/dd-trace-ot/dd-trace-ot.gradle +++ b/dd-trace-ot/dd-trace-ot.gradle @@ -12,6 +12,8 @@ minimumInstructionCoverage = 0.6 excludedClassesCoverage += [ 
'datadog.trace.common.writer.ListWriter', 'datadog.trace.common.writer.LoggingWriter', + 'datadog.trace.common.writer.DDAgentWriter.Spec', + 'datadog.trace.common.writer.DDAgentWriter.Spec.SpecBuilder', 'datadog.trace.common.sampling.PrioritySampling', // This code is copied from okHttp samples and we have integration tests to verify that it works. 'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket', diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index c63fe9ff8f..965976d4ba 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -3,21 +3,16 @@ package datadog.trace.common.writer; import static datadog.trace.api.Config.DEFAULT_AGENT_HOST; import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; -import static java.util.concurrent.TimeUnit.SECONDS; import datadog.opentracing.DDSpan; -import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.ddagent.BatchWritingDisruptor; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import datadog.trace.common.writer.ddagent.Monitor; -import datadog.trace.common.writer.ddagent.TraceConsumer; -import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor; +import datadog.trace.common.writer.ddagent.TraceProcessingDisruptor; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.Phaser; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Value; import lombok.extern.slf4j.Slf4j; /** @@ -29,86 +24,65 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class 
DDAgentWriter implements Writer { - private static final int DISRUPTOR_BUFFER_SIZE = 1024; - private static final int SENDER_QUEUE_SIZE = 16; - private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second + @Value + @lombok.Builder + public static class Spec { + @lombok.Builder.Default public String agentHost = DEFAULT_AGENT_HOST; + @lombok.Builder.Default public int traceAgentPort = DEFAULT_TRACE_AGENT_PORT; + @lombok.Builder.Default public String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; + @lombok.Builder.Default public int traceBufferSize = DISRUPTOR_BUFFER_SIZE; + @lombok.Builder.Default public Monitor monitor = new Monitor.Noop(); + @lombok.Builder.Default public int flushFrequencySeconds = 1; + } - private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = - new DaemonThreadFactory("dd-trace-writer"); + private static final int DISRUPTOR_BUFFER_SIZE = 1024; private final DDAgentApi api; - public final int flushFrequencySeconds; - public final TraceSerializingDisruptor disruptor; + public final TraceProcessingDisruptor traceProcessingDisruptor; + public final BatchWritingDisruptor batchWritingDisruptor; - public final ScheduledExecutorService scheduledWriterExecutor; private final AtomicInteger traceCount = new AtomicInteger(0); - public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing; public final Monitor monitor; public DDAgentWriter() { - this( - new DDAgentApi( - DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), - new Monitor.Noop()); + this(Spec.builder().build()); } + public DDAgentWriter(final Spec spec) { + api = new DDAgentApi(spec.agentHost, spec.traceAgentPort, spec.unixDomainSocket); + monitor = spec.monitor; + + batchWritingDisruptor = + new BatchWritingDisruptor( + spec.traceBufferSize, spec.flushFrequencySeconds, api, monitor, this); + traceProcessingDisruptor = + new TraceProcessingDisruptor( + spec.traceBufferSize, api, batchWritingDisruptor, monitor, 
this); + } + + @Deprecated public DDAgentWriter(final DDAgentApi api, final Monitor monitor) { - this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY); - } - - /** Old signature (pre-Monitor) used in tests */ - private DDAgentWriter(final DDAgentApi api) { - this(api, new Monitor.Noop()); - } - - /** - * Used in the tests. - * - * @param api - * @param disruptorSize Rounded up to next power of 2 - * @param flushFrequencySeconds value < 1 disables scheduled flushes - */ - private DDAgentWriter( - final DDAgentApi api, - final int disruptorSize, - final int senderQueueSize, - final int flushFrequencySeconds) { - this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds); - } - - // DQH - TODO - Update the tests & remove this - private DDAgentWriter( - final DDAgentApi api, - final Monitor monitor, - final int disruptorSize, - final int flushFrequencySeconds) { - this(api, monitor, disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); - } - - // DQH - TODO - Update the tests & remove this - private DDAgentWriter( - final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) { - this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); - } - - private DDAgentWriter( - final DDAgentApi api, - final Monitor monitor, - final int disruptorSize, - final int senderQueueSize, - final int flushFrequencySeconds) { this.api = api; this.monitor = monitor; - disruptor = - new TraceSerializingDisruptor( - disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this)); + batchWritingDisruptor = new BatchWritingDisruptor(DISRUPTOR_BUFFER_SIZE, 1, api, monitor, this); + traceProcessingDisruptor = + new TraceProcessingDisruptor( + DISRUPTOR_BUFFER_SIZE, api, batchWritingDisruptor, monitor, this); + } - this.flushFrequencySeconds = flushFrequencySeconds; - scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY); + @Deprecated + // 
DQH - TODO - Update the tests & remove this + private DDAgentWriter( + final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) { + this.api = api; + monitor = new Monitor.Noop(); - apiPhaser.register(); // Register on behalf of the scheduled executor thread. + batchWritingDisruptor = + new BatchWritingDisruptor(disruptorSize, flushFrequencySeconds, api, monitor, this); + traceProcessingDisruptor = + new TraceProcessingDisruptor(disruptorSize, api, batchWritingDisruptor, monitor, this); } public void addResponseListener(final DDAgentResponseListener listener) { @@ -117,7 +91,7 @@ public class DDAgentWriter implements Writer { // Exposing some statistics for consumption by monitors public final long getDisruptorCapacity() { - return disruptor.getDisruptorCapacity(); + return traceProcessingDisruptor.getDisruptorCapacity(); } public final long getDisruptorUtilizedCapacity() { @@ -125,20 +99,21 @@ public class DDAgentWriter implements Writer { } public final long getDisruptorRemainingCapacity() { - return disruptor.getDisruptorRemainingCapacity(); + return traceProcessingDisruptor.getDisruptorRemainingCapacity(); } @Override public void write(final List trace) { // We can't add events after shutdown otherwise it will never complete shutting down. - if (disruptor.running) { - final boolean published = disruptor.tryPublish(trace); + if (traceProcessingDisruptor.running) { + final int representativeCount = traceCount.getAndSet(0) + 1; + final boolean published = traceProcessingDisruptor.publish(trace, representativeCount); if (published) { monitor.onPublish(DDAgentWriter.this, trace); } else { // We're discarding the trace, but we still want to count it. - traceCount.incrementAndGet(); + traceCount.addAndGet(representativeCount); log.debug("Trace written to overfilled buffer. 
Counted but dropping trace: {}", trace); monitor.onFailedPublish(this, trace); @@ -150,6 +125,10 @@ public class DDAgentWriter implements Writer { } } + public boolean flush() { + return traceProcessingDisruptor.flush(traceCount.getAndSet(0)); + } + @Override public void incrementTraceCount() { traceCount.incrementAndGet(); @@ -161,32 +140,16 @@ public class DDAgentWriter implements Writer { @Override public void start() { - disruptor.start(); - + batchWritingDisruptor.start(); + traceProcessingDisruptor.start(); monitor.onStart(this); } @Override public void close() { - - boolean flushSuccess = true; - - // We have to shutdown scheduled executor first to make sure no flush events issued after - // disruptor has been shutdown. - // Otherwise those events will never be processed and flush call will wait forever. - scheduledWriterExecutor.shutdown(); - try { - scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS); - } catch (final InterruptedException e) { - log.warn("Waiting for flush executor shutdown interrupted.", e); - - flushSuccess = false; - } - flushSuccess |= disruptor.flush(); - - disruptor.close(); - - monitor.onShutdown(this, flushSuccess); + monitor.onShutdown(this, traceProcessingDisruptor.flush(traceCount.getAndSet(0))); + traceProcessingDisruptor.close(); + batchWritingDisruptor.close(); } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java index b14cbfdd23..51e0e85c4b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java @@ -64,6 +64,7 @@ public interface Writer extends Closeable { } private static Writer createAgentWriter(final Config config) { + // TODO: switch to using DDAgentWriter.Spec constructor... 
return new DDAgentWriter(createApi(config), createMonitor(config)); } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java new file mode 100644 index 0000000000..2bff4028ac --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java @@ -0,0 +1,89 @@ +package datadog.trace.common.writer.ddagent; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.Closeable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractDisruptor implements Closeable { + + protected final Disruptor> disruptor; + + public volatile boolean running = false; + + protected final DisruptorEvent.FlushTranslator flushTranslator = + new DisruptorEvent.FlushTranslator<>(); + protected final DisruptorEvent.DataTranslator dataTranslator = + new DisruptorEvent.DataTranslator<>(); + + public AbstractDisruptor(final int disruptorSize, final EventHandler> handler) { + disruptor = + new Disruptor<>( + new DisruptorEvent.Factory(), + Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 + getThreadFactory(), + ProducerType.MULTI, + new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); + disruptor.handleEventsWith(handler); + } + + protected abstract ThreadFactory getThreadFactory(); + + public void start() { + disruptor.start(); + running = true; + } + + @Override + public void close() { + running = false; + disruptor.shutdown(); + } + + public abstract boolean publish(final T data, int representativeCount); + + /** + * This method will block until the flush is complete. 
+ * + * @param traceCount - number of unreported traces to include in this batch. + */ + public boolean flush(final int traceCount) { + if (running) { + return flush(traceCount, new CountDownLatch(1)); + } else { + return false; + } + } + + /** This method will block until the flush is complete. */ + protected boolean flush(final int traceCount, final CountDownLatch latch) { + log.info("Flushing any remaining traces."); + disruptor.publishEvent(flushTranslator, traceCount, latch); + try { + latch.await(); + return true; + } catch (final InterruptedException e) { + log.warn("Waiting for flush interrupted.", e); + return false; + } + } + + // Exposing some statistics for consumption by monitors + public final long getDisruptorCapacity() { + return disruptor.getRingBuffer().getBufferSize(); + } + + public final long getDisruptorRemainingCapacity() { + return disruptor.getRingBuffer().remainingCapacity(); + } + + public final long getCurrentCount() { + return disruptor.getCursor() - disruptor.getRingBuffer().getMinimumGatingSequence(); + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java new file mode 100644 index 0000000000..fc08591548 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java @@ -0,0 +1,162 @@ +package datadog.trace.common.writer.ddagent; + +import com.lmax.disruptor.EventHandler; +import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.DDAgentWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BatchWritingDisruptor extends 
AbstractDisruptor { + private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB + + private final ScheduledExecutorService heartbeatExecutor = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat")); + + private final DisruptorEvent.HeartbeatTranslator heartbeatTranslator = + new DisruptorEvent.HeartbeatTranslator(); + + public BatchWritingDisruptor( + final int disruptorSize, + final int flushFrequencySeconds, + final DDAgentApi api, + final Monitor monitor, + final DDAgentWriter writer) { + super(disruptorSize, new BatchWritingHandler(flushFrequencySeconds, api, monitor, writer)); + + if (0 < flushFrequencySeconds) { + // This provides a steady stream of events to enable flushing with a low throughput. + final Runnable heartbeat = + new Runnable() { + @Override + public void run() { + // Only add if the buffer is empty. + if (running && getCurrentCount() == 0) { + disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator); + } + } + }; + heartbeatExecutor.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS); + } + } + + @Override + protected ThreadFactory getThreadFactory() { + return new DaemonThreadFactory("dd-trace-writer"); + } + + @Override + public boolean publish(final byte[] data, final int representativeCount) { + disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount); + return true; + } + + // Intentionally not thread safe. 
+ private static class BatchWritingHandler implements EventHandler> { + + private final long flushFrequencyNanos; + private final DDAgentApi api; + private final Monitor monitor; + private final DDAgentWriter writer; + private final List serializedTraces = new ArrayList<>(); + private int representativeCount = 0; + private int sizeInBytes = 0; + private long nextScheduledFlush; + + private BatchWritingHandler( + final int flushFrequencySeconds, + final DDAgentApi api, + final Monitor monitor, + final DDAgentWriter writer) { + flushFrequencyNanos = TimeUnit.SECONDS.toNanos(flushFrequencySeconds); + scheduleNextFlush(); + this.api = api; + this.monitor = monitor; + this.writer = writer; + } + + @Override + public void onEvent( + final DisruptorEvent event, final long sequence, final boolean endOfBatch) { + try { + if (event.data != null) { + sizeInBytes += event.data.length; + serializedTraces.add(event.data); + } + + // Flush events might increase this with no data. + representativeCount += event.representativeCount; + + if (event.flushLatch != null + || FLUSH_PAYLOAD_BYTES <= sizeInBytes + || nextScheduledFlush <= System.nanoTime()) { + flush(event.flushLatch, FLUSH_PAYLOAD_BYTES <= sizeInBytes); + } + } finally { + event.reset(); + } + } + + private void flush(final CountDownLatch flushLatch, final boolean early) { + try { + if (serializedTraces.isEmpty()) { + // FIXME: this will reset representativeCount without reporting + // anything even if representativeCount > 0. 
+ return; + } + + monitor.onFlush(writer, early); + // TODO add retry and rate limiting + final DDAgentApi.Response response = + api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces); + + if (response.success()) { + log.debug("Successfully sent {} traces to the API", serializedTraces.size()); + + monitor.onSend(writer, representativeCount, sizeInBytes, response); + } else { + log.debug( + "Failed to send {} traces (representing {}) of size {} bytes to the API", + serializedTraces.size(), + representativeCount, + sizeInBytes); + + monitor.onFailedSend(writer, representativeCount, sizeInBytes, response); + } + } catch (final Throwable e) { + log.debug("Failed to send traces to the API: {}", e.getMessage()); + + // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really + // shouldn't occur. + // However, just to be safe to start, create a failed Response to handle any + // spurious Throwable-s. + monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); + } finally { + serializedTraces.clear(); + sizeInBytes = 0; + representativeCount = 0; + scheduleNextFlush(); + + if (flushLatch != null) { + flushLatch.countDown(); + } + } + } + + private void scheduleNextFlush() { + // TODO: adjust this depending on responsiveness of the agent. 
+ if (0 < flushFrequencyNanos) { + nextScheduledFlush = System.nanoTime() + flushFrequencyNanos; + } else { + nextScheduledFlush = Long.MAX_VALUE; + } + } + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java index 66ff7a499b..8e754fa3fe 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java @@ -2,13 +2,19 @@ package datadog.trace.common.writer.ddagent; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslator; -import com.lmax.disruptor.EventTranslatorOneArg; -import datadog.opentracing.DDSpan; -import java.util.List; +import com.lmax.disruptor.EventTranslatorTwoArg; +import java.util.concurrent.CountDownLatch; class DisruptorEvent { - public volatile boolean shouldFlush = false; public volatile T data = null; + public volatile int representativeCount = 0; + public volatile CountDownLatch flushLatch = null; + + public void reset() { + data = null; + representativeCount = 0; + flushLatch = null; + } static class Factory implements EventFactory> { @Override @@ -17,25 +23,38 @@ class DisruptorEvent { } } - static class TraceTranslator - implements EventTranslatorOneArg>, List> { - static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR = - new DisruptorEvent.TraceTranslator(); + static class DataTranslator implements EventTranslatorTwoArg, T, Integer> { @Override public void translateTo( - final DisruptorEvent> event, final long sequence, final List trace) { - event.data = trace; + final DisruptorEvent event, + final long sequence, + final T data, + final Integer representativeCount) { + event.data = data; + event.representativeCount = representativeCount; } } - static class FlushTranslator implements EventTranslator>> { - static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR 
= - new DisruptorEvent.FlushTranslator(); + static class HeartbeatTranslator implements EventTranslator> { @Override - public void translateTo(final DisruptorEvent> event, final long sequence) { - event.shouldFlush = true; + public void translateTo(final DisruptorEvent event, final long sequence) { + return; + } + } + + static class FlushTranslator + implements EventTranslatorTwoArg, Integer, CountDownLatch> { + + @Override + public void translateTo( + final DisruptorEvent event, + final long sequence, + final Integer representativeCount, + final CountDownLatch latch) { + event.representativeCount = representativeCount; + event.flushLatch = latch; } } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java deleted file mode 100644 index 1081f4352e..0000000000 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java +++ /dev/null @@ -1,150 +0,0 @@ -package datadog.trace.common.writer.ddagent; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.lmax.disruptor.EventHandler; -import datadog.opentracing.DDSpan; -import datadog.trace.common.writer.DDAgentWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.extern.slf4j.Slf4j; - -/** This class is intentionally not threadsafe. 
*/ -@Slf4j -public class TraceConsumer implements EventHandler>> { - private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB - - private final AtomicInteger traceCount; - private final Semaphore senderSemaphore; - private final DDAgentWriter writer; - - private List serializedTraces = new ArrayList<>(); - private int payloadSize = 0; - - public TraceConsumer( - final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) { - this.traceCount = traceCount; - senderSemaphore = new Semaphore(senderQueueSize); - this.writer = writer; - } - - @Override - public void onEvent( - final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { - final List trace = event.data; - event.data = null; // clear the event for reuse. - if (trace != null) { - traceCount.incrementAndGet(); - try { - final byte[] serializedTrace = writer.getApi().serializeTrace(trace); - payloadSize += serializedTrace.length; - serializedTraces.add(serializedTrace); - - writer.monitor.onSerialize(writer, trace, serializedTrace); - } catch (final JsonProcessingException e) { - log.warn("Error serializing trace", e); - - writer.monitor.onFailedSerialize(writer, trace, e); - } catch (final Throwable e) { - log.debug("Error while serializing trace", e); - - writer.monitor.onFailedSerialize(writer, trace, e); - } - } - - if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) { - final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES); - - reportTraces(early); - event.shouldFlush = false; - } - } - - private void reportTraces(final boolean early) { - try { - if (serializedTraces.isEmpty()) { - writer.monitor.onFlush(writer, early); - - writer.apiPhaser.arrive(); // Allow flush to return - return; - // scheduleFlush called in finally block. 
- } - if (writer.scheduledWriterExecutor.isShutdown()) { - writer.monitor.onFailedSend( - writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1)); - writer.apiPhaser.arrive(); // Allow flush to return - return; - } - final List toSend = serializedTraces; - serializedTraces = new ArrayList<>(toSend.size()); - // ^ Initialize with similar size to reduce arraycopy churn. - - final int representativeCount = traceCount.getAndSet(0); - final int sizeInBytes = payloadSize; - - try { - writer.monitor.onFlush(writer, early); - - // Run the actual IO task on a different thread to avoid blocking the consumer. - try { - senderSemaphore.acquire(); - } catch (final InterruptedException e) { - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); - - // Finally, we'll schedule another flush - // Any threads awaiting the flush will continue to wait - return; - } - writer.scheduledWriterExecutor.execute( - new Runnable() { - @Override - public void run() { - senderSemaphore.release(); - - try { - final DDAgentApi.Response response = - writer - .getApi() - .sendSerializedTraces(representativeCount, sizeInBytes, toSend); - - if (response.success()) { - log.debug("Successfully sent {} traces to the API", toSend.size()); - - writer.monitor.onSend(writer, representativeCount, sizeInBytes, response); - } else { - log.debug( - "Failed to send {} traces (representing {}) of size {} bytes to the API", - toSend.size(), - representativeCount, - sizeInBytes); - - writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response); - } - } catch (final Throwable e) { - log.debug("Failed to send traces to the API: {}", e.getMessage()); - - // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really - // shouldn't occur. - // However, just to be safe to start, create a failed Response to handle any - // spurious Throwable-s. 
- writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); - } finally { - writer.apiPhaser.arrive(); // Flush completed. - } - } - }); - } catch (final RejectedExecutionException ex) { - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex)); - writer.apiPhaser.arrive(); // Allow flush to return - } - } finally { - payloadSize = 0; - writer.disruptor.scheduleFlush(); - } - } -} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java new file mode 100644 index 0000000000..0088227be7 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -0,0 +1,88 @@ +package datadog.trace.common.writer.ddagent; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.lmax.disruptor.EventHandler; +import datadog.opentracing.DDSpan; +import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.DDAgentWriter; +import java.util.List; +import java.util.concurrent.ThreadFactory; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TraceProcessingDisruptor extends AbstractDisruptor> { + + public TraceProcessingDisruptor( + final int disruptorSize, + final DDAgentApi api, + final BatchWritingDisruptor batchWritingDisruptor, + final Monitor monitor, + final DDAgentWriter writer) { + // TODO: add config to enable control over serialization overhead. 
+ super(disruptorSize, new TraceSerializingHandler(api, batchWritingDisruptor, monitor, writer)); + } + + @Override + protected ThreadFactory getThreadFactory() { + return new DaemonThreadFactory("dd-trace-processor"); + } + + @Override + public boolean publish(final List data, final int representativeCount) { + return disruptor.getRingBuffer().tryPublishEvent(dataTranslator, data, representativeCount); + } + + // This class is threadsafe if we want to enable more processors. + public static class TraceSerializingHandler + implements EventHandler>> { + private final DDAgentApi api; + private final BatchWritingDisruptor batchWritingDisruptor; + private final Monitor monitor; + private final DDAgentWriter writer; + + public TraceSerializingHandler( + final DDAgentApi api, + final BatchWritingDisruptor batchWritingDisruptor, + final Monitor monitor, + final DDAgentWriter writer) { + this.api = api; + this.batchWritingDisruptor = batchWritingDisruptor; + this.monitor = monitor; + this.writer = writer; + } + + @Override + public void onEvent( + final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { + try { + if (event.data != null) { + try { + final byte[] serializedTrace = api.serializeTrace(event.data); + monitor.onSerialize(writer, event.data, serializedTrace); + batchWritingDisruptor.publish(serializedTrace, event.representativeCount); + event.representativeCount = 0; // reset in case flush is invoked below. + } catch (final JsonProcessingException e) { + log.debug("Error serializing trace", e); + monitor.onFailedSerialize(writer, event.data, e); + } catch (final Throwable e) { + log.debug("Error while serializing trace", e); + monitor.onFailedSerialize(writer, event.data, e); + } + } + + if (event.flushLatch != null) { + if (batchWritingDisruptor.running) { + // propagate the flush. 
+ batchWritingDisruptor.flush(event.representativeCount, event.flushLatch); + } + if (!batchWritingDisruptor.running) { // check again to protect against race condition. + // got shutdown early somehow? + event.flushLatch.countDown(); + } + } + } finally { + event.reset(); + } + } + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java deleted file mode 100644 index f878cbb178..0000000000 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java +++ /dev/null @@ -1,117 +0,0 @@ -package datadog.trace.common.writer.ddagent; - -import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR; -import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR; -import static java.util.concurrent.TimeUnit.SECONDS; - -import com.lmax.disruptor.SleepingWaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import datadog.opentracing.DDSpan; -import datadog.trace.common.util.DaemonThreadFactory; -import datadog.trace.common.writer.DDAgentWriter; -import java.io.Closeable; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class TraceSerializingDisruptor implements Closeable { - private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = - new DaemonThreadFactory("dd-trace-disruptor"); - private final FlushTask flushTask = new FlushTask(); - - private final Disruptor>> disruptor; - private final DDAgentWriter writer; - - public volatile boolean running = false; - - private final AtomicReference> flushSchedule = new AtomicReference<>(); - - public 
TraceSerializingDisruptor( - final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) { - disruptor = - new Disruptor<>( - new DisruptorEvent.Factory>(), - Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 - DISRUPTOR_THREAD_FACTORY, - ProducerType.MULTI, - new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); - this.writer = writer; - disruptor.handleEventsWith(handler); - } - - public void start() { - disruptor.start(); - running = true; - scheduleFlush(); - } - - @Override - public void close() { - running = false; - disruptor.shutdown(); - } - - public boolean tryPublish(final List trace) { - return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace); - } - - /** This method will block until the flush is complete. */ - public boolean flush() { - if (running) { - log.info("Flushing any remaining traces."); - // Register with the phaser so we can block until the flush completion. - writer.apiPhaser.register(); - disruptor.publishEvent(FLUSH_TRANSLATOR); - try { - // Allow thread to be interrupted. - writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister()); - - return true; - } catch (final InterruptedException e) { - log.warn("Waiting for flush interrupted.", e); - - return false; - } - } else { - return false; - } - } - - public void scheduleFlush() { - if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) { - final ScheduledFuture previous = - flushSchedule.getAndSet( - writer.scheduledWriterExecutor.schedule( - flushTask, writer.flushFrequencySeconds, SECONDS)); - - final boolean previousIncomplete = (previous != null); - if (previousIncomplete) { - previous.cancel(true); - } - - writer.monitor.onScheduleFlush(writer, previousIncomplete); - } - } - - private class FlushTask implements Runnable { - @Override - public void run() { - // Don't call flush() because it would block the thread also used for sending the traces. 
- disruptor.publishEvent(FLUSH_TRANSLATOR); - } - } - - // Exposing some statistics for consumption by monitors - public final long getDisruptorCapacity() { - return disruptor.getRingBuffer().getBufferSize(); - } - - public final long getDisruptorRemainingCapacity() { - return disruptor.getRingBuffer().remainingCapacity(); - } -} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 37de61bf23..d954fe6594 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -7,12 +7,13 @@ import datadog.opentracing.DDTracer import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.DDAgentWriter +import datadog.trace.common.writer.ddagent.BatchWritingDisruptor import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.Monitor -import datadog.trace.common.writer.ddagent.TraceConsumer import datadog.trace.util.test.DDSpecification import spock.lang.Timeout +import java.util.concurrent.Phaser import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -24,21 +25,41 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE @Timeout(20) class DDAgentWriterTest extends DDSpecification { - def api = Mock(DDAgentApi) + def phaser = new Phaser() + def api = Mock(DDAgentApi) { + // Define the following response in the spec: +// sendSerializedTraces(_, _, _) >> { +// phaser.arrive() +// return DDAgentApi.Response.success(200) +// } + } + def monitor = Mock(Monitor) + + def setup() { + // Register for two threads. 
+ phaser.register() + phaser.register() + } def "test happy path"() { setup: def writer = new DDAgentWriter(api, 2, -1) writer.start() + when: + writer.flush() + + then: + 0 * _ + when: writer.write(trace) writer.write(trace) - writer.disruptor.flush() + writer.flush() then: 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) + 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200) 0 * _ cleanup: @@ -57,11 +78,11 @@ class DDAgentWriterTest extends DDSpecification { (1..traceCount).each { writer.write(trace) } - writer.disruptor.flush() + writer.flush() then: _ * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) + 1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) >> DDAgentApi.Response.success(200) 0 * _ cleanup: @@ -76,9 +97,7 @@ class DDAgentWriterTest extends DDSpecification { def "test flush by size"() { setup: def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1) - def phaser = writer.apiPhaser writer.start() - phaser.register() when: (1..6).each { @@ -90,18 +109,21 @@ class DDAgentWriterTest extends DDSpecification { then: 6 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) + 2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) >> { + phaser.arrive() + return DDAgentApi.Response.success(200) + } when: (1..2).each { writer.write(trace) } // Flush the remaining 2 - writer.disruptor.flush() + writer.flush() then: 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) + 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200) 0 * _ cleanup: @@ -114,11 +136,8 @@ class DDAgentWriterTest extends DDSpecification { def "test flush by time"() { setup: - def writer = new DDAgentWriter(api) - def phaser = 
writer.apiPhaser - phaser.register() + def writer = new DDAgentWriter(api, monitor) writer.start() - writer.disruptor.flush() when: (1..5).each { @@ -128,7 +147,14 @@ class DDAgentWriterTest extends DDSpecification { then: 5 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) + 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> { + phaser.arrive() + return DDAgentApi.Response.success(200) + } + 5 * monitor.onPublish(_, _) + 5 * monitor.onSerialize(_, _, _) + 1 * monitor.onFlush(_, _) + (0..1) * monitor.onSend(_, _, _, _) // This gets called after phaser.arrive(), so there's a race condition. 0 * _ cleanup: @@ -153,11 +179,11 @@ class DDAgentWriterTest extends DDSpecification { // Busywait because we don't want to fill up the ring buffer } } - writer.disruptor.flush() + writer.flush() then: (maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) + 1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) >> DDAgentApi.Response.success(200) cleanup: writer.close() @@ -181,39 +207,43 @@ class DDAgentWriterTest extends DDSpecification { minimalSpan = new DDSpan(0, minimalContext) minimalTrace = [minimalSpan] traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length - maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 + maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 } def "check that are no interactions after close"() { - setup: - def writer = new DDAgentWriter(api) + def writer = new DDAgentWriter(api, monitor) writer.start() when: writer.close() writer.write([]) - writer.disruptor.flush() + writer.flush() then: +// 2 * monitor.onFlush(_, false) + 1 * monitor.onFailedPublish(_, _) + 1 * monitor.onShutdown(_, _) 0 * _ writer.traceCount.get() 
== 0 } - def "check shutdown if executor stopped first"() { + def "check shutdown if batchWritingDisruptor stopped first"() { setup: - def writer = new DDAgentWriter(api) + def writer = new DDAgentWriter(api, monitor) writer.start() - writer.scheduledWriterExecutor.shutdown() + writer.batchWritingDisruptor.close() when: writer.write([]) - writer.disruptor.flush() + writer.flush() then: 1 * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * monitor.onSerialize(writer, _, _) + 1 * monitor.onPublish(writer, _) 0 * _ - writer.traceCount.get() == 1 + writer.traceCount.get() == 0 cleanup: writer.close() @@ -253,9 +283,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) - def monitor = Mock(Monitor) - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) when: writer.start() @@ -265,12 +293,12 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: 1 * monitor.onPublish(writer, minimalTrace) 1 * monitor.onSerialize(writer, minimalTrace, _) - 1 * monitor.onScheduleFlush(writer, _) + 1 * monitor.onFlush(writer, _) 1 * monitor.onSend(writer, 1, _, { response -> response.success() && response.status() == 200 }) when: @@ -302,9 +330,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) - def monitor = Mock(Monitor) - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) when: writer.start() @@ -314,12 +340,12 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: 1 * monitor.onPublish(writer, minimalTrace) 1 * monitor.onSerialize(writer, 
minimalTrace, _) - 1 * monitor.onScheduleFlush(writer, _) + 1 * monitor.onFlush(writer, _) 1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == 500 }) when: @@ -345,7 +371,6 @@ class DDAgentWriterTest extends DDSpecification { return DDAgentApi.Response.failed(new IOException("comm error")) } } - def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) when: @@ -356,12 +381,12 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: 1 * monitor.onPublish(writer, minimalTrace) 1 * monitor.onSerialize(writer, minimalTrace, _) - 1 * monitor.onScheduleFlush(writer, _) + 1 * monitor.onFlush(writer, _) 1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == null }) when: @@ -400,30 +425,30 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback - def monitor = Stub(Monitor) - monitor.onPublish(_, _) >> { - numPublished.incrementAndGet() - } - monitor.onFailedPublish(_, _) >> { - numFailedPublish.incrementAndGet() - } - monitor.onFlush(_, _) >> { - numFlushes.incrementAndGet() - } - monitor.onSend(_, _, _, _) >> { - numRequests.incrementAndGet() - } - monitor.onFailedPublish(_, _, _, _) >> { - numFailedRequests.incrementAndGet() + def monitor = Stub(Monitor) { + onPublish(_, _) >> { + numPublished.incrementAndGet() + } + onFailedPublish(_, _) >> { + numFailedPublish.incrementAndGet() + } + onFlush(_, _) >> { + numFlushes.incrementAndGet() + } + onSend(_, _, _, _) >> { + numRequests.incrementAndGet() + } + onFailedPublish(_, _, _, _) >> { + numFailedRequests.incrementAndGet() + } } // sender queue is sized in requests -- not traces def bufferSize = 32 def senderQueueSize = 2 - def writer = new DDAgentWriter(api, monitor, bufferSize, senderQueueSize, 
DDAgentWriter.FLUSH_PAYLOAD_DELAY) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()) writer.start() // gate responses @@ -438,7 +463,7 @@ class DDAgentWriterTest extends DDSpecification { // sanity check coordination mechanism of test // release to allow response to be generated responseSemaphore.release() - writer.disruptor.flush() + writer.flush() // reacquire semaphore to stall further responses responseSemaphore.acquire() @@ -505,21 +530,21 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback - def monitor = Stub(Monitor) - monitor.onPublish(_, _) >> { - numPublished.incrementAndGet() - } - monitor.onFailedPublish(_, _) >> { - numFailedPublish.incrementAndGet() - } - monitor.onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response -> - numRepSent.addAndGet(repCount) + def monitor = Stub(Monitor) { + onPublish(_, _) >> { + numPublished.incrementAndGet() + } + onFailedPublish(_, _) >> { + numFailedPublish.incrementAndGet() + } + onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response -> + numRepSent.addAndGet(repCount) + } } - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) writer.start() when: @@ -538,7 +563,7 @@ class DDAgentWriterTest extends DDSpecification { t1.join() t2.join() - writer.disruptor.flush() + writer.flush() then: def totalTraces = 100 + 100 @@ -566,7 +591,6 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) def statsd = Stub(StatsDClient) statsd.incrementCounter("queue.accepted") >> { stat -> @@ -580,12 +604,12 @@ class DDAgentWriterTest extends DDSpecification { } def monitor = new 
Monitor.StatsD(statsd) - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) writer.start() when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: numTracesAccepted == 1 @@ -633,7 +657,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: numRequests == 1 From 5ff855737b4fca280519f5fee0750a075b94f052 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Tue, 7 Jan 2020 10:07:23 -0800 Subject: [PATCH 2/5] Add documentation, remove volatile/public, improve test reliability. --- .../trace/common/writer/DDAgentWriter.java | 29 +++++++++++------ .../writer/ddagent/AbstractDisruptor.java | 9 +++++- .../writer/ddagent/BatchWritingDisruptor.java | 11 ++++++- .../common/writer/ddagent/DisruptorEvent.java | 9 +++--- .../ddagent/TraceProcessingDisruptor.java | 9 +++++- .../trace/api/writer/DDAgentWriterTest.groovy | 31 ++++++++++--------- 6 files changed, 67 insertions(+), 31 deletions(-) diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 965976d4ba..cf725b365b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -16,11 +16,18 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; /** - * This writer buffers traces and sends them to the provided DDApi instance. + * This writer buffers traces and sends them to the provided DDApi instance. Buffering is done with + * a disruptor to limit blocking the application threads. Internally, the trace is serialized and + * put onto a separate disruptor that does block to decouple the CPU intensive from the IO bound + * threads. * - *

Written traces are passed off to a disruptor so as to avoid blocking the application's thread. - * If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the - * threshold will be counted and sampled. + *

[Application] -> [trace processing buffer] -> [serialized trace batching buffer] -> [dd-agent] + * + *

Note: the first buffer is non-blocking and will discard if full; the second is blocking and + * will cause back pressure on the trace processing (serializing) thread. + *

If the buffer is full, traces are discarded before serializing. Once serialized, every effort + * is made to keep them, to avoid wasting the serialization effort. */ @Slf4j public class DDAgentWriter implements Writer { @@ -38,8 +45,8 @@ public class DDAgentWriter implements Writer { private static final int DISRUPTOR_BUFFER_SIZE = 1024; private final DDAgentApi api; - public final TraceProcessingDisruptor traceProcessingDisruptor; - public final BatchWritingDisruptor batchWritingDisruptor; + private final TraceProcessingDisruptor traceProcessingDisruptor; + private final BatchWritingDisruptor batchWritingDisruptor; private final AtomicInteger traceCount = new AtomicInteger(0); @@ -147,9 +154,13 @@ public class DDAgentWriter implements Writer { @Override public void close() { - monitor.onShutdown(this, traceProcessingDisruptor.flush(traceCount.getAndSet(0))); - traceProcessingDisruptor.close(); - batchWritingDisruptor.close(); + final boolean flushSuccess = traceProcessingDisruptor.flush(traceCount.getAndSet(0)); + try { + traceProcessingDisruptor.close(); + } finally { // in case first close fails. 
+ batchWritingDisruptor.close(); + } + monitor.onShutdown(this, flushSuccess); } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java index 2bff4028ac..86c7fca315 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java @@ -11,7 +11,7 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @Slf4j -public abstract class AbstractDisruptor implements Closeable { +abstract class AbstractDisruptor implements Closeable { protected final Disruptor> disruptor; @@ -46,6 +46,13 @@ public abstract class AbstractDisruptor implements Closeable { disruptor.shutdown(); } + /** + * Allows the underlying publish to be defined as a blocking or non blocking call. + * + * @param data + * @param representativeCount + * @return + */ public abstract boolean publish(final T data, int representativeCount); /** diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java index fc08591548..b02f2d2e25 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java @@ -12,10 +12,16 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +/** + * Disruptor that takes serialized traces and batches them into appropriately sized requests. + * + *

publishing to the buffer will block if the buffer is full. + */ @Slf4j public class BatchWritingDisruptor extends AbstractDisruptor { private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB + // TODO: move executor to tracer for sharing with other tasks. private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat")); @@ -53,6 +59,7 @@ public class BatchWritingDisruptor extends AbstractDisruptor { @Override public boolean publish(final byte[] data, final int representativeCount) { + // blocking call to ensure serialized traces aren't discarded and apply back pressure. disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount); return true; } @@ -81,6 +88,7 @@ public class BatchWritingDisruptor extends AbstractDisruptor { this.writer = writer; } + // TODO: reduce byte[] garbage by keeping the byte[] on the event and copy before returning. @Override public void onEvent( final DisruptorEvent event, final long sequence, final boolean endOfBatch) { @@ -111,11 +119,12 @@ public class BatchWritingDisruptor extends AbstractDisruptor { return; } - monitor.onFlush(writer, early); // TODO add retry and rate limiting final DDAgentApi.Response response = api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces); + monitor.onFlush(writer, early); + if (response.success()) { log.debug("Successfully sent {} traces to the API", serializedTraces.size()); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java index 8e754fa3fe..65cb6db45c 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java @@ -6,11 +6,12 @@ import com.lmax.disruptor.EventTranslatorTwoArg; import java.util.concurrent.CountDownLatch; class 
DisruptorEvent { - public volatile T data = null; - public volatile int representativeCount = 0; - public volatile CountDownLatch flushLatch = null; + // Memory ordering enforced by disruptor's memory fences, so volatile not required. + T data = null; + int representativeCount = 0; + CountDownLatch flushLatch = null; - public void reset() { + void reset() { data = null; representativeCount = 0; flushLatch = null; diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java index 0088227be7..e0cdf3dc37 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -9,6 +9,13 @@ import java.util.List; import java.util.concurrent.ThreadFactory; import lombok.extern.slf4j.Slf4j; +/** + * Disruptor that takes completed traces and applies processing to them. Upon completion, the + * serialized trace is published to {@link BatchWritingDisruptor}. + * + *

publishing to the buffer will not block the calling thread, but instead will return false if + * the buffer is full. This is to avoid impacting an application thread. + */ @Slf4j public class TraceProcessingDisruptor extends AbstractDisruptor> { @@ -58,8 +65,8 @@ public class TraceProcessingDisruptor extends AbstractDisruptor> { if (event.data != null) { try { final byte[] serializedTrace = api.serializeTrace(event.data); - monitor.onSerialize(writer, event.data, serializedTrace); batchWritingDisruptor.publish(serializedTrace, event.representativeCount); + monitor.onSerialize(writer, event.data, serializedTrace); event.representativeCount = 0; // reset in case flush is invoked below. } catch (final JsonProcessingException e) { log.debug("Error serializing trace", e); diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index d954fe6594..e05a99dd38 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -11,6 +11,7 @@ import datadog.trace.common.writer.ddagent.BatchWritingDisruptor import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.Monitor import datadog.trace.util.test.DDSpecification +import spock.lang.Retry import spock.lang.Timeout import java.util.concurrent.Phaser @@ -396,6 +397,8 @@ class DDAgentWriterTest extends DDSpecification { 1 * monitor.onShutdown(writer, true) } + @Retry(delay = 10) + // if execution is too slow, the http client timeout may trigger. 
def "slow response test"() { def numWritten = 0 def numFlushes = new AtomicInteger(0) @@ -407,7 +410,6 @@ class DDAgentWriterTest extends DDSpecification { def responseSemaphore = new Semaphore(1) setup: - def minimalTrace = createMinimalTrace() // Need to set-up a dummy agent for the final send callback to work def agent = httpServer { @@ -445,9 +447,6 @@ class DDAgentWriterTest extends DDSpecification { } } - // sender queue is sized in requests -- not traces - def bufferSize = 32 - def senderQueueSize = 2 def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()) writer.start() @@ -477,13 +476,10 @@ class DDAgentWriterTest extends DDSpecification { when: // send many traces to fill the sender queue... // loop until outstanding requests > finished requests - while (numFlushes.get() - (numRequests.get() + numFailedRequests.get()) < senderQueueSize) { - // chunk the loop & wait to allow for flushing to send queue - (1..1_000).each { - writer.write(minimalTrace) - numWritten += 1 - } - Thread.sleep(100) + while (writer.traceProcessingDisruptor.disruptorRemainingCapacity + writer.batchWritingDisruptor.disruptorRemainingCapacity > 0 || numFailedPublish.get() == 0) { + writer.write(minimalTrace) + numWritten += 1 + Thread.sleep(1) // Allow traces to get serialized. } then: @@ -494,17 +490,18 @@ class DDAgentWriterTest extends DDSpecification { def priorNumFailed = numFailedPublish.get() // with both disruptor & queue full, should reject everything - def expectedRejects = 100_000 + def expectedRejects = 100 (1..expectedRejects).each { writer.write(minimalTrace) numWritten += 1 } then: - // If the in-flight requests timeouts and frees up a slot in the sending queue, then - // many of traces will be accepted and batched into a new failing request. 
+ // If the in-flight request times out (we don't currently retry), + // then a new batch will begin processing and many of traces will + // be accepted and batched into a new failing request. // In that case, the reject number will be low. - numFailedPublish.get() - priorNumFailed > expectedRejects * 0.40 + numFailedPublish.get() - priorNumFailed >= expectedRejects * 0.80 numPublished.get() + numFailedPublish.get() == numWritten cleanup: @@ -512,6 +509,10 @@ class DDAgentWriterTest extends DDSpecification { writer.close() agent.close() + + where: + bufferSize = 16 + minimalTrace = createMinimalTrace() } def "multi threaded"() { From a4db31cf79a4fc8ec5bf1ad7ba9d44988daf7383 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Mon, 13 Jan 2020 15:44:51 -0800 Subject: [PATCH 3/5] Apply `_sample_rate` metric to allow dd-agent to do proper scaling of metrics when traces are sampled. --- .../java/datadog/trace/common/writer/DDAgentWriter.java | 8 +++++++- .../common/writer/ddagent/TraceProcessingDisruptor.java | 6 ++++++ .../datadog/trace/api/writer/DDAgentWriterTest.groovy | 4 ++-- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index cf725b365b..81b884e43d 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -113,7 +113,13 @@ public class DDAgentWriter implements Writer { public void write(final List trace) { // We can't add events after shutdown otherwise it will never complete shutting down. if (traceProcessingDisruptor.running) { - final int representativeCount = traceCount.getAndSet(0) + 1; + final int representativeCount; + if (trace.isEmpty() || !(trace.get(0).isRootSpan())) { + // We don't want to reset the count if we can't correctly report the value. 
+ representativeCount = 1; + } else { + representativeCount = traceCount.getAndSet(0) + 1; + } final boolean published = traceProcessingDisruptor.publish(trace, representativeCount); if (published) { diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java index e0cdf3dc37..e1a62bafed 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -63,6 +63,12 @@ public class TraceProcessingDisruptor extends AbstractDisruptor> { final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { try { if (event.data != null) { + if (1 < event.representativeCount && !event.data.isEmpty()) { + // attempt to have agent scale the metrics properly + ((DDSpan) event.data.get(0).getLocalRootSpan()) + .context() + .setMetric("_sample_rate", 1d / event.representativeCount); + } try { final byte[] serializedTrace = api.serializeTrace(event.data); batchWritingDisruptor.publish(serializedTrace, event.representativeCount); diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index e05a99dd38..24340e32f9 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -131,7 +131,7 @@ class DDAgentWriterTest extends DDSpecification { writer.close() where: - span = [newSpanOf(0, "fixed-thread-name")] + span = newSpanOf(0, "fixed-thread-name") trace = (0..10000).collect { span } } @@ -162,7 +162,7 @@ class DDAgentWriterTest extends DDSpecification { writer.close() where: - span = [newSpanOf(0, "fixed-thread-name")] + span = newSpanOf(0, "fixed-thread-name") trace = 
(1..10).collect { span } } From 3aea763769557b2f75da901cf34af1308927495f Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Mon, 13 Jan 2020 16:21:37 -0800 Subject: [PATCH 4/5] Remove test race condition --- .../datadog/trace/api/writer/DDAgentWriterTest.groovy | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 24340e32f9..23e3e691ef 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -148,14 +148,13 @@ class DDAgentWriterTest extends DDSpecification { then: 5 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> { - phaser.arrive() - return DDAgentApi.Response.success(200) - } + 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> DDAgentApi.Response.success(200) 5 * monitor.onPublish(_, _) 5 * monitor.onSerialize(_, _, _) 1 * monitor.onFlush(_, _) - (0..1) * monitor.onSend(_, _, _, _) // This gets called after phaser.arrive(), so there's a race condition. + 1 * monitor.onSend(_, _, _, _) >> { + phaser.arrive() + } 0 * _ cleanup: From 5cce4cb783edb1b766f6c22be4b1701739dcc9ae Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 16 Jan 2020 16:43:01 -0800 Subject: [PATCH 5/5] Replace DDAgentWriter.Spec with a proper Builder. Also rename the builder class on DDTracer to default name generated by Lombok. 
--- dd-trace-ot/dd-trace-ot.gradle | 3 +- .../java/datadog/opentracing/DDTracer.java | 11 ++-- .../trace/common/writer/DDAgentWriter.java | 61 ++++++++++--------- .../datadog/trace/common/writer/Writer.java | 8 ++- .../trace/api/writer/DDAgentWriterTest.groovy | 28 ++++----- 5 files changed, 57 insertions(+), 54 deletions(-) diff --git a/dd-trace-ot/dd-trace-ot.gradle b/dd-trace-ot/dd-trace-ot.gradle index 96f9a4bd6d..cf92790fcb 100644 --- a/dd-trace-ot/dd-trace-ot.gradle +++ b/dd-trace-ot/dd-trace-ot.gradle @@ -12,8 +12,7 @@ minimumInstructionCoverage = 0.6 excludedClassesCoverage += [ 'datadog.trace.common.writer.ListWriter', 'datadog.trace.common.writer.LoggingWriter', - 'datadog.trace.common.writer.DDAgentWriter.Spec', - 'datadog.trace.common.writer.DDAgentWriter.Spec.SpecBuilder', + 'datadog.trace.common.writer.DDAgentWriter.DDAgentWriterBuilder', 'datadog.trace.common.sampling.PrioritySampling', // This code is copied from okHttp samples and we have integration tests to verify that it works. 
'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket', diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index 7f182b5aec..7312fa398c 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -44,6 +44,7 @@ import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; +import lombok.Builder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -96,18 +97,18 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace private final HttpCodec.Injector injector; private final HttpCodec.Extractor extractor; - public static class Builder { + public static class DDTracerBuilder { - public Builder() { + public DDTracerBuilder() { // Apply the default values from config. config(Config.get()); } - public Builder withProperties(final Properties properties) { + public DDTracerBuilder withProperties(final Properties properties) { return config(Config.get(properties)); } - public Builder config(final Config config) { + public DDTracerBuilder config(final Config config) { this.config = config; serviceName(config.getServiceName()); // Explicitly skip setting writer to avoid allocating resources prematurely. @@ -265,7 +266,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace partialFlushMinSpans); } - @lombok.Builder(builderClassName = "Builder") + @Builder // These field names must be stable to ensure the builder api is stable. 
private DDTracer( final Config config, diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 81b884e43d..44e28da917 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -12,7 +12,6 @@ import datadog.trace.common.writer.ddagent.Monitor; import datadog.trace.common.writer.ddagent.TraceProcessingDisruptor; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import lombok.Value; import lombok.extern.slf4j.Slf4j; /** @@ -31,16 +30,6 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class DDAgentWriter implements Writer { - @Value - @lombok.Builder - public static class Spec { - @lombok.Builder.Default public String agentHost = DEFAULT_AGENT_HOST; - @lombok.Builder.Default public int traceAgentPort = DEFAULT_TRACE_AGENT_PORT; - @lombok.Builder.Default public String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; - @lombok.Builder.Default public int traceBufferSize = DISRUPTOR_BUFFER_SIZE; - @lombok.Builder.Default public Monitor monitor = new Monitor.Noop(); - @lombok.Builder.Default public int flushFrequencySeconds = 1; - } private static final int DISRUPTOR_BUFFER_SIZE = 1024; @@ -52,20 +41,22 @@ public class DDAgentWriter implements Writer { public final Monitor monitor; - public DDAgentWriter() { - this(Spec.builder().build()); + // Apply defaults to the class generated by lombok. 
+ public static class DDAgentWriterBuilder { + String agentHost = DEFAULT_AGENT_HOST; + int traceAgentPort = DEFAULT_TRACE_AGENT_PORT; + String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; + int traceBufferSize = DISRUPTOR_BUFFER_SIZE; + Monitor monitor = new Monitor.Noop(); + int flushFrequencySeconds = 1; } - public DDAgentWriter(final Spec spec) { - api = new DDAgentApi(spec.agentHost, spec.traceAgentPort, spec.unixDomainSocket); - monitor = spec.monitor; - - batchWritingDisruptor = - new BatchWritingDisruptor( - spec.traceBufferSize, spec.flushFrequencySeconds, api, monitor, this); - traceProcessingDisruptor = - new TraceProcessingDisruptor( - spec.traceBufferSize, api, batchWritingDisruptor, monitor, this); + @Deprecated + public DDAgentWriter() { + this( + new DDAgentApi( + DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), + new Monitor.Noop()); } @Deprecated @@ -79,17 +70,27 @@ public class DDAgentWriter implements Writer { DISRUPTOR_BUFFER_SIZE, api, batchWritingDisruptor, monitor, this); } - @Deprecated - // DQH - TODO - Update the tests & remove this + @lombok.Builder + // These field names must be stable to ensure the builder api is stable. 
private DDAgentWriter( - final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) { - this.api = api; - monitor = new Monitor.Noop(); + final DDAgentApi agentApi, + final String agentHost, + final int traceAgentPort, + final String unixDomainSocket, + final int traceBufferSize, + final Monitor monitor, + final int flushFrequencySeconds) { + if (agentApi != null) { + api = agentApi; + } else { + api = new DDAgentApi(agentHost, traceAgentPort, unixDomainSocket); + } + this.monitor = monitor; batchWritingDisruptor = - new BatchWritingDisruptor(disruptorSize, flushFrequencySeconds, api, monitor, this); + new BatchWritingDisruptor(traceBufferSize, flushFrequencySeconds, api, monitor, this); traceProcessingDisruptor = - new TraceProcessingDisruptor(disruptorSize, api, batchWritingDisruptor, monitor, this); + new TraceProcessingDisruptor(traceBufferSize, api, batchWritingDisruptor, monitor, this); } public void addResponseListener(final DDAgentResponseListener listener) { diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java index 51e0e85c4b..190c75d0e4 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java @@ -53,7 +53,7 @@ public interface Writer extends Closeable { } else { log.warn( "Writer type not configured correctly: No config provided! Defaulting to DDAgentWriter."); - writer = new DDAgentWriter(); + writer = DDAgentWriter.builder().build(); } return writer; @@ -64,8 +64,10 @@ public interface Writer extends Closeable { } private static Writer createAgentWriter(final Config config) { - // TODO: switch to using DDAgentWriter.Spec constructor... 
- return new DDAgentWriter(createApi(config), createMonitor(config)); + return DDAgentWriter.builder() + .agentApi(createApi(config)) + .monitor(createMonitor(config)) + .build(); } private static DDAgentApi createApi(final Config config) { diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 23e3e691ef..46f65e7c0e 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -44,7 +44,7 @@ class DDAgentWriterTest extends DDSpecification { def "test happy path"() { setup: - def writer = new DDAgentWriter(api, 2, -1) + def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(2).flushFrequencySeconds(-1).build() writer.start() when: @@ -72,7 +72,7 @@ class DDAgentWriterTest extends DDSpecification { def "test flood of traces"() { setup: - def writer = new DDAgentWriter(api, disruptorSize, -1) + def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(disruptorSize).flushFrequencySeconds(-1).build() writer.start() when: @@ -97,7 +97,7 @@ class DDAgentWriterTest extends DDSpecification { def "test flush by size"() { setup: - def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1) + def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(DISRUPTOR_BUFFER_SIZE).flushFrequencySeconds(-1).build() writer.start() when: @@ -137,7 +137,7 @@ class DDAgentWriterTest extends DDSpecification { def "test flush by time"() { setup: - def writer = new DDAgentWriter(api, monitor) + def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build() writer.start() when: @@ -167,7 +167,7 @@ class DDAgentWriterTest extends DDSpecification { def "test default buffer size"() { setup: - def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1) + def writer = 
DDAgentWriter.builder().agentApi(api).traceBufferSize(DISRUPTOR_BUFFER_SIZE).flushFrequencySeconds(-1).build() writer.start() when: @@ -212,7 +212,7 @@ class DDAgentWriterTest extends DDSpecification { def "check that are no interactions after close"() { setup: - def writer = new DDAgentWriter(api, monitor) + def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build() writer.start() when: @@ -230,7 +230,7 @@ class DDAgentWriterTest extends DDSpecification { def "check shutdown if batchWritingDisruptor stopped first"() { setup: - def writer = new DDAgentWriter(api, monitor) + def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build() writer.start() writer.batchWritingDisruptor.close() @@ -283,7 +283,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) + def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build() when: writer.start() @@ -330,7 +330,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) + def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build() when: writer.start() @@ -371,7 +371,7 @@ class DDAgentWriterTest extends DDSpecification { return DDAgentApi.Response.failed(new IOException("comm error")) } } - def writer = new DDAgentWriter(api, monitor) + def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build() when: writer.start() @@ -446,7 +446,7 @@ class DDAgentWriterTest extends DDSpecification { } } - def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()) + def writer = 
DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build() writer.start() // gate responses @@ -544,7 +544,7 @@ class DDAgentWriterTest extends DDSpecification { } } - def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) + def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build() writer.start() when: @@ -604,7 +604,7 @@ class DDAgentWriterTest extends DDSpecification { } def monitor = new Monitor.StatsD(statsd) - def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) + def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build() writer.start() when: @@ -652,7 +652,7 @@ class DDAgentWriterTest extends DDSpecification { } def monitor = new Monitor.StatsD(statsd) - def writer = new DDAgentWriter(api, monitor) + def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build() writer.start() when: