Merge pull request #377 from DataDog/mar-kolya/akka-http-client

Akka HTTP client
This commit is contained in:
Tyler Benson 2018-07-25 11:22:17 +10:00 committed by GitHub
commit f893948412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 678 additions and 142 deletions

View File

@ -29,7 +29,7 @@ jobs:
- run:
name: Build Project
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx2G -Xms512M" ./gradlew clean :dd-java-agent:shadowJar compileTestGroovy compileTestScala compileTestJava check -x test -x latestDepTest -x traceAgentTest --build-cache --parallel --stacktrace --no-daemon --max-workers=4
command: GRADLE_OPTS="-Dorg.gradle.jvmargs=-Xmx1G -Xms64M" ./gradlew clean :dd-java-agent:shadowJar compileTestGroovy compileTestScala compileTestJava check -x test -x latestDepTest -x traceAgentTest --build-cache --parallel --stacktrace --no-daemon --max-workers=4
- run:
name: Collect Libs
@ -115,7 +115,7 @@ jobs:
test_8:
<<: *default_test_job
environment:
- JAVA8_HOME: /usr/lib/jvm/java-8-openjdk-amd64
# We are building on Java 8, which is our default JVM, so there is no need to set additional JAVA home variables
- TEST_TASK: test latestDepTest jacocoTestReport jacocoTestCoverageVerification
test_9:

View File

@ -41,3 +41,6 @@ dependencies {
test.dependsOn lagomTest
testJava8Minimum += '*Test*.class'
// These classes use Ratpack which requires Java 8. (Currently also incompatible with Java 9.)
testJava8Only += '**/AkkaHttpClientInstrumentationTest.class'

View File

@ -0,0 +1,178 @@
package datadog.trace.instrumentation.akkahttp;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.*;
import akka.NotUsed;
import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Flow;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.*;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.Tuple2;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
import scala.util.Try;
@Slf4j
@AutoService(Instrumenter.class)
public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
public AkkaHttpClientInstrumentation() {
super("akka-http", "akka-http-client");
}
@Override
protected boolean defaultEnabled() {
return false;
}
@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("akka.http.scaladsl.HttpExt");
}
@Override
public String[] helperClassNames() {
return new String[] {
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders",
AkkaHttpClientInstrumentation.class.getPackage().getName() + ".AkkaHttpClientTransformFlow",
AkkaHttpClientInstrumentation.class.getPackage().getName() + ".AkkaHttpClientTransformFlow$",
AkkaHttpClientInstrumentation.class.getPackage().getName()
+ ".AkkaHttpClientTransformFlow$$anonfun$transform$1",
AkkaHttpClientInstrumentation.class.getPackage().getName()
+ ".AkkaHttpClientTransformFlow$$anonfun$transform$2",
};
}
@Override
public Map<ElementMatcher, String> transformers() {
final Map<ElementMatcher, String> transformers = new HashMap<>();
transformers.put(
named("singleRequest").and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
SingleRequestAdvice.class.getName());
transformers.put(
named("superPool").and(returns(named("akka.stream.scaladsl.Flow"))),
SuperPoolAdvice.class.getName());
return transformers;
}
public static class SingleRequestAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope methodEnter(
@Advice.Argument(value = 0, readOnly = false) HttpRequest request) {
Tracer.SpanBuilder builder =
GlobalTracer.get()
.buildSpan("akka-http.request")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
.withTag(Tags.COMPONENT.getKey(), "akka-http-client");
if (request != null) {
builder =
builder
.withTag(Tags.HTTP_METHOD.getKey(), request.method().value())
.withTag(Tags.HTTP_URL.getKey(), request.getUri().toString());
}
Scope scope = builder.startActive(false);
if (request != null) {
AkkaHttpHeaders headers = new AkkaHttpHeaders(request);
GlobalTracer.get().inject(scope.span().context(), Format.Builtin.HTTP_HEADERS, headers);
// Request is immutable, so we have to assign the new value once we update the headers
request = headers.getRequest();
}
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(value = 0) final HttpRequest request,
@Advice.This final HttpExt thiz,
@Advice.Return final Future<HttpResponse> responseFuture,
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable) {
Span span = scope.span();
if (throwable == null) {
responseFuture.onComplete(new OnCompleteHandler(span), thiz.system().dispatcher());
} else {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish();
}
scope.close();
}
}
public static class SuperPoolAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <T> void methodExit(
@Advice.Return(readOnly = false)
Flow<Tuple2<HttpRequest, T>, Tuple2<Try<HttpResponse>, T>, NotUsed> flow) {
flow = AkkaHttpClientTransformFlow.transform(flow);
}
}
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
private final Span span;
public OnCompleteHandler(Span span) {
this.span = span;
}
@Override
public Void apply(Try<HttpResponse> result) {
if (result.isSuccess()) {
Tags.HTTP_STATUS.set(span, result.get().status().intValue());
} else {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, result.failed().get()));
}
span.finish();
return null;
}
}
public static class AkkaHttpHeaders implements TextMap {
private HttpRequest request;
public AkkaHttpHeaders(HttpRequest request) {
this.request = request;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"This class should be used only with Tracer.inject()!");
}
@Override
public void put(final String name, final String value) {
// It looks like this cast is only needed in Java; Scala would have figured it out
request = (HttpRequest) request.addHeader(RawHeader.create(name, value));
}
public HttpRequest getRequest() {
return request;
}
}
}
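
For context, a minimal sketch of why SingleRequestAdvice above must re-assign the intercepted request after injecting headers: akka-http requests are immutable, and addHeader returns a copy. The sketch uses the javadsl API and a hypothetical header name; the real header names and values come from the tracer's injector.

import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.headers.RawHeader;

public class HeaderInjectionSketch {
  public static void main(final String[] args) {
    final HttpRequest original = HttpRequest.create("http://localhost:8080/success");
    // addHeader does not mutate `original`; it returns a new request instance.
    final HttpRequest traced =
        original.addHeader(RawHeader.create("x-example-trace-id", "123"));
    System.out.println(original.getHeaders()); // empty
    System.out.println(traced.getHeaders());   // [x-example-trace-id: 123]
  }
}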

View File

@ -0,0 +1,48 @@
package datadog.trace.instrumentation.akkahttp
import java.util.Collections
import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Supervision
import akka.stream.scaladsl.Flow
import datadog.trace.api.{DDSpanTypes, DDTags}
import io.opentracing.log.Fields.ERROR_OBJECT
import io.opentracing.{Scope, Span}
import io.opentracing.propagation.Format
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import scala.util.{Failure, Success, Try}
object AkkaHttpClientTransformFlow {
def transform[T](flow: Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]): Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed] = {
var span: Span = null
Flow.fromFunction((input: (HttpRequest, T)) => {
val (request, data) = input
val scope = GlobalTracer.get
.buildSpan("akka-http.request")
.withTag(Tags.SPAN_KIND.getKey, Tags.SPAN_KIND_CLIENT)
.withTag(Tags.HTTP_METHOD.getKey, request.method.value)
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.HTTP_CLIENT)
.withTag(Tags.COMPONENT.getKey, "akka-http-client")
.withTag(Tags.HTTP_URL.getKey, request.getUri.toString)
.startActive(false)
val headers = new AkkaHttpClientInstrumentation.AkkaHttpHeaders(request)
GlobalTracer.get.inject(scope.span.context, Format.Builtin.HTTP_HEADERS, headers)
span = scope.span
scope.close()
(headers.getRequest, data)
}).via(flow).map(output => {
output._1 match {
case Success(response) => Tags.HTTP_STATUS.set(span, response.status.intValue)
case Failure(e) =>
Tags.ERROR.set(span, true)
span.log(Collections.singletonMap(ERROR_OBJECT, e))
}
span.finish()
output
})
}
}
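
The object above wraps the pooled client flow by composing a pre-stage (start a span, inject headers), the original flow, and a post-stage (record status or error, finish the span). Below is a minimal sketch of that composition pattern with a toy flow, using the akka-streams javadsl; the names are illustrative and not part of the instrumentation.

import akka.NotUsed;
import akka.stream.javadsl.Flow;

public class FlowWrapSketch {
  // Wrap an existing Flow with per-element logic before and after it,
  // mirroring the fromFunction(...).via(flow).map(...) composition above.
  static Flow<String, Integer, NotUsed> wrap(final Flow<String, Integer, NotUsed> inner) {
    return Flow.<String>create()
      .map(request -> {      // pre-stage: e.g. start a span, inject headers
        System.out.println("sending " + request);
        return request;
      })
      .via(inner)            // the original (e.g. superPool) flow
      .map(response -> {     // post-stage: e.g. record the status, finish the span
        System.out.println("received " + response);
        return response;
      });
  }
}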

View File

@ -1,5 +1,6 @@
package datadog.trace.instrumentation.akkahttp;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.*;
import akka.http.javadsl.model.HttpHeader;
@ -127,7 +128,7 @@ public final class AkkaHttpServerInstrumentation extends Instrumenter.Default {
public static void finishSpan(Span span, Throwable t) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap("error.object", t));
span.log(Collections.singletonMap(ERROR_OBJECT, t));
Tags.HTTP_STATUS.set(span, 500);
if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) {

View File

@ -0,0 +1,287 @@
import akka.actor.ActorSystem
import akka.http.javadsl.Http
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.japi.Pair
import akka.stream.ActorMaterializer
import akka.stream.StreamTcpException
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.RatpackUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import scala.util.Try
import spock.lang.Shared
import java.util.concurrent.CompletionStage
import java.util.concurrent.ExecutionException
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
import static ratpack.groovy.test.embed.GroovyEmbeddedApp.ratpack
class AkkaHttpClientInstrumentationTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.akka-http-client.enabled", "true")
}
private static final String MESSAGE = "an\nmultiline\nhttp\nresponse"
private static final long TIMEOUT = 10000L
@Shared
def server = ratpack {
handlers {
prefix("success") {
all {
RatpackUtils.handleDistributedRequest(context)
response.status(200).send(MESSAGE)
}
}
prefix("error") {
all {
RatpackUtils.handleDistributedRequest(context)
throw new RuntimeException("error")
}
}
}
}
@Shared
ActorSystem system = ActorSystem.create()
@Shared
ActorMaterializer materializer = ActorMaterializer.create(system)
def pool = Http.get(system).<Integer>superPool(materializer)
def "#route request trace" () {
setup:
def url = server.address.resolve("/" + route).toURL()
HttpRequest request = HttpRequest.create(url.toString())
CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(request, materializer)
when:
HttpResponse response = responseFuture.toCompletableFuture().get()
String message = readMessage(response)
then:
response.status().intValue() == expectedStatus
if (expectedMessage != null) {
message == expectedMessage
}
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
operationName "test-http-server"
childOf(TEST_WRITER[1][0])
errored false
tags {
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /$route"
errored expectedError
tags {
defaultTags()
"$Tags.HTTP_STATUS.key" expectedStatus
"$Tags.HTTP_URL.key" "${server.address}$route"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
if (expectedError) {
"$Tags.ERROR.key" true
}
}
}
}
}
where:
route | expectedStatus | expectedError | expectedMessage
"success" | 200 | false | MESSAGE
"error" | 500 | true | null
}
def "error request trace" () {
setup:
def url = new URL("http://localhost:${server.address.port + 1}/test")
HttpRequest request = HttpRequest.create(url.toString())
CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(request, materializer)
when:
responseFuture.toCompletableFuture().get()
then:
thrown ExecutionException
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /test"
errored true
tags {
defaultTags()
"$Tags.HTTP_URL.key" url.toString()
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
"$Tags.ERROR.key" true
errorTags(StreamTcpException, { it.contains("Tcp command") })
}
}
}
}
}
def "singleRequest exception trace" () {
when:
// Passing null causes NPE in singleRequest
Http.get(system).singleRequest(null, materializer)
then:
thrown NullPointerException
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "akka-http.request"
errored true
tags {
defaultTags()
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
"$Tags.ERROR.key" true
errorTags(NullPointerException)
}
}
}
}
}
def "#route pool request trace" () {
setup:
def url = server.address.resolve("/" + route).toURL()
CompletionStage<Pair<Try<HttpResponse>, Integer>> sink = Source
.<Pair<HttpRequest, Integer>>single(new Pair(HttpRequest.create(url.toString()), 1))
.via(pool)
.runWith(Sink.<Pair<Try<HttpResponse>, Integer>>head(), materializer)
when:
HttpResponse response = sink.toCompletableFuture().get().first().get()
String message = readMessage(response)
then:
response.status().intValue() == expectedStatus
if (expectedMessage != null) {
message == expectedMessage
}
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
operationName "test-http-server"
childOf(TEST_WRITER[1][0])
errored false
tags {
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /$route"
errored expectedError
tags {
defaultTags()
"$Tags.HTTP_STATUS.key" expectedStatus
"$Tags.HTTP_URL.key" "${server.address}$route"
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
if (expectedError) {
"$Tags.ERROR.key" true
}
}
}
}
}
where:
route | expectedStatus | expectedError | expectedMessage
"success" | 200 | false | MESSAGE
"error" | 500 | true | null
}
def "error request pool trace" () {
setup:
def url = new URL("http://localhost:${server.address.port + 1}/test")
CompletionStage<Pair<Try<HttpResponse>, Integer>> sink = Source
.<Pair<HttpRequest, Integer>>single(new Pair(HttpRequest.create(url.toString()), 1))
.via(pool)
.runWith(Sink.<Pair<Try<HttpResponse>, Integer>>head(), materializer)
def response = sink.toCompletableFuture().get().first()
when:
response.get()
then:
thrown StreamTcpException
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
parent()
serviceName "unnamed-java-app"
operationName "akka-http.request"
resourceName "GET /test"
errored true
tags {
defaultTags()
"$Tags.HTTP_URL.key" url.toString()
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.HTTP_CLIENT
"$Tags.COMPONENT.key" "akka-http-client"
"$Tags.ERROR.key" true
errorTags(StreamTcpException, { it.contains("Tcp command") })
}
}
}
}
}
String readMessage(HttpResponse response) {
response.entity().toStrict(TIMEOUT, materializer).toCompletableFuture().get().getData().utf8String()
}
}

View File

@ -1,18 +1,13 @@
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.RatpackUtils
import datadog.trace.api.DDSpanTypes
import io.opentracing.Scope
import io.opentracing.SpanContext
import io.opentracing.propagation.Format
import io.opentracing.propagation.TextMap
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import org.apache.http.HttpResponse
import org.apache.http.client.HttpClient
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.http.message.BasicHeader
import ratpack.handling.Context
import spock.lang.Shared
import static datadog.trace.agent.test.TestUtils.runUnderTrace
@ -24,23 +19,9 @@ class ApacheHttpClientTest extends AgentTestRunner {
def server = ratpack {
handlers {
get {
String msg = "<html><body><h1>Hello test.</h1>\n"
boolean isDDServer = true
if (context.request.getHeaders().contains("is-dd-server")) {
isDDServer = Boolean.parseBoolean(context.request.getHeaders().get("is-dd-server"))
}
if (isDDServer) {
final SpanContext extractedContext =
GlobalTracer.get()
.extract(Format.Builtin.HTTP_HEADERS, new RatpackResponseAdapter(context))
Scope scope =
GlobalTracer.get()
.buildSpan("test-http-server")
.asChildOf(extractedContext)
.startActive(true)
scope.close()
}
RatpackUtils.handleDistributedRequest(context)
String msg = "<html><body><h1>Hello test.</h1>\n"
response.status(200).send(msg)
}
}
@ -128,22 +109,4 @@ class ApacheHttpClientTest extends AgentTestRunner {
clientSpan.getTags()[Tags.PEER_PORT.getKey()] == server.getAddress().port
clientSpan.getTags()[Tags.SPAN_KIND.getKey()] == Tags.SPAN_KIND_CLIENT
}
private static class RatpackResponseAdapter implements TextMap {
final Context context
RatpackResponseAdapter(Context context) {
this.context = context
}
@Override
void put(String key, String value) {
context.response.set(key, value)
}
@Override
Iterator<Map.Entry<String, String>> iterator() {
return context.request.getHeaders().asMultiValueMap().entrySet().iterator()
}
}
}

View File

@ -13,6 +13,8 @@
*/
package datadog.trace.instrumentation.aws.v0;
import static io.opentracing.log.Fields.*;
import com.amazonaws.AmazonWebServiceResponse;
import com.amazonaws.Request;
import com.amazonaws.Response;
@ -95,15 +97,15 @@ class SpanDecorator {
private static Map<String, Object> errorLogs(final Throwable throwable) {
final Map<String, Object> errorLogs = new HashMap<>(4);
errorLogs.put("event", Tags.ERROR.getKey());
errorLogs.put("error.kind", throwable.getClass().getName());
errorLogs.put("error.object", throwable);
errorLogs.put(EVENT, Tags.ERROR.getKey());
errorLogs.put(ERROR_KIND, throwable.getClass().getName());
errorLogs.put(ERROR_OBJECT, throwable);
errorLogs.put("message", throwable.getMessage());
errorLogs.put(MESSAGE, throwable.getMessage());
final StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
errorLogs.put("stack", sw.toString());
errorLogs.put(STACK, sw.toString());
return errorLogs;
}
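
This change (and the matching ones in the other SpanDecorator and instrumentation classes below) only swaps string literals for the equivalent io.opentracing.log.Fields constants. A short sketch, assuming opentracing-api is on the classpath, confirming the constants resolve to the same keys, so the emitted error logs are unchanged:

import io.opentracing.log.Fields;

public class FieldsSketch {
  public static void main(final String[] args) {
    // These constants are plain String keys defined by opentracing-api.
    System.out.println(Fields.EVENT);        // event
    System.out.println(Fields.ERROR_KIND);   // error.kind
    System.out.println(Fields.ERROR_OBJECT); // error.object
    System.out.println(Fields.MESSAGE);      // message
    System.out.println(Fields.STACK);        // stack
  }
}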

View File

@ -13,6 +13,8 @@
*/
package datadog.trace.instrumentation.aws.v106;
import static io.opentracing.log.Fields.*;
import com.amazonaws.AmazonWebServiceResponse;
import com.amazonaws.Request;
import com.amazonaws.Response;
@ -95,15 +97,15 @@ class SpanDecorator {
private static Map<String, Object> errorLogs(final Throwable throwable) {
final Map<String, Object> errorLogs = new HashMap<>(4);
errorLogs.put("event", Tags.ERROR.getKey());
errorLogs.put("error.kind", throwable.getClass().getName());
errorLogs.put("error.object", throwable);
errorLogs.put(EVENT, Tags.ERROR.getKey());
errorLogs.put(ERROR_KIND, throwable.getClass().getName());
errorLogs.put(ERROR_OBJECT, throwable);
errorLogs.put("message", throwable.getMessage());
errorLogs.put(MESSAGE, throwable.getMessage());
final StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
errorLogs.put("stack", sw.toString());
errorLogs.put(STACK, sw.toString());
return errorLogs;
}

View File

@ -58,8 +58,7 @@ public class CassandraClientInstrumentation extends Instrumenter.Default {
* com.datastax.driver.core.Cluster$Manager.newSession() method is called. The opentracing
* contribution is a simple wrapper, so we just have to wrap the new session.
*
* @param session The fresh session to patch
* @return A new tracing session
@param session The fresh session to patch. This session is replaced with the new tracing session
* @throws Exception
*/
@Advice.OnMethodExit(suppress = Throwable.class)

View File

@ -13,6 +13,8 @@
*/
package datadog.trace.instrumentation.datastax.cassandra;
import static io.opentracing.log.Fields.*;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.CloseFuture;
import com.datastax.driver.core.Cluster;
@ -295,15 +297,15 @@ class TracingSession implements Session {
private static Map<String, Object> errorLogs(final Throwable throwable) {
final Map<String, Object> errorLogs = new HashMap<>(4);
errorLogs.put("event", Tags.ERROR.getKey());
errorLogs.put("error.kind", throwable.getClass().getName());
errorLogs.put("error.object", throwable);
errorLogs.put(EVENT, Tags.ERROR.getKey());
errorLogs.put(ERROR_KIND, throwable.getClass().getName());
errorLogs.put(ERROR_OBJECT, throwable);
errorLogs.put("message", throwable.getMessage());
errorLogs.put(MESSAGE, throwable.getMessage());
final StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
errorLogs.put("stack", sw.toString());
errorLogs.put(STACK, sw.toString());
return errorLogs;
}

View File

@ -5,9 +5,13 @@ import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.DDTags
import io.opentracing.tag.Tags
import org.cassandraunit.utils.EmbeddedCassandraServerHelper
import spock.lang.Shared
class CassandraClientTest extends AgentTestRunner {
@Shared
Cluster cluster
def setupSpec() {
/*
This timeout seems excessive, but we've seen tests fail with a timeout of 40s.
@ -16,6 +20,14 @@ class CassandraClientTest extends AgentTestRunner {
tests would have to assume they run under shared Cassandra and act accordingly.
*/
EmbeddedCassandraServerHelper.startEmbeddedCassandra(120000L)
cluster = EmbeddedCassandraServerHelper.getCluster()
/*
It looks like our requests sometimes fail because Cassandra takes too long to respond,
so increase this timeout as well to try to cope with that.
*/
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(120000)
}
def cleanupSpec() {
@ -24,7 +36,6 @@ class CassandraClientTest extends AgentTestRunner {
def "sync traces"() {
setup:
final Cluster cluster = EmbeddedCassandraServerHelper.getCluster()
final Session session = cluster.newSession()
session.execute("DROP KEYSPACE IF EXISTS sync_test")
@ -57,7 +68,6 @@ class CassandraClientTest extends AgentTestRunner {
def "async traces"() {
setup:
final Cluster cluster = EmbeddedCassandraServerHelper.getCluster()
final Session session = cluster.connectAsync().get()
session.executeAsync("DROP KEYSPACE IF EXISTS async_test").get()

View File

@ -38,7 +38,7 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner {
def settings = Settings.builder()
.put("path.home", esWorkingDir.path)
// Since we use listeners to close spans this should make our span closing deterministic which is good for tests
.put("thread_pool.listener.size", 1)
.put("threadpool.listener.size", 1)
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.build()

View File

@ -49,7 +49,7 @@ class Elasticsearch2SpringTemplateTest extends AgentTestRunner {
def settings = Settings.builder()
.put("path.home", esWorkingDir.path)
// Since we use listeners to close spans this should make our span closing deterministic which is good for tests
.put("thread_pool.listener.size", 1)
.put("threadpool.listener.size", 1)
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.build()

View File

@ -50,7 +50,7 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner {
client = TransportClient.builder().settings(
Settings.builder()
// Since we use listeners to close spans this should make our span closing deterministic which is good for tests
.put("thread_pool.listener.size", 1)
.put("threadpool.listener.size", 1)
.put("cluster.name", "test-cluster")
.build()
).build()

View File

@ -1,13 +1,9 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.RatpackUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import io.opentracing.Scope
import io.opentracing.SpanContext
import io.opentracing.propagation.Format
import io.opentracing.propagation.TextMap
import io.opentracing.tag.Tags
import io.opentracing.util.GlobalTracer
import ratpack.handling.Context
import spock.lang.Shared
import static datadog.trace.agent.test.ListWriterAssert.assertTraces
@ -28,21 +24,7 @@ class HttpUrlConnectionTest extends AgentTestRunner {
def server = ratpack {
handlers {
all {
boolean isDDServer = true
if (context.request.getHeaders().contains("is-dd-server")) {
isDDServer = Boolean.parseBoolean(context.request.getHeaders().get("is-dd-server"))
}
if (isDDServer) {
final SpanContext extractedContext =
GlobalTracer.get()
.extract(Format.Builtin.HTTP_HEADERS, new RatpackResponseAdapter(context))
Scope scope =
GlobalTracer.get()
.buildSpan("test-http-server")
.asChildOf((SpanContext) extractedContext)
.startActive(true)
scope.close()
}
RatpackUtils.handleDistributedRequest(context)
response.status(STATUS)
// Ratpack seems to be sending a body with HEAD requests - the RFC specifically forbids this.
@ -442,22 +424,4 @@ class HttpUrlConnectionTest extends AgentTestRunner {
}
}
}
private static class RatpackResponseAdapter implements TextMap {
final Context context
RatpackResponseAdapter(Context context) {
this.context = context
}
@Override
void put(String key, String value) {
context.response.set(key, value)
}
@Override
Iterator<Map.Entry<String, String>> iterator() {
return context.request.getHeaders().asMultiValueMap().entrySet().iterator()
}
}
}

View File

@ -19,7 +19,7 @@ subprojects { subProj ->
subProj.byteBuddy {
transformation {
// Applying NoOp optimizes the build by applying the bytebuddy plugin only to the listed compile tasks
tasks = ['compileJava']
tasks = ['compileJava', 'compileScala']
plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin$NoOp'
}
}
@ -27,7 +27,7 @@ subprojects { subProj ->
subProj.afterEvaluate {
subProj.byteBuddy {
transformation {
tasks = ['compileJava']
tasks = ['compileJava', 'compileScala']
plugin = 'datadog.trace.agent.tooling.muzzle.MuzzleGradlePlugin'
classPath = project(':dd-java-agent:agent-tooling').configurations.instrumentationMuzzle + subProj.configurations.compile + subProj.sourceSets.main.output
}

View File

@ -1,6 +1,7 @@
package datadog.trace.instrumentation.jedis;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
@ -82,7 +83,7 @@ public final class JedisInstrumentation extends Instrumenter.Default {
if (throwable != null) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap("error.object", throwable));
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
scope.close();
}

View File

@ -44,11 +44,10 @@ class KafkaClientTest extends AgentTestRunner {
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
WRITER_PHASER.register()
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
}
})

View File

@ -43,14 +43,13 @@ class KafkaStreamsTest extends AgentTestRunner {
def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED))
// create a thread safe queue to store the processed message
WRITER_PHASER.register()
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener
consumerContainer.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("testing", 123)
records.add(record)
}
@ -69,7 +68,7 @@ class KafkaStreamsTest extends AgentTestRunner {
.mapValues(new ValueMapper<String, String>() {
@Override
String apply(String textLine) {
WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase()
}

View File

@ -1,5 +1,7 @@
package datadog.trace.instrumentation.lettuce;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDTags;
import io.lettuce.core.ConnectionFuture;
import io.lettuce.core.RedisURI;
@ -44,7 +46,7 @@ public class ConnectionFutureAdvice {
if (throwable != null) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap("error.object", throwable));
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
scope.close();
return;
}

View File

@ -1,5 +1,7 @@
package datadog.trace.instrumentation.lettuce;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
@ -31,7 +33,7 @@ public class LettuceAsyncBiFunction<T extends Object, U extends Throwable, R ext
this.span.setTag("db.command.cancelled", true);
} else {
Tags.ERROR.set(this.span, true);
this.span.log(Collections.singletonMap("error.object", throwable));
this.span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
}
this.span.finish();

View File

@ -1,5 +1,7 @@
package datadog.trace.instrumentation.lettuce;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDTags;
import io.lettuce.core.protocol.AsyncCommand;
import io.lettuce.core.protocol.RedisCommand;
@ -44,7 +46,7 @@ public class LettuceAsyncCommandsAdvice {
final Span span = scope.span();
if (throwable != null) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap("error.object", throwable));
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish();
scope.close();
return;

View File

@ -1,5 +1,7 @@
package datadog.trace.instrumentation.lettuce.rx;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil;
import io.opentracing.Scope;
@ -36,7 +38,7 @@ public class LettuceFluxTerminationRunnable implements Consumer<Signal>, Runnabl
}
if (throwable != null) {
Tags.ERROR.set(this.span, true);
this.span.log(Collections.singletonMap("error.object", throwable));
this.span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
this.span.finish();
} else {

View File

@ -1,5 +1,7 @@
package datadog.trace.instrumentation.lettuce.rx;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.lettuce.LettuceInstrumentationUtil;
import io.opentracing.Scope;
@ -29,7 +31,7 @@ public class LettuceMonoDualConsumer<R, T, U extends Throwable>
if (this.span != null) {
if (throwable != null) {
Tags.ERROR.set(this.span, true);
this.span.log(Collections.singletonMap("error.object", throwable));
this.span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
this.span.finish();
} else {

View File

@ -1,11 +1,12 @@
package datadog.trace.instrumentation.okhttp3;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.net.Inet4Address;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import okhttp3.Connection;
import okhttp3.Request;
import okhttp3.Response;
@ -64,7 +65,7 @@ public interface OkHttpClientSpanDecorator {
@Override
public void onError(final Throwable throwable, final Span span) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(errorLogs(throwable));
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
}
@Override
@ -81,13 +82,5 @@ public interface OkHttpClientSpanDecorator {
Tags.PEER_HOST_IPV6.set(span, connection.socket().getInetAddress().toString());
}
}
protected Map<String, Object> errorLogs(final Throwable throwable) {
final Map<String, Object> errorLogs = new HashMap<>(2);
errorLogs.put("event", Tags.ERROR.getKey());
errorLogs.put("error.object", throwable);
return errorLogs;
}
};
}

View File

@ -2,6 +2,7 @@ package datadog.trace.instrumentation.play;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClassWithMethod;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.*;
import akka.japi.JavaPartialFunction;
@ -193,7 +194,7 @@ public final class PlayInstrumentation extends Instrumenter.Default {
public static void onError(final Span span, final Throwable t) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap("error.object", t));
span.log(Collections.singletonMap(ERROR_OBJECT, t));
Tags.HTTP_STATUS.set(span, 500);
}
}

View File

@ -14,7 +14,7 @@ import net.spy.memcached.internal.CheckedOperationTimeoutException
import net.spy.memcached.ops.Operation
import net.spy.memcached.ops.OperationQueueFactory
import org.testcontainers.containers.GenericContainer
import spock.lang.Requires
import spock.lang.Shared
import java.util.concurrent.ArrayBlockingQueue
@ -30,6 +30,9 @@ import static CompletionListener.SPAN_TYPE
import static datadog.trace.agent.test.TestUtils.runUnderTrace
import static net.spy.memcached.ConnectionFactoryBuilder.Protocol.BINARY
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
// It is fine to run on CI because CI provides memcached externally, not through testcontainers
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
class SpymemcachedTest extends AgentTestRunner {
static {

View File

@ -155,12 +155,19 @@ public class TracingAgent {
private static ClassLoader getPlatformClassLoader()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// must invoke ClassLoader.getPlatformClassLoader by reflection to remain compatible with java 7
// + 8.
/*
Must invoke ClassLoader.getPlatformClassLoader by reflection to remain
compatible with Java 7 and 8.
*/
final Method method = ClassLoader.class.getDeclaredMethod("getPlatformClassLoader");
return (ClassLoader) method.invoke(null);
}
/**
* Main entry point.
*
@param args command line arguments
*/
public static void main(final String... args) {
try {
System.out.println(getAgentVersion());
@ -184,7 +191,9 @@ public class TracingAgent {
new InputStreamReader(
TracingAgent.class.getResourceAsStream("/dd-java-agent.version"), "UTF-8");
output = new BufferedReader(input);
for (int c = output.read(); c != -1; c = output.read()) sb.append((char) c);
for (int c = output.read(); c != -1; c = output.read()) {
sb.append((char) c);
}
} finally {
if (null != input) {
input.close();
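
To expand on the comment above: ClassLoader.getPlatformClassLoader() only exists on Java 9+, so a direct call would not compile against a Java 7/8 class library. A small sketch of the reflective lookup with an explicit fallback; the fallback behaviour is illustrative, not what TracingAgent does.

import java.lang.reflect.Method;

public class PlatformClassLoaderSketch {
  static ClassLoader platformClassLoaderOrNull() {
    try {
      // Resolved at runtime, so this class still loads and runs on Java 7/8.
      final Method method = ClassLoader.class.getDeclaredMethod("getPlatformClassLoader");
      return (ClassLoader) method.invoke(null);
    } catch (final ReflectiveOperationException e) {
      return null; // Java 7/8: no platform class loader exists
    }
  }
}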

View File

@ -0,0 +1,47 @@
package datadog.trace.agent.test
import io.opentracing.Scope
import io.opentracing.SpanContext
import io.opentracing.propagation.Format
import io.opentracing.propagation.TextMap
import io.opentracing.util.GlobalTracer
import ratpack.handling.Context
class RatpackUtils {
static handleDistributedRequest(Context context) {
boolean isDDServer = true
if (context.request.getHeaders().contains("is-dd-server")) {
isDDServer = Boolean.parseBoolean(context.request.getHeaders().get("is-dd-server"))
}
if (isDDServer) {
final SpanContext extractedContext =
GlobalTracer.get()
.extract(Format.Builtin.HTTP_HEADERS, new RatpackResponseAdapter(context))
Scope scope =
GlobalTracer.get()
.buildSpan("test-http-server")
.asChildOf(extractedContext)
.startActive(true)
scope.close()
}
}
private static class RatpackResponseAdapter implements TextMap {
final Context context
RatpackResponseAdapter(Context context) {
this.context = context
}
@Override
void put(String key, String value) {
context.response.set(key, value)
}
@Override
Iterator<Map.Entry<String, String>> iterator() {
return context.request.getHeaders().asMultiValueMap().entrySet().iterator()
}
}
}
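
handleDistributedRequest is the extract side of the test setup: it reads the propagated span context out of the incoming request headers and starts a test-http-server span as its child. For reference, a hedged sketch of the matching inject side (performed in these tests by the client instrumentation under test); the class and method names here are hypothetical.

import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMap;
import io.opentracing.util.GlobalTracer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class InjectSketch {
  // Collect the propagation headers for a given span context into a plain map.
  static Map<String, String> injectedHeaders(final SpanContext context) {
    final Map<String, String> headers = new HashMap<>();
    GlobalTracer.get().inject(context, Format.Builtin.HTTP_HEADERS, new TextMap() {
      @Override
      public void put(final String key, final String value) {
        headers.put(key, value); // injection only needs put()
      }

      @Override
      public Iterator<Map.Entry<String, String>> iterator() {
        throw new UnsupportedOperationException("inject-only carrier");
      }
    });
    return headers;
  }
}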

View File

@ -16,7 +16,6 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.agent.builder.AgentBuilder;
@ -69,8 +68,6 @@ public abstract class AgentTestRunner extends Specification {
private static final Instrumentation instrumentation;
private static volatile ClassFileTransformer activeTransformer = null;
protected static final Phaser WRITER_PHASER = new Phaser();
static {
instrumentation = ByteBuddyAgent.getInstrumentation();
@ -82,7 +79,6 @@ public abstract class AgentTestRunner extends Specification {
@Override
public boolean add(final List<DDSpan> trace) {
final boolean result = super.add(trace);
WRITER_PHASER.arriveAndDeregister();
return result;
}
};
@ -136,16 +132,15 @@ public abstract class AgentTestRunner extends Specification {
@Before
public void beforeTest() {
TEST_WRITER.start();
WRITER_PHASER.register();
INSTRUMENTATION_ERROR_COUNT.set(0);
ERROR_LISTENER.activateTest(this);
assert getTestTracer().activeSpan() == null;
assert getTestTracer().activeSpan() == null : "Span is active before test has started";
}
@After
public void afterTest() {
ERROR_LISTENER.deactivateTest(this);
assert INSTRUMENTATION_ERROR_COUNT.get() == 0;
assert INSTRUMENTATION_ERROR_COUNT.get() == 0 : "Instrumentation errors during test";
}
@AfterClass

View File

@ -5,8 +5,8 @@ minimumInstructionCoverage = 0.6
excludedClassesConverage += [
'datadog.trace.agent.test.*Assert',
'datadog.trace.agent.test.AgentTestRunner.ErrorCountingListener',
'datadog.trace.agent.test.TestUtils',
'datadog.trace.agent.test.OkHttpUtils'
'datadog.trace.agent.test.*Utils',
'datadog.trace.agent.test.*Utils.*'
]
dependencies {
@ -19,6 +19,8 @@ dependencies {
compile deps.guava
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.10.0'
// Note: this should be the same version as in java.gradle
compileOnly group: 'io.ratpack', name: 'ratpack-groovy-test', version: '1.4.6'
compile project(':dd-trace-ot')
compile project(':dd-java-agent:agent-tooling')

View File

@ -7,13 +7,14 @@ buildscript {
classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4.7.3"
classpath "com.jfrog.bintray.gradle:gradle-bintray-plugin:1.8.1"
classpath "net.ltgt.gradle:gradle-errorprone-plugin:0.0.14"
classpath 'org.unbroken-dome.gradle-plugins:gradle-testsets-plugin:1.5.0'
classpath "org.unbroken-dome.gradle-plugins:gradle-testsets-plugin:1.5.0"
}
}
plugins {
id 'com.gradle.build-scan' version '1.14'
id 'com.github.sherter.google-java-format' version '0.7.1'
id 'com.dorongold.task-tree' version '1.3'
}
def isCI = System.getenv("CI") != null

View File

@ -30,6 +30,23 @@ if (configurations.find { it.name == 'jmh' }) {
eclipse.classpath.plusConfigurations += [configurations.jmh]
}
jar {
/*
Make the Jar build fail on duplicate files.
By default the Gradle Jar task can put multiple files with the same
name into a Jar. This may lead to confusion. For example, if
auto-service annotation processing creates files with the same name in
the `scala` and `java` directories, the resulting Jar would contain two
files with the same name, and only one of them would actually be
considered when that Jar is used, leading to very confusing failures.
Instead we should 'fail early' and avoid building such Jars.
*/
duplicatesStrategy = 'fail'
}
task packageSources(type: Jar) {
classifier = 'sources'
from sourceSets.main.allSource