Implementing random delete

This commit is contained in:
Guillaume Polaert 2017-08-08 16:13:07 +02:00
parent 114ef60e6e
commit 17fa5f63b1
1 changed files with 23 additions and 14 deletions

View File

@ -4,16 +4,17 @@ import com.datadoghq.trace.DDBaseSpan;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -47,17 +48,16 @@ public class DDAgentWriter implements Writer {
*/ */
private final Semaphore tokens; private final Semaphore tokens;
/** In memory collection of traces waiting for departure */ /** Used to protect the traces during the drop */
private final BlockingQueue<List<DDBaseSpan<?>>> traces; private final Lock lock;
/** Scheduled thread pool, it' acting like a cron */ /** Scheduled thread pool, it' acting like a cron */
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1); private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
/** Effective thread pool, where real logic is done */ /** Effective thread pool, where real logic is done */
private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final ExecutorService executor = Executors.newSingleThreadExecutor();
/** The DD agent api */ /** The DD agent api */
private final DDApi api; private final DDApi api;
/** In memory collection of traces waiting for departure */
private ArrayList<List<DDBaseSpan<?>>> traces;
public DDAgentWriter() { public DDAgentWriter() {
this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT)); this(new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT));
@ -67,8 +67,9 @@ public class DDAgentWriter implements Writer {
super(); super();
this.api = api; this.api = api;
lock = new ReentrantLock();
tokens = new Semaphore(DEFAULT_MAX_TRACES); tokens = new Semaphore(DEFAULT_MAX_TRACES);
traces = new ArrayBlockingQueue<>(DEFAULT_MAX_TRACES); traces = new ArrayList<>(DEFAULT_MAX_TRACES);
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -76,17 +77,21 @@ public class DDAgentWriter implements Writer {
*/ */
@Override @Override
public void write(final List<DDBaseSpan<?>> trace) { public void write(final List<DDBaseSpan<?>> trace) {
lock.lock();
//Try to add a new span in the queue //Try to add a new span in the queue
final boolean proceed = tokens.tryAcquire(1); final boolean proceed = tokens.tryAcquire(1);
if (proceed) { if (proceed) {
traces.add(trace); traces.add(trace);
} else { } else {
log.warn(
"Cannot add a trace of {} as the async queue is full. Queue max size: {}", final int index = ThreadLocalRandom.current().nextInt(0, DEFAULT_MAX_TRACES);
trace.size(), traces.remove(index);
DEFAULT_MAX_TRACES); traces.add(trace);
log.warn("Queue is full, dropping an element, queue size: {}", DEFAULT_MAX_TRACES);
} }
lock.unlock();
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -141,11 +146,15 @@ public class DDAgentWriter implements Writer {
return 0L; return 0L;
} }
final List<List<DDBaseSpan<?>>> payload = new ArrayList<>(); lock.lock();
final int nbTraces = traces.drainTo(payload); final List<List<DDBaseSpan<?>>> payload = traces;
traces = new ArrayList<>(DEFAULT_MAX_TRACES);
lock.unlock();
int nbSpans = 0; int nbSpans = 0;
int nbTraces = 0;
for (final List<?> trace : payload) { for (final List<?> trace : payload) {
nbTraces++;
nbSpans += trace.size(); nbSpans += trace.size();
} }