Merge pull request #1150 from DataDog/tyler/fix-disruptor-shutdown

Allow flush to return if executor already shut down
This commit is contained in:
Tyler Benson 2019-12-19 16:12:02 -08:00 committed by GitHub
commit ad22bd9cfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 81 additions and 47 deletions

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
@ -317,7 +318,7 @@ public class DDAgentWriter implements Writer {
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
@ -333,6 +334,12 @@ public class DDAgentWriter implements Writer {
return;
// scheduleFlush called in finally block.
}
if (scheduledWriterExecutor.isShutdown()) {
monitor.onFailedSend(
DDAgentWriter.this, traceCount.get(), payloadSize, DDApi.Response.failed(-1));
apiPhaser.arrive(); // Allow flush to return
return;
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
@ -340,60 +347,67 @@ public class DDAgentWriter implements Writer {
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
monitor.onFlush(DDAgentWriter.this, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e));
monitor.onFlush(DDAgentWriter.this, early);
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e));
try {
final DDApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
try {
final DDApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
monitor.onSend(DDAgentWriter.this, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
monitor.onSend(
DDAgentWriter.this, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, response);
DDAgentWriter.this,
representativeCount,
sizeInBytes,
DDApi.Response.failed(e));
} finally {
apiPhaser.arrive(); // Flush completed.
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
monitor.onFailedSend(
DDAgentWriter.this,
representativeCount,
sizeInBytes,
DDApi.Response.failed(e));
} finally {
apiPhaser.arrive(); // Flush completed.
}
}
});
});
} catch (final RejectedExecutionException ex) {
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
apiPhaser.arrive(); // Allow flush to return
}
} finally {
payloadSize = 0;
scheduleFlush();
@ -541,7 +555,7 @@ public class DDAgentWriter implements Writer {
};
}
private static final String tag(final String tagPrefix, final String tagValue) {
private static String tag(final String tagPrefix, final String tagValue) {
return tagPrefix + ":" + tagValue;
}
@ -626,6 +640,7 @@ public class DDAgentWriter implements Writer {
}
}
@Override
public String toString() {
if (hostInfo == null) {
return "StatsD";

View File

@ -198,6 +198,25 @@ class DDAgentWriterTest extends DDSpecification {
writer.traceCount.get() == 0
}
def "check shutdown if executor stopped first"() {
setup:
def writer = new DDAgentWriter(api)
writer.start()
writer.scheduledWriterExecutor.shutdown()
when:
writer.write([])
writer.flush()
then:
1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
0 * _
writer.traceCount.get() == 1
cleanup:
writer.close()
}
def createMinimalTrace() {
def minimalContext = new DDSpanContext(
1G,