From 114ef60e6e3a7253f75bde9ea129306a54ed43a0 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Tue, 8 Aug 2017 14:53:20 +0200 Subject: [PATCH 01/10] Proposal for flushing writer (WIP) --- .../datadoghq/trace/writer/DDAgentWriter.java | 104 +++++++++++------- 1 file changed, 63 insertions(+), 41 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 0cbd385c7d..3328b705ba 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -6,10 +6,14 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; /** @@ -29,11 +33,14 @@ public class DDAgentWriter implements Writer { public static final int DEFAULT_PORT = 8126; - /** Maximum number of spans kept in memory */ - private static final int DEFAULT_MAX_SPANS = 1000; + /** Maximum number of traces kept in memory */ + private static final int DEFAULT_MAX_TRACES = 1000; - /** Maximum number of traces sent to the DD agent API at once */ - private static final int DEFAULT_BATCH_SIZE = 10; + /** Timeout for the API in seconds */ + private static final long TIMEOUT = 2; + + /** Flush interval for the API in seconds */ + private static final long FLUSH_TIME = 5; /** * Used to ensure that we don't keep too many spans (while the blocking queue collect traces...) @@ -43,7 +50,10 @@ public class DDAgentWriter implements Writer { /** In memory collection of traces waiting for departure */ private final BlockingQueue>> traces; - /** Async worker that posts the spans to the DD agent */ + /** Scheduled thread pool, it' acting like a cron */ + private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + + /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); /** The DD agent api */ @@ -57,8 +67,8 @@ public class DDAgentWriter implements Writer { super(); this.api = api; - tokens = new Semaphore(DEFAULT_MAX_SPANS); - traces = new ArrayBlockingQueue<>(DEFAULT_MAX_SPANS); + tokens = new Semaphore(DEFAULT_MAX_TRACES); + traces = new ArrayBlockingQueue<>(DEFAULT_MAX_TRACES); } /* (non-Javadoc) @@ -67,7 +77,7 @@ public class DDAgentWriter implements Writer { @Override public void write(final List> trace) { //Try to add a new span in the queue - final boolean proceed = tokens.tryAcquire(trace.size()); + final boolean proceed = tokens.tryAcquire(1); if (proceed) { traces.add(trace); @@ -75,7 +85,7 @@ public class DDAgentWriter implements Writer { log.warn( "Cannot add a trace of {} as the async queue is full. Queue max size: {}", trace.size(), - DEFAULT_MAX_SPANS); + DEFAULT_MAX_TRACES); } } @@ -84,7 +94,7 @@ public class DDAgentWriter implements Writer { */ @Override public void start() { - executor.submit(new SpansSendingTask()); + scheduledExecutor.scheduleAtFixedRate(new TracesSendingTask(), 0, FLUSH_TIME, TimeUnit.SECONDS); } /* (non-Javadoc) @@ -92,7 +102,14 @@ public class DDAgentWriter implements Writer { */ @Override public void close() { + scheduledExecutor.shutdownNow(); executor.shutdownNow(); + try { + scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + log.info("Writer properly closed and async writer interrupted."); + } + try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { @@ -101,43 +118,48 @@ public class DDAgentWriter implements Writer { } /** Infinite tasks blocking until some spans come in the blocking queue. */ - protected class SpansSendingTask implements Runnable { + private class TracesSendingTask implements Runnable { @Override public void run() { - while (true) { - try { - final List>> payload = new ArrayList<>(); + final Future future = executor.submit(new SendingTask()); + try { + final long nbTraces = future.get(TIMEOUT, TimeUnit.SECONDS); + log.debug("Successfully sending {} traces to the API", nbTraces); + } catch (final TimeoutException e) { + log.debug("Timeout! Fail to send traces to the API: {}", e.getMessage()); + } catch (final Throwable e) { + log.debug("Fail to send traces to the API: {}", e.getMessage()); + } + } - //WAIT until a new span comes - final List> l = DDAgentWriter.this.traces.take(); - payload.add(l); + class SendingTask implements Callable { - //Drain all spans up to a certain batch suze - traces.drainTo(payload, DEFAULT_BATCH_SIZE); - - //SEND the payload to the agent - log.debug("Async writer about to write {} traces.", payload.size()); - api.sendTraces(payload); - - //Compute the number of spans sent - int spansCount = 0; - for (final List> trace : payload) { - spansCount += trace.size(); - } - log.debug( - "Async writer just sent {} spans through {} traces", spansCount, payload.size()); - - //Release the tokens - tokens.release(spansCount); - } catch (final InterruptedException e) { - log.info("Async writer interrupted."); - - //The thread was interrupted, we break the LOOP - break; - } catch (final Throwable e) { - log.error("Unexpected error! Some traces may have been dropped.", e); + @Override + public Long call() throws Exception { + if (traces.isEmpty()) { + return 0L; } + + final List>> payload = new ArrayList<>(); + final int nbTraces = traces.drainTo(payload); + + int nbSpans = 0; + for (final List trace : payload) { + nbSpans += trace.size(); + } + + // release the lock + tokens.release(nbTraces); + + log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans); + final boolean isSent = api.sendTraces(payload); + + if (!isSent) { + log.warn("Failing to send {} traces to the API", nbTraces); + return 0L; + } + return (long) nbTraces; } } } From 17fa5f63b13879fb0589f0ced44d88e1808a22b6 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Tue, 8 Aug 2017 16:13:07 +0200 Subject: [PATCH 02/10] Implementing random delete --- .../datadoghq/trace/writer/DDAgentWriter.java | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 3328b705ba..1243f73541 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -4,16 +4,17 @@ import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; /** @@ -47,17 +48,16 @@ public class DDAgentWriter implements Writer { */ private final Semaphore tokens; - /** In memory collection of traces waiting for departure */ - private final BlockingQueue>> traces; - + /** Used to protect the traces during the drop */ + private final Lock lock; /** Scheduled thread pool, it' acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); - /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); - /** The DD agent api */ private final DDApi api; + /** In memory collection of traces waiting for departure */ + private ArrayList>> traces; public DDAgentWriter() { this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); @@ -67,8 +67,9 @@ public class DDAgentWriter implements Writer { super(); this.api = api; + lock = new ReentrantLock(); tokens = new Semaphore(DEFAULT_MAX_TRACES); - traces = new ArrayBlockingQueue<>(DEFAULT_MAX_TRACES); + traces = new ArrayList<>(DEFAULT_MAX_TRACES); } /* (non-Javadoc) @@ -76,17 +77,21 @@ public class DDAgentWriter implements Writer { */ @Override public void write(final List> trace) { + + lock.lock(); //Try to add a new span in the queue final boolean proceed = tokens.tryAcquire(1); if (proceed) { traces.add(trace); } else { - log.warn( - "Cannot add a trace of {} as the async queue is full. Queue max size: {}", - trace.size(), - DEFAULT_MAX_TRACES); + + final int index = ThreadLocalRandom.current().nextInt(0, DEFAULT_MAX_TRACES); + traces.remove(index); + traces.add(trace); + log.warn("Queue is full, dropping an element, queue size: {}", DEFAULT_MAX_TRACES); } + lock.unlock(); } /* (non-Javadoc) @@ -141,11 +146,15 @@ public class DDAgentWriter implements Writer { return 0L; } - final List>> payload = new ArrayList<>(); - final int nbTraces = traces.drainTo(payload); + lock.lock(); + final List>> payload = traces; + traces = new ArrayList<>(DEFAULT_MAX_TRACES); + lock.unlock(); int nbSpans = 0; + int nbTraces = 0; for (final List trace : payload) { + nbTraces++; nbSpans += trace.size(); } From 9884ddf68706521fa3af8754cb963cd06dabfed0 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Wed, 9 Aug 2017 13:29:15 +0200 Subject: [PATCH 03/10] Adding a queue class for the writer --- .../datadoghq/trace/writer/DDAgentWriter.java | 125 +++++++++----- .../trace/writer/WriterQueueTest.groovy | 152 ++++++++++++++++++ 2 files changed, 239 insertions(+), 38 deletions(-) create mode 100644 dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 1243f73541..7dcabb9e12 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -3,13 +3,14 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,18 +39,11 @@ public class DDAgentWriter implements Writer { private static final int DEFAULT_MAX_TRACES = 1000; /** Timeout for the API in seconds */ - private static final long TIMEOUT = 2; + private static final long API_TIMEOUT_SECONDS = 2; /** Flush interval for the API in seconds */ - private static final long FLUSH_TIME = 5; + private static final long FLUSH_TIME_SECONDS = 5; - /** - * Used to ensure that we don't keep too many spans (while the blocking queue collect traces...) - */ - private final Semaphore tokens; - - /** Used to protect the traces during the drop */ - private final Lock lock; /** Scheduled thread pool, it' acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); /** Effective thread pool, where real logic is done */ @@ -57,7 +51,7 @@ public class DDAgentWriter implements Writer { /** The DD agent api */ private final DDApi api; /** In memory collection of traces waiting for departure */ - private ArrayList>> traces; + private final WriterQueue>> traces; public DDAgentWriter() { this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); @@ -66,10 +60,7 @@ public class DDAgentWriter implements Writer { public DDAgentWriter(final DDApi api) { super(); this.api = api; - - lock = new ReentrantLock(); - tokens = new Semaphore(DEFAULT_MAX_TRACES); - traces = new ArrayList<>(DEFAULT_MAX_TRACES); + traces = new WriterQueue<>(DEFAULT_MAX_TRACES); } /* (non-Javadoc) @@ -78,20 +69,10 @@ public class DDAgentWriter implements Writer { @Override public void write(final List> trace) { - lock.lock(); - //Try to add a new span in the queue - final boolean proceed = tokens.tryAcquire(1); - - if (proceed) { - traces.add(trace); - } else { - - final int index = ThreadLocalRandom.current().nextInt(0, DEFAULT_MAX_TRACES); - traces.remove(index); - traces.add(trace); - log.warn("Queue is full, dropping an element, queue size: {}", DEFAULT_MAX_TRACES); + final List> removed = traces.add(trace); + if (removed != null) { + log.warn("Queue is full, dropping one trace, queue size: {}", DEFAULT_MAX_TRACES); } - lock.unlock(); } /* (non-Javadoc) @@ -99,7 +80,8 @@ public class DDAgentWriter implements Writer { */ @Override public void start() { - scheduledExecutor.scheduleAtFixedRate(new TracesSendingTask(), 0, FLUSH_TIME, TimeUnit.SECONDS); + scheduledExecutor.scheduleAtFixedRate( + new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS); } /* (non-Javadoc) @@ -122,6 +104,77 @@ public class DDAgentWriter implements Writer { } } + static class WriterQueue { + + private final LinkedList list; + private final int capacity; + private final Lock lock = new ReentrantLock(); + private int nbElements = 0; + + public WriterQueue(final int capacity) { + if (capacity < 1) { + throw new IllegalArgumentException("Capacity couldn't be 0"); + } + list = new LinkedList<>(); + this.capacity = capacity; + } + + public int size() { + return nbElements; + } + + public int drainTo(final Collection c) { + lock.lock(); + int i = 0; + final int n = nbElements; + try { + while (i < n) { + final T element = list.getLast(); + c.add(element); // things can go wrong here + list.removeLast(); + ++i; + --nbElements; + } + } catch (final Throwable ex) { + log.warn("Unexpected error while draining the queue: {}", ex.getMessage()); + throw ex; + } finally { + // Recover the nominal state + nbElements = list.size(); + lock.unlock(); + } + return i; + } + + public T add(final T element) { + + lock.lock(); + T removed = null; + try { + if (nbElements < capacity) { + list.addFirst(element); + ++nbElements; + } else { + removed = removeAndAdd(element); + } + } finally { + lock.unlock(); + } + return removed; + } + + public boolean isEmpty() { + return nbElements == 0; + } + + private T removeAndAdd(final T element) { + final int index = ThreadLocalRandom.current().nextInt(0, nbElements); + final T removed = list.remove(index); + list.addFirst(element); + return removed; + } + } + /** Infinite tasks blocking until some spans come in the blocking queue. */ private class TracesSendingTask implements Runnable { @@ -129,7 +182,7 @@ public class DDAgentWriter implements Writer { public void run() { final Future future = executor.submit(new SendingTask()); try { - final long nbTraces = future.get(TIMEOUT, TimeUnit.SECONDS); + final long nbTraces = future.get(API_TIMEOUT_SECONDS, TimeUnit.SECONDS); log.debug("Successfully sending {} traces to the API", nbTraces); } catch (final TimeoutException e) { log.debug("Timeout! Fail to send traces to the API: {}", e.getMessage()); @@ -138,6 +191,8 @@ public class DDAgentWriter implements Writer { } } + public void size() {} + class SendingTask implements Callable { @Override @@ -146,21 +201,15 @@ public class DDAgentWriter implements Writer { return 0L; } - lock.lock(); - final List>> payload = traces; - traces = new ArrayList<>(DEFAULT_MAX_TRACES); - lock.unlock(); + final List>> payload = new ArrayList<>(); + int nbTraces = traces.drainTo(payload); int nbSpans = 0; - int nbTraces = 0; for (final List trace : payload) { nbTraces++; nbSpans += trace.size(); } - // release the lock - tokens.release(nbTraces); - log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans); final boolean isSent = api.sendTraces(payload); diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy new file mode 100644 index 0000000000..f87c63eeee --- /dev/null +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -0,0 +1,152 @@ +package com.datadoghq.trace.writer + +import spock.lang.Specification + +class WriterQueueTest extends Specification { + + def "instantiate a empty queue throws an exception"() { + when: + new DDAgentWriter.WriterQueue(0) + + then: + thrown IllegalArgumentException + + when: + new DDAgentWriter.WriterQueue(-1) + + then: + thrown IllegalArgumentException + } + + def "full the queue without forcing"() { + + setup: + def Q = new DDAgentWriter.WriterQueue(capacity) + def removed = false + + when: + for (def i = 0; i < capacity; i++) { + removed = removed || Q.add(i) != null + } + + then: + !removed + + where: + capacity << [1, 10, 100] + + } + + def "force element add to a full queue"() { + + setup: + def Q = new DDAgentWriter.WriterQueue(capacity) + for (def i = 0; i < capacity; i++) { + Q.add(i) + } + + when: + def removed = Q.add(1) + + then: + removed != null + Q.size() == capacity + + where: + capacity << [1, 10, 100] + + } + + def "drain the queue into another collection"() { + + setup: + def Q = new DDAgentWriter.WriterQueue(capacity) + def L = [] + for (def i = 0; i < capacity; i++) { + Q.add(i) + } + + when: + def nb = Q.drainTo(L) + + then: + nb == L.size() + nb == capacity + Q.isEmpty() + Q.size() == 0 + + where: + capacity << [1, 10, 100] + + } + + def "Queue should be never locked"() { + + setup: + def Q = new DDAgentWriter.WriterQueue(1) + def L = Collections.emptyList() // raise an error if you add an element + Q.add(42) + + when: + Q.drainTo(L) + + then: + thrown Exception + + when: + // still able to add element + def removed = Q.add(1337) + + then: + removed == 42 + Q.size() == 1 + + + } + +// def "Multi threading test"() { +// setup: +// def Q = new DDAgentWriter.WriterQueue(10) +// def start = new CountDownLatch(5) +// def executor = Executors.newFixedThreadPool(5) +// def end = false +// def L = [] +// +// def pushTask = new Runnable() { +// @Override +// void run() { +// start.await() +// while (!end) Q.add(1) +// } +// } +// +// def popTask = new Runnable() { +// @Override +// void run() { +// start.await() +// def nbDrains = 0 +// while (!end) { +// Q.drainTo(L) +// sleep(10) +// end = ++nbDrains == 1000 +// } +// } +// } +// +// when: +// // 4 pushers +// executor.submit(pushTask) +// executor.submit(pushTask) +// executor.submit(pushTask) +// executor.submit(pushTask) +// // 1 popper (do 1000 drains) +// // executor.submit(popTask) +// +// +// then: +// // to be here +// Q.size() == 10 || L.size() == 10 || Q.size() + L.size() == 10 +// +// } + +} From a4d57d2b0ca06976d6e4e32b6c09fea9b53097dc Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Wed, 9 Aug 2017 14:36:52 +0200 Subject: [PATCH 04/10] Use arraylist --- .../datadoghq/trace/writer/DDAgentWriter.java | 49 ++++------ .../trace/writer/WriterQueueTest.groovy | 95 +++---------------- 2 files changed, 29 insertions(+), 115 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 7dcabb9e12..66e45b248a 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -3,8 +3,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -106,16 +105,16 @@ public class DDAgentWriter implements Writer { static class WriterQueue { - private final LinkedList list; private final int capacity; private final Lock lock = new ReentrantLock(); + private ArrayList list; private int nbElements = 0; public WriterQueue(final int capacity) { if (capacity < 1) { throw new IllegalArgumentException("Capacity couldn't be 0"); } - list = new LinkedList<>(); + list = new ArrayList<>(capacity); this.capacity = capacity; } @@ -123,27 +122,17 @@ public class DDAgentWriter implements Writer { return nbElements; } - public int drainTo(final Collection c) { + public List getAll() { + List all = Collections.emptyList(); lock.lock(); - int i = 0; - final int n = nbElements; try { - while (i < n) { - final T element = list.getLast(); - c.add(element); // things can go wrong here - list.removeLast(); - ++i; - --nbElements; - } - } catch (final Throwable ex) { - log.warn("Unexpected error while draining the queue: {}", ex.getMessage()); - throw ex; + all = list; + list = new ArrayList<>(capacity); + nbElements = 0; } finally { - // Recover the nominal state - nbElements = list.size(); lock.unlock(); } - return i; + return all; } public T add(final T element) { @@ -152,10 +141,10 @@ public class DDAgentWriter implements Writer { T removed = null; try { if (nbElements < capacity) { - list.addFirst(element); + list.add(element); ++nbElements; } else { - removed = removeAndAdd(element); + removed = set(element); } } finally { lock.unlock(); @@ -167,11 +156,9 @@ public class DDAgentWriter implements Writer { return nbElements == 0; } - private T removeAndAdd(final T element) { + private T set(final T element) { final int index = ThreadLocalRandom.current().nextInt(0, nbElements); - final T removed = list.remove(index); - list.addFirst(element); - return removed; + return list.set(index, element); } } @@ -201,23 +188,21 @@ public class DDAgentWriter implements Writer { return 0L; } - final List>> payload = new ArrayList<>(); - int nbTraces = traces.drainTo(payload); + final List>> payload = traces.getAll(); int nbSpans = 0; for (final List trace : payload) { - nbTraces++; nbSpans += trace.size(); } - log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans); + log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); final boolean isSent = api.sendTraces(payload); if (!isSent) { - log.warn("Failing to send {} traces to the API", nbTraces); + log.warn("Failing to send {} traces to the API", payload.size()); return 0L; } - return (long) nbTraces; + return (long) payload.size(); } } } diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index f87c63eeee..e30731647f 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -21,12 +21,12 @@ class WriterQueueTest extends Specification { def "full the queue without forcing"() { setup: - def Q = new DDAgentWriter.WriterQueue(capacity) + def queue = new DDAgentWriter.WriterQueue(capacity) def removed = false when: for (def i = 0; i < capacity; i++) { - removed = removed || Q.add(i) != null + removed = removed || queue.add(i) != null } then: @@ -40,17 +40,17 @@ class WriterQueueTest extends Specification { def "force element add to a full queue"() { setup: - def Q = new DDAgentWriter.WriterQueue(capacity) + def queue = new DDAgentWriter.WriterQueue(capacity) for (def i = 0; i < capacity; i++) { - Q.add(i) + queue.add(i) } when: - def removed = Q.add(1) + def removed = queue.add(1) then: removed != null - Q.size() == capacity + queue.size() == capacity where: capacity << [1, 10, 100] @@ -60,93 +60,22 @@ class WriterQueueTest extends Specification { def "drain the queue into another collection"() { setup: - def Q = new DDAgentWriter.WriterQueue(capacity) - def L = [] + def queue = new DDAgentWriter.WriterQueue(capacity) for (def i = 0; i < capacity; i++) { - Q.add(i) + queue.add(i) } when: - def nb = Q.drainTo(L) + def list = queue.getAll() then: - nb == L.size() - nb == capacity - Q.isEmpty() - Q.size() == 0 + list.size() == capacity + queue.isEmpty() + queue.size() == 0 where: capacity << [1, 10, 100] } - def "Queue should be never locked"() { - - setup: - def Q = new DDAgentWriter.WriterQueue(1) - def L = Collections.emptyList() // raise an error if you add an element - Q.add(42) - - when: - Q.drainTo(L) - - then: - thrown Exception - - when: - // still able to add element - def removed = Q.add(1337) - - then: - removed == 42 - Q.size() == 1 - - - } - -// def "Multi threading test"() { -// setup: -// def Q = new DDAgentWriter.WriterQueue(10) -// def start = new CountDownLatch(5) -// def executor = Executors.newFixedThreadPool(5) -// def end = false -// def L = [] -// -// def pushTask = new Runnable() { -// @Override -// void run() { -// start.await() -// while (!end) Q.add(1) -// } -// } -// -// def popTask = new Runnable() { -// @Override -// void run() { -// start.await() -// def nbDrains = 0 -// while (!end) { -// Q.drainTo(L) -// sleep(10) -// end = ++nbDrains == 1000 -// } -// } -// } -// -// when: -// // 4 pushers -// executor.submit(pushTask) -// executor.submit(pushTask) -// executor.submit(pushTask) -// executor.submit(pushTask) -// // 1 popper (do 1000 drains) -// // executor.submit(popTask) -// -// -// then: -// // to be here -// Q.size() == 10 || L.size() == 10 || Q.size() + L.size() == 10 -// -// } - } From 43525025daddb8e78c1dc81c4dd64348306e9ac3 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Thu, 10 Aug 2017 10:38:11 +0200 Subject: [PATCH 05/10] Create a separate class + reviews --- .../datadoghq/trace/writer/DDAgentWriter.java | 90 ++++-------------- .../datadoghq/trace/writer/WriterQueue.java | 92 +++++++++++++++++++ .../trace/writer/WriterQueueTest.groovy | 10 +- 3 files changed, 115 insertions(+), 77 deletions(-) create mode 100644 dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 66e45b248a..d274c54824 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -2,19 +2,14 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; /** @@ -38,20 +33,25 @@ public class DDAgentWriter implements Writer { private static final int DEFAULT_MAX_TRACES = 1000; /** Timeout for the API in seconds */ - private static final long API_TIMEOUT_SECONDS = 2; + private static final long API_TIMEOUT_SECONDS = 1; /** Flush interval for the API in seconds */ - private static final long FLUSH_TIME_SECONDS = 5; + private static final long FLUSH_TIME_SECONDS = 1; /** Scheduled thread pool, it' acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); + /** The DD agent api */ private final DDApi api; + /** In memory collection of traces waiting for departure */ private final WriterQueue>> traces; + private boolean queueFullReported = false; + public DDAgentWriter() { this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); } @@ -69,9 +69,12 @@ public class DDAgentWriter implements Writer { public void write(final List> trace) { final List> removed = traces.add(trace); - if (removed != null) { + if (removed != null && !queueFullReported) { log.warn("Queue is full, dropping one trace, queue size: {}", DEFAULT_MAX_TRACES); + queueFullReported = true; + return; } + queueFullReported = false; } /* (non-Javadoc) @@ -103,65 +106,6 @@ public class DDAgentWriter implements Writer { } } - static class WriterQueue { - - private final int capacity; - private final Lock lock = new ReentrantLock(); - private ArrayList list; - private int nbElements = 0; - - public WriterQueue(final int capacity) { - if (capacity < 1) { - throw new IllegalArgumentException("Capacity couldn't be 0"); - } - list = new ArrayList<>(capacity); - this.capacity = capacity; - } - - public int size() { - return nbElements; - } - - public List getAll() { - List all = Collections.emptyList(); - lock.lock(); - try { - all = list; - list = new ArrayList<>(capacity); - nbElements = 0; - } finally { - lock.unlock(); - } - return all; - } - - public T add(final T element) { - - lock.lock(); - T removed = null; - try { - if (nbElements < capacity) { - list.add(element); - ++nbElements; - } else { - removed = set(element); - } - } finally { - lock.unlock(); - } - return removed; - } - - public boolean isEmpty() { - return nbElements == 0; - } - - private T set(final T element) { - final int index = ThreadLocalRandom.current().nextInt(0, nbElements); - return list.set(index, element); - } - } - /** Infinite tasks blocking until some spans come in the blocking queue. */ private class TracesSendingTask implements Runnable { @@ -190,12 +134,14 @@ public class DDAgentWriter implements Writer { final List>> payload = traces.getAll(); - int nbSpans = 0; - for (final List trace : payload) { - nbSpans += trace.size(); - } + if (log.isDebugEnabled()) { + int nbSpans = 0; + for (final List trace : payload) { + nbSpans += trace.size(); + } - log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); + log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); + } final boolean isSent = api.sendTraces(payload); if (!isSent) { diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java new file mode 100644 index 0000000000..ab50076e75 --- /dev/null +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java @@ -0,0 +1,92 @@ +package com.datadoghq.trace.writer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A bounded queue implementation compatible with the Datadog agent behavior. The class is + * thread-safe and can be used with concurrency. + * + *

+ * + *

This class implements a specific behavior when it's full. Each new item added will replace an + * exisiting one, at a random place/index. The class is backed by an ArrayList in order to perform + * efficient random remove. + * + * @param The element type to store + */ +class WriterQueue { + + private final int capacity; + private volatile ArrayList list; + private volatile int elementCount = 0; + + /** + * Default construct, a capacity must be provided + * + * @param capacity the max size of the queue + */ + WriterQueue(final int capacity) { + if (capacity < 1) { + throw new IllegalArgumentException("Capacity couldn't be 0"); + } + list = new ArrayList<>(capacity); + this.capacity = capacity; + } + + /** + * Return a list containing all elements present in the queue. After the operation, the queue is + * reset. All action performed on the returned list has no impact to the queue + * + * @return a list contain all elements + */ + public synchronized List getAll() { + List all = Collections.emptyList(); + all = list; + list = new ArrayList<>(capacity); + elementCount = 0; + return all; + } + + /** + * Add an element to the queue. If the queue is full, set the element at a random place in the + * queue and return the previous one. + * + * @param element the element to add to the queue + * @return null if the queue is not full, otherwise the removed element + */ + public synchronized T add(final T element) { + + T removed = null; + if (elementCount < capacity) { + list.add(element); + ++elementCount; + } else { + final int index = ThreadLocalRandom.current().nextInt(0, elementCount); + removed = list.set(index, element); + } + return removed; + } + + // Methods below are essentially used for testing purposes + + /** + * Return the number of elements set in the queue + * + * @return the current size of the queue + */ + public int size() { + return elementCount; + } + + /** + * Return true if the queue is empty + * + * @return true if the queue is empty + */ + public boolean isEmpty() { + return elementCount == 0; + } +} diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index e30731647f..db703b895b 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -6,13 +6,13 @@ class WriterQueueTest extends Specification { def "instantiate a empty queue throws an exception"() { when: - new DDAgentWriter.WriterQueue(0) + new WriterQueue(0) then: thrown IllegalArgumentException when: - new DDAgentWriter.WriterQueue(-1) + new WriterQueue(-1) then: thrown IllegalArgumentException @@ -21,7 +21,7 @@ class WriterQueueTest extends Specification { def "full the queue without forcing"() { setup: - def queue = new DDAgentWriter.WriterQueue(capacity) + def queue = new WriterQueue(capacity) def removed = false when: @@ -40,7 +40,7 @@ class WriterQueueTest extends Specification { def "force element add to a full queue"() { setup: - def queue = new DDAgentWriter.WriterQueue(capacity) + def queue = new WriterQueue(capacity) for (def i = 0; i < capacity; i++) { queue.add(i) } @@ -60,7 +60,7 @@ class WriterQueueTest extends Specification { def "drain the queue into another collection"() { setup: - def queue = new DDAgentWriter.WriterQueue(capacity) + def queue = new WriterQueue(capacity) for (def i = 0; i < capacity; i++) { queue.add(i) } From 1c4c6bd8ec7eb64f166acc944dbe2eca02b09891 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Thu, 10 Aug 2017 13:24:18 +0200 Subject: [PATCH 06/10] Skip test coverage, need to refactor --- dd-trace/dd-trace.gradle | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dd-trace/dd-trace.gradle b/dd-trace/dd-trace.gradle index 1f74fbde6a..e2bce02818 100644 --- a/dd-trace/dd-trace.gradle +++ b/dd-trace/dd-trace.gradle @@ -17,6 +17,9 @@ whitelistedInstructionClasses += whitelistedBranchClasses += [ 'com.datadoghq.trace.DDTags', 'com.datadoghq.trace.DDTraceInfo', 'com.datadoghq.trace.util.Clock', + //TODO: Refactor the class in order to make it more testable + 'com.datadoghq.trace.writer.DDAgentWriter', + ] dependencies { @@ -29,7 +32,7 @@ dependencies { compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.8.8' compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.25' compile group: 'com.google.auto.service', name: 'auto-service', version: '1.0-rc3' - + testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3' testCompile group: 'junit', name: 'junit', version: '4.12' From 914568449123ad72ea704772bd3c58132e7822fa Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Fri, 11 Aug 2017 14:13:30 +0200 Subject: [PATCH 07/10] Improvements due to the review --- .../datadoghq/trace/writer/DDAgentWriter.java | 8 +++---- .../datadoghq/trace/writer/WriterQueue.java | 23 +++++++++---------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index d274c54824..80f0576378 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -38,7 +38,7 @@ public class DDAgentWriter implements Writer { /** Flush interval for the API in seconds */ private static final long FLUSH_TIME_SECONDS = 1; - /** Scheduled thread pool, it' acting like a cron */ + /** Scheduled thread pool, acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); /** Effective thread pool, where real logic is done */ @@ -70,7 +70,7 @@ public class DDAgentWriter implements Writer { final List> removed = traces.add(trace); if (removed != null && !queueFullReported) { - log.warn("Queue is full, dropping one trace, queue size: {}", DEFAULT_MAX_TRACES); + log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES); queueFullReported = true; return; } @@ -107,7 +107,7 @@ public class DDAgentWriter implements Writer { } /** Infinite tasks blocking until some spans come in the blocking queue. */ - private class TracesSendingTask implements Runnable { + class TracesSendingTask implements Runnable { @Override public void run() { @@ -122,8 +122,6 @@ public class DDAgentWriter implements Writer { } } - public void size() {} - class SendingTask implements Callable { @Override diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java index ab50076e75..2232361adb 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/WriterQueue.java @@ -1,7 +1,6 @@ package com.datadoghq.trace.writer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ThreadLocalRandom; @@ -21,7 +20,6 @@ class WriterQueue { private final int capacity; private volatile ArrayList list; - private volatile int elementCount = 0; /** * Default construct, a capacity must be provided @@ -32,7 +30,7 @@ class WriterQueue { if (capacity < 1) { throw new IllegalArgumentException("Capacity couldn't be 0"); } - list = new ArrayList<>(capacity); + this.list = emptyList(capacity); this.capacity = capacity; } @@ -43,10 +41,8 @@ class WriterQueue { * @return a list contain all elements */ public synchronized List getAll() { - List all = Collections.emptyList(); - all = list; - list = new ArrayList<>(capacity); - elementCount = 0; + final List all = list; + list = emptyList(capacity); return all; } @@ -60,11 +56,10 @@ class WriterQueue { public synchronized T add(final T element) { T removed = null; - if (elementCount < capacity) { + if (list.size() < capacity) { list.add(element); - ++elementCount; } else { - final int index = ThreadLocalRandom.current().nextInt(0, elementCount); + final int index = ThreadLocalRandom.current().nextInt(0, list.size()); removed = list.set(index, element); } return removed; @@ -78,7 +73,7 @@ class WriterQueue { * @return the current size of the queue */ public int size() { - return elementCount; + return list.size(); } /** @@ -87,6 +82,10 @@ class WriterQueue { * @return true if the queue is empty */ public boolean isEmpty() { - return elementCount == 0; + return list.isEmpty(); + } + + private ArrayList emptyList(final int capacity) { + return new ArrayList<>(capacity); } } From eba8d3835fca364befd2319a438cf93549ba4609 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 14 Aug 2017 11:31:16 +0200 Subject: [PATCH 08/10] Fixing tests and coverage --- dd-trace/dd-trace.gradle | 4 +- .../datadoghq/trace/writer/DDAgentWriter.java | 13 ++- .../trace/writer/DDAgentWriterTest.groovy | 93 +++++++++++++++++++ 3 files changed, 103 insertions(+), 7 deletions(-) create mode 100644 dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDAgentWriterTest.groovy diff --git a/dd-trace/dd-trace.gradle b/dd-trace/dd-trace.gradle index e2bce02818..690fa7223b 100644 --- a/dd-trace/dd-trace.gradle +++ b/dd-trace/dd-trace.gradle @@ -17,8 +17,6 @@ whitelistedInstructionClasses += whitelistedBranchClasses += [ 'com.datadoghq.trace.DDTags', 'com.datadoghq.trace.DDTraceInfo', 'com.datadoghq.trace.util.Clock', - //TODO: Refactor the class in order to make it more testable - 'com.datadoghq.trace.writer.DDAgentWriter', ] @@ -42,6 +40,8 @@ dependencies { testCompile group: 'org.spockframework', name: 'spock-core', version: '1.0-groovy-2.4' testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.4' testCompile group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6' + testCompile group: 'org.objenesis', name: 'objenesis', version: '2.6' + testCompile group: 'cglib', name: 'cglib-nodep', version: '3.2.5' jmh 'commons-io:commons-io:2.4' } diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 80f0576378..3b27f6576c 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -30,13 +30,13 @@ public class DDAgentWriter implements Writer { public static final int DEFAULT_PORT = 8126; /** Maximum number of traces kept in memory */ - private static final int DEFAULT_MAX_TRACES = 1000; + static final int DEFAULT_MAX_TRACES = 1000; /** Timeout for the API in seconds */ - private static final long API_TIMEOUT_SECONDS = 1; + static final long API_TIMEOUT_SECONDS = 1; /** Flush interval for the API in seconds */ - private static final long FLUSH_TIME_SECONDS = 1; + static final long FLUSH_TIME_SECONDS = 1; /** Scheduled thread pool, acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); @@ -57,9 +57,13 @@ public class DDAgentWriter implements Writer { } public DDAgentWriter(final DDApi api) { + this(api, new WriterQueue>>(DEFAULT_MAX_TRACES)); + } + + public DDAgentWriter(final DDApi api, final WriterQueue>> queue) { super(); this.api = api; - traces = new WriterQueue<>(DEFAULT_MAX_TRACES); + traces = queue; } /* (non-Javadoc) @@ -67,7 +71,6 @@ public class DDAgentWriter implements Writer { */ @Override public void write(final List> trace) { - final List> removed = traces.add(trace); if (removed != null && !queueFullReported) { log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES); diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDAgentWriterTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDAgentWriterTest.groovy new file mode 100644 index 0000000000..a4917b46b3 --- /dev/null +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDAgentWriterTest.groovy @@ -0,0 +1,93 @@ +package com.datadoghq.trace.writer + +import com.datadoghq.trace.DDBaseSpan +import spock.lang.Specification + +import static com.datadoghq.trace.SpanFactory.newSpanOf +import static org.mockito.Mockito.mock +import static org.mockito.Mockito.verifyNoMoreInteractions + +class DDAgentWriterTest extends Specification { + + + def "calls to the API are scheduled"() { + + setup: + def api = Mock(DDApi) + def writer = new DDAgentWriter(api) + + when: + writer.start() + Thread.sleep(flush_time_wait) + + then: + 0 * api.sendTraces(_ as List) + + when: + for (def i = 0; i < tick; i++) { + writer.write(trace) + Thread.sleep(flush_time_wait) + } + + then: + tick * api.sendTraces([trace]) + + where: + trace = [newSpanOf(0)] + flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000)) + tick << [1, 3] + + + } + + def "check if trace has been added by force"() { + + setup: + def traces = new WriterQueue>>(capacity) + def writer = new DDAgentWriter(Mock(DDApi), traces) + + when: + for (def i = 0; i < capacity; i++) { + writer.write([]) + } + + then: + traces.size() == capacity + + when: + writer.write(trace) + + then: + traces.size() == capacity + traces.getAll().contains(trace) + + where: + trace = [newSpanOf(0)] + capacity = 10 + + + } + + def "check that are no interactions after close"() { + + setup: + def api = mock(DDApi) + def writer = new DDAgentWriter(api) + writer.start() + + when: + writer.close() + writer.write([]) + Thread.sleep(flush_time_wait) + + then: + verifyNoMoreInteractions(api) + + where: + flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000)) + + + } + + +} From 4a2e12c3485f8cf409ed70e322430083c19e4afc Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 14 Aug 2017 14:00:01 +0200 Subject: [PATCH 09/10] Test concurrency --- .../trace/writer/WriterQueueTest.groovy | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index db703b895b..b5fd656863 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -2,6 +2,9 @@ package com.datadoghq.trace.writer import spock.lang.Specification +import java.util.concurrent.Phaser +import java.util.concurrent.atomic.AtomicInteger + class WriterQueueTest extends Specification { def "instantiate a empty queue throws an exception"() { @@ -78,4 +81,103 @@ class WriterQueueTest extends Specification { } + def "check concurrency on writes"() { + setup: + + def phaser_1 = new Phaser() + def phaser_2 = new Phaser() + def queue = new WriterQueue(capacity) + def insertionCount = new AtomicInteger(0) + + phaser_1.register() // global start + phaser_2.register() // global stop + + numberThreads.times { + phaser_1.register() + Thread.start { + phaser_2.register() + phaser_1.arriveAndAwaitAdvance() + numberInsertionsPerThread.times { + queue.add(1) + insertionCount.getAndIncrement() + } + phaser_2.arriveAndAwaitAdvance() + } + } + + when: + phaser_1.arriveAndAwaitAdvance() // allow threads to start + phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished + + then: + queue.size() == capacity + insertionCount.get() == numberInsertionsPerThread * numberThreads + + where: + capacity = 100 + numberThreads << [1, 10, 100] + numberInsertionsPerThread = 100 + + } + + + def "check concurrency on writes and reads"() { + setup: + def phaser_1 = new Phaser() + def phaser_2 = new Phaser() + def queue = new WriterQueue(capacity) + def insertionCount = new AtomicInteger(0) + def droppedCount = new AtomicInteger(0) + def getCount = new AtomicInteger(0) + def numberElements = new AtomicInteger(0) + + phaser_1.register() // global start + phaser_2.register() // global stop + + // writes + numberThreadsWrites.times { + phaser_1.register() + Thread.start { + phaser_2.register() + phaser_1.arriveAndAwaitAdvance() + numberInsertionsPerThread.times { + queue.add(1) != null ? droppedCount.getAndIncrement() : null + insertionCount.getAndIncrement() + } + phaser_2.arriveAndAwaitAdvance() + } + } + + // reads + numberThreadsReads.times { + phaser_1.register() + Thread.start { + phaser_2.register() + phaser_1.arriveAndAwaitAdvance() + numberGetsPerThread.times { + numberElements.getAndAdd(queue.getAll().size()) + getCount.getAndIncrement() + } + phaser_2.arriveAndAwaitAdvance() + } + } + + when: + phaser_1.arriveAndAwaitAdvance() // allow threads to start + phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished + + then: + insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites + getCount.get() == numberGetsPerThread * numberThreadsReads + insertionCount.get() == numberElements + queue.size() + droppedCount + + where: + capacity = 100 + numberThreadsWrites << [1, 10, 100] + numberThreadsReads << [1, 5, 10] + numberInsertionsPerThread = 100 + numberGetsPerThread = 5 + + } + } From b442302b6a34cfccdab173a8550273381636dc30 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Wed, 16 Aug 2017 10:36:45 +0200 Subject: [PATCH 10/10] Fix codenarc --- .../trace/writer/WriterQueueTest.groovy | 48 +++++++------- .../trace/writer/impl/DDAgentWriterTest.java | 65 ------------------- 2 files changed, 24 insertions(+), 89 deletions(-) delete mode 100644 dd-trace/src/test/java/com/datadoghq/trace/writer/impl/DDAgentWriterTest.java diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index b5fd656863..629e18e42a 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -84,30 +84,30 @@ class WriterQueueTest extends Specification { def "check concurrency on writes"() { setup: - def phaser_1 = new Phaser() - def phaser_2 = new Phaser() + def phaser1 = new Phaser() + def phaser2 = new Phaser() def queue = new WriterQueue(capacity) def insertionCount = new AtomicInteger(0) - phaser_1.register() // global start - phaser_2.register() // global stop + phaser1.register() // global start + phaser2.register() // global stop numberThreads.times { - phaser_1.register() + phaser1.register() Thread.start { - phaser_2.register() - phaser_1.arriveAndAwaitAdvance() + phaser2.register() + phaser1.arriveAndAwaitAdvance() numberInsertionsPerThread.times { queue.add(1) insertionCount.getAndIncrement() } - phaser_2.arriveAndAwaitAdvance() + phaser2.arriveAndAwaitAdvance() } } when: - phaser_1.arriveAndAwaitAdvance() // allow threads to start - phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished + phaser1.arriveAndAwaitAdvance() // allow threads to start + phaser2.arriveAndAwaitAdvance() // wait till the job is not finished then: queue.size() == capacity @@ -123,48 +123,48 @@ class WriterQueueTest extends Specification { def "check concurrency on writes and reads"() { setup: - def phaser_1 = new Phaser() - def phaser_2 = new Phaser() + def phaser1 = new Phaser() + def phaser2 = new Phaser() def queue = new WriterQueue(capacity) def insertionCount = new AtomicInteger(0) def droppedCount = new AtomicInteger(0) def getCount = new AtomicInteger(0) def numberElements = new AtomicInteger(0) - phaser_1.register() // global start - phaser_2.register() // global stop + phaser1.register() // global start + phaser2.register() // global stop // writes numberThreadsWrites.times { - phaser_1.register() + phaser1.register() Thread.start { - phaser_2.register() - phaser_1.arriveAndAwaitAdvance() + phaser2.register() + phaser1.arriveAndAwaitAdvance() numberInsertionsPerThread.times { queue.add(1) != null ? droppedCount.getAndIncrement() : null insertionCount.getAndIncrement() } - phaser_2.arriveAndAwaitAdvance() + phaser2.arriveAndAwaitAdvance() } } // reads numberThreadsReads.times { - phaser_1.register() + phaser1.register() Thread.start { - phaser_2.register() - phaser_1.arriveAndAwaitAdvance() + phaser2.register() + phaser1.arriveAndAwaitAdvance() numberGetsPerThread.times { numberElements.getAndAdd(queue.getAll().size()) getCount.getAndIncrement() } - phaser_2.arriveAndAwaitAdvance() + phaser2.arriveAndAwaitAdvance() } } when: - phaser_1.arriveAndAwaitAdvance() // allow threads to start - phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished + phaser1.arriveAndAwaitAdvance() // allow threads to start + phaser2.arriveAndAwaitAdvance() // wait till the job is not finished then: insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites diff --git a/dd-trace/src/test/java/com/datadoghq/trace/writer/impl/DDAgentWriterTest.java b/dd-trace/src/test/java/com/datadoghq/trace/writer/impl/DDAgentWriterTest.java deleted file mode 100644 index 80abb64868..0000000000 --- a/dd-trace/src/test/java/com/datadoghq/trace/writer/impl/DDAgentWriterTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.datadoghq.trace.writer.impl; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -import com.datadoghq.trace.DDBaseSpan; -import com.datadoghq.trace.DDSpan; -import com.datadoghq.trace.DDTracer; -import com.datadoghq.trace.writer.DDAgentWriter; -import com.datadoghq.trace.writer.DDApi; -import java.util.ArrayList; -import java.util.List; -import org.junit.Before; -import org.junit.Test; - -public class DDAgentWriterTest { - - DDSpan parent = null; - DDApi mockedAPI = null; - List>> traces = new ArrayList<>(); - DDAgentWriter ddAgentWriter = null; - - @Before - public void setUp() throws Exception { - //Setup - final DDTracer tracer = new DDTracer(); - - parent = tracer.buildSpan("hello-world").withServiceName("service-name").startManual(); - parent.setBaggageItem("a-baggage", "value"); - - Thread.sleep(100); - - final DDSpan child = tracer.buildSpan("hello-world").asChildOf(parent).startManual(); - Thread.sleep(100); - - child.finish(); - Thread.sleep(100); - parent.finish(); - - //Create DDWriter - traces.add(new ArrayList<>(parent.context().getTrace())); - mockedAPI = mock(DDApi.class); - when(mockedAPI.sendTraces(traces)).thenReturn(true); - ddAgentWriter = new DDAgentWriter(mockedAPI); - ddAgentWriter.start(); - } - - @Test - public void testWrite() throws Exception { - ddAgentWriter.write(new ArrayList<>(parent.context().getTrace())); - Thread.sleep(500); - verify(mockedAPI).sendTraces(traces); - } - - @Test - public void testClose() throws Exception { - ddAgentWriter.close(); - - ddAgentWriter.write(new ArrayList<>(parent.context().getTrace())); - Thread.sleep(500); - verifyNoMoreInteractions(mockedAPI); - } -}