Merge pull request #192 from DataDog/ark/priority_sampling

Priority Sampling
This commit is contained in:
Andrew Kent 2018-01-22 11:47:51 -08:00 committed by GitHub
commit b60dbb94f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 784 additions and 422 deletions

View File

@ -49,14 +49,13 @@ The Java Agent—once passed to your application—automatically traces requests
#### Configuration #### Configuration
| Config | System Property | Environment Variable | Default | | Config | System Property | Environment Variable | Default |
| ------------- | ---------------- | -------------------- | ------------------ | | ------------------ | --------------------- | ------------------------- | ------------------ |
| service.name | dd.service.name | DD_SERVICE_NAME | `unnamed-java-app` | | service.name | dd.service.name | DD_SERVICE_NAME | `unnamed-java-app` |
| writer.type | dd.writer.type | DD_WRITER_TYPE | `DDAgentWriter` | | writer.type | dd.writer.type | DD_WRITER_TYPE | `DDAgentWriter` |
| agent.host | dd.agent.host | DD_AGENT_HOST | `localhost` | | agent.host | dd.agent.host | DD_AGENT_HOST | `localhost` |
| agent.port | dd.agent.port | DD_AGENT_PORT | `8126` | | agent.port | dd.agent.port | DD_AGENT_PORT | `8126` |
| sampler.type | dd.sampler.type | DD_SAMPLER_TYPE | `AllSampler` | | priority.sampling | dd.priority.sampling | DD_PRIORITY_SAMPLING | `false` |
| sampler.rate | dd.sampler.rate | DD_SAMPLER_RATE | `1.0` |
#### Application Servers #### Application Servers

View File

@ -14,6 +14,7 @@ whitelistedInstructionClasses += whitelistedBranchClasses += [
'datadog.trace.common.writer.ListWriter', 'datadog.trace.common.writer.ListWriter',
'datadog.trace.common.util.Clock', 'datadog.trace.common.util.Clock',
'datadog.trace.api.DDTags', 'datadog.trace.api.DDTags',
'datadog.trace.common.sampling.PrioritySampling'
] ]
dependencies { dependencies {

View File

@ -2,7 +2,10 @@ package datadog.opentracing;
import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import datadog.trace.common.sampling.PrioritySampling;
import datadog.trace.common.util.Clock; import datadog.trace.common.util.Clock;
import io.opentracing.Span; import io.opentracing.Span;
import java.io.PrintWriter; import java.io.PrintWriter;
@ -94,7 +97,8 @@ public class DDSpan implements Span {
* *
* @return true if root, false otherwise * @return true if root, false otherwise
*/ */
protected final boolean isRootSpan() { @JsonIgnore
public final boolean isRootSpan() {
if (context().getTrace().isEmpty()) { if (context().getTrace().isEmpty()) {
return false; return false;
@ -236,6 +240,16 @@ public class DDSpan implements Span {
return this; return this;
} }
/**
* Set the sampling priority of the span.
*
* <p>Has no effect if the span priority has been propagated (injected or extracted).
*/
public final DDSpan setSamplingPriority(int newPriority) {
this.context().setSamplingPriority(newPriority);
return this;
}
public final DDSpan setSpanType(final String type) { public final DDSpan setSpanType(final String type) {
this.context().setSpanType(type); this.context().setSpanType(type);
return this; return this;
@ -300,6 +314,17 @@ public class DDSpan implements Span {
return context.getOperationName(); return context.getOperationName();
} }
@JsonGetter("sampling_priority")
@JsonInclude(Include.NON_NULL)
public Integer getSamplingPriority() {
final int samplingPriority = context.getSamplingPriority();
if (samplingPriority == PrioritySampling.UNSET) {
return null;
} else {
return samplingPriority;
}
}
@JsonIgnore @JsonIgnore
public Map<String, Object> getTags() { public Map<String, Object> getTags() {
return this.context().getTags(); return this.context().getTags();

View File

@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import datadog.opentracing.decorators.AbstractDecorator; import datadog.opentracing.decorators.AbstractDecorator;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import datadog.trace.common.sampling.PrioritySampling;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -49,6 +50,10 @@ public class DDSpanContext implements io.opentracing.SpanContext {
private String spanType; private String spanType;
/** Each span have an operation name describing the current span */ /** Each span have an operation name describing the current span */
private String operationName; private String operationName;
/** The sampling priority of the trace */
private volatile int samplingPriority = PrioritySampling.UNSET;
/** When true, the samplingPriority cannot be changed. */
private volatile boolean samplingPriorityLocked = false;
// Others attributes // Others attributes
/** Tags are associated to the current span, they will not propagate to the children span */ /** Tags are associated to the current span, they will not propagate to the children span */
private Map<String, Object> tags; private Map<String, Object> tags;
@ -60,6 +65,7 @@ public class DDSpanContext implements io.opentracing.SpanContext {
final String serviceName, final String serviceName,
final String operationName, final String operationName,
final String resourceName, final String resourceName,
final int samplingPriority,
final Map<String, String> baggageItems, final Map<String, String> baggageItems,
final boolean errorFlag, final boolean errorFlag,
final String spanType, final String spanType,
@ -80,6 +86,7 @@ public class DDSpanContext implements io.opentracing.SpanContext {
this.serviceName = serviceName; this.serviceName = serviceName;
this.operationName = operationName; this.operationName = operationName;
this.resourceName = resourceName; this.resourceName = resourceName;
this.samplingPriority = samplingPriority;
this.errorFlag = errorFlag; this.errorFlag = errorFlag;
this.spanType = spanType; this.spanType = spanType;
@ -141,6 +148,46 @@ public class DDSpanContext implements io.opentracing.SpanContext {
this.spanType = spanType; this.spanType = spanType;
} }
public void setSamplingPriority(int newPriority) {
if (samplingPriorityLocked) {
log.warn(
"samplingPriority locked at {}. Refusing to set to {}", samplingPriority, newPriority);
} else {
synchronized (this) {
// sync with lockSamplingPriority
this.samplingPriority = newPriority;
}
}
}
public int getSamplingPriority() {
return samplingPriority;
}
/**
* Prevent future changes to the context's sampling priority.
*
* <p>Used when a span is extracted or injected for propagation.
*
* <p>Has no effect if the sampling priority is unset.
*
* @return true if the sampling priority was locked.
*/
public boolean lockSamplingPriority() {
if (!samplingPriorityLocked) {
synchronized (this) {
// sync with setSamplingPriority
if (samplingPriority == PrioritySampling.UNSET) {
log.debug("{} : refusing to lock unset samplingPriority", this);
} else {
this.samplingPriorityLocked = true;
log.debug("{} : locked samplingPriority to {}", this, this.samplingPriority);
}
}
}
return samplingPriorityLocked;
}
public void setBaggageItem(final String key, final String value) { public void setBaggageItem(final String key, final String value) {
if (this.baggageItems.isEmpty()) { if (this.baggageItems.isEmpty()) {
this.baggageItems = new HashMap<>(); this.baggageItems = new HashMap<>();
@ -248,6 +295,9 @@ public class DDSpanContext implements io.opentracing.SpanContext {
.append(getOperationName()) .append(getOperationName())
.append("/") .append("/")
.append(getResourceName()); .append(getResourceName());
if (getSamplingPriority() != PrioritySampling.UNSET) {
s.append(" samplingPriority=").append(getSamplingPriority());
}
if (errorFlag) { if (errorFlag) {
s.append(" *errored*"); s.append(" *errored*");
} }

View File

@ -9,7 +9,12 @@ import datadog.trace.api.DDTags;
import datadog.trace.common.DDTraceConfig; import datadog.trace.common.DDTraceConfig;
import datadog.trace.common.Service; import datadog.trace.common.Service;
import datadog.trace.common.sampling.AllSampler; import datadog.trace.common.sampling.AllSampler;
import datadog.trace.common.sampling.PrioritySampling;
import datadog.trace.common.sampling.RateByServiceSampler;
import datadog.trace.common.sampling.Sampler; import datadog.trace.common.sampling.Sampler;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.DDApi;
import datadog.trace.common.writer.DDApi.ResponseListener;
import datadog.trace.common.writer.Writer; import datadog.trace.common.writer.Writer;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.ScopeManager; import io.opentracing.ScopeManager;
@ -78,6 +83,10 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
registry = new CodecRegistry(); registry = new CodecRegistry();
registry.register(Format.Builtin.HTTP_HEADERS, new HTTPCodec()); registry.register(Format.Builtin.HTTP_HEADERS, new HTTPCodec());
registry.register(Format.Builtin.TEXT_MAP, new HTTPCodec()); registry.register(Format.Builtin.TEXT_MAP, new HTTPCodec());
if (this.writer instanceof DDAgentWriter && sampler instanceof DDApi.ResponseListener) {
final DDApi api = ((DDAgentWriter) this.writer).getApi();
api.addResponseListener((DDApi.ResponseListener) this.sampler);
}
log.info("New instance: {}", this); log.info("New instance: {}", this);
} }
@ -139,7 +148,6 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
@Override @Override
public <T> SpanContext extract(final Format<T> format, final T carrier) { public <T> SpanContext extract(final Format<T> format, final T carrier) {
final Codec<T> codec = registry.get(format); final Codec<T> codec = registry.get(format);
if (codec == null) { if (codec == null) {
log.warn("Unsupported format for propagation - {}", format.getClass().getName()); log.warn("Unsupported format for propagation - {}", format.getClass().getName());
@ -250,7 +258,11 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
} }
private DDSpan startSpan() { private DDSpan startSpan() {
return new DDSpan(this.timestamp, buildSpanContext()); final DDSpan span = new DDSpan(this.timestamp, buildSpanContext());
if (DDTracer.this.sampler instanceof RateByServiceSampler) {
((RateByServiceSampler) DDTracer.this.sampler).initializeSamplingPriority(span);
}
return span;
} }
@Override @Override
@ -373,6 +385,7 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
final long parentSpanId; final long parentSpanId;
final Map<String, String> baggage; final Map<String, String> baggage;
final Queue<DDSpan> parentTrace; final Queue<DDSpan> parentTrace;
final int samplingPriority;
final DDSpanContext context; final DDSpanContext context;
SpanContext parentContext = this.parent; SpanContext parentContext = this.parent;
@ -388,6 +401,7 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
parentSpanId = ddsc.getSpanId(); parentSpanId = ddsc.getSpanId();
baggage = ddsc.getBaggageItems(); baggage = ddsc.getBaggageItems();
parentTrace = ddsc.getTrace(); parentTrace = ddsc.getTrace();
samplingPriority = ddsc.getSamplingPriority();
if (this.serviceName == null) this.serviceName = ddsc.getServiceName(); if (this.serviceName == null) this.serviceName = ddsc.getServiceName();
if (this.spanType == null) this.spanType = ddsc.getSpanType(); if (this.spanType == null) this.spanType = ddsc.getSpanType();
@ -396,6 +410,7 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
parentSpanId = 0L; parentSpanId = 0L;
baggage = null; baggage = null;
parentTrace = null; parentTrace = null;
samplingPriority = PrioritySampling.UNSET;
} }
if (serviceName == null) { if (serviceName == null) {
@ -416,6 +431,7 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
serviceName, serviceName,
operationName, operationName,
this.resourceName, this.resourceName,
samplingPriority,
baggage, baggage,
errorFlag, errorFlag,
spanType, spanType,

View File

@ -1,6 +1,7 @@
package datadog.opentracing.propagation; package datadog.opentracing.propagation;
import datadog.opentracing.DDSpanContext; import datadog.opentracing.DDSpanContext;
import datadog.trace.common.sampling.PrioritySampling;
import io.opentracing.propagation.TextMap; import io.opentracing.propagation.TextMap;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URLDecoder; import java.net.URLDecoder;
@ -17,12 +18,15 @@ public class HTTPCodec implements Codec<TextMap> {
private static final String OT_BAGGAGE_PREFIX = "ot-baggage-"; private static final String OT_BAGGAGE_PREFIX = "ot-baggage-";
private static final String TRACE_ID_KEY = "x-datadog-trace-id"; private static final String TRACE_ID_KEY = "x-datadog-trace-id";
private static final String SPAN_ID_KEY = "x-datadog-parent-id"; private static final String SPAN_ID_KEY = "x-datadog-parent-id";
private static final String SAMPLING_PRIORITY_KEY = "x-datadog-sampling-priority";
@Override @Override
public void inject(final DDSpanContext context, final TextMap carrier) { public void inject(final DDSpanContext context, final TextMap carrier) {
carrier.put(TRACE_ID_KEY, String.valueOf(context.getTraceId())); carrier.put(TRACE_ID_KEY, String.valueOf(context.getTraceId()));
carrier.put(SPAN_ID_KEY, String.valueOf(context.getSpanId())); carrier.put(SPAN_ID_KEY, String.valueOf(context.getSpanId()));
if (context.lockSamplingPriority()) {
carrier.put(SAMPLING_PRIORITY_KEY, String.valueOf(context.getSamplingPriority()));
}
for (final Map.Entry<String, String> entry : context.baggageItems()) { for (final Map.Entry<String, String> entry : context.baggageItems()) {
carrier.put(OT_BAGGAGE_PREFIX + entry.getKey(), encode(entry.getValue())); carrier.put(OT_BAGGAGE_PREFIX + entry.getKey(), encode(entry.getValue()));
@ -35,9 +39,9 @@ public class HTTPCodec implements Codec<TextMap> {
Map<String, String> baggage = Collections.emptyMap(); Map<String, String> baggage = Collections.emptyMap();
Long traceId = 0L; Long traceId = 0L;
Long spanId = 0L; Long spanId = 0L;
int samplingPriority = PrioritySampling.UNSET;
for (final Map.Entry<String, String> entry : carrier) { for (final Map.Entry<String, String> entry : carrier) {
final String key = entry.getKey().toLowerCase(); final String key = entry.getKey().toLowerCase();
if (key.equalsIgnoreCase(TRACE_ID_KEY)) { if (key.equalsIgnoreCase(TRACE_ID_KEY)) {
traceId = Long.parseLong(entry.getValue()); traceId = Long.parseLong(entry.getValue());
@ -48,14 +52,28 @@ public class HTTPCodec implements Codec<TextMap> {
baggage = new HashMap<>(); baggage = new HashMap<>();
} }
baggage.put(key.replace(OT_BAGGAGE_PREFIX, ""), decode(entry.getValue())); baggage.put(key.replace(OT_BAGGAGE_PREFIX, ""), decode(entry.getValue()));
} else if (key.equalsIgnoreCase(SAMPLING_PRIORITY_KEY)) {
samplingPriority = Integer.parseInt(entry.getValue());
} }
} }
DDSpanContext context = null; DDSpanContext context = null;
if (traceId != 0L) { if (traceId != 0L) {
context = context =
new DDSpanContext( new DDSpanContext(
traceId, spanId, 0L, null, null, null, baggage, false, null, null, null, null); traceId,
spanId,
0L,
null,
null,
null,
samplingPriority,
baggage,
false,
null,
null,
null,
null);
context.lockSamplingPriority();
log.debug("{} - Parent context extracted", context); log.debug("{} - Parent context extracted", context);
} }

View File

@ -1,7 +1,6 @@
package datadog.trace.common; package datadog.trace.common;
import datadog.opentracing.DDTracer; import datadog.opentracing.DDTracer;
import datadog.trace.common.sampling.Sampler;
import datadog.trace.common.writer.DDAgentWriter; import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.Writer; import datadog.trace.common.writer.Writer;
import java.util.Properties; import java.util.Properties;
@ -23,15 +22,13 @@ public class DDTraceConfig extends Properties {
public static final String WRITER_TYPE = "writer.type"; public static final String WRITER_TYPE = "writer.type";
public static final String AGENT_HOST = "agent.host"; public static final String AGENT_HOST = "agent.host";
public static final String AGENT_PORT = "agent.port"; public static final String AGENT_PORT = "agent.port";
public static final String SAMPLER_TYPE = "sampler.type"; public static final String PRIORITY_SAMPLING = "priority.sampling";
public static final String SAMPLER_RATE = "sampler.rate";
private final String serviceName = getPropOrEnv(PREFIX + SERVICE_NAME); private final String serviceName = getPropOrEnv(PREFIX + SERVICE_NAME);
private final String writerType = getPropOrEnv(PREFIX + WRITER_TYPE); private final String writerType = getPropOrEnv(PREFIX + WRITER_TYPE);
private final String agentHost = getPropOrEnv(PREFIX + AGENT_HOST); private final String agentHost = getPropOrEnv(PREFIX + AGENT_HOST);
private final String agentPort = getPropOrEnv(PREFIX + AGENT_PORT); private final String agentPort = getPropOrEnv(PREFIX + AGENT_PORT);
private final String samplerType = getPropOrEnv(PREFIX + SAMPLER_TYPE); private final String prioritySampling = getPropOrEnv(PREFIX + PRIORITY_SAMPLING);
private final String samplerRate = getPropOrEnv(PREFIX + SAMPLER_RATE);
public DDTraceConfig() { public DDTraceConfig() {
super(); super();
@ -41,16 +38,13 @@ public class DDTraceConfig extends Properties {
defaults.setProperty(WRITER_TYPE, Writer.DD_AGENT_WRITER_TYPE); defaults.setProperty(WRITER_TYPE, Writer.DD_AGENT_WRITER_TYPE);
defaults.setProperty(AGENT_HOST, DDAgentWriter.DEFAULT_HOSTNAME); defaults.setProperty(AGENT_HOST, DDAgentWriter.DEFAULT_HOSTNAME);
defaults.setProperty(AGENT_PORT, String.valueOf(DDAgentWriter.DEFAULT_PORT)); defaults.setProperty(AGENT_PORT, String.valueOf(DDAgentWriter.DEFAULT_PORT));
defaults.setProperty(SAMPLER_TYPE, Sampler.ALL_SAMPLER_TYPE);
defaults.setProperty(SAMPLER_RATE, "1.0");
super.defaults = defaults; super.defaults = defaults;
setIfNotNull(SERVICE_NAME, serviceName); setIfNotNull(SERVICE_NAME, serviceName);
setIfNotNull(WRITER_TYPE, writerType); setIfNotNull(WRITER_TYPE, writerType);
setIfNotNull(AGENT_HOST, agentHost); setIfNotNull(AGENT_HOST, agentHost);
setIfNotNull(AGENT_PORT, agentPort); setIfNotNull(AGENT_PORT, agentPort);
setIfNotNull(SAMPLER_TYPE, samplerType); setIfNotNull(PRIORITY_SAMPLING, prioritySampling);
setIfNotNull(SAMPLER_RATE, samplerRate);
} }
public DDTraceConfig(final String serviceName) { public DDTraceConfig(final String serviceName) {

View File

@ -0,0 +1,20 @@
package datadog.trace.common.sampling;
public class PrioritySampling {
/**
* Implementation detail of the client. will not be sent to the agent or propagated.
*
* <p>Internal value used when the priority sampling flag has not been set on the span context.
*/
public static final int UNSET = Integer.MIN_VALUE;
/** The sampler has decided to drop the trace. */
public static final int SAMPLER_DROP = 0;
/** The sampler has decided to keep the trace. */
public static final int SAMPLER_KEEP = 1;
/** The user has decided to drop the trace. */
public static final int USER_DROP = -1;
/** The user has decided to keep the trace. */
public static final int USER_KEEP = 2;
private PrioritySampling() {}
}

View File

@ -0,0 +1,140 @@
package datadog.trace.common.sampling;
import com.fasterxml.jackson.databind.JsonNode;
import datadog.opentracing.DDSpan;
import datadog.trace.common.writer.DDApi.ResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
/**
* A rate sampler which maintains different sample rates per service+env name.
*
* <p>The configuration of (serviceName,env)->rate is configured by the core agent.
*/
@Slf4j
public class RateByServiceSampler implements Sampler, ResponseListener {
/** Key for setting the baseline rate */
private static final String BASE_KEY = "service:,env:";
/** Sampler to use if service+env is not in the map */
private RateSampler baseSampler = new RateSampler(1.0);
private final Map<String, RateSampler> serviceRates = new HashMap<String, RateSampler>();
@Override
public synchronized boolean sample(DDSpan span) {
// Priority sampling sends all traces to the core agent, including traces marked dropped.
// This allows the core agent to collect stats on all traces.
return true;
}
/** If span is a root span, set the span context samplingPriority to keep or drop */
public void initializeSamplingPriority(DDSpan span) {
if (span.isRootSpan()) {
// Run the priority sampler on the new span
setSamplingPriorityOnSpanContext(span);
} else if (span.getSamplingPriority() == null) {
// Edge case: If the parent context did not set the priority, run the priority sampler.
// Happens when extracted http context did not send the priority header.
setSamplingPriorityOnSpanContext(span);
}
}
private synchronized void setSamplingPriorityOnSpanContext(DDSpan span) {
final String serviceName = span.getServiceName();
final String env = getSpanEnv(span);
final String key = "service:" + serviceName + ",env:" + env;
boolean agentSample;
if (serviceRates.containsKey(key)) {
agentSample = serviceRates.get(key).sample(span);
} else {
agentSample = baseSampler.sample(span);
}
if (agentSample) {
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
} else {
span.setSamplingPriority(PrioritySampling.SAMPLER_DROP);
}
}
private static String getSpanEnv(DDSpan span) {
return null == span.getTags().get("env") ? "" : String.valueOf(span.getTags().get("env"));
}
@Override
public void onResponse(String endpoint, JsonNode responseJson) {
JsonNode newServiceRates = responseJson.get("rate_by_service");
if (null != newServiceRates) {
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
synchronized (this) {
serviceRates.clear();
Iterator<String> itr = newServiceRates.fieldNames();
while (itr.hasNext()) {
final String key = itr.next();
try {
final float val = Float.parseFloat(newServiceRates.get(key).toString());
if (BASE_KEY.equals(key)) {
baseSampler = new RateSampler(val);
} else {
serviceRates.put(key, new RateSampler(val));
}
} catch (NumberFormatException nfe) {
log.debug("Unable to parse new service rate {} -> {}", key, newServiceRates.get(key));
}
}
}
}
}
/**
* This sampler sample the traces at a predefined rate.
*
* <p>Keep (100 * `sample_rate`)% of the traces. It samples randomly, its main purpose is to
* reduce the integration footprint.
*/
private static class RateSampler extends AbstractSampler {
/** The sample rate used */
private final double sampleRate;
public RateSampler(final String sampleRate) {
this(sampleRate == null ? 1 : Double.valueOf(sampleRate));
}
/**
* Build an instance of the sampler. The Sample rate is fixed for each instance.
*
* @param sampleRate a number [0,1] representing the rate ratio.
*/
public RateSampler(double sampleRate) {
if (sampleRate <= 0) {
sampleRate = 1;
log.error("SampleRate is negative or null, disabling the sampler");
} else if (sampleRate > 1) {
sampleRate = 1;
}
this.sampleRate = sampleRate;
log.debug("Initializing the RateSampler, sampleRate: {} %", this.sampleRate * 100);
}
@Override
public boolean doSample(final DDSpan span) {
final boolean sample = Math.random() <= this.sampleRate;
log.debug("{} - Span is sampled: {}", span, sample);
return sample;
}
public double getSampleRate() {
return this.sampleRate;
}
@Override
public String toString() {
return "RateSampler { sampleRate=" + sampleRate + " }";
}
}
}

View File

@ -1,57 +0,0 @@
package datadog.trace.common.sampling;
import com.google.auto.service.AutoService;
import datadog.opentracing.DDSpan;
import lombok.extern.slf4j.Slf4j;
/**
* This sampler sample the traces at a predefined rate.
*
* <p>Keep (100 * `sample_rate`)% of the traces. It samples randomly, its main purpose is to reduce
* the integration footprint.
*/
@Slf4j
@AutoService(Sampler.class)
public class RateSampler extends AbstractSampler {
/** The sample rate used */
private final double sampleRate;
public RateSampler(final String sampleRate) {
this(sampleRate == null ? 1 : Double.valueOf(sampleRate));
}
/**
* Build an instance of the sampler. The Sample rate is fixed for each instance.
*
* @param sampleRate a number [0,1] representing the rate ratio.
*/
public RateSampler(double sampleRate) {
if (sampleRate <= 0) {
sampleRate = 1;
log.error("SampleRate is negative or null, disabling the sampler");
} else if (sampleRate > 1) {
sampleRate = 1;
}
this.sampleRate = sampleRate;
log.debug("Initializing the RateSampler, sampleRate: {} %", this.sampleRate * 100);
}
@Override
public boolean doSample(final DDSpan span) {
final boolean sample = Math.random() <= this.sampleRate;
log.debug("{} - Span is sampled: {}", span, sample);
return sample;
}
public double getSampleRate() {
return this.sampleRate;
}
@Override
public String toString() {
return "RateSampler { sampleRate=" + sampleRate + " }";
}
}

View File

@ -3,12 +3,10 @@ package datadog.trace.common.sampling;
import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpan;
import datadog.trace.common.DDTraceConfig; import datadog.trace.common.DDTraceConfig;
import java.util.Properties; import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
/** Main interface to sample a collection of traces. */ /** Main interface to sample a collection of traces. */
public interface Sampler { public interface Sampler {
static final String ALL_SAMPLER_TYPE = AllSampler.class.getSimpleName(); static final String ALL_SAMPLER_TYPE = AllSampler.class.getSimpleName();
static final String RATE_SAMPLER_TYPE = RateSampler.class.getSimpleName();
/** /**
* Sample a collection of traces based on the parent span * Sample a collection of traces based on the parent span
@ -18,26 +16,18 @@ public interface Sampler {
*/ */
boolean sample(DDSpan span); boolean sample(DDSpan span);
@Slf4j
final class Builder { final class Builder {
public static Sampler forConfig(final Properties config) { public static Sampler forConfig(final Properties config) {
final Sampler sampler; final Sampler sampler;
if (config != null) { if (config != null) {
final String configuredType = config.getProperty(DDTraceConfig.SAMPLER_TYPE); final boolean prioritySamplingEnabled =
if (RATE_SAMPLER_TYPE.equals(configuredType)) { Boolean.parseBoolean(config.getProperty(DDTraceConfig.PRIORITY_SAMPLING));
sampler = new RateSampler(config.getProperty(DDTraceConfig.SAMPLER_RATE)); if (prioritySamplingEnabled) {
} else if (ALL_SAMPLER_TYPE.equals(configuredType)) { sampler = new RateByServiceSampler();
sampler = new AllSampler();
} else { } else {
log.warn(
"Sampler type not configured correctly: Type {} not recognized. Defaulting to AllSampler.",
configuredType);
sampler = new AllSampler(); sampler = new AllSampler();
} }
} else { } else {
log.warn(
"Sampler type not configured correctly: No config provided! Defaulting to AllSampler.");
sampler = new AllSampler(); sampler = new AllSampler();
} }
return sampler; return sampler;

View File

@ -147,6 +147,10 @@ public class DDAgentWriter implements Writer {
return "DDAgentWriter { api=" + api + " }"; return "DDAgentWriter { api=" + api + " }";
} }
public DDApi getApi() {
return api;
}
/** Infinite tasks blocking until some spans come in the blocking queue. */ /** Infinite tasks blocking until some spans come in the blocking queue. */
class TracesSendingTask implements Runnable { class TracesSendingTask implements Runnable {

View File

@ -1,14 +1,19 @@
package datadog.trace.common.writer; package datadog.trace.common.writer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo; import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.Service; import datadog.trace.common.Service;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -19,12 +24,15 @@ import org.msgpack.jackson.dataformat.MessagePackFactory;
@Slf4j @Slf4j
public class DDApi { public class DDApi {
private static final String TRACES_ENDPOINT = "/v0.3/traces"; private static final String TRACES_ENDPOINT_V3 = "/v0.3/traces";
private static final String SERVICES_ENDPOINT = "/v0.3/services"; private static final String SERVICES_ENDPOINT_V3 = "/v0.3/services";
private static final String TRACES_ENDPOINT_V4 = "/v0.4/traces";
private static final String SERVICES_ENDPOINT_V4 = "/v0.4/services";
private static final long SECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toSeconds(5); private static final long SECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toSeconds(5);
private final String tracesEndpoint; private final String tracesEndpoint;
private final String servicesEndpoint; private final String servicesEndpoint;
private final List<ResponseListener> responseListeners = new ArrayList<ResponseListener>();
private final RateLimiter loggingRateLimiter = private final RateLimiter loggingRateLimiter =
RateLimiter.create(1.0 / SECONDS_BETWEEN_ERROR_LOG); RateLimiter.create(1.0 / SECONDS_BETWEEN_ERROR_LOG);
@ -32,8 +40,21 @@ public class DDApi {
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
public DDApi(final String host, final int port) { public DDApi(final String host, final int port) {
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT; if (endpointAvailable("http://" + host + ":" + port + TRACES_ENDPOINT_V4)
this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT; && endpointAvailable("http://" + host + ":" + port + SERVICES_ENDPOINT_V4)) {
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V4;
this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT_V4;
} else {
log.debug("API v0.4 endpoints not available. Downgrading to v0.3");
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V3;
this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT_V3;
}
}
public void addResponseListener(ResponseListener listener) {
if (!responseListeners.contains(listener)) {
responseListeners.add(listener);
}
} }
/** /**
@ -74,6 +95,21 @@ public class DDApi {
out.flush(); out.flush();
out.close(); out.close();
String responseString = null;
{
final BufferedReader responseReader =
new BufferedReader(new InputStreamReader(httpCon.getInputStream()));
final StringBuilder sb = new StringBuilder();
String line = null;
while ((line = responseReader.readLine()) != null) {
sb.append(line);
}
responseReader.close();
responseString = sb.toString();
}
final int responseCode = httpCon.getResponseCode(); final int responseCode = httpCon.getResponseCode();
if (responseCode != 200) { if (responseCode != 200) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -96,6 +132,19 @@ public class DDApi {
} }
log.debug("Succesfully sent {} {} to the DD agent.", size, type); log.debug("Succesfully sent {} {} to the DD agent.", size, type);
try {
if (null != responseString
&& !"".equals(responseString.trim())
&& !"OK".equalsIgnoreCase(responseString.trim())) {
JsonNode response = objectMapper.readTree(responseString);
for (ResponseListener listener : responseListeners) {
listener.onResponse(endpoint, response);
}
}
} catch (IOException e) {
log.debug("failed to parse DD agent response: " + responseString, e);
}
return true; return true;
} catch (final IOException e) { } catch (final IOException e) {
@ -114,11 +163,24 @@ public class DDApi {
} }
} }
private boolean endpointAvailable(final String endpoint) {
try {
final HttpURLConnection httpCon = getHttpURLConnection(endpoint);
OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream());
out.flush();
out.close();
return httpCon.getResponseCode() == 200;
} catch (IOException e) {
}
return false;
}
private HttpURLConnection getHttpURLConnection(final String endpoint) throws IOException { private HttpURLConnection getHttpURLConnection(final String endpoint) throws IOException {
final HttpURLConnection httpCon; final HttpURLConnection httpCon;
final URL url = new URL(endpoint); final URL url = new URL(endpoint);
httpCon = (HttpURLConnection) url.openConnection(); httpCon = (HttpURLConnection) url.openConnection();
httpCon.setDoOutput(true); httpCon.setDoOutput(true);
httpCon.setDoInput(true);
httpCon.setRequestMethod("PUT"); httpCon.setRequestMethod("PUT");
httpCon.setRequestProperty("Content-Type", "application/msgpack"); httpCon.setRequestProperty("Content-Type", "application/msgpack");
httpCon.setRequestProperty("Datadog-Meta-Lang", "java"); httpCon.setRequestProperty("Datadog-Meta-Lang", "java");
@ -132,4 +194,9 @@ public class DDApi {
public String toString() { public String toString() {
return "DDApi { tracesEndpoint=" + tracesEndpoint + " }"; return "DDApi { tracesEndpoint=" + tracesEndpoint + " }";
} }
public static interface ResponseListener {
/** Invoked after the api receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
}
} }

View File

@ -0,0 +1,67 @@
package datadog.opentracing
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.collect.Maps
import datadog.trace.api.DDTags
import datadog.trace.common.sampling.PrioritySampling
import spock.lang.Specification
import spock.lang.Unroll
class DDSpanSerializationTest extends Specification {
@Unroll
def "serialize spans"() throws Exception {
setup:
final Map<String, String> baggage = new HashMap<>()
baggage.put("a-baggage", "value")
final Map<String, Object> tags = new HashMap<>()
baggage.put("k1", "v1")
Map<String, Object> expected = Maps.newHashMap()
expected.put("meta", baggage)
expected.put("service", "service")
expected.put("error", 0)
expected.put("type", "type")
expected.put("name", "operation")
expected.put("duration", 33000)
expected.put("resource", "operation")
if (samplingPriority != PrioritySampling.UNSET) {
expected.put("sampling_priority", samplingPriority)
}
expected.put("start", 100000)
expected.put("span_id", 2l)
expected.put("parent_id", 0l)
expected.put("trace_id", 1l)
final DDSpanContext context =
new DDSpanContext(
1L,
2L,
0L,
"service",
"operation",
null,
samplingPriority,
new HashMap<>(baggage),
false,
"type",
tags,
null,
null)
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName())
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()))
DDSpan span = new DDSpan(100L, context)
span.finish(133L)
ObjectMapper serializer = new ObjectMapper()
expect:
serializer.readTree(serializer.writeValueAsString(span)) == serializer.readTree(serializer.writeValueAsString(expected))
where:
samplingPriority | _
PrioritySampling.SAMPLER_KEEP | _
PrioritySampling.UNSET | _
}
}

View File

@ -0,0 +1,88 @@
package datadog.opentracing
import datadog.trace.common.sampling.PrioritySampling
import spock.lang.Specification
class DDSpanTest extends Specification {
def "getters and setters"() {
setup:
final DDSpanContext context =
new DDSpanContext(
1L,
1L,
0L,
"fakeService",
"fakeOperation",
"fakeResource",
PrioritySampling.UNSET,
Collections.<String, String>emptyMap(),
false,
"fakeType",
null,
null,
null)
final DDSpan span = new DDSpan(1L, context)
when:
span.setServiceName("service")
then:
span.getServiceName() == "service"
when:
span.setOperationName("operation")
then:
span.getOperationName() == "operation"
when:
span.setResourceName("resource")
then:
span.getResourceName() == "resource"
when:
span.setSpanType("type")
then:
span.getType() == "type"
when:
span.setSamplingPriority(PrioritySampling.UNSET)
then:
span.getSamplingPriority() == null
when:
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP)
then:
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
when:
context.lockSamplingPriority()
span.setSamplingPriority(PrioritySampling.USER_KEEP)
then:
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
}
def "resource name equals operation name if null"() {
setup:
final String opName = "operationName"
DDSpan span
when:
span = new DDTracer().buildSpan(opName).startManual()
then:
span.getResourceName() == opName
span.getServiceName() == DDTracer.UNASSIGNED_DEFAULT_SERVICE_NAME
when:
final String resourceName = "fake"
final String serviceName = "myService"
span = new DDTracer()
.buildSpan(opName)
.withResourceName(resourceName)
.withServiceName(serviceName)
.startManual()
then:
span.getResourceName() == resourceName
span.getServiceName() == serviceName
}
}

View File

@ -0,0 +1,95 @@
package datadog.opentracing.propagation
import datadog.opentracing.DDSpanContext
import datadog.trace.common.sampling.PrioritySampling
import io.opentracing.propagation.TextMapExtractAdapter
import io.opentracing.propagation.TextMapInjectAdapter
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
class HTTPCodecTest extends Specification {
@Shared
private static final String OT_BAGGAGE_PREFIX = "ot-baggage-"
@Shared
private static final String TRACE_ID_KEY = "x-datadog-trace-id"
@Shared
private static final String SPAN_ID_KEY = "x-datadog-parent-id"
@Shared
private static final String SAMPLING_PRIORITY_KEY = "x-datadog-sampling-priority"
@Unroll
def "inject http headers"() {
setup:
final DDSpanContext mockedContext =
new DDSpanContext(
1L,
2L,
0L,
"fakeService",
"fakeOperation",
"fakeResource",
samplingPriority,
new HashMap<String, String>() {
{
put("k1", "v1")
put("k2", "v2")
}
},
false,
"fakeType",
null,
null,
null)
final Map<String, String> carrier = new HashMap<>()
final HTTPCodec codec = new HTTPCodec()
codec.inject(mockedContext, new TextMapInjectAdapter(carrier))
expect:
carrier.get(TRACE_ID_KEY) == "1"
carrier.get(SPAN_ID_KEY) == "2"
carrier.get(SAMPLING_PRIORITY_KEY) == (samplingPriority == PrioritySampling.UNSET ? null : String.valueOf(samplingPriority))
carrier.get(OT_BAGGAGE_PREFIX + "k1") == "v1"
carrier.get(OT_BAGGAGE_PREFIX + "k2") == "v2"
where:
samplingPriority | _
PrioritySampling.UNSET | _
PrioritySampling.SAMPLER_KEEP | _
}
@Unroll
def "extract http headers"() {
setup:
final Map<String, String> actual =
new HashMap<String, String>() {
{
put(TRACE_ID_KEY.toUpperCase(), "1")
put(SPAN_ID_KEY.toUpperCase(), "2")
put(OT_BAGGAGE_PREFIX.toUpperCase() + "k1", "v1")
put(OT_BAGGAGE_PREFIX.toUpperCase() + "k2", "v2")
}
}
if (samplingPriority != PrioritySampling.UNSET) {
actual.put(SAMPLING_PRIORITY_KEY, String.valueOf(samplingPriority))
}
final HTTPCodec codec = new HTTPCodec()
final DDSpanContext context = codec.extract(new TextMapExtractAdapter(actual))
expect:
context.getTraceId() == 1l
context.getSpanId() == 2l
context.getBaggageItem("k1") == "v1"
context.getBaggageItem("k2") == "v2"
context.getSamplingPriority() == samplingPriority
where:
samplingPriority | _
PrioritySampling.UNSET | _
PrioritySampling.SAMPLER_KEEP | _
}
}

View File

@ -3,7 +3,6 @@ package datadog.trace
import datadog.opentracing.DDTracer import datadog.opentracing.DDTracer
import datadog.trace.common.DDTraceConfig import datadog.trace.common.DDTraceConfig
import datadog.trace.common.sampling.AllSampler import datadog.trace.common.sampling.AllSampler
import datadog.trace.common.sampling.RateSampler
import datadog.trace.common.writer.DDAgentWriter import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter import datadog.trace.common.writer.LoggingWriter
@ -52,8 +51,6 @@ class DDTraceConfigTest extends Specification {
System.clearProperty(PREFIX + WRITER_TYPE) System.clearProperty(PREFIX + WRITER_TYPE)
System.clearProperty(PREFIX + AGENT_HOST) System.clearProperty(PREFIX + AGENT_HOST)
System.clearProperty(PREFIX + AGENT_PORT) System.clearProperty(PREFIX + AGENT_PORT)
System.clearProperty(PREFIX + SAMPLER_TYPE)
System.clearProperty(PREFIX + SAMPLER_RATE)
} }
def "verify env override"() { def "verify env override"() {
@ -73,8 +70,6 @@ class DDTraceConfigTest extends Specification {
config.getProperty(WRITER_TYPE) == "DDAgentWriter" config.getProperty(WRITER_TYPE) == "DDAgentWriter"
config.getProperty(AGENT_HOST) == "localhost" config.getProperty(AGENT_HOST) == "localhost"
config.getProperty(AGENT_PORT) == "8126" config.getProperty(AGENT_PORT) == "8126"
config.getProperty(SAMPLER_TYPE) == "AllSampler"
config.getProperty(SAMPLER_RATE) == "1.0"
when: when:
config = new DDTraceConfig("A different service name") config = new DDTraceConfig("A different service name")
@ -84,56 +79,45 @@ class DDTraceConfigTest extends Specification {
config.getProperty(WRITER_TYPE) == "DDAgentWriter" config.getProperty(WRITER_TYPE) == "DDAgentWriter"
config.getProperty(AGENT_HOST) == "localhost" config.getProperty(AGENT_HOST) == "localhost"
config.getProperty(AGENT_PORT) == "8126" config.getProperty(AGENT_PORT) == "8126"
config.getProperty(SAMPLER_TYPE) == "AllSampler"
config.getProperty(SAMPLER_RATE) == "1.0"
} }
def "specify overrides via system properties"() { def "specify overrides via system properties"() {
when: when:
System.setProperty(PREFIX + SERVICE_NAME, "something else") System.setProperty(PREFIX + SERVICE_NAME, "something else")
System.setProperty(PREFIX + WRITER_TYPE, LoggingWriter.simpleName) System.setProperty(PREFIX + WRITER_TYPE, LoggingWriter.simpleName)
System.setProperty(PREFIX + SAMPLER_TYPE, RateSampler.simpleName)
System.setProperty(PREFIX + SAMPLER_RATE, ".5")
def tracer = new DDTracer() def tracer = new DDTracer()
then: then:
tracer.serviceName == "something else" tracer.serviceName == "something else"
tracer.writer instanceof LoggingWriter tracer.writer instanceof LoggingWriter
tracer.sampler.toString() == "RateSampler { sampleRate=0.5 }"
} }
def "specify overrides via env vars"() { def "specify overrides via env vars"() {
when: when:
overrideEnvMap.put(propToEnvName(PREFIX + SERVICE_NAME), "still something else") overrideEnvMap.put(propToEnvName(PREFIX + SERVICE_NAME), "still something else")
overrideEnvMap.put(propToEnvName(PREFIX + WRITER_TYPE), LoggingWriter.simpleName) overrideEnvMap.put(propToEnvName(PREFIX + WRITER_TYPE), LoggingWriter.simpleName)
overrideEnvMap.put(propToEnvName(PREFIX + SAMPLER_TYPE), AllSampler.simpleName)
def tracer = new DDTracer() def tracer = new DDTracer()
then: then:
tracer.serviceName == "still something else" tracer.serviceName == "still something else"
tracer.writer instanceof LoggingWriter tracer.writer instanceof LoggingWriter
tracer.sampler instanceof AllSampler
} }
def "sys props override env vars"() { def "sys props override env vars"() {
when: when:
overrideEnvMap.put(propToEnvName(PREFIX + SERVICE_NAME), "still something else") overrideEnvMap.put(propToEnvName(PREFIX + SERVICE_NAME), "still something else")
overrideEnvMap.put(propToEnvName(PREFIX + WRITER_TYPE), ListWriter.simpleName) overrideEnvMap.put(propToEnvName(PREFIX + WRITER_TYPE), ListWriter.simpleName)
overrideEnvMap.put(propToEnvName(PREFIX + SAMPLER_TYPE), AllSampler.simpleName)
System.setProperty(PREFIX + SERVICE_NAME, "what we actually want") System.setProperty(PREFIX + SERVICE_NAME, "what we actually want")
System.setProperty(PREFIX + WRITER_TYPE, DDAgentWriter.simpleName) System.setProperty(PREFIX + WRITER_TYPE, DDAgentWriter.simpleName)
System.setProperty(PREFIX + AGENT_HOST, "somewhere") System.setProperty(PREFIX + AGENT_HOST, "somewhere")
System.setProperty(PREFIX + AGENT_PORT, "9999") System.setProperty(PREFIX + AGENT_PORT, "9999")
System.setProperty(PREFIX + SAMPLER_TYPE, RateSampler.simpleName)
System.setProperty(PREFIX + SAMPLER_RATE, ".9")
def tracer = new DDTracer() def tracer = new DDTracer()
then: then:
tracer.serviceName == "what we actually want" tracer.serviceName == "what we actually want"
tracer.writer.toString() == "DDAgentWriter { api=DDApi { tracesEndpoint=http://somewhere:9999/v0.3/traces } }" tracer.writer.toString() == "DDAgentWriter { api=DDApi { tracesEndpoint=http://somewhere:9999/v0.3/traces } }"
tracer.sampler.toString() == "RateSampler { sampleRate=0.9 }"
} }
def "verify defaults on tracer"() { def "verify defaults on tracer"() {
@ -164,8 +148,5 @@ class DDTraceConfigTest extends Specification {
"writer" | "writer.type" | "LoggingWriter" | "LoggingWriter { }" "writer" | "writer.type" | "LoggingWriter" | "LoggingWriter { }"
"writer" | "agent.host" | "somethingelse" | "DDAgentWriter { api=DDApi { tracesEndpoint=http://somethingelse:8126/v0.3/traces } }" "writer" | "agent.host" | "somethingelse" | "DDAgentWriter { api=DDApi { tracesEndpoint=http://somethingelse:8126/v0.3/traces } }"
"writer" | "agent.port" | "9999" | "DDAgentWriter { api=DDApi { tracesEndpoint=http://localhost:9999/v0.3/traces } }" "writer" | "agent.port" | "9999" | "DDAgentWriter { api=DDApi { tracesEndpoint=http://localhost:9999/v0.3/traces } }"
"sampler" | "default" | "default" | "AllSampler { sample=true }"
"sampler" | "sampler.type" | "RateSampler" | "RateSampler { sampleRate=1.0 }"
"sampler" | "sampler.rate" | "100" | "AllSampler { sample=true }"
} }
} }

View File

@ -3,6 +3,7 @@ package datadog.trace
import datadog.opentracing.DDSpan import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer import datadog.opentracing.DDTracer
import datadog.trace.common.sampling.PrioritySampling
class SpanFactory { class SpanFactory {
static newSpanOf(long timestampMicro) { static newSpanOf(long timestampMicro) {
@ -13,6 +14,7 @@ class SpanFactory {
"fakeService", "fakeService",
"fakeOperation", "fakeOperation",
"fakeResource", "fakeResource",
PrioritySampling.UNSET,
Collections.emptyMap(), Collections.emptyMap(),
false, false,
"fakeType", "fakeType",
@ -30,6 +32,7 @@ class SpanFactory {
"fakeService", "fakeService",
"fakeOperation", "fakeOperation",
"fakeResource", "fakeResource",
PrioritySampling.UNSET,
Collections.emptyMap(), Collections.emptyMap(),
false, false,
"fakeType", "fakeType",

View File

@ -0,0 +1,71 @@
package datadog.trace.api.sampling
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.trace.common.sampling.PrioritySampling
import datadog.trace.common.sampling.RateByServiceSampler
import spock.lang.Specification
class RateByServiceSamplerTest extends Specification {
def "rate by service name"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
when:
String response = '{"rate_by_service": {"service:,env:":1.0, "service:spock,env:test":0.000001}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span1 = makeTrace("foo", "bar")
serviceSampler.initializeSamplingPriority(span1)
then:
span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
serviceSampler.sample(span1)
// !serviceSampler.sample(makeTrace("spock", "test"))
when:
response = '{"rate_by_service": {"service:,env:":0.000001, "service:spock,env:test":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span2 = makeTrace("spock", "test")
serviceSampler.initializeSamplingPriority(span2)
then:
// !serviceSampler.sample(makeTrace("foo", "bar"))
span2.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
serviceSampler.sample(span2)
}
def "sampling priority set on context"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
String response = '{"rate_by_service": {"service:,env:":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span = makeTrace("foo", "bar")
serviceSampler.initializeSamplingPriority(span)
expect:
// sets correctly on root span
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
}
private DDSpan makeTrace(String serviceName, String envName) {
def context = new DDSpanContext(
1L,
1L,
0L,
serviceName,
"fakeOperation",
"fakeResource",
PrioritySampling.UNSET,
Collections.emptyMap(),
false,
"fakeType",
Collections.emptyMap(),
null,
new DDTracer())
context.setTag("env", envName)
return new DDSpan(0l, context)
}
}

View File

@ -1,10 +1,12 @@
package datadog.trace.api.writer package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import datadog.trace.SpanFactory import datadog.trace.SpanFactory
import datadog.trace.common.Service import datadog.trace.common.Service
import datadog.trace.common.writer.DDApi import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.DDApi.ResponseListener
import org.msgpack.jackson.dataformat.MessagePackFactory import org.msgpack.jackson.dataformat.MessagePackFactory
import ratpack.http.Headers import ratpack.http.Headers
import ratpack.http.MediaType import ratpack.http.MediaType
@ -21,7 +23,10 @@ class DDApiTest extends Specification {
setup: setup:
def agent = ratpack { def agent = ratpack {
handlers { handlers {
put("v0.3/traces") { put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
response.status(200).send() response.status(200).send()
} }
} }
@ -39,9 +44,12 @@ class DDApiTest extends Specification {
setup: setup:
def agent = ratpack { def agent = ratpack {
handlers { handlers {
put("v0.3/traces") { put("v0.4/traces") {
response.status(404).send() response.status(404).send()
} }
put("v0.4/services") {
response.status(200).send()
}
} }
} }
def client = new DDApi("localhost", agent.address.port) def client = new DDApi("localhost", agent.address.port)
@ -60,7 +68,7 @@ class DDApiTest extends Specification {
def requestBody = new AtomicReference<byte[]>() def requestBody = new AtomicReference<byte[]>()
def agent = ratpack { def agent = ratpack {
handlers { handlers {
put("v0.3/traces") { put("v0.4/traces") {
requestContentType.set(request.contentType) requestContentType.set(request.contentType)
requestHeaders.set(request.headers) requestHeaders.set(request.headers)
request.body.then { request.body.then {
@ -68,6 +76,9 @@ class DDApiTest extends Specification {
response.send() response.send()
} }
} }
put("v0.4/services") {
response.status(200).send()
}
} }
} }
def client = new DDApi("localhost", agent.address.port) def client = new DDApi("localhost", agent.address.port)
@ -120,7 +131,10 @@ class DDApiTest extends Specification {
setup: setup:
def agent = ratpack { def agent = ratpack {
handlers { handlers {
put("v0.3/services") { put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
response.status(200).send() response.status(200).send()
} }
} }
@ -138,7 +152,10 @@ class DDApiTest extends Specification {
setup: setup:
def agent = ratpack { def agent = ratpack {
handlers { handlers {
put("v0.3/services") { put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
response.status(404).send() response.status(404).send()
} }
} }
@ -159,7 +176,10 @@ class DDApiTest extends Specification {
def requestBody = new AtomicReference<byte[]>() def requestBody = new AtomicReference<byte[]>()
def agent = ratpack { def agent = ratpack {
handlers { handlers {
put("v0.3/services") { put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
requestContentType.set(request.contentType) requestContentType.set(request.contentType)
requestHeaders.set(request.headers) requestHeaders.set(request.headers)
request.body.then { request.body.then {
@ -192,6 +212,74 @@ class DDApiTest extends Specification {
] ]
} }
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference<String>(null)
ResponseListener responseListener = new ResponseListener() {
@Override
void onResponse(String endpoint, JsonNode responseJson) {
agentResponse.set(responseJson.toString())
}
}
boolean servicesAvailable = true
def agent = ratpack {
handlers {
put("v0.4/traces") {
response.status(200).send('{"hello":"test"}')
}
put("v0.4/services") {
if (servicesAvailable) {
response.status(200).send('{"service-response":"from-test"}')
} else {
response.status(404).send('{"service-response":"from-test"}')
}
}
}
}
def client = new DDApi("localhost", agent.address.port)
client.addResponseListener(responseListener)
def services = ["my-service-name": new Service("my-service-name", "my-app-name", Service.AppType.CUSTOM)]
when:
client.sendTraces([])
then:
agentResponse.get() == '{"hello":"test"}'
when:
servicesAvailable = false
agentResponse.set('not-set')
client.sendServices(services)
then:
// response not seen because of non-200 status
agentResponse.get() == 'not-set'
cleanup:
agent.close()
}
def "Api Downgrades to v3"() {
setup:
def v3Agent = ratpack {
handlers {
put("v0.3/traces") {
response.status(200).send()
}
put("v0.3/services") {
response.status(200).send()
}
}
}
def client = new DDApi("localhost", v3Agent.address.port)
expect:
client.sendTraces([])
client.sendServices()
cleanup:
v3Agent.close()
}
static List<TreeMap<String, Object>> convertList(byte[] bytes) { static List<TreeMap<String, Object>> convertList(byte[] bytes) {
return mapper.readValue(bytes, new TypeReference<List<TreeMap<String, Object>>>() {}) return mapper.readValue(bytes, new TypeReference<List<TreeMap<String, Object>>>() {})
} }

View File

@ -1,67 +0,0 @@
package datadog.opentracing;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import datadog.trace.api.DDTags;
import java.util.HashMap;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
public class DDSpanSerializationTest {
ObjectMapper serializer;
DDSpan span;
Map<String, Object> expected = Maps.newHashMap();
@Before
public void setUp() throws Exception {
final Map<String, String> baggage = new HashMap<>();
baggage.put("a-baggage", "value");
final Map<String, Object> tags = new HashMap<>();
baggage.put("k1", "v1");
expected.put("meta", baggage);
expected.put("service", "service");
expected.put("error", 0);
expected.put("type", "type");
expected.put("name", "operation");
expected.put("duration", 33000);
expected.put("resource", "operation");
expected.put("start", 100000);
expected.put("span_id", 2l);
expected.put("parent_id", 0l);
expected.put("trace_id", 1l);
final DDSpanContext context =
new DDSpanContext(
1L,
2L,
0L,
"service",
"operation",
null,
new HashMap<>(baggage),
false,
"type",
tags,
null,
null);
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName());
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()));
span = new DDSpan(100L, context);
span.finish(133L);
serializer = new ObjectMapper();
}
@Test
public void test() throws Exception {
assertThat(serializer.readTree(serializer.writeValueAsString(span)))
.isEqualTo(serializer.readTree(serializer.writeValueAsString(expected)));
}
}

View File

@ -1,70 +0,0 @@
package datadog.opentracing;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import org.junit.Test;
public class DDSpanTest {
@Test
public void testGetterSetter() {
final DDSpanContext context =
new DDSpanContext(
1L,
1L,
0L,
"fakeService",
"fakeOperation",
"fakeResource",
Collections.<String, String>emptyMap(),
false,
"fakeType",
null,
null,
null);
String expected;
final DDSpan span = new DDSpan(1L, context);
expected = "service";
span.setServiceName(expected);
assertThat(span.getServiceName()).isEqualTo(expected);
expected = "operation";
span.setOperationName(expected);
assertThat(span.getOperationName()).isEqualTo(expected);
expected = "resource";
span.setResourceName(expected);
assertThat(span.getResourceName()).isEqualTo(expected);
expected = "type";
span.setSpanType(expected);
assertThat(span.getType()).isEqualTo(expected);
}
@Test
public void shouldResourceNameEqualsOperationNameIfNull() {
final String expectedName = "operationName";
DDSpan span = new DDTracer().buildSpan(expectedName).startManual();
// ResourceName = expectedName
assertThat(span.getResourceName()).isEqualTo(expectedName);
assertThat(span.getServiceName()).isEqualTo(DDTracer.UNASSIGNED_DEFAULT_SERVICE_NAME);
// ResourceName = expectedResourceName
final String expectedResourceName = "fake";
span =
new DDTracer()
.buildSpan(expectedName)
.withResourceName(expectedResourceName)
.withServiceName("foo")
.startManual();
assertThat(span.getResourceName()).isEqualTo(expectedResourceName);
assertThat(span.getServiceName()).isEqualTo("foo");
}
}

View File

@ -1,41 +0,0 @@
package datadog.opentracing;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import datadog.trace.common.sampling.RateSampler;
import datadog.trace.common.writer.Writer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;
import org.junit.Test;
public class DDTracerTest {
@Test
public void write() throws Exception {
final Writer writer = mock(Writer.class);
final RateSampler sampler = mock(RateSampler.class);
final DDSpan span = mock(DDSpan.class);
// Rate 0.5
when(sampler.sample(any(DDSpan.class))).thenReturn(true).thenReturn(false);
final Queue<DDSpan> spans = new LinkedList<>();
spans.add(span);
spans.add(span);
spans.add(span);
final DDTracer tracer = new DDTracer(DDTracer.UNASSIGNED_DEFAULT_SERVICE_NAME, writer, sampler);
tracer.write(spans);
tracer.write(spans);
verify(sampler, times(2)).sample(span);
verify(writer, times(1)).write(new ArrayList<>(spans));
}
}

View File

@ -1,74 +0,0 @@
package datadog.opentracing.propagation;
import static org.assertj.core.api.Java6Assertions.assertThat;
import datadog.opentracing.DDSpanContext;
import io.opentracing.propagation.TextMapExtractAdapter;
import io.opentracing.propagation.TextMapInjectAdapter;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
/** Created by gpolaert on 6/23/17. */
public class HTTPCodecTest {
private static final String OT_BAGGAGE_PREFIX = "ot-baggage-";
private static final String TRACE_ID_KEY = "x-datadog-trace-id";
private static final String SPAN_ID_KEY = "x-datadog-parent-id";
@Test
public void shoudAddHttpHeaders() {
final DDSpanContext mockedContext =
new DDSpanContext(
1L,
2L,
0L,
"fakeService",
"fakeOperation",
"fakeResource",
new HashMap<String, String>() {
{
put("k1", "v1");
put("k2", "v2");
}
},
false,
"fakeType",
null,
null,
null);
final Map<String, String> carrier = new HashMap<>();
final HTTPCodec codec = new HTTPCodec();
codec.inject(mockedContext, new TextMapInjectAdapter(carrier));
assertThat(carrier.get(TRACE_ID_KEY)).isEqualTo("1");
assertThat(carrier.get(SPAN_ID_KEY)).isEqualTo("2");
assertThat(carrier.get(OT_BAGGAGE_PREFIX + "k1")).isEqualTo("v1");
assertThat(carrier.get(OT_BAGGAGE_PREFIX + "k2")).isEqualTo("v2");
}
@Test
public void shoudReadHttpHeaders() {
final Map<String, String> actual =
new HashMap<String, String>() {
{
put(TRACE_ID_KEY.toUpperCase(), "1");
put(SPAN_ID_KEY.toUpperCase(), "2");
put(OT_BAGGAGE_PREFIX.toUpperCase() + "k1", "v1");
put(OT_BAGGAGE_PREFIX.toUpperCase() + "k2", "v2");
}
};
final HTTPCodec codec = new HTTPCodec();
final DDSpanContext context = codec.extract(new TextMapExtractAdapter(actual));
assertThat(context.getTraceId()).isEqualTo(1l);
assertThat(context.getSpanId()).isEqualTo(2l);
assertThat(context.getBaggageItem("k1")).isEqualTo("v1");
assertThat(context.getBaggageItem("k2")).isEqualTo("v2");
}
}

View File

@ -1,46 +0,0 @@
package datadog.trace.api.sampling;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import datadog.opentracing.DDSpan;
import datadog.trace.common.sampling.RateSampler;
import datadog.trace.common.sampling.Sampler;
import org.junit.Test;
public class RateSamplerTest {
@Test
public void testRateSampler() {
final DDSpan mockSpan = mock(DDSpan.class);
final double sampleRate = 0.35;
final int iterations = 1000;
final Sampler sampler = new RateSampler(sampleRate);
int kept = 0;
for (int i = 0; i < iterations; i++) {
if (sampler.sample(mockSpan)) {
kept++;
}
}
// FIXME test has to be more predictable
// assertThat(((double) kept / iterations)).isBetween(sampleRate - 0.02, sampleRate + 0.02);
}
@Test
public void testRateBoundaries() {
RateSampler sampler = new RateSampler(1000);
assertThat(sampler.getSampleRate()).isEqualTo(1);
sampler = new RateSampler(-1000);
assertThat(sampler.getSampleRate()).isEqualTo(1);
sampler = new RateSampler(0.337);
assertThat(sampler.getSampleRate()).isEqualTo(0.337);
}
}