Create a separate class + reviews

This commit is contained in:
Guillaume Polaert 2017-08-10 10:38:11 +02:00
parent a4d57d2b0c
commit 43525025da
3 changed files with 115 additions and 77 deletions

View File

@ -2,19 +2,14 @@ package com.datadoghq.trace.writer;
import com.datadoghq.trace.DDBaseSpan;
import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
/**
@ -38,20 +33,25 @@ public class DDAgentWriter implements Writer {
private static final int DEFAULT_MAX_TRACES = 1000;
/** Timeout for the API in seconds */
private static final long API_TIMEOUT_SECONDS = 2;
private static final long API_TIMEOUT_SECONDS = 1;
/** Flush interval for the API in seconds */
private static final long FLUSH_TIME_SECONDS = 5;
private static final long FLUSH_TIME_SECONDS = 1;
/** Scheduled thread pool, it' acting like a cron */
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
/** Effective thread pool, where real logic is done */
private final ExecutorService executor = Executors.newSingleThreadExecutor();
/** The DD agent api */
private final DDApi api;
/** In memory collection of traces waiting for departure */
private final WriterQueue<List<DDBaseSpan<?>>> traces;
private boolean queueFullReported = false;
public DDAgentWriter() {
this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT));
}
@ -69,9 +69,12 @@ public class DDAgentWriter implements Writer {
public void write(final List<DDBaseSpan<?>> trace) {
final List<DDBaseSpan<?>> removed = traces.add(trace);
if (removed != null) {
if (removed != null && !queueFullReported) {
log.warn("Queue is full, dropping one trace, queue size: {}", DEFAULT_MAX_TRACES);
queueFullReported = true;
return;
}
queueFullReported = false;
}
/* (non-Javadoc)
@ -103,65 +106,6 @@ public class DDAgentWriter implements Writer {
}
}
static class WriterQueue<T> {
private final int capacity;
private final Lock lock = new ReentrantLock();
private ArrayList<T> list;
private int nbElements = 0;
public WriterQueue(final int capacity) {
if (capacity < 1) {
throw new IllegalArgumentException("Capacity couldn't be 0");
}
list = new ArrayList<>(capacity);
this.capacity = capacity;
}
public int size() {
return nbElements;
}
public List<T> getAll() {
List<T> all = Collections.emptyList();
lock.lock();
try {
all = list;
list = new ArrayList<>(capacity);
nbElements = 0;
} finally {
lock.unlock();
}
return all;
}
public T add(final T element) {
lock.lock();
T removed = null;
try {
if (nbElements < capacity) {
list.add(element);
++nbElements;
} else {
removed = set(element);
}
} finally {
lock.unlock();
}
return removed;
}
public boolean isEmpty() {
return nbElements == 0;
}
private T set(final T element) {
final int index = ThreadLocalRandom.current().nextInt(0, nbElements);
return list.set(index, element);
}
}
/** Infinite tasks blocking until some spans come in the blocking queue. */
private class TracesSendingTask implements Runnable {
@ -190,12 +134,14 @@ public class DDAgentWriter implements Writer {
final List<List<DDBaseSpan<?>>> payload = traces.getAll();
int nbSpans = 0;
for (final List<?> trace : payload) {
nbSpans += trace.size();
}
if (log.isDebugEnabled()) {
int nbSpans = 0;
for (final List<?> trace : payload) {
nbSpans += trace.size();
}
log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
}
final boolean isSent = api.sendTraces(payload);
if (!isSent) {

View File

@ -0,0 +1,92 @@
package com.datadoghq.trace.writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* A bounded queue implementation compatible with the Datadog agent behavior. The class is
* thread-safe and can be used with concurrency.
*
* <p>
*
* <p>This class implements a specific behavior when it's full. Each new item added will replace an
* exisiting one, at a random place/index. The class is backed by an ArrayList in order to perform
* efficient random remove.
*
* @param <T> The element type to store
*/
class WriterQueue<T> {
private final int capacity;
private volatile ArrayList<T> list;
private volatile int elementCount = 0;
/**
* Default construct, a capacity must be provided
*
* @param capacity the max size of the queue
*/
WriterQueue(final int capacity) {
if (capacity < 1) {
throw new IllegalArgumentException("Capacity couldn't be 0");
}
list = new ArrayList<>(capacity);
this.capacity = capacity;
}
/**
* Return a list containing all elements present in the queue. After the operation, the queue is
* reset. All action performed on the returned list has no impact to the queue
*
* @return a list contain all elements
*/
public synchronized List<T> getAll() {
List<T> all = Collections.emptyList();
all = list;
list = new ArrayList<>(capacity);
elementCount = 0;
return all;
}
/**
* Add an element to the queue. If the queue is full, set the element at a random place in the
* queue and return the previous one.
*
* @param element the element to add to the queue
* @return null if the queue is not full, otherwise the removed element
*/
public synchronized T add(final T element) {
T removed = null;
if (elementCount < capacity) {
list.add(element);
++elementCount;
} else {
final int index = ThreadLocalRandom.current().nextInt(0, elementCount);
removed = list.set(index, element);
}
return removed;
}
// Methods below are essentially used for testing purposes
/**
* Return the number of elements set in the queue
*
* @return the current size of the queue
*/
public int size() {
return elementCount;
}
/**
* Return true if the queue is empty
*
* @return true if the queue is empty
*/
public boolean isEmpty() {
return elementCount == 0;
}
}

View File

@ -6,13 +6,13 @@ class WriterQueueTest extends Specification {
def "instantiate a empty queue throws an exception"() {
when:
new DDAgentWriter.WriterQueue<Integer>(0)
new WriterQueue<Integer>(0)
then:
thrown IllegalArgumentException
when:
new DDAgentWriter.WriterQueue<Integer>(-1)
new WriterQueue<Integer>(-1)
then:
thrown IllegalArgumentException
@ -21,7 +21,7 @@ class WriterQueueTest extends Specification {
def "full the queue without forcing"() {
setup:
def queue = new DDAgentWriter.WriterQueue<Integer>(capacity)
def queue = new WriterQueue<Integer>(capacity)
def removed = false
when:
@ -40,7 +40,7 @@ class WriterQueueTest extends Specification {
def "force element add to a full queue"() {
setup:
def queue = new DDAgentWriter.WriterQueue<Integer>(capacity)
def queue = new WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) {
queue.add(i)
}
@ -60,7 +60,7 @@ class WriterQueueTest extends Specification {
def "drain the queue into another collection"() {
setup:
def queue = new DDAgentWriter.WriterQueue<Integer>(capacity)
def queue = new WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) {
queue.add(i)
}