Proposal for flushing writer (WIP)

This commit is contained in:
Guillaume Polaert 2017-08-08 14:53:20 +02:00
parent 08f44d15b4
commit 114ef60e6e
1 changed files with 63 additions and 41 deletions

View File

@ -6,10 +6,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
/**
@ -29,11 +33,14 @@ public class DDAgentWriter implements Writer {
public static final int DEFAULT_PORT = 8126;
/** Maximum number of spans kept in memory */
private static final int DEFAULT_MAX_SPANS = 1000;
/** Maximum number of traces kept in memory */
private static final int DEFAULT_MAX_TRACES = 1000;
/** Maximum number of traces sent to the DD agent API at once */
private static final int DEFAULT_BATCH_SIZE = 10;
/** Timeout for the API in seconds */
private static final long TIMEOUT = 2;
/** Flush interval for the API in seconds */
private static final long FLUSH_TIME = 5;
/**
* Used to ensure that we don't keep too many spans (while the blocking queue collect traces...)
@ -43,7 +50,10 @@ public class DDAgentWriter implements Writer {
/** In memory collection of traces waiting for departure */
private final BlockingQueue<List<DDBaseSpan<?>>> traces;
/** Async worker that posts the spans to the DD agent */
/** Scheduled thread pool, it' acting like a cron */
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
/** Effective thread pool, where real logic is done */
private final ExecutorService executor = Executors.newSingleThreadExecutor();
/** The DD agent api */
@ -57,8 +67,8 @@ public class DDAgentWriter implements Writer {
super();
this.api = api;
tokens = new Semaphore(DEFAULT_MAX_SPANS);
traces = new ArrayBlockingQueue<>(DEFAULT_MAX_SPANS);
tokens = new Semaphore(DEFAULT_MAX_TRACES);
traces = new ArrayBlockingQueue<>(DEFAULT_MAX_TRACES);
}
/* (non-Javadoc)
@ -67,7 +77,7 @@ public class DDAgentWriter implements Writer {
@Override
public void write(final List<DDBaseSpan<?>> trace) {
//Try to add a new span in the queue
final boolean proceed = tokens.tryAcquire(trace.size());
final boolean proceed = tokens.tryAcquire(1);
if (proceed) {
traces.add(trace);
@ -75,7 +85,7 @@ public class DDAgentWriter implements Writer {
log.warn(
"Cannot add a trace of {} as the async queue is full. Queue max size: {}",
trace.size(),
DEFAULT_MAX_SPANS);
DEFAULT_MAX_TRACES);
}
}
@ -84,7 +94,7 @@ public class DDAgentWriter implements Writer {
*/
@Override
public void start() {
executor.submit(new SpansSendingTask());
scheduledExecutor.scheduleAtFixedRate(new TracesSendingTask(), 0, FLUSH_TIME, TimeUnit.SECONDS);
}
/* (non-Javadoc)
@ -92,7 +102,14 @@ public class DDAgentWriter implements Writer {
*/
@Override
public void close() {
scheduledExecutor.shutdownNow();
executor.shutdownNow();
try {
scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
@ -101,43 +118,48 @@ public class DDAgentWriter implements Writer {
}
/** Infinite tasks blocking until some spans come in the blocking queue. */
protected class SpansSendingTask implements Runnable {
private class TracesSendingTask implements Runnable {
@Override
public void run() {
while (true) {
try {
final List<List<DDBaseSpan<?>>> payload = new ArrayList<>();
final Future<Long> future = executor.submit(new SendingTask());
try {
final long nbTraces = future.get(TIMEOUT, TimeUnit.SECONDS);
log.debug("Successfully sending {} traces to the API", nbTraces);
} catch (final TimeoutException e) {
log.debug("Timeout! Fail to send traces to the API: {}", e.getMessage());
} catch (final Throwable e) {
log.debug("Fail to send traces to the API: {}", e.getMessage());
}
}
//WAIT until a new span comes
final List<DDBaseSpan<?>> l = DDAgentWriter.this.traces.take();
payload.add(l);
class SendingTask implements Callable<Long> {
//Drain all spans up to a certain batch suze
traces.drainTo(payload, DEFAULT_BATCH_SIZE);
//SEND the payload to the agent
log.debug("Async writer about to write {} traces.", payload.size());
api.sendTraces(payload);
//Compute the number of spans sent
int spansCount = 0;
for (final List<DDBaseSpan<?>> trace : payload) {
spansCount += trace.size();
}
log.debug(
"Async writer just sent {} spans through {} traces", spansCount, payload.size());
//Release the tokens
tokens.release(spansCount);
} catch (final InterruptedException e) {
log.info("Async writer interrupted.");
//The thread was interrupted, we break the LOOP
break;
} catch (final Throwable e) {
log.error("Unexpected error! Some traces may have been dropped.", e);
@Override
public Long call() throws Exception {
if (traces.isEmpty()) {
return 0L;
}
final List<List<DDBaseSpan<?>>> payload = new ArrayList<>();
final int nbTraces = traces.drainTo(payload);
int nbSpans = 0;
for (final List<?> trace : payload) {
nbSpans += trace.size();
}
// release the lock
tokens.release(nbTraces);
log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans);
final boolean isSent = api.sendTraces(payload);
if (!isSent) {
log.warn("Failing to send {} traces to the API", nbTraces);
return 0L;
}
return (long) nbTraces;
}
}
}