Extract Monitor classes from DDAgentWriter

This commit is contained in:
Tyler Benson 2019-12-20 12:09:14 -08:00
parent 7920a25b7e
commit 6e57041a6a
5 changed files with 254 additions and 244 deletions

View File

@ -13,11 +13,9 @@ import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.ddagent.Monitor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@ -84,7 +82,7 @@ public class DDAgentWriter implements Writer {
public DDAgentWriter() {
this(
new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
new NoopMonitor());
new Monitor.Noop());
}
public DDAgentWriter(final DDApi api, final Monitor monitor) {
@ -93,7 +91,7 @@ public class DDAgentWriter implements Writer {
/** Old signature (pre-Monitor) used in tests */
private DDAgentWriter(final DDApi api) {
this(api, new NoopMonitor());
this(api, new Monitor.Noop());
}
/**
@ -108,7 +106,7 @@ public class DDAgentWriter implements Writer {
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, senderQueueSize, flushFrequencySeconds);
this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
@ -122,7 +120,7 @@ public class DDAgentWriter implements Writer {
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
private DDAgentWriter(
@ -257,7 +255,7 @@ public class DDAgentWriter implements Writer {
// if something is *probably* the NoopMonitor.
String str = "DDAgentWriter { api=" + api;
if (!(monitor instanceof NoopMonitor)) {
if (!(monitor instanceof Monitor.Noop)) {
str += ", monitor=" + monitor;
}
str += " }";
@ -426,227 +424,4 @@ public class DDAgentWriter implements Writer {
return new Event<>();
}
}
/**
* Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
* lifecycle events...
*
* <ul>
* <li>start
* <li>shutdown
* <li>publishing to disruptor
* <li>serializing
* <li>sending to agent
* </ul>
*/
public interface Monitor {
void onStart(final DDAgentWriter agentWriter);
void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFlush(final DDAgentWriter agentWriter, final boolean early);
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace);
void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause);
void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
}
public static final class NoopMonitor implements Monitor {
@Override
public void onStart(final DDAgentWriter agentWriter) {}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onScheduleFlush(
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public String toString() {
return "NoOp";
}
}
public static final class StatsDMonitor implements Monitor {
public static final String PREFIX = "datadog.tracer";
public static final String LANG_TAG = "lang";
public static final String LANG_VERSION_TAG = "lang_version";
public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
public static final String TRACER_VERSION_TAG = "tracer_version";
private final String hostInfo;
private final StatsDClient statsd;
// DQH - Made a conscious choice to not take a Config object here.
// Letting the creating of the Monitor take the Config,
// so it can decide which Monitor variant to create.
public StatsDMonitor(final String host, final int port) {
hostInfo = host + ":" + port;
statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
}
// Currently, intended for testing
private StatsDMonitor(final StatsDClient statsd) {
hostInfo = null;
this.statsd = statsd;
}
protected static final String[] getDefaultTags() {
return new String[] {
tag(LANG_TAG, "java"),
tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
};
}
private static String tag(final String tagPrefix, final String tagValue) {
return tagPrefix + ":" + tagValue;
}
@Override
public void onStart(final DDAgentWriter agentWriter) {
statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.accepted");
statsd.count("queue.accepted_lengths", trace.size());
}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.dropped");
}
@Override
public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
// not recorded
}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {
// DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
// map precisely
statsd.count("queue.accepted_size", serializedTrace.length);
}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {
// TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
// api.errors???
}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
private void onSendAttempt(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
statsd.incrementCounter("api.requests");
statsd.recordGaugeValue("queue.length", representativeCount);
// TODO: missing queue.spans (# of spans being sent)
statsd.recordGaugeValue("queue.size", sizeInBytes);
if (response.exception() != null) {
// covers communication errors -- both not receiving a response or
// receiving malformed response (even when otherwise successful)
statsd.incrementCounter("api.errors");
}
if (response.status() != null) {
statsd.incrementCounter("api.responses", "status: " + response.status());
}
}
@Override
public String toString() {
if (hostInfo == null) {
return "StatsD";
} else {
return "StatsD { host=" + hostInfo + " }";
}
}
}
}

View File

@ -2,6 +2,7 @@ package datadog.trace.common.writer;
import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
import datadog.trace.common.writer.ddagent.Monitor;
import java.io.Closeable;
import java.util.List;
import java.util.Properties;
@ -65,14 +66,14 @@ public interface Writer extends Closeable {
return new DDAgentWriter(createApi(config), createMonitor(config));
}
private static final DDApi createApi(final Config config) {
private static DDApi createApi(final Config config) {
return new DDApi(
config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket());
}
private static final DDAgentWriter.Monitor createMonitor(final Config config) {
private static Monitor createMonitor(final Config config) {
if (!config.isHealthMetricsEnabled()) {
return new DDAgentWriter.NoopMonitor();
return new Monitor.Noop();
} else {
String host = config.getHealthMetricsStatsdHost();
if (host == null) {
@ -87,7 +88,7 @@ public interface Writer extends Closeable {
port = config.getJmxFetchStatsdPort();
}
return new DDAgentWriter.StatsDMonitor(host, port);
return new Monitor.StatsD(host, port);
}
}

View File

@ -0,0 +1,232 @@
package datadog.trace.common.writer.ddagent;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.DDApi;
import java.util.List;
/**
* Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
* lifecycle events...
*
* <ul>
* <li>start
* <li>shutdown
* <li>publishing to disruptor
* <li>serializing
* <li>sending to agent
* </ul>
*/
public interface Monitor {
void onStart(final DDAgentWriter agentWriter);
void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFlush(final DDAgentWriter agentWriter, final boolean early);
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace);
void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause);
void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
final class StatsD implements Monitor {
public static final String PREFIX = "datadog.tracer";
public static final String LANG_TAG = "lang";
public static final String LANG_VERSION_TAG = "lang_version";
public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
public static final String TRACER_VERSION_TAG = "tracer_version";
private final String hostInfo;
private final StatsDClient statsd;
// DQH - Made a conscious choice to not take a Config object here.
// Letting the creating of the Monitor take the Config,
// so it can decide which Monitor variant to create.
public StatsD(final String host, final int port) {
hostInfo = host + ":" + port;
statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
}
// Currently, intended for testing
private StatsD(final StatsDClient statsd) {
hostInfo = null;
this.statsd = statsd;
}
protected static final String[] getDefaultTags() {
return new String[] {
tag(LANG_TAG, "java"),
tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
};
}
private static String tag(final String tagPrefix, final String tagValue) {
return tagPrefix + ":" + tagValue;
}
@Override
public void onStart(final DDAgentWriter agentWriter) {
statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.accepted");
statsd.count("queue.accepted_lengths", trace.size());
}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.dropped");
}
@Override
public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
// not recorded
}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {
// DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
// map precisely
statsd.count("queue.accepted_size", serializedTrace.length);
}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {
// TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
// api.errors???
}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
private void onSendAttempt(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
statsd.incrementCounter("api.requests");
statsd.recordGaugeValue("queue.length", representativeCount);
// TODO: missing queue.spans (# of spans being sent)
statsd.recordGaugeValue("queue.size", sizeInBytes);
if (response.exception() != null) {
// covers communication errors -- both not receiving a response or
// receiving malformed response (even when otherwise successful)
statsd.incrementCounter("api.errors");
}
if (response.status() != null) {
statsd.incrementCounter("api.responses", "status: " + response.status());
}
}
@Override
public String toString() {
if (hostInfo == null) {
return "StatsD";
} else {
return "StatsD { host=" + hostInfo + " }";
}
}
}
final class Noop implements Monitor {
@Override
public void onStart(final DDAgentWriter agentWriter) {}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFlush(final DDAgentWriter agentWriter, final boolean early) {}
@Override
public void onScheduleFlush(
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public String toString() {
return "NoOp";
}
}
}

View File

@ -14,6 +14,7 @@ import datadog.trace.common.sampling.Sampler
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.util.test.DDSpecification
import io.opentracing.propagation.TextMapInject
import org.junit.Rule
@ -49,7 +50,7 @@ class DDTracerTest extends DDSpecification {
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.3/traces" ||
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.4/traces"
tracer.writer.monitor instanceof DDAgentWriter.NoopMonitor
tracer.writer.monitor instanceof Monitor.Noop
tracer.spanContextDecorators.size() == 15
@ -65,7 +66,7 @@ class DDTracerTest extends DDSpecification {
def tracer = new DDTracer(new Config())
then:
tracer.writer.monitor instanceof DDAgentWriter.StatsDMonitor
tracer.writer.monitor instanceof Monitor.StatsD
tracer.writer.monitor.hostInfo == "localhost:8125"
}

View File

@ -8,6 +8,7 @@ import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.util.test.DDSpecification
import spock.lang.Timeout
@ -252,7 +253,7 @@ class DDAgentWriterTest extends DDSpecification {
}
}
def api = new DDApi("localhost", agent.address.port, null)
def monitor = Mock(DDAgentWriter.Monitor)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@ -301,7 +302,7 @@ class DDAgentWriterTest extends DDSpecification {
}
}
def api = new DDApi("localhost", agent.address.port, null)
def monitor = Mock(DDAgentWriter.Monitor)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@ -343,7 +344,7 @@ class DDAgentWriterTest extends DDSpecification {
return DDApi.Response.failed(new IOException("comm error"))
}
}
def monitor = Mock(DDAgentWriter.Monitor)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
@ -401,7 +402,7 @@ class DDAgentWriterTest extends DDSpecification {
def api = new DDApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
@ -506,7 +507,7 @@ class DDAgentWriterTest extends DDSpecification {
def api = new DDApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
@ -577,7 +578,7 @@ class DDAgentWriterTest extends DDSpecification {
numResponses += 1
}
def monitor = new DDAgentWriter.StatsDMonitor(statsd)
def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
@ -625,7 +626,7 @@ class DDAgentWriterTest extends DDSpecification {
numErrors += 1
}
def monitor = new DDAgentWriter.StatsDMonitor(statsd)
def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()