Remove priority sampling logic from DDTracer
This commit is contained in:
parent
3a76347a21
commit
c1f9f4fc5d
|
@ -97,7 +97,8 @@ public class DDSpan implements Span {
|
||||||
*
|
*
|
||||||
* @return true if root, false otherwise
|
* @return true if root, false otherwise
|
||||||
*/
|
*/
|
||||||
protected final boolean isRootSpan() {
|
@JsonIgnore
|
||||||
|
public final boolean isRootSpan() {
|
||||||
|
|
||||||
if (context().getTrace().isEmpty()) {
|
if (context().getTrace().isEmpty()) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -14,6 +14,7 @@ import datadog.trace.common.sampling.RateByServiceSampler;
|
||||||
import datadog.trace.common.sampling.Sampler;
|
import datadog.trace.common.sampling.Sampler;
|
||||||
import datadog.trace.common.writer.DDAgentWriter;
|
import datadog.trace.common.writer.DDAgentWriter;
|
||||||
import datadog.trace.common.writer.DDApi;
|
import datadog.trace.common.writer.DDApi;
|
||||||
|
import datadog.trace.common.writer.DDApi.ResponseListener;
|
||||||
import datadog.trace.common.writer.Writer;
|
import datadog.trace.common.writer.Writer;
|
||||||
import io.opentracing.Scope;
|
import io.opentracing.Scope;
|
||||||
import io.opentracing.ScopeManager;
|
import io.opentracing.ScopeManager;
|
||||||
|
@ -43,8 +44,6 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
|
||||||
final Writer writer;
|
final Writer writer;
|
||||||
/** Sampler defines the sampling policy in order to reduce the number of traces for instance */
|
/** Sampler defines the sampling policy in order to reduce the number of traces for instance */
|
||||||
final Sampler sampler;
|
final Sampler sampler;
|
||||||
/** Sampler which rates based on the service name */
|
|
||||||
final RateByServiceSampler serviceSampler;
|
|
||||||
|
|
||||||
/** Span context decorators */
|
/** Span context decorators */
|
||||||
private final Map<String, List<AbstractDecorator>> spanContextDecorators = new HashMap<>();
|
private final Map<String, List<AbstractDecorator>> spanContextDecorators = new HashMap<>();
|
||||||
|
@ -65,8 +64,7 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
|
||||||
this(
|
this(
|
||||||
config.getProperty(DDTraceConfig.SERVICE_NAME),
|
config.getProperty(DDTraceConfig.SERVICE_NAME),
|
||||||
Writer.Builder.forConfig(config),
|
Writer.Builder.forConfig(config),
|
||||||
Sampler.Builder.forConfig(config),
|
Sampler.Builder.forConfig(config));
|
||||||
RateByServiceSampler.Builder.forConfig(config));
|
|
||||||
log.debug("Using config: {}", config);
|
log.debug("Using config: {}", config);
|
||||||
|
|
||||||
// Create decorators from resource files
|
// Create decorators from resource files
|
||||||
|
@ -78,25 +76,16 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
|
||||||
}
|
}
|
||||||
|
|
||||||
public DDTracer(final String serviceName, final Writer writer, final Sampler sampler) {
|
public DDTracer(final String serviceName, final Writer writer, final Sampler sampler) {
|
||||||
this(serviceName, writer, sampler, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public DDTracer(
|
|
||||||
final String serviceName,
|
|
||||||
final Writer writer,
|
|
||||||
final Sampler sampler,
|
|
||||||
RateByServiceSampler serviceSampler) {
|
|
||||||
this.serviceName = serviceName;
|
this.serviceName = serviceName;
|
||||||
this.writer = writer;
|
this.writer = writer;
|
||||||
this.writer.start();
|
this.writer.start();
|
||||||
this.sampler = sampler;
|
this.sampler = sampler;
|
||||||
this.serviceSampler = serviceSampler;
|
|
||||||
registry = new CodecRegistry();
|
registry = new CodecRegistry();
|
||||||
registry.register(Format.Builtin.HTTP_HEADERS, new HTTPCodec());
|
registry.register(Format.Builtin.HTTP_HEADERS, new HTTPCodec());
|
||||||
registry.register(Format.Builtin.TEXT_MAP, new HTTPCodec());
|
registry.register(Format.Builtin.TEXT_MAP, new HTTPCodec());
|
||||||
if (this.writer instanceof DDAgentWriter && serviceSampler != null) {
|
if (this.writer instanceof DDAgentWriter && sampler instanceof DDApi.ResponseListener) {
|
||||||
final DDApi api = ((DDAgentWriter) this.writer).getApi();
|
final DDApi api = ((DDAgentWriter) this.writer).getApi();
|
||||||
api.addResponseListener(this.serviceSampler);
|
api.addResponseListener((DDApi.ResponseListener) this.sampler);
|
||||||
}
|
}
|
||||||
log.info("New instance: {}", this);
|
log.info("New instance: {}", this);
|
||||||
}
|
}
|
||||||
|
@ -178,9 +167,7 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
|
||||||
if (trace.isEmpty()) {
|
if (trace.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// If priority sampling is enabled, send all traces to the agent (even traces marked to drop).
|
if (this.sampler.sample(trace.peek())) {
|
||||||
// Otherwise, use the sampler to drop traces.
|
|
||||||
if (prioritySamplingEnabled() || this.sampler.sample(trace.peek())) {
|
|
||||||
this.writer.write(new ArrayList<>(trace));
|
this.writer.write(new ArrayList<>(trace));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -229,10 +216,6 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean prioritySamplingEnabled() {
|
|
||||||
return null != serviceSampler;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class CodecRegistry {
|
private static class CodecRegistry {
|
||||||
|
|
||||||
private final Map<Format<?>, Codec<?>> codecs = new HashMap<>();
|
private final Map<Format<?>, Codec<?>> codecs = new HashMap<>();
|
||||||
|
@ -276,25 +259,8 @@ public class DDTracer extends ThreadLocalScopeManager implements io.opentracing.
|
||||||
|
|
||||||
private DDSpan startSpan() {
|
private DDSpan startSpan() {
|
||||||
final DDSpan span = new DDSpan(this.timestamp, buildSpanContext());
|
final DDSpan span = new DDSpan(this.timestamp, buildSpanContext());
|
||||||
if (DDTracer.this.prioritySamplingEnabled()) {
|
if (DDTracer.this.sampler instanceof RateByServiceSampler) {
|
||||||
if (span.isRootSpan()) {
|
((RateByServiceSampler) DDTracer.this.sampler).initializeSamplingPriority(span);
|
||||||
// Run the priority sampler on the new span
|
|
||||||
if (DDTracer.this.sampler.sample(span) && DDTracer.this.serviceSampler.sample(span)) {
|
|
||||||
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
|
|
||||||
} else {
|
|
||||||
span.setSamplingPriority(PrioritySampling.SAMPLER_DROP);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (span.getSamplingPriority() == null) {
|
|
||||||
// Edge case: If the parent context did not set the priority, run the priority sampler.
|
|
||||||
// Happens when extracted http context did not send the priority header.
|
|
||||||
if (DDTracer.this.sampler.sample(span) && DDTracer.this.serviceSampler.sample(span)) {
|
|
||||||
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
|
|
||||||
} else {
|
|
||||||
span.setSamplingPriority(PrioritySampling.SAMPLER_DROP);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return span;
|
return span;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package datadog.trace.common;
|
package datadog.trace.common;
|
||||||
|
|
||||||
import datadog.opentracing.DDTracer;
|
import datadog.opentracing.DDTracer;
|
||||||
import datadog.trace.common.sampling.Sampler;
|
|
||||||
import datadog.trace.common.writer.DDAgentWriter;
|
import datadog.trace.common.writer.DDAgentWriter;
|
||||||
import datadog.trace.common.writer.Writer;
|
import datadog.trace.common.writer.Writer;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
|
@ -3,12 +3,10 @@ package datadog.trace.common.sampling;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
import datadog.trace.common.DDTraceConfig;
|
|
||||||
import datadog.trace.common.writer.DDApi.ResponseListener;
|
import datadog.trace.common.writer.DDApi.ResponseListener;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -27,13 +25,37 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized boolean sample(DDSpan span) {
|
public synchronized boolean sample(DDSpan span) {
|
||||||
|
// Priority sampling sends all traces to the core agent, including traces marked dropped.
|
||||||
|
// This allows the core agent to collect stats on all traces.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** If span is a root span, set the span context samplingPriority to keep or drop */
|
||||||
|
public void initializeSamplingPriority(DDSpan span) {
|
||||||
|
if (span.isRootSpan()) {
|
||||||
|
// Run the priority sampler on the new span
|
||||||
|
setSamplingPriorityOnSpanContext(span);
|
||||||
|
} else if (span.getSamplingPriority() == null) {
|
||||||
|
// Edge case: If the parent context did not set the priority, run the priority sampler.
|
||||||
|
// Happens when extracted http context did not send the priority header.
|
||||||
|
setSamplingPriorityOnSpanContext(span);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void setSamplingPriorityOnSpanContext(DDSpan span) {
|
||||||
final String serviceName = span.getServiceName();
|
final String serviceName = span.getServiceName();
|
||||||
final String env = getSpanEnv(span);
|
final String env = getSpanEnv(span);
|
||||||
final String key = "service:" + serviceName + ",env:" + env;
|
final String key = "service:" + serviceName + ",env:" + env;
|
||||||
|
boolean agentSample;
|
||||||
if (serviceRates.containsKey(key)) {
|
if (serviceRates.containsKey(key)) {
|
||||||
return serviceRates.get(key).sample(span);
|
agentSample = serviceRates.get(key).sample(span);
|
||||||
} else {
|
} else {
|
||||||
return baseSampler.sample(span);
|
agentSample = baseSampler.sample(span);
|
||||||
|
}
|
||||||
|
if (agentSample) {
|
||||||
|
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
|
||||||
|
} else {
|
||||||
|
span.setSamplingPriority(PrioritySampling.SAMPLER_DROP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,22 +88,6 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class Builder {
|
|
||||||
public static RateByServiceSampler forConfig(final Properties config) {
|
|
||||||
RateByServiceSampler sampler = null;
|
|
||||||
if (config != null) {
|
|
||||||
final boolean enabled =
|
|
||||||
Boolean.parseBoolean(config.getProperty(DDTraceConfig.PRIORITY_SAMPLING));
|
|
||||||
if (enabled) {
|
|
||||||
sampler = new RateByServiceSampler();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return sampler;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Builder() {}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This sampler sample the traces at a predefined rate.
|
* This sampler sample the traces at a predefined rate.
|
||||||
*
|
*
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package datadog.trace.common.sampling;
|
package datadog.trace.common.sampling;
|
||||||
|
|
||||||
import datadog.opentracing.DDSpan;
|
import datadog.opentracing.DDSpan;
|
||||||
|
import datadog.trace.common.DDTraceConfig;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
/** Main interface to sample a collection of traces. */
|
/** Main interface to sample a collection of traces. */
|
||||||
|
@ -17,7 +18,19 @@ public interface Sampler {
|
||||||
|
|
||||||
final class Builder {
|
final class Builder {
|
||||||
public static Sampler forConfig(final Properties config) {
|
public static Sampler forConfig(final Properties config) {
|
||||||
return new AllSampler();
|
final Sampler sampler;
|
||||||
|
if (config != null) {
|
||||||
|
final boolean prioritySamplingEnabled =
|
||||||
|
Boolean.parseBoolean(config.getProperty(DDTraceConfig.PRIORITY_SAMPLING));
|
||||||
|
if (prioritySamplingEnabled) {
|
||||||
|
sampler = new RateByServiceSampler();
|
||||||
|
} else {
|
||||||
|
sampler = new AllSampler();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sampler = new AllSampler();
|
||||||
|
}
|
||||||
|
return sampler;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Builder() {}
|
private Builder() {}
|
||||||
|
|
|
@ -18,16 +18,36 @@ class RateByServiceSamplerTest extends Specification {
|
||||||
when:
|
when:
|
||||||
String response = '{"rate_by_service": {"service:,env:":1.0, "service:spock,env:test":0.000001}}'
|
String response = '{"rate_by_service": {"service:,env:":1.0, "service:spock,env:test":0.000001}}'
|
||||||
serviceSampler.onResponse("traces", serializer.readTree(response))
|
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||||
|
DDSpan span1 = makeTrace("foo", "bar")
|
||||||
|
serviceSampler.initializeSamplingPriority(span1)
|
||||||
then:
|
then:
|
||||||
serviceSampler.sample(makeTrace("foo", "bar"))
|
span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
|
||||||
|
serviceSampler.sample(span1)
|
||||||
// !serviceSampler.sample(makeTrace("spock", "test"))
|
// !serviceSampler.sample(makeTrace("spock", "test"))
|
||||||
|
|
||||||
when:
|
when:
|
||||||
response = '{"rate_by_service": {"service:,env:":0.000001, "service:spock,env:test":1.0}}'
|
response = '{"rate_by_service": {"service:,env:":0.000001, "service:spock,env:test":1.0}}'
|
||||||
serviceSampler.onResponse("traces", serializer.readTree(response))
|
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||||
|
DDSpan span2 = makeTrace("spock", "test")
|
||||||
|
serviceSampler.initializeSamplingPriority(span2)
|
||||||
then:
|
then:
|
||||||
// !serviceSampler.sample(makeTrace("foo", "bar"))
|
// !serviceSampler.sample(makeTrace("foo", "bar"))
|
||||||
serviceSampler.sample(makeTrace("spock", "test"))
|
span2.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
|
||||||
|
serviceSampler.sample(span2)
|
||||||
|
}
|
||||||
|
|
||||||
|
def "sampling priority set on context"() {
|
||||||
|
setup:
|
||||||
|
RateByServiceSampler serviceSampler = new RateByServiceSampler()
|
||||||
|
ObjectMapper serializer = new ObjectMapper()
|
||||||
|
String response = '{"rate_by_service": {"service:,env:":1.0}}'
|
||||||
|
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||||
|
|
||||||
|
DDSpan span = makeTrace("foo", "bar")
|
||||||
|
serviceSampler.initializeSamplingPriority(span)
|
||||||
|
expect:
|
||||||
|
// sets correctly on root span
|
||||||
|
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
|
||||||
}
|
}
|
||||||
|
|
||||||
private DDSpan makeTrace(String serviceName, String envName) {
|
private DDSpan makeTrace(String serviceName, String envName) {
|
||||||
|
|
Loading…
Reference in New Issue