Restricting sender queue size

To prevent unbounded memory consumption, this change restricts the size of the sender queue and also lowers the size of the Disruptor queue.

Unfortunately, our choice of a ScheduledExecutorService makes this a bit difficult, since ScheduledExecutorService doesn't allow us to supply our own (bounded) work queue.
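
For reference, the underlying JDK constraint (all types from java.util.concurrent): a plain ThreadPoolExecutor accepts a caller-supplied bounded work queue, but ScheduledThreadPoolExecutor has no constructor that takes a queue and always uses its own internal, unbounded DelayedWorkQueue. A minimal sketch:

    // ThreadPoolExecutor lets the caller bound the work queue directly:
    ExecutorService bounded =
        new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(16)); // at most 16 queued tasks

    // ScheduledThreadPoolExecutor offers no equivalent overload:
    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);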

A bigger change is in order, but for now this change restricts the queue size by introducing a Semaphore around the ScheduledExecutorService.
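
In outline, the workaround looks like this (a sketch reusing names from the diff below; the real code also handles InterruptedException by abandoning the send):

    // Permits cap how many send tasks may be queued at once.
    final Semaphore senderSemaphore = new Semaphore(SENDER_QUEUE_SIZE);

    senderSemaphore.acquire(); // blocks the caller once the queue is full
    scheduledWriterExecutor.execute(
        new Runnable() {
          @Override
          public void run() {
            // Release immediately, so permits track queued -- not running -- tasks.
            senderSemaphore.release();
            // ... perform the actual send via DDApi ...
          }
        });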

To make testing easier, I introduced Monitor.onFlush.  It is used in the new slow-response test, which simulates a situation where the sending queue backs up.
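
The hook makes flush activity observable from tests. A sketch of the idea (hypothetical counter shown here; the actual test stubs the callback with Spock, as in the diff below):

    // Each flush enqueues one send task, so (flushes - completed requests)
    // approximates the depth of the sender queue.
    final AtomicInteger numFlushes = new AtomicInteger(0);

    // inside a Monitor implementation:
    @Override
    public void onFlush(final DDAgentWriter agentWriter, final boolean early) {
      numFlushes.incrementAndGet();
    }
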
dougqh 2019-12-09 17:15:32 -05:00
parent 307e56714e
commit 7883366b83
2 changed files with 120 additions and 31 deletions

DDAgentWriter.java

@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -39,7 +40,8 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 8192;
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private static final int SENDER_QUEUE_SIZE = 16;
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
@@ -68,6 +70,8 @@ public class DDAgentWriter implements Writer {
private final DDApi api;
private final int flushFrequencySeconds;
private final Disruptor<Event<List<DDSpan>>> disruptor;
private final Semaphore senderSemaphore;
private final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
@@ -83,7 +87,7 @@ public class DDAgentWriter implements Writer {
}
public DDAgentWriter(final DDApi api, final Monitor monitor) {
this(api, monitor, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY);
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
}
/** Old signature (pre-Monitor) used in tests */
@@ -98,14 +102,33 @@ public class DDAgentWriter implements Writer {
* @param disruptorSize Rounded up to next power of 2
* @param flushFrequencySeconds value < 1 disables scheduled flushes
*/
private DDAgentWriter(
final DDApi api,
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, senderQueueSize, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(
final DDApi api,
final Monitor monitor,
final int disruptorSize,
final int flushFrequencySeconds) {
this(api, monitor, disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, flushFrequencySeconds);
this(api, new NoopMonitor(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
private DDAgentWriter(
final DDApi api,
final Monitor monitor,
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
this.api = api;
this.monitor = monitor;
@@ -120,6 +143,7 @@ public class DDAgentWriter implements Writer {
disruptor.handleEventsWith(new TraceConsumer());
this.flushFrequencySeconds = flushFrequencySeconds;
senderSemaphore = new Semaphore(senderQueueSize);
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
@@ -291,15 +315,20 @@ public class DDAgentWriter implements Writer {
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
reportTraces();
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
}
}
private void reportTraces() {
private void reportTraces(final boolean early) {
try {
if (serializedTraces.isEmpty()) {
monitor.onFlush(DDAgentWriter.this, early);
apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
@@ -311,11 +340,22 @@ public class DDAgentWriter implements Writer {
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
monitor.onFlush(DDAgentWriter.this, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
try {
final DDApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
@@ -391,6 +431,8 @@ public class DDAgentWriter implements Writer {
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFlush(final DDAgentWriter agentWriter, final boolean early);
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
void onSerialize(
@@ -425,6 +467,9 @@ public class DDAgentWriter implements Writer {
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onScheduleFlush(
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
@@ -521,6 +566,9 @@ public class DDAgentWriter implements Writer {
// not recorded
}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {

DDAgentWriterTest.groovy

@@ -11,6 +11,7 @@ import datadog.trace.common.writer.DDApi
import datadog.trace.util.test.DDSpecification
import spock.lang.Timeout
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@@ -350,26 +351,31 @@ class DDAgentWriterTest extends DDSpecification {
}
def "slow response test"() {
def numPublished = 0
def numFailedPublish = 0
def numWritten = 0
def numFlushes = new AtomicInteger(0)
def numPublished = new AtomicInteger(0)
def numFailedPublish = new AtomicInteger(0)
def numRequests = new AtomicInteger(0)
def numFailedRequests = new AtomicInteger(0)
def responseSemaphore = new Semaphore(1)
setup:
def minimalTrace = createMinimalTrace()
// Need to set-up a dummy agent for the final send callback to work
def first = true
def agent = httpServer {
handlers {
put("v0.4/traces") {
// DDApi sniffs for end point existence, so respond quickly the first time
// then slowly thereafter
if (!first) {
// Long enough to stall the pipeline, but not long enough to fail
Thread.sleep(2_500)
responseSemaphore.acquire()
try {
response.status(200).send()
} finally {
responseSemaphore.release()
}
response.status(200).send()
first = false
}
}
}
@@ -378,49 +384,84 @@ class DDAgentWriterTest extends DDSpecification {
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
monitor.onPublish(_, _) >> {
numPublished += 1
numPublished.incrementAndGet()
}
monitor.onFailedPublish(_, _) >> {
numFailedPublish += 1
numFailedPublish.incrementAndGet()
}
monitor.onFlush(_, _) >> {
numFlushes.incrementAndGet()
}
monitor.onSend(_, _, _, _) >> {
numRequests.incrementAndGet()
}
monitor.onFailedSend(_, _, _, _) >> {
numFailedRequests.incrementAndGet()
}
// sender queue is sized in requests -- not traces
def bufferSize = 32
def writer = new DDAgentWriter(api, monitor, bufferSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY)
def senderQueueSize = 2
def writer = new DDAgentWriter(api, monitor, bufferSize, senderQueueSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY)
writer.start()
// gate responses
responseSemaphore.acquire()
when:
// write & flush a single trace -- the slow agent response will cause
// additional writes to back-up the sending queue
// write a single trace and flush
// with responseSemaphore held, the response is blocked but may still time out
writer.write(minimalTrace)
numWritten += 1
// sanity check coordination mechanism of test
// release to allow response to be generated
responseSemaphore.release()
writer.flush()
// reacquire semaphore to stall further responses
responseSemaphore.acquire()
then:
numPublished == 1
numFailedPublish == 0
numFailedPublish.get() == 0
numPublished.get() == numWritten
numPublished.get() + numFailedPublish.get() == numWritten
numFlushes.get() == 1
when:
// send many traces to flood the sender queue...
(1..20).each {
writer.write(minimalTrace)
// send many traces to fill the sender queue...
// loop until the sender queue is full, i.e. outstanding sends reach senderQueueSize
while (numFlushes.get() - (numRequests.get() + numFailedRequests.get()) < senderQueueSize) {
// chunk the loop & wait to allow for flushing to send queue
(1..1_000).each {
writer.write(minimalTrace)
numWritten += 1
}
Thread.sleep(100)
}
then:
// might spill back into the Disruptor slightly, but sender queue is currently unbounded
numPublished == 1 + 20
numFailedPublish == 0
numFailedPublish.get() > 0
numPublished.get() + numFailedPublish.get() == numWritten
when:
// now, fill-up the disruptor buffer as well
(1..bufferSize * 2).each {
def priorNumFailed = numFailedPublish.get()
// with both disruptor & queue full, should reject everything
def expectedRejects = 100_000
(1..expectedRejects).each {
writer.write(minimalTrace)
numWritten += 1
}
then:
// Disruptor still doesn't reject because the sender queue is unbounded
(numPublished + numFailedPublish) == (1 + 20 + bufferSize * 2)
numFailedPublish >= 0
// timing means that not 100% will be rejected, but the vast majority should be rejected
numFailedPublish.get() - priorNumFailed > expectedRejects * 0.8
numPublished.get() + numFailedPublish.get() == numWritten
cleanup:
responseSemaphore.release()
writer.close()
agent.close()
}