diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 0cbd385c7d..3328b705ba 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -6,10 +6,14 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; /** @@ -29,11 +33,14 @@ public class DDAgentWriter implements Writer { public static final int DEFAULT_PORT = 8126; - /** Maximum number of spans kept in memory */ - private static final int DEFAULT_MAX_SPANS = 1000; + /** Maximum number of traces kept in memory */ + private static final int DEFAULT_MAX_TRACES = 1000; - /** Maximum number of traces sent to the DD agent API at once */ - private static final int DEFAULT_BATCH_SIZE = 10; + /** Timeout for the API in seconds */ + private static final long TIMEOUT = 2; + + /** Flush interval for the API in seconds */ + private static final long FLUSH_TIME = 5; /** * Used to ensure that we don't keep too many spans (while the blocking queue collect traces...) @@ -43,7 +50,10 @@ public class DDAgentWriter implements Writer { /** In memory collection of traces waiting for departure */ private final BlockingQueue>> traces; - /** Async worker that posts the spans to the DD agent */ + /** Scheduled thread pool, it' acting like a cron */ + private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); + + /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); /** The DD agent api */ @@ -57,8 +67,8 @@ public class DDAgentWriter implements Writer { super(); this.api = api; - tokens = new Semaphore(DEFAULT_MAX_SPANS); - traces = new ArrayBlockingQueue<>(DEFAULT_MAX_SPANS); + tokens = new Semaphore(DEFAULT_MAX_TRACES); + traces = new ArrayBlockingQueue<>(DEFAULT_MAX_TRACES); } /* (non-Javadoc) @@ -67,7 +77,7 @@ public class DDAgentWriter implements Writer { @Override public void write(final List> trace) { //Try to add a new span in the queue - final boolean proceed = tokens.tryAcquire(trace.size()); + final boolean proceed = tokens.tryAcquire(1); if (proceed) { traces.add(trace); @@ -75,7 +85,7 @@ public class DDAgentWriter implements Writer { log.warn( "Cannot add a trace of {} as the async queue is full. Queue max size: {}", trace.size(), - DEFAULT_MAX_SPANS); + DEFAULT_MAX_TRACES); } } @@ -84,7 +94,7 @@ public class DDAgentWriter implements Writer { */ @Override public void start() { - executor.submit(new SpansSendingTask()); + scheduledExecutor.scheduleAtFixedRate(new TracesSendingTask(), 0, FLUSH_TIME, TimeUnit.SECONDS); } /* (non-Javadoc) @@ -92,7 +102,14 @@ public class DDAgentWriter implements Writer { */ @Override public void close() { + scheduledExecutor.shutdownNow(); executor.shutdownNow(); + try { + scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + log.info("Writer properly closed and async writer interrupted."); + } + try { executor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { @@ -101,43 +118,48 @@ public class DDAgentWriter implements Writer { } /** Infinite tasks blocking until some spans come in the blocking queue. */ - protected class SpansSendingTask implements Runnable { + private class TracesSendingTask implements Runnable { @Override public void run() { - while (true) { - try { - final List>> payload = new ArrayList<>(); + final Future future = executor.submit(new SendingTask()); + try { + final long nbTraces = future.get(TIMEOUT, TimeUnit.SECONDS); + log.debug("Successfully sending {} traces to the API", nbTraces); + } catch (final TimeoutException e) { + log.debug("Timeout! Fail to send traces to the API: {}", e.getMessage()); + } catch (final Throwable e) { + log.debug("Fail to send traces to the API: {}", e.getMessage()); + } + } - //WAIT until a new span comes - final List> l = DDAgentWriter.this.traces.take(); - payload.add(l); + class SendingTask implements Callable { - //Drain all spans up to a certain batch suze - traces.drainTo(payload, DEFAULT_BATCH_SIZE); - - //SEND the payload to the agent - log.debug("Async writer about to write {} traces.", payload.size()); - api.sendTraces(payload); - - //Compute the number of spans sent - int spansCount = 0; - for (final List> trace : payload) { - spansCount += trace.size(); - } - log.debug( - "Async writer just sent {} spans through {} traces", spansCount, payload.size()); - - //Release the tokens - tokens.release(spansCount); - } catch (final InterruptedException e) { - log.info("Async writer interrupted."); - - //The thread was interrupted, we break the LOOP - break; - } catch (final Throwable e) { - log.error("Unexpected error! Some traces may have been dropped.", e); + @Override + public Long call() throws Exception { + if (traces.isEmpty()) { + return 0L; } + + final List>> payload = new ArrayList<>(); + final int nbTraces = traces.drainTo(payload); + + int nbSpans = 0; + for (final List trace : payload) { + nbSpans += trace.size(); + } + + // release the lock + tokens.release(nbTraces); + + log.debug("Sending {} traces ({} spans) to the API (async)", nbTraces, nbSpans); + final boolean isSent = api.sendTraces(payload); + + if (!isSent) { + log.warn("Failing to send {} traces to the API", nbTraces); + return 0L; + } + return (long) nbTraces; } } }