Split TraceConsumer into two different disruptors (#1161)

Tyler Benson 2020-01-31 15:41:14 -05:00 committed by GitHub
commit 406b324a82
11 changed files with 612 additions and 481 deletions


@@ -12,6 +12,7 @@ minimumInstructionCoverage = 0.6
 excludedClassesCoverage += [
   'datadog.trace.common.writer.ListWriter',
   'datadog.trace.common.writer.LoggingWriter',
+  'datadog.trace.common.writer.DDAgentWriter.DDAgentWriterBuilder',
   'datadog.trace.common.sampling.PrioritySampling',
   // This code is copied from okHttp samples and we have integration tests to verify that it works.
   'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket',


@@ -44,6 +44,7 @@ import java.util.SortedSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ThreadLocalRandom;
+import lombok.Builder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;

@@ -96,18 +97,18 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
   private final HttpCodec.Injector injector;
   private final HttpCodec.Extractor extractor;

-  public static class Builder {
-    public Builder() {
+  public static class DDTracerBuilder {
+    public DDTracerBuilder() {
       // Apply the default values from config.
       config(Config.get());
     }

-    public Builder withProperties(final Properties properties) {
+    public DDTracerBuilder withProperties(final Properties properties) {
       return config(Config.get(properties));
     }

-    public Builder config(final Config config) {
+    public DDTracerBuilder config(final Config config) {
       this.config = config;
       serviceName(config.getServiceName());
       // Explicitly skip setting writer to avoid allocating resources prematurely.

@@ -267,7 +268,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
         partialFlushMinSpans);
   }

-  @lombok.Builder(builderClassName = "Builder")
+  @Builder
   // These field names must be stable to ensure the builder api is stable.
   private DDTracer(
       final Config config,
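
For illustration only (not part of the diff): with the builder class renamed to Lombok's default DDTracerBuilder, construction still goes through DDTracer.builder(); the serviceName setter used below is assumed to exist on the generated builder, as suggested by the serviceName(...) call inside config(...) above.

    // Hypothetical usage sketch; DDTracerBuilder applies Config.get() defaults in its constructor.
    final DDTracer tracer = DDTracer.builder().serviceName("my-service").build();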


@@ -3,49 +3,55 @@ package datadog.trace.common.writer;

 import static datadog.trace.api.Config.DEFAULT_AGENT_HOST;
 import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
 import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
-import static java.util.concurrent.TimeUnit.SECONDS;

 import datadog.opentracing.DDSpan;
-import datadog.trace.common.util.DaemonThreadFactory;
+import datadog.trace.common.writer.ddagent.BatchWritingDisruptor;
 import datadog.trace.common.writer.ddagent.DDAgentApi;
 import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
 import datadog.trace.common.writer.ddagent.Monitor;
-import datadog.trace.common.writer.ddagent.TraceConsumer;
-import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
+import datadog.trace.common.writer.ddagent.TraceProcessingDisruptor;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;

 /**
- * This writer buffers traces and sends them to the provided DDApi instance.
+ * This writer buffers traces and sends them to the provided DDApi instance. Buffering is done with
+ * a disruptor to limit blocking the application threads. Internally, the trace is serialized and
+ * put onto a separate disruptor that does block, to decouple the CPU-intensive work from the
+ * IO-bound threads.
  *
- * <p>Written traces are passed off to a disruptor so as to avoid blocking the application's thread.
- * If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the
- * threshold will be counted and sampled.
+ * <p>[Application] -> [trace processing buffer] -> [serialized trace batching buffer] -> [dd-agent]
+ *
+ * <p>Note: the first buffer is non-blocking and will discard if full; the second is blocking and
+ * will cause back pressure on the trace processing (serializing) thread.
+ *
+ * <p>If the first buffer is full, traces are discarded before serializing. Once serialized, every
+ * effort is made to keep them, to avoid wasting the serialization effort.
  */
 @Slf4j
 public class DDAgentWriter implements Writer {
   private static final int DISRUPTOR_BUFFER_SIZE = 1024;
-  private static final int SENDER_QUEUE_SIZE = 16;
-  private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
-
-  private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
-      new DaemonThreadFactory("dd-trace-writer");

   private final DDAgentApi api;
-  public final int flushFrequencySeconds;
-  public final TraceSerializingDisruptor disruptor;
-  public final ScheduledExecutorService scheduledWriterExecutor;
+  private final TraceProcessingDisruptor traceProcessingDisruptor;
+  private final BatchWritingDisruptor batchWritingDisruptor;

   private final AtomicInteger traceCount = new AtomicInteger(0);
-  public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing;
   public final Monitor monitor;

+  // Apply defaults to the class generated by lombok.
+  public static class DDAgentWriterBuilder {
+    String agentHost = DEFAULT_AGENT_HOST;
+    int traceAgentPort = DEFAULT_TRACE_AGENT_PORT;
+    String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
+    int traceBufferSize = DISRUPTOR_BUFFER_SIZE;
+    Monitor monitor = new Monitor.Noop();
+    int flushFrequencySeconds = 1;
+  }

+  @Deprecated
   public DDAgentWriter() {
     this(
         new DDAgentApi(

@@ -53,62 +59,38 @@ public class DDAgentWriter implements Writer {
         new Monitor.Noop());
   }

+  @Deprecated
   public DDAgentWriter(final DDAgentApi api, final Monitor monitor) {
-    this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
-  }
-
-  /** Old signature (pre-Monitor) used in tests */
-  private DDAgentWriter(final DDAgentApi api) {
-    this(api, new Monitor.Noop());
-  }
-
-  /**
-   * Used in the tests.
-   *
-   * @param api
-   * @param disruptorSize Rounded up to next power of 2
-   * @param flushFrequencySeconds value < 1 disables scheduled flushes
-   */
-  private DDAgentWriter(
-      final DDAgentApi api,
-      final int disruptorSize,
-      final int senderQueueSize,
-      final int flushFrequencySeconds) {
-    this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds);
-  }
-
-  // DQH - TODO - Update the tests & remove this
-  private DDAgentWriter(
-      final DDAgentApi api,
-      final Monitor monitor,
-      final int disruptorSize,
-      final int flushFrequencySeconds) {
-    this(api, monitor, disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
-  }
-
-  // DQH - TODO - Update the tests & remove this
-  private DDAgentWriter(
-      final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
-    this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
-  }
-
-  private DDAgentWriter(
-      final DDAgentApi api,
-      final Monitor monitor,
-      final int disruptorSize,
-      final int senderQueueSize,
-      final int flushFrequencySeconds) {
     this.api = api;
     this.monitor = monitor;
-    disruptor =
-        new TraceSerializingDisruptor(
-            disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this));
-
-    this.flushFrequencySeconds = flushFrequencySeconds;
-    scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
-
-    apiPhaser.register(); // Register on behalf of the scheduled executor thread.
+    batchWritingDisruptor = new BatchWritingDisruptor(DISRUPTOR_BUFFER_SIZE, 1, api, monitor, this);
+    traceProcessingDisruptor =
+        new TraceProcessingDisruptor(
+            DISRUPTOR_BUFFER_SIZE, api, batchWritingDisruptor, monitor, this);
+  }
+
+  @lombok.Builder
+  // These field names must be stable to ensure the builder api is stable.
+  private DDAgentWriter(
+      final DDAgentApi agentApi,
+      final String agentHost,
+      final int traceAgentPort,
+      final String unixDomainSocket,
+      final int traceBufferSize,
+      final Monitor monitor,
+      final int flushFrequencySeconds) {
+    if (agentApi != null) {
+      api = agentApi;
+    } else {
+      api = new DDAgentApi(agentHost, traceAgentPort, unixDomainSocket);
+    }
+    this.monitor = monitor;
+    batchWritingDisruptor =
+        new BatchWritingDisruptor(traceBufferSize, flushFrequencySeconds, api, monitor, this);
+    traceProcessingDisruptor =
+        new TraceProcessingDisruptor(traceBufferSize, api, batchWritingDisruptor, monitor, this);
   }

   public void addResponseListener(final DDAgentResponseListener listener) {

@@ -117,7 +99,7 @@
   // Exposing some statistics for consumption by monitors
   public final long getDisruptorCapacity() {
-    return disruptor.getDisruptorCapacity();
+    return traceProcessingDisruptor.getDisruptorCapacity();
   }

   public final long getDisruptorUtilizedCapacity() {

@@ -125,20 +107,27 @@
   }

   public final long getDisruptorRemainingCapacity() {
-    return disruptor.getDisruptorRemainingCapacity();
+    return traceProcessingDisruptor.getDisruptorRemainingCapacity();
   }

   @Override
   public void write(final List<DDSpan> trace) {
     // We can't add events after shutdown otherwise it will never complete shutting down.
-    if (disruptor.running) {
-      final boolean published = disruptor.tryPublish(trace);
+    if (traceProcessingDisruptor.running) {
+      final int representativeCount;
+      if (trace.isEmpty() || !(trace.get(0).isRootSpan())) {
+        // We don't want to reset the count if we can't correctly report the value.
+        representativeCount = 1;
+      } else {
+        representativeCount = traceCount.getAndSet(0) + 1;
+      }
+      final boolean published = traceProcessingDisruptor.publish(trace, representativeCount);
       if (published) {
         monitor.onPublish(DDAgentWriter.this, trace);
       } else {
         // We're discarding the trace, but we still want to count it.
-        traceCount.incrementAndGet();
+        traceCount.addAndGet(representativeCount);
         log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
         monitor.onFailedPublish(this, trace);

@@ -150,6 +139,10 @@
     }
   }

+  public boolean flush() {
+    return traceProcessingDisruptor.flush(traceCount.getAndSet(0));
+  }
+
   @Override
   public void incrementTraceCount() {
     traceCount.incrementAndGet();

@@ -161,31 +154,19 @@
   @Override
   public void start() {
-    disruptor.start();
+    batchWritingDisruptor.start();
+    traceProcessingDisruptor.start();
     monitor.onStart(this);
   }

   @Override
   public void close() {
-    boolean flushSuccess = true;
-    // We have to shutdown scheduled executor first to make sure no flush events issued after
-    // disruptor has been shutdown.
-    // Otherwise those events will never be processed and flush call will wait forever.
-    scheduledWriterExecutor.shutdown();
+    final boolean flushSuccess = traceProcessingDisruptor.flush(traceCount.getAndSet(0));
     try {
-      scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS);
-    } catch (final InterruptedException e) {
-      log.warn("Waiting for flush executor shutdown interrupted.", e);
-      flushSuccess = false;
+      traceProcessingDisruptor.close();
+    } finally { // in case first close fails.
+      batchWritingDisruptor.close();
     }
-    flushSuccess |= disruptor.flush();
-    disruptor.close();
     monitor.onShutdown(this, flushSuccess);
   }
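
For orientation (not from the diff): a minimal sketch of constructing the rewritten writer through its Lombok builder, using only field names visible in DDAgentWriterBuilder above; the empty trace list is a placeholder.

    import datadog.opentracing.DDSpan;
    import datadog.trace.common.writer.DDAgentWriter;
    import datadog.trace.common.writer.ddagent.Monitor;
    import java.util.Collections;
    import java.util.List;

    class WriterUsageSketch {
      public static void main(final String[] args) {
        final DDAgentWriter writer =
            DDAgentWriter.builder()
                .traceBufferSize(1024)     // rounded up to the next power of 2 by the disruptors
                .flushFrequencySeconds(1)  // values < 1 disable the time-based batch flush
                .monitor(new Monitor.Noop())
                .build();                  // with no agentApi set, the defaults build a DDAgentApi
        writer.start();

        final List<DDSpan> trace = Collections.emptyList(); // placeholder trace
        writer.write(trace); // non-blocking: dropped and counted if the processing buffer is full
        writer.flush();      // blocks until the flush has propagated through both disruptors
        writer.close();
      }
    }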


@@ -53,7 +53,7 @@ public interface Writer extends Closeable {
     } else {
       log.warn(
           "Writer type not configured correctly: No config provided! Defaulting to DDAgentWriter.");
-      writer = new DDAgentWriter();
+      writer = DDAgentWriter.builder().build();
     }

     return writer;

@@ -64,7 +64,10 @@
   }

   private static Writer createAgentWriter(final Config config) {
-    return new DDAgentWriter(createApi(config), createMonitor(config));
+    return DDAgentWriter.builder()
+        .agentApi(createApi(config))
+        .monitor(createMonitor(config))
+        .build();
   }

   private static DDAgentApi createApi(final Config config) {


@@ -0,0 +1,96 @@ (new file)
package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
abstract class AbstractDisruptor<T> implements Closeable {
protected final Disruptor<DisruptorEvent<T>> disruptor;
public volatile boolean running = false;
protected final DisruptorEvent.FlushTranslator<T> flushTranslator =
new DisruptorEvent.FlushTranslator<>();
protected final DisruptorEvent.DataTranslator<T> dataTranslator =
new DisruptorEvent.DataTranslator<>();
public AbstractDisruptor(final int disruptorSize, final EventHandler<DisruptorEvent<T>> handler) {
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<T>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
getThreadFactory(),
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
disruptor.handleEventsWith(handler);
}
protected abstract ThreadFactory getThreadFactory();
public void start() {
disruptor.start();
running = true;
}
@Override
public void close() {
running = false;
disruptor.shutdown();
}
/**
* Allows the underlying publish to be defined as a blocking or non blocking call.
*
* @param data
* @param representativeCount
* @return
*/
public abstract boolean publish(final T data, int representativeCount);
/**
* This method will block until the flush is complete.
*
* @param traceCount - number of unreported traces to include in this batch.
*/
public boolean flush(final int traceCount) {
if (running) {
return flush(traceCount, new CountDownLatch(1));
} else {
return false;
}
}
/** This method will block until the flush is complete. */
protected boolean flush(final int traceCount, final CountDownLatch latch) {
log.info("Flushing any remaining traces.");
disruptor.publishEvent(flushTranslator, traceCount, latch);
try {
latch.await();
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
public final long getCurrentCount() {
return disruptor.getCursor() - disruptor.getRingBuffer().getMinimumGatingSequence();
}
}
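
As an aside (not part of the diff), the ring-size expression above rounds the requested size up to the next power of two, which the Disruptor requires; a quick check of its behavior:

    final class RingSizeCheck {
      // Same expression as in the AbstractDisruptor constructor above.
      static int roundToPowerOfTwo(final int disruptorSize) {
        return Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1);
      }
      // roundToPowerOfTwo(1000) == 1024  (highestOneBit(999) = 512, shifted left once)
      // roundToPowerOfTwo(1024) == 1024  (exact powers of two are preserved)
      // roundToPowerOfTwo(1)    == 2     (the max(2, ...) floor applies)
    }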


@@ -0,0 +1,171 @@ (new file)
package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventHandler;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
* Disruptor that takes serialized traces and batches them into appropriately sized requests.
*
* <p>publishing to the buffer will block if the buffer is full.
*/
@Slf4j
public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
// TODO: move executor to tracer for sharing with other tasks.
private final ScheduledExecutorService heartbeatExecutor =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat"));
private final DisruptorEvent.HeartbeatTranslator<byte[]> heartbeatTranslator =
new DisruptorEvent.HeartbeatTranslator();
public BatchWritingDisruptor(
final int disruptorSize,
final int flushFrequencySeconds,
final DDAgentApi api,
final Monitor monitor,
final DDAgentWriter writer) {
super(disruptorSize, new BatchWritingHandler(flushFrequencySeconds, api, monitor, writer));
if (0 < flushFrequencySeconds) {
// This provides a steady stream of events to enable flushing with a low throughput.
final Runnable heartbeat =
new Runnable() {
@Override
public void run() {
// Only add if the buffer is empty.
if (running && getCurrentCount() == 0) {
disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator);
}
}
};
heartbeatExecutor.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS);
}
}
@Override
protected ThreadFactory getThreadFactory() {
return new DaemonThreadFactory("dd-trace-writer");
}
@Override
public boolean publish(final byte[] data, final int representativeCount) {
// blocking call to ensure serialized traces aren't discarded and apply back pressure.
disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount);
return true;
}
// Intentionally not thread safe.
private static class BatchWritingHandler implements EventHandler<DisruptorEvent<byte[]>> {
private final long flushFrequencyNanos;
private final DDAgentApi api;
private final Monitor monitor;
private final DDAgentWriter writer;
private final List<byte[]> serializedTraces = new ArrayList<>();
private int representativeCount = 0;
private int sizeInBytes = 0;
private long nextScheduledFlush;
private BatchWritingHandler(
final int flushFrequencySeconds,
final DDAgentApi api,
final Monitor monitor,
final DDAgentWriter writer) {
flushFrequencyNanos = TimeUnit.SECONDS.toNanos(flushFrequencySeconds);
scheduleNextFlush();
this.api = api;
this.monitor = monitor;
this.writer = writer;
}
// TODO: reduce byte[] garbage by keeping the byte[] on the event and copy before returning.
@Override
public void onEvent(
final DisruptorEvent<byte[]> event, final long sequence, final boolean endOfBatch) {
try {
if (event.data != null) {
sizeInBytes += event.data.length;
serializedTraces.add(event.data);
}
// Flush events might increase this with no data.
representativeCount += event.representativeCount;
if (event.flushLatch != null
|| FLUSH_PAYLOAD_BYTES <= sizeInBytes
|| nextScheduledFlush <= System.nanoTime()) {
flush(event.flushLatch, FLUSH_PAYLOAD_BYTES <= sizeInBytes);
}
} finally {
event.reset();
}
}
private void flush(final CountDownLatch flushLatch, final boolean early) {
try {
if (serializedTraces.isEmpty()) {
// FIXME: this will reset representativeCount without reporting
// anything even if representativeCount > 0.
return;
}
// TODO add retry and rate limiting
final DDAgentApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces);
monitor.onFlush(writer, early);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", serializedTraces.size());
monitor.onSend(writer, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
serializedTraces.size(),
representativeCount,
sizeInBytes);
monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
} finally {
serializedTraces.clear();
sizeInBytes = 0;
representativeCount = 0;
scheduleNextFlush();
if (flushLatch != null) {
flushLatch.countDown();
}
}
}
private void scheduleNextFlush() {
// TODO: adjust this depending on responsiveness of the agent.
if (0 < flushFrequencyNanos) {
nextScheduledFlush = System.nanoTime() + flushFrequencyNanos;
} else {
nextScheduledFlush = Long.MAX_VALUE;
}
}
}
}
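
The behavioral difference between the two disruptors comes down to which RingBuffer publish call they use. A generic sketch of the two idioms (the types and names here are illustrative, not from the commit):

    import com.lmax.disruptor.EventTranslatorOneArg;
    import com.lmax.disruptor.RingBuffer;

    final class PublishSemanticsSketch {
      static final class Slot { String value; }

      static final EventTranslatorOneArg<Slot, String> SET =
          (event, sequence, value) -> event.value = value;

      // Blocking publish, as BatchWritingDisruptor does: waits for a free slot,
      // so a full buffer back-pressures the caller.
      static void publishBlocking(final RingBuffer<Slot> ring, final String payload) {
        ring.publishEvent(SET, payload);
      }

      // Non-blocking publish, as TraceProcessingDisruptor does: returns false when full,
      // so the calling application thread is never stalled.
      static boolean publishNonBlocking(final RingBuffer<Slot> ring, final String payload) {
        return ring.tryPublishEvent(SET, payload);
      }
    }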


@@ -2,13 +2,20 @@ package datadog.trace.common.writer.ddagent;

 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.EventTranslatorOneArg;
-import datadog.opentracing.DDSpan;
-import java.util.List;
+import com.lmax.disruptor.EventTranslatorTwoArg;
+import java.util.concurrent.CountDownLatch;

 class DisruptorEvent<T> {
-  public volatile boolean shouldFlush = false;
-  public volatile T data = null;
+  // Memory ordering enforced by disruptor's memory fences, so volatile not required.
+  T data = null;
+  int representativeCount = 0;
+  CountDownLatch flushLatch = null;
+
+  void reset() {
+    data = null;
+    representativeCount = 0;
+    flushLatch = null;
+  }

   static class Factory<T> implements EventFactory<DisruptorEvent<T>> {
     @Override

@@ -17,25 +24,38 @@ class DisruptorEvent<T> {
     }
   }

-  static class TraceTranslator
-      implements EventTranslatorOneArg<DisruptorEvent<List<DDSpan>>, List<DDSpan>> {
-    static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
-        new DisruptorEvent.TraceTranslator();
+  static class DataTranslator<T> implements EventTranslatorTwoArg<DisruptorEvent<T>, T, Integer> {

     @Override
-    public void translateTo(
-        final DisruptorEvent<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
-      event.data = trace;
+    public void translateTo(
+        final DisruptorEvent<T> event,
+        final long sequence,
+        final T data,
+        final Integer representativeCount) {
+      event.data = data;
+      event.representativeCount = representativeCount;
     }
   }

-  static class FlushTranslator implements EventTranslator<DisruptorEvent<List<DDSpan>>> {
-    static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
-        new DisruptorEvent.FlushTranslator();
+  static class HeartbeatTranslator<T> implements EventTranslator<DisruptorEvent<T>> {

     @Override
-    public void translateTo(final DisruptorEvent<List<DDSpan>> event, final long sequence) {
-      event.shouldFlush = true;
+    public void translateTo(final DisruptorEvent<T> event, final long sequence) {
+      return;
+    }
+  }
+
+  static class FlushTranslator<T>
+      implements EventTranslatorTwoArg<DisruptorEvent<T>, Integer, CountDownLatch> {
+
+    @Override
+    public void translateTo(
+        final DisruptorEvent<T> event,
+        final long sequence,
+        final Integer representativeCount,
+        final CountDownLatch latch) {
+      event.representativeCount = representativeCount;
+      event.flushLatch = latch;
     }
   }
 }


@@ -1,150 +0,0 @@ (deleted file)
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/** This class is intentionally not threadsafe. */
@Slf4j
public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>> {
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private final AtomicInteger traceCount;
private final Semaphore senderSemaphore;
private final DDAgentWriter writer;
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
public TraceConsumer(
final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
this.traceCount = traceCount;
senderSemaphore = new Semaphore(senderQueueSize);
this.writer = writer;
}
@Override
public void onEvent(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = writer.getApi().serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
writer.monitor.onSerialize(writer, trace, serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
}
}
private void reportTraces(final boolean early) {
try {
if (serializedTraces.isEmpty()) {
writer.monitor.onFlush(writer, early);
writer.apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
if (writer.scheduledWriterExecutor.isShutdown()) {
writer.monitor.onFailedSend(
writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1));
writer.apiPhaser.arrive(); // Allow flush to return
return;
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
try {
writer.monitor.onFlush(writer, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
writer.scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
try {
final DDAgentApi.Response response =
writer
.getApi()
.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
} finally {
writer.apiPhaser.arrive(); // Flush completed.
}
}
});
} catch (final RejectedExecutionException ex) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
writer.apiPhaser.arrive(); // Allow flush to return
}
} finally {
payloadSize = 0;
writer.disruptor.scheduleFlush();
}
}
}


@@ -0,0 +1,101 @@ (new file)
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
/**
* Disruptor that takes completed traces and applies processing to them. Upon completion, the
* serialized trace is published to {@link BatchWritingDisruptor}.
*
* <p>publishing to the buffer will not block the calling thread, but instead will return false if
* the buffer is full. This is to avoid impacting an application thread.
*/
@Slf4j
public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
public TraceProcessingDisruptor(
final int disruptorSize,
final DDAgentApi api,
final BatchWritingDisruptor batchWritingDisruptor,
final Monitor monitor,
final DDAgentWriter writer) {
// TODO: add config to enable control over serialization overhead.
super(disruptorSize, new TraceSerializingHandler(api, batchWritingDisruptor, monitor, writer));
}
@Override
protected ThreadFactory getThreadFactory() {
return new DaemonThreadFactory("dd-trace-processor");
}
@Override
public boolean publish(final List<DDSpan> data, final int representativeCount) {
return disruptor.getRingBuffer().tryPublishEvent(dataTranslator, data, representativeCount);
}
// This class is threadsafe if we want to enable more processors.
public static class TraceSerializingHandler
implements EventHandler<DisruptorEvent<List<DDSpan>>> {
private final DDAgentApi api;
private final BatchWritingDisruptor batchWritingDisruptor;
private final Monitor monitor;
private final DDAgentWriter writer;
public TraceSerializingHandler(
final DDAgentApi api,
final BatchWritingDisruptor batchWritingDisruptor,
final Monitor monitor,
final DDAgentWriter writer) {
this.api = api;
this.batchWritingDisruptor = batchWritingDisruptor;
this.monitor = monitor;
this.writer = writer;
}
@Override
public void onEvent(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
try {
if (event.data != null) {
if (1 < event.representativeCount && !event.data.isEmpty()) {
// attempt to have agent scale the metrics properly
((DDSpan) event.data.get(0).getLocalRootSpan())
.context()
.setMetric("_sample_rate", 1d / event.representativeCount);
}
try {
final byte[] serializedTrace = api.serializeTrace(event.data);
batchWritingDisruptor.publish(serializedTrace, event.representativeCount);
monitor.onSerialize(writer, event.data, serializedTrace);
event.representativeCount = 0; // reset in case flush is invoked below.
} catch (final JsonProcessingException e) {
log.debug("Error serializing trace", e);
monitor.onFailedSerialize(writer, event.data, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
monitor.onFailedSerialize(writer, event.data, e);
}
}
if (event.flushLatch != null) {
if (batchWritingDisruptor.running) {
// propagate the flush.
batchWritingDisruptor.flush(event.representativeCount, event.flushLatch);
}
if (!batchWritingDisruptor.running) { // check again to protect against race condition.
// got shutdown early somehow?
event.flushLatch.countDown();
}
}
} finally {
event.reset();
}
}
}
}
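
A quick worked example of the _sample_rate adjustment applied above (the trace counts are hypothetical):

    class SampleRateExample {
      public static void main(final String[] args) {
        // Suppose three traces were dropped (and counted) before this one was accepted:
        final int representativeCount = 3 + 1;
        final double sampleRate = 1d / representativeCount;
        System.out.println(sampleRate); // 0.25 -> the agent can scale this trace's stats by 4x
      }
    }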


@@ -1,117 +0,0 @@ (deleted file)
package datadog.trace.common.writer.ddagent;
import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR;
import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TraceSerializingDisruptor implements Closeable {
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private final FlushTask flushTask = new FlushTask();
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
private final DDAgentWriter writer;
public volatile boolean running = false;
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
public TraceSerializingDisruptor(
final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) {
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
this.writer = writer;
disruptor.handleEventsWith(handler);
}
public void start() {
disruptor.start();
running = true;
scheduleFlush();
}
@Override
public void close() {
running = false;
disruptor.shutdown();
}
public boolean tryPublish(final List<DDSpan> trace) {
return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
writer.apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
public void scheduleFlush() {
if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
writer.scheduledWriterExecutor.schedule(
flushTask, writer.flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
writer.monitor.onScheduleFlush(writer, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
}


@@ -7,12 +7,14 @@ import datadog.opentracing.DDTracer
 import datadog.opentracing.PendingTrace
 import datadog.trace.api.sampling.PrioritySampling
 import datadog.trace.common.writer.DDAgentWriter
+import datadog.trace.common.writer.ddagent.BatchWritingDisruptor
 import datadog.trace.common.writer.ddagent.DDAgentApi
 import datadog.trace.common.writer.ddagent.Monitor
-import datadog.trace.common.writer.ddagent.TraceConsumer
 import datadog.trace.util.test.DDSpecification
+import spock.lang.Retry
 import spock.lang.Timeout

+import java.util.concurrent.Phaser
 import java.util.concurrent.Semaphore
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger

@@ -24,21 +26,41 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
 @Timeout(20)
 class DDAgentWriterTest extends DDSpecification {

-  def api = Mock(DDAgentApi)
+  def phaser = new Phaser()
+  def api = Mock(DDAgentApi) {
+    // Define the following response in the spec:
+    // sendSerializedTraces(_, _, _) >> {
+    //   phaser.arrive()
+    //   return DDAgentApi.Response.success(200)
+    // }
+  }
+  def monitor = Mock(Monitor)
+
+  def setup() {
+    // Register for two threads.
+    phaser.register()
+    phaser.register()
+  }

   def "test happy path"() {
     setup:
-    def writer = new DDAgentWriter(api, 2, -1)
+    def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(2).flushFrequencySeconds(-1).build()
     writer.start()

+    when:
+    writer.flush()
+
+    then:
+    0 * _
+
     when:
     writer.write(trace)
     writer.write(trace)
-    writer.disruptor.flush()
+    writer.flush()

     then:
     2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
-    1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
+    1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200)
     0 * _

     cleanup:

@@ -50,18 +72,18 @@ class DDAgentWriterTest extends DDSpecification {

   def "test flood of traces"() {
     setup:
-    def writer = new DDAgentWriter(api, disruptorSize, -1)
+    def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(disruptorSize).flushFrequencySeconds(-1).build()
     writer.start()

     when:
     (1..traceCount).each {
       writer.write(trace)
     }
-    writer.disruptor.flush()
+    writer.flush()

     then:
     _ * api.serializeTrace(_) >> { trace -> callRealMethod() }
-    1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount })
+    1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) >> DDAgentApi.Response.success(200)
     0 * _

     cleanup:

@@ -75,10 +97,8 @@

   def "test flush by size"() {
     setup:
-    def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
-    def phaser = writer.apiPhaser
+    def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(DISRUPTOR_BUFFER_SIZE).flushFrequencySeconds(-1).build()
     writer.start()
-    phaser.register()

     when:
     (1..6).each {

@@ -90,35 +110,35 @@
     then:
     6 * api.serializeTrace(_) >> { trace -> callRealMethod() }
-    2 * api.sendSerializedTraces(3, _, { it.size() == 3 })
+    2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) >> {
+      phaser.arrive()
+      return DDAgentApi.Response.success(200)
+    }

     when:
     (1..2).each {
       writer.write(trace)
     }
     // Flush the remaining 2
-    writer.disruptor.flush()
+    writer.flush()

     then:
     2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
-    1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
+    1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200)
     0 * _

     cleanup:
     writer.close()

     where:
-    span = [newSpanOf(0, "fixed-thread-name")]
+    span = newSpanOf(0, "fixed-thread-name")
     trace = (0..10000).collect { span }
   }

   def "test flush by time"() {
     setup:
-    def writer = new DDAgentWriter(api)
-    def phaser = writer.apiPhaser
-    phaser.register()
+    def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
     writer.start()
-    writer.disruptor.flush()

     when:
     (1..5).each {

@@ -128,20 +148,26 @@
     then:
     5 * api.serializeTrace(_) >> { trace -> callRealMethod() }
-    1 * api.sendSerializedTraces(5, _, { it.size() == 5 })
+    1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> DDAgentApi.Response.success(200)
+    5 * monitor.onPublish(_, _)
+    5 * monitor.onSerialize(_, _, _)
+    1 * monitor.onFlush(_, _)
+    1 * monitor.onSend(_, _, _, _) >> {
+      phaser.arrive()
+    }
     0 * _

     cleanup:
     writer.close()

     where:
-    span = [newSpanOf(0, "fixed-thread-name")]
+    span = newSpanOf(0, "fixed-thread-name")
     trace = (1..10).collect { span }
   }

   def "test default buffer size"() {
     setup:
-    def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
+    def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(DISRUPTOR_BUFFER_SIZE).flushFrequencySeconds(-1).build()
     writer.start()

     when:

@@ -153,11 +179,11 @@
         // Busywait because we don't want to fill up the ring buffer
       }
     }
-    writer.disruptor.flush()
+    writer.flush()

     then:
     (maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
-    1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount })
+    1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) >> DDAgentApi.Response.success(200)

     cleanup:
     writer.close()

@@ -181,39 +207,43 @@
     minimalSpan = new DDSpan(0, minimalContext)
     minimalTrace = [minimalSpan]
     traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
-    maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
+    maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
   }

   def "check that are no interactions after close"() {
     setup:
-    def writer = new DDAgentWriter(api)
+    def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
     writer.start()

     when:
     writer.close()
     writer.write([])
-    writer.disruptor.flush()
+    writer.flush()

     then:
+    // 2 * monitor.onFlush(_, false)
+    1 * monitor.onFailedPublish(_, _)
+    1 * monitor.onShutdown(_, _)
     0 * _
     writer.traceCount.get() == 0
   }

-  def "check shutdown if executor stopped first"() {
+  def "check shutdown if batchWritingDisruptor stopped first"() {
     setup:
-    def writer = new DDAgentWriter(api)
+    def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
     writer.start()
-    writer.scheduledWriterExecutor.shutdown()
+    writer.batchWritingDisruptor.close()

     when:
     writer.write([])
-    writer.disruptor.flush()
+    writer.flush()

     then:
     1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
+    1 * monitor.onSerialize(writer, _, _)
+    1 * monitor.onPublish(writer, _)
     0 * _
-    writer.traceCount.get() == 1
+    writer.traceCount.get() == 0

     cleanup:
     writer.close()

@@ -253,9 +283,7 @@
        }
      }
    }
-    def api = new DDAgentApi("localhost", agent.address.port, null)
-    def monitor = Mock(Monitor)
-    def writer = new DDAgentWriter(api, monitor)
+    def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()

     when:
     writer.start()

@@ -265,12 +293,12 @@
     when:
     writer.write(minimalTrace)
-    writer.disruptor.flush()
+    writer.flush()

     then:
     1 * monitor.onPublish(writer, minimalTrace)
     1 * monitor.onSerialize(writer, minimalTrace, _)
-    1 * monitor.onScheduleFlush(writer, _)
+    1 * monitor.onFlush(writer, _)
     1 * monitor.onSend(writer, 1, _, { response -> response.success() && response.status() == 200 })

     when:

@@ -302,9 +330,7 @@
        }
      }
    }
-    def api = new DDAgentApi("localhost", agent.address.port, null)
-    def monitor = Mock(Monitor)
-    def writer = new DDAgentWriter(api, monitor)
+    def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()

     when:
     writer.start()

@@ -314,12 +340,12 @@
     when:
     writer.write(minimalTrace)
-    writer.disruptor.flush()
+    writer.flush()

     then:
     1 * monitor.onPublish(writer, minimalTrace)
     1 * monitor.onSerialize(writer, minimalTrace, _)
-    1 * monitor.onScheduleFlush(writer, _)
+    1 * monitor.onFlush(writer, _)
     1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == 500 })

     when:

@@ -345,8 +371,7 @@
        return DDAgentApi.Response.failed(new IOException("comm error"))
      }
    }
-    def monitor = Mock(Monitor)
-    def writer = new DDAgentWriter(api, monitor)
+    def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()

     when:
     writer.start()

@@ -356,12 +381,12 @@
     when:
     writer.write(minimalTrace)
-    writer.disruptor.flush()
+    writer.flush()

     then:
     1 * monitor.onPublish(writer, minimalTrace)
     1 * monitor.onSerialize(writer, minimalTrace, _)
-    1 * monitor.onScheduleFlush(writer, _)
+    1 * monitor.onFlush(writer, _)
     1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == null })

     when:

@@ -371,6 +396,8 @@
     1 * monitor.onShutdown(writer, true)
   }

+  @Retry(delay = 10)
+  // if execution is too slow, the http client timeout may trigger.
   def "slow response test"() {
     def numWritten = 0
     def numFlushes = new AtomicInteger(0)

@@ -382,7 +409,6 @@
     def responseSemaphore = new Semaphore(1)

     setup:
-    def minimalTrace = createMinimalTrace()

     // Need to set-up a dummy agent for the final send callback to work
     def agent = httpServer {

@@ -400,30 +426,27 @@
        }
      }
    }
-    def api = new DDAgentApi("localhost", agent.address.port, null)

     // This test focuses just on failed publish, so not verifying every callback
-    def monitor = Stub(Monitor)
-    monitor.onPublish(_, _) >> {
-      numPublished.incrementAndGet()
-    }
-    monitor.onFailedPublish(_, _) >> {
-      numFailedPublish.incrementAndGet()
-    }
-    monitor.onFlush(_, _) >> {
-      numFlushes.incrementAndGet()
-    }
-    monitor.onSend(_, _, _, _) >> {
-      numRequests.incrementAndGet()
-    }
-    monitor.onFailedPublish(_, _, _, _) >> {
-      numFailedRequests.incrementAndGet()
-    }
+    def monitor = Stub(Monitor) {
+      onPublish(_, _) >> {
+        numPublished.incrementAndGet()
+      }
+      onFailedPublish(_, _) >> {
+        numFailedPublish.incrementAndGet()
+      }
+      onFlush(_, _) >> {
+        numFlushes.incrementAndGet()
+      }
+      onSend(_, _, _, _) >> {
+        numRequests.incrementAndGet()
+      }
+      onFailedPublish(_, _, _, _) >> {
+        numFailedRequests.incrementAndGet()
+      }
+    }

-    // sender queue is sized in requests -- not traces
-    def bufferSize = 32
-    def senderQueueSize = 2
-    def writer = new DDAgentWriter(api, monitor, bufferSize, senderQueueSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY)
+    def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()
     writer.start()

     // gate responses

@@ -438,7 +461,7 @@
     // sanity check coordination mechanism of test
     // release to allow response to be generated
     responseSemaphore.release()
-    writer.disruptor.flush()
+    writer.flush()

     // reacquire semaphore to stall further responses
     responseSemaphore.acquire()

@@ -452,13 +475,10 @@
     when:
     // send many traces to fill the sender queue...
     // loop until outstanding requests > finished requests
-    while (numFlushes.get() - (numRequests.get() + numFailedRequests.get()) < senderQueueSize) {
-      // chunk the loop & wait to allow for flushing to send queue
-      (1..1_000).each {
-        writer.write(minimalTrace)
-        numWritten += 1
-      }
-      Thread.sleep(100)
+    while (writer.traceProcessingDisruptor.disruptorRemainingCapacity + writer.batchWritingDisruptor.disruptorRemainingCapacity > 0 || numFailedPublish.get() == 0) {
+      writer.write(minimalTrace)
+      numWritten += 1
+      Thread.sleep(1) // Allow traces to get serialized.
     }

     then:

@@ -469,17 +489,18 @@
     def priorNumFailed = numFailedPublish.get()

     // with both disruptor & queue full, should reject everything
-    def expectedRejects = 100_000
+    def expectedRejects = 100
     (1..expectedRejects).each {
       writer.write(minimalTrace)
       numWritten += 1
     }

     then:
-    // If the in-flight requests timeouts and frees up a slot in the sending queue, then
-    // many of traces will be accepted and batched into a new failing request.
+    // If the in-flight request times out (we don't currently retry),
+    // then a new batch will begin processing and many of traces will
+    // be accepted and batched into a new failing request.
     // In that case, the reject number will be low.
-    numFailedPublish.get() - priorNumFailed > expectedRejects * 0.40
+    numFailedPublish.get() - priorNumFailed >= expectedRejects * 0.80
     numPublished.get() + numFailedPublish.get() == numWritten

     cleanup:

@@ -487,6 +508,10 @@
     writer.close()
     agent.close()

+    where:
+    bufferSize = 16
+    minimalTrace = createMinimalTrace()
   }

   def "multi threaded"() {

@@ -505,21 +530,21 @@
        }
      }
    }
-    def api = new DDAgentApi("localhost", agent.address.port, null)

     // This test focuses just on failed publish, so not verifying every callback
-    def monitor = Stub(Monitor)
-    monitor.onPublish(_, _) >> {
-      numPublished.incrementAndGet()
-    }
-    monitor.onFailedPublish(_, _) >> {
-      numFailedPublish.incrementAndGet()
-    }
-    monitor.onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
-      numRepSent.addAndGet(repCount)
-    }
+    def monitor = Stub(Monitor) {
+      onPublish(_, _) >> {
+        numPublished.incrementAndGet()
+      }
+      onFailedPublish(_, _) >> {
+        numFailedPublish.incrementAndGet()
+      }
+      onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
+        numRepSent.addAndGet(repCount)
+      }
+    }

-    def writer = new DDAgentWriter(api, monitor)
+    def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()
     writer.start()

     when:

@@ -538,7 +563,7 @@
     t1.join()
     t2.join()
-    writer.disruptor.flush()
+    writer.flush()

     then:
     def totalTraces = 100 + 100

@@ -566,7 +591,6 @@
        }
      }
    }
-    def api = new DDAgentApi("localhost", agent.address.port, null)

     def statsd = Stub(StatsDClient)
     statsd.incrementCounter("queue.accepted") >> { stat ->

@@ -580,12 +604,12 @@
     }

     def monitor = new Monitor.StatsD(statsd)
-    def writer = new DDAgentWriter(api, monitor)
+    def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()
     writer.start()

     when:
     writer.write(minimalTrace)
-    writer.disruptor.flush()
+    writer.flush()

     then:
     numTracesAccepted == 1

@@ -628,12 +652,12 @@
     }

     def monitor = new Monitor.StatsD(statsd)
-    def writer = new DDAgentWriter(api, monitor)
+    def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
     writer.start()

     when:
     writer.write(minimalTrace)
-    writer.disruptor.flush()
+    writer.flush()

     then:
     numRequests == 1