Extract TraceSerializingDisruptor class from DDAgentWriter

This commit is contained in:
Tyler Benson 2019-12-20 13:36:16 -08:00
parent 97ed587547
commit 84f9d80258
4 changed files with 147 additions and 97 deletions

View File

@ -5,23 +5,17 @@ import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import datadog.trace.common.writer.ddagent.Monitor;
import datadog.trace.common.writer.ddagent.TraceConsumer;
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
/**
@ -37,26 +31,16 @@ public class DDAgentWriter implements Writer {
private static final int SENDER_QUEUE_SIZE = 16;
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
private static final DisruptorEvent.TraceTranslator TRANSLATOR =
new DisruptorEvent.TraceTranslator();
private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
new DisruptorEvent.FlushTranslator();
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-writer");
private final Runnable flushTask = new FlushTask();
private final DDApi api;
private final int flushFrequencySeconds;
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
public final int flushFrequencySeconds;
public final TraceSerializingDisruptor disruptor;
public final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
public final Phaser apiPhaser;
private volatile boolean running = false;
public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing;
public final Monitor monitor;
@ -114,24 +98,18 @@ public class DDAgentWriter implements Writer {
this.monitor = monitor;
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
disruptor.handleEventsWith(new TraceConsumer(traceCount, senderQueueSize, this));
new TraceSerializingDisruptor(
disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this));
this.flushFrequencySeconds = flushFrequencySeconds;
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
return disruptor.getDisruptorCapacity();
}
public final long getDisruptorUtilizedCapacity() {
@ -139,14 +117,14 @@ public class DDAgentWriter implements Writer {
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
return disruptor.getDisruptorRemainingCapacity();
}
@Override
public void write(final List<DDSpan> trace) {
// We can't add events after shutdown otherwise it will never complete shutting down.
if (running) {
final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
if (disruptor.running) {
final boolean published = disruptor.tryPublish(trace);
if (published) {
monitor.onPublish(DDAgentWriter.this, trace);
@ -176,15 +154,12 @@ public class DDAgentWriter implements Writer {
@Override
public void start() {
disruptor.start();
running = true;
scheduleFlush();
monitor.onStart(this);
}
@Override
public void close() {
running = false;
boolean flushSuccess = true;
@ -199,34 +174,13 @@ public class DDAgentWriter implements Writer {
flushSuccess = false;
}
flushSuccess |= flush();
disruptor.shutdown();
flushSuccess |= disruptor.flush();
disruptor.close();
monitor.onShutdown(this, flushSuccess);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
@Override
public String toString() {
// DQH - I don't particularly like the instanceof check,
@ -242,27 +196,4 @@ public class DDAgentWriter implements Writer {
return str;
}
public void scheduleFlush() {
if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
monitor.onScheduleFlush(this, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
}

View File

@ -145,7 +145,7 @@ public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>>
}
} finally {
payloadSize = 0;
writer.scheduleFlush();
writer.disruptor.scheduleFlush();
}
}
}

View File

@ -0,0 +1,119 @@
package datadog.trace.common.writer.ddagent;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TraceSerializingDisruptor implements Closeable {
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
new DisruptorEvent.TraceTranslator();
private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
new DisruptorEvent.FlushTranslator();
private final FlushTask flushTask = new FlushTask();
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
private final DDAgentWriter writer;
public volatile boolean running = false;
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
public TraceSerializingDisruptor(
final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) {
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
this.writer = writer;
disruptor.handleEventsWith(handler);
}
public void start() {
disruptor.start();
running = true;
scheduleFlush();
}
@Override
public void close() {
running = false;
disruptor.shutdown();
}
public boolean tryPublish(final List<DDSpan> trace) {
return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
writer.apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
public void scheduleFlush() {
if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
writer.scheduledWriterExecutor.schedule(
flushTask, writer.flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
writer.monitor.onScheduleFlush(writer, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
}

View File

@ -34,7 +34,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(trace)
writer.write(trace)
writer.flush()
writer.disruptor.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -57,7 +57,7 @@ class DDAgentWriterTest extends DDSpecification {
(1..traceCount).each {
writer.write(trace)
}
writer.flush()
writer.disruptor.flush()
then:
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -97,7 +97,7 @@ class DDAgentWriterTest extends DDSpecification {
writer.write(trace)
}
// Flush the remaining 2
writer.flush()
writer.disruptor.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -118,7 +118,7 @@ class DDAgentWriterTest extends DDSpecification {
def phaser = writer.apiPhaser
phaser.register()
writer.start()
writer.flush()
writer.disruptor.flush()
when:
(1..5).each {
@ -153,7 +153,7 @@ class DDAgentWriterTest extends DDSpecification {
// Busywait because we don't want to fill up the ring buffer
}
}
writer.flush()
writer.disruptor.flush()
then:
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -193,7 +193,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.close()
writer.write([])
writer.flush()
writer.disruptor.flush()
then:
0 * _
@ -208,7 +208,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write([])
writer.flush()
writer.disruptor.flush()
then:
1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -265,7 +265,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@ -314,7 +314,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@ -356,7 +356,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@ -438,7 +438,7 @@ class DDAgentWriterTest extends DDSpecification {
// sanity check coordination mechanism of test
// release to allow response to be generated
responseSemaphore.release()
writer.flush()
writer.disruptor.flush()
// reacquire semaphore to stall further responses
responseSemaphore.acquire()
@ -538,7 +538,7 @@ class DDAgentWriterTest extends DDSpecification {
t1.join()
t2.join()
writer.flush()
writer.disruptor.flush()
then:
def totalTraces = 100 + 100
@ -585,7 +585,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
numTracesAccepted == 1
@ -633,7 +633,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
numRequests == 1