diff --git a/dd-trace-ot/dd-trace-ot.gradle b/dd-trace-ot/dd-trace-ot.gradle index cf92790fcb..256213b2d8 100644 --- a/dd-trace-ot/dd-trace-ot.gradle +++ b/dd-trace-ot/dd-trace-ot.gradle @@ -37,9 +37,10 @@ dependencies { compile group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.1.1' - compile deps.jackson compile deps.slf4j + compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.20' compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7 + compile group: 'com.squareup.moshi', name: 'moshi', version: '1.9.2' compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.23' compile group: 'com.lmax', name: 'disruptor', version: '3.4.2' @@ -50,6 +51,7 @@ dependencies { testCompile project(":dd-java-agent:testing") testCompile project(':utils:gc-utils') testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.17.1' + testCompile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.20' traceAgentTestCompile deps.testcontainers diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpan.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpan.java index bf6984d6ff..2160d1cf01 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpan.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpan.java @@ -3,8 +3,6 @@ package datadog.opentracing; import static io.opentracing.log.Fields.ERROR_OBJECT; import static io.opentracing.log.Fields.MESSAGE; -import com.fasterxml.jackson.annotation.JsonGetter; -import com.fasterxml.jackson.annotation.JsonIgnore; import datadog.trace.api.DDTags; import datadog.trace.api.interceptor.MutableSpan; import datadog.trace.api.sampling.PrioritySampling; @@ -77,7 +75,6 @@ public class DDSpan implements Span, MutableSpan { context.getTrace().registerSpan(this); } - @JsonIgnore public boolean isFinished() { return durationNano.get() != 0; } @@ -120,20 +117,17 @@ public class DDSpan implements Span, MutableSpan { * * @return true if root, false otherwise */ - @JsonIgnore public final boolean isRootSpan() { return BigInteger.ZERO.equals(context.getParentId()); } @Override @Deprecated - @JsonIgnore public MutableSpan getRootSpan() { return getLocalRootSpan(); } @Override - @JsonIgnore public MutableSpan getLocalRootSpan() { return context().getTrace().getRootSpan(); } @@ -297,14 +291,13 @@ public class DDSpan implements Span, MutableSpan { return this; } - // Getters and JSON serialisation instructions + // Getters /** * Meta merges baggage and tags (stringified values) * * @return merged context baggage and tags */ - @JsonGetter public Map getMeta() { final Map meta = new HashMap<>(); for (final Map.Entry entry : context().getBaggageItems().entrySet()) { @@ -321,58 +314,48 @@ public class DDSpan implements Span, MutableSpan { * * @return metrics for this span */ - @JsonGetter public Map getMetrics() { return context.getMetrics(); } @Override - @JsonGetter("start") public long getStartTime() { return startTimeNano > 0 ? startTimeNano : TimeUnit.MICROSECONDS.toNanos(startTimeMicro); } @Override - @JsonGetter("duration") public long getDurationNano() { return durationNano.get(); } @Override - @JsonGetter("service") public String getServiceName() { return context.getServiceName(); } - @JsonGetter("trace_id") public BigInteger getTraceId() { return context.getTraceId(); } - @JsonGetter("span_id") public BigInteger getSpanId() { return context.getSpanId(); } - @JsonGetter("parent_id") public BigInteger getParentId() { return context.getParentId(); } @Override - @JsonGetter("resource") public String getResourceName() { return context.getResourceName(); } @Override - @JsonGetter("name") public String getOperationName() { return context.getOperationName(); } @Override - @JsonIgnore public Integer getSamplingPriority() { final int samplingPriority = context.getSamplingPriority(); if (samplingPriority == PrioritySampling.UNSET) { @@ -383,29 +366,24 @@ public class DDSpan implements Span, MutableSpan { } @Override - @JsonIgnore public String getSpanType() { return context.getSpanType(); } @Override - @JsonIgnore public Map getTags() { return context().getTags(); } - @JsonGetter public String getType() { return context.getSpanType(); } @Override - @JsonIgnore public Boolean isError() { return context.getErrorFlag(); } - @JsonGetter public int getError() { return context.getErrorFlag() ? 1 : 0; } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java index 9ad2d54202..766cc82f2a 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java @@ -1,6 +1,5 @@ package datadog.opentracing; -import com.fasterxml.jackson.annotation.JsonIgnore; import datadog.opentracing.decorators.AbstractDecorator; import datadog.trace.api.DDTags; import datadog.trace.api.sampling.PrioritySampling; @@ -289,12 +288,10 @@ public class DDSpanContext implements io.opentracing.SpanContext { return baggageItems.entrySet(); } - @JsonIgnore public PendingTrace getTrace() { return trace; } - @JsonIgnore public DDTracer getTracer() { return tracer; } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java index b8191b1a8f..03417eb729 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/sampling/RateByServiceSampler.java @@ -3,13 +3,10 @@ package datadog.trace.common.sampling; import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableMap; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.NumericNode; import datadog.opentracing.DDSpan; import datadog.trace.api.sampling.PrioritySampling; import datadog.trace.common.writer.ddagent.DDAgentResponseListener; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import lombok.extern.slf4j.Slf4j; @@ -70,23 +67,16 @@ public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentRe } @Override - public void onResponse(final String endpoint, final JsonNode responseJson) { - final JsonNode newServiceRates = responseJson.get("rate_by_service"); + public void onResponse( + final String endpoint, final Map> responseJson) { + final Map newServiceRates = responseJson.get("rate_by_service"); if (null != newServiceRates) { log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson); final Map updatedServiceRates = new HashMap<>(); - final Iterator itr = newServiceRates.fieldNames(); - while (itr.hasNext()) { - final String key = itr.next(); - final JsonNode value = newServiceRates.get(key); - try { - if (value instanceof NumericNode) { - updatedServiceRates.put(key, createRateSampler(value.doubleValue())); - } else { - log.debug("Unable to parse new service rate {} -> {}", key, value); - } - } catch (final NumberFormatException nfe) { - log.debug("Unable to parse new service rate {} -> {}", key, value); + for (final Map.Entry entry : newServiceRates.entrySet()) { + if (entry.getValue() != null) { + updatedServiceRates.put( + entry.getKey(), createRateSampler(entry.getValue().doubleValue())); } } if (!updatedServiceRates.containsKey(DEFAULT_KEY)) { diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/serialization/FormatWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/serialization/FormatWriter.java new file mode 100644 index 0000000000..4bc1093616 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/serialization/FormatWriter.java @@ -0,0 +1,101 @@ +package datadog.trace.common.serialization; + +import datadog.opentracing.DDSpan; +import java.io.IOException; +import java.math.BigInteger; +import java.util.List; +import java.util.Map; + +public abstract class FormatWriter { + public abstract void writeKey(String key, DEST destination) throws IOException; + + public abstract void writeListHeader(int size, DEST destination) throws IOException; + + public abstract void writeListFooter(DEST destination) throws IOException; + + public abstract void writeMapHeader(int size, DEST destination) throws IOException; + + public abstract void writeMapFooter(DEST destination) throws IOException; + + public abstract void writeString(String key, String value, DEST destination) throws IOException; + + public abstract void writeShort(String key, short value, DEST destination) throws IOException; + + public abstract void writeByte(String key, byte value, DEST destination) throws IOException; + + public abstract void writeInt(String key, int value, DEST destination) throws IOException; + + public abstract void writeLong(String key, long value, DEST destination) throws IOException; + + public abstract void writeFloat(String key, float value, DEST destination) throws IOException; + + public abstract void writeDouble(String key, double value, DEST destination) throws IOException; + + public abstract void writeBigInteger(String key, BigInteger value, DEST destination) + throws IOException; + + public void writeNumber(final String key, final Number value, final DEST destination) + throws IOException { + if (value instanceof Double) { + writeDouble(key, value.doubleValue(), destination); + } else if (value instanceof Long) { + writeLong(key, value.longValue(), destination); + } else if (value instanceof Integer) { + writeInt(key, value.intValue(), destination); + } else if (value instanceof Float) { + writeFloat(key, value.floatValue(), destination); + } else if (value instanceof Byte) { + writeByte(key, value.byteValue(), destination); + } else if (value instanceof Short) { + writeShort(key, value.shortValue(), destination); + } + } + + public void writeNumberMap( + final String key, final Map value, final DEST destination) + throws IOException { + writeKey(key, destination); + writeMapHeader(value.size(), destination); + for (final Map.Entry entry : value.entrySet()) { + writeNumber(entry.getKey(), entry.getValue(), destination); + } + writeMapFooter(destination); + } + + public void writeStringMap( + final String key, final Map value, final DEST destination) + throws IOException { + writeKey(key, destination); + writeMapHeader(value.size(), destination); + for (final Map.Entry entry : value.entrySet()) { + writeString(entry.getKey(), entry.getValue(), destination); + } + writeMapFooter(destination); + } + + public void writeTrace(final List trace, final DEST destination) throws IOException { + writeListHeader(trace.size(), destination); + for (final DDSpan span : trace) { + writeDDSpan(span, destination); + } + writeListFooter(destination); + } + + public void writeDDSpan(final DDSpan span, final DEST destination) throws IOException { + // Some of the tests rely on the specific ordering here. + writeMapHeader(12, destination); // must match count below. + /* 1 */ writeString("service", span.getServiceName(), destination); + /* 2 */ writeString("name", span.getOperationName(), destination); + /* 3 */ writeString("resource", span.getResourceName(), destination); + /* 4 */ writeBigInteger("trace_id", span.getTraceId(), destination); + /* 5 */ writeBigInteger("span_id", span.getSpanId(), destination); + /* 6 */ writeBigInteger("parent_id", span.getParentId(), destination); + /* 7 */ writeLong("start", span.getStartTime(), destination); + /* 8 */ writeLong("duration", span.getDurationNano(), destination); + /* 9 */ writeString("type", span.getType(), destination); + /* 10 */ writeInt("error", span.getError(), destination); + /* 11 */ writeNumberMap("metrics", span.getMetrics(), destination); + /* 12 */ writeStringMap("meta", span.getMeta(), destination); + writeMapFooter(destination); + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/serialization/JsonFormatWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/serialization/JsonFormatWriter.java new file mode 100644 index 0000000000..429fc15742 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/serialization/JsonFormatWriter.java @@ -0,0 +1,130 @@ +package datadog.trace.common.serialization; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.JsonReader; +import com.squareup.moshi.JsonWriter; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.Types; +import datadog.opentracing.DDSpan; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.math.BigInteger; +import java.util.List; +import java.util.Set; + +public class JsonFormatWriter extends FormatWriter { + private static final Moshi MOSHI = new Moshi.Builder().add(DDSpanAdapter.FACTORY).build(); + + public static final JsonAdapter> TRACE_ADAPTER = + MOSHI.adapter(Types.newParameterizedType(List.class, DDSpan.class)); + public static final JsonAdapter SPAN_ADAPTER = MOSHI.adapter(DDSpan.class); + + public static JsonFormatWriter JSON_WRITER = new JsonFormatWriter(); + + @Override + public void writeKey(final String key, final JsonWriter destination) throws IOException { + destination.name(key); + } + + @Override + public void writeListHeader(final int size, final JsonWriter destination) throws IOException { + destination.beginArray(); + } + + @Override + public void writeListFooter(final JsonWriter destination) throws IOException { + destination.endArray(); + } + + @Override + public void writeMapHeader(final int size, final JsonWriter destination) throws IOException { + destination.beginObject(); + } + + @Override + public void writeMapFooter(final JsonWriter destination) throws IOException { + destination.endObject(); + } + + @Override + public void writeString(final String key, final String value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeShort(final String key, final short value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeByte(final String key, final byte value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeInt(final String key, final int value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeLong(final String key, final long value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeFloat(final String key, final float value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeDouble(final String key, final double value, final JsonWriter destination) + throws IOException { + destination.name(key); + destination.value(value); + } + + @Override + public void writeBigInteger( + final String key, final BigInteger value, final JsonWriter destination) throws IOException { + destination.name(key); + destination.value(value); + } + + static class DDSpanAdapter extends JsonAdapter { + public static final JsonAdapter.Factory FACTORY = + new JsonAdapter.Factory() { + @Override + public JsonAdapter create( + final Type type, final Set annotations, final Moshi moshi) { + final Class rawType = Types.getRawType(type); + if (rawType.isAssignableFrom(DDSpan.class)) { + return new DDSpanAdapter(); + } + return null; + } + }; + + @Override + public DDSpan fromJson(final JsonReader reader) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void toJson(final JsonWriter writer, final DDSpan value) throws IOException { + JSON_WRITER.writeDDSpan(value, writer); + } + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/serialization/MsgpackFormatWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/serialization/MsgpackFormatWriter.java new file mode 100644 index 0000000000..ca5579da7e --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/serialization/MsgpackFormatWriter.java @@ -0,0 +1,87 @@ +package datadog.trace.common.serialization; + +import java.io.IOException; +import java.math.BigInteger; +import org.msgpack.core.MessagePacker; + +public class MsgpackFormatWriter extends FormatWriter { + public static MsgpackFormatWriter MSGPACK_WRITER = new MsgpackFormatWriter(); + + @Override + public void writeKey(final String key, final MessagePacker destination) throws IOException { + destination.packString(key); + } + + @Override + public void writeListHeader(final int size, final MessagePacker destination) throws IOException { + destination.packArrayHeader(size); + } + + @Override + public void writeListFooter(final MessagePacker destination) throws IOException {} + + @Override + public void writeMapHeader(final int size, final MessagePacker destination) throws IOException { + destination.packMapHeader(size); + } + + @Override + public void writeMapFooter(final MessagePacker destination) throws IOException {} + + @Override + public void writeString(final String key, final String value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packString(value); + } + + @Override + public void writeShort(final String key, final short value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packShort(value); + } + + @Override + public void writeByte(final String key, final byte value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packByte(value); + } + + @Override + public void writeInt(final String key, final int value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packInt(value); + } + + @Override + public void writeLong(final String key, final long value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packLong(value); + } + + @Override + public void writeFloat(final String key, final float value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packFloat(value); + } + + @Override + public void writeDouble(final String key, final double value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packDouble(value); + } + + @Override + public void writeBigInteger( + final String key, final BigInteger value, final MessagePacker destination) + throws IOException { + destination.packString(key); + destination.packBigInteger(value); + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java index cb95d07a72..c4a793f3b2 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java @@ -1,25 +1,29 @@ package datadog.trace.common.writer; -import com.fasterxml.jackson.databind.ObjectMapper; +import static datadog.trace.common.serialization.JsonFormatWriter.TRACE_ADAPTER; + import datadog.opentracing.DDSpan; import java.util.List; import lombok.extern.slf4j.Slf4j; @Slf4j public class LoggingWriter implements Writer { - private final ObjectMapper serializer = new ObjectMapper(); @Override public void write(final List trace) { if (log.isInfoEnabled()) { try { - log.info("write(trace): {}", serializer.writeValueAsString(trace)); + log.info("write(trace): {}", toString(trace)); } catch (final Exception e) { log.error("error writing(trace): {}", trace); } } } + private String toString(final List trace) { + return TRACE_ADAPTER.toJson(trace); + } + @Override public void incrementTraceCount() { log.info("incrementTraceCount()"); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java index 17a607c102..64725297aa 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java @@ -1,9 +1,10 @@ package datadog.trace.common.writer.ddagent; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; +import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; +import com.squareup.moshi.Types; import datadog.opentracing.ContainerInfo; import datadog.opentracing.DDSpan; import datadog.opentracing.DDTraceOTInfo; @@ -12,8 +13,8 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import okhttp3.HttpUrl; @@ -24,7 +25,7 @@ import okhttp3.RequestBody; import okio.BufferedSink; import org.msgpack.core.MessagePack; import org.msgpack.core.MessagePacker; -import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.msgpack.core.buffer.ArrayBufferOutput; /** The API pointing to a DD agent */ @Slf4j @@ -47,7 +48,14 @@ public class DDAgentApi { private volatile long nextAllowedLogTime = 0; - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory()); + private static final JsonAdapter>> RESPONSE_ADAPTER = + new Moshi.Builder() + .build() + .adapter( + Types.newParameterizedType( + Map.class, + String.class, + Types.newParameterizedType(Map.class, String.class, Double.class))); private static final MediaType MSGPACK = MediaType.get("application/msgpack"); private final OkHttpClient httpClient; @@ -57,7 +65,7 @@ public class DDAgentApi { this( host, port, - traceEndpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath), + endpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath, true), unixDomainSocketPath); } @@ -97,7 +105,7 @@ public class DDAgentApi { final byte[] serializedTrace = serializeTrace(trace); sizeInBytes += serializedTrace.length; serializedTraces.add(serializedTrace); - } catch (final JsonProcessingException e) { + } catch (final IOException e) { log.warn("Error serializing trace", e); // TODO: DQH - Incorporate the failed serialization into the Response object??? @@ -107,8 +115,13 @@ public class DDAgentApi { return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces); } - byte[] serializeTrace(final List trace) throws JsonProcessingException { - return OBJECT_MAPPER.writeValueAsBytes(trace); + byte[] serializeTrace(final List trace) throws IOException { + // TODO: reuse byte array buffer + final ArrayBufferOutput output = new ArrayBufferOutput(); + final MessagePacker packer = MessagePack.newDefaultPacker(output); + MSGPACK_WRITER.writeTrace(trace, packer); + packer.flush(); + return output.toByteArray(); } Response sendSerializedTraces( @@ -183,17 +196,16 @@ public class DDAgentApi { final String responseString = response.body().string().trim(); try { if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) { - final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString); + final Map> parsedResponse = + RESPONSE_ADAPTER.fromJson(responseString); final String endpoint = tracesUrl.toString(); for (final DDAgentResponseListener listener : responseListeners) { listener.onResponse(endpoint, parsedResponse); } - return Response.success(response.code(), parsedResponse); } - return Response.success(response.code()); - } catch (final JsonParseException e) { + } catch (final IOException e) { log.debug("Failed to parse DD agent response: " + responseString, e); return Response.success(response.code(), e); @@ -222,19 +234,13 @@ public class DDAgentApi { } } - private static boolean traceEndpointAvailable( - final HttpUrl url, final String unixDomainSocketPath) { - return endpointAvailable(url, unixDomainSocketPath, Collections.emptyList(), true); - } + private static final byte[] EMPTY_LIST = new byte[] {MessagePack.Code.FIXARRAY_PREFIX}; private static boolean endpointAvailable( - final HttpUrl url, - final String unixDomainSocketPath, - final Object data, - final boolean retry) { + final HttpUrl url, final String unixDomainSocketPath, final boolean retry) { try { final OkHttpClient client = buildHttpClient(unixDomainSocketPath); - final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data)); + final RequestBody body = RequestBody.create(MSGPACK, EMPTY_LIST); final Request request = prepareRequest(url).put(body).build(); try (final okhttp3.Response response = client.newCall(request).execute()) { @@ -242,7 +248,7 @@ public class DDAgentApi { } } catch (final IOException e) { if (retry) { - return endpointAvailable(url, unixDomainSocketPath, data, false); + return endpointAvailable(url, unixDomainSocketPath, false); } } return false; @@ -307,42 +313,31 @@ public class DDAgentApi { public static final class Response { /** Factory method for a successful request with a trivial response body */ public static final Response success(final int status) { - return new Response(true, status, null, null); - } - - /** Factory method for a successful request with a well-formed JSON response body */ - public static final Response success(final int status, final JsonNode json) { - return new Response(true, status, json, null); + return new Response(true, status, null); } /** Factory method for a successful request will a malformed response body */ public static final Response success(final int status, final Throwable exception) { - return new Response(true, status, null, exception); + return new Response(true, status, exception); } /** Factory method for a request that receive an error status in response */ public static final Response failed(final int status) { - return new Response(false, status, null, null); + return new Response(false, status, null); } /** Factory method for a failed communication attempt */ public static final Response failed(final Throwable exception) { - return new Response(false, null, null, exception); + return new Response(false, null, exception); } private final boolean success; private final Integer status; - private final JsonNode json; private final Throwable exception; - private Response( - final boolean success, - final Integer status, - final JsonNode json, - final Throwable exception) { + private Response(final boolean success, final Integer status, final Throwable exception) { this.success = success; this.status = status; - this.json = json; this.exception = exception; } @@ -355,10 +350,6 @@ public class DDAgentApi { return status; } - public final JsonNode json() { - return json; - } - // TODO: DQH - In Java 8, switch to Optional? public final Throwable exception() { return exception; diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java index a0cbc72fda..63f1ebd3df 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/DDAgentResponseListener.java @@ -1,8 +1,8 @@ package datadog.trace.common.writer.ddagent; -import com.fasterxml.jackson.databind.JsonNode; +import java.util.Map; public interface DDAgentResponseListener { /** Invoked after the api receives a response from the core agent. */ - void onResponse(String endpoint, JsonNode responseJson); + void onResponse(String endpoint, Map> responseJson); } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java index e1a62bafed..67025b67f2 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ddagent/TraceProcessingDisruptor.java @@ -1,6 +1,5 @@ package datadog.trace.common.writer.ddagent; -import com.fasterxml.jackson.core.JsonProcessingException; import com.lmax.disruptor.EventHandler; import datadog.opentracing.DDSpan; import datadog.trace.common.util.DaemonThreadFactory; @@ -74,9 +73,6 @@ public class TraceProcessingDisruptor extends AbstractDisruptor> { batchWritingDisruptor.publish(serializedTrace, event.representativeCount); monitor.onSerialize(writer, event.data, serializedTrace); event.representativeCount = 0; // reset in case flush is invoked below. - } catch (final JsonProcessingException e) { - log.debug("Error serializing trace", e); - monitor.onFailedSerialize(writer, event.data, e); } catch (final Throwable e) { log.debug("Error while serializing trace", e); monitor.onFailedSerialize(writer, event.data, e); diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy index 757f562cb0..8b5e04a336 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/DDSpanSerializationTest.groovy @@ -1,43 +1,48 @@ package datadog.opentracing -import com.fasterxml.jackson.databind.ObjectMapper -import com.google.common.collect.Maps +import com.squareup.moshi.Moshi import datadog.trace.api.DDTags import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.ListWriter import datadog.trace.util.test.DDSpecification import org.msgpack.core.MessagePack import org.msgpack.core.buffer.ArrayBufferInput -import org.msgpack.jackson.dataformat.MessagePackFactory +import org.msgpack.core.buffer.ArrayBufferOutput import org.msgpack.value.ValueType +import static datadog.trace.common.serialization.JsonFormatWriter.SPAN_ADAPTER +import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER + class DDSpanSerializationTest extends DDSpecification { def "serialize spans with sampling #samplingPriority"() throws Exception { setup: - final Map baggage = new HashMap<>() - baggage.put("a-baggage", "value") - final Map tags = new HashMap<>() - baggage.put("k1", "v1") + def jsonAdapter = new Moshi.Builder().build().adapter(Map) - Map expected = Maps.newHashMap() - expected.put("meta", baggage) - expected.put("service", "service") - expected.put("error", 0) - expected.put("type", "type") - expected.put("name", "operation") - expected.put("duration", 33000) - expected.put("resource", "operation") - final Map metrics = new HashMap<>() - metrics.put("_sampling_priority_v1", 1) + final Map metrics = ["_sampling_priority_v1": 1] if (samplingPriority == PrioritySampling.UNSET) { // RateByServiceSampler sets priority metrics.put("_dd.agent_psr", 1.0d) } - expected.put("metrics", metrics) - expected.put("start", 100000) - expected.put("span_id", 2l) - expected.put("parent_id", 0l) - expected.put("trace_id", 1l) + + Map expected = [ + service : "service", + name : "operation", + resource : "operation", + trace_id : 1l, + span_id : 2l, + parent_id: 0l, + start : 100000, + duration : 33000, + type : "type", + error : 0, + metrics : metrics, + meta : [ + "a-baggage" : "value", + "k1" : "v1", + (DDTags.THREAD_NAME): Thread.currentThread().getName(), + (DDTags.THREAD_ID) : String.valueOf(Thread.currentThread().getId()), + ], + ] def writer = new ListWriter() def tracer = DDTracer.builder().writer(writer).build() @@ -51,23 +56,19 @@ class DDSpanSerializationTest extends DDSpecification { null, samplingPriority, null, - new HashMap<>(baggage), + ["a-baggage": "value"], false, "type", - tags, + ["k1": "v1"], new PendingTrace(tracer, 1G, [:]), tracer) - baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName()) - baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId())) - DDSpan span = new DDSpan(100L, context) span.finish(133L) - ObjectMapper serializer = new ObjectMapper() - def actualTree = serializer.readTree(serializer.writeValueAsString(span)) - def expectedTree = serializer.readTree(serializer.writeValueAsString(expected)) + def actualTree = jsonAdapter.fromJson(SPAN_ADAPTER.toJson(span)) + def expectedTree = jsonAdapter.fromJson(jsonAdapter.toJson(expected)) expect: actualTree == expectedTree @@ -79,7 +80,6 @@ class DDSpanSerializationTest extends DDSpecification { def "serialize trace/span with id #value as int"() { setup: - def objectMapper = new ObjectMapper(new MessagePackFactory()) def writer = new ListWriter() def tracer = DDTracer.builder().writer(writer).build() def context = new DDSpanContext( @@ -98,7 +98,11 @@ class DDSpanSerializationTest extends DDSpecification { new PendingTrace(tracer, 1G, [:]), tracer) def span = new DDSpan(0, context) - byte[] bytes = objectMapper.writeValueAsBytes(span) + def buffer = new ArrayBufferOutput() + def packer = MessagePack.newDefaultPacker(buffer) + MSGPACK_WRITER.writeDDSpan(span, packer) + packer.flush() + byte[] bytes = buffer.toByteArray() def unpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes)) int size = unpacker.unpackMapHeader() diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy index b5709f1066..5b147764f3 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/sampling/RateByServiceSamplerTest.groovy @@ -1,24 +1,25 @@ package datadog.trace.api.sampling -import com.fasterxml.jackson.databind.ObjectMapper + import datadog.opentracing.DDSpan import datadog.opentracing.DDTracer import datadog.opentracing.SpanFactory import datadog.trace.api.DDTags import datadog.trace.common.sampling.RateByServiceSampler import datadog.trace.common.writer.LoggingWriter +import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.util.test.DDSpecification import static datadog.trace.common.sampling.RateByServiceSampler.DEFAULT_KEY class RateByServiceSamplerTest extends DDSpecification { + static serializer = DDAgentApi.RESPONSE_ADAPTER def "invalid rate -> 1"() { setup: RateByServiceSampler serviceSampler = new RateByServiceSampler() - ObjectMapper serializer = new ObjectMapper() String response = '{"rate_by_service": {"service:,env:":' + rate + '}}' - serviceSampler.onResponse("traces", serializer.readTree(response)) + serviceSampler.onResponse("traces", serializer.fromJson(response)) expect: serviceSampler.serviceRates[DEFAULT_KEY].sampleRate == expectedRate @@ -35,11 +36,10 @@ class RateByServiceSamplerTest extends DDSpecification { def "rate by service name"() { setup: RateByServiceSampler serviceSampler = new RateByServiceSampler() - ObjectMapper serializer = new ObjectMapper() when: String response = '{"rate_by_service": {"service:spock,env:test":0.0}}' - serviceSampler.onResponse("traces", serializer.readTree(response)) + serviceSampler.onResponse("traces", serializer.fromJson(response)) DDSpan span1 = SpanFactory.newSpanOf("foo", "bar") serviceSampler.setSamplingPriority(span1) then: @@ -48,7 +48,7 @@ class RateByServiceSamplerTest extends DDSpecification { when: response = '{"rate_by_service": {"service:spock,env:test":1.0}}' - serviceSampler.onResponse("traces", serializer.readTree(response)) + serviceSampler.onResponse("traces", serializer.fromJson(response)) DDSpan span2 = SpanFactory.newSpanOf("spock", "test") serviceSampler.setSamplingPriority(span2) then: @@ -59,9 +59,8 @@ class RateByServiceSamplerTest extends DDSpecification { def "sampling priority set on context"() { setup: RateByServiceSampler serviceSampler = new RateByServiceSampler() - ObjectMapper serializer = new ObjectMapper() String response = '{"rate_by_service": {"service:,env:":1.0}}' - serviceSampler.onResponse("traces", serializer.readTree(response)) + serviceSampler.onResponse("traces", serializer.fromJson(response)) DDSpan span = SpanFactory.newSpanOf("foo", "bar") serviceSampler.setSamplingPriority(span) @@ -76,8 +75,8 @@ class RateByServiceSamplerTest extends DDSpecification { def sampler = new RateByServiceSampler() def tracer = DDTracer.builder().writer(new LoggingWriter()).sampler(sampler).build() - sampler.onResponse("test", new ObjectMapper() - .readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}')) + sampler.onResponse("test", serializer + .fromJson('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}')) when: def span = tracer.buildSpan("test").start() diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy index 8d5ed0dcdd..6e6a0fb5e6 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentApiTest.groovy @@ -1,11 +1,12 @@ package datadog.trace.api.writer import com.fasterxml.jackson.core.type.TypeReference -import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper import datadog.opentracing.SpanFactory import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.DDAgentResponseListener import datadog.trace.util.test.DDSpecification +import org.msgpack.jackson.dataformat.MessagePackFactory import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicReference @@ -13,7 +14,7 @@ import java.util.concurrent.atomic.AtomicReference import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer class DDAgentApiTest extends DDSpecification { - static mapper = DDAgentApi.OBJECT_MAPPER + static mapper = new ObjectMapper(new MessagePackFactory()) def "sending an empty list of traces returns no errors"() { setup: @@ -124,15 +125,15 @@ class DDAgentApiTest extends DDSpecification { def "Api ResponseListeners see 200 responses"() { setup: - def agentResponse = new AtomicReference(null) - DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson -> - agentResponse.set(responseJson.toString()) + def agentResponse = new AtomicReference(null) + DDAgentResponseListener responseListener = { String endpoint, Map responseJson -> + agentResponse.set(responseJson) } def agent = httpServer { handlers { put("v0.4/traces") { def status = request.contentLength > 0 ? 200 : 500 - response.status(status).send('{"hello":"test"}') + response.status(status).send('{"hello":{}}') } } } @@ -142,7 +143,7 @@ class DDAgentApiTest extends DDSpecification { when: client.sendTraces([[], [], []]) then: - agentResponse.get() == '{"hello":"test"}' + agentResponse.get() == ["hello": [:]] agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java" agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown") agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version" diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index 46f65e7c0e..e0a0342eae 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -11,6 +11,8 @@ import datadog.trace.common.writer.ddagent.BatchWritingDisruptor import datadog.trace.common.writer.ddagent.DDAgentApi import datadog.trace.common.writer.ddagent.Monitor import datadog.trace.util.test.DDSpecification +import org.msgpack.core.MessagePack +import org.msgpack.core.buffer.ArrayBufferOutput import spock.lang.Retry import spock.lang.Timeout @@ -21,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger import static datadog.opentracing.SpanFactory.newSpanOf import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer +import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE @Timeout(20) @@ -206,7 +209,7 @@ class DDAgentWriterTest extends DDSpecification { Mock(DDTracer)) minimalSpan = new DDSpan(0, minimalContext) minimalTrace = [minimalSpan] - traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length + traceSize = calculateSize(minimalTrace) maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 } @@ -667,4 +670,12 @@ class DDAgentWriterTest extends DDSpecification { cleanup: writer.close() } + + static int calculateSize(List trace) { + def buffer = new ArrayBufferOutput() + def packer = MessagePack.newDefaultPacker(buffer) + MSGPACK_WRITER.writeTrace(trace, packer) + packer.flush() + return buffer.size + } } diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/LoggingWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/LoggingWriterTest.groovy new file mode 100644 index 0000000000..63cacf0684 --- /dev/null +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/LoggingWriterTest.groovy @@ -0,0 +1,21 @@ +package datadog.trace.api.writer + + +import datadog.opentracing.DDTracer +import datadog.opentracing.SpanFactory +import datadog.trace.common.writer.LoggingWriter +import datadog.trace.util.test.DDSpecification +import spock.lang.Subject + +class LoggingWriterTest extends DDSpecification { + @Subject + def writer = new LoggingWriter() + + def tracer = Mock(DDTracer) + def sampleTrace = [SpanFactory.newSpanOf(tracer), SpanFactory.newSpanOf(tracer)] + + def "test toString"() { + expect: + writer.toString(sampleTrace).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0,"start":1000,"duration":0,"type":"fakeType","error":0,"metrics":{},"meta":{') + } +} diff --git a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy index ff793e9aaf..620e85044d 100644 --- a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy +++ b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy @@ -1,4 +1,3 @@ -import com.fasterxml.jackson.databind.JsonNode import datadog.opentracing.DDSpan import datadog.opentracing.DDSpanContext import datadog.opentracing.DDTracer @@ -63,11 +62,11 @@ class DDApiIntegrationTest { def unixDomainSocketApi def endpoint = new AtomicReference(null) - def agentResponse = new AtomicReference(null) + def agentResponse = new AtomicReference>>(null) - DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson -> + DDAgentResponseListener responseListener = { String receivedEndpoint, Map> responseJson -> endpoint.set(receivedEndpoint) - agentResponse.set(responseJson.toString()) + agentResponse.set(responseJson) } def setupSpec() { @@ -126,7 +125,7 @@ class DDApiIntegrationTest { api.sendTraces(traces) if (v4()) { assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces" - assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}' + assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]] } where: @@ -147,7 +146,7 @@ class DDApiIntegrationTest { unixDomainSocketApi.sendTraces(traces) if (v4()) { assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces" - assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}' + assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]] } where: diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index bea606ba2b..774c6d259a 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -8,9 +8,6 @@ ext { slf4j : "1.7.29", guava : "20.0", // Last version to support Java 7 - // When upgrading for security fixes, ensure corresponding change is reflected in jmxfetch. - jackson : "2.10.0", // https://nvd.nist.gov/vuln/detail/CVE-2019-16942 et al - spock : "1.3-groovy-$spockGroovyVer", groovy : groovyVer, logback : "1.2.3", @@ -34,10 +31,6 @@ ext { // General slf4j : "org.slf4j:slf4j-api:${versions.slf4j}", guava : "com.google.guava:guava:$versions.guava", - jackson : [ - dependencies.create(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson), - dependencies.create(group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.18'), - ], bytebuddy : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy', version: "${versions.bytebuddy}"), bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"), autoservice : [