Split TraceConsumer into two different disruptors
First disruptor (TraceProcessingDisruptor) does processing, which is currently limited to serialization, but in the future can do other processing such as TraceInterceptor invocation. Second disruptor (BatchWritingDisruptor) takes serialized traces and batches them into groups and flushes them periodically based on size and time.
This commit is contained in:
parent
6766e12597
commit
451fba256a
|
@ -12,6 +12,8 @@ minimumInstructionCoverage = 0.6
|
|||
excludedClassesCoverage += [
|
||||
'datadog.trace.common.writer.ListWriter',
|
||||
'datadog.trace.common.writer.LoggingWriter',
|
||||
'datadog.trace.common.writer.DDAgentWriter.Spec',
|
||||
'datadog.trace.common.writer.DDAgentWriter.Spec.SpecBuilder',
|
||||
'datadog.trace.common.sampling.PrioritySampling',
|
||||
// This code is copied from okHttp samples and we have integration tests to verify that it works.
|
||||
'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket',
|
||||
|
|
|
@ -3,21 +3,16 @@ package datadog.trace.common.writer;
|
|||
import static datadog.trace.api.Config.DEFAULT_AGENT_HOST;
|
||||
import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
|
||||
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.common.util.DaemonThreadFactory;
|
||||
import datadog.trace.common.writer.ddagent.BatchWritingDisruptor;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
|
||||
import datadog.trace.common.writer.ddagent.Monitor;
|
||||
import datadog.trace.common.writer.ddagent.TraceConsumer;
|
||||
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
|
||||
import datadog.trace.common.writer.ddagent.TraceProcessingDisruptor;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import lombok.Value;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
|
@ -29,86 +24,65 @@ import lombok.extern.slf4j.Slf4j;
|
|||
*/
|
||||
@Slf4j
|
||||
public class DDAgentWriter implements Writer {
|
||||
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
|
||||
private static final int SENDER_QUEUE_SIZE = 16;
|
||||
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
|
||||
@Value
|
||||
@lombok.Builder
|
||||
public static class Spec {
|
||||
@lombok.Builder.Default public String agentHost = DEFAULT_AGENT_HOST;
|
||||
@lombok.Builder.Default public int traceAgentPort = DEFAULT_TRACE_AGENT_PORT;
|
||||
@lombok.Builder.Default public String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
|
||||
@lombok.Builder.Default public int traceBufferSize = DISRUPTOR_BUFFER_SIZE;
|
||||
@lombok.Builder.Default public Monitor monitor = new Monitor.Noop();
|
||||
@lombok.Builder.Default public int flushFrequencySeconds = 1;
|
||||
}
|
||||
|
||||
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
|
||||
new DaemonThreadFactory("dd-trace-writer");
|
||||
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
|
||||
|
||||
private final DDAgentApi api;
|
||||
public final int flushFrequencySeconds;
|
||||
public final TraceSerializingDisruptor disruptor;
|
||||
public final TraceProcessingDisruptor traceProcessingDisruptor;
|
||||
public final BatchWritingDisruptor batchWritingDisruptor;
|
||||
|
||||
public final ScheduledExecutorService scheduledWriterExecutor;
|
||||
private final AtomicInteger traceCount = new AtomicInteger(0);
|
||||
public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing;
|
||||
|
||||
public final Monitor monitor;
|
||||
|
||||
public DDAgentWriter() {
|
||||
this(
|
||||
new DDAgentApi(
|
||||
DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
|
||||
new Monitor.Noop());
|
||||
this(Spec.builder().build());
|
||||
}
|
||||
|
||||
public DDAgentWriter(final Spec spec) {
|
||||
api = new DDAgentApi(spec.agentHost, spec.traceAgentPort, spec.unixDomainSocket);
|
||||
monitor = spec.monitor;
|
||||
|
||||
batchWritingDisruptor =
|
||||
new BatchWritingDisruptor(
|
||||
spec.traceBufferSize, spec.flushFrequencySeconds, api, monitor, this);
|
||||
traceProcessingDisruptor =
|
||||
new TraceProcessingDisruptor(
|
||||
spec.traceBufferSize, api, batchWritingDisruptor, monitor, this);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public DDAgentWriter(final DDAgentApi api, final Monitor monitor) {
|
||||
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
|
||||
}
|
||||
|
||||
/** Old signature (pre-Monitor) used in tests */
|
||||
private DDAgentWriter(final DDAgentApi api) {
|
||||
this(api, new Monitor.Noop());
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in the tests.
|
||||
*
|
||||
* @param api
|
||||
* @param disruptorSize Rounded up to next power of 2
|
||||
* @param flushFrequencySeconds value < 1 disables scheduled flushes
|
||||
*/
|
||||
private DDAgentWriter(
|
||||
final DDAgentApi api,
|
||||
final int disruptorSize,
|
||||
final int senderQueueSize,
|
||||
final int flushFrequencySeconds) {
|
||||
this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds);
|
||||
}
|
||||
|
||||
// DQH - TODO - Update the tests & remove this
|
||||
private DDAgentWriter(
|
||||
final DDAgentApi api,
|
||||
final Monitor monitor,
|
||||
final int disruptorSize,
|
||||
final int flushFrequencySeconds) {
|
||||
this(api, monitor, disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
|
||||
}
|
||||
|
||||
// DQH - TODO - Update the tests & remove this
|
||||
private DDAgentWriter(
|
||||
final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
|
||||
this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
|
||||
}
|
||||
|
||||
private DDAgentWriter(
|
||||
final DDAgentApi api,
|
||||
final Monitor monitor,
|
||||
final int disruptorSize,
|
||||
final int senderQueueSize,
|
||||
final int flushFrequencySeconds) {
|
||||
this.api = api;
|
||||
this.monitor = monitor;
|
||||
|
||||
disruptor =
|
||||
new TraceSerializingDisruptor(
|
||||
disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this));
|
||||
batchWritingDisruptor = new BatchWritingDisruptor(DISRUPTOR_BUFFER_SIZE, 1, api, monitor, this);
|
||||
traceProcessingDisruptor =
|
||||
new TraceProcessingDisruptor(
|
||||
DISRUPTOR_BUFFER_SIZE, api, batchWritingDisruptor, monitor, this);
|
||||
}
|
||||
|
||||
this.flushFrequencySeconds = flushFrequencySeconds;
|
||||
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
|
||||
@Deprecated
|
||||
// DQH - TODO - Update the tests & remove this
|
||||
private DDAgentWriter(
|
||||
final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
|
||||
this.api = api;
|
||||
monitor = new Monitor.Noop();
|
||||
|
||||
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
|
||||
batchWritingDisruptor =
|
||||
new BatchWritingDisruptor(disruptorSize, flushFrequencySeconds, api, monitor, this);
|
||||
traceProcessingDisruptor =
|
||||
new TraceProcessingDisruptor(disruptorSize, api, batchWritingDisruptor, monitor, this);
|
||||
}
|
||||
|
||||
public void addResponseListener(final DDAgentResponseListener listener) {
|
||||
|
@ -117,7 +91,7 @@ public class DDAgentWriter implements Writer {
|
|||
|
||||
// Exposing some statistics for consumption by monitors
|
||||
public final long getDisruptorCapacity() {
|
||||
return disruptor.getDisruptorCapacity();
|
||||
return traceProcessingDisruptor.getDisruptorCapacity();
|
||||
}
|
||||
|
||||
public final long getDisruptorUtilizedCapacity() {
|
||||
|
@ -125,20 +99,21 @@ public class DDAgentWriter implements Writer {
|
|||
}
|
||||
|
||||
public final long getDisruptorRemainingCapacity() {
|
||||
return disruptor.getDisruptorRemainingCapacity();
|
||||
return traceProcessingDisruptor.getDisruptorRemainingCapacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(final List<DDSpan> trace) {
|
||||
// We can't add events after shutdown otherwise it will never complete shutting down.
|
||||
if (disruptor.running) {
|
||||
final boolean published = disruptor.tryPublish(trace);
|
||||
if (traceProcessingDisruptor.running) {
|
||||
final int representativeCount = traceCount.getAndSet(0) + 1;
|
||||
final boolean published = traceProcessingDisruptor.publish(trace, representativeCount);
|
||||
|
||||
if (published) {
|
||||
monitor.onPublish(DDAgentWriter.this, trace);
|
||||
} else {
|
||||
// We're discarding the trace, but we still want to count it.
|
||||
traceCount.incrementAndGet();
|
||||
traceCount.addAndGet(representativeCount);
|
||||
log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
|
||||
|
||||
monitor.onFailedPublish(this, trace);
|
||||
|
@ -150,6 +125,10 @@ public class DDAgentWriter implements Writer {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean flush() {
|
||||
return traceProcessingDisruptor.flush(traceCount.getAndSet(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementTraceCount() {
|
||||
traceCount.incrementAndGet();
|
||||
|
@ -161,32 +140,16 @@ public class DDAgentWriter implements Writer {
|
|||
|
||||
@Override
|
||||
public void start() {
|
||||
disruptor.start();
|
||||
|
||||
batchWritingDisruptor.start();
|
||||
traceProcessingDisruptor.start();
|
||||
monitor.onStart(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
boolean flushSuccess = true;
|
||||
|
||||
// We have to shutdown scheduled executor first to make sure no flush events issued after
|
||||
// disruptor has been shutdown.
|
||||
// Otherwise those events will never be processed and flush call will wait forever.
|
||||
scheduledWriterExecutor.shutdown();
|
||||
try {
|
||||
scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS);
|
||||
} catch (final InterruptedException e) {
|
||||
log.warn("Waiting for flush executor shutdown interrupted.", e);
|
||||
|
||||
flushSuccess = false;
|
||||
}
|
||||
flushSuccess |= disruptor.flush();
|
||||
|
||||
disruptor.close();
|
||||
|
||||
monitor.onShutdown(this, flushSuccess);
|
||||
monitor.onShutdown(this, traceProcessingDisruptor.flush(traceCount.getAndSet(0)));
|
||||
traceProcessingDisruptor.close();
|
||||
batchWritingDisruptor.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -64,6 +64,7 @@ public interface Writer extends Closeable {
|
|||
}
|
||||
|
||||
private static Writer createAgentWriter(final Config config) {
|
||||
// TODO: switch to using DDAgentWriter.Spec constructor...
|
||||
return new DDAgentWriter(createApi(config), createMonitor(config));
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import com.lmax.disruptor.SleepingWaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractDisruptor<T> implements Closeable {
|
||||
|
||||
protected final Disruptor<DisruptorEvent<T>> disruptor;
|
||||
|
||||
public volatile boolean running = false;
|
||||
|
||||
protected final DisruptorEvent.FlushTranslator<T> flushTranslator =
|
||||
new DisruptorEvent.FlushTranslator<>();
|
||||
protected final DisruptorEvent.DataTranslator<T> dataTranslator =
|
||||
new DisruptorEvent.DataTranslator<>();
|
||||
|
||||
public AbstractDisruptor(final int disruptorSize, final EventHandler<DisruptorEvent<T>> handler) {
|
||||
disruptor =
|
||||
new Disruptor<>(
|
||||
new DisruptorEvent.Factory<T>(),
|
||||
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
|
||||
getThreadFactory(),
|
||||
ProducerType.MULTI,
|
||||
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
|
||||
disruptor.handleEventsWith(handler);
|
||||
}
|
||||
|
||||
protected abstract ThreadFactory getThreadFactory();
|
||||
|
||||
public void start() {
|
||||
disruptor.start();
|
||||
running = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
running = false;
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
public abstract boolean publish(final T data, int representativeCount);
|
||||
|
||||
/**
|
||||
* This method will block until the flush is complete.
|
||||
*
|
||||
* @param traceCount - number of unreported traces to include in this batch.
|
||||
*/
|
||||
public boolean flush(final int traceCount) {
|
||||
if (running) {
|
||||
return flush(traceCount, new CountDownLatch(1));
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/** This method will block until the flush is complete. */
|
||||
protected boolean flush(final int traceCount, final CountDownLatch latch) {
|
||||
log.info("Flushing any remaining traces.");
|
||||
disruptor.publishEvent(flushTranslator, traceCount, latch);
|
||||
try {
|
||||
latch.await();
|
||||
return true;
|
||||
} catch (final InterruptedException e) {
|
||||
log.warn("Waiting for flush interrupted.", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Exposing some statistics for consumption by monitors
|
||||
public final long getDisruptorCapacity() {
|
||||
return disruptor.getRingBuffer().getBufferSize();
|
||||
}
|
||||
|
||||
public final long getDisruptorRemainingCapacity() {
|
||||
return disruptor.getRingBuffer().remainingCapacity();
|
||||
}
|
||||
|
||||
public final long getCurrentCount() {
|
||||
return disruptor.getCursor() - disruptor.getRingBuffer().getMinimumGatingSequence();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,162 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import datadog.trace.common.util.DaemonThreadFactory;
|
||||
import datadog.trace.common.writer.DDAgentWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
|
||||
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
|
||||
|
||||
private final ScheduledExecutorService heartbeatExecutor =
|
||||
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat"));
|
||||
|
||||
private final DisruptorEvent.HeartbeatTranslator<byte[]> heartbeatTranslator =
|
||||
new DisruptorEvent.HeartbeatTranslator();
|
||||
|
||||
public BatchWritingDisruptor(
|
||||
final int disruptorSize,
|
||||
final int flushFrequencySeconds,
|
||||
final DDAgentApi api,
|
||||
final Monitor monitor,
|
||||
final DDAgentWriter writer) {
|
||||
super(disruptorSize, new BatchWritingHandler(flushFrequencySeconds, api, monitor, writer));
|
||||
|
||||
if (0 < flushFrequencySeconds) {
|
||||
// This provides a steady stream of events to enable flushing with a low throughput.
|
||||
final Runnable heartbeat =
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Only add if the buffer is empty.
|
||||
if (running && getCurrentCount() == 0) {
|
||||
disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator);
|
||||
}
|
||||
}
|
||||
};
|
||||
heartbeatExecutor.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ThreadFactory getThreadFactory() {
|
||||
return new DaemonThreadFactory("dd-trace-writer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean publish(final byte[] data, final int representativeCount) {
|
||||
disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount);
|
||||
return true;
|
||||
}
|
||||
|
||||
// Intentionally not thread safe.
|
||||
private static class BatchWritingHandler implements EventHandler<DisruptorEvent<byte[]>> {
|
||||
|
||||
private final long flushFrequencyNanos;
|
||||
private final DDAgentApi api;
|
||||
private final Monitor monitor;
|
||||
private final DDAgentWriter writer;
|
||||
private final List<byte[]> serializedTraces = new ArrayList<>();
|
||||
private int representativeCount = 0;
|
||||
private int sizeInBytes = 0;
|
||||
private long nextScheduledFlush;
|
||||
|
||||
private BatchWritingHandler(
|
||||
final int flushFrequencySeconds,
|
||||
final DDAgentApi api,
|
||||
final Monitor monitor,
|
||||
final DDAgentWriter writer) {
|
||||
flushFrequencyNanos = TimeUnit.SECONDS.toNanos(flushFrequencySeconds);
|
||||
scheduleNextFlush();
|
||||
this.api = api;
|
||||
this.monitor = monitor;
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(
|
||||
final DisruptorEvent<byte[]> event, final long sequence, final boolean endOfBatch) {
|
||||
try {
|
||||
if (event.data != null) {
|
||||
sizeInBytes += event.data.length;
|
||||
serializedTraces.add(event.data);
|
||||
}
|
||||
|
||||
// Flush events might increase this with no data.
|
||||
representativeCount += event.representativeCount;
|
||||
|
||||
if (event.flushLatch != null
|
||||
|| FLUSH_PAYLOAD_BYTES <= sizeInBytes
|
||||
|| nextScheduledFlush <= System.nanoTime()) {
|
||||
flush(event.flushLatch, FLUSH_PAYLOAD_BYTES <= sizeInBytes);
|
||||
}
|
||||
} finally {
|
||||
event.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private void flush(final CountDownLatch flushLatch, final boolean early) {
|
||||
try {
|
||||
if (serializedTraces.isEmpty()) {
|
||||
// FIXME: this will reset representativeCount without reporting
|
||||
// anything even if representativeCount > 0.
|
||||
return;
|
||||
}
|
||||
|
||||
monitor.onFlush(writer, early);
|
||||
// TODO add retry and rate limiting
|
||||
final DDAgentApi.Response response =
|
||||
api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces);
|
||||
|
||||
if (response.success()) {
|
||||
log.debug("Successfully sent {} traces to the API", serializedTraces.size());
|
||||
|
||||
monitor.onSend(writer, representativeCount, sizeInBytes, response);
|
||||
} else {
|
||||
log.debug(
|
||||
"Failed to send {} traces (representing {}) of size {} bytes to the API",
|
||||
serializedTraces.size(),
|
||||
representativeCount,
|
||||
sizeInBytes);
|
||||
|
||||
monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
log.debug("Failed to send traces to the API: {}", e.getMessage());
|
||||
|
||||
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
|
||||
// shouldn't occur.
|
||||
// However, just to be safe to start, create a failed Response to handle any
|
||||
// spurious Throwable-s.
|
||||
monitor.onFailedSend(
|
||||
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
|
||||
} finally {
|
||||
serializedTraces.clear();
|
||||
sizeInBytes = 0;
|
||||
representativeCount = 0;
|
||||
scheduleNextFlush();
|
||||
|
||||
if (flushLatch != null) {
|
||||
flushLatch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void scheduleNextFlush() {
|
||||
// TODO: adjust this depending on responsiveness of the agent.
|
||||
if (0 < flushFrequencyNanos) {
|
||||
nextScheduledFlush = System.nanoTime() + flushFrequencyNanos;
|
||||
} else {
|
||||
nextScheduledFlush = Long.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,13 +2,19 @@ package datadog.trace.common.writer.ddagent;
|
|||
|
||||
import com.lmax.disruptor.EventFactory;
|
||||
import com.lmax.disruptor.EventTranslator;
|
||||
import com.lmax.disruptor.EventTranslatorOneArg;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import java.util.List;
|
||||
import com.lmax.disruptor.EventTranslatorTwoArg;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
class DisruptorEvent<T> {
|
||||
public volatile boolean shouldFlush = false;
|
||||
public volatile T data = null;
|
||||
public volatile int representativeCount = 0;
|
||||
public volatile CountDownLatch flushLatch = null;
|
||||
|
||||
public void reset() {
|
||||
data = null;
|
||||
representativeCount = 0;
|
||||
flushLatch = null;
|
||||
}
|
||||
|
||||
static class Factory<T> implements EventFactory<DisruptorEvent<T>> {
|
||||
@Override
|
||||
|
@ -17,25 +23,38 @@ class DisruptorEvent<T> {
|
|||
}
|
||||
}
|
||||
|
||||
static class TraceTranslator
|
||||
implements EventTranslatorOneArg<DisruptorEvent<List<DDSpan>>, List<DDSpan>> {
|
||||
static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
|
||||
new DisruptorEvent.TraceTranslator();
|
||||
static class DataTranslator<T> implements EventTranslatorTwoArg<DisruptorEvent<T>, T, Integer> {
|
||||
|
||||
@Override
|
||||
public void translateTo(
|
||||
final DisruptorEvent<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
|
||||
event.data = trace;
|
||||
final DisruptorEvent<T> event,
|
||||
final long sequence,
|
||||
final T data,
|
||||
final Integer representativeCount) {
|
||||
event.data = data;
|
||||
event.representativeCount = representativeCount;
|
||||
}
|
||||
}
|
||||
|
||||
static class FlushTranslator implements EventTranslator<DisruptorEvent<List<DDSpan>>> {
|
||||
static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
|
||||
new DisruptorEvent.FlushTranslator();
|
||||
static class HeartbeatTranslator<T> implements EventTranslator<DisruptorEvent<T>> {
|
||||
|
||||
@Override
|
||||
public void translateTo(final DisruptorEvent<List<DDSpan>> event, final long sequence) {
|
||||
event.shouldFlush = true;
|
||||
public void translateTo(final DisruptorEvent<T> event, final long sequence) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
static class FlushTranslator<T>
|
||||
implements EventTranslatorTwoArg<DisruptorEvent<T>, Integer, CountDownLatch> {
|
||||
|
||||
@Override
|
||||
public void translateTo(
|
||||
final DisruptorEvent<T> event,
|
||||
final long sequence,
|
||||
final Integer representativeCount,
|
||||
final CountDownLatch latch) {
|
||||
event.representativeCount = representativeCount;
|
||||
event.flushLatch = latch;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,150 +0,0 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.common.writer.DDAgentWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/** This class is intentionally not threadsafe. */
|
||||
@Slf4j
|
||||
public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>> {
|
||||
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
|
||||
|
||||
private final AtomicInteger traceCount;
|
||||
private final Semaphore senderSemaphore;
|
||||
private final DDAgentWriter writer;
|
||||
|
||||
private List<byte[]> serializedTraces = new ArrayList<>();
|
||||
private int payloadSize = 0;
|
||||
|
||||
public TraceConsumer(
|
||||
final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
|
||||
this.traceCount = traceCount;
|
||||
senderSemaphore = new Semaphore(senderQueueSize);
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(
|
||||
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
|
||||
final List<DDSpan> trace = event.data;
|
||||
event.data = null; // clear the event for reuse.
|
||||
if (trace != null) {
|
||||
traceCount.incrementAndGet();
|
||||
try {
|
||||
final byte[] serializedTrace = writer.getApi().serializeTrace(trace);
|
||||
payloadSize += serializedTrace.length;
|
||||
serializedTraces.add(serializedTrace);
|
||||
|
||||
writer.monitor.onSerialize(writer, trace, serializedTrace);
|
||||
} catch (final JsonProcessingException e) {
|
||||
log.warn("Error serializing trace", e);
|
||||
|
||||
writer.monitor.onFailedSerialize(writer, trace, e);
|
||||
} catch (final Throwable e) {
|
||||
log.debug("Error while serializing trace", e);
|
||||
|
||||
writer.monitor.onFailedSerialize(writer, trace, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
|
||||
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
|
||||
|
||||
reportTraces(early);
|
||||
event.shouldFlush = false;
|
||||
}
|
||||
}
|
||||
|
||||
private void reportTraces(final boolean early) {
|
||||
try {
|
||||
if (serializedTraces.isEmpty()) {
|
||||
writer.monitor.onFlush(writer, early);
|
||||
|
||||
writer.apiPhaser.arrive(); // Allow flush to return
|
||||
return;
|
||||
// scheduleFlush called in finally block.
|
||||
}
|
||||
if (writer.scheduledWriterExecutor.isShutdown()) {
|
||||
writer.monitor.onFailedSend(
|
||||
writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1));
|
||||
writer.apiPhaser.arrive(); // Allow flush to return
|
||||
return;
|
||||
}
|
||||
final List<byte[]> toSend = serializedTraces;
|
||||
serializedTraces = new ArrayList<>(toSend.size());
|
||||
// ^ Initialize with similar size to reduce arraycopy churn.
|
||||
|
||||
final int representativeCount = traceCount.getAndSet(0);
|
||||
final int sizeInBytes = payloadSize;
|
||||
|
||||
try {
|
||||
writer.monitor.onFlush(writer, early);
|
||||
|
||||
// Run the actual IO task on a different thread to avoid blocking the consumer.
|
||||
try {
|
||||
senderSemaphore.acquire();
|
||||
} catch (final InterruptedException e) {
|
||||
writer.monitor.onFailedSend(
|
||||
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
|
||||
|
||||
// Finally, we'll schedule another flush
|
||||
// Any threads awaiting the flush will continue to wait
|
||||
return;
|
||||
}
|
||||
writer.scheduledWriterExecutor.execute(
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
senderSemaphore.release();
|
||||
|
||||
try {
|
||||
final DDAgentApi.Response response =
|
||||
writer
|
||||
.getApi()
|
||||
.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
|
||||
|
||||
if (response.success()) {
|
||||
log.debug("Successfully sent {} traces to the API", toSend.size());
|
||||
|
||||
writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
|
||||
} else {
|
||||
log.debug(
|
||||
"Failed to send {} traces (representing {}) of size {} bytes to the API",
|
||||
toSend.size(),
|
||||
representativeCount,
|
||||
sizeInBytes);
|
||||
|
||||
writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
log.debug("Failed to send traces to the API: {}", e.getMessage());
|
||||
|
||||
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
|
||||
// shouldn't occur.
|
||||
// However, just to be safe to start, create a failed Response to handle any
|
||||
// spurious Throwable-s.
|
||||
writer.monitor.onFailedSend(
|
||||
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
|
||||
} finally {
|
||||
writer.apiPhaser.arrive(); // Flush completed.
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (final RejectedExecutionException ex) {
|
||||
writer.monitor.onFailedSend(
|
||||
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
|
||||
writer.apiPhaser.arrive(); // Allow flush to return
|
||||
}
|
||||
} finally {
|
||||
payloadSize = 0;
|
||||
writer.disruptor.scheduleFlush();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.common.util.DaemonThreadFactory;
|
||||
import datadog.trace.common.writer.DDAgentWriter;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
|
||||
|
||||
public TraceProcessingDisruptor(
|
||||
final int disruptorSize,
|
||||
final DDAgentApi api,
|
||||
final BatchWritingDisruptor batchWritingDisruptor,
|
||||
final Monitor monitor,
|
||||
final DDAgentWriter writer) {
|
||||
// TODO: add config to enable control over serialization overhead.
|
||||
super(disruptorSize, new TraceSerializingHandler(api, batchWritingDisruptor, monitor, writer));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ThreadFactory getThreadFactory() {
|
||||
return new DaemonThreadFactory("dd-trace-processor");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean publish(final List<DDSpan> data, final int representativeCount) {
|
||||
return disruptor.getRingBuffer().tryPublishEvent(dataTranslator, data, representativeCount);
|
||||
}
|
||||
|
||||
// This class is threadsafe if we want to enable more processors.
|
||||
public static class TraceSerializingHandler
|
||||
implements EventHandler<DisruptorEvent<List<DDSpan>>> {
|
||||
private final DDAgentApi api;
|
||||
private final BatchWritingDisruptor batchWritingDisruptor;
|
||||
private final Monitor monitor;
|
||||
private final DDAgentWriter writer;
|
||||
|
||||
public TraceSerializingHandler(
|
||||
final DDAgentApi api,
|
||||
final BatchWritingDisruptor batchWritingDisruptor,
|
||||
final Monitor monitor,
|
||||
final DDAgentWriter writer) {
|
||||
this.api = api;
|
||||
this.batchWritingDisruptor = batchWritingDisruptor;
|
||||
this.monitor = monitor;
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(
|
||||
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
|
||||
try {
|
||||
if (event.data != null) {
|
||||
try {
|
||||
final byte[] serializedTrace = api.serializeTrace(event.data);
|
||||
monitor.onSerialize(writer, event.data, serializedTrace);
|
||||
batchWritingDisruptor.publish(serializedTrace, event.representativeCount);
|
||||
event.representativeCount = 0; // reset in case flush is invoked below.
|
||||
} catch (final JsonProcessingException e) {
|
||||
log.debug("Error serializing trace", e);
|
||||
monitor.onFailedSerialize(writer, event.data, e);
|
||||
} catch (final Throwable e) {
|
||||
log.debug("Error while serializing trace", e);
|
||||
monitor.onFailedSerialize(writer, event.data, e);
|
||||
}
|
||||
}
|
||||
|
||||
if (event.flushLatch != null) {
|
||||
if (batchWritingDisruptor.running) {
|
||||
// propagate the flush.
|
||||
batchWritingDisruptor.flush(event.representativeCount, event.flushLatch);
|
||||
}
|
||||
if (!batchWritingDisruptor.running) { // check again to protect against race condition.
|
||||
// got shutdown early somehow?
|
||||
event.flushLatch.countDown();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
event.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,117 +0,0 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR;
|
||||
import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import com.lmax.disruptor.SleepingWaitStrategy;
|
||||
import com.lmax.disruptor.dsl.Disruptor;
|
||||
import com.lmax.disruptor.dsl.ProducerType;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.common.util.DaemonThreadFactory;
|
||||
import datadog.trace.common.writer.DDAgentWriter;
|
||||
import java.io.Closeable;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class TraceSerializingDisruptor implements Closeable {
|
||||
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
|
||||
new DaemonThreadFactory("dd-trace-disruptor");
|
||||
private final FlushTask flushTask = new FlushTask();
|
||||
|
||||
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
|
||||
private final DDAgentWriter writer;
|
||||
|
||||
public volatile boolean running = false;
|
||||
|
||||
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
|
||||
|
||||
public TraceSerializingDisruptor(
|
||||
final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) {
|
||||
disruptor =
|
||||
new Disruptor<>(
|
||||
new DisruptorEvent.Factory<List<DDSpan>>(),
|
||||
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
|
||||
DISRUPTOR_THREAD_FACTORY,
|
||||
ProducerType.MULTI,
|
||||
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
|
||||
this.writer = writer;
|
||||
disruptor.handleEventsWith(handler);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
disruptor.start();
|
||||
running = true;
|
||||
scheduleFlush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
running = false;
|
||||
disruptor.shutdown();
|
||||
}
|
||||
|
||||
public boolean tryPublish(final List<DDSpan> trace) {
|
||||
return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace);
|
||||
}
|
||||
|
||||
/** This method will block until the flush is complete. */
|
||||
public boolean flush() {
|
||||
if (running) {
|
||||
log.info("Flushing any remaining traces.");
|
||||
// Register with the phaser so we can block until the flush completion.
|
||||
writer.apiPhaser.register();
|
||||
disruptor.publishEvent(FLUSH_TRANSLATOR);
|
||||
try {
|
||||
// Allow thread to be interrupted.
|
||||
writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister());
|
||||
|
||||
return true;
|
||||
} catch (final InterruptedException e) {
|
||||
log.warn("Waiting for flush interrupted.", e);
|
||||
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void scheduleFlush() {
|
||||
if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) {
|
||||
final ScheduledFuture<?> previous =
|
||||
flushSchedule.getAndSet(
|
||||
writer.scheduledWriterExecutor.schedule(
|
||||
flushTask, writer.flushFrequencySeconds, SECONDS));
|
||||
|
||||
final boolean previousIncomplete = (previous != null);
|
||||
if (previousIncomplete) {
|
||||
previous.cancel(true);
|
||||
}
|
||||
|
||||
writer.monitor.onScheduleFlush(writer, previousIncomplete);
|
||||
}
|
||||
}
|
||||
|
||||
private class FlushTask implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
// Don't call flush() because it would block the thread also used for sending the traces.
|
||||
disruptor.publishEvent(FLUSH_TRANSLATOR);
|
||||
}
|
||||
}
|
||||
|
||||
// Exposing some statistics for consumption by monitors
|
||||
public final long getDisruptorCapacity() {
|
||||
return disruptor.getRingBuffer().getBufferSize();
|
||||
}
|
||||
|
||||
public final long getDisruptorRemainingCapacity() {
|
||||
return disruptor.getRingBuffer().remainingCapacity();
|
||||
}
|
||||
}
|
|
@ -7,12 +7,13 @@ import datadog.opentracing.DDTracer
|
|||
import datadog.opentracing.PendingTrace
|
||||
import datadog.trace.api.sampling.PrioritySampling
|
||||
import datadog.trace.common.writer.DDAgentWriter
|
||||
import datadog.trace.common.writer.ddagent.BatchWritingDisruptor
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||
import datadog.trace.common.writer.ddagent.Monitor
|
||||
import datadog.trace.common.writer.ddagent.TraceConsumer
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
import spock.lang.Timeout
|
||||
|
||||
import java.util.concurrent.Phaser
|
||||
import java.util.concurrent.Semaphore
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
@ -24,21 +25,41 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
|
|||
@Timeout(20)
|
||||
class DDAgentWriterTest extends DDSpecification {
|
||||
|
||||
def api = Mock(DDAgentApi)
|
||||
def phaser = new Phaser()
|
||||
def api = Mock(DDAgentApi) {
|
||||
// Define the following response in the spec:
|
||||
// sendSerializedTraces(_, _, _) >> {
|
||||
// phaser.arrive()
|
||||
// return DDAgentApi.Response.success(200)
|
||||
// }
|
||||
}
|
||||
def monitor = Mock(Monitor)
|
||||
|
||||
def setup() {
|
||||
// Register for two threads.
|
||||
phaser.register()
|
||||
phaser.register()
|
||||
}
|
||||
|
||||
def "test happy path"() {
|
||||
setup:
|
||||
def writer = new DDAgentWriter(api, 2, -1)
|
||||
writer.start()
|
||||
|
||||
when:
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
0 * _
|
||||
|
||||
when:
|
||||
writer.write(trace)
|
||||
writer.write(trace)
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
|
||||
1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200)
|
||||
0 * _
|
||||
|
||||
cleanup:
|
||||
|
@ -57,11 +78,11 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
(1..traceCount).each {
|
||||
writer.write(trace)
|
||||
}
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount })
|
||||
1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) >> DDAgentApi.Response.success(200)
|
||||
0 * _
|
||||
|
||||
cleanup:
|
||||
|
@ -76,9 +97,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
def "test flush by size"() {
|
||||
setup:
|
||||
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
|
||||
def phaser = writer.apiPhaser
|
||||
writer.start()
|
||||
phaser.register()
|
||||
|
||||
when:
|
||||
(1..6).each {
|
||||
|
@ -90,18 +109,21 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
then:
|
||||
6 * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
2 * api.sendSerializedTraces(3, _, { it.size() == 3 })
|
||||
2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) >> {
|
||||
phaser.arrive()
|
||||
return DDAgentApi.Response.success(200)
|
||||
}
|
||||
|
||||
when:
|
||||
(1..2).each {
|
||||
writer.write(trace)
|
||||
}
|
||||
// Flush the remaining 2
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
|
||||
1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200)
|
||||
0 * _
|
||||
|
||||
cleanup:
|
||||
|
@ -114,11 +136,8 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
def "test flush by time"() {
|
||||
setup:
|
||||
def writer = new DDAgentWriter(api)
|
||||
def phaser = writer.apiPhaser
|
||||
phaser.register()
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
writer.start()
|
||||
writer.disruptor.flush()
|
||||
|
||||
when:
|
||||
(1..5).each {
|
||||
|
@ -128,7 +147,14 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
then:
|
||||
5 * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
1 * api.sendSerializedTraces(5, _, { it.size() == 5 })
|
||||
1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> {
|
||||
phaser.arrive()
|
||||
return DDAgentApi.Response.success(200)
|
||||
}
|
||||
5 * monitor.onPublish(_, _)
|
||||
5 * monitor.onSerialize(_, _, _)
|
||||
1 * monitor.onFlush(_, _)
|
||||
(0..1) * monitor.onSend(_, _, _, _) // This gets called after phaser.arrive(), so there's a race condition.
|
||||
0 * _
|
||||
|
||||
cleanup:
|
||||
|
@ -153,11 +179,11 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
// Busywait because we don't want to fill up the ring buffer
|
||||
}
|
||||
}
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount })
|
||||
1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) >> DDAgentApi.Response.success(200)
|
||||
|
||||
cleanup:
|
||||
writer.close()
|
||||
|
@ -181,39 +207,43 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
minimalSpan = new DDSpan(0, minimalContext)
|
||||
minimalTrace = [minimalSpan]
|
||||
traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
|
||||
maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
|
||||
maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
|
||||
}
|
||||
|
||||
def "check that are no interactions after close"() {
|
||||
|
||||
setup:
|
||||
def writer = new DDAgentWriter(api)
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
writer.start()
|
||||
|
||||
when:
|
||||
writer.close()
|
||||
writer.write([])
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
// 2 * monitor.onFlush(_, false)
|
||||
1 * monitor.onFailedPublish(_, _)
|
||||
1 * monitor.onShutdown(_, _)
|
||||
0 * _
|
||||
writer.traceCount.get() == 0
|
||||
}
|
||||
|
||||
def "check shutdown if executor stopped first"() {
|
||||
def "check shutdown if batchWritingDisruptor stopped first"() {
|
||||
setup:
|
||||
def writer = new DDAgentWriter(api)
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
writer.start()
|
||||
writer.scheduledWriterExecutor.shutdown()
|
||||
writer.batchWritingDisruptor.close()
|
||||
|
||||
when:
|
||||
writer.write([])
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
|
||||
1 * monitor.onSerialize(writer, _, _)
|
||||
1 * monitor.onPublish(writer, _)
|
||||
0 * _
|
||||
writer.traceCount.get() == 1
|
||||
writer.traceCount.get() == 0
|
||||
|
||||
cleanup:
|
||||
writer.close()
|
||||
|
@ -253,9 +283,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
}
|
||||
}
|
||||
}
|
||||
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||
def monitor = Mock(Monitor)
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build())
|
||||
|
||||
when:
|
||||
writer.start()
|
||||
|
@ -265,12 +293,12 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
when:
|
||||
writer.write(minimalTrace)
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
1 * monitor.onPublish(writer, minimalTrace)
|
||||
1 * monitor.onSerialize(writer, minimalTrace, _)
|
||||
1 * monitor.onScheduleFlush(writer, _)
|
||||
1 * monitor.onFlush(writer, _)
|
||||
1 * monitor.onSend(writer, 1, _, { response -> response.success() && response.status() == 200 })
|
||||
|
||||
when:
|
||||
|
@ -302,9 +330,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
}
|
||||
}
|
||||
}
|
||||
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||
def monitor = Mock(Monitor)
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build())
|
||||
|
||||
when:
|
||||
writer.start()
|
||||
|
@ -314,12 +340,12 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
when:
|
||||
writer.write(minimalTrace)
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
1 * monitor.onPublish(writer, minimalTrace)
|
||||
1 * monitor.onSerialize(writer, minimalTrace, _)
|
||||
1 * monitor.onScheduleFlush(writer, _)
|
||||
1 * monitor.onFlush(writer, _)
|
||||
1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == 500 })
|
||||
|
||||
when:
|
||||
|
@ -345,7 +371,6 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
return DDAgentApi.Response.failed(new IOException("comm error"))
|
||||
}
|
||||
}
|
||||
def monitor = Mock(Monitor)
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
|
||||
when:
|
||||
|
@ -356,12 +381,12 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
when:
|
||||
writer.write(minimalTrace)
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
1 * monitor.onPublish(writer, minimalTrace)
|
||||
1 * monitor.onSerialize(writer, minimalTrace, _)
|
||||
1 * monitor.onScheduleFlush(writer, _)
|
||||
1 * monitor.onFlush(writer, _)
|
||||
1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == null })
|
||||
|
||||
when:
|
||||
|
@ -400,30 +425,30 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
}
|
||||
}
|
||||
}
|
||||
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||
|
||||
// This test focuses just on failed publish, so not verifying every callback
|
||||
def monitor = Stub(Monitor)
|
||||
monitor.onPublish(_, _) >> {
|
||||
numPublished.incrementAndGet()
|
||||
}
|
||||
monitor.onFailedPublish(_, _) >> {
|
||||
numFailedPublish.incrementAndGet()
|
||||
}
|
||||
monitor.onFlush(_, _) >> {
|
||||
numFlushes.incrementAndGet()
|
||||
}
|
||||
monitor.onSend(_, _, _, _) >> {
|
||||
numRequests.incrementAndGet()
|
||||
}
|
||||
monitor.onFailedPublish(_, _, _, _) >> {
|
||||
numFailedRequests.incrementAndGet()
|
||||
def monitor = Stub(Monitor) {
|
||||
onPublish(_, _) >> {
|
||||
numPublished.incrementAndGet()
|
||||
}
|
||||
onFailedPublish(_, _) >> {
|
||||
numFailedPublish.incrementAndGet()
|
||||
}
|
||||
onFlush(_, _) >> {
|
||||
numFlushes.incrementAndGet()
|
||||
}
|
||||
onSend(_, _, _, _) >> {
|
||||
numRequests.incrementAndGet()
|
||||
}
|
||||
onFailedPublish(_, _, _, _) >> {
|
||||
numFailedRequests.incrementAndGet()
|
||||
}
|
||||
}
|
||||
|
||||
// sender queue is sized in requests -- not traces
|
||||
def bufferSize = 32
|
||||
def senderQueueSize = 2
|
||||
def writer = new DDAgentWriter(api, monitor, bufferSize, senderQueueSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY)
|
||||
def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build())
|
||||
writer.start()
|
||||
|
||||
// gate responses
|
||||
|
@ -438,7 +463,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
// sanity check coordination mechanism of test
|
||||
// release to allow response to be generated
|
||||
responseSemaphore.release()
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
// reacquire semaphore to stall further responses
|
||||
responseSemaphore.acquire()
|
||||
|
@ -505,21 +530,21 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
}
|
||||
}
|
||||
}
|
||||
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||
|
||||
// This test focuses just on failed publish, so not verifying every callback
|
||||
def monitor = Stub(Monitor)
|
||||
monitor.onPublish(_, _) >> {
|
||||
numPublished.incrementAndGet()
|
||||
}
|
||||
monitor.onFailedPublish(_, _) >> {
|
||||
numFailedPublish.incrementAndGet()
|
||||
}
|
||||
monitor.onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
|
||||
numRepSent.addAndGet(repCount)
|
||||
def monitor = Stub(Monitor) {
|
||||
onPublish(_, _) >> {
|
||||
numPublished.incrementAndGet()
|
||||
}
|
||||
onFailedPublish(_, _) >> {
|
||||
numFailedPublish.incrementAndGet()
|
||||
}
|
||||
onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
|
||||
numRepSent.addAndGet(repCount)
|
||||
}
|
||||
}
|
||||
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build())
|
||||
writer.start()
|
||||
|
||||
when:
|
||||
|
@ -538,7 +563,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
t1.join()
|
||||
t2.join()
|
||||
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
def totalTraces = 100 + 100
|
||||
|
@ -566,7 +591,6 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
}
|
||||
}
|
||||
}
|
||||
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||
|
||||
def statsd = Stub(StatsDClient)
|
||||
statsd.incrementCounter("queue.accepted") >> { stat ->
|
||||
|
@ -580,12 +604,12 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
}
|
||||
|
||||
def monitor = new Monitor.StatsD(statsd)
|
||||
def writer = new DDAgentWriter(api, monitor)
|
||||
def writer = new DDAgentWriter(DDAgentWriter.Spec.builder().traceAgentPort(agent.address.port).monitor(monitor).build())
|
||||
writer.start()
|
||||
|
||||
when:
|
||||
writer.write(minimalTrace)
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
numTracesAccepted == 1
|
||||
|
@ -633,7 +657,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
|
||||
when:
|
||||
writer.write(minimalTrace)
|
||||
writer.disruptor.flush()
|
||||
writer.flush()
|
||||
|
||||
then:
|
||||
numRequests == 1
|
||||
|
|
Loading…
Reference in New Issue