Merge pull request #1156 from DataDog/tyler/ddagentwriter-split

Split DDAgentWriter into smaller classes
Tyler Benson committed on 2020-01-03 11:19:28 -08:00 (via GitHub)
commit 8500dbc2ba
14 changed files with 670 additions and 571 deletions
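The refactor extracts the agent API client, the health-metrics monitor, the disruptor wrapper, and the trace-serializing consumer out of DDAgentWriter into a new datadog.trace.common.writer.ddagent package. A minimal sketch of how the pieces are wired together after this change; class and method names come from the diff below, while the agent address and port are assumptions:

```java
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.Monitor;

public class WriterWiringSketch {
  public static void main(final String[] args) {
    // DDAgentApi replaces the old DDApi; "localhost"/8126 are assumed defaults,
    // and null means no unix domain socket.
    final DDAgentApi api = new DDAgentApi("localhost", 8126, null);

    // Monitor.Noop replaces the old DDAgentWriter.NoopMonitor inner class.
    final DDAgentWriter writer = new DDAgentWriter(api, new Monitor.Noop());

    // The writer now delegates publishing/flushing to a TraceSerializingDisruptor
    // and serialization/sending to a TraceConsumer, both created in its constructor.
    writer.start();
    // ... the tracer calls writer.write(trace) as traces finish ...
    writer.close();
  }
}
```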

DDTracer.java

@ -14,8 +14,8 @@ import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.sampling.PrioritySampler;
import datadog.trace.common.sampling.Sampler;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.DDApi;
import datadog.trace.common.writer.Writer;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import datadog.trace.context.ScopeListener;
import io.opentracing.References;
import io.opentracing.Scope;
@ -245,11 +245,8 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
injector = HttpCodec.createInjector(Config.get());
extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders);
if (this.writer instanceof DDAgentWriter) {
final DDApi api = ((DDAgentWriter) this.writer).getApi();
if (sampler instanceof DDApi.ResponseListener) {
api.addResponseListener((DDApi.ResponseListener) this.sampler);
}
if (this.writer instanceof DDAgentWriter && sampler instanceof DDAgentResponseListener) {
((DDAgentWriter) this.writer).addResponseListener((DDAgentResponseListener) this.sampler);
}
log.info("New instance: {}", this);

RateByServiceSampler.java

@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NumericNode;
import datadog.opentracing.DDSpan;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.writer.DDApi.ResponseListener;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j;
* <p>The configuration of (serviceName,env)->rate is configured by the core agent.
*/
@Slf4j
public class RateByServiceSampler implements Sampler, PrioritySampler, ResponseListener {
public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentResponseListener {
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
/** Key for setting the default/baseline rate */

DDAgentWriter.java

@ -5,31 +5,19 @@ import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.util.DaemonThreadFactory;
import java.util.ArrayList;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import datadog.trace.common.writer.ddagent.Monitor;
import datadog.trace.common.writer.ddagent.TraceConsumer;
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
/**
@ -43,57 +31,35 @@ import lombok.extern.slf4j.Slf4j;
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private static final int SENDER_QUEUE_SIZE = 16;
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR =
new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>() {
@Override
public void translateTo(
final Event<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
event.data = trace;
}
};
private static final EventTranslator<Event<List<DDSpan>>> FLUSH_TRANSLATOR =
new EventTranslator<Event<List<DDSpan>>>() {
@Override
public void translateTo(final Event<List<DDSpan>> event, final long sequence) {
event.shouldFlush = true;
}
};
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-writer");
private final Runnable flushTask = new FlushTask();
private final DDApi api;
private final int flushFrequencySeconds;
private final Disruptor<Event<List<DDSpan>>> disruptor;
private final DDAgentApi api;
public final int flushFrequencySeconds;
public final TraceSerializingDisruptor disruptor;
private final Semaphore senderSemaphore;
private final ScheduledExecutorService scheduledWriterExecutor;
public final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
private final Phaser apiPhaser;
private volatile boolean running = false;
public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing;
private final Monitor monitor;
public final Monitor monitor;
public DDAgentWriter() {
this(
new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
new NoopMonitor());
new DDAgentApi(
DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
new Monitor.Noop());
}
public DDAgentWriter(final DDApi api, final Monitor monitor) {
public DDAgentWriter(final DDAgentApi api, final Monitor monitor) {
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
}
/** Old signature (pre-Monitor) used in tests */
private DDAgentWriter(final DDApi api) {
this(api, new NoopMonitor());
private DDAgentWriter(final DDAgentApi api) {
this(api, new Monitor.Noop());
}
/**
@ -104,16 +70,16 @@ public class DDAgentWriter implements Writer {
* @param flushFrequencySeconds value < 1 disables scheduled flushes
*/
private DDAgentWriter(
final DDApi api,
final DDAgentApi api,
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, senderQueueSize, flushFrequencySeconds);
this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(
final DDApi api,
final DDAgentApi api,
final Monitor monitor,
final int disruptorSize,
final int flushFrequencySeconds) {
@ -121,12 +87,13 @@ public class DDAgentWriter implements Writer {
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
private DDAgentWriter(
final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
private DDAgentWriter(
final DDApi api,
final DDAgentApi api,
final Monitor monitor,
final int disruptorSize,
final int senderQueueSize,
@ -135,25 +102,22 @@ public class DDAgentWriter implements Writer {
this.monitor = monitor;
disruptor =
new Disruptor<>(
new DisruptorEventFactory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
disruptor.handleEventsWith(new TraceConsumer());
new TraceSerializingDisruptor(
disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this));
this.flushFrequencySeconds = flushFrequencySeconds;
senderSemaphore = new Semaphore(senderQueueSize);
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
}
public void addResponseListener(final DDAgentResponseListener listener) {
api.addResponseListener(listener);
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
return disruptor.getDisruptorCapacity();
}
public final long getDisruptorUtilizedCapacity() {
@ -161,14 +125,14 @@ public class DDAgentWriter implements Writer {
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
return disruptor.getDisruptorRemainingCapacity();
}
@Override
public void write(final List<DDSpan> trace) {
// We can't add events after shutdown otherwise it will never complete shutting down.
if (running) {
final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
if (disruptor.running) {
final boolean published = disruptor.tryPublish(trace);
if (published) {
monitor.onPublish(DDAgentWriter.this, trace);
@ -191,22 +155,19 @@ public class DDAgentWriter implements Writer {
traceCount.incrementAndGet();
}
public DDApi getApi() {
public DDAgentApi getApi() {
return api;
}
@Override
public void start() {
disruptor.start();
running = true;
scheduleFlush();
monitor.onStart(this);
}
@Override
public void close() {
running = false;
boolean flushSuccess = true;
@ -221,34 +182,13 @@ public class DDAgentWriter implements Writer {
flushSuccess = false;
}
flushSuccess |= flush();
disruptor.shutdown();
flushSuccess |= disruptor.flush();
disruptor.close();
monitor.onShutdown(this, flushSuccess);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
@Override
public String toString() {
// DQH - I don't particularly like the instanceof check,
@ -257,396 +197,11 @@ public class DDAgentWriter implements Writer {
// if something is *probably* the NoopMonitor.
String str = "DDAgentWriter { api=" + api;
if (!(monitor instanceof NoopMonitor)) {
if (!(monitor instanceof Monitor.Noop)) {
str += ", monitor=" + monitor;
}
str += " }";
return str;
}
private void scheduleFlush() {
if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
monitor.onScheduleFlush(this, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
/** This class is intentionally not threadsafe. */
private class TraceConsumer implements EventHandler<Event<List<DDSpan>>> {
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
@Override
public void onEvent(
final Event<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = api.serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
monitor.onSerialize(DDAgentWriter.this, trace, serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
}
}
private void reportTraces(final boolean early) {
try {
if (serializedTraces.isEmpty()) {
monitor.onFlush(DDAgentWriter.this, early);
apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
if (scheduledWriterExecutor.isShutdown()) {
monitor.onFailedSend(
DDAgentWriter.this, traceCount.get(), payloadSize, DDApi.Response.failed(-1));
apiPhaser.arrive(); // Allow flush to return
return;
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
try {
monitor.onFlush(DDAgentWriter.this, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(e));
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
try {
final DDApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
monitor.onSend(
DDAgentWriter.this, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
monitor.onFailedSend(
DDAgentWriter.this,
representativeCount,
sizeInBytes,
DDApi.Response.failed(e));
} finally {
apiPhaser.arrive(); // Flush completed.
}
}
});
} catch (final RejectedExecutionException ex) {
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
apiPhaser.arrive(); // Allow flush to return
}
} finally {
payloadSize = 0;
scheduleFlush();
}
}
}
private static class Event<T> {
private volatile boolean shouldFlush = false;
private volatile T data = null;
}
private static class DisruptorEventFactory<T> implements EventFactory<Event<T>> {
@Override
public Event<T> newInstance() {
return new Event<>();
}
}
/**
* Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
* lifecycle events...
*
* <ul>
* <li>start
* <li>shutdown
* <li>publishing to disruptor
* <li>serializing
* <li>sending to agent
* </ul>
*/
public interface Monitor {
void onStart(final DDAgentWriter agentWriter);
void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFlush(final DDAgentWriter agentWriter, final boolean early);
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace);
void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause);
void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
}
public static final class NoopMonitor implements Monitor {
@Override
public void onStart(final DDAgentWriter agentWriter) {}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onScheduleFlush(
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public String toString() {
return "NoOp";
}
}
public static final class StatsDMonitor implements Monitor {
public static final String PREFIX = "datadog.tracer";
public static final String LANG_TAG = "lang";
public static final String LANG_VERSION_TAG = "lang_version";
public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
public static final String TRACER_VERSION_TAG = "tracer_version";
private final String hostInfo;
private final StatsDClient statsd;
// DQH - Made a conscious choice to not take a Config object here.
// Letting the creating of the Monitor take the Config,
// so it can decide which Monitor variant to create.
public StatsDMonitor(final String host, final int port) {
hostInfo = host + ":" + port;
statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
}
// Currently, intended for testing
private StatsDMonitor(final StatsDClient statsd) {
hostInfo = null;
this.statsd = statsd;
}
protected static final String[] getDefaultTags() {
return new String[] {
tag(LANG_TAG, "java"),
tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
};
}
private static String tag(final String tagPrefix, final String tagValue) {
return tagPrefix + ":" + tagValue;
}
@Override
public void onStart(final DDAgentWriter agentWriter) {
statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.accepted");
statsd.count("queue.accepted_lengths", trace.size());
}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.dropped");
}
@Override
public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
// not recorded
}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {
// DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
// map precisely
statsd.count("queue.accepted_size", serializedTrace.length);
}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {
// TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
// api.errors???
}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
private void onSendAttempt(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
statsd.incrementCounter("api.requests");
statsd.recordGaugeValue("queue.length", representativeCount);
// TODO: missing queue.spans (# of spans being sent)
statsd.recordGaugeValue("queue.size", sizeInBytes);
if (response.exception() != null) {
// covers communication errors -- both not receiving a response or
// receiving malformed response (even when otherwise successful)
statsd.incrementCounter("api.errors");
}
if (response.status() != null) {
statsd.incrementCounter("api.responses", "status: " + response.status());
}
}
@Override
public String toString() {
if (hostInfo == null) {
return "StatsD";
} else {
return "StatsD { host=" + hostInfo + " }";
}
}
}
}

Writer.java

@ -2,6 +2,8 @@ package datadog.trace.common.writer;
import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.Monitor;
import java.io.Closeable;
import java.util.List;
import java.util.Properties;
@ -65,14 +67,14 @@ public interface Writer extends Closeable {
return new DDAgentWriter(createApi(config), createMonitor(config));
}
private static final DDApi createApi(final Config config) {
return new DDApi(
private static DDAgentApi createApi(final Config config) {
return new DDAgentApi(
config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket());
}
private static final DDAgentWriter.Monitor createMonitor(final Config config) {
private static Monitor createMonitor(final Config config) {
if (!config.isHealthMetricsEnabled()) {
return new DDAgentWriter.NoopMonitor();
return new Monitor.Noop();
} else {
String host = config.getHealthMetricsStatsdHost();
if (host == null) {
@ -87,7 +89,7 @@ public interface Writer extends Closeable {
port = config.getJmxFetchStatsdPort();
}
return new DDAgentWriter.StatsDMonitor(host, port);
return new Monitor.StatsD(host, port);
}
}

DDApi.java → DDAgentApi.java (renamed, moved to the ddagent package)

@ -1,4 +1,4 @@
package datadog.trace.common.writer;
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
@ -21,7 +21,6 @@ import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
@ -29,7 +28,7 @@ import org.msgpack.jackson.dataformat.MessagePackFactory;
/** The API pointing to a DD agent */
@Slf4j
public class DDApi {
public class DDAgentApi {
private static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
@ -44,7 +43,7 @@ public class DDApi {
private static final String TRACES_ENDPOINT_V4 = "v0.4/traces";
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
private final List<ResponseListener> responseListeners = new ArrayList<>();
private final List<DDAgentResponseListener> responseListeners = new ArrayList<>();
private volatile long nextAllowedLogTime = 0;
@ -54,7 +53,7 @@ public class DDApi {
private final OkHttpClient httpClient;
private final HttpUrl tracesUrl;
public DDApi(final String host, final int port, final String unixDomainSocketPath) {
public DDAgentApi(final String host, final int port, final String unixDomainSocketPath) {
this(
host,
port,
@ -62,7 +61,7 @@ public class DDApi {
unixDomainSocketPath);
}
DDApi(
DDAgentApi(
final String host,
final int port,
final boolean v4EndpointsAvailable,
@ -77,7 +76,7 @@ public class DDApi {
}
}
public void addResponseListener(final ResponseListener listener) {
public void addResponseListener(final DDAgentResponseListener listener) {
if (!responseListeners.contains(listener)) {
responseListeners.add(listener);
}
@ -90,7 +89,7 @@ public class DDApi {
* @return a Response object -- encapsulating success of communication, sending, and result
* parsing
*/
public Response sendTraces(final List<List<DDSpan>> traces) {
Response sendTraces(final List<List<DDSpan>> traces) {
final List<byte[]> serializedTraces = new ArrayList<>(traces.size());
int sizeInBytes = 0;
for (final List<DDSpan> trace : traces) {
@ -187,7 +186,7 @@ public class DDApi {
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
final String endpoint = tracesUrl.toString();
for (final ResponseListener listener : responseListeners) {
for (final DDAgentResponseListener listener : responseListeners) {
listener.onResponse(endpoint, parsedResponse);
}
return Response.success(response.code(), parsedResponse);
@ -348,26 +347,21 @@ public class DDApi {
}
public final boolean success() {
return this.success;
return success;
}
// TODO: DQH - In Java 8, switch to OptionalInteger
public final Integer status() {
return this.status;
return status;
}
public final JsonNode json() {
return this.json;
return json;
}
// TODO: DQH - In Java 8, switch to Optional<Throwable>?
public final Throwable exception() {
return this.exception;
return exception;
}
}
public interface ResponseListener {
/** Invoked after the api receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
}
}

DDAgentResponseListener.java (new file)

@ -0,0 +1,8 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.databind.JsonNode;
public interface DDAgentResponseListener {
/** Invoked after the api receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
}
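This interface replaces the old nested DDApi.ResponseListener. A hypothetical listener that just logs the agent's response, registered through DDAgentWriter's new pass-through addResponseListener method (the logging body is illustrative, not part of this PR):

```java
import com.fasterxml.jackson.databind.JsonNode;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;

public class LoggingResponseListener implements DDAgentResponseListener {
  @Override
  public void onResponse(final String endpoint, final JsonNode responseJson) {
    // The agent's response carries per-service sample rates; RateByServiceSampler
    // implements this same interface to keep its rates up to date.
    System.out.println("agent response from " + endpoint + ": " + responseJson);
  }

  public static void register(final DDAgentWriter writer) {
    // DDAgentWriter.addResponseListener forwards to DDAgentApi.addResponseListener.
    writer.addResponseListener(new LoggingResponseListener());
  }
}
```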

DisruptorEvent.java (new file)

@ -0,0 +1,41 @@
package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import datadog.opentracing.DDSpan;
import java.util.List;
class DisruptorEvent<T> {
public volatile boolean shouldFlush = false;
public volatile T data = null;
static class Factory<T> implements EventFactory<DisruptorEvent<T>> {
@Override
public DisruptorEvent<T> newInstance() {
return new DisruptorEvent<>();
}
}
static class TraceTranslator
implements EventTranslatorOneArg<DisruptorEvent<List<DDSpan>>, List<DDSpan>> {
static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
new DisruptorEvent.TraceTranslator();
@Override
public void translateTo(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
event.data = trace;
}
}
static class FlushTranslator implements EventTranslator<DisruptorEvent<List<DDSpan>>> {
static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
new DisruptorEvent.FlushTranslator();
@Override
public void translateTo(final DisruptorEvent<List<DDSpan>> event, final long sequence) {
event.shouldFlush = true;
}
}
}

Monitor.java (new file)

@ -0,0 +1,231 @@
package datadog.trace.common.writer.ddagent;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.List;
/**
* Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
* lifecycle events...
*
* <ul>
* <li>start
* <li>shutdown
* <li>publishing to disruptor
* <li>serializing
* <li>sending to agent
* </ul>
*/
public interface Monitor {
void onStart(final DDAgentWriter agentWriter);
void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFlush(final DDAgentWriter agentWriter, final boolean early);
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace);
void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause);
void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response);
void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response);
final class StatsD implements Monitor {
public static final String PREFIX = "datadog.tracer";
public static final String LANG_TAG = "lang";
public static final String LANG_VERSION_TAG = "lang_version";
public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
public static final String TRACER_VERSION_TAG = "tracer_version";
private final String hostInfo;
private final StatsDClient statsd;
// DQH - Made a conscious choice to not take a Config object here.
// Letting the creating of the Monitor take the Config,
// so it can decide which Monitor variant to create.
public StatsD(final String host, final int port) {
hostInfo = host + ":" + port;
statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
}
// Currently, intended for testing
private StatsD(final StatsDClient statsd) {
hostInfo = null;
this.statsd = statsd;
}
protected static final String[] getDefaultTags() {
return new String[] {
tag(LANG_TAG, "java"),
tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
};
}
private static String tag(final String tagPrefix, final String tagValue) {
return tagPrefix + ":" + tagValue;
}
@Override
public void onStart(final DDAgentWriter agentWriter) {
statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.accepted");
statsd.count("queue.accepted_lengths", trace.size());
}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.dropped");
}
@Override
public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
// not recorded
}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {
// DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
// map precisely
statsd.count("queue.accepted_size", serializedTrace.length);
}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {
// TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
// api.errors???
}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
private void onSendAttempt(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response) {
statsd.incrementCounter("api.requests");
statsd.recordGaugeValue("queue.length", representativeCount);
// TODO: missing queue.spans (# of spans being sent)
statsd.recordGaugeValue("queue.size", sizeInBytes);
if (response.exception() != null) {
// covers communication errors -- both not receiving a response or
// receiving malformed response (even when otherwise successful)
statsd.incrementCounter("api.errors");
}
if (response.status() != null) {
statsd.incrementCounter("api.responses", "status: " + response.status());
}
}
@Override
public String toString() {
if (hostInfo == null) {
return "StatsD";
} else {
return "StatsD { host=" + hostInfo + " }";
}
}
}
final class Noop implements Monitor {
@Override
public void onStart(final DDAgentWriter agentWriter) {}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onScheduleFlush(
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response) {}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDAgentApi.Response response) {}
@Override
public String toString() {
return "NoOp";
}
}
}
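Monitor.StatsD is the extracted equivalent of the old DDAgentWriter.StatsDMonitor. A minimal sketch of routing writer health metrics to a StatsD endpoint; the localhost:8125 StatsD address and the 8126 agent port are assumptions:

```java
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.Monitor;

public class StatsDMonitorSketch {
  public static void main(final String[] args) {
    // Emits datadog.tracer.* metrics (queue.accepted, queue.dropped, api.requests, ...)
    // tagged with the lang/interpreter/tracer-version tags built by getDefaultTags().
    final Monitor monitor = new Monitor.StatsD("localhost", 8125);

    final DDAgentWriter writer =
        new DDAgentWriter(new DDAgentApi("localhost", 8126, null), monitor);
    writer.start(); // onStart records the queue.max_length gauge
  }
}
```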

TraceConsumer.java (new file)

@ -0,0 +1,150 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/** This class is intentionally not threadsafe. */
@Slf4j
public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>> {
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private final AtomicInteger traceCount;
private final Semaphore senderSemaphore;
private final DDAgentWriter writer;
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
public TraceConsumer(
final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
this.traceCount = traceCount;
senderSemaphore = new Semaphore(senderQueueSize);
this.writer = writer;
}
@Override
public void onEvent(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = writer.getApi().serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
writer.monitor.onSerialize(writer, trace, serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
}
}
private void reportTraces(final boolean early) {
try {
if (serializedTraces.isEmpty()) {
writer.monitor.onFlush(writer, early);
writer.apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
if (writer.scheduledWriterExecutor.isShutdown()) {
writer.monitor.onFailedSend(
writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1));
writer.apiPhaser.arrive(); // Allow flush to return
return;
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
try {
writer.monitor.onFlush(writer, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
writer.scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
try {
final DDAgentApi.Response response =
writer
.getApi()
.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
} finally {
writer.apiPhaser.arrive(); // Flush completed.
}
}
});
} catch (final RejectedExecutionException ex) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
writer.apiPhaser.arrive(); // Allow flush to return
}
} finally {
payloadSize = 0;
writer.disruptor.scheduleFlush();
}
}
}
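TraceConsumer batches serialized traces until either an explicit flush event arrives or the accumulated payload crosses FLUSH_PAYLOAD_BYTES (5 MB), in which case it reports early. The test setup further down derives how many minimal traces it takes to trip that early flush; a sketch of the same arithmetic, with an assumed per-trace size:

```java
public class EarlyFlushMath {
  public static void main(final String[] args) {
    final int flushPayloadBytes = 5_000_000; // mirrors TraceConsumer.FLUSH_PAYLOAD_BYTES
    final int traceSize = 200;               // assumed serialized size of one minimal trace, in bytes
    // Smallest trace count whose cumulative payload exceeds the threshold:
    final int maxedPayloadTraceCount = (flushPayloadBytes / traceSize) + 1;
    System.out.println(maxedPayloadTraceCount); // prints 25001 for the assumed size
  }
}
```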

TraceSerializingDisruptor.java (new file)

@ -0,0 +1,117 @@
package datadog.trace.common.writer.ddagent;
import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR;
import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TraceSerializingDisruptor implements Closeable {
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private final FlushTask flushTask = new FlushTask();
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
private final DDAgentWriter writer;
public volatile boolean running = false;
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
public TraceSerializingDisruptor(
final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) {
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
this.writer = writer;
disruptor.handleEventsWith(handler);
}
public void start() {
disruptor.start();
running = true;
scheduleFlush();
}
@Override
public void close() {
running = false;
disruptor.shutdown();
}
public boolean tryPublish(final List<DDSpan> trace) {
return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
writer.apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
public void scheduleFlush() {
if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
writer.scheduledWriterExecutor.schedule(
flushTask, writer.flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
writer.monitor.onScheduleFlush(writer, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
}
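The blocking flush now lives on TraceSerializingDisruptor, and DDAgentWriter exposes the disruptor as a public field; that is why the tests below switch from writer.flush() to writer.disruptor.flush(). A short sketch of that pattern:

```java
import datadog.trace.common.writer.DDAgentWriter;

public class FlushSketch {
  public static boolean flushPending(final DDAgentWriter writer) {
    // Publishes a flush event and blocks on the writer's Phaser until in-flight
    // API calls complete; returns false if the disruptor is not running or the
    // wait is interrupted.
    return writer.disruptor.flush();
  }
}
```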

DDTracerTest.groovy

@ -14,6 +14,7 @@ import datadog.trace.common.sampling.Sampler
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.util.test.DDSpecification
import io.opentracing.propagation.TextMapInject
import org.junit.Rule
@ -49,7 +50,7 @@ class DDTracerTest extends DDSpecification {
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.3/traces" ||
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.4/traces"
tracer.writer.monitor instanceof DDAgentWriter.NoopMonitor
tracer.writer.monitor instanceof Monitor.Noop
tracer.spanContextDecorators.size() == 15
@ -65,7 +66,7 @@ class DDTracerTest extends DDSpecification {
def tracer = new DDTracer(new Config())
then:
tracer.writer.monitor instanceof DDAgentWriter.StatsDMonitor
tracer.writer.monitor instanceof Monitor.StatsD
tracer.writer.monitor.hostInfo == "localhost:8125"
}

DDApiTest.groovy → DDAgentApiTest.groovy (renamed)

@ -3,8 +3,8 @@ package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.DDApi.ResponseListener
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
import datadog.trace.util.test.DDSpecification
import java.util.concurrent.atomic.AtomicLong
@ -12,8 +12,8 @@ import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
class DDApiTest extends DDSpecification {
static mapper = DDApi.OBJECT_MAPPER
class DDAgentApiTest extends DDSpecification {
static mapper = DDAgentApi.OBJECT_MAPPER
def "sending an empty list of traces returns no errors"() {
setup:
@ -30,7 +30,7 @@ class DDApiTest extends DDSpecification {
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
def client = new DDAgentApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
@ -51,7 +51,7 @@ class DDApiTest extends DDSpecification {
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
def client = new DDAgentApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.3/traces"
@ -73,7 +73,7 @@ class DDApiTest extends DDSpecification {
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
def client = new DDAgentApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
@ -125,7 +125,7 @@ class DDApiTest extends DDSpecification {
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference<String>(null)
ResponseListener responseListener = { String endpoint, JsonNode responseJson ->
DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson ->
agentResponse.set(responseJson.toString())
}
def agent = httpServer {
@ -136,7 +136,7 @@ class DDApiTest extends DDSpecification {
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
def client = new DDAgentApi("localhost", agent.address.port, null)
client.addResponseListener(responseListener)
when:
@ -162,7 +162,7 @@ class DDApiTest extends DDSpecification {
}
}
}
def client = new DDApi("localhost", v3Agent.address.port, null)
def client = new DDAgentApi("localhost", v3Agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${v3Agent.address.port}/v0.3/traces"
@ -189,7 +189,7 @@ class DDApiTest extends DDSpecification {
}
}
def port = badPort ? 999 : agent.address.port
def client = new DDApi("localhost", port, null)
def client = new DDAgentApi("localhost", port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${port}/$endpointVersion/traces"
@ -216,7 +216,7 @@ class DDApiTest extends DDSpecification {
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
def client = new DDAgentApi("localhost", agent.address.port, null)
when:
def success = client.sendTraces(traces).success()

DDAgentWriterTest.groovy

@ -7,7 +7,9 @@ import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.common.writer.ddagent.TraceConsumer
import datadog.trace.util.test.DDSpecification
import spock.lang.Timeout
@ -22,7 +24,7 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
@Timeout(20)
class DDAgentWriterTest extends DDSpecification {
def api = Mock(DDApi)
def api = Mock(DDAgentApi)
def "test happy path"() {
setup:
@ -32,7 +34,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(trace)
writer.write(trace)
writer.flush()
writer.disruptor.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -55,7 +57,7 @@ class DDAgentWriterTest extends DDSpecification {
(1..traceCount).each {
writer.write(trace)
}
writer.flush()
writer.disruptor.flush()
then:
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -95,7 +97,7 @@ class DDAgentWriterTest extends DDSpecification {
writer.write(trace)
}
// Flush the remaining 2
writer.flush()
writer.disruptor.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -116,7 +118,7 @@ class DDAgentWriterTest extends DDSpecification {
def phaser = writer.apiPhaser
phaser.register()
writer.start()
writer.flush()
writer.disruptor.flush()
when:
(1..5).each {
@ -151,7 +153,7 @@ class DDAgentWriterTest extends DDSpecification {
// Busywait because we don't want to fill up the ring buffer
}
}
writer.flush()
writer.disruptor.flush()
then:
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -178,8 +180,8 @@ class DDAgentWriterTest extends DDSpecification {
Mock(DDTracer))
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
def "check that are no interactions after close"() {
@ -191,7 +193,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.close()
writer.write([])
writer.flush()
writer.disruptor.flush()
then:
0 * _
@ -206,7 +208,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write([])
writer.flush()
writer.disruptor.flush()
then:
1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
@ -251,8 +253,8 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def monitor = Mock(DDAgentWriter.Monitor)
def api = new DDAgentApi("localhost", agent.address.port, null)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@ -263,7 +265,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@ -300,8 +302,8 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def monitor = Mock(DDAgentWriter.Monitor)
def api = new DDAgentApi("localhost", agent.address.port, null)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@ -312,7 +314,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@ -334,16 +336,16 @@ class DDAgentWriterTest extends DDSpecification {
setup:
def minimalTrace = createMinimalTrace()
def api = new DDApi("localhost", 8192, null) {
DDApi.Response sendSerializedTraces(
def api = new DDAgentApi("localhost", 8192, null) {
DDAgentApi.Response sendSerializedTraces(
int representativeCount,
Integer sizeInBytes,
List<byte[]> traces) {
// simulating a communication failure to a server
return DDApi.Response.failed(new IOException("comm error"))
return DDAgentApi.Response.failed(new IOException("comm error"))
}
}
def monitor = Mock(DDAgentWriter.Monitor)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@ -354,7 +356,7 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
@ -398,10 +400,10 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def api = new DDAgentApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
@ -436,7 +438,7 @@ class DDAgentWriterTest extends DDSpecification {
// sanity check coordination mechanism of test
// release to allow response to be generated
responseSemaphore.release()
writer.flush()
writer.disruptor.flush()
// reacquire semaphore to stall further responses
responseSemaphore.acquire()
@ -503,10 +505,10 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def api = new DDAgentApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
@ -536,7 +538,7 @@ class DDAgentWriterTest extends DDSpecification {
t1.join()
t2.join()
writer.flush()
writer.disruptor.flush()
then:
def totalTraces = 100 + 100
@ -564,7 +566,7 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def api = new DDAgentApi("localhost", agent.address.port, null)
def statsd = Stub(StatsDClient)
statsd.incrementCounter("queue.accepted") >> { stat ->
@ -577,13 +579,13 @@ class DDAgentWriterTest extends DDSpecification {
numResponses += 1
}
def monitor = new DDAgentWriter.StatsDMonitor(statsd)
def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
numTracesAccepted == 1
@ -604,13 +606,13 @@ class DDAgentWriterTest extends DDSpecification {
def minimalTrace = createMinimalTrace()
// DQH -- need to set-up a dummy agent for the final send callback to work
def api = new DDApi("localhost", 8192, null) {
DDApi.Response sendSerializedTraces(
def api = new DDAgentApi("localhost", 8192, null) {
DDAgentApi.Response sendSerializedTraces(
int representativeCount,
Integer sizeInBytes,
List<byte[]> traces) {
// simulating a communication failure to a server
return DDApi.Response.failed(new IOException("comm error"))
return DDAgentApi.Response.failed(new IOException("comm error"))
}
}
@ -625,13 +627,13 @@ class DDAgentWriterTest extends DDSpecification {
numErrors += 1
}
def monitor = new DDAgentWriter.StatsDMonitor(statsd)
def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
writer.write(minimalTrace)
writer.flush()
writer.disruptor.flush()
then:
numRequests == 1

DDApiIntegrationTest.groovy

@ -4,8 +4,9 @@ import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
import datadog.trace.util.test.DDSpecification
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy
@ -20,7 +21,7 @@ class DDApiIntegrationTest {
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
static class DDApiIntegrationV4Test extends DDSpecification {
static class DDAgentApiIntegrationV4Test extends DDSpecification {
static final WRITER = new ListWriter()
static final TRACER = new DDTracer(WRITER)
static final CONTEXT = new DDSpanContext(
@ -64,7 +65,7 @@ class DDApiIntegrationTest {
def endpoint = new AtomicReference<String>(null)
def agentResponse = new AtomicReference<String>(null)
DDApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
endpoint.set(receivedEndpoint)
agentResponse.set(responseJson.toString())
}
@ -109,10 +110,10 @@ class DDApiIntegrationTest {
}
def setup() {
api = new DDApi(agentContainerHost, agentContainerPort, v4(), null)
api = new DDAgentApi(agentContainerHost, agentContainerPort, v4(), null)
api.addResponseListener(responseListener)
unixDomainSocketApi = new DDApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
unixDomainSocketApi = new DDAgentApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
unixDomainSocketApi.addResponseListener(responseListener)
}
@ -159,7 +160,7 @@ class DDApiIntegrationTest {
}
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
static class DDApiIntegrationV3Test extends DDApiIntegrationV4Test {
static class DDAgentApiIntegrationV3Test extends DDAgentApiIntegrationV4Test {
boolean v4() {
return false
}