Merge pull request #102 from DataDog/gpolaert/async-writer

Proposal for flushing writer
This commit is contained in:
Guillaume Polaert 2017-08-17 09:01:19 +02:00 committed by GitHub
commit f1b31a661a
6 changed files with 443 additions and 126 deletions

View File

@ -16,6 +16,7 @@ whitelistedInstructionClasses += whitelistedBranchClasses += [
'com.datadoghq.trace.DDTags',
'com.datadoghq.trace.DDTraceInfo',
'com.datadoghq.trace.util.Clock',
]
dependencies {
@ -38,6 +39,8 @@ dependencies {
testCompile group: 'org.spockframework', name: 'spock-core', version: '1.0-groovy-2.4'
testCompile group: 'org.codehaus.groovy', name: 'groovy-all', version: '2.4.4'
testCompile group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6'
testCompile group: 'org.objenesis', name: 'objenesis', version: '2.6'
testCompile group: 'cglib', name: 'cglib-nodep', version: '3.2.5'
jmh 'commons-io:commons-io:2.4'
}

View File

@ -2,14 +2,14 @@ package com.datadoghq.trace.writer;
import com.datadoghq.trace.DDBaseSpan;
import com.google.auto.service.AutoService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
/**
@ -29,36 +29,41 @@ public class DDAgentWriter implements Writer {
public static final int DEFAULT_PORT = 8126;
/** Maximum number of spans kept in memory */
private static final int DEFAULT_MAX_SPANS = 1000;
/** Maximum number of traces kept in memory */
static final int DEFAULT_MAX_TRACES = 1000;
/** Maximum number of traces sent to the DD agent API at once */
private static final int DEFAULT_BATCH_SIZE = 10;
/** Timeout for the API in seconds */
static final long API_TIMEOUT_SECONDS = 1;
/**
* Used to ensure that we don't keep too many spans (while the blocking queue collect traces...)
*/
private final Semaphore tokens;
/** Flush interval for the API in seconds */
static final long FLUSH_TIME_SECONDS = 1;
/** In memory collection of traces waiting for departure */
private final BlockingQueue<List<DDBaseSpan<?>>> traces;
/** Scheduled thread pool, acting like a cron */
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
/** Async worker that posts the spans to the DD agent */
/** Effective thread pool, where real logic is done */
private final ExecutorService executor = Executors.newSingleThreadExecutor();
/** The DD agent api */
private final DDApi api;
/** In memory collection of traces waiting for departure */
private final WriterQueue<List<DDBaseSpan<?>>> traces;
private boolean queueFullReported = false;
public DDAgentWriter() {
this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT));
}
public DDAgentWriter(final DDApi api) {
this(api, new WriterQueue<List<DDBaseSpan<?>>>(DEFAULT_MAX_TRACES));
}
public DDAgentWriter(final DDApi api, final WriterQueue<List<DDBaseSpan<?>>> queue) {
super();
this.api = api;
tokens = new Semaphore(DEFAULT_MAX_SPANS);
traces = new ArrayBlockingQueue<>(DEFAULT_MAX_SPANS);
traces = queue;
}
/* (non-Javadoc)
@ -66,17 +71,13 @@ public class DDAgentWriter implements Writer {
*/
@Override
public void write(final List<DDBaseSpan<?>> trace) {
//Try to add a new span in the queue
final boolean proceed = tokens.tryAcquire(trace.size());
if (proceed) {
traces.add(trace);
} else {
log.warn(
"Cannot add a trace of {} as the async queue is full. Queue max size: {}",
trace.size(),
DEFAULT_MAX_SPANS);
final List<DDBaseSpan<?>> removed = traces.add(trace);
if (removed != null && !queueFullReported) {
log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES);
queueFullReported = true;
return;
}
queueFullReported = false;
}
/* (non-Javadoc)
@ -84,7 +85,8 @@ public class DDAgentWriter implements Writer {
*/
@Override
public void start() {
executor.submit(new SpansSendingTask());
scheduledExecutor.scheduleAtFixedRate(
new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
}
/* (non-Javadoc)
@ -92,7 +94,14 @@ public class DDAgentWriter implements Writer {
*/
@Override
public void close() {
scheduledExecutor.shutdownNow();
executor.shutdownNow();
try {
scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
@ -101,43 +110,46 @@ public class DDAgentWriter implements Writer {
}
/** Infinite tasks blocking until some spans come in the blocking queue. */
protected class SpansSendingTask implements Runnable {
class TracesSendingTask implements Runnable {
@Override
public void run() {
while (true) {
final Future<Long> future = executor.submit(new SendingTask());
try {
final List<List<DDBaseSpan<?>>> payload = new ArrayList<>();
//WAIT until a new span comes
final List<DDBaseSpan<?>> l = DDAgentWriter.this.traces.take();
payload.add(l);
//Drain all spans up to a certain batch suze
traces.drainTo(payload, DEFAULT_BATCH_SIZE);
//SEND the payload to the agent
log.debug("Async writer about to write {} traces.", payload.size());
api.sendTraces(payload);
//Compute the number of spans sent
int spansCount = 0;
for (final List<DDBaseSpan<?>> trace : payload) {
spansCount += trace.size();
}
log.debug(
"Async writer just sent {} spans through {} traces", spansCount, payload.size());
//Release the tokens
tokens.release(spansCount);
} catch (final InterruptedException e) {
log.info("Async writer interrupted.");
//The thread was interrupted, we break the LOOP
break;
final long nbTraces = future.get(API_TIMEOUT_SECONDS, TimeUnit.SECONDS);
log.debug("Successfully sending {} traces to the API", nbTraces);
} catch (final TimeoutException e) {
log.debug("Timeout! Fail to send traces to the API: {}", e.getMessage());
} catch (final Throwable e) {
log.error("Unexpected error! Some traces may have been dropped.", e);
}
log.debug("Fail to send traces to the API: {}", e.getMessage());
}
}
class SendingTask implements Callable<Long> {
@Override
public Long call() throws Exception {
if (traces.isEmpty()) {
return 0L;
}
final List<List<DDBaseSpan<?>>> payload = traces.getAll();
if (log.isDebugEnabled()) {
int nbSpans = 0;
for (final List<?> trace : payload) {
nbSpans += trace.size();
}
log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
}
final boolean isSent = api.sendTraces(payload);
if (!isSent) {
log.warn("Failing to send {} traces to the API", payload.size());
return 0L;
}
return (long) payload.size();
}
}
}

View File

@ -0,0 +1,91 @@
package com.datadoghq.trace.writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* A bounded queue implementation compatible with the Datadog agent behavior. The class is
* thread-safe and can be used with concurrency.
*
* <p>
*
* <p>This class implements a specific behavior when it's full. Each new item added will replace an
* exisiting one, at a random place/index. The class is backed by an ArrayList in order to perform
* efficient random remove.
*
* @param <T> The element type to store
*/
class WriterQueue<T> {
private final int capacity;
private volatile ArrayList<T> list;
/**
* Default construct, a capacity must be provided
*
* @param capacity the max size of the queue
*/
WriterQueue(final int capacity) {
if (capacity < 1) {
throw new IllegalArgumentException("Capacity couldn't be 0");
}
this.list = emptyList(capacity);
this.capacity = capacity;
}
/**
* Return a list containing all elements present in the queue. After the operation, the queue is
* reset. All action performed on the returned list has no impact to the queue
*
* @return a list contain all elements
*/
public synchronized List<T> getAll() {
final List<T> all = list;
list = emptyList(capacity);
return all;
}
/**
* Add an element to the queue. If the queue is full, set the element at a random place in the
* queue and return the previous one.
*
* @param element the element to add to the queue
* @return null if the queue is not full, otherwise the removed element
*/
public synchronized T add(final T element) {
T removed = null;
if (list.size() < capacity) {
list.add(element);
} else {
final int index = ThreadLocalRandom.current().nextInt(0, list.size());
removed = list.set(index, element);
}
return removed;
}
// Methods below are essentially used for testing purposes
/**
* Return the number of elements set in the queue
*
* @return the current size of the queue
*/
public int size() {
return list.size();
}
/**
* Return true if the queue is empty
*
* @return true if the queue is empty
*/
public boolean isEmpty() {
return list.isEmpty();
}
private ArrayList<T> emptyList(final int capacity) {
return new ArrayList<>(capacity);
}
}

View File

@ -0,0 +1,93 @@
package com.datadoghq.trace.writer
import com.datadoghq.trace.DDBaseSpan
import spock.lang.Specification
import static com.datadoghq.trace.SpanFactory.newSpanOf
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.verifyNoMoreInteractions
class DDAgentWriterTest extends Specification {
def "calls to the API are scheduled"() {
setup:
def api = Mock(DDApi)
def writer = new DDAgentWriter(api)
when:
writer.start()
Thread.sleep(flush_time_wait)
then:
0 * api.sendTraces(_ as List)
when:
for (def i = 0; i < tick; i++) {
writer.write(trace)
Thread.sleep(flush_time_wait)
}
then:
tick * api.sendTraces([trace])
where:
trace = [newSpanOf(0)]
flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000))
tick << [1, 3]
}
def "check if trace has been added by force"() {
setup:
def traces = new WriterQueue<List<DDBaseSpan<?>>>(capacity)
def writer = new DDAgentWriter(Mock(DDApi), traces)
when:
for (def i = 0; i < capacity; i++) {
writer.write([])
}
then:
traces.size() == capacity
when:
writer.write(trace)
then:
traces.size() == capacity
traces.getAll().contains(trace)
where:
trace = [newSpanOf(0)]
capacity = 10
}
def "check that are no interactions after close"() {
setup:
def api = mock(DDApi)
def writer = new DDAgentWriter(api)
writer.start()
when:
writer.close()
writer.write([])
Thread.sleep(flush_time_wait)
then:
verifyNoMoreInteractions(api)
where:
flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000))
}
}

View File

@ -0,0 +1,183 @@
package com.datadoghq.trace.writer
import spock.lang.Specification
import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicInteger
class WriterQueueTest extends Specification {
def "instantiate a empty queue throws an exception"() {
when:
new WriterQueue<Integer>(0)
then:
thrown IllegalArgumentException
when:
new WriterQueue<Integer>(-1)
then:
thrown IllegalArgumentException
}
def "full the queue without forcing"() {
setup:
def queue = new WriterQueue<Integer>(capacity)
def removed = false
when:
for (def i = 0; i < capacity; i++) {
removed = removed || queue.add(i) != null
}
then:
!removed
where:
capacity << [1, 10, 100]
}
def "force element add to a full queue"() {
setup:
def queue = new WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) {
queue.add(i)
}
when:
def removed = queue.add(1)
then:
removed != null
queue.size() == capacity
where:
capacity << [1, 10, 100]
}
def "drain the queue into another collection"() {
setup:
def queue = new WriterQueue<Integer>(capacity)
for (def i = 0; i < capacity; i++) {
queue.add(i)
}
when:
def list = queue.getAll()
then:
list.size() == capacity
queue.isEmpty()
queue.size() == 0
where:
capacity << [1, 10, 100]
}
def "check concurrency on writes"() {
setup:
def phaser1 = new Phaser()
def phaser2 = new Phaser()
def queue = new WriterQueue<Integer>(capacity)
def insertionCount = new AtomicInteger(0)
phaser1.register() // global start
phaser2.register() // global stop
numberThreads.times {
phaser1.register()
Thread.start {
phaser2.register()
phaser1.arriveAndAwaitAdvance()
numberInsertionsPerThread.times {
queue.add(1)
insertionCount.getAndIncrement()
}
phaser2.arriveAndAwaitAdvance()
}
}
when:
phaser1.arriveAndAwaitAdvance() // allow threads to start
phaser2.arriveAndAwaitAdvance() // wait till the job is not finished
then:
queue.size() == capacity
insertionCount.get() == numberInsertionsPerThread * numberThreads
where:
capacity = 100
numberThreads << [1, 10, 100]
numberInsertionsPerThread = 100
}
def "check concurrency on writes and reads"() {
setup:
def phaser1 = new Phaser()
def phaser2 = new Phaser()
def queue = new WriterQueue<Integer>(capacity)
def insertionCount = new AtomicInteger(0)
def droppedCount = new AtomicInteger(0)
def getCount = new AtomicInteger(0)
def numberElements = new AtomicInteger(0)
phaser1.register() // global start
phaser2.register() // global stop
// writes
numberThreadsWrites.times {
phaser1.register()
Thread.start {
phaser2.register()
phaser1.arriveAndAwaitAdvance()
numberInsertionsPerThread.times {
queue.add(1) != null ? droppedCount.getAndIncrement() : null
insertionCount.getAndIncrement()
}
phaser2.arriveAndAwaitAdvance()
}
}
// reads
numberThreadsReads.times {
phaser1.register()
Thread.start {
phaser2.register()
phaser1.arriveAndAwaitAdvance()
numberGetsPerThread.times {
numberElements.getAndAdd(queue.getAll().size())
getCount.getAndIncrement()
}
phaser2.arriveAndAwaitAdvance()
}
}
when:
phaser1.arriveAndAwaitAdvance() // allow threads to start
phaser2.arriveAndAwaitAdvance() // wait till the job is not finished
then:
insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites
getCount.get() == numberGetsPerThread * numberThreadsReads
insertionCount.get() == numberElements + queue.size() + droppedCount
where:
capacity = 100
numberThreadsWrites << [1, 10, 100]
numberThreadsReads << [1, 5, 10]
numberInsertionsPerThread = 100
numberGetsPerThread = 5
}
}

View File

@ -1,65 +0,0 @@
package com.datadoghq.trace.writer.impl;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.datadoghq.trace.DDBaseSpan;
import com.datadoghq.trace.DDSpan;
import com.datadoghq.trace.DDTracer;
import com.datadoghq.trace.writer.DDAgentWriter;
import com.datadoghq.trace.writer.DDApi;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
public class DDAgentWriterTest {
DDSpan parent = null;
DDApi mockedAPI = null;
List<List<DDBaseSpan<?>>> traces = new ArrayList<>();
DDAgentWriter ddAgentWriter = null;
@Before
public void setUp() throws Exception {
//Setup
final DDTracer tracer = new DDTracer();
parent = tracer.buildSpan("hello-world").withServiceName("service-name").startManual();
parent.setBaggageItem("a-baggage", "value");
Thread.sleep(100);
final DDSpan child = tracer.buildSpan("hello-world").asChildOf(parent).startManual();
Thread.sleep(100);
child.finish();
Thread.sleep(100);
parent.finish();
//Create DDWriter
traces.add(new ArrayList<>(parent.context().getTrace()));
mockedAPI = mock(DDApi.class);
when(mockedAPI.sendTraces(traces)).thenReturn(true);
ddAgentWriter = new DDAgentWriter(mockedAPI);
ddAgentWriter.start();
}
@Test
public void testWrite() throws Exception {
ddAgentWriter.write(new ArrayList<>(parent.context().getTrace()));
Thread.sleep(500);
verify(mockedAPI).sendTraces(traces);
}
@Test
public void testClose() throws Exception {
ddAgentWriter.close();
ddAgentWriter.write(new ArrayList<>(parent.context().getTrace()));
Thread.sleep(500);
verifyNoMoreInteractions(mockedAPI);
}
}