diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 3328b705ba..1243f73541 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -4,16 +4,17 @@ import com.datadoghq.trace.DDBaseSpan; import com.google.auto.service.AutoService; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import lombok.extern.slf4j.Slf4j; /** @@ -47,17 +48,16 @@ public class DDAgentWriter implements Writer { */ private final Semaphore tokens; - /** In memory collection of traces waiting for departure */ - private final BlockingQueue>> traces; - + /** Used to protect the traces during the drop */ + private final Lock lock; /** Scheduled thread pool, it' acting like a cron */ private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); - /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); - /** The DD agent api */ private final DDApi api; + /** In memory collection of traces waiting for departure */ + private ArrayList>> traces; public DDAgentWriter() { this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); @@ -67,8 +67,9 @@ public class DDAgentWriter implements Writer { super(); this.api = api; + lock = new ReentrantLock(); tokens = new Semaphore(DEFAULT_MAX_TRACES); - traces = new ArrayBlockingQueue<>(DEFAULT_MAX_TRACES); + traces = new ArrayList<>(DEFAULT_MAX_TRACES); } /* (non-Javadoc) @@ -76,17 +77,21 @@ public class DDAgentWriter implements Writer { */ @Override public void write(final List> trace) { + + lock.lock(); //Try to add a new span in the queue final boolean proceed = tokens.tryAcquire(1); if (proceed) { traces.add(trace); } else { - log.warn( - "Cannot add a trace of {} as the async queue is full. Queue max size: {}", - trace.size(), - DEFAULT_MAX_TRACES); + + final int index = ThreadLocalRandom.current().nextInt(0, DEFAULT_MAX_TRACES); + traces.remove(index); + traces.add(trace); + log.warn("Queue is full, dropping an element, queue size: {}", DEFAULT_MAX_TRACES); } + lock.unlock(); } /* (non-Javadoc) @@ -141,11 +146,15 @@ public class DDAgentWriter implements Writer { return 0L; } - final List>> payload = new ArrayList<>(); - final int nbTraces = traces.drainTo(payload); + lock.lock(); + final List>> payload = traces; + traces = new ArrayList<>(DEFAULT_MAX_TRACES); + lock.unlock(); int nbSpans = 0; + int nbTraces = 0; for (final List trace : payload) { + nbTraces++; nbSpans += trace.size(); }