Move DDApi and rename to DDAgentApi
Reduce visibility back to what it was before refactoring.
This commit is contained in:
parent
84f9d80258
commit
24e2fe6da7
|
@ -14,8 +14,8 @@ import datadog.trace.api.sampling.PrioritySampling;
|
||||||
import datadog.trace.common.sampling.PrioritySampler;
|
import datadog.trace.common.sampling.PrioritySampler;
|
||||||
import datadog.trace.common.sampling.Sampler;
|
import datadog.trace.common.sampling.Sampler;
|
||||||
import datadog.trace.common.writer.DDAgentWriter;
|
import datadog.trace.common.writer.DDAgentWriter;
|
||||||
import datadog.trace.common.writer.DDApi;
|
|
||||||
import datadog.trace.common.writer.Writer;
|
import datadog.trace.common.writer.Writer;
|
||||||
|
import datadog.trace.common.writer.ddagent.DDAgentApi;
|
||||||
import datadog.trace.context.ScopeListener;
|
import datadog.trace.context.ScopeListener;
|
||||||
import io.opentracing.References;
|
import io.opentracing.References;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
|
@ -246,9 +246,9 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
|
||||||
extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders);
|
extractor = HttpCodec.createExtractor(Config.get(), taggedHeaders);
|
||||||
|
|
||||||
if (this.writer instanceof DDAgentWriter) {
|
if (this.writer instanceof DDAgentWriter) {
|
||||||
final DDApi api = ((DDAgentWriter) this.writer).getApi();
|
final DDAgentApi api = ((DDAgentWriter) this.writer).getApi();
|
||||||
if (sampler instanceof DDApi.ResponseListener) {
|
if (sampler instanceof DDAgentApi.ResponseListener) {
|
||||||
api.addResponseListener((DDApi.ResponseListener) this.sampler);
|
api.addResponseListener((DDAgentApi.ResponseListener) this.sampler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import com.fasterxml.jackson.databind.node.NumericNode;
|
import com.fasterxml.jackson.databind.node.NumericNode;
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.trace.api.sampling.PrioritySampling;
|
import datadog.trace.api.sampling.PrioritySampling;
|
||||||
import datadog.trace.common.writer.DDApi.ResponseListener;
|
import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
|
@ -7,6 +7,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.trace.common.util.DaemonThreadFactory;
|
import datadog.trace.common.util.DaemonThreadFactory;
|
||||||
|
import datadog.trace.common.writer.ddagent.DDAgentApi;
|
||||||
import datadog.trace.common.writer.ddagent.Monitor;
|
import datadog.trace.common.writer.ddagent.Monitor;
|
||||||
import datadog.trace.common.writer.ddagent.TraceConsumer;
|
import datadog.trace.common.writer.ddagent.TraceConsumer;
|
||||||
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
|
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
|
||||||
|
@ -34,7 +35,7 @@ public class DDAgentWriter implements Writer {
|
||||||
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
|
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
|
||||||
new DaemonThreadFactory("dd-trace-writer");
|
new DaemonThreadFactory("dd-trace-writer");
|
||||||
|
|
||||||
private final DDApi api;
|
private final DDAgentApi api;
|
||||||
public final int flushFrequencySeconds;
|
public final int flushFrequencySeconds;
|
||||||
public final TraceSerializingDisruptor disruptor;
|
public final TraceSerializingDisruptor disruptor;
|
||||||
|
|
||||||
|
@ -46,16 +47,17 @@ public class DDAgentWriter implements Writer {
|
||||||
|
|
||||||
public DDAgentWriter() {
|
public DDAgentWriter() {
|
||||||
this(
|
this(
|
||||||
new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
|
new DDAgentApi(
|
||||||
|
DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
|
||||||
new Monitor.Noop());
|
new Monitor.Noop());
|
||||||
}
|
}
|
||||||
|
|
||||||
public DDAgentWriter(final DDApi api, final Monitor monitor) {
|
public DDAgentWriter(final DDAgentApi api, final Monitor monitor) {
|
||||||
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
|
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Old signature (pre-Monitor) used in tests */
|
/** Old signature (pre-Monitor) used in tests */
|
||||||
private DDAgentWriter(final DDApi api) {
|
private DDAgentWriter(final DDAgentApi api) {
|
||||||
this(api, new Monitor.Noop());
|
this(api, new Monitor.Noop());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,7 +69,7 @@ public class DDAgentWriter implements Writer {
|
||||||
* @param flushFrequencySeconds value < 1 disables scheduled flushes
|
* @param flushFrequencySeconds value < 1 disables scheduled flushes
|
||||||
*/
|
*/
|
||||||
private DDAgentWriter(
|
private DDAgentWriter(
|
||||||
final DDApi api,
|
final DDAgentApi api,
|
||||||
final int disruptorSize,
|
final int disruptorSize,
|
||||||
final int senderQueueSize,
|
final int senderQueueSize,
|
||||||
final int flushFrequencySeconds) {
|
final int flushFrequencySeconds) {
|
||||||
|
@ -76,7 +78,7 @@ public class DDAgentWriter implements Writer {
|
||||||
|
|
||||||
// DQH - TODO - Update the tests & remove this
|
// DQH - TODO - Update the tests & remove this
|
||||||
private DDAgentWriter(
|
private DDAgentWriter(
|
||||||
final DDApi api,
|
final DDAgentApi api,
|
||||||
final Monitor monitor,
|
final Monitor monitor,
|
||||||
final int disruptorSize,
|
final int disruptorSize,
|
||||||
final int flushFrequencySeconds) {
|
final int flushFrequencySeconds) {
|
||||||
|
@ -84,12 +86,13 @@ public class DDAgentWriter implements Writer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// DQH - TODO - Update the tests & remove this
|
// DQH - TODO - Update the tests & remove this
|
||||||
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
|
private DDAgentWriter(
|
||||||
|
final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
|
||||||
this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
|
this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DDAgentWriter(
|
private DDAgentWriter(
|
||||||
final DDApi api,
|
final DDAgentApi api,
|
||||||
final Monitor monitor,
|
final Monitor monitor,
|
||||||
final int disruptorSize,
|
final int disruptorSize,
|
||||||
final int senderQueueSize,
|
final int senderQueueSize,
|
||||||
|
@ -147,7 +150,7 @@ public class DDAgentWriter implements Writer {
|
||||||
traceCount.incrementAndGet();
|
traceCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
public DDApi getApi() {
|
public DDAgentApi getApi() {
|
||||||
return api;
|
return api;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package datadog.trace.common.writer;
|
||||||
|
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.trace.api.Config;
|
import datadog.trace.api.Config;
|
||||||
|
import datadog.trace.common.writer.ddagent.DDAgentApi;
|
||||||
import datadog.trace.common.writer.ddagent.Monitor;
|
import datadog.trace.common.writer.ddagent.Monitor;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -66,8 +67,8 @@ public interface Writer extends Closeable {
|
||||||
return new DDAgentWriter(createApi(config), createMonitor(config));
|
return new DDAgentWriter(createApi(config), createMonitor(config));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DDApi createApi(final Config config) {
|
private static DDAgentApi createApi(final Config config) {
|
||||||
return new DDApi(
|
return new DDAgentApi(
|
||||||
config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket());
|
config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package datadog.trace.common.writer;
|
package datadog.trace.common.writer.ddagent;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonParseException;
|
import com.fasterxml.jackson.core.JsonParseException;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
@ -28,7 +28,7 @@ import org.msgpack.jackson.dataformat.MessagePackFactory;
|
||||||
|
|
||||||
/** The API pointing to a DD agent */
|
/** The API pointing to a DD agent */
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class DDApi {
|
public class DDAgentApi {
|
||||||
private static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
|
private static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
|
||||||
private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
|
private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
|
||||||
private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
|
private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
|
||||||
|
@ -53,7 +53,7 @@ public class DDApi {
|
||||||
private final OkHttpClient httpClient;
|
private final OkHttpClient httpClient;
|
||||||
private final HttpUrl tracesUrl;
|
private final HttpUrl tracesUrl;
|
||||||
|
|
||||||
public DDApi(final String host, final int port, final String unixDomainSocketPath) {
|
public DDAgentApi(final String host, final int port, final String unixDomainSocketPath) {
|
||||||
this(
|
this(
|
||||||
host,
|
host,
|
||||||
port,
|
port,
|
||||||
|
@ -61,7 +61,7 @@ public class DDApi {
|
||||||
unixDomainSocketPath);
|
unixDomainSocketPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
DDApi(
|
DDAgentApi(
|
||||||
final String host,
|
final String host,
|
||||||
final int port,
|
final int port,
|
||||||
final boolean v4EndpointsAvailable,
|
final boolean v4EndpointsAvailable,
|
||||||
|
@ -89,7 +89,7 @@ public class DDApi {
|
||||||
* @return a Response object -- encapsulating success of communication, sending, and result
|
* @return a Response object -- encapsulating success of communication, sending, and result
|
||||||
* parsing
|
* parsing
|
||||||
*/
|
*/
|
||||||
public Response sendTraces(final List<List<DDSpan>> traces) {
|
Response sendTraces(final List<List<DDSpan>> traces) {
|
||||||
final List<byte[]> serializedTraces = new ArrayList<>(traces.size());
|
final List<byte[]> serializedTraces = new ArrayList<>(traces.size());
|
||||||
int sizeInBytes = 0;
|
int sizeInBytes = 0;
|
||||||
for (final List<DDSpan> trace : traces) {
|
for (final List<DDSpan> trace : traces) {
|
||||||
|
@ -107,11 +107,11 @@ public class DDApi {
|
||||||
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
|
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
|
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
|
||||||
return OBJECT_MAPPER.writeValueAsBytes(trace);
|
return OBJECT_MAPPER.writeValueAsBytes(trace);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Response sendSerializedTraces(
|
Response sendSerializedTraces(
|
||||||
final int representativeCount, final Integer sizeInBytes, final List<byte[]> traces) {
|
final int representativeCount, final Integer sizeInBytes, final List<byte[]> traces) {
|
||||||
try {
|
try {
|
||||||
final RequestBody body =
|
final RequestBody body =
|
|
@ -6,19 +6,22 @@ import com.lmax.disruptor.EventTranslatorOneArg;
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class DisruptorEvent<T> {
|
class DisruptorEvent<T> {
|
||||||
public volatile boolean shouldFlush = false;
|
public volatile boolean shouldFlush = false;
|
||||||
public volatile T data = null;
|
public volatile T data = null;
|
||||||
|
|
||||||
public static class Factory<T> implements EventFactory<DisruptorEvent<T>> {
|
static class Factory<T> implements EventFactory<DisruptorEvent<T>> {
|
||||||
@Override
|
@Override
|
||||||
public DisruptorEvent<T> newInstance() {
|
public DisruptorEvent<T> newInstance() {
|
||||||
return new DisruptorEvent<>();
|
return new DisruptorEvent<>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TraceTranslator
|
static class TraceTranslator
|
||||||
implements EventTranslatorOneArg<DisruptorEvent<List<DDSpan>>, List<DDSpan>> {
|
implements EventTranslatorOneArg<DisruptorEvent<List<DDSpan>>, List<DDSpan>> {
|
||||||
|
static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
|
||||||
|
new DisruptorEvent.TraceTranslator();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void translateTo(
|
public void translateTo(
|
||||||
final DisruptorEvent<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
|
final DisruptorEvent<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
|
||||||
|
@ -26,7 +29,10 @@ public class DisruptorEvent<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FlushTranslator implements EventTranslator<DisruptorEvent<List<DDSpan>>> {
|
static class FlushTranslator implements EventTranslator<DisruptorEvent<List<DDSpan>>> {
|
||||||
|
static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
|
||||||
|
new DisruptorEvent.FlushTranslator();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void translateTo(final DisruptorEvent<List<DDSpan>> event, final long sequence) {
|
public void translateTo(final DisruptorEvent<List<DDSpan>> event, final long sequence) {
|
||||||
event.shouldFlush = true;
|
event.shouldFlush = true;
|
||||||
|
|
|
@ -5,7 +5,6 @@ import com.timgroup.statsd.StatsDClient;
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.opentracing.DDTraceOTInfo;
|
import datadog.opentracing.DDTraceOTInfo;
|
||||||
import datadog.trace.common.writer.DDAgentWriter;
|
import datadog.trace.common.writer.DDAgentWriter;
|
||||||
import datadog.trace.common.writer.DDApi;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -43,13 +42,13 @@ public interface Monitor {
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response);
|
final DDAgentApi.Response response);
|
||||||
|
|
||||||
void onFailedSend(
|
void onFailedSend(
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response);
|
final DDAgentApi.Response response);
|
||||||
|
|
||||||
final class StatsD implements Monitor {
|
final class StatsD implements Monitor {
|
||||||
public static final String PREFIX = "datadog.tracer";
|
public static final String PREFIX = "datadog.tracer";
|
||||||
|
@ -138,7 +137,7 @@ public interface Monitor {
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response) {
|
final DDAgentApi.Response response) {
|
||||||
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
|
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -147,7 +146,7 @@ public interface Monitor {
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response) {
|
final DDAgentApi.Response response) {
|
||||||
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
|
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,7 +154,7 @@ public interface Monitor {
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response) {
|
final DDAgentApi.Response response) {
|
||||||
statsd.incrementCounter("api.requests");
|
statsd.incrementCounter("api.requests");
|
||||||
statsd.recordGaugeValue("queue.length", representativeCount);
|
statsd.recordGaugeValue("queue.length", representativeCount);
|
||||||
// TODO: missing queue.spans (# of spans being sent)
|
// TODO: missing queue.spans (# of spans being sent)
|
||||||
|
@ -215,14 +214,14 @@ public interface Monitor {
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response) {}
|
final DDAgentApi.Response response) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailedSend(
|
public void onFailedSend(
|
||||||
final DDAgentWriter agentWriter,
|
final DDAgentWriter agentWriter,
|
||||||
final int representativeCount,
|
final int representativeCount,
|
||||||
final int sizeInBytes,
|
final int sizeInBytes,
|
||||||
final DDApi.Response response) {}
|
final DDAgentApi.Response response) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
|
|
@ -4,7 +4,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.lmax.disruptor.EventHandler;
|
import com.lmax.disruptor.EventHandler;
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.trace.common.writer.DDAgentWriter;
|
import datadog.trace.common.writer.DDAgentWriter;
|
||||||
import datadog.trace.common.writer.DDApi;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
@ -74,7 +73,7 @@ public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>>
|
||||||
}
|
}
|
||||||
if (writer.scheduledWriterExecutor.isShutdown()) {
|
if (writer.scheduledWriterExecutor.isShutdown()) {
|
||||||
writer.monitor.onFailedSend(
|
writer.monitor.onFailedSend(
|
||||||
writer, traceCount.get(), payloadSize, DDApi.Response.failed(-1));
|
writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1));
|
||||||
writer.apiPhaser.arrive(); // Allow flush to return
|
writer.apiPhaser.arrive(); // Allow flush to return
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -93,7 +92,7 @@ public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>>
|
||||||
senderSemaphore.acquire();
|
senderSemaphore.acquire();
|
||||||
} catch (final InterruptedException e) {
|
} catch (final InterruptedException e) {
|
||||||
writer.monitor.onFailedSend(
|
writer.monitor.onFailedSend(
|
||||||
writer, representativeCount, sizeInBytes, DDApi.Response.failed(e));
|
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
|
||||||
|
|
||||||
// Finally, we'll schedule another flush
|
// Finally, we'll schedule another flush
|
||||||
// Any threads awaiting the flush will continue to wait
|
// Any threads awaiting the flush will continue to wait
|
||||||
|
@ -106,7 +105,7 @@ public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>>
|
||||||
senderSemaphore.release();
|
senderSemaphore.release();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final DDApi.Response response =
|
final DDAgentApi.Response response =
|
||||||
writer
|
writer
|
||||||
.getApi()
|
.getApi()
|
||||||
.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
|
.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
|
||||||
|
@ -132,7 +131,7 @@ public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>>
|
||||||
// However, just to be safe to start, create a failed Response to handle any
|
// However, just to be safe to start, create a failed Response to handle any
|
||||||
// spurious Throwable-s.
|
// spurious Throwable-s.
|
||||||
writer.monitor.onFailedSend(
|
writer.monitor.onFailedSend(
|
||||||
writer, representativeCount, sizeInBytes, DDApi.Response.failed(e));
|
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
|
||||||
} finally {
|
} finally {
|
||||||
writer.apiPhaser.arrive(); // Flush completed.
|
writer.apiPhaser.arrive(); // Flush completed.
|
||||||
}
|
}
|
||||||
|
@ -140,7 +139,7 @@ public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>>
|
||||||
});
|
});
|
||||||
} catch (final RejectedExecutionException ex) {
|
} catch (final RejectedExecutionException ex) {
|
||||||
writer.monitor.onFailedSend(
|
writer.monitor.onFailedSend(
|
||||||
writer, representativeCount, sizeInBytes, DDApi.Response.failed(ex));
|
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
|
||||||
writer.apiPhaser.arrive(); // Allow flush to return
|
writer.apiPhaser.arrive(); // Allow flush to return
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package datadog.trace.common.writer.ddagent;
|
package datadog.trace.common.writer.ddagent;
|
||||||
|
|
||||||
|
import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR;
|
||||||
|
import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR;
|
||||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
|
||||||
import com.lmax.disruptor.SleepingWaitStrategy;
|
import com.lmax.disruptor.SleepingWaitStrategy;
|
||||||
|
@ -20,10 +22,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
public class TraceSerializingDisruptor implements Closeable {
|
public class TraceSerializingDisruptor implements Closeable {
|
||||||
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
|
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
|
||||||
new DaemonThreadFactory("dd-trace-disruptor");
|
new DaemonThreadFactory("dd-trace-disruptor");
|
||||||
private static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
|
|
||||||
new DisruptorEvent.TraceTranslator();
|
|
||||||
private static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
|
|
||||||
new DisruptorEvent.FlushTranslator();
|
|
||||||
private final FlushTask flushTask = new FlushTask();
|
private final FlushTask flushTask = new FlushTask();
|
||||||
|
|
||||||
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
|
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
|
||||||
|
|
|
@ -3,8 +3,8 @@ package datadog.trace.api.writer
|
||||||
import com.fasterxml.jackson.core.type.TypeReference
|
import com.fasterxml.jackson.core.type.TypeReference
|
||||||
import com.fasterxml.jackson.databind.JsonNode
|
import com.fasterxml.jackson.databind.JsonNode
|
||||||
import datadog.opentracing.SpanFactory
|
import datadog.opentracing.SpanFactory
|
||||||
import datadog.trace.common.writer.DDApi
|
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||||
import datadog.trace.common.writer.DDApi.ResponseListener
|
import datadog.trace.common.writer.ddagent.DDAgentApi.ResponseListener
|
||||||
import datadog.trace.util.test.DDSpecification
|
import datadog.trace.util.test.DDSpecification
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
@ -12,8 +12,8 @@ import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
|
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
|
||||||
|
|
||||||
class DDApiTest extends DDSpecification {
|
class DDAgentApiTest extends DDSpecification {
|
||||||
static mapper = DDApi.OBJECT_MAPPER
|
static mapper = DDAgentApi.OBJECT_MAPPER
|
||||||
|
|
||||||
def "sending an empty list of traces returns no errors"() {
|
def "sending an empty list of traces returns no errors"() {
|
||||||
setup:
|
setup:
|
||||||
|
@ -30,7 +30,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def client = new DDApi("localhost", agent.address.port, null)
|
def client = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
|
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
|
||||||
|
@ -51,7 +51,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def client = new DDApi("localhost", agent.address.port, null)
|
def client = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.3/traces"
|
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.3/traces"
|
||||||
|
@ -73,7 +73,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def client = new DDApi("localhost", agent.address.port, null)
|
def client = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
|
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
|
||||||
|
@ -136,7 +136,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def client = new DDApi("localhost", agent.address.port, null)
|
def client = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
client.addResponseListener(responseListener)
|
client.addResponseListener(responseListener)
|
||||||
|
|
||||||
when:
|
when:
|
||||||
|
@ -162,7 +162,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def client = new DDApi("localhost", v3Agent.address.port, null)
|
def client = new DDAgentApi("localhost", v3Agent.address.port, null)
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
client.tracesUrl.toString() == "http://localhost:${v3Agent.address.port}/v0.3/traces"
|
client.tracesUrl.toString() == "http://localhost:${v3Agent.address.port}/v0.3/traces"
|
||||||
|
@ -189,7 +189,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def port = badPort ? 999 : agent.address.port
|
def port = badPort ? 999 : agent.address.port
|
||||||
def client = new DDApi("localhost", port, null)
|
def client = new DDAgentApi("localhost", port, null)
|
||||||
|
|
||||||
expect:
|
expect:
|
||||||
client.tracesUrl.toString() == "http://localhost:${port}/$endpointVersion/traces"
|
client.tracesUrl.toString() == "http://localhost:${port}/$endpointVersion/traces"
|
||||||
|
@ -216,7 +216,7 @@ class DDApiTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def client = new DDApi("localhost", agent.address.port, null)
|
def client = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
when:
|
when:
|
||||||
def success = client.sendTraces(traces).success()
|
def success = client.sendTraces(traces).success()
|
|
@ -7,7 +7,7 @@ import datadog.opentracing.DDTracer
|
||||||
import datadog.opentracing.PendingTrace
|
import datadog.opentracing.PendingTrace
|
||||||
import datadog.trace.api.sampling.PrioritySampling
|
import datadog.trace.api.sampling.PrioritySampling
|
||||||
import datadog.trace.common.writer.DDAgentWriter
|
import datadog.trace.common.writer.DDAgentWriter
|
||||||
import datadog.trace.common.writer.DDApi
|
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||||
import datadog.trace.common.writer.ddagent.Monitor
|
import datadog.trace.common.writer.ddagent.Monitor
|
||||||
import datadog.trace.common.writer.ddagent.TraceConsumer
|
import datadog.trace.common.writer.ddagent.TraceConsumer
|
||||||
import datadog.trace.util.test.DDSpecification
|
import datadog.trace.util.test.DDSpecification
|
||||||
|
@ -24,7 +24,7 @@ import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
|
||||||
@Timeout(20)
|
@Timeout(20)
|
||||||
class DDAgentWriterTest extends DDSpecification {
|
class DDAgentWriterTest extends DDSpecification {
|
||||||
|
|
||||||
def api = Mock(DDApi)
|
def api = Mock(DDAgentApi)
|
||||||
|
|
||||||
def "test happy path"() {
|
def "test happy path"() {
|
||||||
setup:
|
setup:
|
||||||
|
@ -180,7 +180,7 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
Mock(DDTracer))
|
Mock(DDTracer))
|
||||||
minimalSpan = new DDSpan(0, minimalContext)
|
minimalSpan = new DDSpan(0, minimalContext)
|
||||||
minimalTrace = [minimalSpan]
|
minimalTrace = [minimalSpan]
|
||||||
traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
|
traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
|
||||||
maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
|
maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,7 +253,7 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def api = new DDApi("localhost", agent.address.port, null)
|
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
def monitor = Mock(Monitor)
|
def monitor = Mock(Monitor)
|
||||||
def writer = new DDAgentWriter(api, monitor)
|
def writer = new DDAgentWriter(api, monitor)
|
||||||
|
|
||||||
|
@ -302,7 +302,7 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def api = new DDApi("localhost", agent.address.port, null)
|
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
def monitor = Mock(Monitor)
|
def monitor = Mock(Monitor)
|
||||||
def writer = new DDAgentWriter(api, monitor)
|
def writer = new DDAgentWriter(api, monitor)
|
||||||
|
|
||||||
|
@ -336,13 +336,13 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
setup:
|
setup:
|
||||||
def minimalTrace = createMinimalTrace()
|
def minimalTrace = createMinimalTrace()
|
||||||
|
|
||||||
def api = new DDApi("localhost", 8192, null) {
|
def api = new DDAgentApi("localhost", 8192, null) {
|
||||||
DDApi.Response sendSerializedTraces(
|
DDAgentApi.Response sendSerializedTraces(
|
||||||
int representativeCount,
|
int representativeCount,
|
||||||
Integer sizeInBytes,
|
Integer sizeInBytes,
|
||||||
List<byte[]> traces) {
|
List<byte[]> traces) {
|
||||||
// simulating a communication failure to a server
|
// simulating a communication failure to a server
|
||||||
return DDApi.Response.failed(new IOException("comm error"))
|
return DDAgentApi.Response.failed(new IOException("comm error"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def monitor = Mock(Monitor)
|
def monitor = Mock(Monitor)
|
||||||
|
@ -400,7 +400,7 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def api = new DDApi("localhost", agent.address.port, null)
|
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
// This test focuses just on failed publish, so not verifying every callback
|
// This test focuses just on failed publish, so not verifying every callback
|
||||||
def monitor = Stub(Monitor)
|
def monitor = Stub(Monitor)
|
||||||
|
@ -505,7 +505,7 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def api = new DDApi("localhost", agent.address.port, null)
|
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
// This test focuses just on failed publish, so not verifying every callback
|
// This test focuses just on failed publish, so not verifying every callback
|
||||||
def monitor = Stub(Monitor)
|
def monitor = Stub(Monitor)
|
||||||
|
@ -566,7 +566,7 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
def api = new DDApi("localhost", agent.address.port, null)
|
def api = new DDAgentApi("localhost", agent.address.port, null)
|
||||||
|
|
||||||
def statsd = Stub(StatsDClient)
|
def statsd = Stub(StatsDClient)
|
||||||
statsd.incrementCounter("queue.accepted") >> { stat ->
|
statsd.incrementCounter("queue.accepted") >> { stat ->
|
||||||
|
@ -606,13 +606,13 @@ class DDAgentWriterTest extends DDSpecification {
|
||||||
def minimalTrace = createMinimalTrace()
|
def minimalTrace = createMinimalTrace()
|
||||||
|
|
||||||
// DQH -- need to set-up a dummy agent for the final send callback to work
|
// DQH -- need to set-up a dummy agent for the final send callback to work
|
||||||
def api = new DDApi("localhost", 8192, null) {
|
def api = new DDAgentApi("localhost", 8192, null) {
|
||||||
DDApi.Response sendSerializedTraces(
|
DDAgentApi.Response sendSerializedTraces(
|
||||||
int representativeCount,
|
int representativeCount,
|
||||||
Integer sizeInBytes,
|
Integer sizeInBytes,
|
||||||
List<byte[]> traces) {
|
List<byte[]> traces) {
|
||||||
// simulating a communication failure to a server
|
// simulating a communication failure to a server
|
||||||
return DDApi.Response.failed(new IOException("comm error"))
|
return DDAgentApi.Response.failed(new IOException("comm error"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,8 +4,8 @@ import datadog.opentracing.DDSpanContext
|
||||||
import datadog.opentracing.DDTracer
|
import datadog.opentracing.DDTracer
|
||||||
import datadog.opentracing.PendingTrace
|
import datadog.opentracing.PendingTrace
|
||||||
import datadog.trace.api.sampling.PrioritySampling
|
import datadog.trace.api.sampling.PrioritySampling
|
||||||
import datadog.trace.common.writer.DDApi
|
|
||||||
import datadog.trace.common.writer.ListWriter
|
import datadog.trace.common.writer.ListWriter
|
||||||
|
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||||
import datadog.trace.util.test.DDSpecification
|
import datadog.trace.util.test.DDSpecification
|
||||||
import org.testcontainers.containers.GenericContainer
|
import org.testcontainers.containers.GenericContainer
|
||||||
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy
|
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy
|
||||||
|
@ -20,7 +20,7 @@ class DDApiIntegrationTest {
|
||||||
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
|
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
|
||||||
// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers
|
// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers
|
||||||
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
||||||
static class DDApiIntegrationV4Test extends DDSpecification {
|
static class DDAgentApiIntegrationV4Test extends DDSpecification {
|
||||||
static final WRITER = new ListWriter()
|
static final WRITER = new ListWriter()
|
||||||
static final TRACER = new DDTracer(WRITER)
|
static final TRACER = new DDTracer(WRITER)
|
||||||
static final CONTEXT = new DDSpanContext(
|
static final CONTEXT = new DDSpanContext(
|
||||||
|
@ -64,7 +64,7 @@ class DDApiIntegrationTest {
|
||||||
def endpoint = new AtomicReference<String>(null)
|
def endpoint = new AtomicReference<String>(null)
|
||||||
def agentResponse = new AtomicReference<String>(null)
|
def agentResponse = new AtomicReference<String>(null)
|
||||||
|
|
||||||
DDApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
|
DDAgentApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
|
||||||
endpoint.set(receivedEndpoint)
|
endpoint.set(receivedEndpoint)
|
||||||
agentResponse.set(responseJson.toString())
|
agentResponse.set(responseJson.toString())
|
||||||
}
|
}
|
||||||
|
@ -109,10 +109,10 @@ class DDApiIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
def setup() {
|
def setup() {
|
||||||
api = new DDApi(agentContainerHost, agentContainerPort, v4(), null)
|
api = new DDAgentApi(agentContainerHost, agentContainerPort, v4(), null)
|
||||||
api.addResponseListener(responseListener)
|
api.addResponseListener(responseListener)
|
||||||
|
|
||||||
unixDomainSocketApi = new DDApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
|
unixDomainSocketApi = new DDAgentApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
|
||||||
unixDomainSocketApi.addResponseListener(responseListener)
|
unixDomainSocketApi.addResponseListener(responseListener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ class DDApiIntegrationTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
||||||
static class DDApiIntegrationV3Test extends DDApiIntegrationV4Test {
|
static class DDAgentApiIntegrationV3Test extends DDAgentApiIntegrationV4Test {
|
||||||
boolean v4() {
|
boolean v4() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue