Remove Jackson from dd-trace-ot (#1185)

Remove Jackson from dd-trace-ot
This commit is contained in:
Tyler Benson 2020-02-03 15:10:16 -05:00 committed by GitHub
commit 0eac80baa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 465 additions and 161 deletions

View File

@ -37,9 +37,10 @@ dependencies {
compile group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.1.1'
compile deps.jackson
compile deps.slf4j
compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.20'
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7
compile group: 'com.squareup.moshi', name: 'moshi', version: '1.9.2'
compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.23'
compile group: 'com.lmax', name: 'disruptor', version: '3.4.2'
@ -50,6 +51,7 @@ dependencies {
testCompile project(":dd-java-agent:testing")
testCompile project(':utils:gc-utils')
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.17.1'
testCompile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.20'
traceAgentTestCompile deps.testcontainers

View File

@ -3,8 +3,6 @@ package datadog.opentracing;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static io.opentracing.log.Fields.MESSAGE;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.api.sampling.PrioritySampling;
@ -77,7 +75,6 @@ public class DDSpan implements Span, MutableSpan {
context.getTrace().registerSpan(this);
}
@JsonIgnore
public boolean isFinished() {
return durationNano.get() != 0;
}
@ -120,20 +117,17 @@ public class DDSpan implements Span, MutableSpan {
*
* @return true if root, false otherwise
*/
@JsonIgnore
public final boolean isRootSpan() {
return BigInteger.ZERO.equals(context.getParentId());
}
@Override
@Deprecated
@JsonIgnore
public MutableSpan getRootSpan() {
return getLocalRootSpan();
}
@Override
@JsonIgnore
public MutableSpan getLocalRootSpan() {
return context().getTrace().getRootSpan();
}
@ -297,14 +291,13 @@ public class DDSpan implements Span, MutableSpan {
return this;
}
// Getters and JSON serialisation instructions
// Getters
/**
* Meta merges baggage and tags (stringified values)
*
* @return merged context baggage and tags
*/
@JsonGetter
public Map<String, String> getMeta() {
final Map<String, String> meta = new HashMap<>();
for (final Map.Entry<String, String> entry : context().getBaggageItems().entrySet()) {
@ -321,58 +314,48 @@ public class DDSpan implements Span, MutableSpan {
*
* @return metrics for this span
*/
@JsonGetter
public Map<String, Number> getMetrics() {
return context.getMetrics();
}
@Override
@JsonGetter("start")
public long getStartTime() {
return startTimeNano > 0 ? startTimeNano : TimeUnit.MICROSECONDS.toNanos(startTimeMicro);
}
@Override
@JsonGetter("duration")
public long getDurationNano() {
return durationNano.get();
}
@Override
@JsonGetter("service")
public String getServiceName() {
return context.getServiceName();
}
@JsonGetter("trace_id")
public BigInteger getTraceId() {
return context.getTraceId();
}
@JsonGetter("span_id")
public BigInteger getSpanId() {
return context.getSpanId();
}
@JsonGetter("parent_id")
public BigInteger getParentId() {
return context.getParentId();
}
@Override
@JsonGetter("resource")
public String getResourceName() {
return context.getResourceName();
}
@Override
@JsonGetter("name")
public String getOperationName() {
return context.getOperationName();
}
@Override
@JsonIgnore
public Integer getSamplingPriority() {
final int samplingPriority = context.getSamplingPriority();
if (samplingPriority == PrioritySampling.UNSET) {
@ -383,29 +366,24 @@ public class DDSpan implements Span, MutableSpan {
}
@Override
@JsonIgnore
public String getSpanType() {
return context.getSpanType();
}
@Override
@JsonIgnore
public Map<String, Object> getTags() {
return context().getTags();
}
@JsonGetter
public String getType() {
return context.getSpanType();
}
@Override
@JsonIgnore
public Boolean isError() {
return context.getErrorFlag();
}
@JsonGetter
public int getError() {
return context.getErrorFlag() ? 1 : 0;
}

View File

@ -1,6 +1,5 @@
package datadog.opentracing;
import com.fasterxml.jackson.annotation.JsonIgnore;
import datadog.opentracing.decorators.AbstractDecorator;
import datadog.trace.api.DDTags;
import datadog.trace.api.sampling.PrioritySampling;
@ -289,12 +288,10 @@ public class DDSpanContext implements io.opentracing.SpanContext {
return baggageItems.entrySet();
}
@JsonIgnore
public PendingTrace getTrace() {
return trace;
}
@JsonIgnore
public DDTracer getTracer() {
return tracer;
}

View File

@ -3,13 +3,10 @@ package datadog.trace.common.sampling;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NumericNode;
import datadog.opentracing.DDSpan;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@ -70,23 +67,16 @@ public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentRe
}
@Override
public void onResponse(final String endpoint, final JsonNode responseJson) {
final JsonNode newServiceRates = responseJson.get("rate_by_service");
public void onResponse(
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
final Map<String, Number> newServiceRates = responseJson.get("rate_by_service");
if (null != newServiceRates) {
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
final Map<String, RateSampler> updatedServiceRates = new HashMap<>();
final Iterator<String> itr = newServiceRates.fieldNames();
while (itr.hasNext()) {
final String key = itr.next();
final JsonNode value = newServiceRates.get(key);
try {
if (value instanceof NumericNode) {
updatedServiceRates.put(key, createRateSampler(value.doubleValue()));
} else {
log.debug("Unable to parse new service rate {} -> {}", key, value);
}
} catch (final NumberFormatException nfe) {
log.debug("Unable to parse new service rate {} -> {}", key, value);
for (final Map.Entry<String, Number> entry : newServiceRates.entrySet()) {
if (entry.getValue() != null) {
updatedServiceRates.put(
entry.getKey(), createRateSampler(entry.getValue().doubleValue()));
}
}
if (!updatedServiceRates.containsKey(DEFAULT_KEY)) {

View File

@ -0,0 +1,101 @@
package datadog.trace.common.serialization;
import datadog.opentracing.DDSpan;
import java.io.IOException;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
public abstract class FormatWriter<DEST> {
public abstract void writeKey(String key, DEST destination) throws IOException;
public abstract void writeListHeader(int size, DEST destination) throws IOException;
public abstract void writeListFooter(DEST destination) throws IOException;
public abstract void writeMapHeader(int size, DEST destination) throws IOException;
public abstract void writeMapFooter(DEST destination) throws IOException;
public abstract void writeString(String key, String value, DEST destination) throws IOException;
public abstract void writeShort(String key, short value, DEST destination) throws IOException;
public abstract void writeByte(String key, byte value, DEST destination) throws IOException;
public abstract void writeInt(String key, int value, DEST destination) throws IOException;
public abstract void writeLong(String key, long value, DEST destination) throws IOException;
public abstract void writeFloat(String key, float value, DEST destination) throws IOException;
public abstract void writeDouble(String key, double value, DEST destination) throws IOException;
public abstract void writeBigInteger(String key, BigInteger value, DEST destination)
throws IOException;
public void writeNumber(final String key, final Number value, final DEST destination)
throws IOException {
if (value instanceof Double) {
writeDouble(key, value.doubleValue(), destination);
} else if (value instanceof Long) {
writeLong(key, value.longValue(), destination);
} else if (value instanceof Integer) {
writeInt(key, value.intValue(), destination);
} else if (value instanceof Float) {
writeFloat(key, value.floatValue(), destination);
} else if (value instanceof Byte) {
writeByte(key, value.byteValue(), destination);
} else if (value instanceof Short) {
writeShort(key, value.shortValue(), destination);
}
}
public void writeNumberMap(
final String key, final Map<String, Number> value, final DEST destination)
throws IOException {
writeKey(key, destination);
writeMapHeader(value.size(), destination);
for (final Map.Entry<String, Number> entry : value.entrySet()) {
writeNumber(entry.getKey(), entry.getValue(), destination);
}
writeMapFooter(destination);
}
public void writeStringMap(
final String key, final Map<String, String> value, final DEST destination)
throws IOException {
writeKey(key, destination);
writeMapHeader(value.size(), destination);
for (final Map.Entry<String, String> entry : value.entrySet()) {
writeString(entry.getKey(), entry.getValue(), destination);
}
writeMapFooter(destination);
}
public void writeTrace(final List<DDSpan> trace, final DEST destination) throws IOException {
writeListHeader(trace.size(), destination);
for (final DDSpan span : trace) {
writeDDSpan(span, destination);
}
writeListFooter(destination);
}
public void writeDDSpan(final DDSpan span, final DEST destination) throws IOException {
// Some of the tests rely on the specific ordering here.
writeMapHeader(12, destination); // must match count below.
/* 1 */ writeString("service", span.getServiceName(), destination);
/* 2 */ writeString("name", span.getOperationName(), destination);
/* 3 */ writeString("resource", span.getResourceName(), destination);
/* 4 */ writeBigInteger("trace_id", span.getTraceId(), destination);
/* 5 */ writeBigInteger("span_id", span.getSpanId(), destination);
/* 6 */ writeBigInteger("parent_id", span.getParentId(), destination);
/* 7 */ writeLong("start", span.getStartTime(), destination);
/* 8 */ writeLong("duration", span.getDurationNano(), destination);
/* 9 */ writeString("type", span.getType(), destination);
/* 10 */ writeInt("error", span.getError(), destination);
/* 11 */ writeNumberMap("metrics", span.getMetrics(), destination);
/* 12 */ writeStringMap("meta", span.getMeta(), destination);
writeMapFooter(destination);
}
}

View File

@ -0,0 +1,130 @@
package datadog.trace.common.serialization;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.opentracing.DDSpan;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
public class JsonFormatWriter extends FormatWriter<JsonWriter> {
private static final Moshi MOSHI = new Moshi.Builder().add(DDSpanAdapter.FACTORY).build();
public static final JsonAdapter<List<DDSpan>> TRACE_ADAPTER =
MOSHI.adapter(Types.newParameterizedType(List.class, DDSpan.class));
public static final JsonAdapter<DDSpan> SPAN_ADAPTER = MOSHI.adapter(DDSpan.class);
public static JsonFormatWriter JSON_WRITER = new JsonFormatWriter();
@Override
public void writeKey(final String key, final JsonWriter destination) throws IOException {
destination.name(key);
}
@Override
public void writeListHeader(final int size, final JsonWriter destination) throws IOException {
destination.beginArray();
}
@Override
public void writeListFooter(final JsonWriter destination) throws IOException {
destination.endArray();
}
@Override
public void writeMapHeader(final int size, final JsonWriter destination) throws IOException {
destination.beginObject();
}
@Override
public void writeMapFooter(final JsonWriter destination) throws IOException {
destination.endObject();
}
@Override
public void writeString(final String key, final String value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeShort(final String key, final short value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeByte(final String key, final byte value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeInt(final String key, final int value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeLong(final String key, final long value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeFloat(final String key, final float value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeDouble(final String key, final double value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeBigInteger(
final String key, final BigInteger value, final JsonWriter destination) throws IOException {
destination.name(key);
destination.value(value);
}
static class DDSpanAdapter extends JsonAdapter<DDSpan> {
public static final JsonAdapter.Factory FACTORY =
new JsonAdapter.Factory() {
@Override
public JsonAdapter<?> create(
final Type type, final Set<? extends Annotation> annotations, final Moshi moshi) {
final Class<?> rawType = Types.getRawType(type);
if (rawType.isAssignableFrom(DDSpan.class)) {
return new DDSpanAdapter();
}
return null;
}
};
@Override
public DDSpan fromJson(final JsonReader reader) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void toJson(final JsonWriter writer, final DDSpan value) throws IOException {
JSON_WRITER.writeDDSpan(value, writer);
}
}
}

View File

@ -0,0 +1,87 @@
package datadog.trace.common.serialization;
import java.io.IOException;
import java.math.BigInteger;
import org.msgpack.core.MessagePacker;
public class MsgpackFormatWriter extends FormatWriter<MessagePacker> {
public static MsgpackFormatWriter MSGPACK_WRITER = new MsgpackFormatWriter();
@Override
public void writeKey(final String key, final MessagePacker destination) throws IOException {
destination.packString(key);
}
@Override
public void writeListHeader(final int size, final MessagePacker destination) throws IOException {
destination.packArrayHeader(size);
}
@Override
public void writeListFooter(final MessagePacker destination) throws IOException {}
@Override
public void writeMapHeader(final int size, final MessagePacker destination) throws IOException {
destination.packMapHeader(size);
}
@Override
public void writeMapFooter(final MessagePacker destination) throws IOException {}
@Override
public void writeString(final String key, final String value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packString(value);
}
@Override
public void writeShort(final String key, final short value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packShort(value);
}
@Override
public void writeByte(final String key, final byte value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packByte(value);
}
@Override
public void writeInt(final String key, final int value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packInt(value);
}
@Override
public void writeLong(final String key, final long value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packLong(value);
}
@Override
public void writeFloat(final String key, final float value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packFloat(value);
}
@Override
public void writeDouble(final String key, final double value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packDouble(value);
}
@Override
public void writeBigInteger(
final String key, final BigInteger value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packBigInteger(value);
}
}

View File

@ -1,25 +1,29 @@
package datadog.trace.common.writer;
import com.fasterxml.jackson.databind.ObjectMapper;
import static datadog.trace.common.serialization.JsonFormatWriter.TRACE_ADAPTER;
import datadog.opentracing.DDSpan;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LoggingWriter implements Writer {
private final ObjectMapper serializer = new ObjectMapper();
@Override
public void write(final List<DDSpan> trace) {
if (log.isInfoEnabled()) {
try {
log.info("write(trace): {}", serializer.writeValueAsString(trace));
log.info("write(trace): {}", toString(trace));
} catch (final Exception e) {
log.error("error writing(trace): {}", trace);
}
}
}
private String toString(final List<DDSpan> trace) {
return TRACE_ADAPTER.toJson(trace);
}
@Override
public void incrementTraceCount() {
log.info("incrementTraceCount()");

View File

@ -1,9 +1,10 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.opentracing.ContainerInfo;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
@ -12,8 +13,8 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
@ -24,7 +25,7 @@ import okhttp3.RequestBody;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.msgpack.core.buffer.ArrayBufferOutput;
/** The API pointing to a DD agent */
@Slf4j
@ -47,7 +48,14 @@ public class DDAgentApi {
private volatile long nextAllowedLogTime = 0;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
private static final JsonAdapter<Map<String, Map<String, Number>>> RESPONSE_ADAPTER =
new Moshi.Builder()
.build()
.adapter(
Types.newParameterizedType(
Map.class,
String.class,
Types.newParameterizedType(Map.class, String.class, Double.class)));
private static final MediaType MSGPACK = MediaType.get("application/msgpack");
private final OkHttpClient httpClient;
@ -57,7 +65,7 @@ public class DDAgentApi {
this(
host,
port,
traceEndpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath),
endpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath, true),
unixDomainSocketPath);
}
@ -97,7 +105,7 @@ public class DDAgentApi {
final byte[] serializedTrace = serializeTrace(trace);
sizeInBytes += serializedTrace.length;
serializedTraces.add(serializedTrace);
} catch (final JsonProcessingException e) {
} catch (final IOException e) {
log.warn("Error serializing trace", e);
// TODO: DQH - Incorporate the failed serialization into the Response object???
@ -107,8 +115,13 @@ public class DDAgentApi {
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
}
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(trace);
byte[] serializeTrace(final List<DDSpan> trace) throws IOException {
// TODO: reuse byte array buffer
final ArrayBufferOutput output = new ArrayBufferOutput();
final MessagePacker packer = MessagePack.newDefaultPacker(output);
MSGPACK_WRITER.writeTrace(trace, packer);
packer.flush();
return output.toByteArray();
}
Response sendSerializedTraces(
@ -183,17 +196,16 @@ public class DDAgentApi {
final String responseString = response.body().string().trim();
try {
if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) {
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
final Map<String, Map<String, Number>> parsedResponse =
RESPONSE_ADAPTER.fromJson(responseString);
final String endpoint = tracesUrl.toString();
for (final DDAgentResponseListener listener : responseListeners) {
listener.onResponse(endpoint, parsedResponse);
}
return Response.success(response.code(), parsedResponse);
}
return Response.success(response.code());
} catch (final JsonParseException e) {
} catch (final IOException e) {
log.debug("Failed to parse DD agent response: " + responseString, e);
return Response.success(response.code(), e);
@ -222,19 +234,13 @@ public class DDAgentApi {
}
}
private static boolean traceEndpointAvailable(
final HttpUrl url, final String unixDomainSocketPath) {
return endpointAvailable(url, unixDomainSocketPath, Collections.emptyList(), true);
}
private static final byte[] EMPTY_LIST = new byte[] {MessagePack.Code.FIXARRAY_PREFIX};
private static boolean endpointAvailable(
final HttpUrl url,
final String unixDomainSocketPath,
final Object data,
final boolean retry) {
final HttpUrl url, final String unixDomainSocketPath, final boolean retry) {
try {
final OkHttpClient client = buildHttpClient(unixDomainSocketPath);
final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data));
final RequestBody body = RequestBody.create(MSGPACK, EMPTY_LIST);
final Request request = prepareRequest(url).put(body).build();
try (final okhttp3.Response response = client.newCall(request).execute()) {
@ -242,7 +248,7 @@ public class DDAgentApi {
}
} catch (final IOException e) {
if (retry) {
return endpointAvailable(url, unixDomainSocketPath, data, false);
return endpointAvailable(url, unixDomainSocketPath, false);
}
}
return false;
@ -307,42 +313,31 @@ public class DDAgentApi {
public static final class Response {
/** Factory method for a successful request with a trivial response body */
public static final Response success(final int status) {
return new Response(true, status, null, null);
}
/** Factory method for a successful request with a well-formed JSON response body */
public static final Response success(final int status, final JsonNode json) {
return new Response(true, status, json, null);
return new Response(true, status, null);
}
/** Factory method for a successful request will a malformed response body */
public static final Response success(final int status, final Throwable exception) {
return new Response(true, status, null, exception);
return new Response(true, status, exception);
}
/** Factory method for a request that receive an error status in response */
public static final Response failed(final int status) {
return new Response(false, status, null, null);
return new Response(false, status, null);
}
/** Factory method for a failed communication attempt */
public static final Response failed(final Throwable exception) {
return new Response(false, null, null, exception);
return new Response(false, null, exception);
}
private final boolean success;
private final Integer status;
private final JsonNode json;
private final Throwable exception;
private Response(
final boolean success,
final Integer status,
final JsonNode json,
final Throwable exception) {
private Response(final boolean success, final Integer status, final Throwable exception) {
this.success = success;
this.status = status;
this.json = json;
this.exception = exception;
}
@ -355,10 +350,6 @@ public class DDAgentApi {
return status;
}
public final JsonNode json() {
return json;
}
// TODO: DQH - In Java 8, switch to Optional<Throwable>?
public final Throwable exception() {
return exception;

View File

@ -1,8 +1,8 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
public interface DDAgentResponseListener {
/** Invoked after the api receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
void onResponse(String endpoint, Map<String, Map<String, Number>> responseJson);
}

View File

@ -1,6 +1,5 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
@ -74,9 +73,6 @@ public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
batchWritingDisruptor.publish(serializedTrace, event.representativeCount);
monitor.onSerialize(writer, event.data, serializedTrace);
event.representativeCount = 0; // reset in case flush is invoked below.
} catch (final JsonProcessingException e) {
log.debug("Error serializing trace", e);
monitor.onFailedSerialize(writer, event.data, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
monitor.onFailedSerialize(writer, event.data, e);

View File

@ -1,43 +1,48 @@
package datadog.opentracing
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.collect.Maps
import com.squareup.moshi.Moshi
import datadog.trace.api.DDTags
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.test.DDSpecification
import org.msgpack.core.MessagePack
import org.msgpack.core.buffer.ArrayBufferInput
import org.msgpack.jackson.dataformat.MessagePackFactory
import org.msgpack.core.buffer.ArrayBufferOutput
import org.msgpack.value.ValueType
import static datadog.trace.common.serialization.JsonFormatWriter.SPAN_ADAPTER
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER
class DDSpanSerializationTest extends DDSpecification {
def "serialize spans with sampling #samplingPriority"() throws Exception {
setup:
final Map<String, String> baggage = new HashMap<>()
baggage.put("a-baggage", "value")
final Map<String, Object> tags = new HashMap<>()
baggage.put("k1", "v1")
def jsonAdapter = new Moshi.Builder().build().adapter(Map)
Map<String, Object> expected = Maps.newHashMap()
expected.put("meta", baggage)
expected.put("service", "service")
expected.put("error", 0)
expected.put("type", "type")
expected.put("name", "operation")
expected.put("duration", 33000)
expected.put("resource", "operation")
final Map<String, Number> metrics = new HashMap<>()
metrics.put("_sampling_priority_v1", 1)
final Map<String, Number> metrics = ["_sampling_priority_v1": 1]
if (samplingPriority == PrioritySampling.UNSET) { // RateByServiceSampler sets priority
metrics.put("_dd.agent_psr", 1.0d)
}
expected.put("metrics", metrics)
expected.put("start", 100000)
expected.put("span_id", 2l)
expected.put("parent_id", 0l)
expected.put("trace_id", 1l)
Map<String, Object> expected = [
service : "service",
name : "operation",
resource : "operation",
trace_id : 1l,
span_id : 2l,
parent_id: 0l,
start : 100000,
duration : 33000,
type : "type",
error : 0,
metrics : metrics,
meta : [
"a-baggage" : "value",
"k1" : "v1",
(DDTags.THREAD_NAME): Thread.currentThread().getName(),
(DDTags.THREAD_ID) : String.valueOf(Thread.currentThread().getId()),
],
]
def writer = new ListWriter()
def tracer = DDTracer.builder().writer(writer).build()
@ -51,23 +56,19 @@ class DDSpanSerializationTest extends DDSpecification {
null,
samplingPriority,
null,
new HashMap<>(baggage),
["a-baggage": "value"],
false,
"type",
tags,
["k1": "v1"],
new PendingTrace(tracer, 1G, [:]),
tracer)
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName())
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()))
DDSpan span = new DDSpan(100L, context)
span.finish(133L)
ObjectMapper serializer = new ObjectMapper()
def actualTree = serializer.readTree(serializer.writeValueAsString(span))
def expectedTree = serializer.readTree(serializer.writeValueAsString(expected))
def actualTree = jsonAdapter.fromJson(SPAN_ADAPTER.toJson(span))
def expectedTree = jsonAdapter.fromJson(jsonAdapter.toJson(expected))
expect:
actualTree == expectedTree
@ -79,7 +80,6 @@ class DDSpanSerializationTest extends DDSpecification {
def "serialize trace/span with id #value as int"() {
setup:
def objectMapper = new ObjectMapper(new MessagePackFactory())
def writer = new ListWriter()
def tracer = DDTracer.builder().writer(writer).build()
def context = new DDSpanContext(
@ -98,7 +98,11 @@ class DDSpanSerializationTest extends DDSpecification {
new PendingTrace(tracer, 1G, [:]),
tracer)
def span = new DDSpan(0, context)
byte[] bytes = objectMapper.writeValueAsBytes(span)
def buffer = new ArrayBufferOutput()
def packer = MessagePack.newDefaultPacker(buffer)
MSGPACK_WRITER.writeDDSpan(span, packer)
packer.flush()
byte[] bytes = buffer.toByteArray()
def unpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes))
int size = unpacker.unpackMapHeader()

View File

@ -1,24 +1,25 @@
package datadog.trace.api.sampling
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.DDSpan
import datadog.opentracing.DDTracer
import datadog.opentracing.SpanFactory
import datadog.trace.api.DDTags
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.util.test.DDSpecification
import static datadog.trace.common.sampling.RateByServiceSampler.DEFAULT_KEY
class RateByServiceSamplerTest extends DDSpecification {
static serializer = DDAgentApi.RESPONSE_ADAPTER
def "invalid rate -> 1"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
String response = '{"rate_by_service": {"service:,env:":' + rate + '}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
expect:
serviceSampler.serviceRates[DEFAULT_KEY].sampleRate == expectedRate
@ -35,11 +36,10 @@ class RateByServiceSamplerTest extends DDSpecification {
def "rate by service name"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
when:
String response = '{"rate_by_service": {"service:spock,env:test":0.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
DDSpan span1 = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.setSamplingPriority(span1)
then:
@ -48,7 +48,7 @@ class RateByServiceSamplerTest extends DDSpecification {
when:
response = '{"rate_by_service": {"service:spock,env:test":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
DDSpan span2 = SpanFactory.newSpanOf("spock", "test")
serviceSampler.setSamplingPriority(span2)
then:
@ -59,9 +59,8 @@ class RateByServiceSamplerTest extends DDSpecification {
def "sampling priority set on context"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
String response = '{"rate_by_service": {"service:,env:":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
DDSpan span = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.setSamplingPriority(span)
@ -76,8 +75,8 @@ class RateByServiceSamplerTest extends DDSpecification {
def sampler = new RateByServiceSampler()
def tracer = DDTracer.builder().writer(new LoggingWriter()).sampler(sampler).build()
sampler.onResponse("test", new ObjectMapper()
.readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
sampler.onResponse("test", serializer
.fromJson('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
when:
def span = tracer.buildSpan("test").start()

View File

@ -1,11 +1,12 @@
package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
import datadog.trace.util.test.DDSpecification
import org.msgpack.jackson.dataformat.MessagePackFactory
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
@ -13,7 +14,7 @@ import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
class DDAgentApiTest extends DDSpecification {
static mapper = DDAgentApi.OBJECT_MAPPER
static mapper = new ObjectMapper(new MessagePackFactory())
def "sending an empty list of traces returns no errors"() {
setup:
@ -124,15 +125,15 @@ class DDAgentApiTest extends DDSpecification {
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference<String>(null)
DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson ->
agentResponse.set(responseJson.toString())
def agentResponse = new AtomicReference<Map>(null)
DDAgentResponseListener responseListener = { String endpoint, Map responseJson ->
agentResponse.set(responseJson)
}
def agent = httpServer {
handlers {
put("v0.4/traces") {
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send('{"hello":"test"}')
response.status(status).send('{"hello":{}}')
}
}
}
@ -142,7 +143,7 @@ class DDAgentApiTest extends DDSpecification {
when:
client.sendTraces([[], [], []])
then:
agentResponse.get() == '{"hello":"test"}'
agentResponse.get() == ["hello": [:]]
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown")
agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version"

View File

@ -11,6 +11,8 @@ import datadog.trace.common.writer.ddagent.BatchWritingDisruptor
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.util.test.DDSpecification
import org.msgpack.core.MessagePack
import org.msgpack.core.buffer.ArrayBufferOutput
import spock.lang.Retry
import spock.lang.Timeout
@ -21,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import static datadog.opentracing.SpanFactory.newSpanOf
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER
import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
@Timeout(20)
@ -206,7 +209,7 @@ class DDAgentWriterTest extends DDSpecification {
Mock(DDTracer))
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
traceSize = calculateSize(minimalTrace)
maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
@ -667,4 +670,12 @@ class DDAgentWriterTest extends DDSpecification {
cleanup:
writer.close()
}
static int calculateSize(List<DDSpan> trace) {
def buffer = new ArrayBufferOutput()
def packer = MessagePack.newDefaultPacker(buffer)
MSGPACK_WRITER.writeTrace(trace, packer)
packer.flush()
return buffer.size
}
}

View File

@ -0,0 +1,21 @@
package datadog.trace.api.writer
import datadog.opentracing.DDTracer
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.util.test.DDSpecification
import spock.lang.Subject
class LoggingWriterTest extends DDSpecification {
@Subject
def writer = new LoggingWriter()
def tracer = Mock(DDTracer)
def sampleTrace = [SpanFactory.newSpanOf(tracer), SpanFactory.newSpanOf(tracer)]
def "test toString"() {
expect:
writer.toString(sampleTrace).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0,"start":1000,"duration":0,"type":"fakeType","error":0,"metrics":{},"meta":{')
}
}

View File

@ -1,4 +1,3 @@
import com.fasterxml.jackson.databind.JsonNode
import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
@ -63,11 +62,11 @@ class DDApiIntegrationTest {
def unixDomainSocketApi
def endpoint = new AtomicReference<String>(null)
def agentResponse = new AtomicReference<String>(null)
def agentResponse = new AtomicReference<Map<String, Map<String, Number>>>(null)
DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
DDAgentResponseListener responseListener = { String receivedEndpoint, Map<String, Map<String, Number>> responseJson ->
endpoint.set(receivedEndpoint)
agentResponse.set(responseJson.toString())
agentResponse.set(responseJson)
}
def setupSpec() {
@ -126,7 +125,7 @@ class DDApiIntegrationTest {
api.sendTraces(traces)
if (v4()) {
assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces"
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
}
where:
@ -147,7 +146,7 @@ class DDApiIntegrationTest {
unixDomainSocketApi.sendTraces(traces)
if (v4()) {
assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces"
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
}
where:

View File

@ -8,9 +8,6 @@ ext {
slf4j : "1.7.29",
guava : "20.0", // Last version to support Java 7
// When upgrading for security fixes, ensure corresponding change is reflected in jmxfetch.
jackson : "2.10.0", // https://nvd.nist.gov/vuln/detail/CVE-2019-16942 et al
spock : "1.3-groovy-$spockGroovyVer",
groovy : groovyVer,
logback : "1.2.3",
@ -34,10 +31,6 @@ ext {
// General
slf4j : "org.slf4j:slf4j-api:${versions.slf4j}",
guava : "com.google.guava:guava:$versions.guava",
jackson : [
dependencies.create(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson),
dependencies.create(group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.18'),
],
bytebuddy : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy', version: "${versions.bytebuddy}"),
bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"),
autoservice : [