Merge pull request #1102 from DataDog/landerson/sampling-revamp

Deterministic and Rule-based Sampling
This commit is contained in:
Laplie Anderson 2019-11-19 12:11:40 -05:00 committed by GitHub
commit e548e11da0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1302 additions and 207 deletions

View File

@ -63,6 +63,10 @@ public class Config {
public static final String TRACE_EXECUTORS = "trace.executors";
public static final String TRACE_METHODS = "trace.methods";
public static final String TRACE_CLASSES_EXCLUDE = "trace.classes.exclude";
public static final String TRACE_SAMPLING_SERVICE_RULES = "trace.sampling.service.rules";
public static final String TRACE_SAMPLING_OPERATION_RULES = "trace.sampling.operation.rules";
public static final String TRACE_SAMPLING_DEFAULT_RATE = "trace.sampling.default.rate";
public static final String TRACE_SAMPLING_RATE_LIMIT = "trace.sampling.rate.limit";
public static final String TRACE_REPORT_HOSTNAME = "trace.report-hostname";
public static final String HEADER_TAGS = "trace.header.tags";
public static final String HTTP_SERVER_ERROR_STATUSES = "http.server.error.statuses";
@ -145,6 +149,7 @@ public class Config {
private static final String DEFAULT_TRACE_METHODS = null;
public static final boolean DEFAULT_TRACE_ANALYTICS_ENABLED = false;
public static final float DEFAULT_ANALYTICS_SAMPLE_RATE = 1.0f;
public static final double DEFAULT_TRACE_SAMPLING_RATE_LIMIT = 100;
public enum PropagationStyle {
DATADOG,
@ -215,6 +220,11 @@ public class Config {
@Getter private final boolean traceAnalyticsEnabled;
@Getter private final Map<String, String> traceSamplingServiceRules;
@Getter private final Map<String, String> traceSamplingOperationRules;
@Getter private final Double traceSamplingDefaultRate;
@Getter private final Double traceSamplingRateLimit;
// Values from an optionally provided properties file
private static Properties propertiesFromConfigFile;
@ -336,6 +346,14 @@ public class Config {
traceAnalyticsEnabled =
getBooleanSettingFromEnvironment(TRACE_ANALYTICS_ENABLED, DEFAULT_TRACE_ANALYTICS_ENABLED);
traceSamplingServiceRules = getMapSettingFromEnvironment(TRACE_SAMPLING_SERVICE_RULES, null);
traceSamplingOperationRules =
getMapSettingFromEnvironment(TRACE_SAMPLING_OPERATION_RULES, null);
traceSamplingDefaultRate = getDoubleSettingFromEnvironment(TRACE_SAMPLING_DEFAULT_RATE, null);
traceSamplingRateLimit =
getDoubleSettingFromEnvironment(
TRACE_SAMPLING_RATE_LIMIT, DEFAULT_TRACE_SAMPLING_RATE_LIMIT);
log.debug("New instance: {}", this);
}
@ -460,6 +478,19 @@ public class Config {
traceAnalyticsEnabled =
getPropertyBooleanValue(properties, TRACE_ANALYTICS_ENABLED, parent.traceAnalyticsEnabled);
traceSamplingServiceRules =
getPropertyMapValue(
properties, TRACE_SAMPLING_SERVICE_RULES, parent.traceSamplingServiceRules);
traceSamplingOperationRules =
getPropertyMapValue(
properties, TRACE_SAMPLING_OPERATION_RULES, parent.traceSamplingOperationRules);
traceSamplingDefaultRate =
getPropertyDoubleValue(
properties, TRACE_SAMPLING_DEFAULT_RATE, parent.traceSamplingDefaultRate);
traceSamplingRateLimit =
getPropertyDoubleValue(
properties, TRACE_SAMPLING_RATE_LIMIT, parent.traceSamplingRateLimit);
log.debug("New instance: {}", this);
}
@ -697,6 +728,22 @@ public class Config {
}
}
/**
* Calls {@link #getSettingFromEnvironment(String, String)} and converts the result to a Double.
*
* @deprecated This method should only be used internally. Use the explicit getter instead.
*/
public static Double getDoubleSettingFromEnvironment(
final String name, final Double defaultValue) {
final String value = getSettingFromEnvironment(name, null);
try {
return value == null ? defaultValue : Double.valueOf(value);
} catch (final NumberFormatException e) {
log.warn("Invalid configuration for " + name, e);
return defaultValue;
}
}
/**
* Calls {@link #getSettingFromEnvironment(String, String)} and converts the result to a Integer.
*/
@ -795,6 +842,12 @@ public class Config {
return value == null || value.trim().isEmpty() ? defaultValue : Integer.valueOf(value);
}
private static Double getPropertyDoubleValue(
final Properties properties, final String name, final Double defaultValue) {
final String value = properties.getProperty(name);
return value == null || value.trim().isEmpty() ? defaultValue : Double.valueOf(value);
}
private static <T extends Enum<T>> Set<T> getPropertySetValue(
final Properties properties, final String name, final Class<T> clazz) {
final String value = properties.getProperty(name);

View File

@ -13,6 +13,9 @@ import static datadog.trace.api.Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE
import static datadog.trace.api.Config.DEFAULT_JMX_FETCH_STATSD_PORT
import static datadog.trace.api.Config.GLOBAL_TAGS
import static datadog.trace.api.Config.HEADER_TAGS
import static datadog.trace.api.Config.HEALTH_METRICS_ENABLED
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_HOST
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_PORT
import static datadog.trace.api.Config.HTTP_CLIENT_ERROR_STATUSES
import static datadog.trace.api.Config.HTTP_CLIENT_HOST_SPLIT_BY_DOMAIN
import static datadog.trace.api.Config.HTTP_SERVER_ERROR_STATUSES
@ -39,10 +42,11 @@ import static datadog.trace.api.Config.TRACE_AGENT_PORT
import static datadog.trace.api.Config.TRACE_ENABLED
import static datadog.trace.api.Config.TRACE_REPORT_HOSTNAME
import static datadog.trace.api.Config.TRACE_RESOLVER_ENABLED
import static datadog.trace.api.Config.TRACE_SAMPLING_DEFAULT_RATE
import static datadog.trace.api.Config.TRACE_SAMPLING_OPERATION_RULES
import static datadog.trace.api.Config.TRACE_SAMPLING_RATE_LIMIT
import static datadog.trace.api.Config.TRACE_SAMPLING_SERVICE_RULES
import static datadog.trace.api.Config.WRITER_TYPE
import static datadog.trace.api.Config.HEALTH_METRICS_ENABLED
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_HOST
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_PORT
class ConfigTest extends DDSpecification {
@Rule
@ -145,6 +149,10 @@ class ConfigTest extends DDSpecification {
prop.setProperty(HEALTH_METRICS_ENABLED, "true")
prop.setProperty(HEALTH_METRICS_STATSD_HOST, "metrics statsd host")
prop.setProperty(HEALTH_METRICS_STATSD_PORT, "654")
prop.setProperty(TRACE_SAMPLING_SERVICE_RULES, "a:1")
prop.setProperty(TRACE_SAMPLING_OPERATION_RULES, "b:1")
prop.setProperty(TRACE_SAMPLING_DEFAULT_RATE, ".5")
prop.setProperty(TRACE_SAMPLING_RATE_LIMIT, "200")
when:
Config config = Config.get(prop)
@ -181,6 +189,10 @@ class ConfigTest extends DDSpecification {
config.healthMetricsEnabled == true
config.healthMetricsStatsdHost == "metrics statsd host"
config.healthMetricsStatsdPort == 654
config.traceSamplingServiceRules == [a: "1"]
config.traceSamplingOperationRules == [b: "1"]
config.traceSamplingDefaultRate == 0.5
config.traceSamplingRateLimit == 200
}
def "specify overrides via system properties"() {
@ -218,6 +230,10 @@ class ConfigTest extends DDSpecification {
System.setProperty(PREFIX + HEALTH_METRICS_ENABLED, "true")
System.setProperty(PREFIX + HEALTH_METRICS_STATSD_HOST, "metrics statsd host")
System.setProperty(PREFIX + HEALTH_METRICS_STATSD_PORT, "654")
System.setProperty(PREFIX + TRACE_SAMPLING_SERVICE_RULES, "a:1")
System.setProperty(PREFIX + TRACE_SAMPLING_OPERATION_RULES, "b:1")
System.setProperty(PREFIX + TRACE_SAMPLING_DEFAULT_RATE, ".5")
System.setProperty(PREFIX + TRACE_SAMPLING_RATE_LIMIT, "200")
when:
Config config = new Config()
@ -254,6 +270,10 @@ class ConfigTest extends DDSpecification {
config.healthMetricsEnabled == true
config.healthMetricsStatsdHost == "metrics statsd host"
config.healthMetricsStatsdPort == 654
config.traceSamplingServiceRules == [a: "1"]
config.traceSamplingOperationRules == [b: "1"]
config.traceSamplingDefaultRate == 0.5
config.traceSamplingRateLimit == 200
}
def "specify overrides via env vars"() {
@ -610,6 +630,35 @@ class ConfigTest extends DDSpecification {
defaultValue = 10.0
}
def "test getDoubleSettingFromEnvironment(#name)"() {
setup:
environmentVariables.set("DD_ENV_ZERO_TEST", "0.0")
environmentVariables.set("DD_ENV_FLOAT_TEST", "1.0")
environmentVariables.set("DD_FLOAT_TEST", "0.2")
System.setProperty("dd.prop.zero.test", "0")
System.setProperty("dd.prop.float.test", "0.3")
System.setProperty("dd.float.test", "0.4")
System.setProperty("dd.garbage.test", "garbage")
System.setProperty("dd.negative.test", "-1")
expect:
Config.getDoubleSettingFromEnvironment(name, defaultValue) == (double) expected
where:
name | expected
"env.zero.test" | 0.0
"prop.zero.test" | 0
"env.float.test" | 1.0
"prop.float.test" | 0.3
"float.test" | 0.4
"negative.test" | -1.0
"garbage.test" | 10.0
"default.test" | 10.0
defaultValue = 10.0
}
def "verify mapping configs on tracer"() {
setup:
System.setProperty(PREFIX + SERVICE_MAPPING, mapString)
@ -811,4 +860,34 @@ class ConfigTest extends DDSpecification {
cleanup:
System.clearProperty(PREFIX + CONFIGURATION_FILE)
}
def "get analytics sample rate"() {
setup:
environmentVariables.set("DD_FOO_ANALYTICS_SAMPLE_RATE", "0.5")
environmentVariables.set("DD_BAR_ANALYTICS_SAMPLE_RATE", "0.9")
System.setProperty("dd.baz.analytics.sample-rate", "0.7")
System.setProperty("dd.buzz.analytics.sample-rate", "0.3")
when:
String[] array = services.toArray(new String[0])
def value = Config.get().getInstrumentationAnalyticsSampleRate(array)
then:
value == expected
where:
services | expected
["foo"] | 0.5f
["baz"] | 0.7f
["doesnotexist"] | 1.0f
["doesnotexist", "foo"] | 0.5f
["doesnotexist", "baz"] | 0.7f
["foo", "bar"] | 0.5f
["bar", "foo"] | 0.9f
["baz", "buzz"] | 0.7f
["buzz", "baz"] | 0.3f
["foo", "baz"] | 0.5f
["baz", "foo"] | 0.7f
}
}

View File

@ -191,18 +191,20 @@ public class DDSpanContext implements io.opentracing.SpanContext {
this.spanType = spanType;
}
public void setSamplingPriority(final int newPriority) {
/** @return if sampling priority was set by this method invocation */
public boolean setSamplingPriority(final int newPriority) {
if (newPriority == PrioritySampling.UNSET) {
log.debug("{}: Refusing to set samplingPriority to UNSET", this);
return false;
}
if (trace != null) {
final DDSpan rootSpan = trace.getRootSpan();
if (null != rootSpan && rootSpan.context() != this) {
rootSpan.context().setSamplingPriority(newPriority);
return;
return rootSpan.context().setSamplingPriority(newPriority);
}
}
if (newPriority == PrioritySampling.UNSET) {
log.debug("{}: Refusing to set samplingPriority to UNSET", this);
return;
}
// sync with lockSamplingPriority
synchronized (this) {
if (samplingPriorityLocked) {
@ -210,21 +212,22 @@ public class DDSpanContext implements io.opentracing.SpanContext {
"samplingPriority locked at {}. Refusing to set to {}",
getMetrics().get(PRIORITY_SAMPLING_KEY),
newPriority);
return false;
} else {
setMetric(PRIORITY_SAMPLING_KEY, newPriority);
log.debug("Set sampling priority to {}", getMetrics().get(PRIORITY_SAMPLING_KEY));
return true;
}
}
}
/** @return the sampling priority of this span's trace, or null if no priority has been set */
public int getSamplingPriority() {
if (trace != null) {
final DDSpan rootSpan = trace.getRootSpan();
if (null != rootSpan && rootSpan.context() != this) {
return rootSpan.context().getSamplingPriority();
}
final DDSpan rootSpan = trace.getRootSpan();
if (null != rootSpan && rootSpan.context() != this) {
return rootSpan.context().getSamplingPriority();
}
final Number val = getMetrics().get(PRIORITY_SAMPLING_KEY);
return null == val ? PrioritySampling.UNSET : val.intValue();
}
@ -239,12 +242,11 @@ public class DDSpanContext implements io.opentracing.SpanContext {
* @return true if the sampling priority was locked.
*/
public boolean lockSamplingPriority() {
if (trace != null) {
final DDSpan rootSpan = trace.getRootSpan();
if (null != rootSpan && rootSpan.context() != this) {
return rootSpan.context().lockSamplingPriority();
}
final DDSpan rootSpan = trace.getRootSpan();
if (null != rootSpan && rootSpan.context() != this) {
return rootSpan.context().lockSamplingPriority();
}
// sync with setSamplingPriority
synchronized (this) {
if (getMetrics().get(PRIORITY_SAMPLING_KEY) == null) {
@ -367,13 +369,12 @@ public class DDSpanContext implements io.opentracing.SpanContext {
.append("/")
.append(getResourceName())
.append(" metrics=")
.append(new TreeMap(getMetrics()));
.append(new TreeMap<>(getMetrics()));
if (errorFlag) {
s.append(" *errored*");
}
if (tags != null) {
s.append(" tags=").append(new TreeMap(tags));
}
s.append(" tags=").append(new TreeMap<>(tags));
return s.toString();
}
}

View File

@ -11,7 +11,7 @@ import datadog.trace.api.Config;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.api.interceptor.TraceInterceptor;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.sampling.RateByServiceSampler;
import datadog.trace.common.sampling.PrioritySampler;
import datadog.trace.common.sampling.Sampler;
import datadog.trace.common.writer.DDAgentWriter;
import datadog.trace.common.writer.DDApi;
@ -347,7 +347,12 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
@Override
public <T> void inject(final SpanContext spanContext, final Format<T> format, final T carrier) {
if (carrier instanceof TextMapInject) {
injector.inject((DDSpanContext) spanContext, (TextMapInject) carrier);
final DDSpanContext ddSpanContext = (DDSpanContext) spanContext;
final DDSpan rootSpan = ddSpanContext.getTrace().getRootSpan();
setSamplingPriorityIfNecessary(rootSpan);
injector.inject(ddSpanContext, (TextMapInject) carrier);
} else {
log.debug("Unsupported format for propagation - {}", format.getClass().getName());
}
@ -389,10 +394,28 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
}
}
incrementTraceCount();
// TODO: current trace implementation doesn't guarantee that first span is the root span
// We may want to reconsider way this check is done.
if (!writtenTrace.isEmpty() && sampler.sample(writtenTrace.get(0))) {
writer.write(writtenTrace);
if (!writtenTrace.isEmpty()) {
final DDSpan rootSpan = (DDSpan) writtenTrace.get(0).getLocalRootSpan();
setSamplingPriorityIfNecessary(rootSpan);
final DDSpan spanToSample = rootSpan == null ? writtenTrace.get(0) : rootSpan;
if (sampler.sample(spanToSample)) {
writer.write(writtenTrace);
}
}
}
void setSamplingPriorityIfNecessary(final DDSpan rootSpan) {
// There's a race where multiple threads can see PrioritySampling.UNSET here
// This check skips potential complex sampling priority logic when we know its redundant
// Locks inside DDSpanContext ensure the correct behavior in the race case
if (sampler instanceof PrioritySampler
&& rootSpan != null
&& rootSpan.context().getSamplingPriority() == PrioritySampling.UNSET) {
((PrioritySampler) sampler).setSamplingPriority(rootSpan);
}
}
@ -487,11 +510,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
}
private DDSpan startSpan() {
final DDSpan span = new DDSpan(timestampMicro, buildSpanContext());
if (sampler instanceof RateByServiceSampler) {
((RateByServiceSampler) sampler).initializeSamplingPriority(span);
}
return span;
return new DDSpan(timestampMicro, buildSpanContext());
}
@Override

View File

@ -6,6 +6,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Pattern;
@Deprecated
public abstract class AbstractSampler implements Sampler {
/** Sample tags */
@ -35,6 +36,7 @@ public abstract class AbstractSampler implements Sampler {
* @param tag
* @param skipPattern
*/
@Deprecated
public void addSkipTagPattern(final String tag, final Pattern skipPattern) {
skipTagsPatterns.put(tag, skipPattern);
}

View File

@ -0,0 +1,46 @@
package datadog.trace.common.sampling;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTracer;
import java.math.BigDecimal;
import java.math.BigInteger;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class KnuthSampler implements RateSampler {
private static final BigInteger KNUTH_FACTOR = new BigInteger("1111111111111111111");
private static final BigDecimal TRACE_ID_MAX_AS_BIG_DECIMAL =
new BigDecimal(DDTracer.TRACE_ID_MAX);
private static final BigInteger MODULUS = new BigInteger("2").pow(64);
private final BigInteger cutoff;
private final double rate;
public KnuthSampler(final double rate) {
this.rate = rate;
cutoff = new BigDecimal(rate).multiply(TRACE_ID_MAX_AS_BIG_DECIMAL).toBigInteger();
log.debug("Initializing the RateSampler, sampleRate: {} %", rate * 100);
}
@Override
public boolean sample(final DDSpan span) {
final boolean sampled;
if (rate == 1) {
sampled = true;
} else if (rate == 0) {
sampled = false;
} else {
sampled = span.getTraceId().multiply(KNUTH_FACTOR).mod(MODULUS).compareTo(cutoff) < 0;
}
log.debug("{} - Span is sampled: {}", span, sampled);
return sampled;
}
@Override
public double getSampleRate() {
return rate;
}
}

View File

@ -0,0 +1,7 @@
package datadog.trace.common.sampling;
import datadog.opentracing.DDSpan;
public interface PrioritySampler {
void setSamplingPriority(DDSpan span);
}

View File

@ -11,7 +11,6 @@ import datadog.trace.common.writer.DDApi.ResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
/**
@ -20,14 +19,16 @@ import lombok.extern.slf4j.Slf4j;
* <p>The configuration of (serviceName,env)->rate is configured by the core agent.
*/
@Slf4j
public class RateByServiceSampler implements Sampler, ResponseListener {
public class RateByServiceSampler implements Sampler, PrioritySampler, ResponseListener {
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";
/** Key for setting the default/baseline rate */
private static final String DEFAULT_KEY = "service:,env:";
private static final double DEFAULT_RATE = 1.0;
private volatile Map<String, RateSampler> serviceRates =
unmodifiableMap(singletonMap(DEFAULT_KEY, new RateSampler(DEFAULT_RATE)));
unmodifiableMap(singletonMap(DEFAULT_KEY, createRateSampler(DEFAULT_RATE)));
@Override
public boolean sample(final DDSpan span) {
@ -37,18 +38,8 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
}
/** If span is a root span, set the span context samplingPriority to keep or drop */
public void initializeSamplingPriority(final DDSpan span) {
if (span.isRootSpan()) {
// Run the priority sampler on the new span
setSamplingPriorityOnSpanContext(span);
} else if (span.getSamplingPriority() == null) {
// Edge case: If the parent context did not set the priority, run the priority sampler.
// Happens when extracted http context did not send the priority header.
setSamplingPriorityOnSpanContext(span);
}
}
private void setSamplingPriorityOnSpanContext(final DDSpan span) {
@Override
public void setSamplingPriority(final DDSpan span) {
final String serviceName = span.getServiceName();
final String env = getSpanEnv(span);
final String key = "service:" + serviceName + ",env:" + env;
@ -59,10 +50,18 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
sampler = rates.get(DEFAULT_KEY);
}
final boolean priorityWasSet;
if (sampler.sample(span)) {
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
} else {
span.setSamplingPriority(PrioritySampling.SAMPLER_DROP);
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_DROP);
}
// Only set metrics if we actually set the sampling priority
// We don't know until the call is completed because the lock is internal to DDSpanContext
if (priorityWasSet) {
span.context().setMetric(SAMPLING_AGENT_RATE, sampler.getSampleRate());
}
}
@ -82,7 +81,7 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
final JsonNode value = newServiceRates.get(key);
try {
if (value instanceof NumericNode) {
updatedServiceRates.put(key, new RateSampler(value.doubleValue()));
updatedServiceRates.put(key, createRateSampler(value.doubleValue()));
} else {
log.debug("Unable to parse new service rate {} -> {}", key, value);
}
@ -91,55 +90,23 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
}
}
if (!updatedServiceRates.containsKey(DEFAULT_KEY)) {
updatedServiceRates.put(DEFAULT_KEY, new RateSampler(DEFAULT_RATE));
updatedServiceRates.put(DEFAULT_KEY, createRateSampler(DEFAULT_RATE));
}
serviceRates = unmodifiableMap(updatedServiceRates);
}
}
/**
* This sampler sample the traces at a predefined rate.
*
* <p>Keep (100 * `sample_rate`)% of the traces. It samples randomly, its main purpose is to
* reduce the integration footprint.
*/
private static class RateSampler extends AbstractSampler {
/** The sample rate used */
private final double sampleRate;
/**
* Build an instance of the sampler. The Sample rate is fixed for each instance.
*
* @param sampleRate a number [0,1] representing the rate ratio.
*/
private RateSampler(double sampleRate) {
if (sampleRate < 0) {
sampleRate = 1;
log.error("SampleRate is negative or null, disabling the sampler");
} else if (sampleRate > 1) {
sampleRate = 1;
}
this.sampleRate = sampleRate;
log.debug("Initializing the RateSampler, sampleRate: {} %", this.sampleRate * 100);
private RateSampler createRateSampler(final double sampleRate) {
final double sanitizedRate;
if (sampleRate < 0) {
log.error("SampleRate is negative or null, disabling the sampler");
sanitizedRate = 1;
} else if (sampleRate > 1) {
sanitizedRate = 1;
} else {
sanitizedRate = sampleRate;
}
@Override
public boolean doSample(final DDSpan span) {
final boolean sample = ThreadLocalRandom.current().nextFloat() <= sampleRate;
log.debug("{} - Span is sampled: {}", span, sample);
return sample;
}
public double getSampleRate() {
return sampleRate;
}
@Override
public String toString() {
return "RateSampler { sampleRate=" + sampleRate + " }";
}
return new KnuthSampler(sanitizedRate);
}
}

View File

@ -0,0 +1,5 @@
package datadog.trace.common.sampling;
public interface RateSampler extends Sampler {
double getSampleRate();
}

View File

@ -0,0 +1,122 @@
package datadog.trace.common.sampling;
import com.google.common.util.concurrent.RateLimiter;
import datadog.opentracing.DDSpan;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.sampling.SamplingRule.AlwaysMatchesSamplingRule;
import datadog.trace.common.sampling.SamplingRule.OperationSamplingRule;
import datadog.trace.common.sampling.SamplingRule.ServiceSamplingRule;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RuleBasedSampler implements Sampler, PrioritySampler {
private final List<SamplingRule> samplingRules;
private final PrioritySampler fallbackSampler;
private final RateLimiter rateLimiter;
private final double rateLimit;
public static final String SAMPLING_RULE_RATE = "_dd.rule_psr";
public static final String SAMPLING_LIMIT_RATE = "_dd.limit_psr";
public RuleBasedSampler(
final List<SamplingRule> samplingRules,
final double rateLimit,
final PrioritySampler fallbackSampler) {
this.samplingRules = samplingRules;
this.fallbackSampler = fallbackSampler;
rateLimiter = RateLimiter.create(rateLimit);
this.rateLimit = rateLimit;
}
public static RuleBasedSampler build(
final Map<String, String> serviceRules,
final Map<String, String> operationRules,
final Double defaultRate,
final double rateLimit) {
final List<SamplingRule> samplingRules = new ArrayList<>();
if (serviceRules != null) {
for (final Entry<String, String> entry : serviceRules.entrySet()) {
try {
final double rateForEntry = Double.parseDouble(entry.getValue());
final SamplingRule samplingRule =
new ServiceSamplingRule(entry.getKey(), new KnuthSampler(rateForEntry));
samplingRules.add(samplingRule);
} catch (final NumberFormatException e) {
log.error("Unable to parse rate for service: {}", entry, e);
}
}
}
if (operationRules != null) {
for (final Entry<String, String> entry : operationRules.entrySet()) {
try {
final double rateForEntry = Double.parseDouble(entry.getValue());
final SamplingRule samplingRule =
new OperationSamplingRule(entry.getKey(), new KnuthSampler(rateForEntry));
samplingRules.add(samplingRule);
} catch (final NumberFormatException e) {
log.error("Unable to parse rate for operation: {}", entry, e);
}
}
}
if (defaultRate != null) {
final SamplingRule samplingRule =
new AlwaysMatchesSamplingRule(new KnuthSampler(defaultRate));
samplingRules.add(samplingRule);
}
return new RuleBasedSampler(samplingRules, rateLimit, new RateByServiceSampler());
}
@Override
public boolean sample(final DDSpan span) {
return true;
}
@Override
public void setSamplingPriority(final DDSpan span) {
SamplingRule matchedRule = null;
for (final SamplingRule samplingRule : samplingRules) {
if (samplingRule.matches(span)) {
matchedRule = samplingRule;
break;
}
}
if (matchedRule == null) {
fallbackSampler.setSamplingPriority(span);
} else {
final boolean priorityWasSet;
boolean usedRateLimiter = false;
if (matchedRule.sample(span)) {
usedRateLimiter = true;
if (rateLimiter.tryAcquire()) {
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
} else {
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_DROP);
}
} else {
priorityWasSet = span.context().setSamplingPriority(PrioritySampling.SAMPLER_DROP);
}
// Only set metrics if we actually set the sampling priority
// We don't know until the call is completed because the lock is internal to DDSpanContext
if (priorityWasSet) {
span.context().setMetric(SAMPLING_RULE_RATE, matchedRule.getSampler().getSampleRate());
if (usedRateLimiter) {
span.context().setMetric(SAMPLING_LIMIT_RATE, rateLimit);
}
}
}
}
}

View File

@ -2,7 +2,9 @@ package datadog.trace.common.sampling;
import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
/** Main interface to sample a collection of traces. */
public interface Sampler {
@ -15,11 +17,30 @@ public interface Sampler {
*/
boolean sample(DDSpan span);
@Slf4j
final class Builder {
public static Sampler forConfig(final Config config) {
final Sampler sampler;
Sampler sampler;
if (config != null) {
if (config.isPrioritySamplingEnabled()) {
final Map<String, String> serviceRules = config.getTraceSamplingServiceRules();
final Map<String, String> operationRules = config.getTraceSamplingOperationRules();
if ((serviceRules != null && !serviceRules.isEmpty())
|| (operationRules != null && !operationRules.isEmpty())
|| config.getTraceSamplingDefaultRate() != null) {
try {
sampler =
RuleBasedSampler.build(
serviceRules,
operationRules,
config.getTraceSamplingDefaultRate(),
config.getTraceSamplingRateLimit());
} catch (final IllegalArgumentException e) {
log.error("Invalid sampler configuration. Using AllSampler", e);
sampler = new AllSampler();
}
} else if (config.isPrioritySamplingEnabled()) {
sampler = new RateByServiceSampler();
} else {
sampler = new AllSampler();

View File

@ -0,0 +1,73 @@
package datadog.trace.common.sampling;
import datadog.opentracing.DDSpan;
import java.util.regex.Pattern;
public abstract class SamplingRule {
private final RateSampler sampler;
public SamplingRule(final RateSampler sampler) {
this.sampler = sampler;
}
public abstract boolean matches(DDSpan span);
public boolean sample(final DDSpan span) {
return sampler.sample(span);
}
public RateSampler getSampler() {
return sampler;
}
public static class AlwaysMatchesSamplingRule extends SamplingRule {
public AlwaysMatchesSamplingRule(final RateSampler sampler) {
super(sampler);
}
@Override
public boolean matches(final DDSpan span) {
return true;
}
}
public abstract static class PatternMatchSamplingRule extends SamplingRule {
private final Pattern pattern;
public PatternMatchSamplingRule(final String regex, final RateSampler sampler) {
super(sampler);
this.pattern = Pattern.compile(regex);
}
@Override
public boolean matches(final DDSpan span) {
final String relevantString = getRelevantString(span);
return relevantString != null && pattern.matcher(relevantString).matches();
}
protected abstract String getRelevantString(DDSpan span);
}
public static class ServiceSamplingRule extends PatternMatchSamplingRule {
public ServiceSamplingRule(final String regex, final RateSampler sampler) {
super(regex, sampler);
}
@Override
protected String getRelevantString(final DDSpan span) {
return span.getServiceName();
}
}
public static class OperationSamplingRule extends PatternMatchSamplingRule {
public OperationSamplingRule(final String regex, final RateSampler sampler) {
super(regex, sampler);
}
@Override
protected String getRelevantString(final DDSpan span) {
return span.getOperationName();
}
}
}

View File

@ -4,7 +4,6 @@ import datadog.opentracing.propagation.ExtractedContext
import datadog.opentracing.propagation.TagContext
import datadog.trace.api.Config
import datadog.trace.api.DDTags
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.test.DDSpecification
import io.opentracing.Scope
@ -442,7 +441,7 @@ class DDSpanBuilderTest extends DDSpecification {
expect:
span.traceId != 0G
span.parentId == 0G
span.samplingPriority == PrioritySampling.SAMPLER_KEEP // Since we're using the RateByServiceSampler
span.samplingPriority == null
span.context().origin == tagContext.origin
span.context().baggageItems == [:]
span.context().@tags == tagContext.tags + [(Config.RUNTIME_ID_TAG) : config.getRuntimeId(),

View File

@ -29,9 +29,9 @@ class DDSpanSerializationTest extends DDSpecification {
expected.put("duration", 33000)
expected.put("resource", "operation")
final Map<String, Number> metrics = new HashMap<>()
if (samplingPriority != PrioritySampling.UNSET) {
metrics.put("_sampling_priority_v1", Integer.valueOf(samplingPriority))
metrics.put("_sample_rate", Double.valueOf(1.0))
metrics.put("_sampling_priority_v1", 1)
if (samplingPriority == PrioritySampling.UNSET) { // RateByServiceSampler sets priority
metrics.put("_dd.agent_psr", 1.0d)
}
expected.put("metrics", metrics)
expected.put("start", 100000)
@ -62,9 +62,7 @@ class DDSpanSerializationTest extends DDSpecification {
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()))
DDSpan span = new DDSpan(100L, context)
if (samplingPriority != PrioritySampling.UNSET) {
span.context().setMetric("_sample_rate", Double.valueOf(1.0))
}
span.finish(133L)
ObjectMapper serializer = new ObjectMapper()

View File

@ -1,15 +1,13 @@
package datadog.opentracing
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.propagation.ExtractedContext
import datadog.opentracing.propagation.TagContext
import datadog.trace.api.DDTags
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.test.DDSpecification
import io.opentracing.SpanContext
import spock.lang.Shared
import java.util.concurrent.TimeUnit
@ -20,15 +18,7 @@ class DDSpanTest extends DDSpecification {
def writer = new ListWriter()
def sampler = new RateByServiceSampler()
def tracer = new DDTracer(DEFAULT_SERVICE_NAME, writer, sampler, [:])
@Shared
def defaultSamplingPriority = PrioritySampling.SAMPLER_KEEP
def setup() {
sampler.onResponse("test", new ObjectMapper()
.readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
}
def "getters and setters"() {
setup:
final DDSpanContext context =
@ -257,70 +247,4 @@ class DDSpanTest extends DDSpecification {
null | true
new ExtractedContext(123G, 456G, 1, "789", [:], [:]) | false
}
def "sampling priority set on init"() {
setup:
def span = tracer.buildSpan("test").start()
expect:
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
when:
span.setTag(DDTags.SERVICE_NAME, "spock")
then:
// FIXME: priority currently only applies if service name set before span started.
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
// span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
when:
span = tracer.buildSpan("test").withTag(DDTags.SERVICE_NAME, "spock").start()
then:
span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
}
def "setting forced tracing via tag"() {
setup:
def span = tracer.buildSpan("root").start()
if (tagName) {
span.setTag(tagName, tagValue)
}
expect:
span.getSamplingPriority() == expectedPriority
cleanup:
span.finish()
where:
tagName | tagValue | expectedPriority
'manual.drop' | true | PrioritySampling.USER_DROP
'manual.keep' | true | PrioritySampling.USER_KEEP
}
def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
setup:
def span = tracer.buildSpan("root").start()
if (tagName) {
span.setTag(tagName, tagValue)
}
expect:
span.getSamplingPriority() == defaultSamplingPriority
cleanup:
span.finish()
where:
tagName | tagValue
// When no tag is set default to
null | null
// Setting to not known value
'manual.drop' | false
'manual.keep' | false
'manual.drop' | 1
'manual.keep' | 1
}
}

View File

@ -1,14 +1,21 @@
package datadog.trace
import datadog.opentracing.DDSpan
import datadog.opentracing.DDTracer
import datadog.opentracing.propagation.DatadogHttpCodec
import datadog.opentracing.propagation.HttpCodec
import datadog.trace.api.Config
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.sampling.AllSampler
import datadog.trace.common.sampling.PrioritySampler
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.sampling.Sampler
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.util.test.DDSpecification
import io.opentracing.propagation.TextMapInject
import org.junit.Rule
import org.junit.contrib.java.lang.system.EnvironmentVariables
import org.junit.contrib.java.lang.system.RestoreSystemProperties
@ -21,6 +28,7 @@ import static datadog.trace.api.Config.PRIORITY_SAMPLING
import static datadog.trace.api.Config.SERVICE_MAPPING
import static datadog.trace.api.Config.SPAN_TAGS
import static datadog.trace.api.Config.WRITER_TYPE
import static io.opentracing.propagation.Format.Builtin.TEXT_MAP_INJECT
class DDTracerTest extends DDSpecification {
@ -29,16 +37,6 @@ class DDTracerTest extends DDSpecification {
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
def setupSpec() {
// assert that a trace agent isn't running locally as that messes up the test.
try {
(new Socket("localhost", 8126)).close()
throw new IllegalStateException("An agent is already running locally on port 8126. Please stop it if you want to run tests locally.")
} catch (final ConnectException ioe) {
// trace agent is not running locally.
}
}
def "verify defaults on tracer"() {
when:
def tracer = new DDTracer()
@ -46,7 +44,11 @@ class DDTracerTest extends DDSpecification {
then:
tracer.serviceName == "unnamed-java-app"
tracer.sampler instanceof RateByServiceSampler
tracer.writer.toString() == "DDAgentWriter { api=DDApi { tracesUrl=http://localhost:8126/v0.3/traces } }"
tracer.writer instanceof DDAgentWriter
((DDAgentWriter) tracer.writer).api.tracesUrl.host() == "localhost"
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.3/traces" ||
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.4/traces"
tracer.writer.monitor instanceof DDAgentWriter.NoopMonitor
tracer.spanContextDecorators.size() == 15
@ -63,8 +65,8 @@ class DDTracerTest extends DDSpecification {
def tracer = new DDTracer(new Config())
then:
tracer.writer.toString() == "DDAgentWriter { api=DDApi { tracesUrl=http://localhost:8126/v0.3/traces }, monitor=StatsD { host=localhost:8125 } }"
tracer.writer.monitor instanceof DDAgentWriter.StatsDMonitor
tracer.writer.monitor.hostInfo == "localhost:8125"
}
@ -111,22 +113,44 @@ class DDTracerTest extends DDSpecification {
"a:b,c:d,e:" | [a: "b", c: "d"]
}
def "verify single override on #source for #key"() {
def "verify overriding host"() {
when:
System.setProperty(PREFIX + key, value)
def tracer = new DDTracer(new Config())
then:
tracer."$source".toString() == expected
tracer.writer instanceof DDAgentWriter
((DDAgentWriter) tracer.writer).api.tracesUrl.host() == value
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
where:
key | value
"agent.host" | "somethingelse"
}
source | key | value | expected
"writer" | "default" | "default" | "DDAgentWriter { api=DDApi { tracesUrl=http://localhost:8126/v0.3/traces } }"
"writer" | "writer.type" | "LoggingWriter" | "LoggingWriter { }"
"writer" | "agent.host" | "somethingelse" | "DDAgentWriter { api=DDApi { tracesUrl=http://somethingelse:8126/v0.3/traces } }"
"writer" | "agent.port" | "777" | "DDAgentWriter { api=DDApi { tracesUrl=http://localhost:777/v0.3/traces } }"
"writer" | "trace.agent.port" | "9999" | "DDAgentWriter { api=DDApi { tracesUrl=http://localhost:9999/v0.3/traces } }"
def "verify overriding port"() {
when:
System.setProperty(PREFIX + key, value)
def tracer = new DDTracer(new Config())
then:
tracer.writer instanceof DDAgentWriter
((DDAgentWriter) tracer.writer).api.tracesUrl.host() == "localhost"
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == Integer.valueOf(value)
where:
key | value
"agent.port" | "777"
"trace.agent.port" | "9999"
}
def "Writer is instance of LoggingWriter when property set"() {
when:
System.setProperty(PREFIX + "writer.type", "LoggingWriter")
def tracer = new DDTracer(new Config())
then:
tracer.writer instanceof LoggingWriter
}
def "verify sampler/writer constructor"() {
@ -174,4 +198,131 @@ class DDTracerTest extends DDSpecification {
child.finish()
root.finish()
}
def "priority sampling when span finishes"() {
given:
Properties properties = new Properties()
properties.setProperty("writer.type", "LoggingWriter")
def tracer = new DDTracer(new Config(properties, Config.get()))
when:
def span = tracer.buildSpan("operation").start()
span.finish()
then:
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
}
def "priority sampling set when child span complete"() {
given:
Properties properties = new Properties()
properties.setProperty("writer.type", "LoggingWriter")
def tracer = new DDTracer(new Config(properties, Config.get()))
when:
def root = tracer.buildSpan("operation").start()
def child = tracer.buildSpan('my_child').asChildOf(root).start()
root.finish()
then:
root.getSamplingPriority() == null
when:
child.finish()
then:
root.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
child.getSamplingPriority() == root.getSamplingPriority()
}
def "span priority set when injecting"() {
given:
Properties properties = new Properties()
properties.setProperty("writer.type", "LoggingWriter")
def tracer = new DDTracer(new Config(properties, Config.get()))
def injector = Mock(TextMapInject)
when:
def root = tracer.buildSpan("operation").start()
def child = tracer.buildSpan('my_child').asChildOf(root).start()
tracer.inject(child.context(), TEXT_MAP_INJECT, injector)
then:
root.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
child.getSamplingPriority() == root.getSamplingPriority()
1 * injector.put(DatadogHttpCodec.SAMPLING_PRIORITY_KEY, String.valueOf(PrioritySampling.SAMPLER_KEEP))
cleanup:
child.finish()
root.finish()
}
def "span priority only set after first injection"() {
given:
def sampler = new ControllableSampler()
def tracer = new DDTracer("serviceName", new LoggingWriter(), sampler)
def injector = Mock(TextMapInject)
when:
def root = tracer.buildSpan("operation").start()
def child = tracer.buildSpan('my_child').asChildOf(root).start()
tracer.inject(child.context(), TEXT_MAP_INJECT, injector)
then:
root.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
child.getSamplingPriority() == root.getSamplingPriority()
1 * injector.put(DatadogHttpCodec.SAMPLING_PRIORITY_KEY, String.valueOf(PrioritySampling.SAMPLER_KEEP))
when:
sampler.nextSamplingPriority = PrioritySampling.SAMPLER_DROP
def child2 = tracer.buildSpan('my_child2').asChildOf(root).start()
tracer.inject(child2.context(), TEXT_MAP_INJECT, injector)
then:
root.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
child.getSamplingPriority() == root.getSamplingPriority()
child2.getSamplingPriority() == root.getSamplingPriority()
1 * injector.put(DatadogHttpCodec.SAMPLING_PRIORITY_KEY, String.valueOf(PrioritySampling.SAMPLER_KEEP))
cleanup:
child.finish()
child2.finish()
root.finish()
}
def "injection doesn't override set priority"() {
given:
def sampler = new ControllableSampler()
def tracer = new DDTracer("serviceName", new LoggingWriter(), sampler)
def injector = Mock(TextMapInject)
when:
def root = tracer.buildSpan("operation").start()
def child = tracer.buildSpan('my_child').asChildOf(root).start()
child.setSamplingPriority(PrioritySampling.USER_DROP)
tracer.inject(child.context(), TEXT_MAP_INJECT, injector)
then:
root.getSamplingPriority() == PrioritySampling.USER_DROP
child.getSamplingPriority() == root.getSamplingPriority()
1 * injector.put(DatadogHttpCodec.SAMPLING_PRIORITY_KEY, String.valueOf(PrioritySampling.USER_DROP))
cleanup:
child.finish()
root.finish()
}
}
class ControllableSampler implements Sampler, PrioritySampler {
protected int nextSamplingPriority = PrioritySampling.SAMPLER_KEEP
@Override
void setSamplingPriority(DDSpan span) {
span.setSamplingPriority(nextSamplingPriority)
}
@Override
boolean sample(DDSpan span) {
return true
}
}

View File

@ -0,0 +1,363 @@
package datadog.trace.api.sampling
import datadog.opentracing.DDSpan
import datadog.trace.common.sampling.KnuthSampler
import datadog.trace.util.test.DDSpecification
class KnuthSamplerTest extends DDSpecification {
def "test known values: #traceId"() {
given:
KnuthSampler sampler = new KnuthSampler(0.5)
DDSpan span = Mock(DDSpan) {
getTraceId() >> traceId
}
when:
def sampled = sampler.sample(span)
then:
sampled == expected
where:
expected | traceId
false | 10428415896243638596G
false | 11199607447739267382G
false | 11273630029763932141G
false | 11407674492757219439G
false | 11792151447964398879G
false | 12432680895096110463G
false | 13126262220165910460G
false | 13174268766980400525G
false | 15505210698284655633G
false | 15649472107743074779G
false | 17204678798284737396G
false | 17344948852394588913G
false | 17496662575514578077G
false | 18252401681137062077G
false | 18317291550776694829G
false | 1874068156324778273G
false | 1905388747193831650G
false | 2202916659517317514G
false | 2227583514184312746G
false | 2338498362660772719G
false | 2781055864473387780G
false | 3328451335138149956G
false | 3337066551442961397G
false | 3409814636252858217G
false | 3510942875414458836G
false | 3784560248718450071G
false | 4751997750760398084G
false | 4831389563158288344G
false | 4990765271833742716G
false | 5089134323978233018G
false | 5199948958991797301G
false | 5577006791947779410G
false | 5600924393587988459G
false | 5793183108815074904G
false | 6263450610539110790G
false | 6382800227808658932G
false | 6651414131918424343G
false | 6842348953158377901G
false | 6941261091797652072G
false | 7273596521315663110G
false | 7504504064263669287G
false | 788787457839692041G
false | 7955079406183515637G
false | 8549944162621642512G
false | 8603989663476771718G
false | 8807817071862113702G
false | 9010467728050264449G
true | 10667007354186551956G
true | 10683692646452562431G
true | 10821471013040158923G
true | 10950412492527322440G
true | 11239168150708129139G
true | 1169089424364679180G
true | 11818186001859264308G
true | 11833901312327420776G
true | 11926759511765359899G
true | 11926873763676642186G
true | 11963748953446345529G
true | 11998794077335055257G
true | 12096659438561119542G
true | 12156940908066221323G
true | 12947799971452915849G
true | 13260572831089785859G
true | 13771804148684671731G
true | 14117161486975057715G
true | 14242321332569825828G
true | 14486903973548550719G
true | 14967026985784794439G
true | 15213854965919594827G
true | 15352856648520921629G
true | 15399114114227588261G
true | 15595235597337683065G
true | 16194613440650274502G
true | 1687184559264975024G
true | 17490665426807838719G
true | 18218388313430417611G
true | 2601737961087659062G
true | 261049867304784443G
true | 2740103009342231109G
true | 2970700287221458280G
true | 3916589616287113937G
true | 4324745483838182873G
true | 4937104021912138218G
true | 5486140987150761883G
true | 5944830206637008055G
true | 6296367092202729479G
true | 6334824724549167320G
true | 6556961545928831643G
true | 6735196588112087610G
true | 7388428680384065704G
true | 8249030965139585917G
true | 837825985403119657G
true | 8505906760983331750G
true | 8674665223082153551G
true | 894385949183117216G
true | 898860202204764712G
true | 9768663798983814715G
true | 9828766684487745566G
true | 9908585559158765387G
true | 9956202364908137547G
}
def "test sampling none: #traceId"() {
given:
KnuthSampler sampler = new KnuthSampler(0)
DDSpan span = Mock(DDSpan) {
getTraceId() >> traceId
}
when:
def sampled = sampler.sample(span)
then:
sampled == expected
// These values are repeated from the "known values test"
// It is an arbitrary subset of all possible traceIds
where:
expected | traceId
false | 10428415896243638596G
false | 11199607447739267382G
false | 11273630029763932141G
false | 11407674492757219439G
false | 11792151447964398879G
false | 12432680895096110463G
false | 13126262220165910460G
false | 13174268766980400525G
false | 15505210698284655633G
false | 15649472107743074779G
false | 17204678798284737396G
false | 17344948852394588913G
false | 17496662575514578077G
false | 18252401681137062077G
false | 18317291550776694829G
false | 1874068156324778273G
false | 1905388747193831650G
false | 2202916659517317514G
false | 2227583514184312746G
false | 2338498362660772719G
false | 2781055864473387780G
false | 3328451335138149956G
false | 3337066551442961397G
false | 3409814636252858217G
false | 3510942875414458836G
false | 3784560248718450071G
false | 4751997750760398084G
false | 4831389563158288344G
false | 4990765271833742716G
false | 5089134323978233018G
false | 5199948958991797301G
false | 5577006791947779410G
false | 5600924393587988459G
false | 5793183108815074904G
false | 6263450610539110790G
false | 6382800227808658932G
false | 6651414131918424343G
false | 6842348953158377901G
false | 6941261091797652072G
false | 7273596521315663110G
false | 7504504064263669287G
false | 788787457839692041G
false | 7955079406183515637G
false | 8549944162621642512G
false | 8603989663476771718G
false | 8807817071862113702G
false | 9010467728050264449G
false | 10667007354186551956G
false | 10683692646452562431G
false | 10821471013040158923G
false | 10950412492527322440G
false | 11239168150708129139G
false | 1169089424364679180G
false | 11818186001859264308G
false | 11833901312327420776G
false | 11926759511765359899G
false | 11926873763676642186G
false | 11963748953446345529G
false | 11998794077335055257G
false | 12096659438561119542G
false | 12156940908066221323G
false | 12947799971452915849G
false | 13260572831089785859G
false | 13771804148684671731G
false | 14117161486975057715G
false | 14242321332569825828G
false | 14486903973548550719G
false | 14967026985784794439G
false | 15213854965919594827G
false | 15352856648520921629G
false | 15399114114227588261G
false | 15595235597337683065G
false | 16194613440650274502G
false | 1687184559264975024G
false | 17490665426807838719G
false | 18218388313430417611G
false | 2601737961087659062G
false | 261049867304784443G
false | 2740103009342231109G
false | 2970700287221458280G
false | 3916589616287113937G
false | 4324745483838182873G
false | 4937104021912138218G
false | 5486140987150761883G
false | 5944830206637008055G
false | 6296367092202729479G
false | 6334824724549167320G
false | 6556961545928831643G
false | 6735196588112087610G
false | 7388428680384065704G
false | 8249030965139585917G
false | 837825985403119657G
false | 8505906760983331750G
false | 8674665223082153551G
false | 894385949183117216G
false | 898860202204764712G
false | 9768663798983814715G
false | 9828766684487745566G
false | 9908585559158765387G
false | 9956202364908137547G
}
def "test sampling all: #traceId"() {
given:
KnuthSampler sampler = new KnuthSampler(1)
DDSpan span = Mock(DDSpan) {
getTraceId() >> traceId
}
when:
def sampled = sampler.sample(span)
then:
sampled == expected
// These values are repeated from the "known values test"
// It is an arbitrary subset of all possible traceIds
where:
expected | traceId
true | 10428415896243638596G
true | 11199607447739267382G
true | 11273630029763932141G
true | 11407674492757219439G
true | 11792151447964398879G
true | 12432680895096110463G
true | 13126262220165910460G
true | 13174268766980400525G
true | 15505210698284655633G
true | 15649472107743074779G
true | 17204678798284737396G
true | 17344948852394588913G
true | 17496662575514578077G
true | 18252401681137062077G
true | 18317291550776694829G
true | 1874068156324778273G
true | 1905388747193831650G
true | 2202916659517317514G
true | 2227583514184312746G
true | 2338498362660772719G
true | 2781055864473387780G
true | 3328451335138149956G
true | 3337066551442961397G
true | 3409814636252858217G
true | 3510942875414458836G
true | 3784560248718450071G
true | 4751997750760398084G
true | 4831389563158288344G
true | 4990765271833742716G
true | 5089134323978233018G
true | 5199948958991797301G
true | 5577006791947779410G
true | 5600924393587988459G
true | 5793183108815074904G
true | 6263450610539110790G
true | 6382800227808658932G
true | 6651414131918424343G
true | 6842348953158377901G
true | 6941261091797652072G
true | 7273596521315663110G
true | 7504504064263669287G
true | 788787457839692041G
true | 7955079406183515637G
true | 8549944162621642512G
true | 8603989663476771718G
true | 8807817071862113702G
true | 9010467728050264449G
true | 10667007354186551956G
true | 10683692646452562431G
true | 10821471013040158923G
true | 10950412492527322440G
true | 11239168150708129139G
true | 1169089424364679180G
true | 11818186001859264308G
true | 11833901312327420776G
true | 11926759511765359899G
true | 11926873763676642186G
true | 11963748953446345529G
true | 11998794077335055257G
true | 12096659438561119542G
true | 12156940908066221323G
true | 12947799971452915849G
true | 13260572831089785859G
true | 13771804148684671731G
true | 14117161486975057715G
true | 14242321332569825828G
true | 14486903973548550719G
true | 14967026985784794439G
true | 15213854965919594827G
true | 15352856648520921629G
true | 15399114114227588261G
true | 15595235597337683065G
true | 16194613440650274502G
true | 1687184559264975024G
true | 17490665426807838719G
true | 18218388313430417611G
true | 2601737961087659062G
true | 261049867304784443G
true | 2740103009342231109G
true | 2970700287221458280G
true | 3916589616287113937G
true | 4324745483838182873G
true | 4937104021912138218G
true | 5486140987150761883G
true | 5944830206637008055G
true | 6296367092202729479G
true | 6334824724549167320G
true | 6556961545928831643G
true | 6735196588112087610G
true | 7388428680384065704G
true | 8249030965139585917G
true | 837825985403119657G
true | 8505906760983331750G
true | 8674665223082153551G
true | 894385949183117216G
true | 898860202204764712G
true | 9768663798983814715G
true | 9828766684487745566G
true | 9908585559158765387G
true | 9956202364908137547G
}
}

View File

@ -2,8 +2,11 @@ package datadog.trace.api.sampling
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.DDSpan
import datadog.opentracing.DDTracer
import datadog.opentracing.SpanFactory
import datadog.trace.api.DDTags
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.util.test.DDSpecification
import static datadog.trace.common.sampling.RateByServiceSampler.DEFAULT_KEY
@ -38,7 +41,7 @@ class RateByServiceSamplerTest extends DDSpecification {
String response = '{"rate_by_service": {"service:spock,env:test":0.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span1 = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.initializeSamplingPriority(span1)
serviceSampler.setSamplingPriority(span1)
then:
span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
serviceSampler.sample(span1)
@ -47,7 +50,7 @@ class RateByServiceSamplerTest extends DDSpecification {
response = '{"rate_by_service": {"service:spock,env:test":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span2 = SpanFactory.newSpanOf("spock", "test")
serviceSampler.initializeSamplingPriority(span2)
serviceSampler.setSamplingPriority(span2)
then:
span2.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
serviceSampler.sample(span2)
@ -61,11 +64,84 @@ class RateByServiceSamplerTest extends DDSpecification {
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.initializeSamplingPriority(span)
serviceSampler.setSamplingPriority(span)
expect:
// sets correctly on root span
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
// RateByServiceSamler must not set the sample rate
span.getMetrics().get("_sample_rate") == null
}
def "sampling priority set when service later"() {
def sampler = new RateByServiceSampler()
def tracer = new DDTracer("serviceName", new LoggingWriter(), sampler)
sampler.onResponse("test", new ObjectMapper()
.readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
when:
def span = tracer.buildSpan("test").start()
then:
span.getSamplingPriority() == null
when:
span.setTag(DDTags.SERVICE_NAME, "spock")
then:
span.finish()
span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
when:
span = tracer.buildSpan("test").withTag(DDTags.SERVICE_NAME, "spock").start()
span.finish()
then:
span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
}
def "setting forced tracing via tag"() {
when:
def sampler = new RateByServiceSampler()
def tracer = new DDTracer("serviceName", new LoggingWriter(), sampler)
def span = tracer.buildSpan("root").start()
if (tagName) {
span.setTag(tagName, tagValue)
}
span.finish()
then:
span.getSamplingPriority() == expectedPriority
where:
tagName | tagValue | expectedPriority
'manual.drop' | true | PrioritySampling.USER_DROP
'manual.keep' | true | PrioritySampling.USER_KEEP
}
def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
setup:
def sampler = new RateByServiceSampler()
def tracer = new DDTracer("serviceName", new LoggingWriter(), sampler)
def span = tracer.buildSpan("root").start()
if (tagName) {
span.setTag(tagName, tagValue)
}
expect:
span.getSamplingPriority() == null
cleanup:
span.finish()
where:
tagName | tagValue
// When no tag is set default to
null | null
// Setting to not known value
'manual.drop' | false
'manual.keep' | false
'manual.drop' | 1
'manual.keep' | 1
}
}

View File

@ -0,0 +1,189 @@
package datadog.trace.api.sampling
import datadog.opentracing.DDSpan
import datadog.opentracing.SpanFactory
import datadog.trace.common.sampling.PrioritySampler
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.sampling.RuleBasedSampler
import datadog.trace.common.sampling.Sampler
import datadog.trace.util.test.DDSpecification
import static datadog.trace.api.Config.TRACE_SAMPLING_DEFAULT_RATE
import static datadog.trace.api.Config.TRACE_SAMPLING_OPERATION_RULES
import static datadog.trace.api.Config.TRACE_SAMPLING_RATE_LIMIT
import static datadog.trace.api.Config.TRACE_SAMPLING_SERVICE_RULES
import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_DROP
import static datadog.trace.api.sampling.PrioritySampling.SAMPLER_KEEP
class RuleBasedSamplingTest extends DDSpecification {
def "Rule Based Sampler is not created when properties not set"() {
when:
Sampler sampler = Sampler.Builder.forConfig(new Properties())
then:
!(sampler instanceof RuleBasedSampler)
}
def "Rule Based Sampler is not created when just rate limit set"() {
when:
Properties properties = new Properties()
properties.setProperty(TRACE_SAMPLING_RATE_LIMIT, "50")
Sampler sampler = Sampler.Builder.forConfig(properties)
then:
!(sampler instanceof RuleBasedSampler)
}
def "sampling config combinations"() {
given:
Properties properties = new Properties()
if (serviceRules != null) {
properties.setProperty(TRACE_SAMPLING_SERVICE_RULES, serviceRules)
}
if (operationRules != null) {
properties.setProperty(TRACE_SAMPLING_OPERATION_RULES, operationRules)
}
if (defaultRate != null) {
properties.setProperty(TRACE_SAMPLING_DEFAULT_RATE, defaultRate)
}
if (rateLimit != null) {
properties.setProperty(TRACE_SAMPLING_RATE_LIMIT, rateLimit)
}
when:
Sampler sampler = Sampler.Builder.forConfig(properties)
then:
sampler instanceof PrioritySampler
when:
DDSpan span = SpanFactory.newSpanOf("service", "bar")
span.setOperationName("operation")
((PrioritySampler) sampler).setSamplingPriority(span)
then:
span.getMetrics().get(RuleBasedSampler.SAMPLING_RULE_RATE) == expectedRuleRate
span.getMetrics().get(RuleBasedSampler.SAMPLING_LIMIT_RATE) == expectedRateLimit
span.getMetrics().get(RateByServiceSampler.SAMPLING_AGENT_RATE) == expectedAgentRate
span.getSamplingPriority() == expectedPriority
where:
serviceRules | operationRules | defaultRate | rateLimit | expectedRuleRate | expectedRateLimit | expectedAgentRate | expectedPriority
// Matching neither passes through to rate based sampler
"xx:1" | null | null | "50" | null | null | 1.0 | SAMPLER_KEEP
null | "xx:1" | null | "50" | null | null | 1.0 | SAMPLER_KEEP
// Matching neither with default rate
null | null | "1" | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | null | "0" | "50" | 0 | null | null | SAMPLER_DROP
"xx:1" | null | "1" | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | "xx:1" | "1" | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"xx:1" | null | "0" | "50" | 0 | null | null | SAMPLER_DROP
null | "xx:1" | "0" | "50" | 0 | null | null | SAMPLER_DROP
// Matching service: keep
"service:1" | null | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"s.*:1" | null | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
".*e:1" | null | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"[a-z]+:1" | null | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
// Matching service: drop
"service:0" | null | null | "50" | 0 | null | null | SAMPLER_DROP
"s.*:0" | null | null | "50" | 0 | null | null | SAMPLER_DROP
".*e:0" | null | null | "50" | 0 | null | null | SAMPLER_DROP
"[a-z]+:0" | null | null | "50" | 0 | null | null | SAMPLER_DROP
// Matching service overrides default rate
"service:1" | null | "0" | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"service:0" | null | "1" | "50" | 0 | null | null | SAMPLER_DROP
// multiple services
"xxx:0,service:1" | null | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"xxx:1,service:0" | null | null | "50" | 0 | null | null | SAMPLER_DROP
// Matching operation : keep
null | "operation:1" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | "o.*:1" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | ".*n:1" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | "[a-z]+:1" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
// Matching operation: drop
null | "operation:0" | null | "50" | 0 | null | null | SAMPLER_DROP
null | "o.*:0" | null | "50" | 0 | null | null | SAMPLER_DROP
null | ".*n:0" | null | "50" | 0 | null | null | SAMPLER_DROP
null | "[a-z]+:0" | null | "50" | 0 | null | null | SAMPLER_DROP
// Matching operation overrides default rate
null | "operation:1" | "0" | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | "operation:0" | "1" | "50" | 0 | null | null | SAMPLER_DROP
// multiple operation combinations
null | "xxx:0,operation:1" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
null | "xxx:1,operation:0" | null | "50" | 0 | null | null | SAMPLER_DROP
// Service and operation name combinations
"service:1" | "operation:0" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"service:1" | "xxx:0" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"service:0" | "operation:1" | null | "50" | 0 | null | null | SAMPLER_DROP
"service:0" | "xxx:1" | null | "50" | 0 | null | null | SAMPLER_DROP
"xxx:0" | "operation:1" | null | "50" | 1.0 | 50 | null | SAMPLER_KEEP
"xxx:1" | "operation:0" | null | "50" | 0 | null | null | SAMPLER_DROP
// There are no tests for ordering within service or operation rules because the rule order in that case is unspecified
}
def "Rate limit is set for rate limited spans"() {
when:
Properties properties = new Properties()
properties.setProperty(TRACE_SAMPLING_SERVICE_RULES, "service:1")
properties.setProperty(TRACE_SAMPLING_RATE_LIMIT, "1")
Sampler sampler = Sampler.Builder.forConfig(properties)
DDSpan span1 = SpanFactory.newSpanOf("service", "bar")
DDSpan span2 = SpanFactory.newSpanOf("service", "bar")
((PrioritySampler) sampler).setSamplingPriority(span1)
// Span 2 should be rate limited if there isn't a >1 sec delay between these 2 lines
((PrioritySampler) sampler).setSamplingPriority(span2)
then:
span1.getMetrics().get(RuleBasedSampler.SAMPLING_RULE_RATE) == 1.0
span1.getMetrics().get(RuleBasedSampler.SAMPLING_LIMIT_RATE) == 1.0
span1.getMetrics().get(RateByServiceSampler.SAMPLING_AGENT_RATE) == null
span1.getSamplingPriority() == SAMPLER_KEEP
span2.getMetrics().get(RuleBasedSampler.SAMPLING_RULE_RATE) == 1.0
span2.getMetrics().get(RuleBasedSampler.SAMPLING_LIMIT_RATE) == 1.0
span2.getMetrics().get(RateByServiceSampler.SAMPLING_AGENT_RATE) == null
span2.getSamplingPriority() == SAMPLER_DROP
}
def "Rate limit is set for rate limited spans (matched on different rules)"() {
when:
Properties properties = new Properties()
properties.setProperty(TRACE_SAMPLING_SERVICE_RULES, "service:1,foo:1")
properties.setProperty(TRACE_SAMPLING_RATE_LIMIT, "1")
Sampler sampler = Sampler.Builder.forConfig(properties)
DDSpan span1 = SpanFactory.newSpanOf("service", "bar")
DDSpan span2 = SpanFactory.newSpanOf("foo", "bar")
((PrioritySampler) sampler).setSamplingPriority(span1)
// Span 2 should be rate limited if there isn't a >1 sec delay between these 2 lines
((PrioritySampler) sampler).setSamplingPriority(span2)
then:
span1.getMetrics().get(RuleBasedSampler.SAMPLING_RULE_RATE) == 1.0
span1.getMetrics().get(RuleBasedSampler.SAMPLING_LIMIT_RATE) == 1.0
span1.getMetrics().get(RateByServiceSampler.SAMPLING_AGENT_RATE) == null
span1.getSamplingPriority() == SAMPLER_KEEP
span2.getMetrics().get(RuleBasedSampler.SAMPLING_RULE_RATE) == 1.0
span2.getMetrics().get(RuleBasedSampler.SAMPLING_LIMIT_RATE) == 1.0
span2.getMetrics().get(RateByServiceSampler.SAMPLING_AGENT_RATE) == null
span2.getSamplingPriority() == SAMPLER_DROP
}
}