Extract TraceConsumer from DDAgentWriter

Unfortunately this required making some members public that were previously private or package-private. I expect this to be temporary.
This commit is contained in:
Tyler Benson 2019-12-20 13:01:14 -08:00
parent 8fdd30d3ed
commit 97ed587547
4 changed files with 164 additions and 152 deletions

View File

@ -5,8 +5,6 @@ import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
@ -14,14 +12,12 @@ import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.ddagent.DisruptorEvent;
import datadog.trace.common.writer.ddagent.Monitor;
import java.util.ArrayList;
import datadog.trace.common.writer.ddagent.TraceConsumer;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -39,7 +35,6 @@ import lombok.extern.slf4j.Slf4j;
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private static final int SENDER_QUEUE_SIZE = 16;
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
private static final DisruptorEvent.TraceTranslator TRANSLATOR =
@ -57,13 +52,13 @@ public class DDAgentWriter implements Writer {
private final int flushFrequencySeconds;
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
private final ScheduledExecutorService scheduledWriterExecutor;
public final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
private final Phaser apiPhaser;
public final Phaser apiPhaser;
private volatile boolean running = false;
private final Monitor monitor;
public final Monitor monitor;
public DDAgentWriter() {
this(
@ -248,7 +243,7 @@ public class DDAgentWriter implements Writer {
return str;
}
private void scheduleFlush() {
public void scheduleFlush() {
if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
@ -270,138 +265,4 @@ public class DDAgentWriter implements Writer {
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
/** This class is intentionally not threadsafe. */
private static class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>> {
private final AtomicInteger traceCount;
private final Semaphore senderSemaphore;
private final DDAgentWriter writer;
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
private TraceConsumer(
final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
this.traceCount = traceCount;
senderSemaphore = new Semaphore(senderQueueSize);
this.writer = writer;
}
@Override
public void onEvent(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = writer.api.serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
writer.monitor.onSerialize(writer, trace, serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
}
}
private void reportTraces(final boolean early) {
try {
if (serializedTraces.isEmpty()) {
writer.monitor.onFlush(writer, early);
writer.apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
if (writer.scheduledWriterExecutor.isShutdown()) {
writer.monitor.onFailedSend(
writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1));
writer.apiPhaser.arrive(); // Allow flush to return
return;
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
try {
writer.monitor.onFlush(writer, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDApi.Response.failed(e));
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
writer.scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
try {
final DDApi.Response response =
writer.api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDApi.Response.failed(e));
} finally {
writer.apiPhaser.arrive(); // Flush completed.
}
}
});
} catch (final RejectedExecutionException ex) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
writer.apiPhaser.arrive(); // Allow flush to return
}
} finally {
payloadSize = 0;
writer.scheduleFlush();
}
}
}
}

View File

@ -21,7 +21,6 @@ import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
@ -108,11 +107,11 @@ public class DDApi {
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
}
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
public byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(trace);
}
Response sendSerializedTraces(
public Response sendSerializedTraces(
final int representativeCount, final Integer sizeInBytes, final List<byte[]> traces) {
try {
final RequestBody body =
@ -348,21 +347,21 @@ public class DDApi {
}
public final boolean success() {
return this.success;
return success;
}
// TODO: DQH - In Java 8, switch to OptionalInteger
public final Integer status() {
return this.status;
return status;
}
public final JsonNode json() {
return this.json;
return json;
}
// TODO: DQH - In Java 8, switch to Optional<Throwable>?
public final Throwable exception() {
return this.exception;
return exception;
}
}

View File

@ -0,0 +1,151 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.DDApi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/**
 * Disruptor {@link EventHandler} that serializes traces published to the ring buffer and,
 * on a flush signal or once the payload grows large enough, hands the accumulated batch
 * off to the writer's scheduled executor for sending to the agent.
 *
 * <p>This class is intentionally not threadsafe: the Disruptor invokes {@link #onEvent} from
 * a single consumer thread, so {@code serializedTraces} and {@code payloadSize} are confined
 * to that thread.
 */
@Slf4j
public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>> {
  // Flush the buffered payload early once it reaches this size, even without a flush event.
  // NOTE(review): the Groovy spec reads this field by name (Groovy bypasses `private`); a
  // plain Java caller could not — consider widening visibility if non-Groovy access is needed.
  private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
  // Shared counter of traces accepted since the last send; reset on each report.
  private final AtomicInteger traceCount;
  // Bounds the number of send tasks queued on the writer executor at any one time.
  private final Semaphore senderSemaphore;
  private final DDAgentWriter writer;
  // Traces serialized since the last flush; confined to the single consumer thread.
  private List<byte[]> serializedTraces = new ArrayList<>();
  private int payloadSize = 0;
  /**
   * @param traceCount shared counter of traces accepted since the last send
   * @param senderQueueSize maximum number of queued send tasks (semaphore permits)
   * @param writer owning writer supplying the API client, monitor, phaser, and executor
   */
  public TraceConsumer(
      final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
    this.traceCount = traceCount;
    senderSemaphore = new Semaphore(senderQueueSize);
    this.writer = writer;
  }
  /** Serializes the published trace (if any) and flushes when signalled or over the size cap. */
  @Override
  public void onEvent(
      final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
    final List<DDSpan> trace = event.data;
    event.data = null; // clear the event for reuse.
    if (trace != null) {
      traceCount.incrementAndGet();
      try {
        final byte[] serializedTrace = writer.getApi().serializeTrace(trace);
        payloadSize += serializedTrace.length;
        serializedTraces.add(serializedTrace);
        writer.monitor.onSerialize(writer, trace, serializedTrace);
      } catch (final JsonProcessingException e) {
        log.warn("Error serializing trace", e);
        writer.monitor.onFailedSerialize(writer, trace, e);
      } catch (final Throwable e) {
        log.debug("Error while serializing trace", e);
        writer.monitor.onFailedSerialize(writer, trace, e);
      }
    }
    if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
      final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
      reportTraces(early);
      event.shouldFlush = false;
    }
  }
  /**
   * Hands the buffered payload to the writer executor and arrives at the writer's phaser so
   * callers blocked in flush() can proceed. Always resets {@code payloadSize} and reschedules
   * the next flush in the finally block, whichever exit path is taken.
   */
  private void reportTraces(final boolean early) {
    try {
      if (serializedTraces.isEmpty()) {
        writer.monitor.onFlush(writer, early);
        writer.apiPhaser.arrive(); // Allow flush to return
        return;
        // scheduleFlush called in finally block.
      }
      if (writer.scheduledWriterExecutor.isShutdown()) {
        // NOTE(review): this branch keeps serializedTraces while the finally block resets
        // payloadSize to 0, leaving the two out of sync — confirm whether intentional.
        writer.monitor.onFailedSend(
            writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1));
        writer.apiPhaser.arrive(); // Allow flush to return
        return;
      }
      final List<byte[]> toSend = serializedTraces;
      serializedTraces = new ArrayList<>(toSend.size());
      // ^ Initialize with similar size to reduce arraycopy churn.
      final int representativeCount = traceCount.getAndSet(0);
      final int sizeInBytes = payloadSize;
      try {
        writer.monitor.onFlush(writer, early);
        // Run the actual IO task on a different thread to avoid blocking the consumer.
        try {
          senderSemaphore.acquire();
        } catch (final InterruptedException e) {
          // Restore the interrupt status so the consumer thread can observe it later;
          // swallowing it here would silently drop the interruption request.
          Thread.currentThread().interrupt();
          writer.monitor.onFailedSend(
              writer, representativeCount, sizeInBytes, DDApi.Response.failed(e));
          // Finally, we'll schedule another flush
          // Any threads awaiting the flush will continue to wait
          return;
        }
        writer.scheduledWriterExecutor.execute(
            new Runnable() {
              @Override
              public void run() {
                // Release immediately: the semaphore bounds queued, not running, send tasks.
                senderSemaphore.release();
                try {
                  final DDApi.Response response =
                      writer
                          .getApi()
                          .sendSerializedTraces(representativeCount, sizeInBytes, toSend);
                  if (response.success()) {
                    log.debug("Successfully sent {} traces to the API", toSend.size());
                    writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
                  } else {
                    log.debug(
                        "Failed to send {} traces (representing {}) of size {} bytes to the API",
                        toSend.size(),
                        representativeCount,
                        sizeInBytes);
                    writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
                  }
                } catch (final Throwable e) {
                  // Pass the Throwable to the logger so the stack trace is preserved,
                  // rather than logging only e.getMessage().
                  log.debug("Failed to send traces to the API", e);
                  // DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
                  // shouldn't occur.
                  // However, just to be safe to start, create a failed Response to handle any
                  // spurious Throwable-s.
                  writer.monitor.onFailedSend(
                      writer, representativeCount, sizeInBytes, DDApi.Response.failed(e));
                } finally {
                  writer.apiPhaser.arrive(); // Flush completed.
                }
              }
            });
      } catch (final RejectedExecutionException ex) {
        writer.monitor.onFailedSend(
            writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
        writer.apiPhaser.arrive(); // Allow flush to return
      }
    } finally {
      payloadSize = 0;
      writer.scheduleFlush();
    }
  }
}

View File

@ -9,6 +9,7 @@ import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.common.writer.ddagent.TraceConsumer
import datadog.trace.util.test.DDSpecification
import spock.lang.Timeout
@ -180,7 +181,7 @@ class DDAgentWriterTest extends DDSpecification {
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
def "check that are no interactions after close"() {