diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 7dcabb9e12..66e45b248a 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -3,8 +3,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -106,16 +105,16 @@ public class DDAgentWriter implements Writer { static class WriterQueue { - private final LinkedList list; private final int capacity; private final Lock lock = new ReentrantLock(); + private ArrayList list; private int nbElements = 0; public WriterQueue(final int capacity) { if (capacity < 1) { throw new IllegalArgumentException("Capacity couldn't be 0"); } - list = new LinkedList<>(); + list = new ArrayList<>(capacity); this.capacity = capacity; } @@ -123,27 +122,17 @@ public class DDAgentWriter implements Writer { return nbElements; } - public int drainTo(final Collection c) { + public List getAll() { + List all = Collections.emptyList(); lock.lock(); - int i = 0; - final int n = nbElements; try { - while (i < n) { - final T element = list.getLast(); - c.add(element); // things can go wrong here - list.removeLast(); - ++i; - --nbElements; - } - } catch (final Throwable ex) { - log.warn("Unexpected error while draining the queue: {}", ex.getMessage()); - throw ex; + all = list; + list = new ArrayList<>(capacity); + nbElements = 0; } finally { - // Recover the nominal state - nbElements = list.size(); lock.unlock(); } - return i; + return all; } public T add(final T element) { @@ -152,10 +141,10 @@ public class DDAgentWriter implements Writer { T removed = null; try { if (nbElements < capacity) { - list.addFirst(element); + list.add(element); ++nbElements; } else { - removed = removeAndAdd(element); + removed = set(element); } } finally { lock.unlock(); @@ -167,11 +156,9 @@ public class DDAgentWriter implements Writer { return nbElements == 0; } - private T removeAndAdd(final T element) { + private T set(final T element) { final int index = ThreadLocalRandom.current().nextInt(0, nbElements); - final T removed = list.remove(index); - list.addFirst(element); - return removed; + return list.set(index, element); } } @@ -201,23 +188,21 @@ public class DDAgentWriter implements Writer { return 0L; } - final List>> payload = new ArrayList<>(); - int nbTraces = traces.drainTo(payload); + final List>> payload = traces.getAll(); int nbSpans = 0; for (final List trace : payload) { - nbTraces++; nbSpans += trace.size(); } - log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans); + log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); final boolean isSent = api.sendTraces(payload); if (!isSent) { - log.warn("Failing to send {} traces to the API", nbTraces); + log.warn("Failing to send {} traces to the API", payload.size()); return 0L; } - return (long) nbTraces; + return (long) payload.size(); } } } diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index f87c63eeee..e30731647f 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -21,12 +21,12 @@ class WriterQueueTest extends Specification { def "full the queue without forcing"() { setup: - def Q = new DDAgentWriter.WriterQueue(capacity) + def queue = new DDAgentWriter.WriterQueue(capacity) def removed = false when: for (def i = 0; i < capacity; i++) { - removed = removed || Q.add(i) != null + removed = removed || queue.add(i) != null } then: @@ -40,17 +40,17 @@ class WriterQueueTest extends Specification { def "force element add to a full queue"() { setup: - def Q = new DDAgentWriter.WriterQueue(capacity) + def queue = new DDAgentWriter.WriterQueue(capacity) for (def i = 0; i < capacity; i++) { - Q.add(i) + queue.add(i) } when: - def removed = Q.add(1) + def removed = queue.add(1) then: removed != null - Q.size() == capacity + queue.size() == capacity where: capacity << [1, 10, 100] @@ -60,93 +60,22 @@ class WriterQueueTest extends Specification { def "drain the queue into another collection"() { setup: - def Q = new DDAgentWriter.WriterQueue(capacity) - def L = [] + def queue = new DDAgentWriter.WriterQueue(capacity) for (def i = 0; i < capacity; i++) { - Q.add(i) + queue.add(i) } when: - def nb = Q.drainTo(L) + def list = queue.getAll() then: - nb == L.size() - nb == capacity - Q.isEmpty() - Q.size() == 0 + list.size() == capacity + queue.isEmpty() + queue.size() == 0 where: capacity << [1, 10, 100] } - def "Queue should be never locked"() { - - setup: - def Q = new DDAgentWriter.WriterQueue(1) - def L = Collections.emptyList() // raise an error if you add an element - Q.add(42) - - when: - Q.drainTo(L) - - then: - thrown Exception - - when: - // still able to add element - def removed = Q.add(1337) - - then: - removed == 42 - Q.size() == 1 - - - } - -// def "Multi threading test"() { -// setup: -// def Q = new DDAgentWriter.WriterQueue(10) -// def start = new CountDownLatch(5) -// def executor = Executors.newFixedThreadPool(5) -// def end = false -// def L = [] -// -// def pushTask = new Runnable() { -// @Override -// void run() { -// start.await() -// while (!end) Q.add(1) -// } -// } -// -// def popTask = new Runnable() { -// @Override -// void run() { -// start.await() -// def nbDrains = 0 -// while (!end) { -// Q.drainTo(L) -// sleep(10) -// end = ++nbDrains == 1000 -// } -// } -// } -// -// when: -// // 4 pushers -// executor.submit(pushTask) -// executor.submit(pushTask) -// executor.submit(pushTask) -// executor.submit(pushTask) -// // 1 popper (do 1000 drains) -// // executor.submit(popTask) -// -// -// then: -// // to be here -// Q.size() == 10 || L.size() == 10 || Q.size() + L.size() == 10 -// -// } - }