diff --git a/dd-trace-ot/dd-trace-ot.gradle b/dd-trace-ot/dd-trace-ot.gradle index f943f74e64..96f9a4bd6d 100644 --- a/dd-trace-ot/dd-trace-ot.gradle +++ b/dd-trace-ot/dd-trace-ot.gradle @@ -12,6 +12,8 @@ minimumInstructionCoverage = 0.6 excludedClassesCoverage += [ 'datadog.trace.common.writer.ListWriter', 'datadog.trace.common.writer.LoggingWriter', + 'datadog.trace.common.writer.DDAgentWriter.Spec', + 'datadog.trace.common.writer.DDAgentWriter.Spec.SpecBuilder', 'datadog.trace.common.sampling.PrioritySampling', // This code is copied from okHttp samples and we have integration tests to verify that it works. 'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket', diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index c63fe9ff8f..965976d4ba 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -3,21 +3,16 @@ package datadog.trace.common.writer; import static datadog.trace.api.Config.DEFAULT_AGENT_HOST; import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; -import static java.util.concurrent.TimeUnit.SECONDS; import datadog.opentracing.DDSpan; -import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.ddagent.BatchWritingDisruptor; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import datadog.trace.common.writer.ddagent.Monitor; -import datadog.trace.common.writer.ddagent.TraceConsumer; -import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor; +import datadog.trace.common.writer.ddagent.TraceProcessingDisruptor; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.Phaser; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Value; import lombok.extern.slf4j.Slf4j; /** @@ -29,86 +24,65 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class DDAgentWriter implements Writer { - private static final int DISRUPTOR_BUFFER_SIZE = 1024; - private static final int SENDER_QUEUE_SIZE = 16; - private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second + @Value + @lombok.Builder + public static class Spec { + @lombok.Builder.Default public String agentHost = DEFAULT_AGENT_HOST; + @lombok.Builder.Default public int traceAgentPort = DEFAULT_TRACE_AGENT_PORT; + @lombok.Builder.Default public String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; + @lombok.Builder.Default public int traceBufferSize = DISRUPTOR_BUFFER_SIZE; + @lombok.Builder.Default public Monitor monitor = new Monitor.Noop(); + @lombok.Builder.Default public int flushFrequencySeconds = 1; + } - private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = - new DaemonThreadFactory("dd-trace-writer"); + private static final int DISRUPTOR_BUFFER_SIZE = 1024; private final DDAgentApi api; - public final int flushFrequencySeconds; - public final TraceSerializingDisruptor disruptor; + public final TraceProcessingDisruptor traceProcessingDisruptor; + public final BatchWritingDisruptor batchWritingDisruptor; - public final ScheduledExecutorService scheduledWriterExecutor; private final AtomicInteger traceCount = new AtomicInteger(0); - public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing; public final Monitor monitor; public DDAgentWriter() { - this( - new DDAgentApi( - DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET), - new Monitor.Noop()); + this(Spec.builder().build()); } + public DDAgentWriter(final Spec spec) { + api = new DDAgentApi(spec.agentHost, spec.traceAgentPort, spec.unixDomainSocket); + monitor = spec.monitor; + + batchWritingDisruptor = + new BatchWritingDisruptor( + spec.traceBufferSize, spec.flushFrequencySeconds, api, monitor, this); + traceProcessingDisruptor = + new TraceProcessingDisruptor( + spec.traceBufferSize, api, batchWritingDisruptor, monitor, this); + } + + @Deprecated public DDAgentWriter(final DDAgentApi api, final Monitor monitor) { - this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY); - } - - /** Old signature (pre-Monitor) used in tests */ - private DDAgentWriter(final DDAgentApi api) { - this(api, new Monitor.Noop()); - } - - /** - * Used in the tests. - * - * @param api - * @param disruptorSize Rounded up to next power of 2 - * @param flushFrequencySeconds value < 1 disables scheduled flushes - */ - private DDAgentWriter( - final DDAgentApi api, - final int disruptorSize, - final int senderQueueSize, - final int flushFrequencySeconds) { - this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds); - } - - // DQH - TODO - Update the tests & remove this - private DDAgentWriter( - final DDAgentApi api, - final Monitor monitor, - final int disruptorSize, - final int flushFrequencySeconds) { - this(api, monitor, disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); - } - - // DQH - TODO - Update the tests & remove this - private DDAgentWriter( - final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) { - this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds); - } - - private DDAgentWriter( - final DDAgentApi api, - final Monitor monitor, - final int disruptorSize, - final int senderQueueSize, - final int flushFrequencySeconds) { this.api = api; this.monitor = monitor; - disruptor = - new TraceSerializingDisruptor( - disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this)); + batchWritingDisruptor = new BatchWritingDisruptor(DISRUPTOR_BUFFER_SIZE, 1, api, monitor, this); + traceProcessingDisruptor = + new TraceProcessingDisruptor( + DISRUPTOR_BUFFER_SIZE, api, batchWritingDisruptor, monitor, this); + } - this.flushFrequencySeconds = flushFrequencySeconds; - scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY); + @Deprecated + // DQH - TODO - Update the tests & remove this + private DDAgentWriter( + final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) { + this.api = api; + monitor = new Monitor.Noop(); - apiPhaser.register(); // Register on behalf of the scheduled executor thread. + batchWritingDisruptor = + new BatchWritingDisruptor(disruptorSize, flushFrequencySeconds, api, monitor, this); + traceProcessingDisruptor = + new TraceProcessingDisruptor(disruptorSize, api, batchWritingDisruptor, monitor, this); } public void addResponseListener(final DDAgentResponseListener listener) { @@ -117,7 +91,7 @@ public class DDAgentWriter implements Writer { // Exposing some statistics for consumption by monitors public final long getDisruptorCapacity() { - return disruptor.getDisruptorCapacity(); + return traceProcessingDisruptor.getDisruptorCapacity(); } public final long getDisruptorUtilizedCapacity() { @@ -125,20 +99,21 @@ public class DDAgentWriter implements Writer { } public final long getDisruptorRemainingCapacity() { - return disruptor.getDisruptorRemainingCapacity(); + return traceProcessingDisruptor.getDisruptorRemainingCapacity(); } @Override public void write(final List trace) { // We can't add events after shutdown otherwise it will never complete shutting down. - if (disruptor.running) { - final boolean published = disruptor.tryPublish(trace); + if (traceProcessingDisruptor.running) { + final int representativeCount = traceCount.getAndSet(0) + 1; + final boolean published = traceProcessingDisruptor.publish(trace, representativeCount); if (published) { monitor.onPublish(DDAgentWriter.this, trace); } else { // We're discarding the trace, but we still want to count it. - traceCount.incrementAndGet(); + traceCount.addAndGet(representativeCount); log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace); monitor.onFailedPublish(this, trace); @@ -150,6 +125,10 @@ public class DDAgentWriter implements Writer { } } + public boolean flush() { + return traceProcessingDisruptor.flush(traceCount.getAndSet(0)); + } + @Override public void incrementTraceCount() { traceCount.incrementAndGet(); @@ -161,32 +140,16 @@ public class DDAgentWriter implements Writer { @Override public void start() { - disruptor.start(); - + batchWritingDisruptor.start(); + traceProcessingDisruptor.start(); monitor.onStart(this); } @Override public void close() { - - boolean flushSuccess = true; - - // We have to shutdown scheduled executor first to make sure no flush events issued after - // disruptor has been shutdown. - // Otherwise those events will never be processed and flush call will wait forever. - scheduledWriterExecutor.shutdown(); - try { - scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS); - } catch (final InterruptedException e) { - log.warn("Waiting for flush executor shutdown interrupted.", e); - - flushSuccess = false; - } - flushSuccess |= disruptor.flush(); - - disruptor.close(); - - monitor.onShutdown(this, flushSuccess); + monitor.onShutdown(this, traceProcessingDisruptor.flush(traceCount.getAndSet(0))); + traceProcessingDisruptor.close(); + batchWritingDisruptor.close(); } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java index b14cbfdd23..51e0e85c4b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java @@ -64,6 +64,7 @@ public interface Writer extends Closeable { } private static Writer createAgentWriter(final Config config) { + // TODO: switch to using DDAgentWriter.Spec constructor... return new DDAgentWriter(createApi(config), createMonitor(config)); } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java new file mode 100644 index 0000000000..2bff4028ac --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java @@ -0,0 +1,89 @@ +package datadog.trace.common.writer.ddagent; + +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.Closeable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class AbstractDisruptor implements Closeable { + + protected final Disruptor> disruptor; + + public volatile boolean running = false; + + protected final DisruptorEvent.FlushTranslator flushTranslator = + new DisruptorEvent.FlushTranslator<>(); + protected final DisruptorEvent.DataTranslator dataTranslator = + new DisruptorEvent.DataTranslator<>(); + + public AbstractDisruptor(final int disruptorSize, final EventHandler> handler) { + disruptor = + new Disruptor<>( + new DisruptorEvent.Factory(), + Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 + getThreadFactory(), + ProducerType.MULTI, + new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); + disruptor.handleEventsWith(handler); + } + + protected abstract ThreadFactory getThreadFactory(); + + public void start() { + disruptor.start(); + running = true; + } + + @Override + public void close() { + running = false; + disruptor.shutdown(); + } + + public abstract boolean publish(final T data, int representativeCount); + + /** + * This method will block until the flush is complete. + * + * @param traceCount - number of unreported traces to include in this batch. + */ + public boolean flush(final int traceCount) { + if (running) { + return flush(traceCount, new CountDownLatch(1)); + } else { + return false; + } + } + + /** This method will block until the flush is complete. */ + protected boolean flush(final int traceCount, final CountDownLatch latch) { + log.info("Flushing any remaining traces."); + disruptor.publishEvent(flushTranslator, traceCount, latch); + try { + latch.await(); + return true; + } catch (final InterruptedException e) { + log.warn("Waiting for flush interrupted.", e); + return false; + } + } + + // Exposing some statistics for consumption by monitors + public final long getDisruptorCapacity() { + return disruptor.getRingBuffer().getBufferSize(); + } + + public final long getDisruptorRemainingCapacity() { + return disruptor.getRingBuffer().remainingCapacity(); + } + + public final long getCurrentCount() { + return disruptor.getCursor() - disruptor.getRingBuffer().getMinimumGatingSequence(); + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java new file mode 100644 index 0000000000..fc08591548 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java @@ -0,0 +1,162 @@ +package datadog.trace.common.writer.ddagent; + +import com.lmax.disruptor.EventHandler; +import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.DDAgentWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BatchWritingDisruptor extends AbstractDisruptor { + private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB + + private final ScheduledExecutorService heartbeatExecutor = + Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat")); + + private final DisruptorEvent.HeartbeatTranslator heartbeatTranslator = + new DisruptorEvent.HeartbeatTranslator(); + + public BatchWritingDisruptor( + final int disruptorSize, + final int flushFrequencySeconds, + final DDAgentApi api, + final Monitor monitor, + final DDAgentWriter writer) { + super(disruptorSize, new BatchWritingHandler(flushFrequencySeconds, api, monitor, writer)); + + if (0 < flushFrequencySeconds) { + // This provides a steady stream of events to enable flushing with a low throughput. + final Runnable heartbeat = + new Runnable() { + @Override + public void run() { + // Only add if the buffer is empty. + if (running && getCurrentCount() == 0) { + disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator); + } + } + }; + heartbeatExecutor.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS); + } + } + + @Override + protected ThreadFactory getThreadFactory() { + return new DaemonThreadFactory("dd-trace-writer"); + } + + @Override + public boolean publish(final byte[] data, final int representativeCount) { + disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount); + return true; + } + + // Intentionally not thread safe. + private static class BatchWritingHandler implements EventHandler> { + + private final long flushFrequencyNanos; + private final DDAgentApi api; + private final Monitor monitor; + private final DDAgentWriter writer; + private final List serializedTraces = new ArrayList<>(); + private int representativeCount = 0; + private int sizeInBytes = 0; + private long nextScheduledFlush; + + private BatchWritingHandler( + final int flushFrequencySeconds, + final DDAgentApi api, + final Monitor monitor, + final DDAgentWriter writer) { + flushFrequencyNanos = TimeUnit.SECONDS.toNanos(flushFrequencySeconds); + scheduleNextFlush(); + this.api = api; + this.monitor = monitor; + this.writer = writer; + } + + @Override + public void onEvent( + final DisruptorEvent event, final long sequence, final boolean endOfBatch) { + try { + if (event.data != null) { + sizeInBytes += event.data.length; + serializedTraces.add(event.data); + } + + // Flush events might increase this with no data. + representativeCount += event.representativeCount; + + if (event.flushLatch != null + || FLUSH_PAYLOAD_BYTES <= sizeInBytes + || nextScheduledFlush <= System.nanoTime()) { + flush(event.flushLatch, FLUSH_PAYLOAD_BYTES <= sizeInBytes); + } + } finally { + event.reset(); + } + } + + private void flush(final CountDownLatch flushLatch, final boolean early) { + try { + if (serializedTraces.isEmpty()) { + // FIXME: this will reset representativeCount without reporting + // anything even if representativeCount > 0. + return; + } + + monitor.onFlush(writer, early); + // TODO add retry and rate limiting + final DDAgentApi.Response response = + api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces); + + if (response.success()) { + log.debug("Successfully sent {} traces to the API", serializedTraces.size()); + + monitor.onSend(writer, representativeCount, sizeInBytes, response); + } else { + log.debug( + "Failed to send {} traces (representing {}) of size {} bytes to the API", + serializedTraces.size(), + representativeCount, + sizeInBytes); + + monitor.onFailedSend(writer, representativeCount, sizeInBytes, response); + } + } catch (final Throwable e) { + log.debug("Failed to send traces to the API: {}", e.getMessage()); + + // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really + // shouldn't occur. + // However, just to be safe to start, create a failed Response to handle any + // spurious Throwable-s. + monitor.onFailedSend( + writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); + } finally { + serializedTraces.clear(); + sizeInBytes = 0; + representativeCount = 0; + scheduleNextFlush(); + + if (flushLatch != null) { + flushLatch.countDown(); + } + } + } + + private void scheduleNextFlush() { + // TODO: adjust this depending on responsiveness of the agent. + if (0 < flushFrequencyNanos) { + nextScheduledFlush = System.nanoTime() + flushFrequencyNanos; + } else { + nextScheduledFlush = Long.MAX_VALUE; + } + } + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java index 66ff7a499b..8e754fa3fe 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java @@ -2,13 +2,19 @@ package datadog.trace.common.writer.ddagent; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventTranslator; -import com.lmax.disruptor.EventTranslatorOneArg; -import datadog.opentracing.DDSpan; -import java.util.List; +import com.lmax.disruptor.EventTranslatorTwoArg; +import java.util.concurrent.CountDownLatch; class DisruptorEvent { - public volatile boolean shouldFlush = false; public volatile T data = null; + public volatile int representativeCount = 0; + public volatile CountDownLatch flushLatch = null; + + public void reset() { + data = null; + representativeCount = 0; + flushLatch = null; + } static class Factory implements EventFactory> { @Override @@ -17,25 +23,38 @@ class DisruptorEvent { } } - static class TraceTranslator - implements EventTranslatorOneArg>, List> { - static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR = - new DisruptorEvent.TraceTranslator(); + static class DataTranslator implements EventTranslatorTwoArg, T, Integer> { @Override public void translateTo( - final DisruptorEvent> event, final long sequence, final List trace) { - event.data = trace; + final DisruptorEvent event, + final long sequence, + final T data, + final Integer representativeCount) { + event.data = data; + event.representativeCount = representativeCount; } } - static class FlushTranslator implements EventTranslator>> { - static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR = - new DisruptorEvent.FlushTranslator(); + static class HeartbeatTranslator implements EventTranslator> { @Override - public void translateTo(final DisruptorEvent> event, final long sequence) { - event.shouldFlush = true; + public void translateTo(final DisruptorEvent event, final long sequence) { + return; + } + } + + static class FlushTranslator + implements EventTranslatorTwoArg, Integer, CountDownLatch> { + + @Override + public void translateTo( + final DisruptorEvent event, + final long sequence, + final Integer representativeCount, + final CountDownLatch latch) { + event.representativeCount = representativeCount; + event.flushLatch = latch; } } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java deleted file mode 100644 index 1081f4352e..0000000000 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceConsumer.java +++ /dev/null @@ -1,150 +0,0 @@ -package datadog.trace.common.writer.ddagent; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.lmax.disruptor.EventHandler; -import datadog.opentracing.DDSpan; -import datadog.trace.common.writer.DDAgentWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import lombok.extern.slf4j.Slf4j; - -/** This class is intentionally not threadsafe. */ -@Slf4j -public class TraceConsumer implements EventHandler>> { - private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB - - private final AtomicInteger traceCount; - private final Semaphore senderSemaphore; - private final DDAgentWriter writer; - - private List serializedTraces = new ArrayList<>(); - private int payloadSize = 0; - - public TraceConsumer( - final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) { - this.traceCount = traceCount; - senderSemaphore = new Semaphore(senderQueueSize); - this.writer = writer; - } - - @Override - public void onEvent( - final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { - final List trace = event.data; - event.data = null; // clear the event for reuse. - if (trace != null) { - traceCount.incrementAndGet(); - try { - final byte[] serializedTrace = writer.getApi().serializeTrace(trace); - payloadSize += serializedTrace.length; - serializedTraces.add(serializedTrace); - - writer.monitor.onSerialize(writer, trace, serializedTrace); - } catch (final JsonProcessingException e) { - log.warn("Error serializing trace", e); - - writer.monitor.onFailedSerialize(writer, trace, e); - } catch (final Throwable e) { - log.debug("Error while serializing trace", e); - - writer.monitor.onFailedSerialize(writer, trace, e); - } - } - - if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) { - final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES); - - reportTraces(early); - event.shouldFlush = false; - } - } - - private void reportTraces(final boolean early) { - try { - if (serializedTraces.isEmpty()) { - writer.monitor.onFlush(writer, early); - - writer.apiPhaser.arrive(); // Allow flush to return - return; - // scheduleFlush called in finally block. - } - if (writer.scheduledWriterExecutor.isShutdown()) { - writer.monitor.onFailedSend( - writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1)); - writer.apiPhaser.arrive(); // Allow flush to return - return; - } - final List toSend = serializedTraces; - serializedTraces = new ArrayList<>(toSend.size()); - // ^ Initialize with similar size to reduce arraycopy churn. - - final int representativeCount = traceCount.getAndSet(0); - final int sizeInBytes = payloadSize; - - try { - writer.monitor.onFlush(writer, early); - - // Run the actual IO task on a different thread to avoid blocking the consumer. - try { - senderSemaphore.acquire(); - } catch (final InterruptedException e) { - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); - - // Finally, we'll schedule another flush - // Any threads awaiting the flush will continue to wait - return; - } - writer.scheduledWriterExecutor.execute( - new Runnable() { - @Override - public void run() { - senderSemaphore.release(); - - try { - final DDAgentApi.Response response = - writer - .getApi() - .sendSerializedTraces(representativeCount, sizeInBytes, toSend); - - if (response.success()) { - log.debug("Successfully sent {} traces to the API", toSend.size()); - - writer.monitor.onSend(writer, representativeCount, sizeInBytes, response); - } else { - log.debug( - "Failed to send {} traces (representing {}) of size {} bytes to the API", - toSend.size(), - representativeCount, - sizeInBytes); - - writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response); - } - } catch (final Throwable e) { - log.debug("Failed to send traces to the API: {}", e.getMessage()); - - // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really - // shouldn't occur. - // However, just to be safe to start, create a failed Response to handle any - // spurious Throwable-s. - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e)); - } finally { - writer.apiPhaser.arrive(); // Flush completed. - } - } - }); - } catch (final RejectedExecutionException ex) { - writer.monitor.onFailedSend( - writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex)); - writer.apiPhaser.arrive(); // Allow flush to return - } - } finally { - payloadSize = 0; - writer.disruptor.scheduleFlush(); - } - } -} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java new file mode 100644 index 0000000000..0088227be7 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -0,0 +1,88 @@ +package datadog.trace.common.writer.ddagent; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.lmax.disruptor.EventHandler; +import datadog.opentracing.DDSpan; +import datadog.trace.common.util.DaemonThreadFactory; +import datadog.trace.common.writer.DDAgentWriter; +import java.util.List; +import java.util.concurrent.ThreadFactory; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class TraceProcessingDisruptor extends AbstractDisruptor> { + + public TraceProcessingDisruptor( + final int disruptorSize, + final DDAgentApi api, + final BatchWritingDisruptor batchWritingDisruptor, + final Monitor monitor, + final DDAgentWriter writer) { + // TODO: add config to enable control over serialization overhead. + super(disruptorSize, new TraceSerializingHandler(api, batchWritingDisruptor, monitor, writer)); + } + + @Override + protected ThreadFactory getThreadFactory() { + return new DaemonThreadFactory("dd-trace-processor"); + } + + @Override + public boolean publish(final List data, final int representativeCount) { + return disruptor.getRingBuffer().tryPublishEvent(dataTranslator, data, representativeCount); + } + + // This class is threadsafe if we want to enable more processors. + public static class TraceSerializingHandler + implements EventHandler>> { + private final DDAgentApi api; + private final BatchWritingDisruptor batchWritingDisruptor; + private final Monitor monitor; + private final DDAgentWriter writer; + + public TraceSerializingHandler( + final DDAgentApi api, + final BatchWritingDisruptor batchWritingDisruptor, + final Monitor monitor, + final DDAgentWriter writer) { + this.api = api; + this.batchWritingDisruptor = batchWritingDisruptor; + this.monitor = monitor; + this.writer = writer; + } + + @Override + public void onEvent( + final DisruptorEvent> event, final long sequence, final boolean endOfBatch) { + try { + if (event.data != null) { + try { + final byte[] serializedTrace = api.serializeTrace(event.data); + monitor.onSerialize(writer, event.data, serializedTrace); + batchWritingDisruptor.publish(serializedTrace, event.representativeCount); + event.representativeCount = 0; // reset in case flush is invoked below. + } catch (final JsonProcessingException e) { + log.debug("Error serializing trace", e); + monitor.onFailedSerialize(writer, event.data, e); + } catch (final Throwable e) { + log.debug("Error while serializing trace", e); + monitor.onFailedSerialize(writer, event.data, e); + } + } + + if (event.flushLatch != null) { + if (batchWritingDisruptor.running) { + // propagate the flush. + batchWritingDisruptor.flush(event.representativeCount, event.flushLatch); + } + if (!batchWritingDisruptor.running) { // check again to protect against race condition. + // got shutdown early somehow? + event.flushLatch.countDown(); + } + } + } finally { + event.reset(); + } + } + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java deleted file mode 100644 index f878cbb178..0000000000 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceSerializingDisruptor.java +++ /dev/null @@ -1,117 +0,0 @@ -package datadog.trace.common.writer.ddagent; - -import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR; -import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR; -import static java.util.concurrent.TimeUnit.SECONDS; - -import com.lmax.disruptor.SleepingWaitStrategy; -import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; -import datadog.opentracing.DDSpan; -import datadog.trace.common.util.DaemonThreadFactory; -import datadog.trace.common.writer.DDAgentWriter; -import java.io.Closeable; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class TraceSerializingDisruptor implements Closeable { - private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = - new DaemonThreadFactory("dd-trace-disruptor"); - private final FlushTask flushTask = new FlushTask(); - - private final Disruptor>> disruptor; - private final DDAgentWriter writer; - - public volatile boolean running = false; - - private final AtomicReference> flushSchedule = new AtomicReference<>(); - - public TraceSerializingDisruptor( - final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) { - disruptor = - new Disruptor<>( - new DisruptorEvent.Factory>(), - Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 - DISRUPTOR_THREAD_FACTORY, - ProducerType.MULTI, - new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); - this.writer = writer; - disruptor.handleEventsWith(handler); - } - - public void start() { - disruptor.start(); - running = true; - scheduleFlush(); - } - - @Override - public void close() { - running = false; - disruptor.shutdown(); - } - - public boolean tryPublish(final List trace) { - return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace); - } - - /** This method will block until the flush is complete. */ - public boolean flush() { - if (running) { - log.info("Flushing any remaining traces."); - // Register with the phaser so we can block until the flush completion. - writer.apiPhaser.register(); - disruptor.publishEvent(FLUSH_TRANSLATOR); - try { - // Allow thread to be interrupted. - writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister()); - - return true; - } catch (final InterruptedException e) { - log.warn("Waiting for flush interrupted.", e); - - return false; - } - } else { - return false; - } - } - - public void scheduleFlush() { - if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) { - final ScheduledFuture previous = - flushSchedule.getAndSet( - writer.scheduledWriterExecutor.schedule( - flushTask, writer.flushFrequencySeconds, SECONDS)); - - final boolean previousIncomplete = (previous != null); - if (previousIncomplete) { - previous.cancel(true); - } - - writer.monitor.onScheduleFlush(writer, previousIncomplete); - } - } - - private class FlushTask implements Runnable { - @Override - public void run() { - // Don't call flush() because it would block the thread also used for sending the traces. - disruptor.publishEvent(FLUSH_TRANSLATOR); - } - } - - // Exposing some statistics for consumption by monitors - public final long getDisruptorCapacity() { - return disruptor.getRingBuffer().getBufferSize(); - } - - public final long getDisruptorRemainingCapacity() { - return disruptor.getRingBuffer().remainingCapacity(); - } -} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 37de61bf23..d954fe6594 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -7,12 +7,13 @@ import datadog.opentracing.DDTracer import datadog.opentracing.PendingTrace import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.DDAgentWriter +import datadog.trace.common.writer.ddagent.BatchWritingDisruptor import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.Monitor -import datadog.trace.common.writer.ddagent.TraceConsumer import datadog.trace.util.test.DDSpecification import spock.lang.Timeout +import java.util.concurrent.Phaser import java.util.concurrent.Semaphore import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -24,21 +25,41 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE @Timeout(20) class DDAgentWriterTest extends DDSpecification { - def api = Mock(DDAgentApi) + def phaser = new Phaser() + def api = Mock(DDAgentApi) { + // Define the following response in the spec: +// sendSerializedTraces(_, _, _) >> { +// phaser.arrive() +// return DDAgentApi.Response.success(200) +// } + } + def monitor = Mock(Monitor) + + def setup() { + // Register for two threads. + phaser.register() + phaser.register() + } def "test happy path"() { setup: def writer = new DDAgentWriter(api, 2, -1) writer.start() + when: + writer.flush() + + then: + 0 * _ + when: writer.write(trace) writer.write(trace) - writer.disruptor.flush() + writer.flush() then: 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) + 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200) 0 * _ cleanup: @@ -57,11 +78,11 @@ class DDAgentWriterTest extends DDSpecification { (1..traceCount).each { writer.write(trace) } - writer.disruptor.flush() + writer.flush() then: _ * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) + 1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) >> DDAgentApi.Response.success(200) 0 * _ cleanup: @@ -76,9 +97,7 @@ class DDAgentWriterTest extends DDSpecification { def "test flush by size"() { setup: def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1) - def phaser = writer.apiPhaser writer.start() - phaser.register() when: (1..6).each { @@ -90,18 +109,21 @@ class DDAgentWriterTest extends DDSpecification { then: 6 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) + 2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) >> { + phaser.arrive() + return DDAgentApi.Response.success(200) + } when: (1..2).each { writer.write(trace) } // Flush the remaining 2 - writer.disruptor.flush() + writer.flush() then: 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) + 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200) 0 * _ cleanup: @@ -114,11 +136,8 @@ class DDAgentWriterTest extends DDSpecification { def "test flush by time"() { setup: - def writer = new DDAgentWriter(api) - def phaser = writer.apiPhaser - phaser.register() + def writer = new DDAgentWriter(api, monitor) writer.start() - writer.disruptor.flush() when: (1..5).each { @@ -128,7 +147,14 @@ class DDAgentWriterTest extends DDSpecification { then: 5 * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) + 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> { + phaser.arrive() + return DDAgentApi.Response.success(200) + } + 5 * monitor.onPublish(_, _) + 5 * monitor.onSerialize(_, _, _) + 1 * monitor.onFlush(_, _) + (0..1) * monitor.onSend(_, _, _, _) // This gets called after phaser.arrive(), so there's a race condition. 0 * _ cleanup: @@ -153,11 +179,11 @@ class DDAgentWriterTest extends DDSpecification { // Busywait because we don't want to fill up the ring buffer } } - writer.disruptor.flush() + writer.flush() then: (maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() } - 1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) + 1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) >> DDAgentApi.Response.success(200) cleanup: writer.close() @@ -181,39 +207,43 @@ class DDAgentWriterTest extends DDSpecification { minimalSpan = new DDSpan(0, minimalContext) minimalTrace = [minimalSpan] traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length - maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 + maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 } def "check that are no interactions after close"() { - setup: - def writer = new DDAgentWriter(api) + def writer = new DDAgentWriter(api, monitor) writer.start() when: writer.close() writer.write([]) - writer.disruptor.flush() + writer.flush() then: +// 2 * monitor.onFlush(_, false) + 1 * monitor.onFailedPublish(_, _) + 1 * monitor.onShutdown(_, _) 0 * _ writer.traceCount.get() == 0 } - def "check shutdown if executor stopped first"() { + def "check shutdown if batchWritingDisruptor stopped first"() { setup: - def writer = new DDAgentWriter(api) + def writer = new DDAgentWriter(api, monitor) writer.start() - writer.scheduledWriterExecutor.shutdown() + writer.batchWritingDisruptor.close() when: writer.write([]) - writer.disruptor.flush() + writer.flush() then: 1 * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * monitor.onSerialize(writer, _, _) + 1 * monitor.onPublish(writer, _) 0 * _ - writer.traceCount.get() == 1 + writer.traceCount.get() == 0 cleanup: writer.close() @@ -253,9 +283,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) - def monitor = Mock(Monitor) - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) when: writer.start() @@ -265,12 +293,12 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: 1 * monitor.onPublish(writer, minimalTrace) 1 * monitor.onSerialize(writer, minimalTrace, _) - 1 * monitor.onScheduleFlush(writer, _) + 1 * monitor.onFlush(writer, _) 1 * monitor.onSend(writer, 1, _, { response -> response.success() && response.status() == 200 }) when: @@ -302,9 +330,7 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) - def monitor = Mock(Monitor) - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) when: writer.start() @@ -314,12 +340,12 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: 1 * monitor.onPublish(writer, minimalTrace) 1 * monitor.onSerialize(writer, minimalTrace, _) - 1 * monitor.onScheduleFlush(writer, _) + 1 * monitor.onFlush(writer, _) 1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == 500 }) when: @@ -345,7 +371,6 @@ class DDAgentWriterTest extends DDSpecification { return DDAgentApi.Response.failed(new IOException("comm error")) } } - def monitor = Mock(Monitor) def writer = new DDAgentWriter(api, monitor) when: @@ -356,12 +381,12 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: 1 * monitor.onPublish(writer, minimalTrace) 1 * monitor.onSerialize(writer, minimalTrace, _) - 1 * monitor.onScheduleFlush(writer, _) + 1 * monitor.onFlush(writer, _) 1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == null }) when: @@ -400,30 +425,30 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback - def monitor = Stub(Monitor) - monitor.onPublish(_, _) >> { - numPublished.incrementAndGet() - } - monitor.onFailedPublish(_, _) >> { - numFailedPublish.incrementAndGet() - } - monitor.onFlush(_, _) >> { - numFlushes.incrementAndGet() - } - monitor.onSend(_, _, _, _) >> { - numRequests.incrementAndGet() - } - monitor.onFailedPublish(_, _, _, _) >> { - numFailedRequests.incrementAndGet() + def monitor = Stub(Monitor) { + onPublish(_, _) >> { + numPublished.incrementAndGet() + } + onFailedPublish(_, _) >> { + numFailedPublish.incrementAndGet() + } + onFlush(_, _) >> { + numFlushes.incrementAndGet() + } + onSend(_, _, _, _) >> { + numRequests.incrementAndGet() + } + onFailedPublish(_, _, _, _) >> { + numFailedRequests.incrementAndGet() + } } // sender queue is sized in requests -- not traces def bufferSize = 32 def senderQueueSize = 2 - def writer = new DDAgentWriter(api, monitor, bufferSize, senderQueueSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()) writer.start() // gate responses @@ -438,7 +463,7 @@ class DDAgentWriterTest extends DDSpecification { // sanity check coordination mechanism of test // release to allow response to be generated responseSemaphore.release() - writer.disruptor.flush() + writer.flush() // reacquire semaphore to stall further responses responseSemaphore.acquire() @@ -505,21 +530,21 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) // This test focuses just on failed publish, so not verifying every callback - def monitor = Stub(Monitor) - monitor.onPublish(_, _) >> { - numPublished.incrementAndGet() - } - monitor.onFailedPublish(_, _) >> { - numFailedPublish.incrementAndGet() - } - monitor.onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response -> - numRepSent.addAndGet(repCount) + def monitor = Stub(Monitor) { + onPublish(_, _) >> { + numPublished.incrementAndGet() + } + onFailedPublish(_, _) >> { + numFailedPublish.incrementAndGet() + } + onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response -> + numRepSent.addAndGet(repCount) + } } - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) writer.start() when: @@ -538,7 +563,7 @@ class DDAgentWriterTest extends DDSpecification { t1.join() t2.join() - writer.disruptor.flush() + writer.flush() then: def totalTraces = 100 + 100 @@ -566,7 +591,6 @@ class DDAgentWriterTest extends DDSpecification { } } } - def api = new DDAgentApi("localhost", agent.address.port, null) def statsd = Stub(StatsDClient) statsd.incrementCounter("queue.accepted") >> { stat -> @@ -580,12 +604,12 @@ class DDAgentWriterTest extends DDSpecification { } def monitor = new Monitor.StatsD(statsd) - def writer = new DDAgentWriter(api, monitor) + def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build()) writer.start() when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: numTracesAccepted == 1 @@ -633,7 +657,7 @@ class DDAgentWriterTest extends DDSpecification { when: writer.write(minimalTrace) - writer.disruptor.flush() + writer.flush() then: numRequests == 1