New API: Add Datadog Agent writer

This commit is contained in:
Nikolay Martynov 2019-01-10 13:08:19 -05:00
parent a70b698040
commit fce1af97b8
9 changed files with 376 additions and 0 deletions

View File

@ -32,6 +32,9 @@ class AgentClient {
static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version";
static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count";
static final int CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1);
static final int READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1);
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
@ -91,6 +94,12 @@ class AgentClient {
connection.setDoOutput(true);
connection.setDoInput(true);
// It is important to have timeout for agent request here: we need to finish request in some
// reasonable amount
// of time to allow following requests to be run.
connection.setConnectTimeout(CONNECT_TIMEOUT);
connection.setReadTimeout(READ_TIMEOUT);
connection.setRequestMethod("PUT");
connection.setRequestProperty(CONTENT_TYPE, MSGPACK);
connection.setRequestProperty(DATADOG_META_LANG, "java");

View File

@ -0,0 +1,162 @@
package datadog.trace.tracer.writer;
import datadog.trace.tracer.Trace;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AgentWriter implements Writer {
/** Maximum number of traces kept in memory */
static final int DEFAULT_QUEUE_SIZE = 7000;
/** Flush interval for the API in seconds */
static final long FLUSH_TIME_SECONDS = 1;
/** Maximum amount of time to await for scheduler to shutdown */
static final long SHUTDOWN_TIMEOUT_SECONDS = 1;
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, "dd-agent-writer");
thread.setDaemon(true);
return thread;
}
};
/** Scheduled thread pool, acting like a cron */
private final ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1, THREAD_FACTORY);
private final TracesSendingTask task;
private final ShutdownCallback shutdownCallback;
public AgentWriter(final AgentClient client) {
this(client, DEFAULT_QUEUE_SIZE);
}
AgentWriter(final AgentClient client, final int queueSize) {
task = new TracesSendingTask(client, queueSize);
shutdownCallback = new ShutdownCallback(executorService);
}
@Override
public void write(final Trace trace) {
if (trace.isValid()) {
if (!task.getQueue().offer(trace)) {
log.debug("Writer queue is full, dropping trace {}", trace);
}
}
}
@Override
public void incrementTraceCount() {
task.getTraceCount().incrementAndGet();
}
@Override
public SampleRateByService getSampleRateByService() {
return task.getSampleRateByService().get();
}
@Override
public void start() {
executorService.scheduleAtFixedRate(task, 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
try {
Runtime.getRuntime().addShutdownHook(shutdownCallback);
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
@Override
public void close() {
// Perform actions needed to shutdown this writer
shutdownCallback.run();
}
@Override
public void finalize() {
close();
}
/** Infinite tasks blocking until some spans come in the queue. */
private static final class TracesSendingTask implements Runnable {
/** The Datadog agent client */
private final AgentClient client;
/** Queue size */
private final int queueSize;
/** In memory collection of traces waiting for departure */
@Getter private final BlockingQueue<Trace> queue;
/** Number of traces to be written */
@Getter private final AtomicInteger traceCount = new AtomicInteger(0);
/** Sample rate by service returned by Datadog agent */
@Getter
private final AtomicReference<SampleRateByService> sampleRateByService =
new AtomicReference<>(SampleRateByService.EMPTY_INSTANCE);
TracesSendingTask(final AgentClient client, final int queueSize) {
this.client = client;
this.queueSize = queueSize;
queue = new ArrayBlockingQueue<>(queueSize);
}
@Override
public void run() {
try {
final List<Trace> tracesToWrite = new ArrayList<>(queueSize);
queue.drainTo(tracesToWrite);
if (tracesToWrite.size() > 0) {
sampleRateByService.set(client.sendTraces(tracesToWrite, traceCount.getAndSet(0)));
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
}
}
}
/**
* Helper to handle shutting down of the Writer because JVM is shutting down or Writer is closed.
*/
// Visible for testing
static final class ShutdownCallback extends Thread {
private final ExecutorService executorService;
public ShutdownCallback(final ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void run() {
// We use this logic in two cases:
// * When JVM is shutting down
// * When Writer is closed manually/via GC
// In latter case we need to remove shutdown hook.
try {
Runtime.getRuntime().removeShutdownHook(this);
} catch (final IllegalStateException ex) {
// The JVM may be shutting down.
}
try {
executorService.shutdownNow();
executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
}
}
}

View File

@ -16,6 +16,11 @@ public class LoggingWriter implements Writer {
// Nothing to do here.
}
@Override
public SampleRateByService getSampleRateByService() {
return SampleRateByService.EMPTY_INSTANCE;
}
@Override
public void start() {
// TODO: do we really need this? and if we do - who is responsible for calling this?

View File

@ -12,6 +12,8 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode
class SampleRateByService {
static final SampleRateByService EMPTY_INSTANCE = new SampleRateByService(Collections.EMPTY_MAP);
private final Map<String, Double> rateByService;
@JsonCreator

View File

@ -21,6 +21,9 @@ public interface Writer {
*/
void incrementTraceCount();
/** @return Most up to date {@link SampleRateByService} instance. */
SampleRateByService getSampleRateByService();
/** Start the writer */
void start();

View File

@ -3,6 +3,7 @@ package datadog.trace.tracer
import datadog.trace.api.Config
import datadog.trace.tracer.sampling.AllSampler
import datadog.trace.tracer.writer.LoggingWriter
import datadog.trace.tracer.writer.SampleRateByService
import datadog.trace.tracer.writer.Writer
import spock.lang.Shared
import spock.lang.Specification
@ -181,6 +182,11 @@ class TracerTest extends Specification {
traceCount.incrementAndGet()
}
@Override
SampleRateByService getSampleRateByService() {
return null // Doesn't matter for now
}
@Override
void start() {
//nothing to do for now

View File

@ -122,6 +122,21 @@ class AgentClientTest extends Specification {
response == null
}
def "test timeout"() {
setup:
stubFor(put(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.willReturn(aResponse()
.withStatus(200)
.withChunkedDribbleDelay(5, AgentClient.READ_TIMEOUT * 2)))
def trace = createTrace("123")
when:
def response = client.sendTraces([trace], TRACE_COUNT)
then:
response == null
}
def "test invalid url"() {
when:

View File

@ -0,0 +1,166 @@
package datadog.trace.tracer.writer
import datadog.trace.tracer.Trace
import spock.lang.Specification
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
class AgentWriterTest extends Specification {
// Amount of time within with we expect flush to happen.
// We make this slightly longer than flush time.
private static final int FLUSH_DELAY = TimeUnit.SECONDS.toMillis(AgentWriter.FLUSH_TIME_SECONDS * 2)
def sampleRateByService = Mock(SampleRateByService)
def client = Mock(AgentClient)
def "test happy path"() {
setup:
def incrementTraceCountBy = 5
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> false
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client)
writer.start()
when:
for (def trace : traces) {
writer.write(trace)
}
incrementTraceCountBy.times {
writer.incrementTraceCount()
}
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0], traces[2]], incrementTraceCountBy) >> sampleRateByService
and:
writer.getSampleRateByService() == sampleRateByService
then:
0 * client.sendTraces(_, _)
cleanup:
writer.close()
}
def "test small queue"() {
setup:
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client, 1)
when:
for (def trace : traces) {
writer.write(trace)
}
writer.start()
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0)
cleanup:
writer.close()
}
def "test client exception handling"() {
setup:
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client)
writer.start()
when:
writer.write(traces[0])
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception")}
writer.getSampleRateByService() == SampleRateByService.EMPTY_INSTANCE
when:
writer.write(traces[1])
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[1]], 0) >> sampleRateByService
writer.getSampleRateByService() == sampleRateByService
cleanup:
writer.close()
}
def "test default sample rate by service"() {
setup:
def writer = new AgentWriter(client)
when:
def sampleRateByService = writer.getSampleRateByService()
then:
sampleRateByService == SampleRateByService.EMPTY_INSTANCE
}
def "test start/#closeMethod"() {
setup:
def writer = new AgentWriter(client)
expect:
!isWriterThreadRunning()
when:
writer.start()
then:
isWriterThreadRunning()
when:
writer."${closeMethod}"()
then:
!isWriterThreadRunning()
where:
closeMethod | _
"close" | _
"finalize" | _
}
def "test shutdown callback"() {
setup:
def executor = Mock(ExecutorService) {
awaitTermination(_, _) >> { throw new InterruptedException() }
}
def callback = new AgentWriter.ShutdownCallback(executor)
when:
callback.run()
then:
noExceptionThrown()
}
boolean isWriterThreadRunning() {
return Thread.getAllStackTraces().keySet().any{ t -> t.getName() == "dd-agent-writer" }
}
}

View File

@ -37,4 +37,12 @@ class LoggingWriterTest extends Specification {
1 * trace.toString()
}
def "test getter"() {
when:
def sampleRateByInstance = writer.getSampleRateByService()
then:
sampleRateByInstance == SampleRateByService.EMPTY_INSTANCE
}
}