From 5ff855737b4fca280519f5fee0750a075b94f052 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Tue, 7 Jan 2020 10:07:23 -0800 Subject: [PATCH] Add documentation, remove volatile/public, improve test reliability. --- .../trace/common/writer/DDAgentWriter.java | 29 +++++++++++------ .../writer/ddagent/AbstractDisruptor.java | 9 +++++- .../writer/ddagent/BatchWritingDisruptor.java | 11 ++++++- .../common/writer/ddagent/DisruptorEvent.java | 9 +++--- .../ddagent/TraceProcessingDisruptor.java | 9 +++++- .../trace/api/writer/DDAgentWriterTest.groovy | 31 ++++++++++--------- 6 files changed, 67 insertions(+), 31 deletions(-) diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 965976d4ba..cf725b365b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -16,11 +16,18 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; /** - * This writer buffers traces and sends them to the provided DDApi instance. + * This writer buffers traces and sends them to the provided DDApi instance. Buffering is done with + * a disruptor to limit blocking the application threads. Internally, the trace is serialized and + * put onto a separate disruptor that does block to decouple the CPU-intensive work from the IO-bound + * threads. - *

Written traces are passed off to a disruptor so as to avoid blocking the application's thread. - * If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the - * threshold will be counted and sampled. + *

[Application] -> [trace processing buffer] -> [serialized trace batching buffer] -> [dd-agent] + * + *

Note: the first buffer is non-blocking and will discard if full, the second is blocking and + * will cause back pressure on the trace processing (serializing) thread. + * + *

If the buffer is filled, traces are discarded before serializing. Once serialized every effort + * is made to keep it, to avoid wasting the serialization effort. */ @Slf4j public class DDAgentWriter implements Writer { @@ -38,8 +45,8 @@ public class DDAgentWriter implements Writer { private static final int DISRUPTOR_BUFFER_SIZE = 1024; private final DDAgentApi api; - public final TraceProcessingDisruptor traceProcessingDisruptor; - public final BatchWritingDisruptor batchWritingDisruptor; + private final TraceProcessingDisruptor traceProcessingDisruptor; + private final BatchWritingDisruptor batchWritingDisruptor; private final AtomicInteger traceCount = new AtomicInteger(0); @@ -147,9 +154,13 @@ public class DDAgentWriter implements Writer { @Override public void close() { - monitor.onShutdown(this, traceProcessingDisruptor.flush(traceCount.getAndSet(0))); - traceProcessingDisruptor.close(); - batchWritingDisruptor.close(); + final boolean flushSuccess = traceProcessingDisruptor.flush(traceCount.getAndSet(0)); + try { + traceProcessingDisruptor.close(); + } finally { // in case first close fails. 
+ batchWritingDisruptor.close(); + } + monitor.onShutdown(this, flushSuccess); } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java index 2bff4028ac..86c7fca315 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/AbstractDisruptor.java @@ -11,7 +11,7 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @Slf4j -public abstract class AbstractDisruptor implements Closeable { +abstract class AbstractDisruptor implements Closeable { protected final Disruptor> disruptor; @@ -46,6 +46,13 @@ public abstract class AbstractDisruptor implements Closeable { disruptor.shutdown(); } + /** + * Allows the underlying publish to be defined as a blocking or non blocking call. + * + * @param data + * @param representativeCount + * @return + */ public abstract boolean publish(final T data, int representativeCount); /** diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java index fc08591548..b02f2d2e25 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/BatchWritingDisruptor.java @@ -12,10 +12,16 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +/** + * Disruptor that takes serialized traces and batches them into appropriately sized requests. + * + *

publishing to the buffer will block if the buffer is full. + */ @Slf4j public class BatchWritingDisruptor extends AbstractDisruptor { private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB + // TODO: move executor to tracer for sharing with other tasks. private final ScheduledExecutorService heartbeatExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat")); @@ -53,6 +59,7 @@ public class BatchWritingDisruptor extends AbstractDisruptor { @Override public boolean publish(final byte[] data, final int representativeCount) { + // blocking call to ensure serialized traces aren't discarded and apply back pressure. disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount); return true; } @@ -81,6 +88,7 @@ public class BatchWritingDisruptor extends AbstractDisruptor { this.writer = writer; } + // TODO: reduce byte[] garbage by keeping the byte[] on the event and copy before returning. @Override public void onEvent( final DisruptorEvent event, final long sequence, final boolean endOfBatch) { @@ -111,11 +119,12 @@ public class BatchWritingDisruptor extends AbstractDisruptor { return; } - monitor.onFlush(writer, early); // TODO add retry and rate limiting final DDAgentApi.Response response = api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces); + monitor.onFlush(writer, early); + if (response.success()) { log.debug("Successfully sent {} traces to the API", serializedTraces.size()); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java index 8e754fa3fe..65cb6db45c 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DisruptorEvent.java @@ -6,11 +6,12 @@ import com.lmax.disruptor.EventTranslatorTwoArg; import java.util.concurrent.CountDownLatch; class 
DisruptorEvent { - public volatile T data = null; - public volatile int representativeCount = 0; - public volatile CountDownLatch flushLatch = null; + // Memory ordering enforced by disruptor's memory fences, so volatile not required. + T data = null; + int representativeCount = 0; + CountDownLatch flushLatch = null; - public void reset() { + void reset() { data = null; representativeCount = 0; flushLatch = null; diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java index 0088227be7..e0cdf3dc37 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -9,6 +9,13 @@ import java.util.List; import java.util.concurrent.ThreadFactory; import lombok.extern.slf4j.Slf4j; +/** + * Disruptor that takes completed traces and applies processing to them. Upon completion, the + * serialized trace is published to {@link BatchWritingDisruptor}. + * + *

publishing to the buffer will not block the calling thread, but instead will return false if + * the buffer is full. This is to avoid impacting an application thread. + */ @Slf4j public class TraceProcessingDisruptor extends AbstractDisruptor> { @@ -58,8 +65,8 @@ public class TraceProcessingDisruptor extends AbstractDisruptor> { if (event.data != null) { try { final byte[] serializedTrace = api.serializeTrace(event.data); - monitor.onSerialize(writer, event.data, serializedTrace); batchWritingDisruptor.publish(serializedTrace, event.representativeCount); + monitor.onSerialize(writer, event.data, serializedTrace); event.representativeCount = 0; // reset in case flush is invoked below. } catch (final JsonProcessingException e) { log.debug("Error serializing trace", e); diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index d954fe6594..e05a99dd38 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -11,6 +11,7 @@ import datadog.trace.common.writer.ddagent.BatchWritingDisruptor import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.Monitor import datadog.trace.util.test.DDSpecification +import spock.lang.Retry import spock.lang.Timeout import java.util.concurrent.Phaser @@ -396,6 +397,8 @@ class DDAgentWriterTest extends DDSpecification { 1 * monitor.onShutdown(writer, true) } + @Retry(delay = 10) + // if execution is too slow, the http client timeout may trigger. 
def "slow response test"() { def numWritten = 0 def numFlushes = new AtomicInteger(0) @@ -407,7 +410,6 @@ class DDAgentWriterTest extends DDSpecification { def responseSemaphore = new Semaphore(1) setup: - def minimalTrace = createMinimalTrace() // Need to set-up a dummy agent for the final send callback to work def agent = httpServer { @@ -445,9 +447,6 @@ class DDAgentWriterTest extends DDSpecification { } } - // sender queue is sized in requests -- not traces - def bufferSize = 32 - def senderQueueSize = 2 def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()) writer.start() @@ -477,13 +476,10 @@ class DDAgentWriterTest extends DDSpecification { when: // send many traces to fill the sender queue... // loop until outstanding requests > finished requests - while (numFlushes.get() - (numRequests.get() + numFailedRequests.get()) < senderQueueSize) { - // chunk the loop & wait to allow for flushing to send queue - (1..1_000).each { - writer.write(minimalTrace) - numWritten += 1 - } - Thread.sleep(100) + while (writer.traceProcessingDisruptor.disruptorRemainingCapacity + writer.batchWritingDisruptor.disruptorRemainingCapacity > 0 || numFailedPublish.get() == 0) { + writer.write(minimalTrace) + numWritten += 1 + Thread.sleep(1) // Allow traces to get serialized. } then: @@ -494,17 +490,18 @@ class DDAgentWriterTest extends DDSpecification { def priorNumFailed = numFailedPublish.get() // with both disruptor & queue full, should reject everything - def expectedRejects = 100_000 + def expectedRejects = 100 (1..expectedRejects).each { writer.write(minimalTrace) numWritten += 1 } then: - // If the in-flight requests timeouts and frees up a slot in the sending queue, then - // many of traces will be accepted and batched into a new failing request. 
+ // If the in-flight request times out (we don't currently retry), + // then a new batch will begin processing and many of traces will + // be accepted and batched into a new failing request. // In that case, the reject number will be low. - numFailedPublish.get() - priorNumFailed > expectedRejects * 0.40 + numFailedPublish.get() - priorNumFailed >= expectedRejects * 0.80 numPublished.get() + numFailedPublish.get() == numWritten cleanup: @@ -512,6 +509,10 @@ class DDAgentWriterTest extends DDSpecification { writer.close() agent.close() + + where: + bufferSize = 16 + minimalTrace = createMinimalTrace() } def "multi threaded"() {