Merge pull request #989 from DataDog/landerson/remove-dd-trace

Remove dd-trace and dd-trace-ext
This commit is contained in:
Laplie Anderson 2019-09-12 11:55:28 -04:00 committed by GitHub
commit 3d6d45ba77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 2 additions and 4594 deletions

View File

@ -1,4 +0,0 @@
# Convenience Utils for Datadog Tracer Internal API
- Trace Scopes
- Global Tracer
- Async controls

View File

@ -1,6 +0,0 @@
description = 'dd-trace-ext'
apply from: "${rootDir}/gradle/java.gradle"
dependencies {
compile project(':dd-trace')
}

View File

@ -1,75 +0,0 @@
package datadog.trace.tracer.ext;
import datadog.trace.tracer.Continuation;
import datadog.trace.tracer.Span;
import datadog.trace.tracer.Tracer;
// Keeping in PR for potential discussions. Will eventually remove.
// TODO: remove
class Examples {
private Examples() {}
public static void test() {
final Throwable someThrowable = null;
// registration
TracerContext.registerGlobalContext(new TracerContext(Tracer.builder().build()), false);
// scope
final TracerContext ctx = TracerContext.getGlobalContext();
// without try-with-resources
{
final Span rootSpan = ctx.getTracer().buildTrace(null);
final Scope scope = ctx.pushScope(rootSpan);
rootSpan.attachThrowable(someThrowable);
scope.close();
rootSpan.finish();
}
/*
// with try-with-resources finishOnClose=true
{
Span rootSpan = ctx.getTracer().buildTrace(null);
try (Scope scope = ctx.pushScope(rootSpan)) {
try {
// the body
} catch (Throwable t) {
rootSpan.setError(true);
rootSpan.attachThrowable(t);
throw t;
}
}
}
*/
// with try-with-resources finishOnClose=false
{
final Span rootSpan = ctx.getTracer().buildTrace(null);
try (final Scope scope = ctx.pushScope(rootSpan)) {
// the body
} catch (final Throwable t) {
rootSpan.attachThrowable(t);
throw t;
} finally {
rootSpan.finish();
}
}
// continuations
{
final Span rootSpan = ctx.getTracer().buildTrace(null);
final Continuation cont = rootSpan.getTrace().createContinuation(rootSpan);
{ // on another thread
final Span parent = cont.getSpan();
try {
// body
} finally {
cont.close();
}
}
}
// create a span as a child of the currently active span
final Span childSpan =
ctx.peekScope().span().getTrace().createSpan(ctx.peekScope().span().getContext());
}
}

View File

@ -1,27 +0,0 @@
package datadog.trace.tracer.ext;
import datadog.trace.tracer.Continuation;
import datadog.trace.tracer.Span;
/**
* A scope holds a single span or trace continuation and may optionally finish its span or
* continuation.
*
* <p>To create a scope, see {@link TracerContext#pushScope(Span)} and {@link
* TracerContext#pushScope(Continuation)}.
*
* <p>All created scopes must be closed with {@link Scope#close()}
*/
public interface Scope extends AutoCloseable {
/** Get the span held by this scope. */
Span span();
/**
* Close this scope. This method must be invoked on all created scopes.
*
* <p>Attempting to close a scope which is not on the top of its TracerContext's scope-stack is an
* error. See {@link TracerContext#peekScope()}.
*/
@Override
void close();
}

View File

@ -1,86 +0,0 @@
package datadog.trace.tracer.ext;
import datadog.trace.tracer.Continuation;
import datadog.trace.tracer.Span;
import datadog.trace.tracer.Tracer;
/**
* Provides a place to store a shared global tracer with convenient span-building helper methods.
*
* <p>Also maintains a stack of active scopes (or scope-stack). The span of the scope on the top of
* the scope stack is used as the parent for newly created spans.
*
* <p>This class is thread safe.
*/
public final class TracerContext {
// global TracerContext
/** Get the global TracerContext */
public static TracerContext getGlobalContext() {
return null;
}
/**
* Register the global TracerContext.
*
* @param context The context to register.
* @param replaceExisting If true, the existing global TracerContext will be replaced
* @return The old global TracerContext, or null if no previous context ws registered
*/
public static TracerContext registerGlobalContext(
final TracerContext context, final boolean replaceExisting) {
return null;
}
/** @return True if a global TracerContext has been registered */
public static boolean isRegistered() {
return false;
}
private final Tracer tracer;
public TracerContext(final Tracer tracer) {
this.tracer = tracer;
}
/** @return The tracer associated with this TracerContext */
public Tracer getTracer() {
return tracer;
}
// TODO: convenience APIs like buildSpan, etc.
/**
* Push a new scope to the top of this scope-stack. The scope's span will be the given span.
*
* @param span
* @return
*/
public Scope pushScope(final Span span) {
return null;
}
/**
* Push a new scope to the top of this scope-stack. The scope's span will be the continuation's
* span.
*
* @param continuation
* @return
*/
public Scope pushScope(final Continuation continuation) {
return null;
}
/**
* Pop the given scope off the top of the scope stack.
*
* <p>If the given scope is not the topmost scope on the stack an error will be thrown.
*
* @param scope the topmost scope in the scope stack.
*/
public void popScope(final Scope scope) {}
/** @return The scope on the top of this scope-stack or null if there is no active scope. */
public Scope peekScope() {
return null;
}
}

View File

@ -1,4 +0,0 @@
# Datadog Tracer Internal API
Contains the core elements needed to create and report APM traces to Datadog.
It's recommended to use `:dd-trace-ext` in addition to this api.

View File

@ -1,27 +0,0 @@
description = 'dd-trace'
apply from: "${rootDir}/gradle/java.gradle"
minimumBranchCoverage = 0.9
minimumInstructionCoverage = 0.9
excludedClassesCoverage += [
'datadog.trace.tracer.Tracer.TracerBuilder',
'datadog.trace.decorator.*', // TODO: remove when ready to write tests.
]
dependencies {
annotationProcessor deps.autoservice
implementation deps.autoservice
compile project(':dd-trace-api')
compile deps.slf4j
compile deps.jackson
compile project(':utils:gc-utils')
// Spock uses that for mocking
testCompile deps.bytebuddy
testCompile group: 'org.objenesis', name: 'objenesis', version: '2.6' // Last version to support Java7
testCompile group: 'nl.jqno.equalsverifier', name: 'equalsverifier', version: '2.5.2' // Last version to support Java7
testCompile group: 'com.github.tomakehurst', name: 'wiremock', 'version': '2.20.0'
}

View File

@ -1,28 +0,0 @@
package datadog.trace.decorator;
import datadog.trace.tracer.Span;
public abstract class BaseDecorator {
protected abstract String component();
public Span afterStart(final Span span) {
assert span != null;
span.setMeta("component", component());
return span;
}
public Span beforeFinish(final Span span) {
assert span != null;
return span;
}
public Span onError(final Span span, final Throwable throwable) {
assert span != null;
if (throwable != null) {
span.setErrored(true);
span.attachThrowable(throwable);
}
return span;
}
}

View File

@ -1,21 +0,0 @@
package datadog.trace.decorator;
import datadog.trace.tracer.Span;
public abstract class ClientDecorator extends BaseDecorator {
protected abstract String service();
protected abstract String spanType();
@Override
public Span afterStart(final Span span) {
assert span != null;
if (service() != null) {
span.setService(service());
}
span.setMeta("span.kind", "client");
span.setMeta("span.type", spanType());
return super.afterStart(span);
}
}

View File

@ -1,45 +0,0 @@
package datadog.trace.decorator;
import datadog.trace.tracer.Span;
public abstract class HttpClientDecorator<REQUEST, RESPONSE> extends ClientDecorator {
protected abstract String method(REQUEST request);
protected abstract String url(REQUEST request);
protected abstract String hostname(RESPONSE request);
protected abstract int port(RESPONSE request);
protected abstract int status(RESPONSE response);
@Override
protected String spanType() {
return "http";
}
public Span onRequest(final Span span, final REQUEST request) {
assert span != null;
if (request != null) {
span.setMeta("http.method", method(request));
span.setMeta("http.url", url(request));
}
return span;
}
public Span onResponse(final Span span, final RESPONSE response) {
assert span != null;
if (response != null) {
span.setMeta("peer.hostname", hostname(response));
span.setMeta("peer.port", port(response));
final int status = status(response);
span.setMeta("http.status", status);
if (400 <= status && status < 500) {
span.setErrored(true);
}
}
return span;
}
}

View File

@ -1,55 +0,0 @@
package datadog.trace.decorator;
import datadog.trace.tracer.Span;
public abstract class HttpServerDecorator<REQUEST, RESPONSE> extends ServerDecorator {
protected abstract String method(REQUEST request);
protected abstract String url(REQUEST request);
protected abstract String hostname(REQUEST request);
protected abstract int port(REQUEST request);
protected abstract int status(RESPONSE response);
@Override
protected String spanType() {
return "web";
}
public Span onRequest(final Span span, final REQUEST request) {
assert span != null;
if (request != null) {
span.setMeta("http.method", method(request));
span.setMeta("http.url", url(request));
span.setMeta("peer.hostname", hostname(request));
span.setMeta("peer.port", port(request));
// TODO set resource name from URL.
}
return span;
}
public Span onResponse(final Span span, final RESPONSE response) {
assert span != null;
if (response != null) {
span.setMeta("http.status", status(response));
if (status(response) >= 500) {
span.setErrored(true);
}
}
return span;
}
@Override
public Span onError(final Span span, final Throwable throwable) {
assert span != null;
final Object status = span.getMeta("http.status");
if (status == null || status.equals(200)) {
// Ensure status set correctly
span.setMeta("http.status", 500);
}
return super.onError(span, throwable);
}
}

View File

@ -1,16 +0,0 @@
package datadog.trace.decorator;
import datadog.trace.tracer.Span;
public abstract class ServerDecorator extends BaseDecorator {
protected abstract String spanType();
@Override
public Span afterStart(final Span span) {
assert span != null;
span.setMeta("span.kind", "server");
span.setMeta("span.type", spanType());
return super.afterStart(span);
}
}

View File

@ -1,108 +0,0 @@
package datadog.trace.tracer;
import java.util.concurrent.TimeUnit;
import lombok.EqualsAndHashCode;
/**
* This is a wrapper to System clock that provides an easier way to get nanosecond precision.
*
* <p>The JDK provides two clocks:
* <li>one in nanoseconds, for precision, but it can only use to measure durations
* <li>one in milliseconds, for accuracy, useful to provide epoch time
*
* <p>Once created this class captures current time with millisecond presition and current
* nanosecond counter.
*
* <p>It provides an API to create {@link Timestamp} that can be used to measure durations with
* nanosecond precision.
*/
@EqualsAndHashCode
class Clock {
/** Tracer that created this clock */
private final Tracer tracer;
/** Trace start time in nano seconds measured up to a millisecond accuracy */
private final long startTimeNano;
/** Nano ticks counter when clock is created */
private final long startNanoTicks;
Clock(final Tracer tracer) {
this.tracer = tracer;
startTimeNano = epochTimeNano();
startNanoTicks = nanoTicks();
}
/** @return {@link Tracer} that created this clock. */
public Tracer getTracer() {
return tracer;
}
/**
* Create new timestamp instance for current time.
*
* @return new timestamp capturing current time.
*/
public Timestamp createCurrentTimestamp() {
return new Timestamp(this);
}
/**
* Create new timestamp instance for current time.
*
* @return new timestamp capturing current time.
*/
public Timestamp createTimestampForTime(final long time, final TimeUnit unit) {
return new Timestamp(this, time, unit);
}
/**
* Get the current nanos ticks (i.e. System.nanoTime()), this method can't be use for date
* accuracy (only duration calculations).
*
* @return The current nanos ticks.
*/
long nanoTicks() {
return System.nanoTime();
}
/**
* Get the current epoch time in micros.
*
* <p>Note: The actual precision is the millis.
*
* @return the current epoch time in micros.
*/
long epochTimeMicro() {
return TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis());
}
/**
* Get the current epoch time in nanos.
*
* <p>Note: The actual precision is the millis. This will overflow ~290 years after epoch.
*
* @return the current epoch time in nanos.
*/
long epochTimeNano() {
return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
}
/**
* Get time this clock instance was created in nanos.
*
* @return the time this clock instance was created in nanos.
*/
long getStartTimeNano() {
return startTimeNano;
}
/**
* Get nano ticks counter value when this clock instance was created.
*
* @return nano ticks counter value when this clock instance was created.
*/
long getStartNanoTicks() {
return startNanoTicks;
}
}

View File

@ -1,26 +0,0 @@
package datadog.trace.tracer;
/**
* Continuations are used to prevent a trace from reporting without creating a span.
*
* <p>All spans are thread safe.
*
* <p>To create a Span, see {@link Trace#createContinuation(Span parentSpan)}
*/
public interface Continuation {
/** @return parent span used to create this continuation. */
Span getSpan();
/** @return trace used to create this continuation. */
Trace getTrace();
/** @return true iff continuation has been closed. */
boolean isClosed();
/**
* Close the continuation. Continuation's trace will not block reporting on account of this
* continuation.
*/
void close();
}

View File

@ -1,72 +0,0 @@
package datadog.trace.tracer;
import lombok.extern.slf4j.Slf4j;
/** Concrete implementation of a continuation */
@Slf4j
class ContinuationImpl implements Continuation {
private final TraceInternal trace;
private final Span span;
private volatile boolean closed = false;
ContinuationImpl(final TraceInternal trace, final Span span) {
this.trace = trace;
this.span = span;
}
@Override
public Trace getTrace() {
return trace;
}
@Override
public Span getSpan() {
return span;
}
@Override
public boolean isClosed() {
return closed;
}
@Override
public synchronized void close() {
if (closed) {
reportUsageError("Attempted to close continuation that is already closed: %s", this);
} else {
closeContinuation(false);
}
}
@Override
protected synchronized void finalize() {
try {
if (!closed) {
log.debug(
"Closing continuation due to GC, this will prevent trace from being reported: {}",
this);
closeContinuation(true);
}
} catch (final Throwable t) {
// Exceptions thrown in finalizer are eaten up and ignored, so log them instead
log.debug("Span finalizer had thrown an exception: ", t);
}
}
/**
* Helper method to perform operations needed to close the continuation.
*
* <p>Note: This has to be called under object lock.
*
* @param invalid true iff continuation is being closed due to GC, this will make trace invalid.
*/
private void closeContinuation(final boolean invalid) {
closed = true;
trace.closeContinuation(this, invalid);
}
private void reportUsageError(final String message, final Object... args) {
trace.getTracer().reportError(message, args);
}
}

View File

@ -1,29 +0,0 @@
package datadog.trace.tracer;
/**
* An Interceptor allows adding hooks to particular events of a span starting and finishing and also
* trace being written to backend.
*/
public interface Interceptor {
/**
* Called after a span is started.
*
* @param span the started span.
*/
void afterSpanStarted(Span span);
/**
* Called before a span is finished.
*
* @param span the span to be finished.
*/
void beforeSpanFinished(Span span);
/**
* Invoked when a trace is eligible for writing but hasn't been handed off to its writer yet.
*
* @param trace The intercepted trace.
* @return modified trace. Null if trace is to be dropped.
*/
Trace beforeTraceWritten(Trace trace);
}

View File

@ -1,51 +0,0 @@
package datadog.trace.tracer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
/**
* Helper class to limit errors logged into application's error log.
*
* <p>TODO: can we make this class not public?
*
* <p>TODO: once we drop 1.7 support we should be able to use {@code java.time.Clock} instead of
* {@code System.currentTimeMillis} to simplify testing.
*/
public class LogRateLimiter {
private final Logger log;
private final long millisecondsBetweenLog;
private long nextAllowedLogTime = 0;
public LogRateLimiter(final Logger log, final long millisecondsBetweenLog) {
this.log = log;
this.millisecondsBetweenLog = millisecondsBetweenLog;
}
public synchronized void warn(String message, final Object... arguments) {
if (log.isDebugEnabled()) {
log.debug(message, arguments);
} else if (nextAllowedLogTime <= System.currentTimeMillis()) {
message +=
" (going silent for "
+ TimeUnit.MILLISECONDS.toMinutes(millisecondsBetweenLog)
+ " minutes)";
nextAllowedLogTime = System.currentTimeMillis() + millisecondsBetweenLog;
log.warn(message, arguments);
}
}
public synchronized void error(String message, final Object... arguments) {
if (log.isDebugEnabled()) {
log.debug(message, arguments);
} else if (nextAllowedLogTime <= System.currentTimeMillis()) {
message +=
" (going silent for "
+ TimeUnit.MILLISECONDS.toMinutes(millisecondsBetweenLog)
+ " minutes)";
nextAllowedLogTime = System.currentTimeMillis() + millisecondsBetweenLog;
log.error(message, arguments);
}
}
}

View File

@ -1,141 +0,0 @@
package datadog.trace.tracer;
import java.util.Map;
/**
* A single measurement of time with arbitrary key-value attributes.
*
* <p>All spans are thread safe.
*
* <p>To create a Span, see {@link Trace#createSpan(SpanContext parentContext, Timestamp
* startTimestamp)}
*/
public interface Span {
/** @return The trace this span is associated with. */
Trace getTrace();
/** @return start timestamp of this span */
public Timestamp getStartTimestamp();
/** @return duration of this span in nanoseconds or null if span is not finished */
public Long getDuration();
/** @return true if a finish method has been invoked on this span. */
boolean isFinished();
/**
* Get the span context for this span - required attributes to report to datadog.
*
* <p>See https://docs.datadoghq.com/api/?lang=python#tracing
*
* @return the span context.
*/
SpanContext getContext();
/** @return the span's service. */
String getService();
/**
* Set the span's service.
*
* <p>May not exceed 100 characters.
*
* @param service the new service for the span.
*/
void setService(String service);
/** @return the span's resource. */
String getResource();
/**
* Span the span's resource (e.g. http endpoint).
*
* <p>May not exceed 5000 characters.
*
* @param resource the new resource for the span.
*/
void setResource(String resource);
/** @return the span's type. */
String getType();
/**
* Set the span's type (web, db, etc). {@see DDSpanTypes}.
*
* @param type the new type of the span.
*/
void setType(String type);
/** @return the span's name. */
String getName();
/**
* Set the span's name.
*
* <p>May not exceed 100 characters.
*
* @param name the new name for the span.
*/
void setName(String name);
/** @return true iff span was marked as error span. */
boolean isErrored();
/**
* Mark the span as having an error.
*
* @param errored true if the span has an error.
*/
void setErrored(boolean errored);
/**
* Attach a throwable to this span.
*
* @param throwable throwable to attach
*/
void attachThrowable(Throwable throwable);
/**
* Get all the metadata attached to this span.
*
* @return immutable map of span metadata.
*/
Map<String, Object> getMeta();
/**
* Get a meta value on a span.
*
* @param key The meta key
* @return The value currently associated with the given key. Null if no associated.
*/
Object getMeta(String key);
/**
* Set key-value metadata on the span.
*
* @param key to set
* @param value to set
*/
void setMeta(String key, String value);
/** {@link Span#setMeta(String, String)} for boolean values */
void setMeta(String key, Boolean value);
/** {@link Span#setMeta(String, String)} for number values */
void setMeta(String key, Number value);
/** Stop the span's timer. Has no effect if the span is already finished. */
void finish();
/**
* Stop the span's timer. Has no effect if the span is already finished.
*
* <p>It's undefined behavior to specify a finish timestamp which occurred before this span's
* start timestamp.
*
* @param finishTimestampNanoseconds Epoch time in nanoseconds.
*/
// FIXME: This should take a Timestamp object instead.
void finish(long finishTimestampNanoseconds);
}

View File

@ -1,40 +0,0 @@
package datadog.trace.tracer;
/**
* All attributes of a {@link Span} which propagate for distributed tracing.
*
* <p>All Spans must have a SpanContext, but not all SpanContexts require a span.
*
* <p>All SpanContexts are thread safe.
*/
public interface SpanContext {
/**
* Get this context's trace id.
*
* @return 64 bit unsigned integer in String format.
*/
String getTraceId();
/**
* Get this context's parent span id.
*
* @return 64 bit unsigned integer in String format.
*/
String getParentId();
/**
* Get this context's span id.
*
* @return 64 bit unsigned integer in String format.
*/
String getSpanId();
/**
* Get the sampling flag for this context.
*
* @return sampling flag for null if no sampling flags are set.
*/
// TODO: should we add a @Nullable annotation to our project?
Integer getSamplingFlags();
}

View File

@ -1,69 +0,0 @@
package datadog.trace.tracer;
import datadog.trace.api.sampling.PrioritySampling;
import java.math.BigInteger;
import java.util.concurrent.ThreadLocalRandom;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode
class SpanContextImpl implements SpanContext {
public static final String ZERO = "0";
private final String traceId;
private final String parentId;
private final String spanId;
SpanContextImpl(final String traceId, final String parentId, final String spanId) {
this.traceId = traceId;
this.parentId = parentId;
this.spanId = spanId;
}
@Override
public String getTraceId() {
return traceId;
}
@Override
public String getParentId() {
return parentId;
}
@Override
public String getSpanId() {
return spanId;
}
// TODO: Implement proper priority handling methods
@Override
public Integer getSamplingFlags() {
return PrioritySampling.SAMPLER_KEEP;
}
static SpanContext fromParent(final SpanContext parent) {
final String traceId;
final String parentId;
if (parent == null) {
traceId = generateNewId();
parentId = ZERO;
} else {
traceId = parent.getTraceId();
parentId = parent.getSpanId();
}
return new SpanContextImpl(traceId, parentId, generateNewId());
}
/** @return Random 64bit unsigned number strictly greater than zero */
static String generateNewId() {
// Note: we can probably optimize this using {@link ThreadLocalRandom#nextLong()}
// and {@link Long#toUnsignedString} but {@link Long#toUnsignedString} is only
// available in Java8+ and {@link ThreadLocalRandom#nextLong()} cannot
// generate negative numbers before Java8.
BigInteger result = BigInteger.ZERO;
while (result.equals(BigInteger.ZERO)) {
result = new BigInteger(64, ThreadLocalRandom.current());
}
return result.toString();
}
}

View File

@ -1,345 +0,0 @@
package datadog.trace.tracer;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import datadog.trace.api.DDTags;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.math.BigInteger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
/** Concrete implementation of a span */
@Slf4j
// Disable autodetection of fields and accessors
@JsonAutoDetect(
fieldVisibility = Visibility.NONE,
setterVisibility = Visibility.NONE,
getterVisibility = Visibility.NONE,
isGetterVisibility = Visibility.NONE,
creatorVisibility = Visibility.NONE)
class SpanImpl implements Span {
private final TraceInternal trace;
private final SpanContext context;
private final Timestamp startTimestamp;
/* Note: some fields are volatile so we could make getters non synchronized.
Alternatively we could make getters synchronized, but this may create more contention.
*/
private volatile Long duration = null;
private volatile String service;
private volatile String resource;
private volatile String type;
private volatile String name;
private volatile boolean errored = false;
private final Map<String, Object> meta = new HashMap<>();
private final List<Interceptor> interceptors;
/**
* Create a span with the a specific startTimestamp timestamp.
*
* @param trace The trace to associate this span with.
* @param parentContext identifies the parent of this span. May be null.
* @param startTimestamp timestamp when this span was started.
*/
SpanImpl(
final TraceInternal trace, final SpanContext parentContext, final Timestamp startTimestamp) {
this.trace = trace;
context = SpanContextImpl.fromParent(parentContext);
if (startTimestamp == null) {
reportUsageError("Cannot create span without timestamp: %s", trace);
throw new TraceException(String.format("Cannot create span without timestamp: %s", trace));
}
this.startTimestamp = startTimestamp;
service = trace.getTracer().getDefaultServiceName();
interceptors = trace.getTracer().getInterceptors();
for (final Interceptor interceptor : interceptors) {
interceptor.afterSpanStarted(this);
}
}
@Override
public Trace getTrace() {
return trace;
}
@Override
public SpanContext getContext() {
return context;
}
@JsonGetter("trace_id")
@JsonSerialize(using = UInt64IDStringSerializer.class)
public String getTraceId() {
return context.getTraceId();
}
@JsonGetter("span_id")
@JsonSerialize(using = UInt64IDStringSerializer.class)
public String getSpanId() {
return context.getSpanId();
}
@JsonGetter("parent_id")
@JsonSerialize(using = UInt64IDStringSerializer.class)
public String getParentId() {
return context.getParentId();
}
@Override
@JsonGetter("start")
public Timestamp getStartTimestamp() {
return startTimestamp;
}
@Override
@JsonGetter("duration")
public Long getDuration() {
return duration;
}
@Override
public boolean isFinished() {
return duration != null;
}
@Override
@JsonGetter("service")
public String getService() {
return service;
}
@Override
public synchronized void setService(final String service) {
if (isFinished()) {
reportSetterUsageError("service");
} else {
this.service = service;
}
}
@Override
@JsonGetter("resource")
public String getResource() {
return resource;
}
@Override
public synchronized void setResource(final String resource) {
if (isFinished()) {
reportSetterUsageError("resource");
} else {
this.resource = resource;
}
}
@Override
@JsonGetter("type")
public String getType() {
return type;
}
@Override
public synchronized void setType(final String type) {
if (isFinished()) {
reportSetterUsageError("type");
} else {
this.type = type;
}
}
@Override
@JsonGetter("name")
public String getName() {
return name;
}
@Override
public synchronized void setName(final String name) {
if (isFinished()) {
reportSetterUsageError("name");
} else {
this.name = name;
}
}
@Override
@JsonGetter("error")
@JsonFormat(shape = JsonFormat.Shape.NUMBER)
public boolean isErrored() {
return errored;
}
@Override
public synchronized void attachThrowable(final Throwable throwable) {
if (isFinished()) {
reportSetterUsageError("throwable");
} else {
setErrored(true);
setMeta(DDTags.ERROR_MSG, throwable.getMessage());
setMeta(DDTags.ERROR_TYPE, throwable.getClass().getName());
final StringWriter errorString = new StringWriter();
throwable.printStackTrace(new PrintWriter(errorString));
setMeta(DDTags.ERROR_STACK, errorString.toString());
}
}
@Override
public synchronized void setErrored(final boolean errored) {
if (isFinished()) {
reportSetterUsageError("errored");
} else {
this.errored = errored;
}
}
@Override
public synchronized Map<String, Object> getMeta() {
return Collections.unmodifiableMap(meta);
}
/**
* The agent expects meta's values to be strings.
*
* @return a copy of meta with all values converted to strings.
*/
@JsonGetter("meta")
synchronized Map<String, String> getMetaString() {
final Map<String, String> result = new HashMap<>(meta.size());
for (final Map.Entry<String, Object> entry : meta.entrySet()) {
result.put(entry.getKey(), String.valueOf(entry.getValue()));
}
return result;
}
@Override
public synchronized Object getMeta(final String key) {
return meta.get(key);
}
protected synchronized void setMeta(final String key, final Object value) {
if (isFinished()) {
reportSetterUsageError("meta value " + key);
} else {
if (value == null) {
meta.remove(key);
} else {
meta.put(key, value);
}
}
}
@Override
public void setMeta(final String key, final String value) {
setMeta(key, (Object) value);
}
@Override
public void setMeta(final String key, final Boolean value) {
setMeta(key, (Object) value);
}
@Override
public void setMeta(final String key, final Number value) {
setMeta(key, (Object) value);
}
// FIXME: Add metrics support and json rendering for metrics
@Override
public synchronized void finish() {
if (isFinished()) {
reportUsageError("Attempted to finish span that is already finished: %s", this);
} else {
finishSpan(startTimestamp.getDuration(), false);
}
}
// FIXME: This should take a Timestamp object instead.
@Override
public synchronized void finish(final long finishTimestampNanoseconds) {
if (isFinished()) {
reportUsageError("Attempted to finish span that is already finish: %s", this);
} else {
finishSpan(startTimestamp.getDuration(finishTimestampNanoseconds), false);
}
}
@Override
protected synchronized void finalize() {
try {
// Note: according to docs finalize is only called once for a given instance - even if
// instance is 'revived' from the dead by passing reference to some other object and
// then dies again.
if (!isFinished()) {
log.debug(
"Finishing span due to GC, this will prevent trace from being reported: {}", this);
finishSpan(startTimestamp.getDuration(), true);
}
} catch (final Throwable t) {
// Exceptions thrown in finalizer are eaten up and ignored, so log them instead
log.debug("Span finalizer had thrown an exception: ", t);
}
}
/**
* Helper method to perform operations to finish the span.
*
* <p>Note: This has to be called under object lock.
*
* @param duration duration of the span.
* @param fromGC true iff we are closing span because it is being GCed, this will make trace
* invalid.
*/
private void finishSpan(final long duration, final boolean fromGC) {
// Run interceptors in 'reverse' order
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).beforeSpanFinished(this);
}
this.duration = duration;
trace.finishSpan(this, fromGC);
}
private void reportUsageError(final String message, final Object... args) {
trace.getTracer().reportError(message, args);
}
private void reportSetterUsageError(final String fieldName) {
reportUsageError("Attempted to set '%s' when span is already finished: %s", fieldName, this);
}
/** Helper to serialize string value as 64 bit unsigned integer */
private static class UInt64IDStringSerializer extends StdSerializer<String> {
public UInt64IDStringSerializer() {
super(String.class);
}
@Override
public void serialize(
final String value, final JsonGenerator jsonGenerator, final SerializerProvider provider)
throws IOException {
jsonGenerator.writeNumber(new BigInteger(value));
}
}
}

View File

@ -1,99 +0,0 @@
package datadog.trace.tracer;
import static java.lang.Math.max;
import com.fasterxml.jackson.annotation.JsonValue;
import java.util.concurrent.TimeUnit;
import lombok.EqualsAndHashCode;
/**
* Class that encapsulations notion of a given timestamp, or instant in time.
*
* <p>Timestamps are created by a [@link Clock} instance.
*/
@EqualsAndHashCode
public class Timestamp {
private final Clock clock;
private final long nanoTicks;
/**
* Create timestamp for a given clock and given nanoTicks state
*
* @param clock clock instance
*/
Timestamp(final Clock clock) {
this.clock = clock;
nanoTicks = clock.nanoTicks();
}
/**
* Create timestamp for a given clock and given start time and precision
*
* @param clock clock instance
*/
Timestamp(final Clock clock, final long time, final TimeUnit unit) {
this.clock = clock;
final long currentTime = clock.epochTimeNano();
final long currentTick = clock.nanoTicks();
final long desiredTime = unit.toNanos(time);
final long offset = currentTime - desiredTime;
nanoTicks = currentTick - offset;
}
/** @return clock instance used by this timestamp */
Clock getClock() {
return clock;
}
/** @return time since epoch in nanoseconds */
@JsonValue
public long getTime() {
return clock.getStartTimeNano() + startTicksOffset();
}
/** @return duration in nanoseconds from this time stamp to current time. */
public long getDuration() {
return getDuration(clock.createCurrentTimestamp());
}
/**
* Get duration in nanoseconds from this time stamp to provided finish timestamp.
*
* @param finishTimestamp finish timestamp to use as period's end.
* @return duration in nanoseconds.
*/
public long getDuration(final Timestamp finishTimestamp) {
if (clock != finishTimestamp.clock) {
clock
.getTracer()
.reportError(
"Trying to find duration between two timestamps created by different clocks. Current clock: %s, finish timestamp clock: %s",
clock, finishTimestamp.clock);
// Do our best to try to calculate nano-second time using millisecond clock start time and
// nanosecond offset.
return max(0, finishTimestamp.getTime() - getTime());
}
return max(0, finishTimestamp.nanoTicks - nanoTicks);
}
/**
* Duration in nanoseconds for external finish time.
*
* <p>Note: since we can only get time with millisecond precision in Java this ends up being
* effectively millisecond precision duration converted to nanoseconds.
*
* @param finishTimeNanoseconds
* @return duration in nanoseconds (with millisecond precision).
*/
public long getDuration(final long finishTimeNanoseconds) {
// This is calculated as the difference between finish time and clock start time and then
// subtracting difference between timestamp nanoticks and clock start nanoticks to account for
// time after clock has been created and before timestamp has been created.
return max(0, finishTimeNanoseconds - clock.getStartTimeNano() - startTicksOffset());
}
private long startTicksOffset() {
return nanoTicks - clock.getStartNanoTicks();
}
}

View File

@ -1,59 +0,0 @@
package datadog.trace.tracer;
import java.util.List;
/**
* A tree of {@link Span}s with a single root node plus logic to determine when to report said tree
* to the backend.
*
* <p>A trace will be written when all of its spans are finished and all trace continuations are
* closed.
*
* <p>To create a Trace, see {@link Tracer#buildTrace(SpanContext parentContext)}
*/
public interface Trace {
/** @return the tracer which created this trace. */
Tracer getTracer();
/** @return the root span for this trace. This will never be null. */
Span getRootSpan();
/**
* @return list of spans for this trace. Note: if trace is not finished this will report error.
*/
List<Span> getSpans();
/** @return true iff trace is valid (invalid traces should not be reported). */
boolean isValid();
/** @return current timestamp using this trace's clock */
Timestamp createCurrentTimestamp();
/**
* Create a new span in this trace as a child of the given parent context.
*
* @param parentContext the parent to use. Must be a span in this trace.
* @return the new span. It is the caller's responsibility to ensure {@link Span#finish()} is
* eventually invoked on this span.
*/
Span createSpan(final SpanContext parentContext);
/**
* Create a new span in this trace as a child of the given parent context.
*
* @param parentContext the parent to use. Must be a span in this trace.
* @param startTimestamp timestamp to use as start timestamp for a new span.
* @return the new span. It is the caller's responsibility to ensure {@link Span#finish()} is
* eventually invoked on this span.
*/
Span createSpan(final SpanContext parentContext, final Timestamp startTimestamp);
/**
* Create a new continuation for this trace
*
* @param parentSpan the parent to use. Must be a span in this trace.
* @return the new continuation. It is the caller's responsibility to ensure {@link
* Continuation#close()} is eventually invoked on this continuation.
*/
Continuation createContinuation(Span parentSpan);
}

View File

@ -1,54 +0,0 @@
package datadog.trace.tracer;
/** Tracer-specific runtime exception. */
public class TraceException extends RuntimeException {
/**
* Constructs a new trace exception with {@code null} as its detail message. The cause is not
* initialized, and may subsequently be initialized by a call to {@link #initCause}.
*/
public TraceException() {
super();
}
/**
* Constructs a new trace exception with the specified detail message. The cause is not
* initialized, and may subsequently be initialized by a call to {@link #initCause}.
*
* @param message the detail message. The detail message is saved for later retrieval by the
* {@link #getMessage()} method.
*/
public TraceException(final String message) {
super(message);
}
/**
* Constructs a new trace exception with the specified detail message and cause.
*
* <p>Note that the detail message associated with {@code cause} is <i>not</i> automatically
* incorporated in this runtime exception's detail message.
*
* @param message the detail message (which is saved for later retrieval by the {@link
* #getMessage()} method).
* @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method).
* (A <tt>null</tt> value is permitted, and indicates that the cause is nonexistent or
* unknown.)
*/
public TraceException(final String message, final Throwable cause) {
super(message, cause);
}
/**
* Constructs a new runtime exception with the specified cause and a detail message of
* <tt>(cause==null ? null : cause.toString())</tt> (which typically contains the class and detail
* message of <tt>cause</tt>). This constructor is useful for runtime exceptions that are little
* more than wrappers for other throwables.
*
* @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method).
* (A <tt>null</tt> value is permitted, and indicates that the cause is nonexistent or
* unknown.)
*/
public TraceException(final Throwable cause) {
super(cause);
}
}

View File

@ -1,199 +0,0 @@
package datadog.trace.tracer;
import com.fasterxml.jackson.annotation.JsonValue;
import datadog.trace.tracer.writer.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
class TraceImpl implements TraceInternal {
/* We use weakly referenced sets to track 'in-flight' spans and continuations. We use span/continuation's
finalizer to notify trace that span/continuation is being GCed.
If any part of the trace (span or continuation) has been was finished (closed) via GC then trace would be
marked as 'invalid' and will not be reported the the backend. Instead only writer's counter would be incremented.
This allows us not to report traces that have wrong timing information.
Note: instead of using {@link WeakHashMap} we may want to consider using more fancy implementations from
{@link datadog.trace.agent.tooling.WeakMapSuppliers}. If we do this care should be taken to avoid creating
cleanup threads per trace.
*/
private final Set<Span> inFlightSpans =
Collections.newSetFromMap(new WeakHashMap<Span, Boolean>());
private final Set<Continuation> inFlightContinuations =
Collections.newSetFromMap(new WeakHashMap<Continuation, Boolean>());
/** Strong refs to spans which are closed */
private final List<Span> finishedSpans = new ArrayList();
private final Tracer tracer;
private final Clock clock;
private final Span rootSpan;
private boolean valid = true;
private boolean finished = false;
/**
* Create a new Trace.
*
* @param tracer the Tracer to apply settings from.
*/
TraceImpl(
final Tracer tracer,
final SpanContext rootSpanParentContext,
final Timestamp rootSpanStartTimestamp) {
this.tracer = tracer;
clock = rootSpanStartTimestamp.getClock();
rootSpan = new SpanImpl(this, rootSpanParentContext, rootSpanStartTimestamp);
inFlightSpans.add(rootSpan);
}
@Override
public Tracer getTracer() {
return tracer;
}
@Override
public Span getRootSpan() {
return rootSpan;
}
@Override
@JsonValue
public synchronized List<Span> getSpans() {
if (!finished) {
tracer.reportError("Cannot get spans, trace is not finished yet: %s", this);
return Collections.EMPTY_LIST;
}
return Collections.unmodifiableList(finishedSpans);
}
@Override
public synchronized boolean isValid() {
return valid;
}
@Override
public Timestamp createCurrentTimestamp() {
return clock.createCurrentTimestamp();
}
@Override
public Span createSpan(final SpanContext parentContext) {
return createSpan(parentContext, createCurrentTimestamp());
}
@Override
public synchronized Span createSpan(
final SpanContext parentContext, final Timestamp startTimestamp) {
checkTraceFinished("create span");
if (parentContext == null) {
throw new TraceException("Got null parent context, trace: " + this);
}
if (!parentContext.getTraceId().equals(rootSpan.getContext().getTraceId())) {
throw new TraceException(
String.format(
"Wrong trace id when creating a span. Got %s, expected %s",
parentContext.getTraceId(), rootSpan.getContext().getTraceId()));
}
final Span span = new SpanImpl(this, parentContext, startTimestamp);
inFlightSpans.add(span);
return span;
}
@Override
public synchronized Continuation createContinuation(final Span span) {
checkTraceFinished("create continuation");
if (span == null) {
throw new TraceException("Got null parent span, trace: " + this);
}
if (!span.getContext().getTraceId().equals(rootSpan.getContext().getTraceId())) {
throw new TraceException(
String.format(
"Wrong trace id when creating a span. Got %s, expected %s",
span.getContext().getTraceId(), rootSpan.getContext().getTraceId()));
}
final Continuation continuation = new ContinuationImpl(this, span);
inFlightContinuations.add(continuation);
return continuation;
}
@Override
public synchronized void finishSpan(final Span span, final boolean invalid) {
checkTraceFinished("finish span");
if (!inFlightSpans.contains(span)) {
tracer.reportError("Trace doesn't contain continuation to finish: %s, trace: %s", span, this);
return;
}
if (invalid) {
valid = false;
}
inFlightSpans.remove(span);
finishedSpans.add(span);
checkAndWriteTrace();
}
@Override
public synchronized void closeContinuation(
final Continuation continuation, final boolean invalid) {
checkTraceFinished("close continuation");
if (!inFlightContinuations.contains(continuation)) {
tracer.reportError(
"Trace doesn't contain continuation to finish: %s, trace: %s", continuation, this);
return;
}
if (invalid) {
valid = false;
}
inFlightContinuations.remove(continuation);
checkAndWriteTrace();
}
/**
* Helper to check if trace is ready to be written and write it if it is.
*
* <p>Note: This has to be called under object lock.
*/
private void checkAndWriteTrace() {
if (inFlightSpans.isEmpty() && inFlightContinuations.isEmpty()) {
final Writer writer = tracer.getWriter();
writer.incrementTraceCount();
final Trace trace = runInterceptorsBeforeTraceWritten(this);
if (trace != null && tracer.getSampler().sample(trace)) {
writer.write(trace);
}
finished = true;
}
}
/**
* Helper to run interceptor hooks before trace is finished.
*
* <p>Note: This has to be called under object lock.
*/
private Trace runInterceptorsBeforeTraceWritten(Trace trace) {
final List<Interceptor> interceptors = tracer.getInterceptors();
// Run interceptors in 'reverse' order
for (int i = interceptors.size() - 1; i >= 0; i--) {
// TODO: we probably should handle exceptions in interceptors more or less gracefully
trace = interceptors.get(i).beforeTraceWritten(trace);
if (trace == null) {
break;
}
}
return trace;
}
/**
* Helper to check if trace is finished and report an error if it is.
*
* <p>This has to be called under object lock
*
* @param action action to report error with.
*/
private void checkTraceFinished(final String action) {
if (finished) {
tracer.reportError("Cannot %s, trace has already been finished: %s", action, this);
}
}
}

View File

@ -1,21 +0,0 @@
package datadog.trace.tracer;
/** Trace interface that provides additional methods used internally */
interface TraceInternal extends Trace {
/**
* Called by the span to inform trace that span is finished.
*
* @param span span to finish.
* @param invalid true iff span is 'invalid'.
*/
void finishSpan(final Span span, final boolean invalid);
/**
* Called by the continuation to inform trace that span is closed.
*
* @param continuation continuation to close.
* @param invalid true iff span is 'invalid'.
*/
void closeContinuation(final Continuation continuation, final boolean invalid);
}

View File

@ -1,171 +0,0 @@
package datadog.trace.tracer;
import datadog.trace.api.Config;
import datadog.trace.tracer.sampling.AllSampler;
import datadog.trace.tracer.sampling.Sampler;
import datadog.trace.tracer.writer.Writer;
import java.io.Closeable;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
/** A Tracer creates {@link Trace}s and holds common settings across traces. */
@Slf4j
public class Tracer implements Closeable {
/** Writer is an charge of reporting traces and spans to the desired endpoint */
private final Writer writer;
/** Sampler defines the sampling policy in order to reduce the number of traces for instance */
private final Sampler sampler;
/** Settings for this tracer. */
private final Config config;
/** Interceptors to be called on certain trace and span events */
private final List<Interceptor> interceptors;
/**
* JVM shutdown callback, keeping a reference to it to remove this if Tracer gets destroyed
* earlier
*/
private final Thread shutdownCallback;
@Builder
private Tracer(
final Config config,
final Writer writer,
final Sampler sampler,
final List<Interceptor> interceptors) {
// Apply defaults:
this.config = config != null ? config : Config.get();
this.writer = writer != null ? writer : Writer.Builder.forConfig(this.config);
this.sampler = sampler != null ? sampler : new AllSampler();
// TODO: implement and include "standard" interceptors
this.interceptors =
interceptors != null
? Collections.unmodifiableList(new ArrayList<>(interceptors))
: Collections.<Interceptor>emptyList();
shutdownCallback = new ShutdownHook(this);
try {
Runtime.getRuntime().addShutdownHook(shutdownCallback);
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
/** @return {@link Writer} used by this tracer */
public Writer getWriter() {
return writer;
}
/** @return {@link Sampler} used by this tracer. */
public Sampler getSampler() {
return sampler;
}
/** @return unmodifiable list of trace/span interceptors. */
public List<Interceptor> getInterceptors() {
return interceptors;
}
/** @return service name to use on span by default. */
String getDefaultServiceName() {
return config.getServiceName();
}
/**
* @return timestamp for current time. Note: this is mainly useful when there is no 'current'
* trace. If there is 'current' trace already then one should use it to get timestamps.
*/
public Timestamp createCurrentTimestamp() {
return new Clock(this).createCurrentTimestamp();
}
/**
* @return timestamp for given time. Note: this is mainly useful when there is no 'current' trace.
* If there is 'current' trace already then one should use it to get timestamps.
*/
public Timestamp createTimestampForTime(final long time, final TimeUnit unit) {
return new Clock(this).createTimestampForTime(time, unit);
}
/**
* Construct a new trace using this tracer's settings and return the root span.
*
* @param parentContext parent context of a root span in this trace. May be null.
* @return The root span of the new trace.
*/
public Span buildTrace(final SpanContext parentContext) {
return buildTrace(parentContext, createCurrentTimestamp());
}
/**
* Construct a new trace using this tracer's settings and return the root span.
*
* @param parentContext parent context of a root span in this trace. May be null.
* @param timestamp root span start timestamp.
* @return The root span of the new trace.
*/
public Span buildTrace(final SpanContext parentContext, final Timestamp timestamp) {
final Trace trace = new TraceImpl(this, parentContext, timestamp);
return trace.getRootSpan();
}
// TODO: doc inject and extract
// TODO: inject and extract helpers on span context?
public <T> void inject(final SpanContext spanContext, final Object format, final T carrier) {}
public <T> SpanContext extract(final Object format, final T carrier) {
return null;
}
/*
Report error to the log/console. This may throw an exception
*/
void reportError(final String message, final Object... args) {
// TODO: Provide way to do logging or throwing an exception according to config?
final String completeMessage = String.format(message, args);
log.debug(completeMessage);
throw new TraceException(completeMessage);
}
@Override
public void finalize() {
try {
Runtime.getRuntime().removeShutdownHook(shutdownCallback);
shutdownCallback.run();
} catch (final Exception e) {
log.error("Error while finalizing Tracer.", e);
}
}
@Override
public void close() {
// FIXME: Handle the possibility of close being called more than once or not at all.
// FIXME: Depends on order of execution between finalize, GC, and the shutdown hook.
writer.close();
}
private static class ShutdownHook extends Thread {
private final WeakReference<Tracer> reference;
private ShutdownHook(final Tracer tracer) {
reference = new WeakReference<>(tracer);
}
@Override
public void run() {
final Tracer tracer = reference.get();
if (tracer != null) {
tracer.close();
}
}
}
}

View File

@ -1,11 +0,0 @@
package datadog.trace.tracer.sampling;
import datadog.trace.tracer.Trace;
/** Sampler that samples all traces. */
public class AllSampler implements Sampler {
@Override
public boolean sample(final Trace trace) {
return true;
}
}

View File

@ -1,19 +0,0 @@
package datadog.trace.tracer.sampling;
import datadog.trace.tracer.Trace;
/**
* Keeps or discards traces.
*
* <p>Note that in most cases the sampler will keep all traces. Most of the sampling logic is done
* downstream by the trace-agent or dd-backend.
*/
public interface Sampler {
/**
* Run tracer sampling logic on the trace.
*
* @param trace
* @return true if the trace should be kept/written/reported.
*/
boolean sample(Trace trace);
}

View File

@ -1,147 +0,0 @@
package datadog.trace.tracer.writer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.tracer.LogRateLimiter;
import datadog.trace.tracer.Trace;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.msgpack.jackson.dataformat.MessagePackFactory;
// FIXME: this has to be migrated to okhttp before we can use this.
// Otherwise http requests coming from here will get traced.
@Slf4j
class AgentClient {
static final String TRACES_ENDPOINT = "/v0.4/traces";
static final String CONTENT_TYPE = "Content-Type";
static final String MSGPACK = "application/msgpack";
static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version";
static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count";
static final int CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1);
static final int READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1);
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
private static final LogRateLimiter LOG_RATE_LIMITER =
new LogRateLimiter(log, MILLISECONDS_BETWEEN_ERROR_LOG);
@Getter private final URL agentUrl;
AgentClient(final String host, final int port) {
final String url = "http://" + host + ":" + port + TRACES_ENDPOINT;
try {
agentUrl = new URL(url);
} catch (final MalformedURLException e) {
// This should essentially mean agent should bail out from installing, we cannot meaningfully
// recover from this.
throw new RuntimeException("Cannot parse agent url: " + url, e);
}
}
/**
* Send traces to the Datadog agent
*
* @param traces the traces to be sent
* @param traceCount total number of traces
*/
public SampleRateByService sendTraces(final List<Trace> traces, final int traceCount) {
final TracesRequest request = new TracesRequest(traces);
try {
final HttpURLConnection connection = createHttpConnection();
connection.setRequestProperty(X_DATADOG_TRACE_COUNT, String.valueOf(traceCount));
try (final OutputStream out = connection.getOutputStream()) {
OBJECT_MAPPER.writeValue(out, request);
}
final int responseCode = connection.getResponseCode();
if (responseCode != 200) {
throw new IOException(
String.format(
"Error while sending %d of %d traces to the DD agent. Status: %d, ResponseMessage: %s",
traces.size(), traceCount, responseCode, connection.getResponseMessage()));
}
try (final InputStream in = connection.getInputStream()) {
final TracesResponse response = OBJECT_MAPPER.readValue(in, TracesResponse.class);
return response.getSampleRateByService();
}
} catch (final IOException e) {
LOG_RATE_LIMITER.warn(
"Error while sending {} of {} traces to the DD agent.", traces.size(), traceCount, e);
}
return null;
}
private HttpURLConnection createHttpConnection() throws IOException {
final HttpURLConnection connection = (HttpURLConnection) agentUrl.openConnection();
connection.setDoOutput(true);
connection.setDoInput(true);
// It is important to have timeout for agent request here: we need to finish request in some
// reasonable amount
// of time to allow following requests to be run.
connection.setConnectTimeout(CONNECT_TIMEOUT);
connection.setReadTimeout(READ_TIMEOUT);
connection.setRequestMethod("PUT");
connection.setRequestProperty(CONTENT_TYPE, MSGPACK);
connection.setRequestProperty(DATADOG_META_LANG, "java");
// TODO: set these variables properly!!!
connection.setRequestProperty(DATADOG_META_LANG_VERSION, "TODO: DDTraceOTInfo.JAVA_VERSION");
connection.setRequestProperty(
DATADOG_META_LANG_INTERPRETER, "TODO: DDTraceOTInfo.JAVA_VM_NAME");
connection.setRequestProperty(DATADOG_META_TRACER_VERSION, "TODO: DDTraceOTInfo.VERSION");
return connection;
}
private static class TracesRequest {
private final List<Trace> traces;
TracesRequest(final List<Trace> traces) {
this.traces = Collections.unmodifiableList(traces);
}
@JsonValue
public List<Trace> getTraces() {
return traces;
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
private static class TracesResponse {
private final SampleRateByService sampleRateByService;
@JsonCreator
TracesResponse(@JsonProperty("rate_by_service") final SampleRateByService sampleRateByService) {
this.sampleRateByService = sampleRateByService;
}
public SampleRateByService getSampleRateByService() {
return sampleRateByService;
}
}
}

View File

@ -1,168 +0,0 @@
package datadog.trace.tracer.writer;
import datadog.trace.tracer.Trace;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AgentWriter implements Writer {
/** Maximum number of traces kept in memory */
static final int DEFAULT_QUEUE_SIZE = 7000;
/** Flush interval for the API in seconds */
static final long FLUSH_TIME_SECONDS = 1;
/** Maximum amount of time to await for scheduler to shutdown */
static final long SHUTDOWN_TIMEOUT_SECONDS = 1;
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, "dd-agent-writer");
thread.setDaemon(true);
return thread;
}
};
/** Scheduled thread pool, acting like a cron */
private final ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1, THREAD_FACTORY);
private final TracesSendingTask task;
private final ShutdownCallback shutdownCallback;
public AgentWriter(final AgentClient client) {
this(client, DEFAULT_QUEUE_SIZE);
}
AgentWriter(final AgentClient client, final int queueSize) {
task = new TracesSendingTask(client, queueSize);
shutdownCallback = new ShutdownCallback(executorService);
}
/** @return Datadog agent URL. Visible for testing. */
URL getAgentUrl() {
return task.getClient().getAgentUrl();
}
@Override
public void write(final Trace trace) {
if (trace.isValid()) {
if (!task.getQueue().offer(trace)) {
log.debug("Writer queue is full, dropping trace {}", trace);
}
}
}
@Override
public void incrementTraceCount() {
task.getTraceCount().incrementAndGet();
}
@Override
public SampleRateByService getSampleRateByService() {
return task.getSampleRateByService().get();
}
@Override
public void start() {
executorService.scheduleAtFixedRate(task, 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
try {
Runtime.getRuntime().addShutdownHook(shutdownCallback);
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
@Override
public void close() {
// Perform actions needed to shutdown this writer
shutdownCallback.run();
}
@Override
public void finalize() {
close();
}
/** Infinite tasks blocking until some spans come in the queue. */
private static final class TracesSendingTask implements Runnable {
/** The Datadog agent client */
@Getter private final AgentClient client;
/** Queue size */
private final int queueSize;
/** In memory collection of traces waiting for departure */
@Getter private final BlockingQueue<Trace> queue;
/** Number of traces to be written */
@Getter private final AtomicInteger traceCount = new AtomicInteger(0);
/** Sample rate by service returned by Datadog agent */
@Getter
private final AtomicReference<SampleRateByService> sampleRateByService =
new AtomicReference<>(SampleRateByService.EMPTY_INSTANCE);
TracesSendingTask(final AgentClient client, final int queueSize) {
this.client = client;
this.queueSize = queueSize;
queue = new ArrayBlockingQueue<>(queueSize);
}
@Override
public void run() {
try {
final List<Trace> tracesToWrite = new ArrayList<>(queueSize);
queue.drainTo(tracesToWrite);
if (tracesToWrite.size() > 0) {
sampleRateByService.set(client.sendTraces(tracesToWrite, traceCount.getAndSet(0)));
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
}
}
}
/**
* Helper to handle shutting down of the Writer because JVM is shutting down or Writer is closed.
*/
// Visible for testing
static final class ShutdownCallback extends Thread {
private final ExecutorService executorService;
public ShutdownCallback(final ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void run() {
// We use this logic in two cases:
// * When JVM is shutting down
// * When Writer is closed manually/via GC
// In latter case we need to remove shutdown hook.
try {
Runtime.getRuntime().removeShutdownHook(this);
} catch (final IllegalStateException ex) {
// The JVM may be shutting down.
}
try {
executorService.shutdownNow();
executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
}
}
}

View File

@ -1,34 +0,0 @@
package datadog.trace.tracer.writer;
import datadog.trace.tracer.Trace;
import lombok.extern.slf4j.Slf4j;
/** Writer implementation that just logs traces as they are being written */
@Slf4j
public class LoggingWriter implements Writer {
@Override
public void write(final Trace trace) {
log.debug("Trace written: {}", trace);
}
@Override
public void incrementTraceCount() {
// Nothing to do here.
}
@Override
public SampleRateByService getSampleRateByService() {
return SampleRateByService.EMPTY_INSTANCE;
}
@Override
public void start() {
// TODO: do we really need this? and if we do - who is responsible for calling this?
log.debug("{} started", getClass().getSimpleName());
}
@Override
public void close() {
log.debug("{} closed", getClass().getSimpleName());
}
}

View File

@ -1,28 +0,0 @@
package datadog.trace.tracer.writer;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.Collections;
import java.util.Map;
import lombok.EqualsAndHashCode;
/**
* Holds sample rate for all known services. This is reported by Dadadog agent in response to
* writing traces.
*/
@EqualsAndHashCode
class SampleRateByService {
static final SampleRateByService EMPTY_INSTANCE = new SampleRateByService(Collections.EMPTY_MAP);
private final Map<String, Double> rateByService;
@JsonCreator
SampleRateByService(final Map<String, Double> rateByService) {
this.rateByService = Collections.unmodifiableMap(rateByService);
}
public Double getRate(final String service) {
// TODO: improve logic in this class to handle default value better
return rateByService.get(service);
}
}

View File

@ -1,75 +0,0 @@
package datadog.trace.tracer.writer;
import datadog.trace.api.Config;
import datadog.trace.tracer.Trace;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
/** A writer sends traces to some place. */
public interface Writer {
/**
* Write a trace represented by the entire list of all the finished spans.
*
* <p>It is up to the tracer to decide if the trace should be written (e.g. for invalid traces).
*
* <p>This call doesn't increment trace counter, see {@code incrementTraceCount} for that
*
* @param trace the trace to write
*/
void write(Trace trace);
/**
* Inform the writer that a trace occurred but will not be written. Used by tracer-side sampling.
*/
void incrementTraceCount();
/** @return Most up to date {@link SampleRateByService} instance. */
SampleRateByService getSampleRateByService();
/** Start the writer */
void start();
/**
* Indicates to the writer that no future writing will come and it should terminates all
* connections and tasks
*/
void close();
@Slf4j
final class Builder {
public static Writer forConfig(final Config config) {
if (config == null) {
// There is no way config is not create so getting here must be a code bug
throw new NullPointerException("Config is required to create writer");
}
final Writer writer;
final String configuredType = config.getWriterType();
if (Config.DD_AGENT_WRITER_TYPE.equals(configuredType)) {
writer = createAgentWriter(config);
} else if (Config.LOGGING_WRITER_TYPE.equals(configuredType)) {
writer = new LoggingWriter();
} else {
log.warn(
"Writer type not configured correctly: Type {} not recognized. Defaulting to AgentWriter.",
configuredType);
writer = createAgentWriter(config);
}
return writer;
}
public static Writer forConfig(final Properties config) {
return forConfig(Config.get(config));
}
private static Writer createAgentWriter(final Config config) {
return new AgentWriter(new AgentClient(config.getAgentHost(), config.getAgentPort()));
}
private Builder() {}
}
}

View File

@ -1,64 +0,0 @@
package datadog.trace.tracer
import nl.jqno.equalsverifier.EqualsVerifier
import nl.jqno.equalsverifier.Warning
import spock.lang.Shared
import spock.lang.Specification
import java.util.concurrent.TimeUnit
class ClockTest extends Specification {
// Assume it takes less than a minute to run this test
public static final long MINUTE = TimeUnit.MINUTES.toNanos(1)
@Shared
def tracer = Mock(Tracer)
def "test getters"() {
setup:
def currentTimeNano = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis())
def nanoTicks = System.nanoTime()
when:
def clock = new Clock(tracer)
then:
clock.getTracer() == tracer
clock.getStartTimeNano() - currentTimeNano <= MINUTE
clock.getStartNanoTicks() - nanoTicks <= MINUTE
clock.epochTimeNano() - currentTimeNano <= MINUTE
TimeUnit.MICROSECONDS.toNanos(clock.epochTimeMicro()) - currentTimeNano <= MINUTE
clock.nanoTicks() - nanoTicks <= MINUTE
}
def "test timestamp creation"() {
setup:
def clock = new Clock(tracer)
when:
def timestamp = clock.createCurrentTimestamp()
then:
timestamp.getClock() == clock
}
def "test timestamp creation with custom time"() {
setup:
def clock = new Clock(tracer)
when:
def timestamp = clock.createTimestampForTime(1, TimeUnit.SECONDS)
then:
timestamp.getClock() == clock
}
def "test equals"() {
when:
EqualsVerifier.forClass(Clock).suppress(Warning.STRICT_INHERITANCE).verify()
then:
noExceptionThrown()
}
}

View File

@ -1,103 +0,0 @@
package datadog.trace.tracer
import spock.lang.Specification
class ContinuationImplTest extends Specification {
def tracer = Mock(Tracer)
def trace = Mock(TraceImpl) {
getTracer() >> tracer
}
def span = Mock(Span)
def "test getters"() {
when:
def continuation = new ContinuationImpl(trace, span)
then:
continuation.getTrace() == trace
continuation.getSpan() == span
}
def "happy lifecycle"() {
when: "continuation is created"
def continuation = new ContinuationImpl(trace, span)
then: "continuation is opened"
!continuation.isClosed()
when: "close continuation"
continuation.close()
then: "continuation is closed and no errors are reported"
continuation.isClosed()
0 * tracer.reportError(*_)
and: "continuation is reported as closed to a trace"
1 * trace.closeContinuation(continuation, false)
when: "continuation is finalized"
continuation.finalize()
then: "continuation is still closed and no errors are reported"
continuation.isClosed()
0 * tracer.reportError(*_)
and: "continuation is not reported as closed to a trace again"
0 * trace.closeContinuation(_, _)
}
def "double close"() {
setup:
def continuation = new ContinuationImpl(trace, span)
when: "close continuation"
continuation.close()
then: "continuation is closed"
continuation.isClosed()
when: "close continuation again"
continuation.close()
then: "error is reported"
1 * tracer.reportError(_, [continuation])
and: "continuation is not reported as closed to a trace again"
0 * trace.closeContinuation(_, _)
}
def "finalize"() {
setup:
def continuation = new ContinuationImpl(trace, span)
when: "finalize continuation"
continuation.finalize()
then: "continuation is closed"
continuation.isClosed()
and: "continuation is reported as closed to a trace"
1 * trace.closeContinuation(continuation, true)
when: "finalize continuation again"
continuation.finalize()
then: "continuation is still closed"
continuation.isClosed()
and: "continuation is not reported as closed to a trace again"
0 * trace.closeContinuation(_, _)
and: "no error is reported"
0 * tracer.reportError(_, *_)
}
def "finalize catches all exceptions"() {
setup:
def continuation = new ContinuationImpl(trace, span)
when:
continuation.finalize()
then:
1 * trace.closeContinuation(_, _) >> { throw new Throwable() }
noExceptionThrown()
}
}

View File

@ -1,63 +0,0 @@
package datadog.trace.tracer
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
/**
* Helper class to parse serialized span to verify serialization logic
*/
@EqualsAndHashCode
@ToString
class JsonSpan {
@JsonProperty("trace_id")
BigInteger traceId
@JsonProperty("parent_id")
BigInteger parentId
@JsonProperty("span_id")
BigInteger spanId
@JsonProperty("start")
long start
@JsonProperty("duration")
long duration
@JsonProperty("service")
String service
@JsonProperty("resource")
String resource
@JsonProperty("type")
String type
@JsonProperty("name")
String name
@JsonProperty("error")
// Somehow MsgPack formatter can not convert number to boolean with newer jackson so we have to do this manually
//@JsonFormat(shape = JsonFormat.Shape.NUMBER)
int error
@JsonProperty("meta")
Map<String, String> meta
@JsonCreator
JsonSpan() {}
JsonSpan(SpanImpl span) {
traceId = new BigInteger(span.getContext().getTraceId())
parentId = new BigInteger(span.getContext().getParentId())
spanId = new BigInteger(span.getContext().getSpanId())
start = span.getStartTimestamp().getTime()
duration = span.getDuration()
service = span.getService()
resource = span.getResource()
type = span.getType()
name = span.getName()
error = span.isErrored() ? 1 : 0
meta = span.getMetaString()
}
}

View File

@ -1,70 +0,0 @@
package datadog.trace.tracer
import org.slf4j.Logger
import spock.lang.Specification
class LogRateLimiterTest extends Specification {
private static final String MESSAGE = "message"
private static final int REPEAT_COUNT = 10
def log = Mock(Logger)
def object = new Object()
def "test debugging enabled: #method"() {
setup:
log.isDebugEnabled() >> true
def logRateLimiter = new LogRateLimiter(log, 10)
when: "message is logged"
logRateLimiter."${method}"(MESSAGE, object)
then: "debug message is output"
1 * log.debug(MESSAGE, object)
where:
method | _
"warn" | _
"error" | _
}
def "test debugging disabled, no delay: #method"() {
setup: "debug is disabled and delay between log is zero"
log.isDebugEnabled() >> false
def logRateLimiter = new LogRateLimiter(log, 0)
when: "messages are logged"
for (int i = 0; i < REPEAT_COUNT; i++) {
logRateLimiter."${method}"(MESSAGE, object)
}
then: "all messages are output with appropriate log level"
REPEAT_COUNT * log."${method}"({it.contains(MESSAGE)}, object)
where:
method | _
"warn" | _
"error" | _
}
def "test debugging disabled, large delay: #method"() {
setup: "debug is disabled and delay between log is large"
log.isDebugEnabled() >> false
def logRateLimiter = new LogRateLimiter(log, 10000000)
when: "messages are logged"
for (int i = 0; i < REPEAT_COUNT; i++) {
logRateLimiter."${method}"(MESSAGE, object)
}
then: "message is output once"
1 * log."${method}"({it.contains(MESSAGE)}, object)
where:
method | _
"warn" | _
"error" | _
}
}

View File

@ -1,54 +0,0 @@
package datadog.trace.tracer
import datadog.trace.api.sampling.PrioritySampling
import nl.jqno.equalsverifier.EqualsVerifier
import nl.jqno.equalsverifier.Warning
import spock.lang.Specification
class SpanContextImplTest extends Specification {
def "test getters"() {
when:
def context = new SpanContextImpl("trace id", "parent id", "span id")
then:
context.getTraceId() == "trace id"
context.getParentId() == "parent id"
context.getSpanId() == "span id"
// TODO: this still to be implemented
context.getSamplingFlags() == PrioritySampling.SAMPLER_KEEP
}
def "create from parent"() {
setup:
def parent = new SpanContextImpl("trace id", "parent's parent id", "parent span id")
when:
def context = SpanContextImpl.fromParent(parent)
then:
context.getTraceId() == "trace id"
context.getParentId() == "parent span id"
context.getSpanId() ==~ /\d+/
}
def "create from no parent"() {
when:
def context = SpanContextImpl.fromParent(null)
then:
context.getTraceId() ==~ /\d+/
context.getParentId() == SpanContextImpl.ZERO
context.getSpanId() ==~ /\d+/
}
def "test equals"() {
when:
EqualsVerifier.forClass(SpanContextImpl).suppress(Warning.STRICT_INHERITANCE).verify()
then:
noExceptionThrown()
}
}

View File

@ -1,370 +0,0 @@
package datadog.trace.tracer
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.trace.api.DDTags
import spock.lang.Specification
class SpanImplTest extends Specification {
private static final String SERVICE_NAME = "service.name"
private static final String PARENT_TRACE_ID = "trace id"
private static final String PARENT_SPAN_ID = "span id"
private static final long START_TIME = 100
private static final long DURATION = 321
def interceptors = [Mock(name: "interceptor-1", Interceptor), Mock(name: "interceptor-2", Interceptor)]
def tracer = Mock(Tracer) {
getDefaultServiceName() >> SERVICE_NAME
getInterceptors() >> interceptors
}
def parentContext = Mock(SpanContextImpl) {
getTraceId() >> PARENT_TRACE_ID
getSpanId() >> PARENT_SPAN_ID
}
def startTimestamp = Mock(Timestamp) {
getTime() >> START_TIME
getDuration() >> DURATION
getDuration(_) >> { args -> args[0] + DURATION }
}
def trace = Mock(TraceImpl) {
getTracer() >> tracer
}
ObjectMapper objectMapper = new ObjectMapper()
def "test setters and default values"() {
when: "create span"
def span = new SpanImpl(trace, parentContext, startTimestamp)
then: "span got created"
span.getTrace() == trace
span.getStartTimestamp() == startTimestamp
span.getDuration() == null
span.getContext().getTraceId() == PARENT_TRACE_ID
span.getContext().getParentId() == PARENT_SPAN_ID
span.getContext().getSpanId() ==~ /\d+/
span.getTraceId() == PARENT_TRACE_ID
span.getParentId() == PARENT_SPAN_ID
span.getSpanId() == span.getContext().getSpanId()
span.getService() == SERVICE_NAME
span.getResource() == null
span.getType() == null
span.getName() == null
!span.isErrored()
when: "span settings changes"
span.setService("new.service.name")
span.setResource("resource")
span.setType("type")
span.setName("name")
span.setErrored(true)
then: "span fields get updated"
span.getService() == "new.service.name"
span.getResource() == "resource"
span.getType() == "type"
span.getName() == "name"
span.isErrored()
}
def "test setter #setter on finished span"() {
setup: "create span"
def span = new SpanImpl(trace, parentContext, startTimestamp)
span.finish()
when: "call setter on finished span"
span."$setter"(newValue)
then: "error reported"
1 * tracer.reportError(_, {
it[0] == fieldName
it[1] == span
})
and: "value unchanged"
span."$getter"() == oldValue
where:
fieldName | setter | getter | newValue | oldValue
"service" | "setService" | "getService" | "new service" | SERVICE_NAME
"resource" | "setResource" | "getResource" | "new resource" | null
"type" | "setType" | "getType" | "new type" | null
"name" | "setName" | "getName" | "new name" | null
"errored" | "setErrored" | "isErrored" | true | false
}
def "test meta set and remove for #key"() {
when:
def span = new SpanImpl(trace, parentContext, startTimestamp)
then:
span.getMeta(key) == null
when:
span.setMeta(key, value)
then:
span.getMeta(key) == value
when:
span.setMeta(key, value.class.cast(null))
then:
span.getMeta(key) == null
where:
key | value
"string.key" | "string"
"boolean.key" | true
"number.key" | 123
}
def "test getMeta"() {
setup:
def span = new SpanImpl(trace, parentContext, startTimestamp)
when:
span.setMeta("number.key", 123)
span.setMeta("string.key", "meta string")
span.setMeta("boolean.key", true)
then:
span.getMeta() == ["number.key": 123, "string.key": "meta string", "boolean.key": true]
span.getMetaString() == ["number.key": "123", "string.key": "meta string", "boolean.key": "true"]
}
def "test meta setter on finished span for #key"() {
setup: "create span"
def span = new SpanImpl(trace, parentContext, startTimestamp)
span.finish()
when: "call setter on finished span"
span.setMeta(key, value)
then: "error reported"
1 * tracer.reportError(_, {
it[0] == "meta value " + key
it[1] == span
})
and: "value unchanged"
span.getMeta(key) == null
where:
key | value
"string.key" | "string"
"boolean.key" | true
"number.key" | 123
}
def "test attachThrowable"() {
setup:
def exception = new RuntimeException("test message")
when:
def span = new SpanImpl(trace, parentContext, startTimestamp)
then:
!span.isErrored()
span.getMeta(DDTags.ERROR_MSG) == null
span.getMeta(DDTags.ERROR_TYPE) == null
span.getMeta(DDTags.ERROR_STACK) == null
when:
span.attachThrowable(exception)
then:
span.isErrored()
span.getMeta(DDTags.ERROR_MSG) == "test message"
span.getMeta(DDTags.ERROR_TYPE) == RuntimeException.getName()
span.getMeta(DDTags.ERROR_STACK) != null
}
def "test attachThrowable on finished span"() {
setup: "create span"
def exception = new RuntimeException("test message")
def span = new SpanImpl(trace, parentContext, startTimestamp)
span.finish()
when: "attach throwable"
span.attachThrowable(exception)
then: "error reported"
1 * tracer.reportError(_, {
it[0] == "throwable"
it[1] == span
})
and: "span unchanged"
!span.isErrored()
span.getMeta(DDTags.ERROR_MSG) == null
span.getMeta(DDTags.ERROR_TYPE) == null
span.getMeta(DDTags.ERROR_STACK) == null
}
def "test no parent"() {
when:
def span = new SpanImpl(trace, null, startTimestamp)
then:
span.getContext().getTraceId() ==~ /\d+/
span.getContext().getParentId() == SpanContextImpl.ZERO
span.getContext().getSpanId() ==~ /\d+/
}
def "test lifecycle '#name'"() {
when: "create span"
def span = new SpanImpl(trace, parentContext, startTimestamp)
then: "interceptors called"
interceptors.each({ interceptor ->
then:
1 * interceptor.afterSpanStarted({
// Apparently invocation verification has to know expected value before 'when' section
// To work around this we just check parent span id
it.getContext().getParentId() == parentContext.getSpanId()
})
0 * interceptor._
})
then:
!span.isFinished()
when: "finish/finalize span"
span."$method"(*methodArgs)
then: "interceptors called"
interceptors.reverseEach({ interceptor ->
then:
1 * interceptor.beforeSpanFinished({
it == span
it.getDuration() == null // Make sure duration is not set yet
})
0 * interceptor._
})
then: "trace is informed that span is closed"
1 * trace.finishSpan({
it == span
it.getDuration() == expectedDuration
}, finalizeErrorReported)
0 * trace._
span.isFinished()
when: "try to finish span again"
span.finish(*secondFinishCallArgs)
then: "interceptors are not called"
interceptors.each({ interceptor ->
0 * interceptor._
})
and: "trace is not informed"
0 * trace.finishSpan(_, _)
and: "error is reported"
1 * tracer.reportError(_, span)
where:
name | method | methodArgs | expectedDuration | finalizeErrorReported | secondFinishCallArgs
"happy" | "finish" | [] | DURATION | false | []
"happy" | "finish" | [] | DURATION | false | [222]
"happy with timestamp" | "finish" | [111] | DURATION + 111 | false | [222]
"happy with timestamp" | "finish" | [111] | DURATION + 111 | false | []
// Note: doing GC tests with mocks is nearly impossible because mocks hold all sort of references
// We will do 'real' GC test as part of integration testing, this is more of a unit test
"finalized" | "finalize" | [] | DURATION | true | []
"finalized" | "finalize" | [] | DURATION | true | [222]
}
def "finished span GCed without errors"() {
setup: "create span"
def span = new SpanImpl(trace, parentContext, startTimestamp)
when: "finish span"
span.finish()
then:
span.isFinished()
0 * tracer.reportError(_, *_)
when: "finalize span"
span.finalize()
then:
0 * tracer.reportError(_, *_)
}
def "finalize catches all exceptions"() {
setup:
def span = new SpanImpl(trace, parentContext, startTimestamp)
when:
span.finalize()
then:
1 * startTimestamp.getDuration() >> { throw new Throwable() }
noExceptionThrown()
}
def "span without timestamp"() {
when:
new SpanImpl(trace, parentContext, null)
then:
thrown TraceException
}
def "test JSON rendering"() {
setup: "create span"
def parentContext = new SpanContextImpl("123", "456", "789")
def span = new SpanImpl(trace, parentContext, startTimestamp)
span.setResource("test resource")
span.setType("test type")
span.setName("test name")
span.setMeta("number.key", 123)
span.setMeta("string.key", "meta string")
span.setMeta("boolean.key", true)
span.finish()
when: "convert to JSON"
def string = objectMapper.writeValueAsString(span)
def parsedSpan = objectMapper.readerFor(JsonSpan).readValue(string)
then:
parsedSpan == new JsonSpan(span)
}
def "test JSON rendering with throwable"() {
setup: "create span"
def parentContext = new SpanContextImpl("123", "456", "789")
def span = new SpanImpl(trace, parentContext, startTimestamp)
span.attachThrowable(new RuntimeException("test"))
span.finish()
when: "convert to JSON"
def string = objectMapper.writeValueAsString(span)
def parsedSpan = objectMapper.readerFor(JsonSpan).readValue(string)
then:
parsedSpan == new JsonSpan(span)
}
def "test JSON rendering with big ID values"() {
setup: "create span"
def parentContext = new SpanContextImpl(
BigInteger.valueOf(2).pow(64).subtract(1).toString(),
"123",
BigInteger.valueOf(2).pow(64).subtract(2).toString())
def span = new SpanImpl(trace, parentContext, startTimestamp)
span.finish()
when: "convert to JSON"
def string = objectMapper.writeValueAsString(span)
def parsedSpan = objectMapper.readValue(string, JsonSpan)
then:
parsedSpan == new JsonSpan(span)
when:
def json = objectMapper.readTree(string)
then: "make sure ids rendered as number"
json.get("trace_id").isNumber()
json.get("parent_id").isNumber()
json.get("span_id").isNumber()
}
}

View File

@ -1,163 +0,0 @@
package datadog.trace.tracer
import com.fasterxml.jackson.databind.ObjectMapper
import nl.jqno.equalsverifier.EqualsVerifier
import nl.jqno.equalsverifier.Warning
import spock.lang.Specification
import static java.util.concurrent.TimeUnit.MICROSECONDS
import static java.util.concurrent.TimeUnit.NANOSECONDS
class TimestampTest extends Specification {
private static final long CLOCK_START_TIME = 100
private static final long CLOCK_START_NANO_TICKS = 300
private static final long CLOCK_NANO_TICKS = 500
private static final long FINISH_NANO_TICKS = 600
private static final long FINISH_TIME = 700
def tracer = Mock(Tracer)
def clock = Mock(Clock) {
getTracer() >> tracer
}
ObjectMapper objectMapper = new ObjectMapper()
def "test getter"() {
when:
def timestamp = new Timestamp(clock)
then:
timestamp.getClock() == clock
}
def "test getTime"() {
setup:
clock.nanoTicks() >> CLOCK_NANO_TICKS
clock.getStartTimeNano() >> CLOCK_START_TIME
clock.getStartNanoTicks() >> CLOCK_START_NANO_TICKS
def timestamp = new Timestamp(clock)
when:
def duration = timestamp.getDuration(FINISH_TIME)
then:
duration == 400
0 * tracer._
}
def "test timestamp with custom time"() {
setup:
clock.nanoTicks() >> CLOCK_NANO_TICKS
clock.getStartTimeNano() >> CLOCK_START_TIME
clock.getStartNanoTicks() >> CLOCK_START_NANO_TICKS
def timestamp = new Timestamp(clock, CLOCK_START_TIME + offset, unit)
when:
def time = timestamp.getTime()
then:
time == expected
where:
offset | unit | expected
10 | NANOSECONDS | 410
-20 | NANOSECONDS | 380
3 | MICROSECONDS | 103300
-4 | MICROSECONDS | 96300
}
def "test getDuration with literal finish time"() {
setup:
clock.nanoTicks() >> CLOCK_NANO_TICKS
clock.getStartTimeNano() >> CLOCK_START_TIME
clock.getStartNanoTicks() >> CLOCK_START_NANO_TICKS
def timestamp = new Timestamp(clock)
when:
def duration = timestamp.getDuration(FINISH_TIME)
then:
duration == 400
0 * tracer._
}
def "test getDuration with timestamp '#name'"() {
setup:
clock.nanoTicks() >> startNanoTicks >> finishNanoTicks
def startTimestamp = new Timestamp(clock)
def finishTimestamp = new Timestamp(clock)
when:
def duration = startTimestamp.getDuration(finishTimestamp)
then:
duration == expectedDuration
0 * tracer._
where:
name | startNanoTicks | finishNanoTicks | expectedDuration
'normal' | CLOCK_START_NANO_TICKS | FINISH_NANO_TICKS | FINISH_NANO_TICKS - CLOCK_START_NANO_TICKS
'overflow' | Long.MAX_VALUE - CLOCK_START_NANO_TICKS | Long.MIN_VALUE + FINISH_NANO_TICKS | FINISH_NANO_TICKS + CLOCK_START_NANO_TICKS + 1
}
def "test getDuration with current time"() {
setup:
clock.createCurrentTimestamp() >> { new Timestamp(clock) }
clock.nanoTicks() >> CLOCK_START_NANO_TICKS >> FINISH_NANO_TICKS
def timestamp = new Timestamp(clock)
when:
def duration = timestamp.getDuration()
then:
duration == FINISH_NANO_TICKS - CLOCK_START_NANO_TICKS
0 * tracer._
}
def "test getDuration with wrong clock"() {
setup:
clock.nanoTicks() >> CLOCK_NANO_TICKS
clock.getStartTimeNano() >> CLOCK_START_TIME
clock.getStartNanoTicks() >> CLOCK_START_NANO_TICKS
def timestamp = new Timestamp(clock)
def otherClock = Mock(Clock) {
getTracer() >> tracer
nanoTicks() >> CLOCK_NANO_TICKS + 400
getStartTimeNano() >> CLOCK_START_TIME + 200
getStartNanoTicks() >> CLOCK_START_NANO_TICKS + 100
}
def finishTimestamp = new Timestamp(otherClock)
when:
def duration = timestamp.getDuration(finishTimestamp)
then:
duration == 500
1 * tracer.reportError(_, clock, otherClock)
}
def "test equals"() {
when:
EqualsVerifier.forClass(Timestamp).suppress(Warning.STRICT_INHERITANCE).verify()
then:
noExceptionThrown()
}
def "test JSON rendering"() {
setup:
clock.nanoTicks() >> CLOCK_NANO_TICKS
clock.getStartTimeNano() >> CLOCK_START_TIME
clock.getStartNanoTicks() >> CLOCK_START_NANO_TICKS
def timestamp = new Timestamp(clock)
when:
def string = objectMapper.writeValueAsString(timestamp)
then:
string == "300"
}
}

View File

@ -1,49 +0,0 @@
package datadog.trace.tracer
import spock.lang.Specification
class TraceExceptionTest extends Specification {
static final MESSAGE = "message"
def "test default constructor"() {
when:
def exception = new TraceException()
then:
exception != null
}
def "test message constructor"() {
when:
def exception = new TraceException(MESSAGE)
then:
exception.getMessage() == MESSAGE
}
def "test cause constructor"() {
setup:
def cause = new RuntimeException()
when:
def exception = new TraceException(cause)
then:
exception.getCause() == cause
}
def "test cause and message constructor"() {
setup:
def cause = new RuntimeException()
when:
def exception = new TraceException(MESSAGE, cause)
then:
exception.getMessage() == MESSAGE
exception.getCause() == cause
}
}

View File

@ -1,493 +0,0 @@
package datadog.trace.tracer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.trace.tracer.sampling.Sampler
import datadog.trace.tracer.writer.Writer
import spock.lang.Specification
class TraceImplTest extends Specification {
private static final String SERVICE_NAME = "service.name"
private static final String PARENT_TRACE_ID = "trace id"
private static final String PARENT_SPAN_ID = "span id"
def interceptors = [
Mock(name: "interceptor-1", Interceptor) {
beforeTraceWritten(_) >> { args -> args[0] }
},
Mock(name: "interceptor-2", Interceptor) {
beforeTraceWritten(_) >> { args -> args[0] }
}
]
def writer = Mock(Writer)
def sampler = Mock(Sampler) {
sample(_) >> true
}
def tracer = Mock(Tracer) {
getDefaultServiceName() >> SERVICE_NAME
getInterceptors() >> interceptors
getWriter() >> writer
getSampler() >> sampler
}
def parentContext = Mock(SpanContextImpl) {
getTraceId() >> PARENT_TRACE_ID
getSpanId() >> PARENT_SPAN_ID
}
def startTimestamp = Mock(Timestamp)
ObjectMapper objectMapper = new ObjectMapper()
def "test getters"() {
when:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
then:
trace.getTracer() == tracer
trace.getRootSpan().getTrace() == trace
trace.getRootSpan().getStartTimestamp() == startTimestamp
trace.getRootSpan().getContext().getTraceId() == PARENT_TRACE_ID
trace.getRootSpan().getContext().getParentId() == PARENT_SPAN_ID
trace.isValid()
}
def "test getSpans on unfinished spans"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when:
trace.getSpans()
then:
1 * tracer.reportError(_, trace)
}
def "test timestamp creation"() {
setup:
def newTimestamp = Mock(Timestamp)
def clock = Mock(Clock) {
createCurrentTimestamp() >> newTimestamp
}
startTimestamp.getClock() >> clock
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when:
def createdTimestamp = trace.createCurrentTimestamp()
then:
createdTimestamp == newTimestamp
}
def "finish root span"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "root span is finished"
trace.getRootSpan().finish()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then: "interceptors get called"
interceptors.reverseEach({ interceptor ->
then:
1 * interceptor.beforeTraceWritten(trace) >> trace
})
then: "trace gets sampled"
1 * sampler.sample(trace) >> { true }
then: "trace gets written"
1 * writer.write(trace)
trace.isValid()
trace.getSpans() == [trace.getRootSpan()]
when: "root span is finalized"
trace.getRootSpan().finalize()
then: "nothing happens"
0 * writer.incrementTraceCount()
interceptors.reverseEach({ interceptor ->
0 * interceptor.beforeTraceWritten(_)
})
0 * sampler.sample(_)
0 * writer.write(_)
0 * tracer.reportError(_, *_)
}
def "GC root span"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "root span is finalized"
trace.getRootSpan().finalize()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then: "interceptors get called"
interceptors.reverseEach({ interceptor ->
then:
1 * interceptor.beforeTraceWritten(trace) >> trace
})
then: "trace gets sampled"
1 * sampler.sample(trace) >> { true }
then: "trace gets written"
1 * writer.write(trace)
!trace.isValid()
}
def "finish root span dropped by interceptor"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when:
trace.getRootSpan().finish()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then:
1 * interceptors[1].beforeTraceWritten(trace) >> null
0 * interceptors[0].beforeTraceWritten(_)
0 * sampler.sample(_)
0 * writer.write(_)
}
def "finish root span replaced by interceptor"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def replacementTrace = new TraceImpl(tracer, parentContext, startTimestamp)
when:
trace.getRootSpan().finish()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then:
1 * interceptors[1].beforeTraceWritten(trace) >> replacementTrace
then:
1 * interceptors[0].beforeTraceWritten(replacementTrace) >> replacementTrace
then:
1 * sampler.sample(replacementTrace) >> true
then:
1 * writer.write(replacementTrace)
}
def "finish root span dropped by sampler"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when:
trace.getRootSpan().finish()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then:
1 * sampler.sample(trace) >> false
0 * writer.write(_)
}
def "finish root span and then finish it again by error"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "root span is finished"
trace.getRootSpan().finish()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then: "interceptors get called"
interceptors.reverseEach({ interceptor ->
then:
1 * interceptor.beforeTraceWritten(trace) >> trace
})
then: "trace gets sampled"
1 * sampler.sample(trace) >> { true }
then: "trace gets written"
1 * writer.write(trace)
when: "root span is finalized"
trace.finishSpan(trace.getRootSpan(), false)
then: "error is reported"
interceptors.reverseEach({ interceptor ->
0 * interceptor.beforeTraceWritten(_)
})
0 * sampler.sample(_)
0 * writer.incrementTraceCount()
0 * writer.write(_)
1 * tracer.reportError(_, "finish span", trace)
}
def "create and finish new span"() {
setup:
def newSpanTimestamp = Mock(Timestamp)
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "new span is created"
def span = trace.createSpan(trace.getRootSpan().getContext(), newSpanTimestamp)
then:
span.getTrace() == trace
span.getStartTimestamp() == newSpanTimestamp
span.getContext().getTraceId() == PARENT_TRACE_ID
span.getContext().getParentId() == trace.getRootSpan().getContext().getSpanId()
when: "root span is finished"
trace.getRootSpan().finish()
then: "nothing gets written"
0 * writer.incrementTraceCount()
0 * writer.write(_)
when: "new span is finished"
span.finish()
then: "trace gets written"
1 * writer.incrementTraceCount()
1 * writer.write(trace)
trace.isValid()
trace.getSpans() == [trace.getRootSpan(), span]
}
def "create and finish new span with default timestamp"() {
setup:
def newSpanTimestamp = Mock(Timestamp)
def clock = Mock(Clock) {
createCurrentTimestamp() >> newSpanTimestamp
}
startTimestamp.getClock() >> clock
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "new span is created"
def span = trace.createSpan(trace.getRootSpan().getContext())
then:
span.getTrace() == trace
span.getStartTimestamp() == newSpanTimestamp
span.getContext().getTraceId() == PARENT_TRACE_ID
span.getContext().getParentId() == trace.getRootSpan().getContext().getSpanId()
when: "root span is finished"
trace.getRootSpan().finish()
then: "nothing gets written"
0 * writer.incrementTraceCount()
0 * writer.write(_)
when: "new span is finished"
span.finish()
then: "trace gets written"
1 * writer.incrementTraceCount()
1 * writer.write(trace)
trace.isValid()
trace.getSpans() == [trace.getRootSpan(), span]
}
def "create span on finished trace"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "root span is finished"
trace.getRootSpan().finish()
then: "trace is finished"
trace.getSpans() != []
when: "new span is created"
trace.createSpan(trace.getRootSpan().getContext(), startTimestamp)
then: "error is reported"
1 * tracer.reportError(_, "create span", trace)
}
def "create span and finish it twice"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def span = trace.createSpan(trace.getRootSpan().getContext(), startTimestamp)
when: "new span is created"
span.finish()
trace.finishSpan(span, true)
then: "error is reported"
1 * tracer.reportError(_, span, trace)
}
def "create span with null parent context"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "new span with null parent context is created"
trace.createSpan(null, startTimestamp)
then: "error is reported"
thrown TraceException
}
def "create span with parent context from different trace"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def anotherParentContext = Mock(SpanContextImpl) {
getTraceId() >> "different trace"
getSpanId() >> PARENT_SPAN_ID
}
when: "new span with null parent context is created"
trace.createSpan(anotherParentContext, startTimestamp)
then: "error is reported"
thrown TraceException
}
def "create and close new continuation"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "new continuation is created"
def continuation = trace.createContinuation(trace.getRootSpan())
then:
continuation.getSpan() == trace.getRootSpan()
when: "root span is finished"
trace.getRootSpan().finish()
then: "nothing gets written"
0 * writer.incrementTraceCount()
0 * writer.write(_)
when: "new continuation is closed"
continuation.close()
then: "trace gets written"
1 * writer.incrementTraceCount()
1 * writer.write(trace)
trace.isValid()
trace.getSpans() == [trace.getRootSpan()]
}
def "GC continuation"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def continuation = trace.createContinuation(trace.getRootSpan())
trace.getRootSpan().finish()
when: "continuation finalized"
continuation.finalize()
then: "trace gets counted"
1 * writer.incrementTraceCount()
then: "interceptors get called"
interceptors.reverseEach({ interceptor ->
then:
1 * interceptor.beforeTraceWritten(trace) >> trace
})
then: "trace gets sampled"
1 * sampler.sample(trace) >> { true }
then: "trace gets written"
1 * writer.write(trace)
!trace.isValid()
}
def "create and close new continuation, then close it again"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def continuation = trace.createContinuation(trace.getRootSpan())
continuation.close()
when: "continuation is closed again"
trace.closeContinuation(continuation, true)
then: "error is reported"
1 * tracer.reportError(_, continuation, trace)
}
def "create and close new continuation, then close it in finished trace"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def continuation = trace.createContinuation(trace.getRootSpan())
continuation.close()
trace.getRootSpan().finish()
when: "continuation is closed again"
trace.closeContinuation(continuation, true)
then: "error is reported"
1 * tracer.reportError(_, "close continuation", trace)
}
def "create continuation on finished trace"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "root span is finished"
trace.getRootSpan().finish()
then: "trace is finished"
trace.getSpans() != []
when: "new continuation is created"
trace.createContinuation(trace.getRootSpan())
then: "error is reported"
1 * tracer.reportError(_, "create continuation", trace)
}
def "create continuation with null parent span"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
when: "new continuation with null parent span is created"
trace.createContinuation(null)
then: "error is reported"
thrown TraceException
}
def "create continuation with parent span from another trace"() {
setup:
def trace = new TraceImpl(tracer, parentContext, startTimestamp)
def anotherParentContext = Mock(SpanContextImpl) {
getTraceId() >> "different trace"
getSpanId() >> PARENT_SPAN_ID
}
def anotherParentSpan = Mock(SpanImpl) {
getContext() >> anotherParentContext
}
when: "new continuation from span from another trace is created"
trace.createContinuation(anotherParentSpan)
then: "error is reported"
thrown TraceException
}
def "test JSON rendering"() {
setup: "create trace"
def clock = new Clock(tracer)
def parentContext = new SpanContextImpl("123", "456", "789")
def trace = new TraceImpl(tracer, parentContext, clock.createCurrentTimestamp())
trace.getRootSpan().setResource("test resource")
trace.getRootSpan().setType("test type")
trace.getRootSpan().setName("test name")
trace.getRootSpan().setMeta("number.key", 123)
trace.getRootSpan().setMeta("string.key", "meta string")
trace.getRootSpan().setMeta("boolean.key", true)
def childSpan = trace.createSpan(trace.getRootSpan().getContext())
childSpan.setResource("child span test resource")
childSpan.setType("child span test type")
childSpan.setName("child span test name")
childSpan.setMeta("child.span.number.key", 123)
childSpan.setMeta("child.span.string.key", "meta string")
childSpan.setMeta("child.span.boolean.key", true)
childSpan.finish()
trace.getRootSpan().finish()
when: "convert to JSON"
def string = objectMapper.writeValueAsString(trace)
def parsedTrace = objectMapper.readValue(string, new TypeReference<List<JsonSpan>>() {})
then:
parsedTrace == [new JsonSpan(childSpan), new JsonSpan(trace.getRootSpan())]
}
}

View File

@ -1,285 +0,0 @@
package datadog.trace.tracer
import datadog.trace.api.Config
import datadog.trace.tracer.sampling.AllSampler
import datadog.trace.tracer.sampling.Sampler
import datadog.trace.tracer.writer.AgentWriter
import datadog.trace.tracer.writer.SampleRateByService
import datadog.trace.tracer.writer.Writer
import datadog.trace.util.gc.GCUtils
import spock.lang.Requires
import spock.lang.Shared
import spock.lang.Specification
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
@Requires({ !System.getProperty("java.vm.name").contains("IBM J9 VM") })
class TracerTest extends Specification {
@Shared
def config = Config.get()
def testWriter = new TestWriter()
// TODO: add more tests for different config options and interceptors
def "test getters"() {
when:
def tracer = Tracer.builder().config(config).build()
then:
tracer.getWriter() instanceof AgentWriter
tracer.getSampler() instanceof AllSampler
tracer.getInterceptors() == []
}
def "close closes writer"() {
setup:
def writer = Mock(Writer)
def tracer = Tracer.builder().writer(writer).build()
when:
tracer.close()
then: "closed writer"
1 * writer.close()
0 * _ // don't allow any other interaction
}
def "finalize closes writer"() {
setup:
def writer = Mock(Writer)
def tracer = Tracer.builder().writer(writer).build()
when:
tracer.finalize()
then: "closed writer"
1 * writer.close()
0 * _ // don't allow any other interaction
when:
tracer.finalize()
then: "thrown error swallowed"
1 * writer.close() >> { throw new Exception("test error") }
0 * _ // don't allow any other interaction
}
def "test create current timestamp"() {
setup:
def tracer = Tracer.builder().config(config).build()
when:
def timestamp = tracer.createCurrentTimestamp()
then:
timestamp.getDuration() <= TimeUnit.MINUTES.toNanos(1) // Assume test takes less than a minute to run
}
def "test trace happy path"() {
setup:
def tracer = Tracer.builder().config(config).writer(testWriter).build()
when:
def rootSpan = tracer.buildTrace(null)
def continuation = rootSpan.getTrace().createContinuation(rootSpan)
def span = rootSpan.getTrace().createSpan(rootSpan.getContext(), tracer.createCurrentTimestamp())
then:
rootSpan.getService() == config.getServiceName()
when:
rootSpan.finish()
continuation.close()
then:
testWriter.traces == []
when:
span.finish()
then:
testWriter.traces == [rootSpan.getContext().getTraceId()]
testWriter.validity == [true]
rootSpan.getTrace().getSpans() == [rootSpan, span]
testWriter.traceCount.get() == 1
}
def "sampler called on span completion"() {
setup:
def sampler = Mock(Sampler)
def tracer = Tracer.builder().sampler(sampler).build()
when:
tracer.buildTrace(null).finish()
then: "closed writer"
1 * sampler.sample(_ as Trace)
0 * _ // don't allow any other interaction
}
def "interceptor called on span events"() {
setup:
def interceptor = Mock(Interceptor)
def tracer = Tracer.builder().interceptors([interceptor]).build()
when:
tracer.buildTrace(null).finish()
then: "closed writer"
1 * interceptor.afterSpanStarted(_ as Span)
1 * interceptor.beforeSpanFinished(_ as Span)
1 * interceptor.beforeTraceWritten(_ as Trace)
0 * _ // don't allow any other interaction
}
def "test inject"() {
//TODO implement this test properly
setup:
def context = Mock(SpanContext)
def tracer = Tracer.builder().config(config).writer(testWriter).build()
when:
tracer.inject(context, null, null)
then:
noExceptionThrown()
}
def "test extract"() {
//TODO implement this test properly
setup:
def tracer = Tracer.builder().config(config).writer(testWriter).build()
when:
def context = tracer.extract(null, null)
then:
context == null
}
def testReportError() {
//TODO implement this test properly
setup:
def tracer = Tracer.builder().config(config).writer(testWriter).build()
when:
tracer.reportError("message %s", 123)
then:
thrown TraceException
}
def "test trace that had a GCed span"() {
setup:
def tracer = Tracer.builder().config(config).writer(testWriter).build()
when: "trace and spans get created"
def rootSpan = tracer.buildTrace(null)
def traceId = rootSpan.getContext().getTraceId()
def span = rootSpan.getTrace().createSpan(rootSpan.getContext(), tracer.createCurrentTimestamp())
rootSpan.finish()
then: "nothing is written yet"
testWriter.traces == []
when: "remove all references to traces and spans and wait for GC"
span = null
def traceRef = new WeakReference<>(rootSpan.getTrace())
rootSpan = null
GCUtils.awaitGC(traceRef)
then: "invalid trace is written"
testWriter.waitForTraces(1)
testWriter.traces == [traceId]
testWriter.validity == [false]
testWriter.traceCount.get() == 1
}
def "test trace that had a GCed continuation"() {
setup:
def tracer = Tracer.builder().config(config).writer(testWriter).build()
when: "trace and spans get created"
def rootSpan = tracer.buildTrace(null)
def traceId = rootSpan.getContext().getTraceId()
def continuation = rootSpan.getTrace().createContinuation(rootSpan)
rootSpan.finish()
then: "nothing is written yet"
testWriter.traces == []
when: "remove all references to traces and spans and wait for GC"
continuation = null
def traceRef = new WeakReference<>(rootSpan.getTrace())
rootSpan = null
GCUtils.awaitGC(traceRef)
then: "invalid trace is written"
testWriter.waitForTraces(1)
testWriter.traces == [traceId]
testWriter.validity == [false]
testWriter.traceCount.get() == 1
}
/**
* We cannot use mocks for testing of things related to GC because mocks capture arguments with hardlinks.
* For the same reason this test writer cannot capture complete references to traces in this writer. Instead
* we capture 'strategic' values. Other values have been tested in 'lower level' tests.
*/
static class TestWriter implements Writer {
def traces = new ArrayList<String>()
def validity = new ArrayList<Boolean>()
def traceCount = new AtomicInteger()
@Override
synchronized void write(Trace trace) {
traces.add(trace.getRootSpan().getContext().getTraceId())
validity.add(trace.isValid())
}
@Override
void incrementTraceCount() {
traceCount.incrementAndGet()
}
@Override
SampleRateByService getSampleRateByService() {
return null // Doesn't matter for now
}
@Override
void start() {
//nothing to do for now
}
@Override
void close() {
//nothing to do for now
}
/**
* JVM gives very little guarantees for when finalizers are run. {@link WeakReference} documentation
* says that weak reference is set to null first, and then objects are marked as finalizable. Then
* finalization happens asynchronously in separate thread.
* This means that currently we do not have a good way of knowing that all traces have been closed/GCed right
* now and only thing we can do is to bluntly wait.
* In the future we have plans to implement limiting of number of inflight traces - this might provide us with
* better way.
* @param numberOfTraces number of traces to wait for.
*/
void waitForTraces(int numberOfTraces) {
while (true) {
synchronized (this) {
if (traces.size() >= numberOfTraces && validity.size() >= numberOfTraces) {
return
}
}
Thread.sleep(500)
}
}
}
}

View File

@ -1,177 +0,0 @@
package datadog.trace.tracer.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.tomakehurst.wiremock.http.Request
import com.github.tomakehurst.wiremock.junit.WireMockRule
import com.github.tomakehurst.wiremock.matching.MatchResult
import datadog.trace.tracer.Clock
import datadog.trace.tracer.JsonSpan
import datadog.trace.tracer.SpanContextImpl
import datadog.trace.tracer.Trace
import datadog.trace.tracer.TraceImpl
import datadog.trace.tracer.Tracer
import datadog.trace.tracer.sampling.Sampler
import org.junit.Rule
import org.msgpack.jackson.dataformat.MessagePackFactory
import spock.lang.Specification
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo
import static com.github.tomakehurst.wiremock.client.WireMock.put
import static com.github.tomakehurst.wiremock.client.WireMock.putRequestedFor
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo
import static com.github.tomakehurst.wiremock.client.WireMock.verify
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig
class AgentClientTest extends Specification {
private static final int TRACE_COUNT = 10
private static final String SERVICE_NAME = "service.name"
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory())
def writer = Mock(Writer)
def sampler = Mock(Sampler) {
sample(_) >> true
}
def tracer = Mock(Tracer) {
getDefaultServiceName() >> SERVICE_NAME
getInterceptors() >> []
getWriter() >> writer
getSampler() >> sampler
}
@Rule
public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().dynamicPort().dynamicHttpsPort())
private AgentClient client
def setup() {
client = new AgentClient("localhost", wireMockRule.port())
def response = ["rate_by_service": ["test": 0.1, "another test": 0.2]]
stubFor(put(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/msgpack")
.withBody(objectMapper.writeValueAsBytes(response))))
}
def "test send traces"() {
setup:
def traces = [createTrace("123"), createTrace("312")]
when:
def response = client.sendTraces(traces, TRACE_COUNT)
then: "got expected response"
response.getRate("test") == 0.1d
response.getRate("another test") == 0.2d
response.getRate("doesn't exist") == null
and: "request got expected parameters"
byte[] requestBody = null
verify(putRequestedFor(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.withHeader(AgentClient.CONTENT_TYPE, equalTo(AgentClient.MSGPACK))
.withHeader(AgentClient.DATADOG_META_LANG, equalTo("java"))
// TODO: fill in these headers
// .withHeader(AgentClient.DATADOG_META_LANG_VERSION, equalTo("java"))
// .withHeader(AgentClient.DATADOG_META_LANG_INTERPRETER, equalTo("java"))
// .withHeader(AgentClient.DATADOG_META_TRACER_VERSION, equalTo("java"))
.withHeader(AgentClient.X_DATADOG_TRACE_COUNT, equalTo(Integer.toString(TRACE_COUNT)))
.andMatching({ Request request ->
requestBody = request.getBody()
MatchResult.of(true)
}))
objectMapper.readValue(requestBody, new TypeReference<List<List<JsonSpan>>>() {}) == traces.collect {
it.getSpans().collect { new JsonSpan(it) }
}
}
def "test send empty list"() {
when:
def response = client.sendTraces([], TRACE_COUNT)
then: "got expected response"
response.getRate("test") == 0.1d
response.getRate("another test") == 0.2d
response.getRate("doesn't exist") == null
and: "request got expected parameters"
verify(putRequestedFor(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.withHeader(AgentClient.CONTENT_TYPE, equalTo(AgentClient.MSGPACK))
.withHeader(AgentClient.DATADOG_META_LANG, equalTo("java"))
// TODO: fill in these headers
// .withHeader(AgentClient.DATADOG_META_LANG_VERSION, equalTo("java"))
// .withHeader(AgentClient.DATADOG_META_LANG_INTERPRETER, equalTo("java"))
// .withHeader(AgentClient.DATADOG_META_TRACER_VERSION, equalTo("java"))
.withHeader(AgentClient.X_DATADOG_TRACE_COUNT, equalTo(Integer.toString(TRACE_COUNT)))
.andMatching({ Request request ->
MatchResult.of(objectMapper.readValue(request.getBody(), new TypeReference<List<List<JsonSpan>>>() {}) == [])
}))
}
def "test failure"() {
setup:
stubFor(put(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.willReturn(aResponse()
.withStatus(500)))
def trace = createTrace("123")
when:
def response = client.sendTraces([trace], TRACE_COUNT)
then:
response == null
}
def "test timeout"() {
setup:
stubFor(put(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.willReturn(aResponse()
.withStatus(200)
.withChunkedDribbleDelay(5, AgentClient.READ_TIMEOUT * 2)))
def trace = createTrace("123")
when:
def response = client.sendTraces([trace], TRACE_COUNT)
then:
response == null
}
def "test invalid url"() {
when:
client = new AgentClient("localhost", -100)
then:
thrown RuntimeException
}
Trace createTrace(String traceId) {
def clock = new Clock(tracer)
def parentContext = new SpanContextImpl("123", "456", "789")
def trace = new TraceImpl(tracer, parentContext, clock.createCurrentTimestamp())
trace.getRootSpan().setResource("test resource")
trace.getRootSpan().setType("test type")
trace.getRootSpan().setName("test name")
trace.getRootSpan().setMeta("number.key", 123)
trace.getRootSpan().setMeta("string.key", "meta string")
trace.getRootSpan().setMeta("boolean.key", true)
def childSpan = trace.createSpan(trace.getRootSpan().getContext())
childSpan.setResource("child span test resource")
childSpan.setType("child span test type")
childSpan.setName("child span test name")
childSpan.setMeta("child.span.number.key", 234)
childSpan.setMeta("child.span.string.key", "new meta string")
childSpan.setMeta("child.span.boolean.key", true)
childSpan.finish()
trace.getRootSpan().finish()
return trace
}
}

View File

@ -1,186 +0,0 @@
package datadog.trace.tracer.writer
import datadog.trace.tracer.Trace
import spock.lang.Retry
import spock.lang.Specification
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
class AgentWriterTest extends Specification {
// Amount of time within with we expect flush to happen.
// We make this slightly longer than flush time.
private static final int FLUSH_DELAY = TimeUnit.SECONDS.toMillis(AgentWriter.FLUSH_TIME_SECONDS * 2)
private static final AGENT_URL = new URL("http://example.com")
def sampleRateByService = Mock(SampleRateByService)
def client = Mock(AgentClient) {
getAgentUrl() >> AGENT_URL
}
def "test happy path"() {
setup:
def incrementTraceCountBy = 5
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> false
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client)
when:
for (def trace : traces) {
writer.write(trace)
}
incrementTraceCountBy.times {
writer.incrementTraceCount()
}
// Starting writer after submissions to make sure all updates go out in 1 request
writer.start()
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0], traces[2]], incrementTraceCountBy) >> sampleRateByService
and:
writer.getSampleRateByService() == sampleRateByService
then:
0 * client.sendTraces(_, _)
cleanup:
writer.close()
}
def "test small queue"() {
setup:
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client, 1)
when:
for (def trace : traces) {
writer.write(trace)
}
writer.start()
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0)
cleanup:
writer.close()
}
def "test client exception handling"() {
setup:
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client)
writer.start()
when:
writer.write(traces[0])
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception") }
writer.getSampleRateByService() == SampleRateByService.EMPTY_INSTANCE
when:
writer.write(traces[1])
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[1]], 0) >> sampleRateByService
writer.getSampleRateByService() == sampleRateByService
cleanup:
writer.close()
}
def "test agent url getter"() {
setup:
def writer = new AgentWriter(client)
when:
def agentUrl = writer.getAgentUrl()
then:
agentUrl == AGENT_URL
}
def "test default sample rate by service"() {
setup:
def writer = new AgentWriter(client)
when:
def sampleRateByService = writer.getSampleRateByService()
then:
sampleRateByService == SampleRateByService.EMPTY_INSTANCE
}
@Retry
def "test start/#closeMethod"() {
setup:
def writer = new AgentWriter(client)
expect:
!isWriterThreadRunning()
when:
writer.start()
then:
isWriterThreadRunning()
when:
writer."${closeMethod}"()
then:
!isWriterThreadRunning()
where:
closeMethod | _
"close" | _
"finalize" | _
}
def "test shutdown callback"() {
setup:
def executor = Mock(ExecutorService) {
awaitTermination(_, _) >> { throw new InterruptedException() }
}
def callback = new AgentWriter.ShutdownCallback(executor)
when:
callback.run()
then:
noExceptionThrown()
}
boolean isWriterThreadRunning() {
// This is known to fail sometimes.
return Thread.getAllStackTraces().keySet().any { t -> t.getName() == "dd-agent-writer" }
}
}

View File

@ -1,48 +0,0 @@
package datadog.trace.tracer.writer
import datadog.trace.tracer.Trace
import spock.lang.Shared
import spock.lang.Specification
// TODO: this test set is incomplete, fill it in
class LoggingWriterTest extends Specification {
@Shared
def writer = new LoggingWriter()
def "test start"() {
when:
writer.start()
then:
noExceptionThrown()
}
def "test close"() {
when:
writer.close()
then:
noExceptionThrown()
}
def "test write"() {
setup:
def trace = Mock(Trace)
when:
writer.write(trace)
then:
1 * trace.toString()
}
def "test getter"() {
when:
def sampleRateByInstance = writer.getSampleRateByService()
then:
sampleRateByInstance == SampleRateByService.EMPTY_INSTANCE
}
}

View File

@ -1,40 +0,0 @@
package datadog.trace.tracer.writer
import com.fasterxml.jackson.databind.ObjectMapper
import nl.jqno.equalsverifier.EqualsVerifier
import nl.jqno.equalsverifier.Warning
import spock.lang.Specification
class SampleRateByServiceTest extends Specification {
private static final Map<String, Double> TEST_MAP = ["test": 0.1d, "another test": 0.2d]
ObjectMapper objectMapper = new ObjectMapper()
def "test constructor and getter"() {
when:
def sampleRate = new SampleRateByService(TEST_MAP)
then:
sampleRate.getRate("test") == 0.1d
sampleRate.getRate("another test") == 0.2d
sampleRate.getRate("doesn't exist") == null
}
def "test JSON parsing"() {
when:
def sampleRate = objectMapper.readValue("{\"test\": 0.8, \"another test\": 0.9}", SampleRateByService)
then:
sampleRate.getRate("test") == 0.8d
sampleRate.getRate("another test") == 0.9d
}
def "test equals"() {
when:
EqualsVerifier.forClass(SampleRateByService).suppress(Warning.STRICT_INHERITANCE).verify()
then:
noExceptionThrown()
}
}

View File

@ -1,63 +0,0 @@
package datadog.trace.tracer.writer
import datadog.trace.api.Config
import spock.lang.Specification
class WriterTest extends Specification {
def "test builder logging writer"() {
setup:
def config = Mock(Config) {
getWriterType() >> Config.LOGGING_WRITER_TYPE
}
when:
def writer = Writer.Builder.forConfig(config)
then:
writer instanceof LoggingWriter
}
def "test builder logging writer properties"() {
setup:
def properties = new Properties()
properties.setProperty(Config.WRITER_TYPE, Config.LOGGING_WRITER_TYPE)
when:
def writer = Writer.Builder.forConfig(properties)
then:
writer instanceof LoggingWriter
}
def "test builder agent writer: '#writerType'"() {
setup:
def config = Mock(Config) {
getWriterType() >> writerType
getAgentHost() >> "test.host"
getAgentPort() >> 1234
}
when:
def writer = Writer.Builder.forConfig(config)
then:
writer instanceof AgentWriter
((AgentWriter) writer).getAgentUrl() == new URL("http://test.host:1234/v0.4/traces");
where:
writerType | _
Config.DD_AGENT_WRITER_TYPE | _
"some odd string" | _
}
def "test builder no config"() {
when:
Writer.Builder.forConfig(null)
then:
thrown NullPointerException
}
}

View File

@ -1,16 +1,12 @@
rootProject.name = 'dd-trace-java'
// core tracing projects
// external apis
include ':dd-trace-api'
include ':dd-java-agent:agent-bootstrap'
include ':dd-trace'
include ':dd-trace-ext'
// implements for third-party tracing libraries
include ':dd-trace-ot'
// agent projects
include ':dd-java-agent'
include ':dd-java-agent:agent-bootstrap'
include ':dd-java-agent:agent-tooling'
include ':dd-java-agent:agent-jmxfetch'