Change the internal storage and handling of trace ID, span ID, and parent IDs' to use String instead of long primitive.

This will allow the java agent to receive unsigned 64 bit integers without overflow.

Also upgraded MsgPack to handle the serialization of such Strings as numbers in JSONs.
This commit is contained in:
Gary Huang 2018-07-19 13:44:57 -04:00
parent 37ffff3fab
commit 96f79a8aba
20 changed files with 277 additions and 97 deletions

View File

@ -16,29 +16,29 @@ public class CorrelationIdentifier {
}
}
public static long getTraceId() {
public static String getTraceId() {
return provider.get().getTraceId();
}
public static long getSpanId() {
public static String getSpanId() {
return provider.get().getSpanId();
}
public interface Provider {
long getTraceId();
String getTraceId();
long getSpanId();
String getSpanId();
Provider NO_OP =
new Provider() {
@Override
public long getTraceId() {
return 0;
public String getTraceId() {
return "0";
}
@Override
public long getSpanId() {
return 0;
public String getSpanId() {
return "0";
}
};
}

View File

@ -33,7 +33,7 @@ dependencies {
compile deps.jackson
compile deps.slf4j
compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.2'
compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16'
testCompile deps.autoservice
testCompile group: 'org.objenesis', name: 'objenesis', version: '2.6'

View File

@ -4,11 +4,16 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.util.Clock;
import io.opentracing.Span;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.ref.WeakReference;
@ -110,7 +115,7 @@ public class DDSpan implements Span, MutableSpan {
*/
@JsonIgnore
public final boolean isRootSpan() {
return context.getParentId() == 0;
return "0".equals(context.getParentId());
}
@Override
@ -317,17 +322,20 @@ public class DDSpan implements Span, MutableSpan {
}
@JsonGetter("trace_id")
public long getTraceId() {
@JsonSerialize(using = UInt64IDStringSerializer.class)
public String getTraceId() {
return context.getTraceId();
}
@JsonGetter("span_id")
public long getSpanId() {
@JsonSerialize(using = UInt64IDStringSerializer.class)
public String getSpanId() {
return context.getSpanId();
}
@JsonGetter("parent_id")
public long getParentId() {
@JsonSerialize(using = UInt64IDStringSerializer.class)
public String getParentId() {
return context.getParentId();
}
@ -390,4 +398,21 @@ public class DDSpan implements Span, MutableSpan {
.append(durationNano)
.toString();
}
protected static class UInt64IDStringSerializer extends StdSerializer<String> {
public UInt64IDStringSerializer() {
this(null);
}
public UInt64IDStringSerializer(Class<String> stringClass) {
super(stringClass);
}
@Override
public void serialize(String value, JsonGenerator gen, SerializerProvider provider)
throws IOException {
gen.writeNumber(value);
}
}
}

View File

@ -35,9 +35,9 @@ public class DDSpanContext implements io.opentracing.SpanContext {
private final Map<String, String> baggageItems;
// Not Shared with other span contexts
private final long traceId;
private final long spanId;
private final long parentId;
private final String traceId;
private final String spanId;
private final String parentId;
/** Tags are associated to the current span, they will not propagate to the children span */
private final Map<String, Object> tags = new ConcurrentHashMap<>();
@ -67,9 +67,9 @@ public class DDSpanContext implements io.opentracing.SpanContext {
private final long threadId = Thread.currentThread().getId();
public DDSpanContext(
final long traceId,
final long spanId,
final long parentId,
final String traceId,
final String spanId,
final String parentId,
final String serviceName,
final String operationName,
final String resourceName,
@ -111,15 +111,15 @@ public class DDSpanContext implements io.opentracing.SpanContext {
}
}
public long getTraceId() {
public String getTraceId() {
return this.traceId;
}
public long getParentId() {
public String getParentId() {
return this.parentId;
}
public long getSpanId() {
public String getSpanId() {
return this.spanId;
}

View File

@ -456,9 +456,9 @@ public class DDTracer implements io.opentracing.Tracer {
return this;
}
private long generateNewId() {
private String generateNewId() {
// Ensure the generated ID is in a valid range:
return ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE);
return String.valueOf(ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE));
}
/**
@ -468,9 +468,9 @@ public class DDTracer implements io.opentracing.Tracer {
* @return the context
*/
private DDSpanContext buildSpanContext() {
final long traceId;
final long spanId = generateNewId();
final long parentSpanId;
final String traceId;
final String spanId = generateNewId();
final String parentSpanId;
final Map<String, String> baggage;
final PendingTrace parentTrace;
final int samplingPriority;
@ -512,7 +512,7 @@ public class DDTracer implements io.opentracing.Tracer {
// Start a new trace
} else {
traceId = generateNewId();
parentSpanId = 0L;
parentSpanId = "0";
baggage = null;
parentTrace = new PendingTrace(DDTracer.this, traceId);
samplingPriority = PrioritySampling.UNSET;

View File

@ -22,20 +22,20 @@ public class OTTraceCorrelation implements CorrelationIdentifier.Provider {
}
@Override
public long getTraceId() {
public String getTraceId() {
final Span activeSpan = tracer.activeSpan();
if (activeSpan instanceof DDSpan) {
return ((DDSpan) activeSpan).getTraceId();
}
return 0;
return "0";
}
@Override
public long getSpanId() {
public String getSpanId() {
final Span activeSpan = tracer.activeSpan();
if (activeSpan instanceof DDSpan) {
return ((DDSpan) activeSpan).getSpanId();
}
return 0;
return "0";
}
}

View File

@ -29,7 +29,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
}
private final DDTracer tracer;
private final long traceId;
private final String traceId;
// TODO: consider moving these time fields into DDTracer to ensure that traces have precise relative time
/** Trace start time in nano seconds measured up to a millisecond accuracy */
@ -47,7 +47,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
/** Ensure a trace is never written multiple times */
private final AtomicBoolean isWritten = new AtomicBoolean(false);
PendingTrace(final DDTracer tracer, final long traceId) {
PendingTrace(final DDTracer tracer, final String traceId) {
this.tracer = tracer;
this.traceId = traceId;
@ -72,7 +72,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
}
public void registerSpan(final DDSpan span) {
if (span.context().getTraceId() != traceId) {
if (!traceId.equals(span.context().getTraceId())) {
log.debug("{} - span registered for wrong trace ({})", span, traceId);
return;
}
@ -90,7 +90,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
}
private void expireSpan(final DDSpan span) {
if (span.context().getTraceId() != traceId) {
if (!traceId.equals(span.context().getTraceId())) {
log.debug("{} - span expired for wrong trace ({})", span, traceId);
return;
}
@ -111,7 +111,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
log.debug("{} - added to trace, but not complete.", span);
return;
}
if (traceId != span.getTraceId()) {
if (!traceId.equals(span.getTraceId())) {
log.debug("{} - added to a mismatched trace.", span);
return;
}

View File

@ -5,16 +5,16 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
public class ExtractedContext implements SpanContext {
private final Long traceId;
private final Long spanId;
private final String traceId;
private final String spanId;
private final int samplingPriority;
private final Map<String, String> baggage;
private final Map<String, String> tags;
private final AtomicBoolean samplingPriorityLocked = new AtomicBoolean(false);
public ExtractedContext(
final Long traceId,
final Long spanId,
final String traceId,
final String spanId,
final int samplingPriority,
final Map<String, String> baggage,
final Map<String, String> tags) {
@ -34,11 +34,11 @@ public class ExtractedContext implements SpanContext {
samplingPriorityLocked.set(true);
}
public Long getTraceId() {
public String getTraceId() {
return traceId;
}
public Long getSpanId() {
public String getSpanId() {
return spanId;
}

View File

@ -48,16 +48,16 @@ public class HTTPCodec implements Codec<TextMap> {
Map<String, String> baggage = Collections.emptyMap();
Map<String, String> tags = Collections.emptyMap();
Long traceId = 0L;
Long spanId = 0L;
String traceId = "0";
String spanId = "0";
int samplingPriority = PrioritySampling.UNSET;
for (final Map.Entry<String, String> entry : carrier) {
final String key = entry.getKey().toLowerCase();
if (key.equalsIgnoreCase(TRACE_ID_KEY)) {
traceId = Long.parseLong(entry.getValue());
traceId = entry.getValue();
} else if (key.equalsIgnoreCase(SPAN_ID_KEY)) {
spanId = Long.parseLong(entry.getValue());
spanId = entry.getValue();
} else if (key.startsWith(OT_BAGGAGE_PREFIX)) {
if (baggage.isEmpty()) {
baggage = new HashMap<>();
@ -75,7 +75,7 @@ public class HTTPCodec implements Codec<TextMap> {
}
}
ExtractedContext context = null;
if (traceId != 0L) {
if (!traceId.equals("0")) {
context = new ExtractedContext(traceId, spanId, samplingPriority, baggage, tags);
context.lockSamplingPriority();

View File

@ -118,7 +118,7 @@ public class DDApi {
return false;
}
log.debug("Succesfully sent {} of {} traces to the DD agent.", traces.size(), totalSize);
log.debug("Successfully sent {} of {} traces to the DD agent.", traces.size(), totalSize);
try {
if (null != responseString
@ -130,7 +130,7 @@ public class DDApi {
}
}
} catch (final IOException e) {
log.debug("failed to parse DD agent response: " + responseString, e);
log.debug("Failed to parse DD agent response: " + responseString, e);
}
return true;

View File

@ -12,9 +12,9 @@ public class LoggingWriter implements Writer {
@Override
public void write(final List<DDSpan> trace) {
try {
log.info("write(trace): {}", serializer.writeValueAsString(trace));
log.info("Write(trace): {}", serializer.writeValueAsString(trace));
} catch (final Exception e) {
log.error("error writing(trace): {}", trace);
log.error("Error writing(trace) with message: {}. trace: {}", e.getMessage(), trace);
}
}

View File

@ -113,14 +113,14 @@ class DDSpanBuilderTest extends Specification {
def "should link to parent span"() {
setup:
final long spanId = 1L
final String spanId = "1"
final long expectedParentId = spanId
final DDSpanContext mockedContext = mock(DDSpanContext)
when(mockedContext.getSpanId()).thenReturn(spanId)
when(mockedContext.getServiceName()).thenReturn("foo")
when(mockedContext.getTrace()).thenReturn(new PendingTrace(tracer, 1L))
when(mockedContext.getTrace()).thenReturn(new PendingTrace(tracer, "1"))
final String expectedName = "fakeName"
@ -237,8 +237,8 @@ class DDSpanBuilderTest extends Specification {
where:
extractedContext | _
new ExtractedContext(1, 2, 0, [:], [:]) | _
new ExtractedContext(3, 4, 1, ["asdf": "qwer"], ["zxcv": "1234"]) | _
new ExtractedContext("1", "2", 0, [:], [:]) | _
new ExtractedContext("3", "4", 1, ["asdf": "qwer"], ["zxcv": "1234"]) | _
}
def "global span tags populated on each span"() {

View File

@ -39,9 +39,9 @@ class DDSpanSerializationTest extends Specification {
def tracer = new DDTracer(writer)
final DDSpanContext context =
new DDSpanContext(
1L,
2L,
0L,
"1",
"2",
"0",
"service",
"operation",
null,
@ -50,7 +50,7 @@ class DDSpanSerializationTest extends Specification {
false,
"type",
tags,
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName())

View File

@ -15,9 +15,9 @@ class DDSpanTest extends Specification {
setup:
final DDSpanContext context =
new DDSpanContext(
1L,
1L,
0L,
"1",
"1",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -26,7 +26,7 @@ class DDSpanTest extends Specification {
false,
"fakeType",
null,
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
final DDSpan span = new DDSpan(1L, context)

View File

@ -24,7 +24,7 @@ class OTTraceCorrelationTest extends Specification {
scope.close()
expect:
0 == traceCorrelation.getTraceId()
"0" == traceCorrelation.getTraceId()
}
def "get trace id with trace"() {
@ -37,7 +37,7 @@ class OTTraceCorrelationTest extends Specification {
scope.close()
expect:
0 == traceCorrelation.getSpanId()
"0" == traceCorrelation.getSpanId()
}
def "get span id with trace"() {

View File

@ -13,9 +13,10 @@ class PendingTraceTest extends Specification {
def traceCount = tracer.traceCount
def traceId = System.identityHashCode(this)
String traceIdStr = String.valueOf(traceId)
@Subject
PendingTrace trace = new PendingTrace(tracer, traceId)
PendingTrace trace = new PendingTrace(tracer, traceIdStr)
DDSpan rootSpan = SpanFactory.newSpanOf(trace)
@ -130,7 +131,7 @@ class PendingTraceTest extends Specification {
def "register span to wrong trace fails"() {
setup:
def otherTrace = new PendingTrace(tracer, traceId - 10)
def otherTrace = new PendingTrace(tracer, String.valueOf(traceId - 10))
otherTrace.registerSpan(new DDSpan(0, rootSpan.context()))
expect:
@ -141,7 +142,7 @@ class PendingTraceTest extends Specification {
def "add span to wrong trace fails"() {
setup:
def otherTrace = new PendingTrace(tracer, traceId - 10)
def otherTrace = new PendingTrace(tracer, String.valueOf(traceId - 10))
rootSpan.finish()
otherTrace.addSpan(rootSpan)

View File

@ -8,9 +8,9 @@ class SpanFactory {
def writer = new ListWriter()
def tracer = new DDTracer(writer)
def context = new DDSpanContext(
1L,
1L,
0L,
"1",
"1",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -19,16 +19,16 @@ class SpanFactory {
false,
"fakeType",
Collections.emptyMap(),
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
return new DDSpan(timestampMicro, context)
}
static newSpanOf(DDTracer tracer) {
def context = new DDSpanContext(
1L,
1L,
0L,
"1",
"1",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -37,7 +37,7 @@ class SpanFactory {
false,
"fakeType",
Collections.emptyMap(),
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
return new DDSpan(1, context)
}
@ -45,8 +45,8 @@ class SpanFactory {
static newSpanOf(PendingTrace trace) {
def context = new DDSpanContext(
trace.traceId,
1L,
0L,
"1",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -64,9 +64,9 @@ class SpanFactory {
def writer = new ListWriter()
def tracer = new DDTracer(writer)
def context = new DDSpanContext(
1L,
1L,
0L,
"1",
"1",
"0",
serviceName,
"fakeOperation",
"fakeResource",
@ -75,7 +75,7 @@ class SpanFactory {
false,
"fakeType",
Collections.emptyMap(),
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
context.setTag("env", envName)
return new DDSpan(0l, context)

View File

@ -87,9 +87,9 @@ class URLAsResourceNameTest extends Specification {
when:
final DDSpanContext context =
new DDSpanContext(
1L,
1L,
0L,
"1",
"1",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -98,7 +98,7 @@ class URLAsResourceNameTest extends Specification {
false,
"fakeType",
tags,
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
then:

View File

@ -28,9 +28,9 @@ class HTTPCodecTest extends Specification {
def tracer = new DDTracer(writer)
final DDSpanContext mockedContext =
new DDSpanContext(
1L,
2L,
0L,
"1",
"2",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -44,7 +44,7 @@ class HTTPCodecTest extends Specification {
false,
"fakeType",
null,
new PendingTrace(tracer, 1L),
new PendingTrace(tracer, "1"),
tracer)
final Map<String, String> carrier = new HashMap<>()
@ -64,6 +64,96 @@ class HTTPCodecTest extends Specification {
PrioritySampling.SAMPLER_KEEP | _
}
def "inject http headers with larger than Java long IDs"() {
String largeTraceId = "9523372036854775807"
String largeSpanId = "15815582334751494918"
String largeParentId = "15815582334751494914"
setup:
def writer = new ListWriter()
def tracer = new DDTracer(writer)
final DDSpanContext mockedContext =
new DDSpanContext(
largeTraceId,
largeSpanId,
largeParentId,
"fakeService",
"fakeOperation",
"fakeResource",
samplingPriority,
new HashMap<String, String>() {
{
put("k1", "v1")
put("k2", "v2")
}
},
false,
"fakeType",
null,
new PendingTrace(tracer, largeTraceId),
tracer)
final Map<String, String> carrier = new HashMap<>()
codec.inject(mockedContext, new TextMapInjectAdapter(carrier))
expect:
carrier.get(TRACE_ID_KEY) == largeTraceId
carrier.get(SPAN_ID_KEY) == largeSpanId
carrier.get(SAMPLING_PRIORITY_KEY) == (samplingPriority == PrioritySampling.UNSET ? null : String.valueOf(samplingPriority))
carrier.get(OT_BAGGAGE_PREFIX + "k1") == "v1"
carrier.get(OT_BAGGAGE_PREFIX + "k2") == "v2"
where:
samplingPriority | _
PrioritySampling.UNSET | _
PrioritySampling.SAMPLER_KEEP | _
}
def "inject http headers with uint 64 max IDs"() {
String largeTraceId = "18446744073709551615"
String largeSpanId = "18446744073709551614"
String largeParentId = "18446744073709551613"
setup:
def writer = new ListWriter()
def tracer = new DDTracer(writer)
final DDSpanContext mockedContext =
new DDSpanContext(
largeTraceId,
largeSpanId,
largeParentId,
"fakeService",
"fakeOperation",
"fakeResource",
samplingPriority,
new HashMap<String, String>() {
{
put("k1", "v1")
put("k2", "v2")
}
},
false,
"fakeType",
null,
new PendingTrace(tracer, largeTraceId),
tracer)
final Map<String, String> carrier = new HashMap<>()
codec.inject(mockedContext, new TextMapInjectAdapter(carrier))
expect:
carrier.get(TRACE_ID_KEY) == largeTraceId
carrier.get(SPAN_ID_KEY) == largeSpanId
carrier.get(SAMPLING_PRIORITY_KEY) == (samplingPriority == PrioritySampling.UNSET ? null : String.valueOf(samplingPriority))
carrier.get(OT_BAGGAGE_PREFIX + "k1") == "v1"
carrier.get(OT_BAGGAGE_PREFIX + "k2") == "v2"
where:
samplingPriority | _
PrioritySampling.UNSET | _
PrioritySampling.SAMPLER_KEEP | _
}
def "extract http headers"() {
setup:
final Map<String, String> actual = [
@ -81,8 +171,72 @@ class HTTPCodecTest extends Specification {
final ExtractedContext context = codec.extract(new TextMapExtractAdapter(actual))
expect:
context.getTraceId() == 1l
context.getSpanId() == 2l
context.getTraceId() == "1"
context.getSpanId() == "2"
context.getBaggage().get("k1") == "v1"
context.getBaggage().get("k2") == "v2"
context.getTags() == ["some-tag": "my-interesting-info"]
context.getSamplingPriority() == samplingPriority
where:
samplingPriority | _
PrioritySampling.UNSET | _
PrioritySampling.SAMPLER_KEEP | _
}
def "extract http headers with larger than Java long IDs"() {
setup:
String largeTraceId = "9523372036854775807"
String largeSpanId = "15815582334751494918"
final Map<String, String> actual = [
(TRACE_ID_KEY.toUpperCase()) : largeTraceId,
(SPAN_ID_KEY.toUpperCase()) : largeSpanId,
(OT_BAGGAGE_PREFIX.toUpperCase() + "k1"): "v1",
(OT_BAGGAGE_PREFIX.toUpperCase() + "k2"): "v2",
SOME_HEADER : "my-interesting-info",
]
if (samplingPriority != PrioritySampling.UNSET) {
actual.put(SAMPLING_PRIORITY_KEY, String.valueOf(samplingPriority))
}
final ExtractedContext context = codec.extract(new TextMapExtractAdapter(actual))
expect:
context.getTraceId() == largeTraceId
context.getSpanId() == largeSpanId
context.getBaggage().get("k1") == "v1"
context.getBaggage().get("k2") == "v2"
context.getTags() == ["some-tag": "my-interesting-info"]
context.getSamplingPriority() == samplingPriority
where:
samplingPriority | _
PrioritySampling.UNSET | _
PrioritySampling.SAMPLER_KEEP | _
}
def "extract http headers with uint 64 max IDs"() {
setup:
String largeTraceId = "18446744073709551615"
String largeSpanId = "18446744073709551614"
final Map<String, String> actual = [
(TRACE_ID_KEY.toUpperCase()) : largeTraceId,
(SPAN_ID_KEY.toUpperCase()) : largeSpanId,
(OT_BAGGAGE_PREFIX.toUpperCase() + "k1"): "v1",
(OT_BAGGAGE_PREFIX.toUpperCase() + "k2"): "v2",
SOME_HEADER : "my-interesting-info",
]
if (samplingPriority != PrioritySampling.UNSET) {
actual.put(SAMPLING_PRIORITY_KEY, String.valueOf(samplingPriority))
}
final ExtractedContext context = codec.extract(new TextMapExtractAdapter(actual))
expect:
context.getTraceId() == largeTraceId
context.getSpanId() == largeSpanId
context.getBaggage().get("k1") == "v1"
context.getBaggage().get("k2") == "v2"
context.getTags() == ["some-tag": "my-interesting-info"]

View File

@ -17,9 +17,9 @@ class DDApiIntegrationTest {
static final WRITER = new ListWriter()
static final TRACER = new DDTracer(WRITER)
static final CONTEXT = new DDSpanContext(
1L,
1L,
0L,
"1",
"1",
"0",
"fakeService",
"fakeOperation",
"fakeResource",
@ -28,7 +28,7 @@ class DDApiIntegrationTest {
false,
"fakeType",
Collections.emptyMap(),
new PendingTrace(TRACER, 1L),
new PendingTrace(TRACER, "1"),
TRACER)
def api = new DDApi(DDAgentWriter.DEFAULT_HOSTNAME, DDAgentWriter.DEFAULT_PORT, v4())