Merge branch 'master' into cecile/setError

Cécile Terpin 2020-02-06 17:03:33 +01:00
commit 25ddcd434b
50 changed files with 2065 additions and 882 deletions

View File

@ -291,7 +291,10 @@ class MuzzlePlugin implements Plugin<Project> {
doLast {
final ClassLoader instrumentationCL = createInstrumentationClassloader(instrumentationProject, toolingProject)
def ccl = Thread.currentThread().contextClassLoader
def bogusLoader = new SecureClassLoader()
def bogusLoader = new SecureClassLoader() {
@Override
String toString() { return "bogus" }
}
Thread.currentThread().contextClassLoader = bogusLoader
final ClassLoader userCL = createClassLoaderForTask(instrumentationProject, bootstrapProject, taskName)
try {

View File

@ -16,7 +16,7 @@ public class AgentTooling {
}
private static final DDLocationStrategy LOCATION_STRATEGY = new DDLocationStrategy();
private static final DDCachingPoolStrategy POOL_STRATEGY = new DDCachingPoolStrategy(CLEANER);
private static final DDCachingPoolStrategy POOL_STRATEGY = new DDCachingPoolStrategy();
public static void init() {
// Only need to trigger static initializers for now.

View File

@ -20,6 +20,7 @@ class Cleaner {
final Thread thread = new Thread(r, "dd-cleaner");
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
thread.setContextClassLoader(null);
return thread;
}
};

View File

@ -1,146 +1,235 @@
package datadog.trace.agent.tooling;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.BOOTSTRAP_CLASSLOADER;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.skipClassLoader;
import static net.bytebuddy.agent.builder.AgentBuilder.PoolStrategy;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import datadog.trace.bootstrap.WeakMap;
import java.security.SecureClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.lang.ref.WeakReference;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.ClassFileLocator;
import net.bytebuddy.pool.TypePool;
/**
* Custom Pool strategy.
* NEW (Jan 2020) Custom Pool strategy.
*
* <p>Here we are using WeakMap.Provider as the backing ClassLoader -> CacheProvider lookup.
* <ul>
* Uses a Guava Cache directly...
* <li>better control over locking than WeakMap.Provider
* <li>provides direct control over concurrency level
* <li>initial and maximum capacity
* </ul>
*
* <p>We also use our bootstrap proxy when matching against the bootstrap loader.
* <ul>
There are two core parts to the cache...
* <li>a cache of ClassLoader to WeakReference&lt;ClassLoader&gt;
* <li>a single cache of TypeResolutions for all ClassLoaders - keyed by a custom composite key of
* ClassLoader & class name
* </ul>
*
* <p>The CacheProvider is a custom implementation that uses guava's cache to expire and limit size.
* <p>This design was chosen to create a single limited size cache that can be adjusted for the
* entire application -- without having to create a large number of WeakReference objects.
*
* <p>By evicting from the cache we are able to reduce the memory overhead of the agent for apps
* that have many classes.
*
* <p>See eviction policy below.
* <p>Eviction is handled almost entirely through a size restriction; however, softValues are still
* used as a further safeguard.
*/
public class DDCachingPoolStrategy
implements PoolStrategy, WeakMap.ValueSupplier<ClassLoader, TypePool.CacheProvider> {
@Slf4j
public class DDCachingPoolStrategy implements PoolStrategy {
// Many things are package visible for testing purposes --
// others to avoid creation of synthetic accessors
// Need this because we can't put null into the typePoolCache map.
private static final ClassLoader BOOTSTRAP_CLASSLOADER_PLACEHOLDER =
new SecureClassLoader(null) {};
static final int CONCURRENCY_LEVEL = 8;
static final int LOADER_CAPACITY = 64;
static final int TYPE_CAPACITY = 64;
private final WeakMap<ClassLoader, TypePool.CacheProvider> typePoolCache =
WeakMap.Provider.newWeakMap();
private final Cleaner cleaner;
static final int BOOTSTRAP_HASH = 0;
public DDCachingPoolStrategy(final Cleaner cleaner) {
this.cleaner = cleaner;
}
/**
* Cache of recent ClassLoader WeakReferences; used to...
*
* <ul>
<li>Reduce the number of WeakReferences created
* <li>Allow for quick fast path equivalence check of composite keys
* </ul>
*/
final Cache<ClassLoader, WeakReference<ClassLoader>> loaderRefCache =
CacheBuilder.newBuilder()
.weakKeys()
.concurrencyLevel(CONCURRENCY_LEVEL)
.initialCapacity(LOADER_CAPACITY / 2)
.maximumSize(LOADER_CAPACITY)
.build();
/**
* Single shared Type.Resolution cache -- uses a composite key -- conceptually of loader & name
*/
final Cache<TypeCacheKey, TypePool.Resolution> sharedResolutionCache =
CacheBuilder.newBuilder()
.softValues()
.concurrencyLevel(CONCURRENCY_LEVEL)
.initialCapacity(TYPE_CAPACITY)
.maximumSize(TYPE_CAPACITY)
.build();
/** Fast path for bootstrap */
final SharedResolutionCacheAdapter bootstrapCacheProvider =
new SharedResolutionCacheAdapter(BOOTSTRAP_HASH, null, sharedResolutionCache);
@Override
public TypePool typePool(final ClassFileLocator classFileLocator, final ClassLoader classLoader) {
final ClassLoader key =
BOOTSTRAP_CLASSLOADER == classLoader ? BOOTSTRAP_CLASSLOADER_PLACEHOLDER : classLoader;
final TypePool.CacheProvider cache = typePoolCache.computeIfAbsent(key, this);
public final TypePool typePool(
final ClassFileLocator classFileLocator, final ClassLoader classLoader) {
if (classLoader == null) {
return createCachingTypePool(bootstrapCacheProvider, classFileLocator);
}
WeakReference<ClassLoader> loaderRef = loaderRefCache.getIfPresent(classLoader);
if (loaderRef == null) {
loaderRef = new WeakReference<>(classLoader);
loaderRefCache.put(classLoader, loaderRef);
}
int loaderHash = classLoader.hashCode();
return createCachingTypePool(loaderHash, loaderRef, classFileLocator);
}
private final TypePool.CacheProvider createCacheProvider(
final int loaderHash, final WeakReference<ClassLoader> loaderRef) {
return new SharedResolutionCacheAdapter(loaderHash, loaderRef, sharedResolutionCache);
}
private final TypePool createCachingTypePool(
final int loaderHash,
final WeakReference<ClassLoader> loaderRef,
final ClassFileLocator classFileLocator) {
return new TypePool.Default.WithLazyResolution(
cache, classFileLocator, TypePool.Default.ReaderMode.FAST);
createCacheProvider(loaderHash, loaderRef),
classFileLocator,
TypePool.Default.ReaderMode.FAST);
}
@Override
public TypePool.CacheProvider get(final ClassLoader key) {
if (BOOTSTRAP_CLASSLOADER_PLACEHOLDER != key && skipClassLoader().matches(key)) {
// Don't bother creating a cache for a classloader that won't match.
// (avoiding a lot of DelegatingClassLoader instances)
// This is primarily an optimization.
return TypePool.CacheProvider.NoOp.INSTANCE;
} else {
return EvictingCacheProvider.withObjectType(cleaner, 1, TimeUnit.MINUTES);
}
private final TypePool createCachingTypePool(
final TypePool.CacheProvider cacheProvider, final ClassFileLocator classFileLocator) {
return new TypePool.Default.WithLazyResolution(
cacheProvider, classFileLocator, TypePool.Default.ReaderMode.FAST);
}
private static class EvictingCacheProvider implements TypePool.CacheProvider {
final long approximateSize() {
return sharedResolutionCache.size();
}
/** A map containing all cached resolutions by their names. */
private final Cache<String, TypePool.Resolution> cache;
/**
* TypeCacheKey is the key for the sharedResolutionCache. Conceptually, it is a mix of ClassLoader &
* class name.
*
* <p>For efficiency & GC purposes, it is actually composed of loaderHash &
* WeakReference&lt;ClassLoader&gt;
*
* <p>The loaderHash exists to avoid calling get & strengthening the Reference.
*/
static final class TypeCacheKey {
private final int loaderHash;
private final WeakReference<ClassLoader> loaderRef;
private final String className;
/** Creates a new simple cache. */
private EvictingCacheProvider(
final Cleaner cleaner, final long expireDuration, final TimeUnit unit) {
cache =
CacheBuilder.newBuilder()
.initialCapacity(100) // Per classloader, so we want a small default.
.maximumSize(5000)
.softValues()
.expireAfterAccess(expireDuration, unit)
.build();
private final int hashCode;
/*
* The cache only does cleanup on occasional reads and writes.
* We want to ensure this happens more regularly, so we schedule a thread to run cleanup manually.
*/
cleaner.scheduleCleaning(cache, CacheCleaner.CLEANER, expireDuration, unit);
}
TypeCacheKey(
final int loaderHash, final WeakReference<ClassLoader> loaderRef, final String className) {
this.loaderHash = loaderHash;
this.loaderRef = loaderRef;
this.className = className;
private static EvictingCacheProvider withObjectType(
final Cleaner cleaner, final long expireDuration, final TimeUnit unit) {
final EvictingCacheProvider cacheProvider =
new EvictingCacheProvider(cleaner, expireDuration, unit);
cacheProvider.register(
Object.class.getName(), new TypePool.Resolution.Simple(TypeDescription.OBJECT));
return cacheProvider;
hashCode = (int) (31 * this.loaderHash) ^ className.hashCode();
}
@Override
public TypePool.Resolution find(final String name) {
return cache.getIfPresent(name);
public final int hashCode() {
return hashCode;
}
@Override
public TypePool.Resolution register(final String name, final TypePool.Resolution resolution) {
try {
return cache.get(name, new ResolutionProvider(resolution));
} catch (final ExecutionException e) {
public boolean equals(final Object obj) {
if (!(obj instanceof TypeCacheKey)) return false;
TypeCacheKey that = (TypeCacheKey) obj;
if (loaderHash != that.loaderHash) return false;
// Fastpath loaderRef equivalence -- works because of the WeakReference cache used
// Also covers the bootstrap null loaderRef case
if (loaderRef == that.loaderRef) {
// still need to check name
return className.equals(that.className);
} else if (className.equals(that.className)) {
// need to perform a deeper loader check -- requires calling Reference.get
// which can strengthen the Reference, so deliberately done last
// If either reference has gone null, they aren't considered equivalent
// Technically, this is a bit of a violation of equals semantics, since
// two equivalent references can become not equivalent.
// In this case, it is fine because that means the ClassLoader is no
// longer live, so the entries will never match anyway and will fall
// out of the cache.
ClassLoader thisLoader = loaderRef.get();
if (thisLoader == null) return false;
ClassLoader thatLoader = that.loaderRef.get();
if (thatLoader == null) return false;
return (thisLoader == thatLoader);
} else {
return false;
}
}
}
static final class SharedResolutionCacheAdapter implements TypePool.CacheProvider {
private static final String OBJECT_NAME = "java.lang.Object";
private static final TypePool.Resolution OBJECT_RESOLUTION =
new TypePool.Resolution.Simple(TypeDescription.OBJECT);
private final int loaderHash;
private final WeakReference<ClassLoader> loaderRef;
private final Cache<TypeCacheKey, TypePool.Resolution> sharedResolutionCache;
SharedResolutionCacheAdapter(
final int loaderHash,
final WeakReference<ClassLoader> loaderRef,
final Cache<TypeCacheKey, TypePool.Resolution> sharedResolutionCache) {
this.loaderHash = loaderHash;
this.loaderRef = loaderRef;
this.sharedResolutionCache = sharedResolutionCache;
}
@Override
public TypePool.Resolution find(final String className) {
TypePool.Resolution existingResolution =
sharedResolutionCache.getIfPresent(new TypeCacheKey(loaderHash, loaderRef, className));
if (existingResolution != null) return existingResolution;
if (OBJECT_NAME.equals(className)) {
return OBJECT_RESOLUTION;
}
return null;
}
@Override
public TypePool.Resolution register(
final String className, final TypePool.Resolution resolution) {
if (OBJECT_NAME.equals(className)) {
return resolution;
}
sharedResolutionCache.put(new TypeCacheKey(loaderHash, loaderRef, className), resolution);
return resolution;
}
@Override
public void clear() {
cache.invalidateAll();
}
public long size() {
return cache.size();
}
private static class CacheCleaner implements Cleaner.Adapter<Cache> {
private static final CacheCleaner CLEANER = new CacheCleaner();
@Override
public void clean(final Cache target) {
target.cleanUp();
}
}
private static class ResolutionProvider implements Callable<TypePool.Resolution> {
private final TypePool.Resolution value;
private ResolutionProvider(final TypePool.Resolution value) {
this.value = value;
}
@Override
public TypePool.Resolution call() {
return value;
}
// Allowing the high-level eviction policy to make the clearing decisions
}
}
}
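
To make the shared-cache design described in the DDCachingPoolStrategy Javadoc above concrete, here is a minimal, self-contained sketch of the composite-key idea. It assumes Guava on the classpath; SharedCacheSketch and Key are hypothetical, simplified stand-ins for TypeCacheKey and sharedResolutionCache, not the shipped code.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.lang.ref.WeakReference;

// Hypothetical simplification of the design above: one bounded, soft-valued
// cache shared by all classloaders, keyed by (loaderHash, loaderRef, name).
public class SharedCacheSketch {
  static final class Key {
    final int loaderHash; // avoids calling Reference.get on every lookup
    final WeakReference<ClassLoader> loaderRef; // null stands in for the bootstrap loader
    final String name;
    final int hash;

    Key(final int loaderHash, final WeakReference<ClassLoader> loaderRef, final String name) {
      this.loaderHash = loaderHash;
      this.loaderRef = loaderRef;
      this.name = name;
      this.hash = 31 * loaderHash ^ name.hashCode();
    }

    @Override
    public int hashCode() {
      return hash;
    }

    @Override
    public boolean equals(final Object obj) {
      if (!(obj instanceof Key)) return false;
      final Key that = (Key) obj;
      if (loaderHash != that.loaderHash || !name.equals(that.name)) return false;
      // Identity fast path -- works when a loaderRefCache hands out a single
      // WeakReference per live loader; also covers the bootstrap (null) case.
      if (loaderRef == that.loaderRef) return true;
      if (loaderRef == null || that.loaderRef == null) return false;
      final ClassLoader thisLoader = loaderRef.get();
      final ClassLoader thatLoader = that.loaderRef.get();
      // A collected loader never matches again, so its entries age out of the cache.
      return thisLoader != null && thisLoader == thatLoader;
    }
  }

  // Size-bounded, with softValues as the further safeguard described above.
  final Cache<Key, Object> shared =
      CacheBuilder.newBuilder().softValues().maximumSize(64).build();
}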

View File

@ -0,0 +1,221 @@
package datadog.trace.agent.tooling
import datadog.trace.util.test.DDSpecification
import net.bytebuddy.description.type.TypeDescription
import net.bytebuddy.dynamic.ClassFileLocator
import net.bytebuddy.pool.TypePool
import spock.lang.Timeout
import java.lang.ref.WeakReference
@Timeout(5)
class CacheProviderTest extends DDSpecification {
def "key bootstrap equivalence"() {
// def loader = null
def loaderHash = DDCachingPoolStrategy.BOOTSTRAP_HASH
def loaderRef = null
def key1 = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef, "foo")
def key2 = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef, "foo")
expect:
key1.hashCode() == key2.hashCode()
key1.equals(key2)
}
def "key same ref equivalence"() {
setup:
def loader = newClassLoader()
def loaderHash = loader.hashCode()
def loaderRef = new WeakReference<ClassLoader>(loader)
def key1 = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef, "foo")
def key2 = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef, "foo")
expect:
key1.hashCode() == key2.hashCode()
key1.equals(key2)
}
def "key different ref equivalence"() {
setup:
def loader = newClassLoader()
def loaderHash = loader.hashCode()
def loaderRef1 = new WeakReference<ClassLoader>(loader)
def loaderRef2 = new WeakReference<ClassLoader>(loader)
def key1 = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef1, "foo")
def key2 = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef2, "foo")
expect:
loaderRef1 != loaderRef2
key1.hashCode() == key2.hashCode()
key1.equals(key2)
}
def "key mismatch -- same loader - diff name"() {
setup:
def loader = newClassLoader()
def loaderHash = loader.hashCode()
def loaderRef = new WeakReference<ClassLoader>(loader)
def fooKey = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef, "foo")
def barKey = new DDCachingPoolStrategy.TypeCacheKey(loaderHash, loaderRef, "bar")
expect:
// not strictly guaranteed -- but important for performance
fooKey.hashCode() != barKey.hashCode()
!fooKey.equals(barKey)
}
def "key mismatch -- same name - diff loader"() {
setup:
def loader1 = newClassLoader()
def loader1Hash = loader1.hashCode()
def loaderRef1 = new WeakReference<ClassLoader>(loader1)
def loader2 = newClassLoader()
def loader2Hash = loader2.hashCode()
def loaderRef2 = new WeakReference<ClassLoader>(loader2)
def fooKey1 = new DDCachingPoolStrategy.TypeCacheKey(loader1Hash, loaderRef1, "foo")
def fooKey2 = new DDCachingPoolStrategy.TypeCacheKey(loader2Hash, loaderRef2, "foo")
expect:
// not strictly guaranteed -- but important for performance
fooKey1.hashCode() != fooKey2.hashCode()
!fooKey1.equals(fooKey2)
}
def "test basic caching"() {
setup:
def poolStrat = new DDCachingPoolStrategy()
def loader = newClassLoader()
def loaderHash = loader.hashCode()
def loaderRef = new WeakReference<ClassLoader>(loader)
def cacheProvider = poolStrat.createCacheProvider(loaderHash, loaderRef)
when:
cacheProvider.register("foo", new TypePool.Resolution.Simple(TypeDescription.VOID))
then:
// not strictly guaranteed, but fine for this test
cacheProvider.find("foo") != null
poolStrat.approximateSize() == 1
}
def "test loader equivalence"() {
setup:
def poolStrat = new DDCachingPoolStrategy()
def loader1 = newClassLoader()
def loaderHash1 = loader1.hashCode()
def loaderRef1A = new WeakReference<ClassLoader>(loader1)
def loaderRef1B = new WeakReference<ClassLoader>(loader1)
def cacheProvider1A = poolStrat.createCacheProvider(loaderHash1, loaderRef1A)
def cacheProvider1B = poolStrat.createCacheProvider(loaderHash1, loaderRef1B)
when:
cacheProvider1A.register("foo", newVoid())
then:
// not strictly guaranteed, but fine for this test
cacheProvider1A.find("foo") != null
cacheProvider1B.find("foo") != null
cacheProvider1A.find("foo").is(cacheProvider1B.find("foo"))
poolStrat.approximateSize() == 1
}
def "test loader separation"() {
setup:
def poolStrat = new DDCachingPoolStrategy()
def loader1 = newClassLoader()
def loaderHash1 = loader1.hashCode()
def loaderRef1 = new WeakReference<ClassLoader>(loader1)
def loader2 = newClassLoader()
def loaderHash2 = loader2.hashCode()
def loaderRef2 = new WeakReference<ClassLoader>(loader2)
def cacheProvider1 = poolStrat.createCacheProvider(loaderHash1, loaderRef1)
def cacheProvider2 = poolStrat.createCacheProvider(loaderHash2, loaderRef2)
when:
cacheProvider1.register("foo", newVoid())
cacheProvider2.register("foo", newVoid())
then:
// not strictly guaranteed, but fine for this test
cacheProvider1.find("foo") != null
cacheProvider2.find("foo") != null
!cacheProvider1.find("foo").is(cacheProvider2.find("foo"))
poolStrat.approximateSize() == 2
}
def "test capacity"() {
setup:
def poolStrat = new DDCachingPoolStrategy()
def capacity = DDCachingPoolStrategy.TYPE_CAPACITY
def loader1 = newClassLoader()
def loaderHash1 = loader1.hashCode()
def loaderRef1 = new WeakReference<ClassLoader>(loader1)
def loader2 = newClassLoader()
def loaderHash2 = loader2.hashCode()
def loaderRef2 = new WeakReference<ClassLoader>(loader2)
def cacheProvider1 = poolStrat.createCacheProvider(loaderHash1, loaderRef1)
def cacheProvider2 = poolStrat.createCacheProvider(loaderHash2, loaderRef2)
def id = 0
when:
(capacity / 2).times {
id += 1
cacheProvider1.register("foo${id}", newVoid())
cacheProvider2.register("foo${id}", newVoid())
}
then:
// cache will start to proactively free slots & size calc is approximate
poolStrat.approximateSize() > 0.8 * capacity
when:
10.times {
id += 1
cacheProvider1.register("foo${id}", newVoid())
cacheProvider2.register("foo${id}", newVoid())
}
then:
// cache will start to proactively free slots & size calc is approximate
poolStrat.approximateSize() > 0.8 * capacity
}
static newVoid() {
return new TypePool.Resolution.Simple(TypeDescription.VOID)
}
static newClassLoader() {
return new URLClassLoader([] as URL[], (ClassLoader)null)
}
static newLocator() {
return new ClassFileLocator() {
@Override
ClassFileLocator.Resolution locate(String name) throws IOException {
return null
}
@Override
void close() throws IOException {}
}
}
}

View File

@ -1,102 +0,0 @@
package datadog.trace.agent.tooling
import datadog.trace.util.gc.GCUtils
import datadog.trace.util.test.DDSpecification
import net.bytebuddy.description.type.TypeDescription
import net.bytebuddy.pool.TypePool
import spock.lang.Timeout
import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.tooling.AgentTooling.CLEANER
@Timeout(5)
class EvictingCacheProviderTest extends DDSpecification {
def "test provider"() {
setup:
def provider = new DDCachingPoolStrategy.EvictingCacheProvider(CLEANER, 2, TimeUnit.MINUTES)
expect:
provider.size() == 0
provider.find(className) == null
when:
provider.register(className, new TypePool.Resolution.Simple(TypeDescription.VOID))
then:
provider.size() == 1
provider.find(className) == new TypePool.Resolution.Simple(TypeDescription.VOID)
when:
provider.clear()
then:
provider.size() == 0
provider.find(className) == null
where:
className = "SomeClass"
}
def "test timeout eviction"() {
setup:
def provider = new DDCachingPoolStrategy.EvictingCacheProvider(CLEANER, timeout, TimeUnit.MILLISECONDS)
def resolutionRef = new AtomicReference<TypePool.Resolution>(new TypePool.Resolution.Simple(TypeDescription.VOID))
def weakRef = new WeakReference(resolutionRef.get())
when:
def lastAccess = System.nanoTime()
provider.register(className, resolutionRef.get())
then:
// Ensure continued access prevents expiration.
for (int i = 0; i < timeout + 10; i++) {
assert TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastAccess) < timeout: "test took too long on " + i
assert provider.find(className) != null
assert provider.size() == 1
lastAccess = System.nanoTime()
Thread.sleep(1)
}
when:
Thread.sleep(timeout)
then:
provider.find(className) == null
when:
provider.register(className, resolutionRef.get())
resolutionRef.set(null)
GCUtils.awaitGC(weakRef)
then:
// Verify properly GC'd
provider.find(className) == null
weakRef.get() == null
where:
className = "SomeClass"
timeout = 500 // Takes about 50 ms locally, adding an order of magnitude for CI.
}
def "test size limit"() {
setup:
def provider = new DDCachingPoolStrategy.EvictingCacheProvider(CLEANER, 2, TimeUnit.MINUTES)
def typeDef = new TypePool.Resolution.Simple(TypeDescription.VOID)
for (int i = 0; i < 10000; i++) {
provider.register("ClassName$i", typeDef)
}
expect:
provider.size() == 5000
when:
provider.clear()
then:
provider.size() == 0
}
}

View File

@ -0,0 +1,55 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "${rootDir}/gradle/java.gradle"
apply from: "${rootDir}/gradle/test-with-scala.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
muzzle {
// There are some weird library issues below 2.9, so we can't assert the inverse
pass {
group = 'com.twitter'
module = 'finatra-http_2.11'
versions = '[2.9.0,]'
}
pass {
group = 'com.twitter'
module = 'finatra-http_2.12'
versions = '[2.9.0,]'
}
}
dependencies {
compileOnly group: 'com.twitter', name: 'finatra-http_2.11', version: '2.9.0'
testCompile project(':dd-java-agent:instrumentation:netty-4.1')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')
testCompile group: 'com.twitter', name: 'finatra-http_2.11', version: '19.12.0'
testCompile(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10') {
force = true
}
// Required for older versions of finatra on JDKs >= 11
testCompile group: 'com.sun.activation', name: 'javax.activation', version: '1.2.0'
latestDepTestCompile project(':dd-java-agent:instrumentation:netty-4.1')
latestDepTestCompile project(':dd-java-agent:instrumentation:java-concurrent')
latestDepTestCompile group: 'com.twitter', name: 'finatra-http_2.11', version: '+'
}
compileLatestDepTestGroovy {
classpath = classpath.plus(files(compileLatestDepTestScala.destinationDir))
dependsOn compileLatestDepTestScala
}

View File

@ -0,0 +1,51 @@
package datadog.trace.instrumentation.finatra;
import com.twitter.finagle.http.Request;
import com.twitter.finagle.http.Response;
import datadog.trace.agent.decorator.HttpServerDecorator;
import java.net.URI;
import java.net.URISyntaxException;
public class FinatraDecorator extends HttpServerDecorator<Request, Request, Response> {
public static final FinatraDecorator DECORATE = new FinatraDecorator();
@Override
protected String component() {
return "finatra";
}
@Override
protected String method(final Request request) {
return request.method().name();
}
@Override
protected URI url(final Request request) throws URISyntaxException {
return URI.create(request.uri());
}
@Override
protected String peerHostname(final Request request) {
return request.remoteHost();
}
@Override
protected String peerHostIP(final Request request) {
return request.remoteAddress().getHostAddress();
}
@Override
protected Integer peerPort(final Request request) {
return request.remotePort();
}
@Override
protected Integer status(final Response response) {
return response.statusCode();
}
@Override
protected String[] instrumentationNames() {
return new String[] {"finatra"};
}
}

View File

@ -0,0 +1,141 @@
package datadog.trace.instrumentation.finatra;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.finatra.FinatraDecorator.DECORATE;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import com.twitter.finagle.http.Request;
import com.twitter.finagle.http.Response;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.api.Tags;
import java.lang.reflect.Method;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Some;
@AutoService(Instrumenter.class)
public class FinatraInstrumentation extends Instrumenter.Default {
public FinatraInstrumentation() {
super("finatra");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.agent.decorator.BaseDecorator",
"datadog.trace.agent.decorator.ServerDecorator",
"datadog.trace.agent.decorator.HttpServerDecorator",
packageName + ".FinatraDecorator",
FinatraInstrumentation.class.getName() + "$Listener"
};
}
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named("com.twitter.finatra.http.internal.routing.Route")));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(named("handleMatch"))
.and(takesArguments(2))
.and(takesArgument(0, named("com.twitter.finagle.http.Request"))),
FinatraInstrumentation.class.getName() + "$RouteAdvice");
}
public static class RouteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope nameSpan(
@Advice.Argument(0) final Request request,
@Advice.FieldValue("path") final String path,
@Advice.FieldValue("clazz") final Class clazz,
@Advice.Origin final Method method) {
// Update the parent "netty.request"
final AgentSpan parent = activeSpan();
parent.setTag(DDTags.RESOURCE_NAME, request.method().name() + " " + path);
parent.setTag(Tags.COMPONENT, "finatra");
parent.setSpanName("finatra.request");
final AgentSpan span = startSpan("finatra.controller");
DECORATE.afterStart(span);
span.setTag(DDTags.RESOURCE_NAME, DECORATE.spanNameForClass(clazz));
final AgentScope scope = activateSpan(span, false);
scope.setAsyncPropagation(true);
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void setupCallback(
@Advice.Enter final AgentScope scope,
@Advice.Thrown final Throwable throwable,
@Advice.Return final Some<Future<Response>> responseOption) {
if (scope == null) {
return;
}
final AgentSpan span = scope.span();
if (throwable != null) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
span.finish();
scope.close();
return;
}
responseOption.get().addEventListener(new Listener(scope));
}
}
public static class Listener implements FutureEventListener<Response> {
private final AgentScope scope;
public Listener(final AgentScope scope) {
this.scope = scope;
}
@Override
public void onSuccess(final Response response) {
// Don't use DECORATE.onResponse because this is the controller span
if (Config.get().getHttpServerErrorStatuses().contains(DECORATE.status(response))) {
scope.span().setError(true);
}
DECORATE.beforeFinish(scope.span());
scope.span().finish();
scope.close();
}
@Override
public void onFailure(final Throwable cause) {
DECORATE.onError(scope.span(), cause);
DECORATE.beforeFinish(scope.span());
scope.span().finish();
scope.close();
}
}
}

View File

@ -0,0 +1,91 @@
import com.twitter.finatra.http.HttpServer
import com.twitter.util.Await
import com.twitter.util.Closable
import com.twitter.util.Duration
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.asserts.TraceAssert
import datadog.trace.agent.test.base.HttpServerTest
import datadog.trace.api.DDSpanTypes
import datadog.trace.instrumentation.api.Tags
import datadog.trace.instrumentation.finatra.FinatraDecorator
import java.util.concurrent.TimeoutException
import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.ERROR
import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.SUCCESS
class FinatraServerTest extends HttpServerTest<HttpServer, FinatraDecorator> {
private static final Duration TIMEOUT = Duration.fromSeconds(5)
private static final long STARTUP_TIMEOUT = 20 * 1000
static closeAndWait(Closable closable) {
if (closable != null) {
Await.ready(closable.close(), TIMEOUT)
}
}
@Override
HttpServer startServer(int port) {
HttpServer testServer = new FinatraServer()
// Starting the server is blocking, so start it in a separate thread
Thread startupThread = new Thread({
testServer.main("-admin.port=:0", "-http.port=:" + port)
})
startupThread.setDaemon(true)
startupThread.start()
long startupDeadline = System.currentTimeMillis() + STARTUP_TIMEOUT
while (!testServer.started()) {
if (System.currentTimeMillis() > startupDeadline) {
throw new TimeoutException("Timed out waiting for server startup")
}
}
return testServer
}
@Override
boolean hasHandlerSpan() {
return true
}
@Override
void stopServer(HttpServer httpServer) {
Await.ready(httpServer.close(), TIMEOUT)
}
@Override
FinatraDecorator decorator() {
return FinatraDecorator.DECORATE
}
@Override
String expectedOperationName() {
return "finatra.request"
}
void handlerSpan(TraceAssert trace, int index, Object parent, ServerEndpoint endpoint = SUCCESS) {
def errorEndpoint = endpoint == EXCEPTION || endpoint == ERROR
trace.span(index) {
serviceName expectedServiceName()
operationName "finatra.controller"
resourceName "FinatraController"
spanType DDSpanTypes.HTTP_SERVER
errored errorEndpoint
childOf(parent as DDSpan)
tags {
"$Tags.COMPONENT" FinatraDecorator.DECORATE.component()
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
// Finatra doesn't propagate the stack trace or exception to the instrumentation
// so the normal errorTags() method can't be used
if (errorEndpoint) {
"$Tags.ERROR" true
}
defaultTags()
}
}
}
}

View File

@ -0,0 +1,20 @@
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import datadog.trace.agent.test.base.HttpServerTestAdvice;
import datadog.trace.agent.tooling.Instrumenter;
import net.bytebuddy.agent.builder.AgentBuilder;
@AutoService(Instrumenter.class)
public class NettyServerTestInstrumentation implements Instrumenter {
@Override
public AgentBuilder instrument(final AgentBuilder agentBuilder) {
return agentBuilder
.type(named("io.netty.handler.codec.ByteToMessageDecoder"))
.transform(
new AgentBuilder.Transformer.ForAdvice()
.advice(
named("channelRead"), HttpServerTestAdvice.ServerEntryAdvice.class.getName()));
}
}

View File

@ -0,0 +1,56 @@
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finatra.http.Controller
import com.twitter.util.Future
import datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint._
import datadog.trace.agent.test.base.HttpServerTest.controller
import groovy.lang.Closure
class FinatraController extends Controller {
any(SUCCESS.getPath) { request: Request =>
controller(SUCCESS, new Closure[Response](null) {
override def call(): Response = {
response.ok(SUCCESS.getBody)
}
})
}
any(ERROR.getPath) { request: Request =>
controller(ERROR, new Closure[Response](null) {
override def call(): Response = {
response.internalServerError(ERROR.getBody)
}
})
}
any(NOT_FOUND.getPath) { request: Request =>
controller(NOT_FOUND, new Closure[Response](null) {
override def call(): Response = {
response.notFound(NOT_FOUND.getBody)
}
})
}
any(QUERY_PARAM.getPath) { request: Request =>
controller(QUERY_PARAM, new Closure[Response](null) {
override def call(): Response = {
response.ok(QUERY_PARAM.getBody)
}
})
}
any(EXCEPTION.getPath) { request: Request =>
controller(EXCEPTION, new Closure[Future[Response]](null) {
override def call(): Future[Response] = {
throw new Exception(EXCEPTION.getBody)
}
})
}
any(REDIRECT.getPath) { request: Request =>
controller(REDIRECT, new Closure[Response](null) {
override def call(): Response = {
response.found.location(REDIRECT.getBody)
}
})
}
}

View File

@ -0,0 +1,13 @@
import com.twitter.finagle.http.Request
import com.twitter.finatra.http.HttpServer
import com.twitter.finatra.http.filters.ExceptionMappingFilter
import com.twitter.finatra.http.routing.HttpRouter
class FinatraServer extends HttpServer {
override protected def configureHttp(router: HttpRouter): Unit = {
router
.filter[ExceptionMappingFilter[Request]]
.add[FinatraController]
.exceptionMapper[ResponseSettingExceptionMapper]
}
}

View File

@ -0,0 +1,15 @@
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finatra.http.exceptions.ExceptionMapper
import com.twitter.finatra.http.response.ResponseBuilder
import javax.inject.{Inject, Singleton}
@Singleton
class ResponseSettingExceptionMapper @Inject()(response: ResponseBuilder)
extends ExceptionMapper[Exception] {
override def toResponse(request: Request, exception: Exception): Response = {
response.internalServerError(exception.getMessage)
}
}

View File

@ -26,11 +26,24 @@ public class ExecutorInstrumentationUtils {
* @return true iff given task object should be wrapped
*/
public static boolean shouldAttachStateToTask(final Object task, final Executor executor) {
if (task == null) {
return false;
}
final TraceScope scope = activeScope();
return (scope != null
final Class enclosingClass = task.getClass().getEnclosingClass();
return scope != null
&& scope.isAsyncPropagating()
&& task != null
&& !ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, task));
&& !ExecutorInstrumentationUtils.isExecutorDisabledForThisTask(executor, task)
// Don't instrument the executor's own runnables. These runnables may never return until
// netty shuts down. Any created continuations will be open until that time, preventing
// traces from being reported
&& (enclosingClass == null
|| !enclosingClass
.getName()
.equals("io.netty.util.concurrent.SingleThreadEventExecutor"));
}
/**
@ -44,13 +57,16 @@ public class ExecutorInstrumentationUtils {
*/
public static <T> State setupState(
final ContextStore<T, State> contextStore, final T task, final TraceScope scope) {
final State state = contextStore.putIfAbsent(task, State.FACTORY);
final TraceScope.Continuation continuation = scope.capture();
if (state.setContinuation(continuation)) {
log.debug("created continuation {} from scope {}, state: {}", continuation, scope, state);
} else {
continuation.close(false);
}
return state;
}

View File

@ -71,6 +71,7 @@ public final class RequestDispatcherInstrumentation extends Instrumenter.Default
public static AgentScope start(
@Advice.Origin("#m") final String method,
@Advice.This final RequestDispatcher dispatcher,
@Advice.Local("_requestSpan") Object requestSpan,
@Advice.Argument(0) final ServletRequest request) {
if (activeSpan() == null) {
// Don't want to generate a new top-level span
@ -87,8 +88,9 @@ public final class RequestDispatcherInstrumentation extends Instrumenter.Default
// In case we lose context, inject the trace into the request.
propagate().inject(span, request, SETTER);
// temporarily remove from request to avoid spring resource name bubbling up:
request.removeAttribute(DD_SPAN_ATTRIBUTE);
// temporarily replace it in the request to avoid the spring resource name bubbling up:
requestSpan = request.getAttribute(DD_SPAN_ATTRIBUTE);
request.setAttribute(DD_SPAN_ATTRIBUTE, span);
return activateSpan(span, true).setAsyncPropagation(true);
}
@ -96,14 +98,17 @@ public final class RequestDispatcherInstrumentation extends Instrumenter.Default
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stop(
@Advice.Enter final AgentScope scope,
@Advice.Local("_requestSpan") final Object requestSpan,
@Advice.Argument(0) final ServletRequest request,
@Advice.Thrown final Throwable throwable) {
if (scope == null) {
return;
}
// now add it back...
request.setAttribute(DD_SPAN_ATTRIBUTE, scope.span());
if (requestSpan != null) {
// now add it back...
request.setAttribute(DD_SPAN_ATTRIBUTE, requestSpan);
}
DECORATE.onError(scope, throwable);
DECORATE.beforeFinish(scope);
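
Read as straight-line code, the enter/exit advice pair above is a save/replace/restore of one request attribute. A minimal sketch of that pattern, assuming a hypothetical attribute value and a plain Runnable in place of the real forward()/include() call:

import javax.servlet.ServletRequest;

class DispatchSpanSwapSketch {
  static final String DD_SPAN_ATTRIBUTE = "datadog.span"; // hypothetical constant value

  // Equivalent view of the @Advice.OnMethodEnter/@Advice.OnMethodExit pair:
  // stash the caller's span, expose the dispatch span, then put the original back.
  static void dispatchWithSpan(
      final ServletRequest request, final Object span, final Runnable dispatch) {
    final Object saved = request.getAttribute(DD_SPAN_ATTRIBUTE); // the @Advice.Local stash
    request.setAttribute(DD_SPAN_ATTRIBUTE, span);
    try {
      dispatch.run(); // forward() / include()
    } finally {
      if (saved != null) {
        request.setAttribute(DD_SPAN_ATTRIBUTE, saved); // "now add it back..."
      }
    }
  }
}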

View File

@ -1,15 +1,23 @@
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner
import javax.servlet.ServletException
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import static datadog.opentracing.propagation.DatadogHttpCodec.SAMPLING_PRIORITY_KEY
import static datadog.opentracing.propagation.DatadogHttpCodec.SPAN_ID_KEY
import static datadog.opentracing.propagation.DatadogHttpCodec.TRACE_ID_KEY
import static datadog.trace.agent.decorator.HttpServerDecorator.DD_SPAN_ATTRIBUTE
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
class RequestDispatcherTest extends AgentTestRunner {
def dispatcher = new RequestDispatcherUtils(Mock(HttpServletRequest), Mock(HttpServletResponse))
def request = Mock(HttpServletRequest)
def response = Mock(HttpServletResponse)
def mockSpan = Mock(DDSpan)
def dispatcher = new RequestDispatcherUtils(request, response)
def "test dispatch no-parent"() {
when:
@ -17,7 +25,17 @@ class RequestDispatcherTest extends AgentTestRunner {
dispatcher.include("")
then:
assertTraces(0) {}
assertTraces(2) {
trace(0, 1) {
basicSpan(it, 0, "forward-child")
}
trace(1, 1) {
basicSpan(it, 0, "include-child")
}
}
and:
0 * _
}
def "test dispatcher #method with parent"() {
@ -28,7 +46,7 @@ class RequestDispatcherTest extends AgentTestRunner {
then:
assertTraces(1) {
trace(0, 2) {
trace(0, 3) {
basicSpan(it, 0, "parent")
span(1) {
operationName "servlet.$operation"
@ -39,9 +57,22 @@ class RequestDispatcherTest extends AgentTestRunner {
defaultTags()
}
}
basicSpan(it, 2, "$operation-child", span(1))
}
}
then:
1 * request.setAttribute(TRACE_ID_KEY, _)
1 * request.setAttribute(SPAN_ID_KEY, _)
1 * request.setAttribute(SAMPLING_PRIORITY_KEY, _)
then:
1 * request.getAttribute(DD_SPAN_ATTRIBUTE) >> mockSpan
then:
1 * request.setAttribute(DD_SPAN_ATTRIBUTE, { it.spanName == "servlet.$operation" })
then:
1 * request.setAttribute(DD_SPAN_ATTRIBUTE, mockSpan)
0 * _
where:
operation | method
"forward" | "forward"
@ -55,7 +86,7 @@ class RequestDispatcherTest extends AgentTestRunner {
def "test dispatcher #method exception"() {
setup:
def ex = new ServletException("some error")
def dispatcher = new RequestDispatcherUtils(Mock(HttpServletRequest), Mock(HttpServletResponse), ex)
def dispatcher = new RequestDispatcherUtils(request, response, ex)
when:
runUnderTrace("parent") {
@ -67,7 +98,7 @@ class RequestDispatcherTest extends AgentTestRunner {
th == ex
assertTraces(1) {
trace(0, 2) {
trace(0, 3) {
basicSpan(it, 0, "parent", null, ex)
span(1) {
operationName "servlet.$operation"
@ -80,9 +111,22 @@ class RequestDispatcherTest extends AgentTestRunner {
errorTags(ex.class, ex.message)
}
}
basicSpan(it, 2, "$operation-child", span(1))
}
}
then:
1 * request.setAttribute(TRACE_ID_KEY, _)
1 * request.setAttribute(SPAN_ID_KEY, _)
1 * request.setAttribute(SAMPLING_PRIORITY_KEY, _)
then:
1 * request.getAttribute(DD_SPAN_ATTRIBUTE) >> mockSpan
then:
1 * request.setAttribute(DD_SPAN_ATTRIBUTE, { it.spanName == "servlet.$operation" })
then:
1 * request.setAttribute(DD_SPAN_ATTRIBUTE, mockSpan)
0 * _
where:
operation | method
"forward" | "forward"

View File

@ -1,9 +1,12 @@
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.servlet.RequestDispatcher;
import javax.servlet.Servlet;
import javax.servlet.ServletContext;
@ -164,7 +167,15 @@ public class RequestDispatcherUtils {
class TestDispatcher implements RequestDispatcher {
@Override
public void forward(final ServletRequest servletRequest, final ServletResponse servletResponse)
throws ServletException, IOException {
throws ServletException {
runUnderTrace(
"forward-child",
new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
if (toThrow != null) {
throw toThrow;
}
@ -172,7 +183,15 @@ public class RequestDispatcherUtils {
@Override
public void include(final ServletRequest servletRequest, final ServletResponse servletResponse)
throws ServletException, IOException {
throws ServletException {
runUnderTrace(
"include-child",
new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
if (toThrow != null) {
throw toThrow;
}

View File

@ -78,7 +78,7 @@ class ClassLoadingTest extends Specification {
loader.count == countAfterFirstLoad
}
def "make sure that ByteBuddy doesn't resue cached type descriptions between different classloaders"() {
def "make sure that ByteBuddy doesn't reuse cached type descriptions between different classloaders"() {
setup:
CountingClassLoader loader1 = new CountingClassLoader(classpath)
CountingClassLoader loader2 = new CountingClassLoader(classpath)

View File

@ -1,20 +0,0 @@
package datadog.trace.agent.test.utils
import okhttp3.OkHttpClient
import java.util.concurrent.TimeUnit
class OkHttpUtils {
static clientBuilder() {
def unit = TimeUnit.MINUTES
new OkHttpClient.Builder()
.connectTimeout(1, unit)
.writeTimeout(1, unit)
.readTimeout(1, unit)
}
static client(boolean followRedirects = false) {
clientBuilder().followRedirects(followRedirects).build()
}
}

View File

@ -0,0 +1,27 @@
package datadog.trace.agent.test.utils;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
/**
* This class was moved from Groovy to Java because Groovy kept trying to introspect the
* OkHttpClient class, which contains Java 8-only classes and caused the build to fail on Java 7.
*/
public class OkHttpUtils {
static OkHttpClient.Builder clientBuilder() {
final TimeUnit unit = TimeUnit.MINUTES;
return new OkHttpClient.Builder()
.connectTimeout(1, unit)
.writeTimeout(1, unit)
.readTimeout(1, unit);
}
public static OkHttpClient client() {
return client(false);
}
public static OkHttpClient client(final boolean followRedirects) {
return clientBuilder().followRedirects(followRedirects).build();
}
}

View File

@ -231,7 +231,7 @@ class ServerTest extends AgentTestRunner {
def "server redirect"() {
setup:
client = OkHttpUtils.clientBuilder().followRedirects(followRedirects).build()
client = OkHttpUtils.client(followRedirects)
def server = httpServer {
handlers {
get("/redirect") {

View File

@ -16,7 +16,7 @@ def isCI = System.getenv("CI") != null
allprojects {
group = 'com.datadoghq'
version = '0.42.0-SNAPSHOT'
version = '0.43.0-SNAPSHOT'
if (isCI) {
buildDir = "${rootDir}/workspace/${projectDir.path.replace(rootDir.path, '')}/build/"

View File

@ -12,6 +12,7 @@ minimumInstructionCoverage = 0.6
excludedClassesCoverage += [
'datadog.trace.common.writer.ListWriter',
'datadog.trace.common.writer.LoggingWriter',
'datadog.trace.common.writer.DDAgentWriter.DDAgentWriterBuilder',
'datadog.trace.common.sampling.PrioritySampling',
// This code is copied from okHttp samples and we have integration tests to verify that it works.
'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket',
@ -36,9 +37,10 @@ dependencies {
compile group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.1.1'
compile deps.jackson
compile deps.slf4j
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7
compile deps.okhttp
compile group: 'org.msgpack', name: 'msgpack-core', version: '0.8.20'
compile group: 'com.squareup.moshi', name: 'moshi', version: '1.9.2'
compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.23'
compile group: 'com.lmax', name: 'disruptor', version: '3.4.2'
@ -49,6 +51,7 @@ dependencies {
testCompile project(":dd-java-agent:testing")
testCompile project(':utils:gc-utils')
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.17.1'
testCompile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.20'
traceAgentTestCompile deps.testcontainers

View File

@ -3,8 +3,6 @@ package datadog.opentracing;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static io.opentracing.log.Fields.MESSAGE;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import datadog.trace.api.DDTags;
import datadog.trace.api.interceptor.MutableSpan;
import datadog.trace.api.sampling.PrioritySampling;
@ -77,7 +75,6 @@ public class DDSpan implements Span, MutableSpan {
context.getTrace().registerSpan(this);
}
@JsonIgnore
public boolean isFinished() {
return durationNano.get() != 0;
}
@ -120,20 +117,17 @@ public class DDSpan implements Span, MutableSpan {
*
* @return true if root, false otherwise
*/
@JsonIgnore
public final boolean isRootSpan() {
return BigInteger.ZERO.equals(context.getParentId());
}
@Override
@Deprecated
@JsonIgnore
public MutableSpan getRootSpan() {
return getLocalRootSpan();
}
@Override
@JsonIgnore
public MutableSpan getLocalRootSpan() {
return context().getTrace().getRootSpan();
}
@ -297,14 +291,13 @@ public class DDSpan implements Span, MutableSpan {
return this;
}
// Getters and JSON serialisation instructions
// Getters
/**
* Meta merges baggage and tags (stringified values)
*
* @return merged context baggage and tags
*/
@JsonGetter
public Map<String, String> getMeta() {
final Map<String, String> meta = new HashMap<>();
for (final Map.Entry<String, String> entry : context().getBaggageItems().entrySet()) {
@ -321,58 +314,48 @@ public class DDSpan implements Span, MutableSpan {
*
* @return metrics for this span
*/
@JsonGetter
public Map<String, Number> getMetrics() {
return context.getMetrics();
}
@Override
@JsonGetter("start")
public long getStartTime() {
return startTimeNano > 0 ? startTimeNano : TimeUnit.MICROSECONDS.toNanos(startTimeMicro);
}
@Override
@JsonGetter("duration")
public long getDurationNano() {
return durationNano.get();
}
@Override
@JsonGetter("service")
public String getServiceName() {
return context.getServiceName();
}
@JsonGetter("trace_id")
public BigInteger getTraceId() {
return context.getTraceId();
}
@JsonGetter("span_id")
public BigInteger getSpanId() {
return context.getSpanId();
}
@JsonGetter("parent_id")
public BigInteger getParentId() {
return context.getParentId();
}
@Override
@JsonGetter("resource")
public String getResourceName() {
return context.getResourceName();
}
@Override
@JsonGetter("name")
public String getOperationName() {
return context.getOperationName();
}
@Override
@JsonIgnore
public Integer getSamplingPriority() {
final int samplingPriority = context.getSamplingPriority();
if (samplingPriority == PrioritySampling.UNSET) {
@ -383,29 +366,24 @@ public class DDSpan implements Span, MutableSpan {
}
@Override
@JsonIgnore
public String getSpanType() {
return context.getSpanType();
}
@Override
@JsonIgnore
public Map<String, Object> getTags() {
return context().getTags();
}
@JsonGetter
public String getType() {
return context.getSpanType();
}
@Override
@JsonIgnore
public Boolean isError() {
return context.getErrorFlag();
}
@JsonGetter
public int getError() {
return context.getErrorFlag() ? 1 : 0;
}

View File

@ -1,6 +1,5 @@
package datadog.opentracing;
import com.fasterxml.jackson.annotation.JsonIgnore;
import datadog.opentracing.decorators.AbstractDecorator;
import datadog.trace.api.DDTags;
import datadog.trace.api.sampling.PrioritySampling;
@ -289,12 +288,10 @@ public class DDSpanContext implements io.opentracing.SpanContext {
return baggageItems.entrySet();
}
@JsonIgnore
public PendingTrace getTrace() {
return trace;
}
@JsonIgnore
public DDTracer getTracer() {
return tracer;
}

View File

@ -44,6 +44,7 @@ import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -96,18 +97,18 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
private final HttpCodec.Injector injector;
private final HttpCodec.Extractor extractor;
public static class Builder {
public static class DDTracerBuilder {
public Builder() {
public DDTracerBuilder() {
// Apply the default values from config.
config(Config.get());
}
public Builder withProperties(final Properties properties) {
public DDTracerBuilder withProperties(final Properties properties) {
return config(Config.get(properties));
}
public Builder config(final Config config) {
public DDTracerBuilder config(final Config config) {
this.config = config;
serviceName(config.getServiceName());
// Explicitly skip setting writer to avoid allocating resources prematurely.
@ -267,7 +268,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
partialFlushMinSpans);
}
@lombok.Builder(builderClassName = "Builder")
@Builder
// These field names must be stable to ensure the builder api is stable.
private DDTracer(
final Config config,

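A hedged usage sketch of the renamed builder: config(...), withProperties(...), and serviceName(...) appear in the diff above, while the service name value here is illustrative.

import java.util.Properties;
import datadog.opentracing.DDTracer;

class TracerBuilderSketch {
  // @Builder now generates DDTracer.DDTracerBuilder, whose no-arg constructor
  // seeds defaults from Config.get() (see the diff above).
  static DDTracer build() {
    return DDTracer.builder()
        .withProperties(new Properties()) // optionally layer properties over the config
        .serviceName("my-service") // hypothetical service name
        .build();
  }
}
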
View File

@ -3,13 +3,10 @@ package datadog.trace.common.sampling;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NumericNode;
import datadog.opentracing.DDSpan;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@ -70,23 +67,16 @@ public class RateByServiceSampler implements Sampler, PrioritySampler, DDAgentRe
}
@Override
public void onResponse(final String endpoint, final JsonNode responseJson) {
final JsonNode newServiceRates = responseJson.get("rate_by_service");
public void onResponse(
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
final Map<String, Number> newServiceRates = responseJson.get("rate_by_service");
if (null != newServiceRates) {
log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);
final Map<String, RateSampler> updatedServiceRates = new HashMap<>();
final Iterator<String> itr = newServiceRates.fieldNames();
while (itr.hasNext()) {
final String key = itr.next();
final JsonNode value = newServiceRates.get(key);
try {
if (value instanceof NumericNode) {
updatedServiceRates.put(key, createRateSampler(value.doubleValue()));
} else {
log.debug("Unable to parse new service rate {} -> {}", key, value);
}
} catch (final NumberFormatException nfe) {
log.debug("Unable to parse new service rate {} -> {}", key, value);
for (final Map.Entry<String, Number> entry : newServiceRates.entrySet()) {
if (entry.getValue() != null) {
updatedServiceRates.put(
entry.getKey(), createRateSampler(entry.getValue().doubleValue()));
}
}
if (!updatedServiceRates.containsKey(DEFAULT_KEY)) {

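The reworked onResponse above consumes plain maps instead of Jackson's JsonNode. A hedged sketch of the payload shape it expects; the service/env key format is illustrative:

import java.util.Collections;
import java.util.Map;

class AgentResponseSketch {
  // Map equivalent of an agent JSON body like {"rate_by_service": {"service:web,env:prod": 0.5}}
  static Map<String, Map<String, Number>> sampleResponse() {
    final Map<String, Number> rates =
        Collections.singletonMap("service:web,env:prod", (Number) 0.5);
    return Collections.singletonMap("rate_by_service", rates);
  }
}
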
View File

@ -0,0 +1,101 @@
package datadog.trace.common.serialization;
import datadog.opentracing.DDSpan;
import java.io.IOException;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
public abstract class FormatWriter<DEST> {
public abstract void writeKey(String key, DEST destination) throws IOException;
public abstract void writeListHeader(int size, DEST destination) throws IOException;
public abstract void writeListFooter(DEST destination) throws IOException;
public abstract void writeMapHeader(int size, DEST destination) throws IOException;
public abstract void writeMapFooter(DEST destination) throws IOException;
public abstract void writeString(String key, String value, DEST destination) throws IOException;
public abstract void writeShort(String key, short value, DEST destination) throws IOException;
public abstract void writeByte(String key, byte value, DEST destination) throws IOException;
public abstract void writeInt(String key, int value, DEST destination) throws IOException;
public abstract void writeLong(String key, long value, DEST destination) throws IOException;
public abstract void writeFloat(String key, float value, DEST destination) throws IOException;
public abstract void writeDouble(String key, double value, DEST destination) throws IOException;
public abstract void writeBigInteger(String key, BigInteger value, DEST destination)
throws IOException;
public void writeNumber(final String key, final Number value, final DEST destination)
throws IOException {
if (value instanceof Double) {
writeDouble(key, value.doubleValue(), destination);
} else if (value instanceof Long) {
writeLong(key, value.longValue(), destination);
} else if (value instanceof Integer) {
writeInt(key, value.intValue(), destination);
} else if (value instanceof Float) {
writeFloat(key, value.floatValue(), destination);
} else if (value instanceof Byte) {
writeByte(key, value.byteValue(), destination);
} else if (value instanceof Short) {
writeShort(key, value.shortValue(), destination);
}
}
public void writeNumberMap(
final String key, final Map<String, Number> value, final DEST destination)
throws IOException {
writeKey(key, destination);
writeMapHeader(value.size(), destination);
for (final Map.Entry<String, Number> entry : value.entrySet()) {
writeNumber(entry.getKey(), entry.getValue(), destination);
}
writeMapFooter(destination);
}
public void writeStringMap(
final String key, final Map<String, String> value, final DEST destination)
throws IOException {
writeKey(key, destination);
writeMapHeader(value.size(), destination);
for (final Map.Entry<String, String> entry : value.entrySet()) {
writeString(entry.getKey(), entry.getValue(), destination);
}
writeMapFooter(destination);
}
public void writeTrace(final List<DDSpan> trace, final DEST destination) throws IOException {
writeListHeader(trace.size(), destination);
for (final DDSpan span : trace) {
writeDDSpan(span, destination);
}
writeListFooter(destination);
}
public void writeDDSpan(final DDSpan span, final DEST destination) throws IOException {
// Some of the tests rely on the specific ordering here.
writeMapHeader(12, destination); // must match count below.
/* 1 */ writeString("service", span.getServiceName(), destination);
/* 2 */ writeString("name", span.getOperationName(), destination);
/* 3 */ writeString("resource", span.getResourceName(), destination);
/* 4 */ writeBigInteger("trace_id", span.getTraceId(), destination);
/* 5 */ writeBigInteger("span_id", span.getSpanId(), destination);
/* 6 */ writeBigInteger("parent_id", span.getParentId(), destination);
/* 7 */ writeLong("start", span.getStartTime(), destination);
/* 8 */ writeLong("duration", span.getDurationNano(), destination);
/* 9 */ writeString("type", span.getType(), destination);
/* 10 */ writeInt("error", span.getError(), destination);
/* 11 */ writeNumberMap("metrics", span.getMetrics(), destination);
/* 12 */ writeStringMap("meta", span.getMeta(), destination);
writeMapFooter(destination);
}
}

View File

@ -0,0 +1,130 @@
package datadog.trace.common.serialization;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.opentracing.DDSpan;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
public class JsonFormatWriter extends FormatWriter<JsonWriter> {
private static final Moshi MOSHI = new Moshi.Builder().add(DDSpanAdapter.FACTORY).build();
public static final JsonAdapter<List<DDSpan>> TRACE_ADAPTER =
MOSHI.adapter(Types.newParameterizedType(List.class, DDSpan.class));
public static final JsonAdapter<DDSpan> SPAN_ADAPTER = MOSHI.adapter(DDSpan.class);
public static JsonFormatWriter JSON_WRITER = new JsonFormatWriter();
@Override
public void writeKey(final String key, final JsonWriter destination) throws IOException {
destination.name(key);
}
@Override
public void writeListHeader(final int size, final JsonWriter destination) throws IOException {
destination.beginArray();
}
@Override
public void writeListFooter(final JsonWriter destination) throws IOException {
destination.endArray();
}
@Override
public void writeMapHeader(final int size, final JsonWriter destination) throws IOException {
destination.beginObject();
}
@Override
public void writeMapFooter(final JsonWriter destination) throws IOException {
destination.endObject();
}
@Override
public void writeString(final String key, final String value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeShort(final String key, final short value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeByte(final String key, final byte value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeInt(final String key, final int value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeLong(final String key, final long value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeFloat(final String key, final float value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeDouble(final String key, final double value, final JsonWriter destination)
throws IOException {
destination.name(key);
destination.value(value);
}
@Override
public void writeBigInteger(
final String key, final BigInteger value, final JsonWriter destination) throws IOException {
destination.name(key);
destination.value(value);
}
static class DDSpanAdapter extends JsonAdapter<DDSpan> {
public static final JsonAdapter.Factory FACTORY =
new JsonAdapter.Factory() {
@Override
public JsonAdapter<?> create(
final Type type, final Set<? extends Annotation> annotations, final Moshi moshi) {
final Class<?> rawType = Types.getRawType(type);
if (rawType.isAssignableFrom(DDSpan.class)) {
return new DDSpanAdapter();
}
return null;
}
};
@Override
public DDSpan fromJson(final JsonReader reader) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void toJson(final JsonWriter writer, final DDSpan value) throws IOException {
JSON_WRITER.writeDDSpan(value, writer);
}
}
}
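A usage sketch (not from this commit): because DDSpanAdapter.FACTORY is registered with Moshi, the static adapters above make JSON rendering a one-liner, with Moshi driving the writer and DDSpanAdapter delegating back into writeDDSpan:

import datadog.opentracing.DDSpan;
import java.util.List;

final class TraceJsonDebug {
  /** Render a finished trace as a JSON array of span objects, e.g. for debug logging. */
  static String toJson(final List<DDSpan> trace) {
    return JsonFormatWriter.TRACE_ADAPTER.toJson(trace);
  }
}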

View File

@ -0,0 +1,87 @@
package datadog.trace.common.serialization;
import java.io.IOException;
import java.math.BigInteger;
import org.msgpack.core.MessagePacker;
public class MsgpackFormatWriter extends FormatWriter<MessagePacker> {
public static MsgpackFormatWriter MSGPACK_WRITER = new MsgpackFormatWriter();
@Override
public void writeKey(final String key, final MessagePacker destination) throws IOException {
destination.packString(key);
}
@Override
public void writeListHeader(final int size, final MessagePacker destination) throws IOException {
destination.packArrayHeader(size);
}
@Override
public void writeListFooter(final MessagePacker destination) throws IOException {}
@Override
public void writeMapHeader(final int size, final MessagePacker destination) throws IOException {
destination.packMapHeader(size);
}
@Override
public void writeMapFooter(final MessagePacker destination) throws IOException {}
@Override
public void writeString(final String key, final String value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packString(value);
}
@Override
public void writeShort(final String key, final short value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packShort(value);
}
@Override
public void writeByte(final String key, final byte value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packByte(value);
}
@Override
public void writeInt(final String key, final int value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packInt(value);
}
@Override
public void writeLong(final String key, final long value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packLong(value);
}
@Override
public void writeFloat(final String key, final float value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packFloat(value);
}
@Override
public void writeDouble(final String key, final double value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packDouble(value);
}
@Override
public void writeBigInteger(
final String key, final BigInteger value, final MessagePacker destination)
throws IOException {
destination.packString(key);
destination.packBigInteger(value);
}
}
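Note that writeListFooter and writeMapFooter are deliberately empty: msgpack encodes element counts in the array/map headers, so there is nothing to close. A sketch of packing a trace to bytes, mirroring the serializeTrace change later in this commit:

import datadog.opentracing.DDSpan;
import java.io.IOException;
import java.util.List;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.buffer.ArrayBufferOutput;

final class TraceMsgpackDebug {
  static byte[] pack(final List<DDSpan> trace) throws IOException {
    final ArrayBufferOutput output = new ArrayBufferOutput();
    final MessagePacker packer = MessagePack.newDefaultPacker(output);
    MsgpackFormatWriter.MSGPACK_WRITER.writeTrace(trace, packer);
    packer.flush(); // flush the packer's internal buffer before reading the bytes
    return output.toByteArray();
  }
}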

View File

@ -3,49 +3,55 @@ package datadog.trace.common.writer;
import static datadog.trace.api.Config.DEFAULT_AGENT_HOST;
import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.ddagent.BatchWritingDisruptor;
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.common.writer.ddagent.DDAgentResponseListener;
import datadog.trace.common.writer.ddagent.Monitor;
import datadog.trace.common.writer.ddagent.TraceConsumer;
import datadog.trace.common.writer.ddagent.TraceSerializingDisruptor;
import datadog.trace.common.writer.ddagent.TraceProcessingDisruptor;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/**
* This writer buffers traces and sends them to the provided DDApi instance.
* This writer buffers traces and sends them to the provided DDApi instance. Buffering is done with
* a disruptor to avoid blocking the application threads. Internally, the trace is serialized and
* put onto a separate disruptor that does block, to decouple the CPU-intensive serialization from
* the IO-bound sending threads.
*
* <p>Written traces are passed off to a disruptor so as to avoid blocking the application's thread.
* If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the
* threshold will be counted and sampled.
* <p>[Application] -> [trace processing buffer] -> [serialized trace batching buffer] -> [dd-agent]
*
* <p>Note: the first buffer is non-blocking and will discard traces if full; the second is
* blocking and will cause back pressure on the trace-processing (serializing) thread.
*
* <p>If the first buffer is full, traces are discarded before serialization. Once a trace is
* serialized, every effort is made to keep it, to avoid wasting the serialization work.
*/
@Slf4j
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private static final int SENDER_QUEUE_SIZE = 16;
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-writer");
private static final int DISRUPTOR_BUFFER_SIZE = 1024;
private final DDAgentApi api;
public final int flushFrequencySeconds;
public final TraceSerializingDisruptor disruptor;
private final TraceProcessingDisruptor traceProcessingDisruptor;
private final BatchWritingDisruptor batchWritingDisruptor;
public final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
public final Phaser apiPhaser = new Phaser(); // Ensure API calls are completed when flushing;
public final Monitor monitor;
// Apply defaults to the class generated by lombok.
public static class DDAgentWriterBuilder {
String agentHost = DEFAULT_AGENT_HOST;
int traceAgentPort = DEFAULT_TRACE_AGENT_PORT;
String unixDomainSocket = DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
int traceBufferSize = DISRUPTOR_BUFFER_SIZE;
Monitor monitor = new Monitor.Noop();
int flushFrequencySeconds = 1;
}
@Deprecated
public DDAgentWriter() {
this(
new DDAgentApi(
@ -53,62 +59,38 @@ public class DDAgentWriter implements Writer {
new Monitor.Noop());
}
@Deprecated
public DDAgentWriter(final DDAgentApi api, final Monitor monitor) {
this(api, monitor, DISRUPTOR_BUFFER_SIZE, SENDER_QUEUE_SIZE, FLUSH_PAYLOAD_DELAY);
}
/** Old signature (pre-Monitor) used in tests */
private DDAgentWriter(final DDAgentApi api) {
this(api, new Monitor.Noop());
}
/**
* Used in the tests.
*
* @param api
* @param disruptorSize Rounded up to next power of 2
* @param flushFrequencySeconds value < 1 disables scheduled flushes
*/
private DDAgentWriter(
final DDAgentApi api,
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
this(api, new Monitor.Noop(), disruptorSize, senderQueueSize, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(
final DDAgentApi api,
final Monitor monitor,
final int disruptorSize,
final int flushFrequencySeconds) {
this(api, monitor, disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
// DQH - TODO - Update the tests & remove this
private DDAgentWriter(
final DDAgentApi api, final int disruptorSize, final int flushFrequencySeconds) {
this(api, new Monitor.Noop(), disruptorSize, SENDER_QUEUE_SIZE, flushFrequencySeconds);
}
private DDAgentWriter(
final DDAgentApi api,
final Monitor monitor,
final int disruptorSize,
final int senderQueueSize,
final int flushFrequencySeconds) {
this.api = api;
this.monitor = monitor;
disruptor =
new TraceSerializingDisruptor(
disruptorSize, this, new TraceConsumer(traceCount, senderQueueSize, this));
batchWritingDisruptor = new BatchWritingDisruptor(DISRUPTOR_BUFFER_SIZE, 1, api, monitor, this);
traceProcessingDisruptor =
new TraceProcessingDisruptor(
DISRUPTOR_BUFFER_SIZE, api, batchWritingDisruptor, monitor, this);
}
this.flushFrequencySeconds = flushFrequencySeconds;
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
@lombok.Builder
// These field names must be stable to ensure the builder API is stable.
private DDAgentWriter(
final DDAgentApi agentApi,
final String agentHost,
final int traceAgentPort,
final String unixDomainSocket,
final int traceBufferSize,
final Monitor monitor,
final int flushFrequencySeconds) {
if (agentApi != null) {
api = agentApi;
} else {
api = new DDAgentApi(agentHost, traceAgentPort, unixDomainSocket);
}
this.monitor = monitor;
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
batchWritingDisruptor =
new BatchWritingDisruptor(traceBufferSize, flushFrequencySeconds, api, monitor, this);
traceProcessingDisruptor =
new TraceProcessingDisruptor(traceBufferSize, api, batchWritingDisruptor, monitor, this);
}
public void addResponseListener(final DDAgentResponseListener listener) {
@ -117,7 +99,7 @@ public class DDAgentWriter implements Writer {
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getDisruptorCapacity();
return traceProcessingDisruptor.getDisruptorCapacity();
}
public final long getDisruptorUtilizedCapacity() {
@ -125,20 +107,27 @@ public class DDAgentWriter implements Writer {
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getDisruptorRemainingCapacity();
return traceProcessingDisruptor.getDisruptorRemainingCapacity();
}
@Override
public void write(final List<DDSpan> trace) {
// We can't add events after shutdown; otherwise shutdown will never complete.
if (disruptor.running) {
final boolean published = disruptor.tryPublish(trace);
if (traceProcessingDisruptor.running) {
final int representativeCount;
if (trace.isEmpty() || !(trace.get(0).isRootSpan())) {
// We don't want to reset the count if we can't correctly report the value.
representativeCount = 1;
} else {
representativeCount = traceCount.getAndSet(0) + 1;
}
final boolean published = traceProcessingDisruptor.publish(trace, representativeCount);
if (published) {
monitor.onPublish(DDAgentWriter.this, trace);
} else {
// We're discarding the trace, but we still want to count it.
traceCount.incrementAndGet();
traceCount.addAndGet(representativeCount);
log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
monitor.onFailedPublish(this, trace);
@ -150,6 +139,10 @@ public class DDAgentWriter implements Writer {
}
}
public boolean flush() {
return traceProcessingDisruptor.flush(traceCount.getAndSet(0));
}
@Override
public void incrementTraceCount() {
traceCount.incrementAndGet();
@ -161,31 +154,19 @@ public class DDAgentWriter implements Writer {
@Override
public void start() {
disruptor.start();
batchWritingDisruptor.start();
traceProcessingDisruptor.start();
monitor.onStart(this);
}
@Override
public void close() {
boolean flushSuccess = true;
// We have to shutdown scheduled executor first to make sure no flush events issued after
// disruptor has been shutdown.
// Otherwise those events will never be processed and flush call will wait forever.
scheduledWriterExecutor.shutdown();
final boolean flushSuccess = traceProcessingDisruptor.flush(traceCount.getAndSet(0));
try {
scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS);
} catch (final InterruptedException e) {
log.warn("Waiting for flush executor shutdown interrupted.", e);
flushSuccess = false;
traceProcessingDisruptor.close();
} finally { // in case first close fails.
batchWritingDisruptor.close();
}
flushSuccess |= disruptor.flush();
disruptor.close();
monitor.onShutdown(this, flushSuccess);
}
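A construction-and-lifecycle sketch for the builder above (illustrative; the port and frequency values are examples, the method names are the builder fields declared earlier):

final DDAgentWriter writer =
    DDAgentWriter.builder()
        .traceAgentPort(8126)     // DEFAULT_TRACE_AGENT_PORT is used if omitted
        .flushFrequencySeconds(1) // values < 1 disable the time-based flush
        .build();
writer.start();  // starts both disruptors
// Application threads call writer.write(trace): non-blocking, drops (and counts) the trace
// if the processing buffer is full.
writer.close();  // flushes the counted traces, then closes both disruptors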

View File

@ -1,25 +1,29 @@
package datadog.trace.common.writer;
import com.fasterxml.jackson.databind.ObjectMapper;
import static datadog.trace.common.serialization.JsonFormatWriter.TRACE_ADAPTER;
import datadog.opentracing.DDSpan;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LoggingWriter implements Writer {
private final ObjectMapper serializer = new ObjectMapper();
@Override
public void write(final List<DDSpan> trace) {
if (log.isInfoEnabled()) {
try {
log.info("write(trace): {}", serializer.writeValueAsString(trace));
log.info("write(trace): {}", toString(trace));
} catch (final Exception e) {
log.error("error writing(trace): {}", trace);
}
}
}
private String toString(final List<DDSpan> trace) {
return TRACE_ADAPTER.toJson(trace);
}
@Override
public void incrementTraceCount() {
log.info("incrementTraceCount()");

View File

@ -53,7 +53,7 @@ public interface Writer extends Closeable {
} else {
log.warn(
"Writer type not configured correctly: No config provided! Defaulting to DDAgentWriter.");
writer = new DDAgentWriter();
writer = DDAgentWriter.builder().build();
}
return writer;
@ -64,7 +64,10 @@ public interface Writer extends Closeable {
}
private static Writer createAgentWriter(final Config config) {
return new DDAgentWriter(createApi(config), createMonitor(config));
return DDAgentWriter.builder()
.agentApi(createApi(config))
.monitor(createMonitor(config))
.build();
}
private static DDAgentApi createApi(final Config config) {

View File

@ -0,0 +1,96 @@
package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
@Slf4j
abstract class AbstractDisruptor<T> implements Closeable {
protected final Disruptor<DisruptorEvent<T>> disruptor;
public volatile boolean running = false;
protected final DisruptorEvent.FlushTranslator<T> flushTranslator =
new DisruptorEvent.FlushTranslator<>();
protected final DisruptorEvent.DataTranslator<T> dataTranslator =
new DisruptorEvent.DataTranslator<>();
public AbstractDisruptor(final int disruptorSize, final EventHandler<DisruptorEvent<T>> handler) {
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<T>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
getThreadFactory(),
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
disruptor.handleEventsWith(handler);
}
protected abstract ThreadFactory getThreadFactory();
public void start() {
disruptor.start();
running = true;
}
@Override
public void close() {
running = false;
disruptor.shutdown();
}
/**
* Allows the underlying publish to be defined as a blocking or non-blocking call.
*
* @param data the item to publish onto the ring buffer
* @param representativeCount the number of traces the item represents, including dropped ones
* @return true if the event was published
*/
public abstract boolean publish(final T data, int representativeCount);
/**
* This method will block until the flush is complete.
*
* @param traceCount - number of unreported traces to include in this batch.
*/
public boolean flush(final int traceCount) {
if (running) {
return flush(traceCount, new CountDownLatch(1));
} else {
return false;
}
}
/** This method will block until the flush is complete. */
protected boolean flush(final int traceCount, final CountDownLatch latch) {
log.info("Flushing any remaining traces.");
disruptor.publishEvent(flushTranslator, traceCount, latch);
try {
latch.await();
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
public final long getCurrentCount() {
return disruptor.getCursor() - disruptor.getRingBuffer().getMinimumGatingSequence();
}
}
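The constructor's ring-size expression rounds the requested size up to a power of two, as the disruptor requires. A worked sketch (illustration only):

final class RingSizing {
  // Mirrors Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1) from the constructor:
  //   1024 -> highestOneBit(1023) = 512 -> << 1 = 1024  (powers of two are kept as-is)
  //   1000 -> highestOneBit(999)  = 512 -> << 1 = 1024  (otherwise rounded up)
  //      1 -> highestOneBit(0)    =   0 -> << 1 =    0  (floored to the minimum of 2)
  static int ringSize(final int requested) {
    return Math.max(2, Integer.highestOneBit(requested - 1) << 1);
  }
}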

View File

@ -0,0 +1,171 @@
package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventHandler;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
/**
* Disruptor that takes serialized traces and batches them into appropriately sized requests.
*
* <p>Publishing to the buffer will block if the buffer is full.
*/
@Slf4j
public class BatchWritingDisruptor extends AbstractDisruptor<byte[]> {
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
// TODO: move executor to tracer for sharing with other tasks.
private final ScheduledExecutorService heartbeatExecutor =
Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-heartbeat"));
private final DisruptorEvent.HeartbeatTranslator<byte[]> heartbeatTranslator =
new DisruptorEvent.HeartbeatTranslator<>();
public BatchWritingDisruptor(
final int disruptorSize,
final int flushFrequencySeconds,
final DDAgentApi api,
final Monitor monitor,
final DDAgentWriter writer) {
super(disruptorSize, new BatchWritingHandler(flushFrequencySeconds, api, monitor, writer));
if (0 < flushFrequencySeconds) {
// This provides a steady stream of events to enable flushing even at low throughput.
final Runnable heartbeat =
new Runnable() {
@Override
public void run() {
// Only add if the buffer is empty.
if (running && getCurrentCount() == 0) {
disruptor.getRingBuffer().tryPublishEvent(heartbeatTranslator);
}
}
};
heartbeatExecutor.scheduleAtFixedRate(heartbeat, 100, 100, TimeUnit.MILLISECONDS);
}
}
@Override
protected ThreadFactory getThreadFactory() {
return new DaemonThreadFactory("dd-trace-writer");
}
@Override
public boolean publish(final byte[] data, final int representativeCount) {
// Blocking call to ensure serialized traces aren't discarded and to apply back pressure.
disruptor.getRingBuffer().publishEvent(dataTranslator, data, representativeCount);
return true;
}
// Intentionally not thread safe.
private static class BatchWritingHandler implements EventHandler<DisruptorEvent<byte[]>> {
private final long flushFrequencyNanos;
private final DDAgentApi api;
private final Monitor monitor;
private final DDAgentWriter writer;
private final List<byte[]> serializedTraces = new ArrayList<>();
private int representativeCount = 0;
private int sizeInBytes = 0;
private long nextScheduledFlush;
private BatchWritingHandler(
final int flushFrequencySeconds,
final DDAgentApi api,
final Monitor monitor,
final DDAgentWriter writer) {
flushFrequencyNanos = TimeUnit.SECONDS.toNanos(flushFrequencySeconds);
scheduleNextFlush();
this.api = api;
this.monitor = monitor;
this.writer = writer;
}
// TODO: reduce byte[] garbage by keeping the byte[] on the event and copying before returning.
@Override
public void onEvent(
final DisruptorEvent<byte[]> event, final long sequence, final boolean endOfBatch) {
try {
if (event.data != null) {
sizeInBytes += event.data.length;
serializedTraces.add(event.data);
}
// Flush events might increase this with no data.
representativeCount += event.representativeCount;
if (event.flushLatch != null
|| FLUSH_PAYLOAD_BYTES <= sizeInBytes
|| nextScheduledFlush <= System.nanoTime()) {
flush(event.flushLatch, FLUSH_PAYLOAD_BYTES <= sizeInBytes);
}
} finally {
event.reset();
}
}
private void flush(final CountDownLatch flushLatch, final boolean early) {
try {
if (serializedTraces.isEmpty()) {
// FIXME: this will reset representativeCount without reporting
// anything even if representativeCount > 0.
return;
}
// TODO add retry and rate limiting
final DDAgentApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, serializedTraces);
monitor.onFlush(writer, early);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", serializedTraces.size());
monitor.onSend(writer, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
serializedTraces.size(),
representativeCount,
sizeInBytes);
monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
} finally {
serializedTraces.clear();
sizeInBytes = 0;
representativeCount = 0;
scheduleNextFlush();
if (flushLatch != null) {
flushLatch.countDown();
}
}
}
private void scheduleNextFlush() {
// TODO: adjust this depending on responsiveness of the agent.
if (0 < flushFrequencyNanos) {
nextScheduledFlush = System.nanoTime() + flushFrequencyNanos;
} else {
nextScheduledFlush = Long.MAX_VALUE;
}
}
}
}
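Condensed view of the flush decision in BatchWritingHandler.onEvent (a restatement for clarity, not new code): a batch is sent when a caller explicitly requests a flush, when the payload reaches FLUSH_PAYLOAD_BYTES, or when the scheduled deadline passes; the 100 ms heartbeat events above guarantee the deadline check runs even when no traces arrive.

import java.util.concurrent.CountDownLatch;

final class FlushDecision {
  static boolean shouldFlush(
      final CountDownLatch flushLatch,     // non-null only for explicit flush events
      final int sizeInBytes,               // bytes accumulated in serializedTraces
      final long nextScheduledFlush,       // deadline in System.nanoTime() terms
      final long nowNanos) {
    return flushLatch != null
        || 5_000_000 <= sizeInBytes        // FLUSH_PAYLOAD_BYTES
        || nextScheduledFlush <= nowNanos; // heartbeats keep this condition evaluated
  }
}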

View File

@ -1,9 +1,10 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.opentracing.ContainerInfo;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
@ -12,8 +13,8 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
@ -24,7 +25,7 @@ import okhttp3.RequestBody;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
import org.msgpack.core.buffer.ArrayBufferOutput;
/** The API pointing to a DD agent */
@Slf4j
@ -47,7 +48,14 @@ public class DDAgentApi {
private volatile long nextAllowedLogTime = 0;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
private static final JsonAdapter<Map<String, Map<String, Number>>> RESPONSE_ADAPTER =
new Moshi.Builder()
.build()
.adapter(
Types.newParameterizedType(
Map.class,
String.class,
Types.newParameterizedType(Map.class, String.class, Double.class)));
private static final MediaType MSGPACK = MediaType.get("application/msgpack");
private final OkHttpClient httpClient;
@ -57,7 +65,7 @@ public class DDAgentApi {
this(
host,
port,
traceEndpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath),
endpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath, true),
unixDomainSocketPath);
}
@ -97,7 +105,7 @@ public class DDAgentApi {
final byte[] serializedTrace = serializeTrace(trace);
sizeInBytes += serializedTrace.length;
serializedTraces.add(serializedTrace);
} catch (final JsonProcessingException e) {
} catch (final IOException e) {
log.warn("Error serializing trace", e);
// TODO: DQH - Incorporate the failed serialization into the Response object???
@ -107,8 +115,13 @@ public class DDAgentApi {
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
}
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(trace);
byte[] serializeTrace(final List<DDSpan> trace) throws IOException {
// TODO: reuse byte array buffer
final ArrayBufferOutput output = new ArrayBufferOutput();
final MessagePacker packer = MessagePack.newDefaultPacker(output);
MSGPACK_WRITER.writeTrace(trace, packer);
packer.flush();
return output.toByteArray();
}
Response sendSerializedTraces(
@ -183,17 +196,16 @@ public class DDAgentApi {
final String responseString = response.body().string().trim();
try {
if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) {
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
final Map<String, Map<String, Number>> parsedResponse =
RESPONSE_ADAPTER.fromJson(responseString);
final String endpoint = tracesUrl.toString();
for (final DDAgentResponseListener listener : responseListeners) {
listener.onResponse(endpoint, parsedResponse);
}
return Response.success(response.code(), parsedResponse);
}
return Response.success(response.code());
} catch (final JsonParseException e) {
} catch (final IOException e) {
log.debug("Failed to parse DD agent response: " + responseString, e);
return Response.success(response.code(), e);
@ -222,19 +234,13 @@ public class DDAgentApi {
}
}
private static boolean traceEndpointAvailable(
final HttpUrl url, final String unixDomainSocketPath) {
return endpointAvailable(url, unixDomainSocketPath, Collections.emptyList(), true);
}
private static final byte[] EMPTY_LIST = new byte[] {MessagePack.Code.FIXARRAY_PREFIX};
private static boolean endpointAvailable(
final HttpUrl url,
final String unixDomainSocketPath,
final Object data,
final boolean retry) {
final HttpUrl url, final String unixDomainSocketPath, final boolean retry) {
try {
final OkHttpClient client = buildHttpClient(unixDomainSocketPath);
final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data));
final RequestBody body = RequestBody.create(MSGPACK, EMPTY_LIST);
final Request request = prepareRequest(url).put(body).build();
try (final okhttp3.Response response = client.newCall(request).execute()) {
@ -242,7 +248,7 @@ public class DDAgentApi {
}
} catch (final IOException e) {
if (retry) {
return endpointAvailable(url, unixDomainSocketPath, data, false);
return endpointAvailable(url, unixDomainSocketPath, false);
}
}
return false;
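The EMPTY_LIST constant above is the whole probe payload: 0x90 is msgpack's fixarray prefix for a zero-length array, i.e. an empty list of traces. A decoding sketch (illustration only):

import java.io.IOException;
import org.msgpack.core.MessagePack;

final class ProbeBodyDemo {
  static int decode() throws IOException {
    // Unpacking the single byte 0x90 yields an array header of length 0.
    return MessagePack.newDefaultUnpacker(new byte[] {(byte) 0x90}).unpackArrayHeader(); // == 0
  }
}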
@ -307,42 +313,31 @@ public class DDAgentApi {
public static final class Response {
/** Factory method for a successful request with a trivial response body */
public static final Response success(final int status) {
return new Response(true, status, null, null);
}
/** Factory method for a successful request with a well-formed JSON response body */
public static final Response success(final int status, final JsonNode json) {
return new Response(true, status, json, null);
return new Response(true, status, null);
}
/** Factory method for a successful request with a malformed response body */
public static final Response success(final int status, final Throwable exception) {
return new Response(true, status, null, exception);
return new Response(true, status, exception);
}
/** Factory method for a request that receives an error status in response */
public static final Response failed(final int status) {
return new Response(false, status, null, null);
return new Response(false, status, null);
}
/** Factory method for a failed communication attempt */
public static final Response failed(final Throwable exception) {
return new Response(false, null, null, exception);
return new Response(false, null, exception);
}
private final boolean success;
private final Integer status;
private final JsonNode json;
private final Throwable exception;
private Response(
final boolean success,
final Integer status,
final JsonNode json,
final Throwable exception) {
private Response(final boolean success, final Integer status, final Throwable exception) {
this.success = success;
this.status = status;
this.json = json;
this.exception = exception;
}
@ -355,10 +350,6 @@ public class DDAgentApi {
return status;
}
public final JsonNode json() {
return json;
}
// TODO: DQH - In Java 8, switch to Optional<Throwable>?
public final Throwable exception() {
return exception;

View File

@ -1,8 +1,8 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
public interface DDAgentResponseListener {
/** Invoked after the API receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
void onResponse(String endpoint, Map<String, Map<String, Number>> responseJson);
}
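With the Jackson JsonNode gone, listeners now receive the parsed map directly. A hypothetical listener sketch (the class name and logging are illustrative):

import java.util.Map;

final class RateLoggingListener implements DDAgentResponseListener {
  @Override
  public void onResponse(
      final String endpoint, final Map<String, Map<String, Number>> responseJson) {
    // The agent's response looks like {"rate_by_service": {"service:spock,env:test": 0.5}}.
    final Map<String, Number> rates = responseJson.get("rate_by_service");
    if (rates != null) {
      System.out.println(endpoint + " -> " + rates);
    }
  }
}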

View File

@ -2,13 +2,20 @@ package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import datadog.opentracing.DDSpan;
import java.util.List;
import com.lmax.disruptor.EventTranslatorTwoArg;
import java.util.concurrent.CountDownLatch;
class DisruptorEvent<T> {
public volatile boolean shouldFlush = false;
public volatile T data = null;
// Memory ordering enforced by disruptor's memory fences, so volatile not required.
T data = null;
int representativeCount = 0;
CountDownLatch flushLatch = null;
void reset() {
data = null;
representativeCount = 0;
flushLatch = null;
}
static class Factory<T> implements EventFactory<DisruptorEvent<T>> {
@Override
@ -17,25 +24,38 @@ class DisruptorEvent<T> {
}
}
static class TraceTranslator
implements EventTranslatorOneArg<DisruptorEvent<List<DDSpan>>, List<DDSpan>> {
static final DisruptorEvent.TraceTranslator TRACE_TRANSLATOR =
new DisruptorEvent.TraceTranslator();
static class DataTranslator<T> implements EventTranslatorTwoArg<DisruptorEvent<T>, T, Integer> {
@Override
public void translateTo(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
event.data = trace;
final DisruptorEvent<T> event,
final long sequence,
final T data,
final Integer representativeCount) {
event.data = data;
event.representativeCount = representativeCount;
}
}
static class FlushTranslator implements EventTranslator<DisruptorEvent<List<DDSpan>>> {
static final DisruptorEvent.FlushTranslator FLUSH_TRANSLATOR =
new DisruptorEvent.FlushTranslator();
static class HeartbeatTranslator<T> implements EventTranslator<DisruptorEvent<T>> {
@Override
public void translateTo(final DisruptorEvent<List<DDSpan>> event, final long sequence) {
event.shouldFlush = true;
public void translateTo(final DisruptorEvent<T> event, final long sequence) {
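// Intentionally empty: a heartbeat event carries no data; it exists only to wake the
// batch-writing handler so its time-based flush check runs even when no traces arrive.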
return;
}
}
static class FlushTranslator<T>
implements EventTranslatorTwoArg<DisruptorEvent<T>, Integer, CountDownLatch> {
@Override
public void translateTo(
final DisruptorEvent<T> event,
final long sequence,
final Integer representativeCount,
final CountDownLatch latch) {
event.representativeCount = representativeCount;
event.flushLatch = latch;
}
}
}
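The translators exist because the disruptor pre-allocates its events: publishers never allocate a DisruptorEvent; they claim a slot and let a translator copy the arguments onto it. A same-package sketch of the publish path (mirrors AbstractDisruptor.publish; illustrative, and note the real code reuses a single translator instance rather than allocating one per call):

import com.lmax.disruptor.RingBuffer;
import datadog.opentracing.DDSpan;
import java.util.List;

final class PublishSketch {
  static boolean tryPublish(
      final RingBuffer<DisruptorEvent<List<DDSpan>>> ringBuffer,
      final List<DDSpan> trace,
      final int representativeCount) {
    // Claims a pre-allocated slot, runs DataTranslator to fill it, then publishes the sequence;
    // returns false instead of blocking when the ring is full.
    return ringBuffer.tryPublishEvent(
        new DisruptorEvent.DataTranslator<List<DDSpan>>(), trace, representativeCount);
  }
}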

View File

@ -1,150 +0,0 @@
package datadog.trace.common.writer.ddagent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
/** This class is intentionally not threadsafe. */
@Slf4j
public class TraceConsumer implements EventHandler<DisruptorEvent<List<DDSpan>>> {
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private final AtomicInteger traceCount;
private final Semaphore senderSemaphore;
private final DDAgentWriter writer;
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
public TraceConsumer(
final AtomicInteger traceCount, final int senderQueueSize, final DDAgentWriter writer) {
this.traceCount = traceCount;
senderSemaphore = new Semaphore(senderQueueSize);
this.writer = writer;
}
@Override
public void onEvent(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = writer.getApi().serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
writer.monitor.onSerialize(writer, trace, serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
writer.monitor.onFailedSerialize(writer, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
final boolean early = (payloadSize >= FLUSH_PAYLOAD_BYTES);
reportTraces(early);
event.shouldFlush = false;
}
}
private void reportTraces(final boolean early) {
try {
if (serializedTraces.isEmpty()) {
writer.monitor.onFlush(writer, early);
writer.apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
if (writer.scheduledWriterExecutor.isShutdown()) {
writer.monitor.onFailedSend(
writer, traceCount.get(), payloadSize, DDAgentApi.Response.failed(-1));
writer.apiPhaser.arrive(); // Allow flush to return
return;
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
try {
writer.monitor.onFlush(writer, early);
// Run the actual IO task on a different thread to avoid blocking the consumer.
try {
senderSemaphore.acquire();
} catch (final InterruptedException e) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
// Finally, we'll schedule another flush
// Any threads awaiting the flush will continue to wait
return;
}
writer.scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
senderSemaphore.release();
try {
final DDAgentApi.Response response =
writer
.getApi()
.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
writer.monitor.onSend(writer, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
writer.monitor.onFailedSend(writer, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(e));
} finally {
writer.apiPhaser.arrive(); // Flush completed.
}
}
});
} catch (final RejectedExecutionException ex) {
writer.monitor.onFailedSend(
writer, representativeCount, sizeInBytes, DDAgentApi.Response.failed(ex));
writer.apiPhaser.arrive(); // Allow flush to return
}
} finally {
payloadSize = 0;
writer.disruptor.scheduleFlush();
}
}
}

View File

@ -0,0 +1,97 @@
package datadog.trace.common.writer.ddagent;
import com.lmax.disruptor.EventHandler;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;
/**
* Disruptor that takes completed traces and applies processing to them. Upon completion, the
* serialized trace is published to {@link BatchWritingDisruptor}.
*
* <p>Publishing to the buffer will not block the calling thread; instead it returns false if the
* buffer is full. This avoids impacting application threads.
*/
@Slf4j
public class TraceProcessingDisruptor extends AbstractDisruptor<List<DDSpan>> {
public TraceProcessingDisruptor(
final int disruptorSize,
final DDAgentApi api,
final BatchWritingDisruptor batchWritingDisruptor,
final Monitor monitor,
final DDAgentWriter writer) {
// TODO: add config to enable control over serialization overhead.
super(disruptorSize, new TraceSerializingHandler(api, batchWritingDisruptor, monitor, writer));
}
@Override
protected ThreadFactory getThreadFactory() {
return new DaemonThreadFactory("dd-trace-processor");
}
@Override
public boolean publish(final List<DDSpan> data, final int representativeCount) {
return disruptor.getRingBuffer().tryPublishEvent(dataTranslator, data, representativeCount);
}
// This class is threadsafe if we want to enable more processors.
public static class TraceSerializingHandler
implements EventHandler<DisruptorEvent<List<DDSpan>>> {
private final DDAgentApi api;
private final BatchWritingDisruptor batchWritingDisruptor;
private final Monitor monitor;
private final DDAgentWriter writer;
public TraceSerializingHandler(
final DDAgentApi api,
final BatchWritingDisruptor batchWritingDisruptor,
final Monitor monitor,
final DDAgentWriter writer) {
this.api = api;
this.batchWritingDisruptor = batchWritingDisruptor;
this.monitor = monitor;
this.writer = writer;
}
@Override
public void onEvent(
final DisruptorEvent<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
try {
if (event.data != null) {
if (1 < event.representativeCount && !event.data.isEmpty()) {
// attempt to have the agent scale the metrics properly
((DDSpan) event.data.get(0).getLocalRootSpan())
.context()
.setMetric("_sample_rate", 1d / event.representativeCount);
}
try {
final byte[] serializedTrace = api.serializeTrace(event.data);
batchWritingDisruptor.publish(serializedTrace, event.representativeCount);
monitor.onSerialize(writer, event.data, serializedTrace);
event.representativeCount = 0; // reset in case flush is invoked below.
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
monitor.onFailedSerialize(writer, event.data, e);
}
}
if (event.flushLatch != null) {
if (batchWritingDisruptor.running) {
// propagate the flush.
batchWritingDisruptor.flush(event.representativeCount, event.flushLatch);
}
if (!batchWritingDisruptor.running) { // check again to protect against race condition.
// got shutdown early somehow?
event.flushLatch.countDown();
}
}
} finally {
event.reset();
}
}
}
}
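A worked example of the _sample_rate adjustment above (not part of the commit): the metric tells the agent how many traces each published trace stands for.

final class SampleRateMath {
  // If 3 traces were dropped on an overfilled buffer before this one was published,
  // representativeCount == 3 + 1 == 4, so the local root span carries
  // _sample_rate = 1d / 4 = 0.25 and the agent can scale its stats back up by 4x.
  static double sampleRate(final int representativeCount) {
    return 1d / representativeCount;
  }
}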

View File

@ -1,117 +0,0 @@
package datadog.trace.common.writer.ddagent;
import static datadog.trace.common.writer.ddagent.DisruptorEvent.FlushTranslator.FLUSH_TRANSLATOR;
import static datadog.trace.common.writer.ddagent.DisruptorEvent.TraceTranslator.TRACE_TRANSLATOR;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.opentracing.DDSpan;
import datadog.trace.common.util.DaemonThreadFactory;
import datadog.trace.common.writer.DDAgentWriter;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TraceSerializingDisruptor implements Closeable {
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private final FlushTask flushTask = new FlushTask();
private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
private final DDAgentWriter writer;
public volatile boolean running = false;
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
public TraceSerializingDisruptor(
final int disruptorSize, final DDAgentWriter writer, final TraceConsumer handler) {
disruptor =
new Disruptor<>(
new DisruptorEvent.Factory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
this.writer = writer;
disruptor.handleEventsWith(handler);
}
public void start() {
disruptor.start();
running = true;
scheduleFlush();
}
@Override
public void close() {
running = false;
disruptor.shutdown();
}
public boolean tryPublish(final List<DDSpan> trace) {
return disruptor.getRingBuffer().tryPublishEvent(TRACE_TRANSLATOR, trace);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
writer.apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
writer.apiPhaser.awaitAdvanceInterruptibly(writer.apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
public void scheduleFlush() {
if (writer.flushFrequencySeconds > 0 && !writer.scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
writer.scheduledWriterExecutor.schedule(
flushTask, writer.flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
writer.monitor.onScheduleFlush(writer, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
}

View File

@ -1,43 +1,48 @@
package datadog.opentracing
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.collect.Maps
import com.squareup.moshi.Moshi
import datadog.trace.api.DDTags
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.test.DDSpecification
import org.msgpack.core.MessagePack
import org.msgpack.core.buffer.ArrayBufferInput
import org.msgpack.jackson.dataformat.MessagePackFactory
import org.msgpack.core.buffer.ArrayBufferOutput
import org.msgpack.value.ValueType
import static datadog.trace.common.serialization.JsonFormatWriter.SPAN_ADAPTER
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER
class DDSpanSerializationTest extends DDSpecification {
def "serialize spans with sampling #samplingPriority"() throws Exception {
setup:
final Map<String, String> baggage = new HashMap<>()
baggage.put("a-baggage", "value")
final Map<String, Object> tags = new HashMap<>()
baggage.put("k1", "v1")
def jsonAdapter = new Moshi.Builder().build().adapter(Map)
Map<String, Object> expected = Maps.newHashMap()
expected.put("meta", baggage)
expected.put("service", "service")
expected.put("error", 0)
expected.put("type", "type")
expected.put("name", "operation")
expected.put("duration", 33000)
expected.put("resource", "operation")
final Map<String, Number> metrics = new HashMap<>()
metrics.put("_sampling_priority_v1", 1)
final Map<String, Number> metrics = ["_sampling_priority_v1": 1]
if (samplingPriority == PrioritySampling.UNSET) { // RateByServiceSampler sets priority
metrics.put("_dd.agent_psr", 1.0d)
}
expected.put("metrics", metrics)
expected.put("start", 100000)
expected.put("span_id", 2l)
expected.put("parent_id", 0l)
expected.put("trace_id", 1l)
Map<String, Object> expected = [
service : "service",
name : "operation",
resource : "operation",
trace_id : 1l,
span_id : 2l,
parent_id: 0l,
start : 100000,
duration : 33000,
type : "type",
error : 0,
metrics : metrics,
meta : [
"a-baggage" : "value",
"k1" : "v1",
(DDTags.THREAD_NAME): Thread.currentThread().getName(),
(DDTags.THREAD_ID) : String.valueOf(Thread.currentThread().getId()),
],
]
def writer = new ListWriter()
def tracer = DDTracer.builder().writer(writer).build()
@ -51,23 +56,19 @@ class DDSpanSerializationTest extends DDSpecification {
null,
samplingPriority,
null,
new HashMap<>(baggage),
["a-baggage": "value"],
false,
"type",
tags,
["k1": "v1"],
new PendingTrace(tracer, 1G, [:]),
tracer)
baggage.put(DDTags.THREAD_NAME, Thread.currentThread().getName())
baggage.put(DDTags.THREAD_ID, String.valueOf(Thread.currentThread().getId()))
DDSpan span = new DDSpan(100L, context)
span.finish(133L)
ObjectMapper serializer = new ObjectMapper()
def actualTree = serializer.readTree(serializer.writeValueAsString(span))
def expectedTree = serializer.readTree(serializer.writeValueAsString(expected))
def actualTree = jsonAdapter.fromJson(SPAN_ADAPTER.toJson(span))
def expectedTree = jsonAdapter.fromJson(jsonAdapter.toJson(expected))
expect:
actualTree == expectedTree
@ -79,7 +80,6 @@ class DDSpanSerializationTest extends DDSpecification {
def "serialize trace/span with id #value as int"() {
setup:
def objectMapper = new ObjectMapper(new MessagePackFactory())
def writer = new ListWriter()
def tracer = DDTracer.builder().writer(writer).build()
def context = new DDSpanContext(
@ -98,7 +98,11 @@ class DDSpanSerializationTest extends DDSpecification {
new PendingTrace(tracer, 1G, [:]),
tracer)
def span = new DDSpan(0, context)
byte[] bytes = objectMapper.writeValueAsBytes(span)
def buffer = new ArrayBufferOutput()
def packer = MessagePack.newDefaultPacker(buffer)
MSGPACK_WRITER.writeDDSpan(span, packer)
packer.flush()
byte[] bytes = buffer.toByteArray()
def unpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes))
int size = unpacker.unpackMapHeader()

View File

@ -1,24 +1,25 @@
package datadog.trace.api.sampling
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.DDSpan
import datadog.opentracing.DDTracer
import datadog.opentracing.SpanFactory
import datadog.trace.api.DDTags
import datadog.trace.common.sampling.RateByServiceSampler
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.util.test.DDSpecification
import static datadog.trace.common.sampling.RateByServiceSampler.DEFAULT_KEY
class RateByServiceSamplerTest extends DDSpecification {
static serializer = DDAgentApi.RESPONSE_ADAPTER
def "invalid rate -> 1"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
String response = '{"rate_by_service": {"service:,env:":' + rate + '}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
expect:
serviceSampler.serviceRates[DEFAULT_KEY].sampleRate == expectedRate
@ -35,11 +36,10 @@ class RateByServiceSamplerTest extends DDSpecification {
def "rate by service name"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
when:
String response = '{"rate_by_service": {"service:spock,env:test":0.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
DDSpan span1 = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.setSamplingPriority(span1)
then:
@ -48,7 +48,7 @@ class RateByServiceSamplerTest extends DDSpecification {
when:
response = '{"rate_by_service": {"service:spock,env:test":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
DDSpan span2 = SpanFactory.newSpanOf("spock", "test")
serviceSampler.setSamplingPriority(span2)
then:
@ -59,9 +59,8 @@ class RateByServiceSamplerTest extends DDSpecification {
def "sampling priority set on context"() {
setup:
RateByServiceSampler serviceSampler = new RateByServiceSampler()
ObjectMapper serializer = new ObjectMapper()
String response = '{"rate_by_service": {"service:,env:":1.0}}'
serviceSampler.onResponse("traces", serializer.readTree(response))
serviceSampler.onResponse("traces", serializer.fromJson(response))
DDSpan span = SpanFactory.newSpanOf("foo", "bar")
serviceSampler.setSamplingPriority(span)
@ -76,8 +75,8 @@ class RateByServiceSamplerTest extends DDSpecification {
def sampler = new RateByServiceSampler()
def tracer = DDTracer.builder().writer(new LoggingWriter()).sampler(sampler).build()
sampler.onResponse("test", new ObjectMapper()
.readTree('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
sampler.onResponse("test", serializer
.fromJson('{"rate_by_service":{"service:,env:":1.0,"service:spock,env:":0.0}}'))
when:
def span = tracer.buildSpan("test").start()

View File

@ -1,11 +1,12 @@
package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.DDAgentResponseListener
import datadog.trace.util.test.DDSpecification
import org.msgpack.jackson.dataformat.MessagePackFactory
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
@ -13,7 +14,7 @@ import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
class DDAgentApiTest extends DDSpecification {
static mapper = DDAgentApi.OBJECT_MAPPER
static mapper = new ObjectMapper(new MessagePackFactory())
def "sending an empty list of traces returns no errors"() {
setup:
@ -124,15 +125,15 @@ class DDAgentApiTest extends DDSpecification {
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference<String>(null)
DDAgentResponseListener responseListener = { String endpoint, JsonNode responseJson ->
agentResponse.set(responseJson.toString())
def agentResponse = new AtomicReference<Map>(null)
DDAgentResponseListener responseListener = { String endpoint, Map responseJson ->
agentResponse.set(responseJson)
}
def agent = httpServer {
handlers {
put("v0.4/traces") {
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send('{"hello":"test"}')
response.status(status).send('{"hello":{}}')
}
}
}
@ -142,7 +143,7 @@ class DDAgentApiTest extends DDSpecification {
when:
client.sendTraces([[], [], []])
then:
agentResponse.get() == '{"hello":"test"}'
agentResponse.get() == ["hello": [:]]
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown")
agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version"

View File

@ -7,38 +7,63 @@ import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ddagent.BatchWritingDisruptor
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.common.writer.ddagent.TraceConsumer
import datadog.trace.util.test.DDSpecification
import org.msgpack.core.MessagePack
import org.msgpack.core.buffer.ArrayBufferOutput
import spock.lang.Retry
import spock.lang.Timeout
import java.util.concurrent.Phaser
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import static datadog.opentracing.SpanFactory.newSpanOf
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static datadog.trace.common.serialization.MsgpackFormatWriter.MSGPACK_WRITER
import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
@Timeout(20)
class DDAgentWriterTest extends DDSpecification {
def api = Mock(DDAgentApi)
def phaser = new Phaser()
def api = Mock(DDAgentApi) {
// Define the following response in the spec:
// sendSerializedTraces(_, _, _) >> {
// phaser.arrive()
// return DDAgentApi.Response.success(200)
// }
}
def monitor = Mock(Monitor)
def setup() {
// Register for two threads.
phaser.register()
phaser.register()
}
def "test happy path"() {
setup:
def writer = new DDAgentWriter(api, 2, -1)
def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(2).flushFrequencySeconds(-1).build()
writer.start()
when:
writer.flush()
then:
0 * _
when:
writer.write(trace)
writer.write(trace)
writer.disruptor.flush()
writer.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200)
0 * _
cleanup:
@ -50,18 +75,18 @@ class DDAgentWriterTest extends DDSpecification {
def "test flood of traces"() {
setup:
def writer = new DDAgentWriter(api, disruptorSize, -1)
def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(disruptorSize).flushFrequencySeconds(-1).build()
writer.start()
when:
(1..traceCount).each {
writer.write(trace)
}
writer.disruptor.flush()
writer.flush()
then:
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount })
1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) >> DDAgentApi.Response.success(200)
0 * _
cleanup:
@ -75,10 +100,8 @@ class DDAgentWriterTest extends DDSpecification {
def "test flush by size"() {
setup:
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
def phaser = writer.apiPhaser
def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(DISRUPTOR_BUFFER_SIZE).flushFrequencySeconds(-1).build()
writer.start()
phaser.register()
when:
(1..6).each {
@ -90,35 +113,35 @@ class DDAgentWriterTest extends DDSpecification {
then:
6 * api.serializeTrace(_) >> { trace -> callRealMethod() }
2 * api.sendSerializedTraces(3, _, { it.size() == 3 })
2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) >> {
phaser.arrive()
return DDAgentApi.Response.success(200)
}
when:
(1..2).each {
writer.write(trace)
}
// Flush the remaining 2
writer.disruptor.flush()
writer.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) >> DDAgentApi.Response.success(200)
0 * _
cleanup:
writer.close()
where:
span = [newSpanOf(0, "fixed-thread-name")]
span = newSpanOf(0, "fixed-thread-name")
trace = (0..10000).collect { span }
}
def "test flush by time"() {
setup:
def writer = new DDAgentWriter(api)
def phaser = writer.apiPhaser
phaser.register()
def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
writer.start()
writer.disruptor.flush()
when:
(1..5).each {
@@ -128,20 +151,26 @@ class DDAgentWriterTest extends DDSpecification {
then:
5 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(5, _, { it.size() == 5 })
1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) >> DDAgentApi.Response.success(200)
5 * monitor.onPublish(_, _)
5 * monitor.onSerialize(_, _, _)
1 * monitor.onFlush(_, _)
1 * monitor.onSend(_, _, _, _) >> {
phaser.arrive()
}
0 * _
cleanup:
writer.close()
where:
span = [newSpanOf(0, "fixed-thread-name")]
span = newSpanOf(0, "fixed-thread-name")
trace = (1..10).collect { span }
}
def "test default buffer size"() {
setup:
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
def writer = DDAgentWriter.builder().agentApi(api).traceBufferSize(DISRUPTOR_BUFFER_SIZE).flushFrequencySeconds(-1).build()
writer.start()
when:
@@ -153,11 +182,11 @@ class DDAgentWriterTest extends DDSpecification {
// Busy-wait because we don't want to fill up the ring buffer
}
}
writer.disruptor.flush()
writer.flush()
then:
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount })
1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) >> DDAgentApi.Response.success(200)
cleanup:
writer.close()
@@ -180,40 +209,44 @@ class DDAgentWriterTest extends DDSpecification {
Mock(DDTracer))
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
traceSize = DDAgentApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
maxedPayloadTraceCount = ((int) (TraceConsumer.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
traceSize = calculateSize(minimalTrace)
maxedPayloadTraceCount = ((int) (BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
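To make the payload sizing concrete, a worked example with illustrative numbers only (the real values come from BatchWritingDisruptor.FLUSH_PAYLOAD_BYTES and the msgpack size of the minimal trace):
  // Illustration: if FLUSH_PAYLOAD_BYTES were 5_242_880 bytes (5 MB) and one minimal
  // trace serialized to 100 bytes, then
  //   maxedPayloadTraceCount = (int) (5_242_880 / 100) + 1 = 52_429
  // one more trace than fits in a single payload, guaranteeing a size-based flush.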
def "check that are no interactions after close"() {
setup:
def writer = new DDAgentWriter(api)
def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
writer.start()
when:
writer.close()
writer.write([])
writer.disruptor.flush()
writer.flush()
then:
// 2 * monitor.onFlush(_, false)
1 * monitor.onFailedPublish(_, _)
1 * monitor.onShutdown(_, _)
0 * _
writer.traceCount.get() == 0
}
def "check shutdown if executor stopped first"() {
def "check shutdown if batchWritingDisruptor stopped first"() {
setup:
def writer = new DDAgentWriter(api)
def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
writer.start()
writer.scheduledWriterExecutor.shutdown()
writer.batchWritingDisruptor.close()
when:
writer.write([])
writer.disruptor.flush()
writer.flush()
then:
1 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * monitor.onSerialize(writer, _, _)
1 * monitor.onPublish(writer, _)
0 * _
writer.traceCount.get() == 1
writer.traceCount.get() == 0
cleanup:
writer.close()
@@ -253,9 +286,7 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDAgentApi("localhost", agent.address.port, null)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()
when:
writer.start()
@@ -265,12 +296,12 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.disruptor.flush()
writer.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
1 * monitor.onSerialize(writer, minimalTrace, _)
1 * monitor.onScheduleFlush(writer, _)
1 * monitor.onFlush(writer, _)
1 * monitor.onSend(writer, 1, _, { response -> response.success() && response.status() == 200 })
when:
@@ -302,9 +333,7 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDAgentApi("localhost", agent.address.port, null)
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()
when:
writer.start()
@@ -314,12 +343,12 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.disruptor.flush()
writer.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
1 * monitor.onSerialize(writer, minimalTrace, _)
1 * monitor.onScheduleFlush(writer, _)
1 * monitor.onFlush(writer, _)
1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == 500 })
when:
@@ -345,8 +374,7 @@ class DDAgentWriterTest extends DDSpecification {
return DDAgentApi.Response.failed(new IOException("comm error"))
}
}
def monitor = Mock(Monitor)
def writer = new DDAgentWriter(api, monitor)
def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
when:
writer.start()
@@ -356,12 +384,12 @@ class DDAgentWriterTest extends DDSpecification {
when:
writer.write(minimalTrace)
writer.disruptor.flush()
writer.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
1 * monitor.onSerialize(writer, minimalTrace, _)
1 * monitor.onScheduleFlush(writer, _)
1 * monitor.onFlush(writer, _)
1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == null })
when:
@@ -371,6 +399,8 @@ class DDAgentWriterTest extends DDSpecification {
1 * monitor.onShutdown(writer, true)
}
@Retry(delay = 10)
// If execution is too slow, the HTTP client timeout may trigger.
def "slow response test"() {
def numWritten = 0
def numFlushes = new AtomicInteger(0)
@@ -382,7 +412,6 @@ class DDAgentWriterTest extends DDSpecification {
def responseSemaphore = new Semaphore(1)
setup:
def minimalTrace = createMinimalTrace()
// Need to set up a dummy agent for the final send callback to work
def agent = httpServer {
@@ -400,30 +429,27 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDAgentApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
monitor.onFailedPublish(_, _) >> {
numFailedPublish.incrementAndGet()
}
monitor.onFlush(_, _) >> {
numFlushes.incrementAndGet()
}
monitor.onSend(_, _, _, _) >> {
numRequests.incrementAndGet()
}
monitor.onFailedPublish(_, _, _, _) >> {
numFailedRequests.incrementAndGet()
def monitor = Stub(Monitor) {
onPublish(_, _) >> {
numPublished.incrementAndGet()
}
onFailedPublish(_, _) >> {
numFailedPublish.incrementAndGet()
}
onFlush(_, _) >> {
numFlushes.incrementAndGet()
}
onSend(_, _, _, _) >> {
numRequests.incrementAndGet()
}
onFailedSend(_, _, _, _) >> {
numFailedRequests.incrementAndGet()
}
}
// sender queue is sized in requests -- not traces
def bufferSize = 32
def senderQueueSize = 2
def writer = new DDAgentWriter(api, monitor, bufferSize, senderQueueSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY)
def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).traceBufferSize(bufferSize).build()
writer.start()
// gate responses
@@ -438,7 +464,7 @@ class DDAgentWriterTest extends DDSpecification {
// sanity-check the coordination mechanism of the test
// release to allow response to be generated
responseSemaphore.release()
writer.disruptor.flush()
writer.flush()
// reacquire semaphore to stall further responses
responseSemaphore.acquire()
@@ -452,13 +478,10 @@ class DDAgentWriterTest extends DDSpecification {
when:
// send many traces to fill the sender queue...
// loop until outstanding requests > finished requests
while (numFlushes.get() - (numRequests.get() + numFailedRequests.get()) < senderQueueSize) {
// chunk the loop & wait to allow for flushing to send queue
(1..1_000).each {
writer.write(minimalTrace)
numWritten += 1
}
Thread.sleep(100)
while (writer.traceProcessingDisruptor.disruptorRemainingCapacity + writer.batchWritingDisruptor.disruptorRemainingCapacity > 0 || numFailedPublish.get() == 0) {
writer.write(minimalTrace)
numWritten += 1
Thread.sleep(1) // Allow traces to get serialized.
}
then:
@@ -469,17 +492,18 @@ class DDAgentWriterTest extends DDSpecification {
def priorNumFailed = numFailedPublish.get()
// with both disruptor & queue full, should reject everything
def expectedRejects = 100_000
def expectedRejects = 100
(1..expectedRejects).each {
writer.write(minimalTrace)
numWritten += 1
}
then:
// If the in-flight requests timeouts and frees up a slot in the sending queue, then
// many of traces will be accepted and batched into a new failing request.
// If the in-flight request times out (we don't currently retry),
// then a new batch will begin processing and many of the traces will
// be accepted and batched into a new failing request.
// In that case, the reject number will be low.
numFailedPublish.get() - priorNumFailed > expectedRejects * 0.40
numFailedPublish.get() - priorNumFailed >= expectedRejects * 0.80
numPublished.get() + numFailedPublish.get() == numWritten
cleanup:
@@ -487,6 +511,10 @@ class DDAgentWriterTest extends DDSpecification {
writer.close()
agent.close()
where:
bufferSize = 16
minimalTrace = createMinimalTrace()
}
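The response gating in this test can be reduced to the following skeleton (a sketch consistent with the elided handler above, assuming the TestHttpServer DSL imported at the top of this spec): while the test holds the permit, the in-flight request stalls and subsequent flushes back up into the sender queue.
  def responseSemaphore = new Semaphore(1)
  def agent = httpServer {
    handlers {
      put("v0.4/traces") {
        responseSemaphore.acquire()     // stalls while the test holds the permit
        try {
          response.status(200).send()
        } finally {
          responseSemaphore.release()
        }
      }
    }
  }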
def "multi threaded"() {
@@ -505,21 +533,21 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDAgentApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
monitor.onFailedPublish(_, _) >> {
numFailedPublish.incrementAndGet()
}
monitor.onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
numRepSent.addAndGet(repCount)
def monitor = Stub(Monitor) {
onPublish(_, _) >> {
numPublished.incrementAndGet()
}
onFailedPublish(_, _) >> {
numFailedPublish.incrementAndGet()
}
onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
numRepSent.addAndGet(repCount)
}
}
def writer = new DDAgentWriter(api, monitor)
def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()
writer.start()
when:
@@ -538,7 +566,7 @@ class DDAgentWriterTest extends DDSpecification {
t1.join()
t2.join()
writer.disruptor.flush()
writer.flush()
then:
def totalTraces = 100 + 100
@@ -566,7 +594,6 @@ class DDAgentWriterTest extends DDSpecification {
}
}
}
def api = new DDAgentApi("localhost", agent.address.port, null)
def statsd = Stub(StatsDClient)
statsd.incrementCounter("queue.accepted") >> { stat ->
@@ -580,12 +607,12 @@ class DDAgentWriterTest extends DDSpecification {
}
def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
def writer = DDAgentWriter.builder().traceAgentPort(agent.address.port).monitor(monitor).build()
writer.start()
when:
writer.write(minimalTrace)
writer.disruptor.flush()
writer.flush()
then:
numTracesAccepted == 1
@@ -628,12 +655,12 @@ class DDAgentWriterTest extends DDSpecification {
}
def monitor = new Monitor.StatsD(statsd)
def writer = new DDAgentWriter(api, monitor)
def writer = DDAgentWriter.builder().agentApi(api).monitor(monitor).build()
writer.start()
when:
writer.write(minimalTrace)
writer.disruptor.flush()
writer.flush()
then:
numRequests == 1
@@ -643,4 +670,12 @@ class DDAgentWriterTest extends DDSpecification {
cleanup:
writer.close()
}
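// Serializes one trace with the shared msgpack writer and returns the resulting byte count.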
static int calculateSize(List<DDSpan> trace) {
def buffer = new ArrayBufferOutput()
def packer = MessagePack.newDefaultPacker(buffer)
MSGPACK_WRITER.writeTrace(trace, packer)
packer.flush()
return buffer.size
}
}

View File

@@ -0,0 +1,21 @@
package datadog.trace.api.writer
import datadog.opentracing.DDTracer
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.util.test.DDSpecification
import spock.lang.Subject
class LoggingWriterTest extends DDSpecification {
@Subject
def writer = new LoggingWriter()
def tracer = Mock(DDTracer)
def sampleTrace = [SpanFactory.newSpanOf(tracer), SpanFactory.newSpanOf(tracer)]
def "test toString"() {
expect:
writer.toString(sampleTrace).startsWith('[{"service":"fakeService","name":"fakeOperation","resource":"fakeResource","trace_id":1,"span_id":1,"parent_id":0,"start":1000,"duration":0,"type":"fakeType","error":0,"metrics":{},"meta":{')
}
}

View File

@@ -1,4 +1,3 @@
import com.fasterxml.jackson.databind.JsonNode
import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
@@ -63,11 +62,11 @@ class DDApiIntegrationTest {
def unixDomainSocketApi
def endpoint = new AtomicReference<String>(null)
def agentResponse = new AtomicReference<String>(null)
def agentResponse = new AtomicReference<Map<String, Map<String, Number>>>(null)
DDAgentResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
DDAgentResponseListener responseListener = { String receivedEndpoint, Map<String, Map<String, Number>> responseJson ->
endpoint.set(receivedEndpoint)
agentResponse.set(responseJson.toString())
agentResponse.set(responseJson)
}
def setupSpec() {
@@ -126,7 +125,7 @@ class DDApiIntegrationTest {
api.sendTraces(traces)
if (v4()) {
assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces"
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
}
where:
@@ -147,7 +146,7 @@ class DDApiIntegrationTest {
unixDomainSocketApi.sendTraces(traces)
if (v4()) {
assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces"
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
}
where:

View File

@@ -7,9 +7,7 @@ ext {
slf4j : "1.7.29",
guava : "20.0", // Last version to support Java 7
// When upgrading for security fixes, ensure corresponding change is reflected in jmxfetch.
jackson : "2.10.0", // https://nvd.nist.gov/vuln/detail/CVE-2019-16942 et al
okhttp : "3.12.8", // 3.12.x is the last version to support Java 7
spock : "1.3-groovy-$spockGroovyVer",
groovy : groovyVer,
@@ -34,12 +32,9 @@ ext {
// General
slf4j : "org.slf4j:slf4j-api:${versions.slf4j}",
guava : "com.google.guava:guava:$versions.guava",
jackson : [
dependencies.create(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson),
dependencies.create(group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.18'),
],
bytebuddy : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy', version: "${versions.bytebuddy}"),
bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"),
okhttp : dependencies.create(group: 'com.squareup.okhttp3', name: 'okhttp', version: versions.okhttp),
bytebuddy : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy', version: versions.bytebuddy),
bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: versions.bytebuddy),
autoservice : [
dependencies.create(group: 'com.google.auto.service', name: 'auto-service', version: '1.0-rc3'),
dependencies.create(group: 'com.google.auto', name: 'auto-common', version: '0.8'),

View File

@@ -60,6 +60,7 @@ include ':dd-java-agent:instrumentation:elasticsearch:transport-2'
include ':dd-java-agent:instrumentation:elasticsearch:transport-5'
include ':dd-java-agent:instrumentation:elasticsearch:transport-5.3'
include ':dd-java-agent:instrumentation:elasticsearch:transport-6'
include ':dd-java-agent:instrumentation:finatra-2.9'
include ':dd-java-agent:instrumentation:glassfish'
include ':dd-java-agent:instrumentation:google-http-client'
include ':dd-java-agent:instrumentation:grizzly-2'