diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 66e45b248a..d274c54824 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -2,19 +2,14 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; /** @@ -38,20 +33,25 @@ public class DDAgentWriter implements Writer { private static final int DEFAULT_MAX_TRACES = 1000; /** Timeout for the API in seconds */ - private static final long API_TIMEOUT_SECONDS = 2; + private static final long API_TIMEOUT_SECONDS = 1; /** Flush interval for the API in seconds */ - private static final long FLUSH_TIME_SECONDS = 5; + private static final long FLUSH_TIME_SECONDS = 1; /** Scheduled thread pool, it' acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); + /** The DD agent api */ private final DDApi api; + /** In memory collection of traces waiting for departure */ private final WriterQueue>> traces; + private boolean queueFullReported = false; + public DDAgentWriter() { this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); } @@ -69,9 +69,12 @@ public class DDAgentWriter implements Writer { public void write(final List> trace) { final List> removed = traces.add(trace); - if (removed != null) { + if (removed != null && !queueFullReported) { log.warn("Queue is full, dropping one trace, queue size: {}", DEFAULT_MAX_TRACES); + queueFullReported = true; + return; } + queueFullReported = false; } /* (non-Javadoc) @@ -103,65 +106,6 @@ public class DDAgentWriter implements Writer { } } - static class WriterQueue { - - private final int capacity; - private final Lock lock = new ReentrantLock(); - private ArrayList list; - private int nbElements = 0; - - public WriterQueue(final int capacity) { - if (capacity < 1) { - throw new IllegalArgumentException("Capacity couldn't be 0"); - } - list = new ArrayList<>(capacity); - this.capacity = capacity; - } - - public int size() { - return nbElements; - } - - public List getAll() { - List all = Collections.emptyList(); - lock.lock(); - try { - all = list; - list = new ArrayList<>(capacity); - nbElements = 0; - } finally { - lock.unlock(); - } - return all; - } - - public T add(final T element) { - - lock.lock(); - T removed = null; - try { - if (nbElements < capacity) { - list.add(element); - ++nbElements; - } else { - removed = set(element); - } - } finally { - lock.unlock(); - } - return removed; - } - - public boolean isEmpty() { - return nbElements == 0; - } - - private T set(final T element) { - final int index = ThreadLocalRandom.current().nextInt(0, nbElements); - return list.set(index, element); - } - } - /** Infinite tasks blocking until some spans come in the blocking queue. */ private class TracesSendingTask implements Runnable { @@ -190,12 +134,14 @@ public class DDAgentWriter implements Writer { final List>> payload = traces.getAll(); - int nbSpans = 0; - for (final List trace : payload) { - nbSpans += trace.size(); - } + if (log.isDebugEnabled()) { + int nbSpans = 0; + for (final List trace : payload) { + nbSpans += trace.size(); + } - log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); + log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); + } final boolean isSent = api.sendTraces(payload); if (!isSent) { diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java new file mode 100644 index 0000000000..ab50076e75 --- /dev/null +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java @@ -0,0 +1,92 @@ +package com.datadoghq.trace.writer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A bounded queue implementation compatible with the Datadog agent behavior. The class is + * thread-safe and can be used with concurrency. + * + *

+ * + *

This class implements a specific behavior when it's full. Each new item added will replace an + * exisiting one, at a random place/index. The class is backed by an ArrayList in order to perform + * efficient random remove. + * + * @param The element type to store + */ +class WriterQueue { + + private final int capacity; + private volatile ArrayList list; + private volatile int elementCount = 0; + + /** + * Default construct, a capacity must be provided + * + * @param capacity the max size of the queue + */ + WriterQueue(final int capacity) { + if (capacity < 1) { + throw new IllegalArgumentException("Capacity couldn't be 0"); + } + list = new ArrayList<>(capacity); + this.capacity = capacity; + } + + /** + * Return a list containing all elements present in the queue. After the operation, the queue is + * reset. All action performed on the returned list has no impact to the queue + * + * @return a list contain all elements + */ + public synchronized List getAll() { + List all = Collections.emptyList(); + all = list; + list = new ArrayList<>(capacity); + elementCount = 0; + return all; + } + + /** + * Add an element to the queue. If the queue is full, set the element at a random place in the + * queue and return the previous one. + * + * @param element the element to add to the queue + * @return null if the queue is not full, otherwise the removed element + */ + public synchronized T add(final T element) { + + T removed = null; + if (elementCount < capacity) { + list.add(element); + ++elementCount; + } else { + final int index = ThreadLocalRandom.current().nextInt(0, elementCount); + removed = list.set(index, element); + } + return removed; + } + + // Methods below are essentially used for testing purposes + + /** + * Return the number of elements set in the queue + * + * @return the current size of the queue + */ + public int size() { + return elementCount; + } + + /** + * Return true if the queue is empty + * + * @return true if the queue is empty + */ + public boolean isEmpty() { + return elementCount == 0; + } +} diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index e30731647f..db703b895b 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -6,13 +6,13 @@ class WriterQueueTest extends Specification { def "instantiate a empty queue throws an exception"() { when: - new DDAgentWriter.WriterQueue(0) + new WriterQueue(0) then: thrown IllegalArgumentException when: - new DDAgentWriter.WriterQueue(-1) + new WriterQueue(-1) then: thrown IllegalArgumentException @@ -21,7 +21,7 @@ class WriterQueueTest extends Specification { def "full the queue without forcing"() { setup: - def queue = new DDAgentWriter.WriterQueue(capacity) + def queue = new WriterQueue(capacity) def removed = false when: @@ -40,7 +40,7 @@ class WriterQueueTest extends Specification { def "force element add to a full queue"() { setup: - def queue = new DDAgentWriter.WriterQueue(capacity) + def queue = new WriterQueue(capacity) for (def i = 0; i < capacity; i++) { queue.add(i) } @@ -60,7 +60,7 @@ class WriterQueueTest extends Specification { def "drain the queue into another collection"() { setup: - def queue = new DDAgentWriter.WriterQueue(capacity) + def queue = new WriterQueue(capacity) for (def i = 0; i < capacity; i++) { queue.add(i) }