Add documentation, remove volatile/public, improve test reliability.

This commit is contained in:
Tyler Benson 2020-01-07 10:07:23 -08:00
parent 451fba256a
commit 5ff855737b
6 changed files with 67 additions and 31 deletions

View File

@ -16,11 +16,18 @@ import lombok.Value;
import lombok.extern.slf4j.Slf4j;
/**
* This writer buffers traces and sends them to the provided DDApi instance.
* This writer buffers traces and sends them to the provided DDApi instance. Buffering is done with
* a disruptor to limit blocking of the application threads. Internally, the trace is serialized and
* put onto a separate disruptor that does block, to decouple the CPU-intensive work from the
* IO-bound threads.
*
* <p>Written traces are passed off to a disruptor so as to avoid blocking the application's thread.
* If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the
* threshold will be counted and sampled.
* <p>[Application] -> [trace processing buffer] -> [serialized trace batching buffer] -> [dd-agent]
*
* <p>Note: the first buffer is non-blocking and will discard if full, the second is blocking and
* will cause back pressure on the trace processing (serializing) thread.
*
* <p>If the buffer is filled traces are discarded before serializing. Once serialized every effort
* is made to keep, to avoid wasting the serialization effort.
*/
@Slf4j
public class DDAgentWriter implements Writer {
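For orientation, a rough usage sketch of the pipeline described above, assembled from the builder and lifecycle calls that appear in the test further down; the port, monitor, and trace values here are placeholders and not part of this commit:

// Sketch only: assumes a Monitor implementation and a trace (List<DDSpan>) are already in hand.
final DDAgentWriter writer =
    new DDAgentWriter(
        DDAgentWriter.Spec.builder()
            .traceAgentPort(8126) // assumed local dd-agent port
            .monitor(monitor) // callbacks for publish/flush/shutdown events
            .traceBufferSize(1024) // capacity of the non-blocking trace processing buffer
            .build());
writer.start();
writer.write(trace); // non-blocking: counted as dropped if the first buffer is full
writer.close(); // flushes, then shuts down both disruptors (see close() below)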
@ -38,8 +45,8 @@ public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private final DDAgentApi api;
public final TraceProcessingDisruptor traceProcessingDisruptor;
public final BatchWritingDisruptor batchWritingDisruptor;
private final TraceProcessingDisruptor traceProcessingDisruptor;
private final BatchWritingDisruptor batchWritingDisruptor;
private final AtomicInteger traceCount = new AtomicInteger(0);
@ -147,10 +154,14 @@ public class DDAgentWriter implements Writer {
@Override
public void close() {
monitor.onShutdown(this, traceProcessingDisruptor.flush(traceCount.getAndSet(0)));
final boolean flushSuccess = traceProcessingDisruptor.flush(traceCount.getAndSet(0));
try {
traceProcessingDisruptor.close();
} finally { // in case first close fails.
batchWritingDisruptor.close();
}
monitor.onShutdown(this, flushSuccess);
}
@Override
public String toString() {

View File

@ -11,7 +11,7 @@ import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractDisruptor<T> implements Closeable {
abstract class AbstractDisruptor<T> implements Closeable {
protected final Disruptor<DisruptorEvent<T>> disruptor;
@ -46,6 +46,13 @@ public abstract class AbstractDisruptor<T> implements Closeable {
disruptor.shutdown();
}
/**
* Allows the underlying publish to be defined as a blocking or non-blocking call.
*
* @param data the payload to publish to the ring buffer
* @param representativeCount the number of traces this payload represents
* @return true if the data was published, false if it was rejected (e.g. the buffer was full)
*/
public abstract boolean publish(final T data, int representativeCount);
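The two subclasses realize this contract differently. With the LMAX Disruptor API the distinction usually comes down to which ring-buffer call is used; a hedged sketch of both styles (only the blocking line is taken verbatim from this commit, the non-blocking one is presumed):

// Blocking style (BatchWritingDisruptor): waits for a free slot, applying back pressure.
disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount);
return true;

// Non-blocking style (TraceProcessingDisruptor, presumably): returns false when the ring is full,
// letting the caller count/sample the dropped trace instead of blocking the application thread.
return disruptor.getRingBuffer().tryPublishEvent(dataTranslator, data, representativeCount);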
/**

View File

@ -12,10 +12,16 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
* Disruptor that takes serialized traces and batches them into appropriately sized requests.
*
* <p>publishing to the buffer will block if the buffer is full.
*/
@Slf4j
public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
// TODO: move executor to tracer for sharing with other tasks.
private final ScheduledExecutorService heartbeatExecutor =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat"));
@ -53,6 +59,7 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
@Override
public boolean publish(final byte[] data, final int representativeCount) {
// Blocking call to ensure serialized traces aren't discarded and to apply back pressure.
disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount);
return true;
}
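The batching itself is not shown in this hunk. Conceptually, the event handler accumulates serialized traces until the payload approaches FLUSH_PAYLOAD_BYTES (or a heartbeat/flush event arrives) and then hands the batch to the API; a rough sketch under that assumption, with accept() and flush() as illustrative placeholder names:

// Illustrative only; the real handler also tracks representativeCount and flushes on heartbeat.
private final List<byte[]> serializedTraces = new ArrayList<>();
private int sizeInBytes = 0;

private void accept(final byte[] serializedTrace) {
  if (sizeInBytes + serializedTrace.length > FLUSH_PAYLOAD_BYTES) {
    flush(true); // an "early" (size-triggered) flush before the payload exceeds ~5 MB
  }
  serializedTraces.add(serializedTrace);
  sizeInBytes += serializedTrace.length;
}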
@ -81,6 +88,7 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
this.writer = writer;
}
// TODO: reduce byte[] garbage by keeping the byte[] on the event and copying it before returning.
@Override
public void onEvent(
final DisruptorEvent<byte[]> event, final long sequence, final boolean endOfBatch) {
@ -111,11 +119,12 @@ public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
return;
}
monitor.onFlush(writer, early);
// TODO add retry and rate limiting
final DDAgentApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces);
monitor.onFlush(writer, early);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", serializedTraces.size());

View File

@ -6,11 +6,12 @@ import com.lmax.disruptor.EventTranslatorTwoArg;
import java.util.concurrent.CountDownLatch;
class DisruptorEvent<T> {
public volatile T data = null;
public volatile int representativeCount = 0;
public volatile CountDownLatch flushLatch = null;
// Memory ordering enforced by disruptor's memory fences, so volatile not required.
T data = null;
int representativeCount = 0;
CountDownLatch flushLatch = null;
public void reset() {
void reset() {
data = null;
representativeCount = 0;
flushLatch = null;

View File

@ -9,6 +9,13 @@ import java.util.List;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
/**
* Disruptor that takes completed traces and applies processing to them. Upon completion, the
* serialized trace is published to {@link BatchWritingDisruptor}.
*
* <p>publishing to the buffer will not block the calling thread, but instead will return false if
* the buffer is full. This is to avoid impacting an application thread.
*/
@Slf4j
public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
@ -58,8 +65,8 @@ public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
if (event.data != null) {
try {
final byte[] serializedTrace = api.serializeTrace(event.data);
monitor.onSerialize(writer, event.data, serializedTrace);
batchWritingDisruptor.publish(serializedTrace, event.representativeCount);
monitor.onSerialize(writer, event.data, serializedTrace);
event.representativeCount = 0; // reset in case flush is invoked below.
} catch (final JsonProcessingException e) {
log.debug("Error serializing trace", e);

View File

@ -11,6 +11,7 @@ import datadog.trace.common.writer.ddagent.BatchWritingDisruptor
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.util.test.DDSpecification
import spock.lang.Retry
import spock.lang.Timeout
import java.util.concurrent.Phaser
@ -396,6 +397,8 @@ class DDAgentWriterTest extends DDSpecification {
1 * monitor.onShutdown(writer, true)
}
@Retry(delay = 10)
// If execution is too slow, the HTTP client timeout may trigger.
def "slow response test"() {
def numWritten = 0
def numFlushes = new AtomicInteger(0)
@ -407,7 +410,6 @@ class DDAgentWriterTest extends DDSpecification {
def responseSemaphore = new Semaphore(1)
setup:
def minimalTrace = createMinimalTrace()
// Need to set up a dummy agent for the final send callback to work
def agent = httpServer {
@ -445,9 +447,6 @@ class DDAgentWriterTest extends DDSpecification {
}
}
// sender queue is sized in requests -- not traces
def bufferSize = 32
def senderQueueSize = 2
def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build())
writer.start()
@ -477,13 +476,10 @@ class DDAgentWriterTest extends DDSpecification {
when:
// send many traces to fill the sender queue...
// loop until outstanding requests > finished requests
while (numFlushes.get() - (numRequests.get() + numFailedRequests.get()) < senderQueueSize) {
// chunk the loop & wait to allow for flushing to send queue
(1..1_000).each {
while (writer.traceProcessingDisruptor.disruptorRemainingCapacity + writer.batchWritingDisruptor.disruptorRemainingCapacity > 0 || numFailedPublish.get() == 0) {
writer.write(minimalTrace)
numWritten += 1
}
Thread.sleep(100)
Thread.sleep(1) // Allow traces to get serialized.
}
then:
@ -494,17 +490,18 @@ class DDAgentWriterTest extends DDSpecification {
def priorNumFailed = numFailedPublish.get()
// with both disruptor & queue full, should reject everything
def expectedRejects = 100_000
def expectedRejects = 100
(1..expectedRejects).each {
writer.write(minimalTrace)
numWritten += 1
}
then:
// If the in-flight requests timeouts and frees up a slot in the sending queue, then
// many of traces will be accepted and batched into a new failing request.
// If the in-flight request times out (we don't currently retry),
// then a new batch will begin processing and many of the traces will
// be accepted and batched into a new failing request.
// In that case, the reject number will be low.
numFailedPublish.get() - priorNumFailed > expectedRejects * 0.40
numFailedPublish.get() - priorNumFailed >= expectedRejects * 0.80
numPublished.get() + numFailedPublish.get() == numWritten
cleanup:
@ -512,6 +509,10 @@ class DDAgentWriterTest extends DDSpecification {
writer.close()
agent.close()
where:
bufferSize = 16
minimalTrace = createMinimalTrace()
}
def "multi threaded"() {