Merge pull request #895 from aantono/haystack
Added support for Haystack trace propagation
This commit is contained in:
commit
b20a32b44d
|
@ -137,7 +137,8 @@ public class Config {
|
|||
|
||||
public enum PropagationStyle {
|
||||
DATADOG,
|
||||
B3
|
||||
B3,
|
||||
HAYSTACK
|
||||
}
|
||||
|
||||
/** A tag intended for internal use only, hence not added to the public api DDTags class. */
|
||||
|
|
|
@ -0,0 +1,113 @@
|
|||
package datadog.opentracing.propagation;
|
||||
|
||||
import static datadog.opentracing.propagation.HttpCodec.ZERO;
|
||||
import static datadog.opentracing.propagation.HttpCodec.validateUInt64BitsID;
|
||||
|
||||
import datadog.opentracing.DDSpanContext;
|
||||
import datadog.trace.api.sampling.PrioritySampling;
|
||||
import io.opentracing.SpanContext;
|
||||
import io.opentracing.propagation.TextMap;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* A codec designed for HTTP transport via headers using Haystack headers.
|
||||
*
|
||||
* @author Alex Antonov
|
||||
*/
|
||||
@Slf4j
|
||||
public class HaystackHttpCodec {
|
||||
|
||||
private static final String OT_BAGGAGE_PREFIX = "Baggage-";
|
||||
private static final String TRACE_ID_KEY = "Trace-ID";
|
||||
private static final String SPAN_ID_KEY = "Span-ID";
|
||||
private static final String PARENT_ID_KEY = "Parent_ID";
|
||||
|
||||
private HaystackHttpCodec() {
|
||||
// This class should not be created. This also makes code coverage checks happy.
|
||||
}
|
||||
|
||||
public static class Injector implements HttpCodec.Injector {
|
||||
|
||||
@Override
|
||||
public void inject(final DDSpanContext context, final TextMap carrier) {
|
||||
carrier.put(TRACE_ID_KEY, context.getTraceId());
|
||||
carrier.put(SPAN_ID_KEY, context.getSpanId());
|
||||
carrier.put(PARENT_ID_KEY, context.getParentId());
|
||||
|
||||
for (final Map.Entry<String, String> entry : context.baggageItems()) {
|
||||
carrier.put(OT_BAGGAGE_PREFIX + entry.getKey(), HttpCodec.encode(entry.getValue()));
|
||||
}
|
||||
log.debug("{} - Haystack parent context injected", context.getTraceId());
|
||||
}
|
||||
}
|
||||
|
||||
public static class Extractor implements HttpCodec.Extractor {
|
||||
private final Map<String, String> taggedHeaders;
|
||||
|
||||
/** Creates Header Extractor using Haystack propagation. */
|
||||
public Extractor(final Map<String, String> taggedHeaders) {
|
||||
this.taggedHeaders = new HashMap<>();
|
||||
for (final Map.Entry<String, String> mapping : taggedHeaders.entrySet()) {
|
||||
this.taggedHeaders.put(mapping.getKey().trim().toLowerCase(), mapping.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SpanContext extract(final TextMap carrier) {
|
||||
try {
|
||||
Map<String, String> baggage = Collections.emptyMap();
|
||||
Map<String, String> tags = Collections.emptyMap();
|
||||
String traceId = ZERO;
|
||||
String spanId = ZERO;
|
||||
int samplingPriority = PrioritySampling.SAMPLER_KEEP;
|
||||
String origin = null; // Always null
|
||||
|
||||
for (final Map.Entry<String, String> entry : carrier) {
|
||||
final String key = entry.getKey().toLowerCase();
|
||||
final String value = entry.getValue();
|
||||
|
||||
if (value == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (TRACE_ID_KEY.equalsIgnoreCase(key)) {
|
||||
traceId = validateUInt64BitsID(value, 10);
|
||||
} else if (SPAN_ID_KEY.equalsIgnoreCase(key)) {
|
||||
spanId = validateUInt64BitsID(value, 10);
|
||||
} else if (key.startsWith(OT_BAGGAGE_PREFIX.toLowerCase())) {
|
||||
if (baggage.isEmpty()) {
|
||||
baggage = new HashMap<>();
|
||||
}
|
||||
baggage.put(key.replace(OT_BAGGAGE_PREFIX.toLowerCase(), ""), HttpCodec.decode(value));
|
||||
}
|
||||
|
||||
if (taggedHeaders.containsKey(key)) {
|
||||
if (tags.isEmpty()) {
|
||||
tags = new HashMap<>();
|
||||
}
|
||||
tags.put(taggedHeaders.get(key), HttpCodec.decode(value));
|
||||
}
|
||||
}
|
||||
|
||||
if (!ZERO.equals(traceId)) {
|
||||
final ExtractedContext context =
|
||||
new ExtractedContext(traceId, spanId, samplingPriority, origin, baggage, tags);
|
||||
context.lockSamplingPriority();
|
||||
|
||||
log.debug("{} - Parent context extracted", context.getTraceId());
|
||||
return context;
|
||||
} else if (origin != null || !tags.isEmpty()) {
|
||||
log.debug("Tags context extracted");
|
||||
return new TagContext(origin, tags);
|
||||
}
|
||||
} catch (final RuntimeException e) {
|
||||
log.debug("Exception when extracting context", e);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -41,6 +41,10 @@ public class HttpCodec {
|
|||
injectors.add(new B3HttpCodec.Injector());
|
||||
continue;
|
||||
}
|
||||
if (style == Config.PropagationStyle.HAYSTACK) {
|
||||
injectors.add(new HaystackHttpCodec.Injector());
|
||||
continue;
|
||||
}
|
||||
log.debug("No implementation found to inject propagation style: {}", style);
|
||||
}
|
||||
return new CompoundInjector(injectors);
|
||||
|
@ -58,6 +62,10 @@ public class HttpCodec {
|
|||
extractors.add(new B3HttpCodec.Extractor(taggedHeaders));
|
||||
continue;
|
||||
}
|
||||
if (style == Config.PropagationStyle.HAYSTACK) {
|
||||
extractors.add(new HaystackHttpCodec.Extractor(taggedHeaders));
|
||||
continue;
|
||||
}
|
||||
log.debug("No implementation found to extract propagation style: {}", style);
|
||||
}
|
||||
return new CompoundExtractor(extractors);
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
package datadog.opentracing.propagation
|
||||
|
||||
import datadog.trace.api.sampling.PrioritySampling
|
||||
import io.opentracing.SpanContext
|
||||
import io.opentracing.propagation.TextMapExtractAdapter
|
||||
import spock.lang.Specification
|
||||
|
||||
import static datadog.opentracing.propagation.HaystackHttpCodec.OT_BAGGAGE_PREFIX
|
||||
import static datadog.opentracing.propagation.HaystackHttpCodec.SPAN_ID_KEY
|
||||
import static datadog.opentracing.propagation.HaystackHttpCodec.TRACE_ID_KEY
|
||||
import static datadog.opentracing.propagation.HttpCodec.UINT64_MAX
|
||||
|
||||
class HaystackHttpExtractorTest extends Specification {
|
||||
|
||||
HttpCodec.Extractor extractor = new HaystackHttpCodec.Extractor(["SOME_HEADER": "some-tag"])
|
||||
|
||||
def "extract http headers"() {
|
||||
setup:
|
||||
def headers = [
|
||||
(TRACE_ID_KEY.toUpperCase()) : traceId,
|
||||
(SPAN_ID_KEY.toUpperCase()) : spanId,
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k1"): "v1",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k2"): "v2",
|
||||
SOME_HEADER : "my-interesting-info",
|
||||
]
|
||||
|
||||
when:
|
||||
final ExtractedContext context = extractor.extract(new TextMapExtractAdapter(headers))
|
||||
|
||||
then:
|
||||
context.traceId == traceId
|
||||
context.spanId == spanId
|
||||
context.baggage == ["k1": "v1", "k2": "v2"]
|
||||
context.tags == ["some-tag": "my-interesting-info"]
|
||||
context.samplingPriority == samplingPriority
|
||||
context.origin == origin
|
||||
|
||||
where:
|
||||
traceId | spanId | samplingPriority | origin
|
||||
"1" | "2" | PrioritySampling.SAMPLER_KEEP | null
|
||||
"2" | "3" | PrioritySampling.SAMPLER_KEEP | null
|
||||
UINT64_MAX.toString() | UINT64_MAX.minus(1).toString() | PrioritySampling.SAMPLER_KEEP | null
|
||||
UINT64_MAX.minus(1).toString() | UINT64_MAX.toString() | PrioritySampling.SAMPLER_KEEP | null
|
||||
}
|
||||
|
||||
def "extract header tags with no propagation"() {
|
||||
when:
|
||||
TagContext context = extractor.extract(new TextMapExtractAdapter(headers))
|
||||
|
||||
then:
|
||||
!(context instanceof ExtractedContext)
|
||||
context.getTags() == ["some-tag": "my-interesting-info"]
|
||||
|
||||
|
||||
where:
|
||||
headers | _
|
||||
[SOME_HEADER: "my-interesting-info"] | _
|
||||
}
|
||||
|
||||
def "extract empty headers returns null"() {
|
||||
expect:
|
||||
extractor.extract(new TextMapExtractAdapter(["ignored-header": "ignored-value"])) == null
|
||||
}
|
||||
|
||||
def "extract http headers with invalid non-numeric ID"() {
|
||||
setup:
|
||||
def headers = [
|
||||
(TRACE_ID_KEY.toUpperCase()) : "traceId",
|
||||
(SPAN_ID_KEY.toUpperCase()) : "spanId",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k1"): "v1",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k2"): "v2",
|
||||
SOME_HEADER : "my-interesting-info",
|
||||
]
|
||||
|
||||
when:
|
||||
SpanContext context = extractor.extract(new TextMapExtractAdapter(headers))
|
||||
|
||||
then:
|
||||
context == null
|
||||
}
|
||||
|
||||
def "extract http headers with out of range trace ID"() {
|
||||
setup:
|
||||
String outOfRangeTraceId = UINT64_MAX.add(BigInteger.ONE).toString()
|
||||
def headers = [
|
||||
(TRACE_ID_KEY.toUpperCase()) : outOfRangeTraceId,
|
||||
(SPAN_ID_KEY.toUpperCase()) : "0",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k1"): "v1",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k2"): "v2",
|
||||
SOME_HEADER : "my-interesting-info",
|
||||
]
|
||||
|
||||
when:
|
||||
SpanContext context = extractor.extract(new TextMapExtractAdapter(headers))
|
||||
|
||||
then:
|
||||
context == null
|
||||
}
|
||||
|
||||
def "extract http headers with out of range span ID"() {
|
||||
setup:
|
||||
def headers = [
|
||||
(TRACE_ID_KEY.toUpperCase()) : "0",
|
||||
(SPAN_ID_KEY.toUpperCase()) : "-1",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k1"): "v1",
|
||||
(OT_BAGGAGE_PREFIX.toUpperCase() + "k2"): "v2",
|
||||
SOME_HEADER : "my-interesting-info",
|
||||
]
|
||||
|
||||
when:
|
||||
SpanContext context = extractor.extract(new TextMapExtractAdapter(headers))
|
||||
|
||||
then:
|
||||
context == null
|
||||
}
|
||||
|
||||
def "more ID range validation"() {
|
||||
setup:
|
||||
def headers = [
|
||||
(TRACE_ID_KEY.toUpperCase()): traceId,
|
||||
(SPAN_ID_KEY.toUpperCase()) : spanId,
|
||||
]
|
||||
|
||||
when:
|
||||
final ExtractedContext context = extractor.extract(new TextMapExtractAdapter(headers))
|
||||
|
||||
then:
|
||||
if (expectedTraceId) {
|
||||
assert context.traceId == expectedTraceId
|
||||
assert context.spanId == expectedSpanId
|
||||
} else {
|
||||
assert context == null
|
||||
}
|
||||
|
||||
where:
|
||||
gtTraceId | gSpanId | expectedTraceId | expectedSpanId
|
||||
"-1" | "1" | null | "0"
|
||||
"1" | "-1" | null | "0"
|
||||
"0" | "1" | null | "0"
|
||||
"1" | "0" | "1" | "0"
|
||||
"$UINT64_MAX" | "1" | "$UINT64_MAX" | "1"
|
||||
"${UINT64_MAX.plus(1)}" | "1" | null | "1"
|
||||
"1" | "$UINT64_MAX" | "1" | "$UINT64_MAX"
|
||||
"1" | "${UINT64_MAX.plus(1)}" | null | "0"
|
||||
|
||||
traceId = gtTraceId.toString()
|
||||
spanId = gSpanId.toString()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package datadog.opentracing.propagation
|
||||
|
||||
import datadog.opentracing.DDSpanContext
|
||||
import datadog.opentracing.DDTracer
|
||||
import datadog.opentracing.PendingTrace
|
||||
import datadog.trace.api.sampling.PrioritySampling
|
||||
import datadog.trace.common.writer.ListWriter
|
||||
import io.opentracing.propagation.TextMapInjectAdapter
|
||||
import spock.lang.Specification
|
||||
|
||||
import static datadog.opentracing.propagation.HaystackHttpCodec.OT_BAGGAGE_PREFIX
|
||||
import static datadog.opentracing.propagation.HaystackHttpCodec.SPAN_ID_KEY
|
||||
import static datadog.opentracing.propagation.HaystackHttpCodec.TRACE_ID_KEY
|
||||
import static datadog.opentracing.propagation.HttpCodec.UINT64_MAX
|
||||
|
||||
class HaystackHttpInjectorTest extends Specification {
|
||||
|
||||
HttpCodec.Injector injector = new HaystackHttpCodec.Injector()
|
||||
|
||||
def "inject http headers"() {
|
||||
setup:
|
||||
def writer = new ListWriter()
|
||||
def tracer = new DDTracer(writer)
|
||||
final DDSpanContext mockedContext =
|
||||
new DDSpanContext(
|
||||
traceId,
|
||||
spanId,
|
||||
"0",
|
||||
"fakeService",
|
||||
"fakeOperation",
|
||||
"fakeResource",
|
||||
samplingPriority,
|
||||
origin,
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("k1", "v1")
|
||||
put("k2", "v2")
|
||||
}
|
||||
},
|
||||
false,
|
||||
"fakeType",
|
||||
null,
|
||||
new PendingTrace(tracer, "1", [:]),
|
||||
tracer)
|
||||
|
||||
final Map<String, String> carrier = Mock()
|
||||
|
||||
when:
|
||||
injector.inject(mockedContext, new TextMapInjectAdapter(carrier))
|
||||
|
||||
then:
|
||||
1 * carrier.put(TRACE_ID_KEY, traceId)
|
||||
1 * carrier.put(SPAN_ID_KEY, spanId)
|
||||
1 * carrier.put(OT_BAGGAGE_PREFIX + "k1", "v1")
|
||||
1 * carrier.put(OT_BAGGAGE_PREFIX + "k2", "v2")
|
||||
|
||||
|
||||
|
||||
where:
|
||||
traceId | spanId | samplingPriority | origin
|
||||
"1" | "2" | PrioritySampling.SAMPLER_KEEP | null
|
||||
"1" | "2" | PrioritySampling.SAMPLER_KEEP | null
|
||||
UINT64_MAX.toString() | UINT64_MAX.minus(1).toString() | PrioritySampling.SAMPLER_KEEP | null
|
||||
UINT64_MAX.minus(1).toString() | UINT64_MAX.toString() | PrioritySampling.SAMPLER_KEEP | null
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue