Merge pull request #986 from DataDog/tyler/sampler-locking

Remove synchronization from RateByServiceSampler
This commit is contained in:
Tyler Benson 2019-09-10 15:43:27 -07:00 committed by GitHub
commit d10b99a5db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 111 additions and 52 deletions

View File

@ -150,9 +150,14 @@ public class ClassLoaderMatcher {
@Override
public boolean matches(final ClassLoader target) {
if (target != null) {
Boolean result = cache.get(target);
if (result != null) {
return result;
}
synchronized (target) {
if (cache.containsKey(target)) {
return cache.get(target);
result = cache.get(target);
if (result != null) {
return result;
}
for (final String name : names) {
if (target.getResource(Utils.getResourceName(name)) == null) {
@ -184,9 +189,14 @@ public class ClassLoaderMatcher {
@Override
public boolean matches(final ClassLoader target) {
if (target != null) {
Boolean result = cache.get(target);
if (result != null) {
return result;
}
synchronized (target) {
if (cache.containsKey(target)) {
return cache.get(target);
result = cache.get(target);
if (result != null) {
return result;
}
try {
final Class<?> aClass = Class.forName(className, false, target);
@ -225,9 +235,14 @@ public class ClassLoaderMatcher {
@Override
public boolean matches(final ClassLoader target) {
if (target != null) {
Boolean result = cache.get(target);
if (result != null) {
return result;
}
synchronized (target) {
if (cache.containsKey(target)) {
return cache.get(target);
result = cache.get(target);
if (result != null) {
return result;
}
try {
final Class<?> aClass = Class.forName(className, false, target);

View File

@ -1,12 +1,17 @@
package datadog.trace.common.sampling;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NumericNode;
import datadog.opentracing.DDSpan;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.writer.DDApi.ResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
/**
@ -16,22 +21,23 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class RateByServiceSampler implements Sampler, ResponseListener {
/** Key for setting the baseline rate */
private static final String BASE_KEY = "service:,env:";
/** Sampler to use if service+env is not in the map */
private RateSampler baseSampler = new RateSampler(1.0);
/** Key for setting the default/baseline rate */
private static final String DEFAULT_KEY = "service:,env:";
private final Map<String, RateSampler> serviceRates = new HashMap<String, RateSampler>();
private static final double DEFAULT_RATE = 1.0;
private volatile Map<String, RateSampler> serviceRates =
unmodifiableMap(singletonMap(DEFAULT_KEY, new RateSampler(DEFAULT_RATE)));
@Override
public synchronized boolean sample(DDSpan span) {
public boolean sample(final DDSpan span) {
// Priority sampling sends all traces to the core agent, including traces marked dropped.
// This allows the core agent to collect stats on all traces.
return true;
}
/** If span is a root span, set the span context samplingPriority to keep or drop */
public void initializeSamplingPriority(DDSpan span) {
public void initializeSamplingPriority(final DDSpan span) {
if (span.isRootSpan()) {
// Run the priority sampler on the new span
setSamplingPriorityOnSpanContext(span);
@ -42,16 +48,17 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
}
}
private synchronized void setSamplingPriorityOnSpanContext(DDSpan span) {
private void setSamplingPriorityOnSpanContext(final DDSpan span) {
final String serviceName = span.getServiceName();
final String env = getSpanEnv(span);
final String key = "service:" + serviceName + ",env:" + env;
final RateSampler sampler;
if (serviceRates.containsKey(key)) {
sampler = serviceRates.get(key);
} else {
sampler = baseSampler;
final Map<String, RateSampler> rates = serviceRates;
RateSampler sampler = serviceRates.get(key);
if (sampler == null) {
sampler = rates.get(DEFAULT_KEY);
}
if (sampler.sample(span)) {
span.setSamplingPriority(PrioritySampling.SAMPLER_KEEP);
} else {
@ -59,32 +66,34 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
}
}
private static String getSpanEnv(DDSpan span) {
private static String getSpanEnv(final DDSpan span) {
return null == span.getTags().get("env") ? "" : String.valueOf(span.getTags().get("env"));
}
@Override
public void onResponse(String endpoint, JsonNode responseJson) {
JsonNode newServiceRates = responseJson.get("rate_by_service");
public void onResponse(final String endpoint, final JsonNode responseJson) {
final JsonNode newServiceRates = responseJson.get("rate_by_service");
if (null != newServiceRates) {
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
synchronized (this) {
serviceRates.clear();
Iterator<String> itr = newServiceRates.fieldNames();
while (itr.hasNext()) {
final String key = itr.next();
try {
final float val = Float.parseFloat(newServiceRates.get(key).toString());
if (BASE_KEY.equals(key)) {
baseSampler = new RateSampler(val);
} else {
serviceRates.put(key, new RateSampler(val));
}
} catch (NumberFormatException nfe) {
log.debug("Unable to parse new service rate {} -> {}", key, newServiceRates.get(key));
final Map<String, RateSampler> updatedServiceRates = new HashMap<>();
final Iterator<String> itr = newServiceRates.fieldNames();
while (itr.hasNext()) {
final String key = itr.next();
final JsonNode value = newServiceRates.get(key);
try {
if (value instanceof NumericNode) {
updatedServiceRates.put(key, new RateSampler(value.doubleValue()));
} else {
log.debug("Unable to parse new service rate {} -> {}", key, value);
}
} catch (final NumberFormatException nfe) {
log.debug("Unable to parse new service rate {} -> {}", key, value);
}
}
if (!updatedServiceRates.containsKey(DEFAULT_KEY)) {
updatedServiceRates.put(DEFAULT_KEY, new RateSampler(DEFAULT_RATE));
}
serviceRates = unmodifiableMap(updatedServiceRates);
}
}
@ -99,18 +108,14 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
/** The sample rate used */
private final double sampleRate;
public RateSampler(final String sampleRate) {
this(sampleRate == null ? 1 : Double.valueOf(sampleRate));
}
/**
* Build an instance of the sampler. The Sample rate is fixed for each instance.
*
* @param sampleRate a number [0,1] representing the rate ratio.
*/
public RateSampler(double sampleRate) {
private RateSampler(double sampleRate) {
if (sampleRate <= 0) {
if (sampleRate < 0) {
sampleRate = 1;
log.error("SampleRate is negative or null, disabling the sampler");
} else if (sampleRate > 1) {
@ -123,13 +128,13 @@ public class RateByServiceSampler implements Sampler, ResponseListener {
@Override
public boolean doSample(final DDSpan span) {
final boolean sample = Math.random() <= this.sampleRate;
final boolean sample = ThreadLocalRandom.current().nextFloat() <= sampleRate;
log.debug("{} - Span is sampled: {}", span, sample);
return sample;
}
public double getSampleRate() {
return this.sampleRate;
return sampleRate;
}
@Override

View File

@ -1,8 +1,11 @@
package datadog.opentracing
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.propagation.ExtractedContext
import datadog.opentracing.propagation.TagContext
import datadog.trace.agent.test.utils.ConfigUtils
import datadog.trace.api.DDTags
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.writer.ListWriter
@ -20,11 +23,17 @@ class DDSpanTest extends Specification {
}
def writer = new ListWriter()
def tracer = new DDTracer(DEFAULT_SERVICE_NAME, writer, new RateByServiceSampler(), [:])
def sampler = new RateByServiceSampler()
def tracer = new DDTracer(DEFAULT_SERVICE_NAME, writer, sampler, [:])
@Shared
def defaultSamplingPriority = PrioritySampling.SAMPLER_KEEP
def setup() {
sampler.onResponse("test", new ObjectMapper()
.readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
}
def "getters and setters"() {
setup:
final DDSpanContext context =
@ -254,8 +263,29 @@ class DDSpanTest extends Specification {
new ExtractedContext("123", "456", 1, "789", [:], [:]) | false
}
def "setting forced tracing via tag"() {
def "sampling priority set on init"() {
setup:
def span = tracer.buildSpan("test").start()
expect:
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
when:
span.setTag(DDTags.SERVICE_NAME, "spock")
then:
// FIXME: priority currently only applies if service name set before span started.
span.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
// span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
when:
span = tracer.buildSpan("test").withTag(DDTags.SERVICE_NAME, "spock").start()
then:
span.getSamplingPriority() == PrioritySampling.SAMPLER_DROP
}
def "setting forced tracing via tag"() {
setup:
def span = tracer.buildSpan("root").start()
if (tagName) {

View File

@ -19,6 +19,7 @@ import static java.util.Collections.emptyMap
class SpanDecoratorTest extends Specification {
static {
ConfigUtils.makeConfigInstanceModifiable()
ConfigUtils.updateConfig {
System.setProperty("dd.$Config.SPLIT_BY_TAGS", "sn.tag1,sn.tag2")
}

View File

@ -1,5 +1,6 @@
package datadog.opentracing.propagation
import datadog.trace.agent.test.utils.ConfigUtils
import datadog.trace.api.Config
import io.opentracing.SpanContext
import io.opentracing.propagation.TextMapExtractAdapter
@ -11,6 +12,9 @@ import static datadog.trace.api.Config.PropagationStyle.B3
import static datadog.trace.api.Config.PropagationStyle.DATADOG
class HttpExtractorTest extends Specification {
static {
ConfigUtils.makeConfigInstanceModifiable()
}
@Shared
String outOfRangeTraceId = UINT64_MAX.add(BigInteger.ONE)

View File

@ -3,6 +3,7 @@ package datadog.opentracing.propagation
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.agent.test.utils.ConfigUtils
import datadog.trace.api.Config
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.ListWriter
@ -13,6 +14,9 @@ import static datadog.trace.api.Config.PropagationStyle.B3
import static datadog.trace.api.Config.PropagationStyle.DATADOG
class HttpInjectorTest extends Specification {
static {
ConfigUtils.makeConfigInstanceModifiable()
}
def "inject http headers"() {
setup:

View File

@ -6,6 +6,8 @@ import datadog.opentracing.SpanFactory
import datadog.trace.common.sampling.RateByServiceSampler
import spock.lang.Specification
import static datadog.trace.common.sampling.RateByServiceSampler.DEFAULT_KEY
class RateByServiceSamplerTest extends Specification {
def "invalid rate -> 1"() {
@ -15,13 +17,13 @@ class RateByServiceSamplerTest extends Specification {
String response = '{"rate_by_service": {"service:,env:":' + rate + '}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
expect:
serviceSampler.baseSampler.sampleRate == expectedRate
serviceSampler.serviceRates[DEFAULT_KEY].sampleRate == expectedRate
where:
rate | expectedRate
null | 1
1 | 1
0 | 1
0 | 0.0
-5 | 1
5 | 1
0.5 | 0.5
@ -33,22 +35,20 @@ class RateByServiceSamplerTest extends Specification {
ObjectMapper serializer = new ObjectMapper()
when:
String response = '{"rate_by_service": {"service:,env:":1.0, "service:spock,env:test":0.000001}}'
String response = '{"rate_by_service": {"service:spock,env:test":0.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span1 = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.initializeSamplingPriority(span1)
then:
span1.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
serviceSampler.sample(span1)
// !serviceSampler.sample(SpanFactory.newSpanOf("spock", "test"))
when:
response = '{"rate_by_service": {"service:,env:":0.000001, "service:spock,env:test":1.0}}'
response = '{"rate_by_service": {"service:spock,env:test":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
DDSpan span2 = SpanFactory.newSpanOf("spock", "test")
serviceSampler.initializeSamplingPriority(span2)
then:
// !serviceSampler.sample(SpanFactory.newSpanOf("foo", "bar"))
span2.getSamplingPriority() == PrioritySampling.SAMPLER_KEEP
serviceSampler.sample(span2)
}