diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java index 1f37878985..50cb661010 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java @@ -7,135 +7,132 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Semaphore; +import java.util.concurrent.*; /** * This writer write provided traces to the a DD agent which is most of time located on the same host. - * + *

* It handles writes asynchronuously so the calling threads are automatically released. However, if too much spans are collected * the writers can reach a state where it is forced to drop incoming spans. */ public class DDAgentWriter implements Writer { - private static final Logger logger = LoggerFactory.getLogger(DDAgentWriter.class.getName()); - - /** - * Default location of the DD agent - */ - protected static final String DEFAULT_HOSTNAME = "localhost"; - protected static final int DEFAULT_PORT = 8126; - - /** - * Maximum number of spans kept in memory - */ - protected static final int DEFAULT_MAX_SPANS = 1000; - - /** - * Maximum number of traces sent to the DD agent API at once - */ - protected static final int DEFAULT_BATCH_SIZE = 10; + private static final Logger logger = LoggerFactory.getLogger(DDAgentWriter.class.getName()); - /** - * Used to ensure that we don't keep too many spans (while the blocking queue collect traces...) - */ - private final Semaphore tokens; - - /** - * In memory collection of traces waiting for departure - */ - private final BlockingQueue> traces; - - /** - * Async worker that posts the spans to the DD agent - */ - private final Thread asyncWriterThread; + /** + * Default location of the DD agent + */ + protected static final String DEFAULT_HOSTNAME = "localhost"; + protected static final int DEFAULT_PORT = 8126; - /** - * The DD agent api - */ - private final DDApi api; + /** + * Maximum number of spans kept in memory + */ + protected static final int DEFAULT_MAX_SPANS = 1000; - public DDAgentWriter() { - this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); - } - - public DDAgentWriter(DDApi api) { - super(); - this.api = api; - - tokens = new Semaphore(DEFAULT_MAX_SPANS); - traces = new ArrayBlockingQueue>(DEFAULT_MAX_SPANS); + /** + * Maximum number of traces sent to the DD agent API at once + */ + protected static final int DEFAULT_BATCH_SIZE = 10; - asyncWriterThread = new Thread(new SpansSendingTask(), "dd.DDAgentWriter-SpansSendingTask"); - asyncWriterThread.setDaemon(true); - asyncWriterThread.start(); - } + /** + * Used to ensure that we don't keep too many spans (while the blocking queue collect traces...) + */ + private final Semaphore tokens; - /* (non-Javadoc) - * @see com.datadoghq.trace.Writer#write(java.util.List) - */ - public void write(List trace) { - //Try to add a new span in the queue - boolean proceed = tokens.tryAcquire(trace.size()); + /** + * In memory collection of traces waiting for departure + */ + private final BlockingQueue> traces; - if(proceed){ - traces.add(trace); - }else{ - logger.warn("Cannot add a trace of {} as the async queue is full. Queue max size: {}", trace.size(), DEFAULT_MAX_SPANS); - } - } + /** + * Async worker that posts the spans to the DD agent + */ + private final ExecutorService executor = Executors.newSingleThreadExecutor(); - /* (non-Javadoc) - * @see com.datadoghq.trace.Writer#close() - */ - public void close() { - asyncWriterThread.interrupt(); - try { - asyncWriterThread.join(); - } catch (InterruptedException e) { - logger.info("Writer properly closed and async writer interrupted."); - } - } + /** + * The DD agent api + */ + private final DDApi api; - /** - * Infinite tasks blocking until some spans come in the blocking queue. - */ - protected class SpansSendingTask implements Runnable { - - public void run() { - while (true) { - try { - List> payload = new ArrayList>(); - - //WAIT until a new span comes - List l = DDAgentWriter.this.traces.take(); - payload.add(l); - - //Drain all spans up to a certain batch suze - traces.drainTo(payload, DEFAULT_BATCH_SIZE); + public DDAgentWriter() { + this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); + } - //SEND the payload to the agent - logger.debug("Async writer about to write {} traces.", payload.size()); - api.sendTraces(payload); + public DDAgentWriter(DDApi api) { + super(); + this.api = api; - //Compute the number of spans sent - int spansCount = 0; - for(List trace:payload){ - spansCount+=trace.size(); - } - logger.debug("Async writer just sent {} spans through {} traces", spansCount, payload.size()); + tokens = new Semaphore(DEFAULT_MAX_SPANS); + traces = new ArrayBlockingQueue>(DEFAULT_MAX_SPANS); - //Release the tokens - tokens.release(spansCount); - } catch (InterruptedException e) { - logger.info("Async writer interrupted."); + executor.submit(new SpansSendingTask()); - //The thread was interrupted, we break the LOOP - break; - } - } - } - } + } + + /* (non-Javadoc) + * @see com.datadoghq.trace.Writer#write(java.util.List) + */ + public void write(List trace) { + //Try to add a new span in the queue + boolean proceed = tokens.tryAcquire(trace.size()); + + if (proceed) { + traces.add(trace); + } else { + logger.warn("Cannot add a trace of {} as the async queue is full. Queue max size: {}", trace.size(), DEFAULT_MAX_SPANS); + } + } + + /* (non-Javadoc) + * @see com.datadoghq.trace.Writer#close() + */ + public void close() { + executor.shutdownNow(); + try { + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.info("Writer properly closed and async writer interrupted."); + } + } + + /** + * Infinite tasks blocking until some spans come in the blocking queue. + */ + protected class SpansSendingTask implements Runnable { + + public void run() { + while (true) { + try { + List> payload = new ArrayList>(); + + //WAIT until a new span comes + List l = DDAgentWriter.this.traces.take(); + payload.add(l); + + //Drain all spans up to a certain batch suze + traces.drainTo(payload, DEFAULT_BATCH_SIZE); + + //SEND the payload to the agent + logger.debug("Async writer about to write {} traces.", payload.size()); + api.sendTraces(payload); + + //Compute the number of spans sent + int spansCount = 0; + for (List trace : payload) { + spansCount += trace.size(); + } + logger.debug("Async writer just sent {} spans through {} traces", spansCount, payload.size()); + + //Release the tokens + tokens.release(spansCount); + } catch (InterruptedException e) { + logger.info("Async writer interrupted."); + + //The thread was interrupted, we break the LOOP + break; + } + } + } + } }