Use arraylist

This commit is contained in:
Guillaume Polaert 2017-08-09 14:36:52 +02:00
parent 9884ddf687
commit a4d57d2b0c
2 changed files with 29 additions and 115 deletions

View File

@ -3,8 +3,7 @@ package com.datadoghq.trace.writer;
import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.DDBaseSpan;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collections;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -106,16 +105,16 @@ public class DDAgentWriter implements Writer {
static class WriterQueue<T> { static class WriterQueue<T> {
private final LinkedList<T> list;
private final int capacity; private final int capacity;
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
private ArrayList<T> list;
private int nbElements = 0; private int nbElements = 0;
public WriterQueue(final int capacity) { public WriterQueue(final int capacity) {
if (capacity < 1) { if (capacity < 1) {
throw new IllegalArgumentException("Capacity couldn't be 0"); throw new IllegalArgumentException("Capacity couldn't be 0");
} }
list = new LinkedList<>(); list = new ArrayList<>(capacity);
this.capacity = capacity; this.capacity = capacity;
} }
@ -123,27 +122,17 @@ public class DDAgentWriter implements Writer {
return nbElements; return nbElements;
} }
public int drainTo(final Collection<T> c) { public List<T> getAll() {
List<T> all = Collections.emptyList();
lock.lock(); lock.lock();
int i = 0;
final int n = nbElements;
try { try {
while (i < n) { all = list;
final T element = list.getLast(); list = new ArrayList<>(capacity);
c.add(element); // things can go wrong here nbElements = 0;
list.removeLast();
++i;
--nbElements;
}
} catch (final Throwable ex) {
log.warn("Unexpected error while draining the queue: {}", ex.getMessage());
throw ex;
} finally { } finally {
// Recover the nominal state
nbElements = list.size();
lock.unlock(); lock.unlock();
} }
return i; return all;
} }
public T add(final T element) { public T add(final T element) {
@ -152,10 +141,10 @@ public class DDAgentWriter implements Writer {
T removed = null; T removed = null;
try { try {
if (nbElements < capacity) { if (nbElements < capacity) {
list.addFirst(element); list.add(element);
++nbElements; ++nbElements;
} else { } else {
removed = removeAndAdd(element); removed = set(element);
} }
} finally { } finally {
lock.unlock(); lock.unlock();
@ -167,11 +156,9 @@ public class DDAgentWriter implements Writer {
return nbElements == 0; return nbElements == 0;
} }
private T removeAndAdd(final T element) { private T set(final T element) {
final int index = ThreadLocalRandom.current().nextInt(0, nbElements); final int index = ThreadLocalRandom.current().nextInt(0, nbElements);
final T removed = list.remove(index); return list.set(index, element);
list.addFirst(element);
return removed;
} }
} }
@ -201,23 +188,21 @@ public class DDAgentWriter implements Writer {
return 0L; return 0L;
} }
final List<List<DDBaseSpan<?>>> payload = new ArrayList<>(); final List<List<DDBaseSpan<?>>> payload = traces.getAll();
int nbTraces = traces.drainTo(payload);
int nbSpans = 0; int nbSpans = 0;
for (final List<?> trace : payload) { for (final List<?> trace : payload) {
nbTraces++;
nbSpans += trace.size(); nbSpans += trace.size();
} }
log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans); log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
final boolean isSent = api.sendTraces(payload); final boolean isSent = api.sendTraces(payload);
if (!isSent) { if (!isSent) {
log.warn("Failing to send {} traces to the API", nbTraces); log.warn("Failing to send {} traces to the API", payload.size());
return 0L; return 0L;
} }
return (long) nbTraces; return (long) payload.size();
} }
} }
} }

View File

@ -21,12 +21,12 @@ class WriterQueueTest extends Specification {
def "full the queue without forcing"() { def "full the queue without forcing"() {
setup: setup:
def Q = new DDAgentWriter.WriterQueue<Integer>(capacity) def queue = new DDAgentWriter.WriterQueue<Integer>(capacity)
def removed = false def removed = false
when: when:
for (def i = 0; i < capacity; i++) { for (def i = 0; i < capacity; i++) {
removed = removed || Q.add(i) != null removed = removed || queue.add(i) != null
} }
then: then:
@ -40,17 +40,17 @@ class WriterQueueTest extends Specification {
def "force element add to a full queue"() { def "force element add to a full queue"() {
setup: setup:
def Q = new DDAgentWriter.WriterQueue<Integer>(capacity) def queue = new DDAgentWriter.WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) { for (def i = 0; i < capacity; i++) {
Q.add(i) queue.add(i)
} }
when: when:
def removed = Q.add(1) def removed = queue.add(1)
then: then:
removed != null removed != null
Q.size() == capacity queue.size() == capacity
where: where:
capacity << [1, 10, 100] capacity << [1, 10, 100]
@ -60,93 +60,22 @@ class WriterQueueTest extends Specification {
def "drain the queue into another collection"() { def "drain the queue into another collection"() {
setup: setup:
def Q = new DDAgentWriter.WriterQueue<Integer>(capacity) def queue = new DDAgentWriter.WriterQueue<Integer>(capacity)
def L = []
for (def i = 0; i < capacity; i++) { for (def i = 0; i < capacity; i++) {
Q.add(i) queue.add(i)
} }
when: when:
def nb = Q.drainTo(L) def list = queue.getAll()
then: then:
nb == L.size() list.size() == capacity
nb == capacity queue.isEmpty()
Q.isEmpty() queue.size() == 0
Q.size() == 0
where: where:
capacity << [1, 10, 100] capacity << [1, 10, 100]
} }
def "Queue should be never locked"() {
setup:
def Q = new DDAgentWriter.WriterQueue<Integer>(1)
def L = Collections.emptyList() // raise an error if you add an element
Q.add(42)
when:
Q.drainTo(L)
then:
thrown Exception
when:
// still able to add element
def removed = Q.add(1337)
then:
removed == 42
Q.size() == 1
}
// def "Multi threading test"() {
// setup:
// def Q = new DDAgentWriter.WriterQueue<Integer>(10)
// def start = new CountDownLatch(5)
// def executor = Executors.newFixedThreadPool(5)
// def end = false
// def L = []
//
// def pushTask = new Runnable() {
// @Override
// void run() {
// start.await()
// while (!end) Q.add(1)
// }
// }
//
// def popTask = new Runnable() {
// @Override
// void run() {
// start.await()
// def nbDrains = 0
// while (!end) {
// Q.drainTo(L)
// sleep(10)
// end = ++nbDrains == 1000
// }
// }
// }
//
// when:
// // 4 pushers
// executor.submit(pushTask)
// executor.submit(pushTask)
// executor.submit(pushTask)
// executor.submit(pushTask)
// // 1 popper (do 1000 drains)
// // executor.submit(popTask)
//
//
// then:
// // to be here
// Q.size() == 10 || L.size() == 10 || Q.size() + L.size() == 10
//
// }
} }