Merge pull request #710 from DataDog/tyler/size-limited-flushes

Change DDAgentWriter to use Disruptor
Tyler Benson 2019-03-20 11:03:56 -07:00 committed by GitHub
commit e5e2c5b9dc
19 changed files with 599 additions and 493 deletions

View File

@ -39,6 +39,8 @@ public final class Constants {
"org.msgpack",
"com.fasterxml.jackson",
"org.yaml.snakeyaml",
// disruptor
"com.lmax.disruptor",
// okHttp
"okhttp3",
"okio",

View File

@ -37,6 +37,7 @@ dependencies {
compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7
compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.22'
compile group: 'com.lmax', name: 'disruptor', version: '3.4.2'
// We have autoservices defined in test subtree, looks like we need this to be able to properly rebuild this
testAnnotationProcessor deps.autoservice

View File

@ -40,7 +40,6 @@ import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -89,8 +88,6 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
private final DatadogHttpCodec.Injector injector;
private final DatadogHttpCodec.Extractor extractor;
private final AtomicInteger traceCount;
/** By default, report to local agent and collect all traces. */
public DDTracer() {
this(Config.get());
@ -240,12 +237,9 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
if (this.writer instanceof DDAgentWriter) {
final DDApi api = ((DDAgentWriter) this.writer).getApi();
traceCount = api.getTraceCounter();
if (sampler instanceof DDApi.ResponseListener) {
api.addResponseListener((DDApi.ResponseListener) this.sampler);
}
} else {
traceCount = new AtomicInteger(0);
}
registerClassLoader(ClassLoader.getSystemClassLoader());
@ -385,7 +379,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
/** Increment the reported trace count, but do not write a trace. */
void incrementTraceCount() {
traceCount.incrementAndGet();
writer.incrementTraceCount();
}
@Override

View File

@ -0,0 +1,24 @@
package datadog.trace.common.util;
import java.util.concurrent.ThreadFactory;
/** A {@link ThreadFactory} implementation that starts all {@link Thread} as daemons. */
public final class DaemonThreadFactory implements ThreadFactory {
private final String threadName;
/**
* Constructs a new {@code DaemonThreadFactory}.
*
* @param threadName used as the name for all threads created by this factory.
*/
public DaemonThreadFactory(final String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
}
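As a usage sketch (the pool below is illustrative, not part of this change), the factory plugs straight into the standard Executors API:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
// Build a single-threaded scheduler whose worker thread is a daemon,
// so pending tasks do not keep the JVM alive at shutdown.
final ScheduledExecutorService scheduler =
    Executors.newScheduledThreadPool(1, new DaemonThreadFactory("example-writer"));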

View File

@ -3,103 +3,154 @@ package datadog.trace.common.writer;
import static datadog.trace.api.Config.DEFAULT_AGENT_HOST;
import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
/**
* This writer writes the provided traces to a DD agent, which is most of the time located on
* the same host.
* This writer buffers traces and sends them to the provided DDApi instance.
*
* <p>
*
* <p>It handles writes asynchronously so the calling threads are automatically released. However,
* if too many spans are collected the writer can reach a state where it is forced to drop incoming
* spans.
* <p>Written traces are passed off to a disruptor so as to avoid blocking the application's thread.
* If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the
* threshold will be counted and dropped, with the count still reported to the agent so sampling
* rates stay accurate.
*/
@Slf4j
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 8192;
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private static final int FLUSH_PAYLOAD_DELAY = 1; // flush interval, in seconds
/** Maximum number of traces kept in memory */
static final int DEFAULT_MAX_TRACES = 7000;
/** Flush interval for the API in seconds */
static final long FLUSH_TIME_SECONDS = 1;
private final ThreadFactory agentWriterThreadFactory =
new ThreadFactory() {
private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR =
new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>() {
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, "dd-agent-writer");
thread.setDaemon(true);
return thread;
public void translateTo(
final Event<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
event.data = trace;
}
};
private static final EventTranslator<Event<List<DDSpan>>> FLUSH_TRANSLATOR =
new EventTranslator<Event<List<DDSpan>>>() {
@Override
public void translateTo(final Event<List<DDSpan>> event, final long sequence) {
event.shouldFlush = true;
}
};
/** Scheduled thread pool, acting like a cron */
private final ScheduledExecutorService scheduledExecutor =
Executors.newScheduledThreadPool(1, agentWriterThreadFactory);
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-writer");
/** The DD agent api */
private final DDApi api;
/** In memory collection of traces waiting for departure */
private final WriterQueue<List<DDSpan>> traces;
private boolean queueFullReported = false;
private final int flushFrequencySeconds;
private final Disruptor<Event<List<DDSpan>>> disruptor;
private final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
private final Phaser apiPhaser;
private volatile boolean running = false;
public DDAgentWriter() {
this(new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET));
}
public DDAgentWriter(final DDApi api) {
this(api, new WriterQueue<List<DDSpan>>(DEFAULT_MAX_TRACES));
this(api, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY);
}
public DDAgentWriter(final DDApi api, final WriterQueue<List<DDSpan>> queue) {
super();
this.api = api;
traces = queue;
}
/* (non-Javadoc)
* @see datadog.trace.Writer#write(java.util.List)
/**
* Used in the tests.
*
* @param api the DDApi instance used to serialize and send traces
* @param disruptorSize size of the disruptor ring buffer; rounded up to the next power of 2
* @param flushFrequencySeconds a value below 1 disables scheduled flushes
*/
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
this.api = api;
this.flushFrequencySeconds = flushFrequencySeconds;
disruptor =
new Disruptor<>(
new DisruptorEventFactory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
disruptor.handleEventsWith(new TraceConsumer());
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
}
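A quick sketch of the ring-size rounding used above (the helper name and sample values are illustrative only):
// Rounds the requested size up to the next power of two, with a floor of 2:
// 1 -> 2, 3 -> 4, 100 -> 128, 8192 -> 8192.
static int nextPowerOfTwo(final int requestedSize) {
  return Math.max(2, Integer.highestOneBit(requestedSize - 1) << 1);
}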
@Override
public void write(final List<DDSpan> trace) {
final List<DDSpan> removed = traces.add(trace);
if (removed != null && !queueFullReported) {
log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES);
queueFullReported = true;
return;
// We can't add events after shutdown otherwise it will never complete shutting down.
if (running) {
final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
if (!published) {
// We're discarding the trace, but we still want to count it.
traceCount.incrementAndGet();
log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
}
} else {
log.debug("Trace written after shutdown. Ignoring trace: {}", trace);
}
queueFullReported = false;
}
/* (non-Javadoc)
* @see Writer#start()
*/
@Override
public void incrementTraceCount() {
traceCount.incrementAndGet();
}
public DDApi getApi() {
return api;
}
@Override
public void start() {
scheduledExecutor.scheduleAtFixedRate(
new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
disruptor.start();
running = true;
scheduleFlush();
}
/* (non-Javadoc)
* @see datadog.trace.Writer#close()
*/
@Override
public void close() {
scheduledExecutor.shutdownNow();
running = false;
flush();
disruptor.shutdown();
scheduledWriterExecutor.shutdown();
}
/** This method will block until the flush is complete. */
public void flush() {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
// Allow thread to be interrupted.
apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
log.warn("Waiting for flush interrupted.", e);
}
}
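The Phaser choreography can be summarized in a standalone sketch (class and variable names are illustrative, not part of this change): one party is registered up front for the background sender, flush() registers itself, arrives, and then blocks until the sender has also arrived.
import java.util.concurrent.Phaser;
public class PhaserFlushSketch {
  public static void main(final String[] args) throws InterruptedException {
    final Phaser phaser = new Phaser();
    phaser.register(); // registered up front on behalf of the background sender
    phaser.register(); // the flushing thread joins the same phase
    new Thread(
            new Runnable() {
              @Override
              public void run() {
                // ... send the serialized payload to the agent ...
                phaser.arrive(); // signal that this phase's work is complete
              }
            })
        .start();
    // Arrive and deregister, then block until the sender has arrived as well.
    phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister());
    System.out.println("flush complete");
  }
}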
@ -108,37 +159,109 @@ public class DDAgentWriter implements Writer {
return "DDAgentWriter { api=" + api + " }";
}
public DDApi getApi() {
return api;
}
class TracesSendingTask implements Runnable {
@Override
public void run() {
try {
if (traces.isEmpty()) {
return;
}
final List<List<DDSpan>> payload = traces.getAll();
if (log.isDebugEnabled()) {
int nbSpans = 0;
for (final List<?> trace : payload) {
nbSpans += trace.size();
}
log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
}
final boolean isSent = api.sendTraces(payload);
if (isSent) {
log.debug("Successfully sent {} traces to the API", payload.size());
} else {
log.debug("Failed to send {} traces to the API", payload.size());
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
private void scheduleFlush() {
if (flushFrequencySeconds > 0) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS));
if (previous != null) {
previous.cancel(true);
}
}
}
private final Runnable flushTask = new FlushTask();
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
/** This class is intentionally not threadsafe. */
private class TraceConsumer implements EventHandler<Event<List<DDSpan>>> {
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
@Override
public void onEvent(
final Event<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = api.serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
reportTraces();
event.shouldFlush = false;
}
}
private void reportTraces() {
try {
if (serializedTraces.isEmpty()) {
apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
// Run the actual IO task on a different thread to avoid blocking the consumer.
scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
try {
final boolean sent =
api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (sent) {
log.debug("Successfully sent {} traces to the API", toSend.size());
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
} finally {
apiPhaser.arrive(); // Flush completed.
}
}
});
} finally {
payloadSize = 0;
scheduleFlush();
}
}
}
private static class Event<T> {
private volatile boolean shouldFlush = false;
private volatile T data = null;
}
private static class DisruptorEventFactory<T> implements EventFactory<Event<T>> {
@Override
public Event<T> newInstance() {
return new Event<>();
}
}
}
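For readers new to the Disruptor, a minimal self-contained sketch of the publish/consume pattern relied on above (class, field, and sample values are illustrative, not part of this change). The key point is that tryPublishEvent returns false instead of blocking when the ring buffer is full, which is what lets write() count and drop the overflow.
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
public class DisruptorDropSketch {
  static class Event {
    String data;
  }
  public static void main(final String[] args) {
    final Disruptor<Event> disruptor =
        new Disruptor<>(
            new EventFactory<Event>() {
              @Override
              public Event newInstance() {
                return new Event();
              }
            },
            2, // deliberately tiny ring; must be a power of two
            Executors.defaultThreadFactory());
    disruptor.handleEventsWith(
        new EventHandler<Event>() {
          @Override
          public void onEvent(final Event event, final long sequence, final boolean endOfBatch) {
            // consume event.data here (e.g. serialize and batch it)
          }
        });
    disruptor.start();
    final EventTranslatorOneArg<Event, String> translator =
        new EventTranslatorOneArg<Event, String>() {
          @Override
          public void translateTo(final Event event, final long sequence, final String data) {
            event.data = data;
          }
        };
    int dropped = 0;
    for (int i = 0; i < 1000; i++) {
      // Non-blocking publish: returns false when the ring buffer has no free slot.
      if (!disruptor.getRingBuffer().tryPublishEvent(translator, "trace-" + i)) {
        dropped++; // counted, then dropped, mirroring DDAgentWriter.write()
      }
    }
    disruptor.shutdown();
    System.out.println("dropped " + dropped);
  }
}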

View File

@ -1,6 +1,7 @@
package datadog.trace.common.writer;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.opentracing.DDSpan;
@ -8,11 +9,11 @@ import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.writer.unixdomainsockets.UnixDomainSocketFactory;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
@ -20,6 +21,9 @@ import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
/** The API pointing to a DD agent */
@ -38,7 +42,6 @@ public class DDApi {
private final List<ResponseListener> responseListeners = new ArrayList<>();
private final AtomicInteger traceCount = new AtomicInteger(0);
private volatile long nextAllowedLogTime = 0;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
@ -76,10 +79,6 @@ public class DDApi {
}
}
public AtomicInteger getTraceCounter() {
return traceCount;
}
/**
* Send traces to the DD agent
*
@ -87,12 +86,63 @@ public class DDApi {
* @return true if the traces were successfully sent to the agent
*/
public boolean sendTraces(final List<List<DDSpan>> traces) {
final int totalSize = traceCount.getAndSet(0);
final List<byte[]> serializedTraces = new ArrayList<>(traces.size());
int sizeInBytes = 0;
for (final List<DDSpan> trace : traces) {
try {
final byte[] serializedTrace = serializeTrace(trace);
sizeInBytes += serializedTrace.length;
serializedTraces.add(serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
}
}
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
}
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(trace);
}
boolean sendSerializedTraces(
final int representativeCount, final Integer sizeInBytes, final List<byte[]> traces) {
try {
final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(traces));
final RequestBody body =
new RequestBody() {
@Override
public MediaType contentType() {
return MSGPACK;
}
@Override
public long contentLength() {
final int traceCount = traces.size();
// Need to allocate additional bytes for the array header written by MessagePacker.packArrayHeader
if (traceCount < (1 << 4)) {
return sizeInBytes + 1; // fixarray: single marker byte
} else if (traceCount < (1 << 16)) {
return sizeInBytes + 3; // array16: marker byte + 16-bit length
} else {
return sizeInBytes + 5; // array32: marker byte + 32-bit length
}
}
@Override
public void writeTo(final BufferedSink sink) throws IOException {
final OutputStream out = sink.outputStream();
final MessagePacker packer = MessagePack.newDefaultPacker(out);
packer.packArrayHeader(traces.size());
for (final byte[] trace : traces) {
packer.writePayload(trace);
}
packer.close();
out.close();
}
};
final Request request =
prepareRequest(tracesUrl)
.addHeader(X_DATADOG_TRACE_COUNT, String.valueOf(totalSize))
.addHeader(X_DATADOG_TRACE_COUNT, String.valueOf(representativeCount))
.put(body)
.build();
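The declared content length has to include the MessagePack array header that writeTo() emits before the raw trace payloads: 1 byte (fixarray) for fewer than 16 elements, 3 bytes (array16 marker plus 16-bit length) for fewer than 65536, and 5 bytes (array32 marker plus 32-bit length) beyond that. A small sketch confirming those sizes with the msgpack-core packer (illustrative only, not part of this change):
import java.io.IOException;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessagePack;
public class ArrayHeaderSizeSketch {
  public static void main(final String[] args) throws IOException {
    // Prints header sizes of 1, 3, and 5 bytes, the amounts contentLength() adds.
    for (final int elementCount : new int[] {15, 16, 65536}) {
      final MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
      packer.packArrayHeader(elementCount);
      packer.close();
      System.out.println(elementCount + " elements -> " + packer.toByteArray().length + " header bytes");
    }
  }
}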
@ -100,17 +150,18 @@ public class DDApi {
if (response.code() != 200) {
if (log.isDebugEnabled()) {
log.debug(
"Error while sending {} of {} traces to the DD agent. Status: {}, ResponseMessage: ",
"Error while sending {} of {} traces to the DD agent. Status: {}, Response: {}, Body: {}",
traces.size(),
totalSize,
representativeCount,
response.code(),
response.message());
response.message(),
response.body().string());
} else if (nextAllowedLogTime < System.currentTimeMillis()) {
nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG;
log.warn(
"Error while sending {} of {} traces to the DD agent. Status: {} (going silent for {} seconds)",
traces.size(),
totalSize,
representativeCount,
response.code(),
response.message(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG));
@ -118,14 +169,18 @@ public class DDApi {
return false;
}
log.debug("Successfully sent {} of {} traces to the DD agent.", traces.size(), totalSize);
log.debug(
"Successfully sent {} of {} traces to the DD agent.",
traces.size(),
representativeCount);
final String responseString = response.body().string().trim();
try {
if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) {
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
final String endpoint = tracesUrl.toString();
for (final ResponseListener listener : responseListeners) {
listener.onResponse(tracesUrl.toString(), parsedResponse);
listener.onResponse(endpoint, parsedResponse);
}
}
} catch (final JsonParseException e) {
@ -139,7 +194,7 @@ public class DDApi {
"Error while sending "
+ traces.size()
+ " of "
+ totalSize
+ representativeCount
+ " traces to the DD agent.",
e);
} else if (nextAllowedLogTime < System.currentTimeMillis()) {
@ -147,7 +202,7 @@ public class DDApi {
log.warn(
"Error while sending {} of {} traces to the DD agent. {}: {} (going silent for {} minutes)",
traces.size(),
totalSize,
representativeCount,
e.getClass().getName(),
e.getMessage(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG));

View File

@ -44,6 +44,9 @@ public class ListWriter extends CopyOnWriteArrayList<List<DDSpan>> implements Wr
}
}
@Override
public void incrementTraceCount() {}
@Override
public void start() {
close();
@ -64,6 +67,6 @@ public class ListWriter extends CopyOnWriteArrayList<List<DDSpan>> implements Wr
@Override
public String toString() {
return "ListWriter { size=" + this.size() + " }";
return "ListWriter { size=" + size() + " }";
}
}

View File

@ -20,6 +20,11 @@ public class LoggingWriter implements Writer {
}
}
@Override
public void incrementTraceCount() {
log.info("incrementTraceCount()");
}
@Override
public void close() {
log.info("close()");

View File

@ -2,12 +2,13 @@ package datadog.trace.common.writer;
import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
import java.io.Closeable;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
/** A writer is responsible for sending the collected spans to their destination. */
public interface Writer {
public interface Writer extends Closeable {
/**
* Write a trace represented by the entire list of all the finished spans
@ -23,8 +24,12 @@ public interface Writer {
* Indicates to the writer that no future writes will come and that it should terminate all
* connections and tasks
*/
@Override
void close();
/** Count that a trace was captured for stats, but without reporting it. */
void incrementTraceCount();
@Slf4j
final class Builder {

View File

@ -1,95 +0,0 @@
package datadog.trace.common.writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* A bounded queue implementation compatible with the Datadog agent behavior. The class is
* thread-safe and can be used with concurrency.
*
* <p>This class implements a specific behavior when it's full. Each new item added will replace an
* existing one, at a random place/index. The class is backed by an ArrayList in order to perform
* efficient random replacement.
*
* @param <T> The element type to store
*/
class WriterQueue<T> {
private final int capacity;
private volatile ArrayList<T> list;
/**
* Default constructor; a capacity must be provided
*
* @param capacity the max size of the queue
*/
WriterQueue(final int capacity) {
if (capacity < 1) {
throw new IllegalArgumentException("Capacity must be greater than 0");
}
this.list = emptyList(capacity);
this.capacity = capacity;
}
/**
* Return a list containing all elements present in the queue. After the operation, the queue is
* reset. Actions performed on the returned list have no impact on the queue.
*
* @return a list containing all elements
*/
public synchronized List<T> getAll() {
final List<T> all = list;
list = emptyList(capacity);
return all;
}
/**
* Add an element to the queue. If the queue is full, set the element at a random place in the
* queue and return the previous one.
*
* @param element the element to add to the queue
* @return null if the queue is not full, otherwise the removed element
*/
public synchronized T add(final T element) {
T removed = null;
if (list.size() < capacity) {
list.add(element);
} else {
final int index = ThreadLocalRandom.current().nextInt(0, list.size());
removed = list.set(index, element);
}
return removed;
}
// Methods below are essentially used for testing purposes
/**
* Return the number of elements set in the queue
*
* @return the current size of the queue
*/
public int size() {
return list.size();
}
/**
* Return true if the queue is empty
*
* @return true if the queue is empty
*/
public boolean isEmpty() {
return list.isEmpty();
}
private ArrayList<T> emptyList(final int capacity) {
return new ArrayList<>(capacity);
}
}

View File

@ -1,6 +1,5 @@
package datadog.opentracing
import datadog.trace.api.Config
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.gc.GCUtils
@ -10,13 +9,19 @@ import spock.lang.Timeout
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import static datadog.trace.api.Config.PARTIAL_FLUSH_MIN_SPANS
class PendingTraceTest extends Specification {
def writer = new ListWriter()
def traceCount = new AtomicInteger()
def writer = new ListWriter() {
@Override
void incrementTraceCount() {
PendingTraceTest.this.traceCount.incrementAndGet()
}
}
def tracer = new DDTracer(writer)
def traceCount = tracer.traceCount
def traceId = System.identityHashCode(this)
String traceIdStr = String.valueOf(traceId)
@ -207,7 +212,7 @@ class PendingTraceTest extends Specification {
trace.weakReferences.size() == 2
trace.asList() == [rootSpan]
writer == []
tracer.traceCount.get() == 0
traceCount.get() == 0
when:
child1.finish()
@ -217,7 +222,7 @@ class PendingTraceTest extends Specification {
trace.weakReferences.size() == 1
trace.asList() == [rootSpan]
writer == [[child1]]
tracer.traceCount.get() == 1
traceCount.get() == 1
when:
child2.finish()
@ -227,7 +232,7 @@ class PendingTraceTest extends Specification {
trace.weakReferences.size() == 0
trace.asList() == [child2, rootSpan]
writer == [[child1], [child2, rootSpan]]
tracer.traceCount.get() == 2
traceCount.get() == 2
}
def "partial flush with root span closed last"() {
@ -253,7 +258,7 @@ class PendingTraceTest extends Specification {
trace.weakReferences.size() == 2
trace.asList() == [child1]
writer == []
tracer.traceCount.get() == 0
traceCount.get() == 0
when:
child2.finish()
@ -263,7 +268,7 @@ class PendingTraceTest extends Specification {
trace.weakReferences.size() == 1
trace.asList() == []
writer == [[child2, child1]]
tracer.traceCount.get() == 1
traceCount.get() == 1
when:
rootSpan.finish()
@ -273,6 +278,6 @@ class PendingTraceTest extends Specification {
trace.weakReferences.size() == 0
trace.asList() == [rootSpan]
writer == [[child2, child1], [rootSpan]]
tracer.traceCount.get() == 2
traceCount.get() == 2
}
}

View File

@ -4,9 +4,11 @@ import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.ListWriter
class SpanFactory {
static newSpanOf(long timestampMicro) {
static newSpanOf(long timestampMicro, String threadName = Thread.currentThread().name) {
def writer = new ListWriter()
def tracer = new DDTracer(writer)
def currentThreadName = Thread.currentThread().getName()
Thread.currentThread().setName(threadName)
def context = new DDSpanContext(
"1",
"1",
@ -22,6 +24,7 @@ class SpanFactory {
Collections.emptyMap(),
new PendingTrace(tracer, "1", [:]),
tracer)
Thread.currentThread().setName(currentThreadName)
return new DDSpan(timestampMicro, context)
}

View File

@ -14,17 +14,32 @@ import spock.lang.Subject
import spock.lang.Timeout
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import static java.util.concurrent.TimeUnit.SECONDS
class ScopeManagerTest extends Specification {
def writer = new ListWriter()
def tracer = new DDTracer(writer)
def latch
def writer
def tracer
@Subject
def scopeManager = tracer.scopeManager()
def scopeManager
def setup() {
latch = new CountDownLatch(1)
final currentLatch = latch
writer = new ListWriter() {
void incrementTraceCount() {
currentLatch.countDown()
}
}
tracer = new DDTracer(writer)
scopeManager = tracer.scopeManager()
}
def cleanup() {
scopeManager.tlsScope.remove()
@ -255,13 +270,12 @@ class ScopeManagerTest extends Specification {
writer == [[scope.span()]]
}
@Timeout(value = 60, unit = TimeUnit.SECONDS)
@Timeout(value = 60, unit = SECONDS)
def "hard reference on continuation prevents trace from reporting"() {
setup:
def builder = tracer.buildSpan("test")
def scope = (ContinuableScope) builder.startActive(false)
def span = scope.span()
def traceCount = ((DDSpan) span).context().tracer.traceCount
scope.setAsyncPropagation(true)
def continuation = scope.capture()
scope.close()
@ -277,9 +291,7 @@ class ScopeManagerTest extends Specification {
def continuationRef = new WeakReference<>(continuation)
continuation = null // Continuation references also hold up traces.
GCUtils.awaitGC(continuationRef)
while (traceCount.get() == 0) {
// wait until trace count increments or timeout expires
}
latch.await(60, SECONDS)
}
if (autoClose) {
if (continuation != null) {
@ -289,7 +301,6 @@ class ScopeManagerTest extends Specification {
then:
forceGC ? true : writer == [[span]]
traceCount.get() == 1
where:
autoClose | forceGC

View File

@ -132,7 +132,7 @@ class DDTracerTest extends Specification {
expect:
tracer.writer instanceof DDAgentWriter
tracer.traceCount.is(((DDAgentWriter) tracer.writer).getApi().traceCount)
tracer.writer.traceCount.is(((DDAgentWriter) tracer.writer).traceCount)
where:
key | value

View File

@ -1,90 +1,196 @@
package datadog.trace.api.writer
import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.WriterQueue
import spock.lang.Specification
import spock.lang.Timeout
import java.util.concurrent.TimeUnit
import static datadog.opentracing.SpanFactory.newSpanOf
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.verifyNoMoreInteractions
import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
@Timeout(20)
class DDAgentWriterTest extends Specification {
def api = Mock(DDApi)
def "calls to the API are scheduled"() {
def "test happy path"() {
setup:
def api = Mock(DDApi)
def writer = new DDAgentWriter(api)
when:
def writer = new DDAgentWriter(api, 2, -1)
writer.start()
Thread.sleep(flush_time_wait)
then:
0 * api.sendTraces(_ as List)
when:
for (def i = 0; i < tick; i++) {
writer.write(trace)
Thread.sleep(flush_time_wait)
}
then:
tick * api.sendTraces([trace])
where:
trace = [newSpanOf(0)]
flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000))
tick << [1, 3]
}
def "check if trace has been added by force"() {
setup:
def traces = new WriterQueue<List<DDSpan>>(capacity)
def writer = new DDAgentWriter(Mock(DDApi), traces)
when:
for (def i = 0; i < capacity; i++) {
writer.write([])
}
then:
traces.size() == capacity
when:
writer.write(trace)
writer.write(trace)
writer.flush()
then:
traces.size() == capacity
traces.getAll().contains(trace)
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
0 * _
cleanup:
writer.close()
where:
trace = [newSpanOf(0)]
capacity = 10
trace = [newSpanOf(0, "fixed-thread-name")]
}
def "test flood of traces"() {
setup:
def writer = new DDAgentWriter(api, disruptorSize, -1)
writer.start()
when:
(1..traceCount).each {
writer.write(trace)
}
writer.flush()
then:
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount })
0 * _
cleanup:
writer.close()
where:
trace = [newSpanOf(0, "fixed-thread-name")]
disruptorSize = 2
traceCount = 100 // Not enough to trigger a flush by payload size, but larger than the disruptor size.
}
def "test flush by size"() {
setup:
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
def phaser = writer.apiPhaser
writer.start()
phaser.register()
when:
(1..6).each {
writer.write(trace)
}
// Wait for 2 flushes of 3 by size
phaser.awaitAdvanceInterruptibly(phaser.arrive())
phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister())
then:
6 * api.serializeTrace(_) >> { trace -> callRealMethod() }
2 * api.sendSerializedTraces(3, _, { it.size() == 3 })
when:
(1..2).each {
writer.write(trace)
}
// Flush the remaining 2
writer.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
0 * _
cleanup:
writer.close()
where:
span = [newSpanOf(0, "fixed-thread-name")]
trace = (0..10000).collect { span }
}
def "test flush by time"() {
setup:
def writer = new DDAgentWriter(api)
def phaser = writer.apiPhaser
phaser.register()
writer.start()
writer.flush()
when:
(1..5).each {
writer.write(trace)
}
phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister())
then:
5 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(5, _, { it.size() == 5 })
0 * _
cleanup:
writer.close()
where:
span = [newSpanOf(0, "fixed-thread-name")]
trace = (1..10).collect { span }
}
def "test default buffer size"() {
setup:
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
writer.start()
when:
(0..maxedPayloadTraceCount).each {
writer.write(minimalTrace)
def start = System.nanoTime()
// The consumer processes a trace in about 20 microseconds.
while (System.nanoTime() - start < TimeUnit.MICROSECONDS.toNanos(100)) {
// Busywait because we don't want to fill up the ring buffer
}
}
writer.flush()
then:
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount })
cleanup:
writer.close()
where:
minimalContext = new DDSpanContext(
"1",
"1",
"0",
"",
"",
"",
PrioritySampling.UNSET,
"",
Collections.emptyMap(),
false,
"",
Collections.emptyMap(),
Mock(PendingTrace),
Mock(DDTracer))
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
def "check that are no interactions after close"() {
setup:
def api = mock(DDApi)
def writer = new DDAgentWriter(api)
writer.start()
when:
writer.close()
writer.write([])
Thread.sleep(flush_time_wait)
writer.flush()
then:
verifyNoMoreInteractions(api)
where:
flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000))
0 * _
writer.traceCount.get() == 0
}
}

View File

@ -2,11 +2,9 @@ package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.DDApi.ResponseListener
import org.msgpack.jackson.dataformat.MessagePackFactory
import spock.lang.Specification
import java.util.concurrent.atomic.AtomicReference
@ -14,15 +12,20 @@ import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
class DDApiTest extends Specification {
static mapper = new ObjectMapper(new MessagePackFactory())
static mapper = DDApi.OBJECT_MAPPER
def "sending an empty list of traces returns no errors"() {
setup:
def agent = httpServer {
handlers {
put("v0.4/traces") {
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send()
if (request.contentType != "application/msgpack") {
response.status(400).send("wrong type: $request.contentType")
} else if (request.contentLength <= 0) {
response.status(400).send("no content")
} else {
response.status(200).send()
}
}
}
}
@ -68,7 +71,6 @@ class DDApiTest extends Specification {
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
client.getTraceCounter().addAndGet(traces.size()) >= 0
client.sendTraces(traces)
agent.lastRequest.contentType == "application/msgpack"
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
@ -82,9 +84,9 @@ class DDApiTest extends Specification {
// Populate thread info dynamically as it is different when run via gradle vs idea.
where:
traces | expectedRequestBody
[] | []
[SpanFactory.newSpanOf(1L).setTag("service.name", "my-service")] | [new TreeMap<>([
traces | expectedRequestBody
[] | []
[[SpanFactory.newSpanOf(1L).setTag("service.name", "my-service")]] | [[new TreeMap<>([
"duration" : 0,
"error" : 0,
"meta" : ["thread.name": Thread.currentThread().getName(), "thread.id": "${Thread.currentThread().id}"],
@ -97,8 +99,8 @@ class DDApiTest extends Specification {
"start" : 1000,
"trace_id" : 1,
"type" : "fakeType"
])]
[SpanFactory.newSpanOf(100L).setTag("resource.name", "my-resource")] | [new TreeMap<>([
])]]
[[SpanFactory.newSpanOf(100L).setTag("resource.name", "my-resource")]] | [[new TreeMap<>([
"duration" : 0,
"error" : 0,
"meta" : ["thread.name": Thread.currentThread().getName(), "thread.id": "${Thread.currentThread().id}"],
@ -111,7 +113,7 @@ class DDApiTest extends Specification {
"start" : 100000,
"trace_id" : 1,
"type" : "fakeType"
])]
])]]
}
def "Api ResponseListeners see 200 responses"() {
@ -130,18 +132,15 @@ class DDApiTest extends Specification {
}
def client = new DDApi("localhost", agent.address.port, null)
client.addResponseListener(responseListener)
def traceCounter = client.getTraceCounter()
traceCounter.set(3)
when:
client.sendTraces([])
client.sendTraces([[], [], []])
then:
agentResponse.get() == '{"hello":"test"}'
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown")
agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version"
agent.lastRequest.headers.get("X-Datadog-Trace-Count") == "3" // false data shows the value provided via traceCounter.
traceCounter.get() == 0
cleanup:
agent.close()
@ -200,11 +199,7 @@ class DDApiTest extends Specification {
"v0.3" | 30000 | false
}
static List<TreeMap<String, Object>> convertList(byte[] bytes) {
return mapper.readValue(bytes, new TypeReference<List<TreeMap<String, Object>>>() {})
}
static TreeMap<String, Object> convertMap(byte[] bytes) {
return mapper.readValue(bytes, new TypeReference<TreeMap<String, Object>>() {})
static List<List<TreeMap<String, Object>>> convertList(byte[] bytes) {
return mapper.readValue(bytes, new TypeReference<List<List<TreeMap<String, Object>>>>() {})
}
}

View File

@ -0,0 +1,62 @@
package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import org.msgpack.core.MessagePack
import org.msgpack.jackson.dataformat.MessagePackFactory
import spock.lang.Shared
import spock.lang.Specification
import static java.util.Collections.singletonMap
class SerializationTest extends Specification {
@Shared
def jsonMapper = new ObjectMapper()
@Shared
def mpMapper = new ObjectMapper(new MessagePackFactory())
def "test json mapper serialization"() {
setup:
def map = ["key1": "val1"]
def serializedMap = mapper.writeValueAsBytes(map)
def serializedList = "[${new String(serializedMap)}]".getBytes()
when:
def result = mapper.readValue(serializedList, new TypeReference<List<Map<String, String>>>() {})
then:
result == [map]
new String(serializedList) == '[{"key1":"val1"}]'
where:
mapper = jsonMapper
}
def "test msgpack mapper serialization"() {
setup:
def serializedMaps = input.collect {
mapper.writeValueAsBytes(it)
}
def packer = MessagePack.newDefaultBufferPacker()
packer.packArrayHeader(serializedMaps.size())
serializedMaps.each {
packer.writePayload(it)
}
def serializedList = packer.toByteArray()
when:
def result = mapper.readValue(serializedList, new TypeReference<List<Map<String, String>>>() {})
then:
result == input
where:
mapper = mpMapper
// GStrings get odd results in the serializer.
input = (1..1).collect { singletonMap("key$it".toString(), "val$it".toString()) }
}
}

View File

@ -1,181 +0,0 @@
package datadog.trace.api.writer
import datadog.trace.common.writer.WriterQueue
import spock.lang.Specification
import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicInteger
class WriterQueueTest extends Specification {
def "instantiate a empty queue throws an exception"() {
when:
new WriterQueue<Integer>(0)
then:
thrown IllegalArgumentException
when:
new WriterQueue<Integer>(-1)
then:
thrown IllegalArgumentException
}
def "full the queue without forcing"() {
setup:
def queue = new WriterQueue<Integer>(capacity)
def removed = false
when:
for (def i = 0; i < capacity; i++) {
removed = removed || queue.add(i) != null
}
then:
!removed
where:
capacity << [1, 10, 100]
}
def "force element add to a full queue"() {
setup:
def queue = new WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) {
queue.add(i)
}
when:
def removed = queue.add(1)
then:
removed != null
queue.size() == capacity
where:
capacity << [1, 10, 100]
}
def "drain the queue into another collection"() {
setup:
def queue = new WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) {
queue.add(i)
}
when:
def list = queue.getAll()
then:
list.size() == capacity
queue.isEmpty()
queue.size() == 0
where:
capacity << [1, 10, 100]
}
def "check concurrency on writes"() {
setup:
def phaser1 = new Phaser()
def phaser2 = new Phaser()
def queue = new WriterQueue<Integer>(capacity)
def insertionCount = new AtomicInteger(0)
phaser1.register() // global start
phaser2.register() // global stop
numberThreads.times {
phaser1.register()
Thread.start {
phaser2.register()
phaser1.arriveAndAwaitAdvance()
numberInsertionsPerThread.times {
queue.add(1)
insertionCount.getAndIncrement()
}
phaser2.arriveAndAwaitAdvance()
}
}
when:
phaser1.arriveAndAwaitAdvance() // allow threads to start
phaser2.arriveAndAwaitAdvance() // wait until all writer threads have finished
then:
queue.size() == capacity
insertionCount.get() == numberInsertionsPerThread * numberThreads
where:
capacity = 100
numberThreads << [1, 10, 100]
numberInsertionsPerThread = 100
}
def "check concurrency on writes and reads"() {
setup:
def phaser1 = new Phaser()
def phaser2 = new Phaser()
def queue = new WriterQueue<Integer>(capacity)
def insertionCount = new AtomicInteger(0)
def droppedCount = new AtomicInteger(0)
def getCount = new AtomicInteger(0)
def numberElements = new AtomicInteger(0)
phaser1.register() // global start
phaser2.register() // global stop
// writes
numberThreadsWrites.times {
phaser1.register()
Thread.start {
phaser2.register()
phaser1.arriveAndAwaitAdvance()
numberInsertionsPerThread.times {
queue.add(1) != null ? droppedCount.getAndIncrement() : null
insertionCount.getAndIncrement()
}
phaser2.arriveAndAwaitAdvance()
}
}
// reads
numberThreadsReads.times {
phaser1.register()
Thread.start {
phaser2.register()
phaser1.arriveAndAwaitAdvance()
numberGetsPerThread.times {
numberElements.getAndAdd(queue.getAll().size())
getCount.getAndIncrement()
}
phaser2.arriveAndAwaitAdvance()
}
}
when:
phaser1.arriveAndAwaitAdvance() // allow threads to start
phaser2.arriveAndAwaitAdvance() // wait until all threads have finished
then:
insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites
getCount.get() == numberGetsPerThread * numberThreadsReads
insertionCount.get() == numberElements + queue.size() + droppedCount
where:
capacity = 100
numberThreadsWrites << [1, 10, 100]
numberThreadsReads << [1, 5, 10]
numberInsertionsPerThread = 100
numberGetsPerThread = 5
}
}

View File

@ -151,18 +151,6 @@ class DDApiIntegrationTest {
[[new DDSpan(1, CONTEXT)]] | 3
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
}
def "Sending bad trace fails (test #test)"() {
expect:
api.sendTraces(traces) == false
where:
traces | test
[""] | 1
["", 123] | 2
[[:]] | 3
[new Object()] | 4
}
}
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })