Improvements due to the review
This commit is contained in:
parent
1c4c6bd8ec
commit
9145684491
|
|
@ -38,7 +38,7 @@ public class DDAgentWriter implements Writer {
|
||||||
/** Flush interval for the API in seconds */
|
/** Flush interval for the API in seconds */
|
||||||
private static final long FLUSH_TIME_SECONDS = 1;
|
private static final long FLUSH_TIME_SECONDS = 1;
|
||||||
|
|
||||||
/** Scheduled thread pool, it' acting like a cron */
|
/** Scheduled thread pool, acting like a cron */
|
||||||
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
|
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
|
||||||
|
|
||||||
/** Effective thread pool, where real logic is done */
|
/** Effective thread pool, where real logic is done */
|
||||||
|
|
@ -70,7 +70,7 @@ public class DDAgentWriter implements Writer {
|
||||||
|
|
||||||
final List<DDBaseSpan<?>> removed = traces.add(trace);
|
final List<DDBaseSpan<?>> removed = traces.add(trace);
|
||||||
if (removed != null && !queueFullReported) {
|
if (removed != null && !queueFullReported) {
|
||||||
log.warn("Queue is full, dropping one trace, queue size: {}", DEFAULT_MAX_TRACES);
|
log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES);
|
||||||
queueFullReported = true;
|
queueFullReported = true;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -107,7 +107,7 @@ public class DDAgentWriter implements Writer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Infinite tasks blocking until some spans come in the blocking queue. */
|
/** Infinite tasks blocking until some spans come in the blocking queue. */
|
||||||
private class TracesSendingTask implements Runnable {
|
class TracesSendingTask implements Runnable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
@ -122,8 +122,6 @@ public class DDAgentWriter implements Writer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void size() {}
|
|
||||||
|
|
||||||
class SendingTask implements Callable<Long> {
|
class SendingTask implements Callable<Long> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package com.datadoghq.trace.writer;
|
package com.datadoghq.trace.writer;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
|
|
@ -21,7 +20,6 @@ class WriterQueue<T> {
|
||||||
|
|
||||||
private final int capacity;
|
private final int capacity;
|
||||||
private volatile ArrayList<T> list;
|
private volatile ArrayList<T> list;
|
||||||
private volatile int elementCount = 0;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default construct, a capacity must be provided
|
* Default construct, a capacity must be provided
|
||||||
|
|
@ -32,7 +30,7 @@ class WriterQueue<T> {
|
||||||
if (capacity < 1) {
|
if (capacity < 1) {
|
||||||
throw new IllegalArgumentException("Capacity couldn't be 0");
|
throw new IllegalArgumentException("Capacity couldn't be 0");
|
||||||
}
|
}
|
||||||
list = new ArrayList<>(capacity);
|
this.list = emptyList(capacity);
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -43,10 +41,8 @@ class WriterQueue<T> {
|
||||||
* @return a list contain all elements
|
* @return a list contain all elements
|
||||||
*/
|
*/
|
||||||
public synchronized List<T> getAll() {
|
public synchronized List<T> getAll() {
|
||||||
List<T> all = Collections.emptyList();
|
final List<T> all = list;
|
||||||
all = list;
|
list = emptyList(capacity);
|
||||||
list = new ArrayList<>(capacity);
|
|
||||||
elementCount = 0;
|
|
||||||
return all;
|
return all;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -60,11 +56,10 @@ class WriterQueue<T> {
|
||||||
public synchronized T add(final T element) {
|
public synchronized T add(final T element) {
|
||||||
|
|
||||||
T removed = null;
|
T removed = null;
|
||||||
if (elementCount < capacity) {
|
if (list.size() < capacity) {
|
||||||
list.add(element);
|
list.add(element);
|
||||||
++elementCount;
|
|
||||||
} else {
|
} else {
|
||||||
final int index = ThreadLocalRandom.current().nextInt(0, elementCount);
|
final int index = ThreadLocalRandom.current().nextInt(0, list.size());
|
||||||
removed = list.set(index, element);
|
removed = list.set(index, element);
|
||||||
}
|
}
|
||||||
return removed;
|
return removed;
|
||||||
|
|
@ -78,7 +73,7 @@ class WriterQueue<T> {
|
||||||
* @return the current size of the queue
|
* @return the current size of the queue
|
||||||
*/
|
*/
|
||||||
public int size() {
|
public int size() {
|
||||||
return elementCount;
|
return list.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -87,6 +82,10 @@ class WriterQueue<T> {
|
||||||
* @return true if the queue is empty
|
* @return true if the queue is empty
|
||||||
*/
|
*/
|
||||||
public boolean isEmpty() {
|
public boolean isEmpty() {
|
||||||
return elementCount == 0;
|
return list.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ArrayList<T> emptyList(final int capacity) {
|
||||||
|
return new ArrayList<>(capacity);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue