Remove Jackson from dd-trace-ot
Reduced the size of dd-java-agent jar by about 2 MB. Jackson is not removed completely though as it is still a dependency of jmxfetch. Trace serialization is primarily done directly with msgpack. Response deserialization and LoggingWriter serialization is done with mochi. Msgpack Serialization buffer still not being reused though.
This commit is contained in:
parent
406b324a82
commit
697d4972a8
|
@ -37,9 +37,10 @@ dependencies {
|
|||
|
||||
compile group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.1.1'
|
||||
|
||||
compile deps.jackson
|
||||
compile deps.slf4j
|
||||
compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.20'
|
||||
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7
|
||||
compile group: 'com.squareup.moshi', name: 'moshi', version: '1.9.2'
|
||||
compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.23'
|
||||
compile group: 'com.lmax', name: 'disruptor', version: '3.4.2'
|
||||
|
||||
|
@ -50,6 +51,7 @@ dependencies {
|
|||
testCompile project(":dd-java-agent:testing")
|
||||
testCompile project(':utils:gc-utils')
|
||||
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.17.1'
|
||||
testCompile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.20'
|
||||
|
||||
traceAgentTestCompile deps.testcontainers
|
||||
|
||||
|
|
|
@ -3,8 +3,6 @@ package datadog.opentracing;
|
|||
import static io.opentracing.log.Fields.ERROR_OBJECT;
|
||||
import static io.opentracing.log.Fields.MESSAGE;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonGetter;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import datadog.trace.api.DDTags;
|
||||
import datadog.trace.api.interceptor.MutableSpan;
|
||||
import datadog.trace.api.sampling.PrioritySampling;
|
||||
|
@ -77,7 +75,6 @@ public class DDSpan implements Span, MutableSpan {
|
|||
context.getTrace().registerSpan(this);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public boolean isFinished() {
|
||||
return durationNano.get() != 0;
|
||||
}
|
||||
|
@ -120,20 +117,17 @@ public class DDSpan implements Span, MutableSpan {
|
|||
*
|
||||
* @return true if root, false otherwise
|
||||
*/
|
||||
@JsonIgnore
|
||||
public final boolean isRootSpan() {
|
||||
return BigInteger.ZERO.equals(context.getParentId());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
@JsonIgnore
|
||||
public MutableSpan getRootSpan() {
|
||||
return getLocalRootSpan();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public MutableSpan getLocalRootSpan() {
|
||||
return context().getTrace().getRootSpan();
|
||||
}
|
||||
|
@ -297,14 +291,13 @@ public class DDSpan implements Span, MutableSpan {
|
|||
return this;
|
||||
}
|
||||
|
||||
// Getters and JSON serialisation instructions
|
||||
// Getters
|
||||
|
||||
/**
|
||||
* Meta merges baggage and tags (stringified values)
|
||||
*
|
||||
* @return merged context baggage and tags
|
||||
*/
|
||||
@JsonGetter
|
||||
public Map<String, String> getMeta() {
|
||||
final Map<String, String> meta = new HashMap<>();
|
||||
for (final Map.Entry<String, String> entry : context().getBaggageItems().entrySet()) {
|
||||
|
@ -321,58 +314,48 @@ public class DDSpan implements Span, MutableSpan {
|
|||
*
|
||||
* @return metrics for this span
|
||||
*/
|
||||
@JsonGetter
|
||||
public Map<String, Number> getMetrics() {
|
||||
return context.getMetrics();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonGetter("start")
|
||||
public long getStartTime() {
|
||||
return startTimeNano > 0 ? startTimeNano : TimeUnit.MICROSECONDS.toNanos(startTimeMicro);
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonGetter("duration")
|
||||
public long getDurationNano() {
|
||||
return durationNano.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonGetter("service")
|
||||
public String getServiceName() {
|
||||
return context.getServiceName();
|
||||
}
|
||||
|
||||
@JsonGetter("trace_id")
|
||||
public BigInteger getTraceId() {
|
||||
return context.getTraceId();
|
||||
}
|
||||
|
||||
@JsonGetter("span_id")
|
||||
public BigInteger getSpanId() {
|
||||
return context.getSpanId();
|
||||
}
|
||||
|
||||
@JsonGetter("parent_id")
|
||||
public BigInteger getParentId() {
|
||||
return context.getParentId();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonGetter("resource")
|
||||
public String getResourceName() {
|
||||
return context.getResourceName();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonGetter("name")
|
||||
public String getOperationName() {
|
||||
return context.getOperationName();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public Integer getSamplingPriority() {
|
||||
final int samplingPriority = context.getSamplingPriority();
|
||||
if (samplingPriority == PrioritySampling.UNSET) {
|
||||
|
@ -383,29 +366,24 @@ public class DDSpan implements Span, MutableSpan {
|
|||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public String getSpanType() {
|
||||
return context.getSpanType();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public Map<String, Object> getTags() {
|
||||
return context().getTags();
|
||||
}
|
||||
|
||||
@JsonGetter
|
||||
public String getType() {
|
||||
return context.getSpanType();
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonIgnore
|
||||
public Boolean isError() {
|
||||
return context.getErrorFlag();
|
||||
}
|
||||
|
||||
@JsonGetter
|
||||
public int getError() {
|
||||
return context.getErrorFlag() ? 1 : 0;
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package datadog.opentracing;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import datadog.opentracing.decorators.AbstractDecorator;
|
||||
import datadog.trace.api.DDTags;
|
||||
import datadog.trace.api.sampling.PrioritySampling;
|
||||
|
@ -289,12 +288,10 @@ public class DDSpanContext implements io.opentracing.SpanContext {
|
|||
return baggageItems.entrySet();
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public PendingTrace getTrace() {
|
||||
return trace;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public DDTracer getTracer() {
|
||||
return tracer;
|
||||
}
|
||||
|
|
|
@ -3,13 +3,10 @@ package datadog.trace.common.sampling;
|
|||
import static java.util.Collections.singletonMap;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.NumericNode;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.api.sampling.PrioritySampling;
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -70,23 +67,16 @@ public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentRe
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(final String endpoint, final JsonNode responseJson) {
|
||||
final JsonNode newServiceRates = responseJson.get("rate_by_service");
|
||||
public void onResponse(
|
||||
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
|
||||
final Map<String, Number> newServiceRates = responseJson.get("rate_by_service");
|
||||
if (null != newServiceRates) {
|
||||
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
|
||||
final Map<String, RateSampler> updatedServiceRates = new HashMap<>();
|
||||
final Iterator<String> itr = newServiceRates.fieldNames();
|
||||
while (itr.hasNext()) {
|
||||
final String key = itr.next();
|
||||
final JsonNode value = newServiceRates.get(key);
|
||||
try {
|
||||
if (value instanceof NumericNode) {
|
||||
updatedServiceRates.put(key, createRateSampler(value.doubleValue()));
|
||||
} else {
|
||||
log.debug("Unable to parse new service rate {} -> {}", key, value);
|
||||
}
|
||||
} catch (final NumberFormatException nfe) {
|
||||
log.debug("Unable to parse new service rate {} -> {}", key, value);
|
||||
for (final Map.Entry<String, Number> entry : newServiceRates.entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
updatedServiceRates.put(
|
||||
entry.getKey(), createRateSampler(entry.getValue().doubleValue()));
|
||||
}
|
||||
}
|
||||
if (!updatedServiceRates.containsKey(DEFAULT_KEY)) {
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
package datadog.trace.common.serialization;
|
||||
|
||||
import datadog.opentracing.DDSpan;
|
||||
import java.io.IOException;
|
||||
import java.math.BigInteger;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class FormatWriter<DEST> {
|
||||
public abstract void writeKey(String key, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeListHeader(int size, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeListFooter(DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeMapHeader(int size, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeMapFooter(DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeString(String key, String value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeShort(String key, short value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeByte(String key, byte value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeInt(String key, int value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeLong(String key, long value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeFloat(String key, float value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeDouble(String key, double value, DEST destination) throws IOException;
|
||||
|
||||
public abstract void writeBigInteger(String key, BigInteger value, DEST destination)
|
||||
throws IOException;
|
||||
|
||||
public void writeNumber(final String key, final Number value, final DEST destination)
|
||||
throws IOException {
|
||||
if (value instanceof Double) {
|
||||
writeDouble(key, value.doubleValue(), destination);
|
||||
} else if (value instanceof Long) {
|
||||
writeLong(key, value.longValue(), destination);
|
||||
} else if (value instanceof Integer) {
|
||||
writeInt(key, value.intValue(), destination);
|
||||
} else if (value instanceof Float) {
|
||||
writeFloat(key, value.floatValue(), destination);
|
||||
} else if (value instanceof Byte) {
|
||||
writeByte(key, value.byteValue(), destination);
|
||||
} else if (value instanceof Short) {
|
||||
writeShort(key, value.shortValue(), destination);
|
||||
}
|
||||
}
|
||||
|
||||
public void writeNumberMap(
|
||||
final String key, final Map<String, Number> value, final DEST destination)
|
||||
throws IOException {
|
||||
writeKey(key, destination);
|
||||
writeMapHeader(value.size(), destination);
|
||||
for (final Map.Entry<String, Number> entry : value.entrySet()) {
|
||||
writeNumber(entry.getKey(), entry.getValue(), destination);
|
||||
}
|
||||
writeMapFooter(destination);
|
||||
}
|
||||
|
||||
public void writeStringMap(
|
||||
final String key, final Map<String, String> value, final DEST destination)
|
||||
throws IOException {
|
||||
writeKey(key, destination);
|
||||
writeMapHeader(value.size(), destination);
|
||||
for (final Map.Entry<String, String> entry : value.entrySet()) {
|
||||
writeString(entry.getKey(), entry.getValue(), destination);
|
||||
}
|
||||
writeMapFooter(destination);
|
||||
}
|
||||
|
||||
public void writeTrace(final List<DDSpan> trace, final DEST destination) throws IOException {
|
||||
writeListHeader(trace.size(), destination);
|
||||
for (final DDSpan span : trace) {
|
||||
writeDDSpan(span, destination);
|
||||
}
|
||||
writeListFooter(destination);
|
||||
}
|
||||
|
||||
public void writeDDSpan(final DDSpan span, final DEST destination) throws IOException {
|
||||
// Some of the tests rely on the specific ordering here.
|
||||
writeMapHeader(12, destination); // must match count below.
|
||||
/* 1 */ writeString("service", span.getServiceName(), destination);
|
||||
/* 2 */ writeString("name", span.getOperationName(), destination);
|
||||
/* 3 */ writeString("resource", span.getResourceName(), destination);
|
||||
/* 4 */ writeBigInteger("trace_id", span.getTraceId(), destination);
|
||||
/* 5 */ writeBigInteger("span_id", span.getSpanId(), destination);
|
||||
/* 6 */ writeBigInteger("parent_id", span.getParentId(), destination);
|
||||
/* 7 */ writeLong("start", span.getStartTime(), destination);
|
||||
/* 8 */ writeLong("duration", span.getDurationNano(), destination);
|
||||
/* 9 */ writeString("type", span.getType(), destination);
|
||||
/* 10 */ writeInt("error", span.getError(), destination);
|
||||
/* 11 */ writeNumberMap("metrics", span.getMetrics(), destination);
|
||||
/* 12 */ writeStringMap("meta", span.getMeta(), destination);
|
||||
writeMapFooter(destination);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,130 @@
|
|||
package datadog.trace.common.serialization;
|
||||
|
||||
import com.squareup.moshi.JsonAdapter;
|
||||
import com.squareup.moshi.JsonReader;
|
||||
import com.squareup.moshi.JsonWriter;
|
||||
import com.squareup.moshi.Moshi;
|
||||
import com.squareup.moshi.Types;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import java.io.IOException;
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.lang.reflect.Type;
|
||||
import java.math.BigInteger;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class JsonFormatWriter extends FormatWriter<JsonWriter> {
|
||||
private static final Moshi MOSHI = new Moshi.Builder().add(DDSpanAdapter.FACTORY).build();
|
||||
|
||||
public static final JsonAdapter<List<DDSpan>> TRACE_ADAPTER =
|
||||
MOSHI.adapter(Types.newParameterizedType(List.class, DDSpan.class));
|
||||
public static final JsonAdapter<DDSpan> SPAN_ADAPTER = MOSHI.adapter(DDSpan.class);
|
||||
|
||||
public static JsonFormatWriter JSON_WRITER = new JsonFormatWriter();
|
||||
|
||||
@Override
|
||||
public void writeKey(final String key, final JsonWriter destination) throws IOException {
|
||||
destination.name(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeListHeader(final int size, final JsonWriter destination) throws IOException {
|
||||
destination.beginArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeListFooter(final JsonWriter destination) throws IOException {
|
||||
destination.endArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMapHeader(final int size, final JsonWriter destination) throws IOException {
|
||||
destination.beginObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMapFooter(final JsonWriter destination) throws IOException {
|
||||
destination.endObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeString(final String key, final String value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeShort(final String key, final short value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(final String key, final byte value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeInt(final String key, final int value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeLong(final String key, final long value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFloat(final String key, final float value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDouble(final String key, final double value, final JsonWriter destination)
|
||||
throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBigInteger(
|
||||
final String key, final BigInteger value, final JsonWriter destination) throws IOException {
|
||||
destination.name(key);
|
||||
destination.value(value);
|
||||
}
|
||||
|
||||
static class DDSpanAdapter extends JsonAdapter<DDSpan> {
|
||||
public static final JsonAdapter.Factory FACTORY =
|
||||
new JsonAdapter.Factory() {
|
||||
@Override
|
||||
public JsonAdapter<?> create(
|
||||
final Type type, final Set<? extends Annotation> annotations, final Moshi moshi) {
|
||||
final Class<?> rawType = Types.getRawType(type);
|
||||
if (rawType.isAssignableFrom(DDSpan.class)) {
|
||||
return new DDSpanAdapter();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public DDSpan fromJson(final JsonReader reader) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toJson(final JsonWriter writer, final DDSpan value) throws IOException {
|
||||
JSON_WRITER.writeDDSpan(value, writer);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
package datadog.trace.common.serialization;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.math.BigInteger;
|
||||
import org.msgpack.core.MessagePacker;
|
||||
|
||||
public class MsgpackFormatWriter extends FormatWriter<MessagePacker> {
|
||||
public static MsgpackFormatWriter MSGPACK_WRITER = new MsgpackFormatWriter();
|
||||
|
||||
@Override
|
||||
public void writeKey(final String key, final MessagePacker destination) throws IOException {
|
||||
destination.packString(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeListHeader(final int size, final MessagePacker destination) throws IOException {
|
||||
destination.packArrayHeader(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeListFooter(final MessagePacker destination) throws IOException {}
|
||||
|
||||
@Override
|
||||
public void writeMapHeader(final int size, final MessagePacker destination) throws IOException {
|
||||
destination.packMapHeader(size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeMapFooter(final MessagePacker destination) throws IOException {}
|
||||
|
||||
@Override
|
||||
public void writeString(final String key, final String value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packString(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeShort(final String key, final short value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packShort(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeByte(final String key, final byte value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packByte(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeInt(final String key, final int value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packInt(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeLong(final String key, final long value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packLong(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeFloat(final String key, final float value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packFloat(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeDouble(final String key, final double value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packDouble(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBigInteger(
|
||||
final String key, final BigInteger value, final MessagePacker destination)
|
||||
throws IOException {
|
||||
destination.packString(key);
|
||||
destination.packBigInteger(value);
|
||||
}
|
||||
}
|
|
@ -1,25 +1,29 @@
|
|||
package datadog.trace.common.writer;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import static datadog.trace.common.serialization.JsonFormatWriter.TRACE_ADAPTER;
|
||||
|
||||
import datadog.opentracing.DDSpan;
|
||||
import java.util.List;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class LoggingWriter implements Writer {
|
||||
private final ObjectMapper serializer = new ObjectMapper();
|
||||
|
||||
@Override
|
||||
public void write(final List<DDSpan> trace) {
|
||||
if (log.isInfoEnabled()) {
|
||||
try {
|
||||
log.info("write(trace): {}", serializer.writeValueAsString(trace));
|
||||
log.info("write(trace): {}", toString(trace));
|
||||
} catch (final Exception e) {
|
||||
log.error("error writing(trace): {}", trace);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String toString(final List<DDSpan> trace) {
|
||||
return TRACE_ADAPTER.toJson(trace);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementTraceCount() {
|
||||
log.info("incrementTraceCount()");
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER;
|
||||
|
||||
import com.squareup.moshi.JsonAdapter;
|
||||
import com.squareup.moshi.Moshi;
|
||||
import com.squareup.moshi.Types;
|
||||
import datadog.opentracing.ContainerInfo;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.opentracing.DDTraceOTInfo;
|
||||
|
@ -12,8 +13,8 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.HttpUrl;
|
||||
|
@ -24,7 +25,7 @@ import okhttp3.RequestBody;
|
|||
import okio.BufferedSink;
|
||||
import org.msgpack.core.MessagePack;
|
||||
import org.msgpack.core.MessagePacker;
|
||||
import org.msgpack.jackson.dataformat.MessagePackFactory;
|
||||
import org.msgpack.core.buffer.ArrayBufferOutput;
|
||||
|
||||
/** The API pointing to a DD agent */
|
||||
@Slf4j
|
||||
|
@ -47,7 +48,14 @@ public class DDAgentApi {
|
|||
|
||||
private volatile long nextAllowedLogTime = 0;
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
|
||||
private static final JsonAdapter<Map<String, Map<String, Number>>> RESPONSE_ADAPTER =
|
||||
new Moshi.Builder()
|
||||
.build()
|
||||
.adapter(
|
||||
Types.newParameterizedType(
|
||||
Map.class,
|
||||
String.class,
|
||||
Types.newParameterizedType(Map.class, String.class, Double.class)));
|
||||
private static final MediaType MSGPACK = MediaType.get("application/msgpack");
|
||||
|
||||
private final OkHttpClient httpClient;
|
||||
|
@ -57,7 +65,7 @@ public class DDAgentApi {
|
|||
this(
|
||||
host,
|
||||
port,
|
||||
traceEndpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath),
|
||||
endpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath, true),
|
||||
unixDomainSocketPath);
|
||||
}
|
||||
|
||||
|
@ -97,7 +105,7 @@ public class DDAgentApi {
|
|||
final byte[] serializedTrace = serializeTrace(trace);
|
||||
sizeInBytes += serializedTrace.length;
|
||||
serializedTraces.add(serializedTrace);
|
||||
} catch (final JsonProcessingException e) {
|
||||
} catch (final IOException e) {
|
||||
log.warn("Error serializing trace", e);
|
||||
|
||||
// TODO: DQH - Incorporate the failed serialization into the Response object???
|
||||
|
@ -107,8 +115,13 @@ public class DDAgentApi {
|
|||
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
|
||||
}
|
||||
|
||||
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
|
||||
return OBJECT_MAPPER.writeValueAsBytes(trace);
|
||||
byte[] serializeTrace(final List<DDSpan> trace) throws IOException {
|
||||
// TODO: reuse byte array buffer
|
||||
final ArrayBufferOutput output = new ArrayBufferOutput();
|
||||
final MessagePacker packer = MessagePack.newDefaultPacker(output);
|
||||
MSGPACK_WRITER.writeTrace(trace, packer);
|
||||
packer.flush();
|
||||
return output.toByteArray();
|
||||
}
|
||||
|
||||
Response sendSerializedTraces(
|
||||
|
@ -183,17 +196,16 @@ public class DDAgentApi {
|
|||
final String responseString = response.body().string().trim();
|
||||
try {
|
||||
if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) {
|
||||
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
|
||||
final Map<String, Map<String, Number>> parsedResponse =
|
||||
RESPONSE_ADAPTER.fromJson(responseString);
|
||||
final String endpoint = tracesUrl.toString();
|
||||
|
||||
for (final DDAgentResponseListener listener : responseListeners) {
|
||||
listener.onResponse(endpoint, parsedResponse);
|
||||
}
|
||||
return Response.success(response.code(), parsedResponse);
|
||||
}
|
||||
|
||||
return Response.success(response.code());
|
||||
} catch (final JsonParseException e) {
|
||||
} catch (final IOException e) {
|
||||
log.debug("Failed to parse DD agent response: " + responseString, e);
|
||||
|
||||
return Response.success(response.code(), e);
|
||||
|
@ -222,19 +234,13 @@ public class DDAgentApi {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean traceEndpointAvailable(
|
||||
final HttpUrl url, final String unixDomainSocketPath) {
|
||||
return endpointAvailable(url, unixDomainSocketPath, Collections.emptyList(), true);
|
||||
}
|
||||
private static final byte[] EMPTY_LIST = new byte[] {MessagePack.Code.FIXARRAY_PREFIX};
|
||||
|
||||
private static boolean endpointAvailable(
|
||||
final HttpUrl url,
|
||||
final String unixDomainSocketPath,
|
||||
final Object data,
|
||||
final boolean retry) {
|
||||
final HttpUrl url, final String unixDomainSocketPath, final boolean retry) {
|
||||
try {
|
||||
final OkHttpClient client = buildHttpClient(unixDomainSocketPath);
|
||||
final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data));
|
||||
final RequestBody body = RequestBody.create(MSGPACK, EMPTY_LIST);
|
||||
final Request request = prepareRequest(url).put(body).build();
|
||||
|
||||
try (final okhttp3.Response response = client.newCall(request).execute()) {
|
||||
|
@ -242,7 +248,7 @@ public class DDAgentApi {
|
|||
}
|
||||
} catch (final IOException e) {
|
||||
if (retry) {
|
||||
return endpointAvailable(url, unixDomainSocketPath, data, false);
|
||||
return endpointAvailable(url, unixDomainSocketPath, false);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
@ -307,42 +313,31 @@ public class DDAgentApi {
|
|||
public static final class Response {
|
||||
/** Factory method for a successful request with a trivial response body */
|
||||
public static final Response success(final int status) {
|
||||
return new Response(true, status, null, null);
|
||||
}
|
||||
|
||||
/** Factory method for a successful request with a well-formed JSON response body */
|
||||
public static final Response success(final int status, final JsonNode json) {
|
||||
return new Response(true, status, json, null);
|
||||
return new Response(true, status, null);
|
||||
}
|
||||
|
||||
/** Factory method for a successful request will a malformed response body */
|
||||
public static final Response success(final int status, final Throwable exception) {
|
||||
return new Response(true, status, null, exception);
|
||||
return new Response(true, status, exception);
|
||||
}
|
||||
|
||||
/** Factory method for a request that receive an error status in response */
|
||||
public static final Response failed(final int status) {
|
||||
return new Response(false, status, null, null);
|
||||
return new Response(false, status, null);
|
||||
}
|
||||
|
||||
/** Factory method for a failed communication attempt */
|
||||
public static final Response failed(final Throwable exception) {
|
||||
return new Response(false, null, null, exception);
|
||||
return new Response(false, null, exception);
|
||||
}
|
||||
|
||||
private final boolean success;
|
||||
private final Integer status;
|
||||
private final JsonNode json;
|
||||
private final Throwable exception;
|
||||
|
||||
private Response(
|
||||
final boolean success,
|
||||
final Integer status,
|
||||
final JsonNode json,
|
||||
final Throwable exception) {
|
||||
private Response(final boolean success, final Integer status, final Throwable exception) {
|
||||
this.success = success;
|
||||
this.status = status;
|
||||
this.json = json;
|
||||
this.exception = exception;
|
||||
}
|
||||
|
||||
|
@ -355,10 +350,6 @@ public class DDAgentApi {
|
|||
return status;
|
||||
}
|
||||
|
||||
public final JsonNode json() {
|
||||
return json;
|
||||
}
|
||||
|
||||
// TODO: DQH - In Java 8, switch to Optional<Throwable>?
|
||||
public final Throwable exception() {
|
||||
return exception;
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.util.Map;
|
||||
|
||||
public interface DDAgentResponseListener {
|
||||
/** Invoked after the api receives a response from the core agent. */
|
||||
void onResponse(String endpoint, JsonNode responseJson);
|
||||
void onResponse(String endpoint, Map<String, Map<String, Number>> responseJson);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package datadog.trace.common.writer.ddagent;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.lmax.disruptor.EventHandler;
|
||||
import datadog.opentracing.DDSpan;
|
||||
import datadog.trace.common.util.DaemonThreadFactory;
|
||||
|
@ -74,9 +73,6 @@ public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
|
|||
batchWritingDisruptor.publish(serializedTrace, event.representativeCount);
|
||||
monitor.onSerialize(writer, event.data, serializedTrace);
|
||||
event.representativeCount = 0; // reset in case flush is invoked below.
|
||||
} catch (final JsonProcessingException e) {
|
||||
log.debug("Error serializing trace", e);
|
||||
monitor.onFailedSerialize(writer, event.data, e);
|
||||
} catch (final Throwable e) {
|
||||
log.debug("Error while serializing trace", e);
|
||||
monitor.onFailedSerialize(writer, event.data, e);
|
||||
|
|
|
@ -1,43 +1,48 @@
|
|||
package datadog.opentracing
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.google.common.collect.Maps
|
||||
import com.squareup.moshi.Moshi
|
||||
import datadog.trace.api.DDTags
|
||||
import datadog.trace.api.sampling.PrioritySampling
|
||||
import datadog.trace.common.writer.ListWriter
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
import org.msgpack.core.MessagePack
|
||||
import org.msgpack.core.buffer.ArrayBufferInput
|
||||
import org.msgpack.jackson.dataformat.MessagePackFactory
|
||||
import org.msgpack.core.buffer.ArrayBufferOutput
|
||||
import org.msgpack.value.ValueType
|
||||
|
||||
import static datadog.trace.common.serialization.JsonFormatWriter.SPAN_ADAPTER
|
||||
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER
|
||||
|
||||
class DDSpanSerializationTest extends DDSpecification {
|
||||
|
||||
def "serialize spans with sampling #samplingPriority"() throws Exception {
|
||||
setup:
|
||||
final Map<String, String> baggage = new HashMap<>()
|
||||
baggage.put("a-baggage", "value")
|
||||
final Map<String, Object> tags = new HashMap<>()
|
||||
baggage.put("k1", "v1")
|
||||
def jsonAdapter = new Moshi.Builder().build().adapter(Map)
|
||||
|
||||
Map<String, Object> expected = Maps.newHashMap()
|
||||
expected.put("meta", baggage)
|
||||
expected.put("service", "service")
|
||||
expected.put("error", 0)
|
||||
expected.put("type", "type")
|
||||
expected.put("name", "operation")
|
||||
expected.put("duration", 33000)
|
||||
expected.put("resource", "operation")
|
||||
final Map<String, Number> metrics = new HashMap<>()
|
||||
metrics.put("_sampling_priority_v1", 1)
|
||||
final Map<String, Number> metrics = ["_sampling_priority_v1": 1]
|
||||
if (samplingPriority == PrioritySampling.UNSET) { // RateByServiceSampler sets priority
|
||||
metrics.put("_dd.agent_psr", 1.0d)
|
||||
}
|
||||
expected.put("metrics", metrics)
|
||||
expected.put("start", 100000)
|
||||
expected.put("span_id", 2l)
|
||||
expected.put("parent_id", 0l)
|
||||
expected.put("trace_id", 1l)
|
||||
|
||||
Map<String, Object> expected = [
|
||||
service : "service",
|
||||
name : "operation",
|
||||
resource : "operation",
|
||||
trace_id : 1l,
|
||||
span_id : 2l,
|
||||
parent_id: 0l,
|
||||
start : 100000,
|
||||
duration : 33000,
|
||||
type : "type",
|
||||
error : 0,
|
||||
metrics : metrics,
|
||||
meta : [
|
||||
"a-baggage" : "value",
|
||||
"k1" : "v1",
|
||||
(DDTags.THREAD_NAME): Thread.currentThread().getName(),
|
||||
(DDTags.THREAD_ID) : String.valueOf(Thread.currentThread().getId()),
|
||||
],
|
||||
]
|
||||
|
||||
def writer = new ListWriter()
|
||||
def tracer = DDTracer.builder().writer(writer).build()
|
||||
|
@ -51,23 +56,19 @@ class DDSpanSerializationTest extends DDSpecification {
|
|||
null,
|
||||
samplingPriority,
|
||||
null,
|
||||
new HashMap<>(baggage),
|
||||
["a-baggage": "value"],
|
||||
false,
|
||||
"type",
|
||||
tags,
|
||||
["k1": "v1"],
|
||||
new PendingTrace(tracer, 1G, [:]),
|
||||
tracer)
|
||||
|
||||
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName())
|
||||
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()))
|
||||
|
||||
DDSpan span = new DDSpan(100L, context)
|
||||
|
||||
span.finish(133L)
|
||||
ObjectMapper serializer = new ObjectMapper()
|
||||
|
||||
def actualTree = serializer.readTree(serializer.writeValueAsString(span))
|
||||
def expectedTree = serializer.readTree(serializer.writeValueAsString(expected))
|
||||
def actualTree = jsonAdapter.fromJson(SPAN_ADAPTER.toJson(span))
|
||||
def expectedTree = jsonAdapter.fromJson(jsonAdapter.toJson(expected))
|
||||
expect:
|
||||
actualTree == expectedTree
|
||||
|
||||
|
@ -79,7 +80,6 @@ class DDSpanSerializationTest extends DDSpecification {
|
|||
|
||||
def "serialize trace/span with id #value as int"() {
|
||||
setup:
|
||||
def objectMapper = new ObjectMapper(new MessagePackFactory())
|
||||
def writer = new ListWriter()
|
||||
def tracer = DDTracer.builder().writer(writer).build()
|
||||
def context = new DDSpanContext(
|
||||
|
@ -98,7 +98,11 @@ class DDSpanSerializationTest extends DDSpecification {
|
|||
new PendingTrace(tracer, 1G, [:]),
|
||||
tracer)
|
||||
def span = new DDSpan(0, context)
|
||||
byte[] bytes = objectMapper.writeValueAsBytes(span)
|
||||
def buffer = new ArrayBufferOutput()
|
||||
def packer = MessagePack.newDefaultPacker(buffer)
|
||||
MSGPACK_WRITER.writeDDSpan(span, packer)
|
||||
packer.flush()
|
||||
byte[] bytes = buffer.toByteArray()
|
||||
def unpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes))
|
||||
int size = unpacker.unpackMapHeader()
|
||||
|
||||
|
|
|
@ -1,24 +1,25 @@
|
|||
package datadog.trace.api.sampling
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
|
||||
import datadog.opentracing.DDSpan
|
||||
import datadog.opentracing.DDTracer
|
||||
import datadog.opentracing.SpanFactory
|
||||
import datadog.trace.api.DDTags
|
||||
import datadog.trace.common.sampling.RateByServiceSampler
|
||||
import datadog.trace.common.writer.LoggingWriter
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
|
||||
import static datadog.trace.common.sampling.RateByServiceSampler.DEFAULT_KEY
|
||||
|
||||
class RateByServiceSamplerTest extends DDSpecification {
|
||||
static serializer = DDAgentApi.RESPONSE_ADAPTER
|
||||
|
||||
def "invalid rate -> 1"() {
|
||||
setup:
|
||||
RateByServiceSampler serviceSampler = new RateByServiceSampler()
|
||||
ObjectMapper serializer = new ObjectMapper()
|
||||
String response = '{"rate_by_service": {"service:,env:":' + rate + '}}'
|
||||
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||
serviceSampler.onResponse("traces", serializer.fromJson(response))
|
||||
expect:
|
||||
serviceSampler.serviceRates[DEFAULT_KEY].sampleRate == expectedRate
|
||||
|
||||
|
@ -35,11 +36,10 @@ class RateByServiceSamplerTest extends DDSpecification {
|
|||
def "rate by service name"() {
|
||||
setup:
|
||||
RateByServiceSampler serviceSampler = new RateByServiceSampler()
|
||||
ObjectMapper serializer = new ObjectMapper()
|
||||
|
||||
when:
|
||||
String response = '{"rate_by_service": {"service:spock,env:test":0.0}}'
|
||||
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||
serviceSampler.onResponse("traces", serializer.fromJson(response))
|
||||
DDSpan span1 = SpanFactory.newSpanOf("foo", "bar")
|
||||
serviceSampler.setSamplingPriority(span1)
|
||||
then:
|
||||
|
@ -48,7 +48,7 @@ class RateByServiceSamplerTest extends DDSpecification {
|
|||
|
||||
when:
|
||||
response = '{"rate_by_service": {"service:spock,env:test":1.0}}'
|
||||
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||
serviceSampler.onResponse("traces", serializer.fromJson(response))
|
||||
DDSpan span2 = SpanFactory.newSpanOf("spock", "test")
|
||||
serviceSampler.setSamplingPriority(span2)
|
||||
then:
|
||||
|
@ -59,9 +59,8 @@ class RateByServiceSamplerTest extends DDSpecification {
|
|||
def "sampling priority set on context"() {
|
||||
setup:
|
||||
RateByServiceSampler serviceSampler = new RateByServiceSampler()
|
||||
ObjectMapper serializer = new ObjectMapper()
|
||||
String response = '{"rate_by_service": {"service:,env:":1.0}}'
|
||||
serviceSampler.onResponse("traces", serializer.readTree(response))
|
||||
serviceSampler.onResponse("traces", serializer.fromJson(response))
|
||||
|
||||
DDSpan span = SpanFactory.newSpanOf("foo", "bar")
|
||||
serviceSampler.setSamplingPriority(span)
|
||||
|
@ -76,8 +75,8 @@ class RateByServiceSamplerTest extends DDSpecification {
|
|||
def sampler = new RateByServiceSampler()
|
||||
def tracer = DDTracer.builder().writer(new LoggingWriter()).sampler(sampler).build()
|
||||
|
||||
sampler.onResponse("test", new ObjectMapper()
|
||||
.readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
|
||||
sampler.onResponse("test", serializer
|
||||
.fromJson('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
|
||||
|
||||
when:
|
||||
def span = tracer.buildSpan("test").start()
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
package datadog.trace.api.writer
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import datadog.opentracing.SpanFactory
|
||||
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
import org.msgpack.jackson.dataformat.MessagePackFactory
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
@ -13,7 +14,7 @@ import java.util.concurrent.atomic.AtomicReference
|
|||
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
|
||||
|
||||
class DDAgentApiTest extends DDSpecification {
|
||||
static mapper = DDAgentApi.OBJECT_MAPPER
|
||||
static mapper = new ObjectMapper(new MessagePackFactory())
|
||||
|
||||
def "sending an empty list of traces returns no errors"() {
|
||||
setup:
|
||||
|
@ -124,15 +125,15 @@ class DDAgentApiTest extends DDSpecification {
|
|||
|
||||
def "Api ResponseListeners see 200 responses"() {
|
||||
setup:
|
||||
def agentResponse = new AtomicReference<String>(null)
|
||||
DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson ->
|
||||
agentResponse.set(responseJson.toString())
|
||||
def agentResponse = new AtomicReference<Map>(null)
|
||||
DDAgentResponseListener responseListener = { String endpoint, Map responseJson ->
|
||||
agentResponse.set(responseJson)
|
||||
}
|
||||
def agent = httpServer {
|
||||
handlers {
|
||||
put("v0.4/traces") {
|
||||
def status = request.contentLength > 0 ? 200 : 500
|
||||
response.status(status).send('{"hello":"test"}')
|
||||
response.status(status).send('{"hello":{}}')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -142,7 +143,7 @@ class DDAgentApiTest extends DDSpecification {
|
|||
when:
|
||||
client.sendTraces([[], [], []])
|
||||
then:
|
||||
agentResponse.get() == '{"hello":"test"}'
|
||||
agentResponse.get() == ["hello": [:]]
|
||||
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
|
||||
agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown")
|
||||
agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version"
|
||||
|
|
|
@ -11,6 +11,8 @@ import datadog.trace.common.writer.ddagent.BatchWritingDisruptor
|
|||
import datadog.trace.common.writer.ddagent.DDAgentApi
|
||||
import datadog.trace.common.writer.ddagent.Monitor
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
import org.msgpack.core.MessagePack
|
||||
import org.msgpack.core.buffer.ArrayBufferOutput
|
||||
import spock.lang.Retry
|
||||
import spock.lang.Timeout
|
||||
|
||||
|
@ -21,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
|
||||
import static datadog.opentracing.SpanFactory.newSpanOf
|
||||
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
|
||||
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER
|
||||
import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
|
||||
|
||||
@Timeout(20)
|
||||
|
@ -206,7 +209,7 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
Mock(DDTracer))
|
||||
minimalSpan = new DDSpan(0, minimalContext)
|
||||
minimalTrace = [minimalSpan]
|
||||
traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
|
||||
traceSize = calculateSize(minimalTrace)
|
||||
maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
|
||||
}
|
||||
|
||||
|
@ -667,4 +670,12 @@ class DDAgentWriterTest extends DDSpecification {
|
|||
cleanup:
|
||||
writer.close()
|
||||
}
|
||||
|
||||
static int calculateSize(List<DDSpan> trace) {
|
||||
def buffer = new ArrayBufferOutput()
|
||||
def packer = MessagePack.newDefaultPacker(buffer)
|
||||
MSGPACK_WRITER.writeTrace(trace, packer)
|
||||
packer.flush()
|
||||
return buffer.size
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
package datadog.trace.api.writer
|
||||
|
||||
|
||||
import datadog.opentracing.DDTracer
|
||||
import datadog.opentracing.SpanFactory
|
||||
import datadog.trace.common.writer.LoggingWriter
|
||||
import datadog.trace.util.test.DDSpecification
|
||||
import spock.lang.Subject
|
||||
|
||||
class LoggingWriterTest extends DDSpecification {
|
||||
@Subject
|
||||
def writer = new LoggingWriter()
|
||||
|
||||
def tracer = Mock(DDTracer)
|
||||
def sampleTrace = [SpanFactory.newSpanOf(tracer), SpanFactory.newSpanOf(tracer)]
|
||||
|
||||
def "test toString"() {
|
||||
expect:
|
||||
writer.toString(sampleTrace).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0,"start":1000,"duration":0,"type":"fakeType","error":0,"metrics":{},"meta":{')
|
||||
}
|
||||
}
|
|
@ -1,4 +1,3 @@
|
|||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import datadog.opentracing.DDSpan
|
||||
import datadog.opentracing.DDSpanContext
|
||||
import datadog.opentracing.DDTracer
|
||||
|
@ -63,11 +62,11 @@ class DDApiIntegrationTest {
|
|||
def unixDomainSocketApi
|
||||
|
||||
def endpoint = new AtomicReference<String>(null)
|
||||
def agentResponse = new AtomicReference<String>(null)
|
||||
def agentResponse = new AtomicReference<Map<String, Map<String, Number>>>(null)
|
||||
|
||||
DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
|
||||
DDAgentResponseListener responseListener = { String receivedEndpoint, Map<String, Map<String, Number>> responseJson ->
|
||||
endpoint.set(receivedEndpoint)
|
||||
agentResponse.set(responseJson.toString())
|
||||
agentResponse.set(responseJson)
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
|
@ -126,7 +125,7 @@ class DDApiIntegrationTest {
|
|||
api.sendTraces(traces)
|
||||
if (v4()) {
|
||||
assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces"
|
||||
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
|
||||
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
|
||||
}
|
||||
|
||||
where:
|
||||
|
@ -147,7 +146,7 @@ class DDApiIntegrationTest {
|
|||
unixDomainSocketApi.sendTraces(traces)
|
||||
if (v4()) {
|
||||
assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces"
|
||||
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
|
||||
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
|
||||
}
|
||||
|
||||
where:
|
||||
|
|
|
@ -8,9 +8,6 @@ ext {
|
|||
slf4j : "1.7.29",
|
||||
guava : "20.0", // Last version to support Java 7
|
||||
|
||||
// When upgrading for security fixes, ensure corresponding change is reflected in jmxfetch.
|
||||
jackson : "2.10.0", // https://nvd.nist.gov/vuln/detail/CVE-2019-16942 et al
|
||||
|
||||
spock : "1.3-groovy-$spockGroovyVer",
|
||||
groovy : groovyVer,
|
||||
logback : "1.2.3",
|
||||
|
@ -34,10 +31,6 @@ ext {
|
|||
// General
|
||||
slf4j : "org.slf4j:slf4j-api:${versions.slf4j}",
|
||||
guava : "com.google.guava:guava:$versions.guava",
|
||||
jackson : [
|
||||
dependencies.create(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson),
|
||||
dependencies.create(group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.18'),
|
||||
],
|
||||
bytebuddy : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy', version: "${versions.bytebuddy}"),
|
||||
bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"),
|
||||
autoservice : [
|
||||
|
|
Loading…
Reference in New Issue