Fix servlet async dispatch

Finish existing trace and propagate to the next request.

Improve dispatch testing.
This commit is contained in:
Tyler Benson 2018-11-14 14:38:38 -08:00
parent 3835157c4f
commit cde02e1ae5
12 changed files with 502 additions and 61 deletions

View File

@ -1,19 +0,0 @@
package datadog.trace.instrumentation.servlet3;
import datadog.trace.agent.tooling.Instrumenter;
public abstract class AbstractServlet3Instrumentation extends Instrumenter.Default {
public AbstractServlet3Instrumentation() {
super("servlet", "servlet-3");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.servlet3.HttpServletRequestExtractAdapter",
"datadog.trace.instrumentation.servlet3.HttpServletRequestExtractAdapter$MultivaluedMapFlatIterator",
"datadog.trace.instrumentation.servlet3.TagSettingAsyncListener"
};
}
}

View File

@ -0,0 +1,97 @@
package datadog.trace.instrumentation.servlet3;
import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.instrumentation.servlet3.Servlet3Advice.SERVLET_SPAN;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import io.opentracing.Span;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.Map;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class AsyncContextInstrumentation extends Instrumenter.Default {
public AsyncContextInstrumentation() {
super("servlet", "servlet-3");
}
@Override
public String[] helperClassNames() {
return new String[] {"datadog.trace.instrumentation.servlet3.HttpServletRequestInjectAdapter"};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(safeHasSuperType(named("javax.servlet.AsyncContext")));
}
@Override
public Map<? extends ElementMatcher, String> transformers() {
return Collections.singletonMap(
isMethod().and(isPublic()).and(named("dispatch")), DispatchAdvice.class.getName());
}
/**
* When a request is dispatched, we want to close out the existing request and replace the
* propagation info in the headers with the closed span.
*/
public static class DispatchAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean enter(
@Advice.This final AsyncContext context, @Advice.AllArguments final Object[] args) {
final int depth = CallDepthThreadLocalMap.incrementCallDepth(AsyncContext.class);
if (depth > 0) {
return false;
}
final ServletRequest request = context.getRequest();
final Object spanAttr = request.getAttribute(SERVLET_SPAN);
if (spanAttr instanceof Span) {
request.removeAttribute(SERVLET_SPAN);
final Span span = (Span) spanAttr;
// Override propagation headers by injecting attributes with new values.
if (request instanceof HttpServletRequest) {
GlobalTracer.get()
.inject(
span.context(),
Format.Builtin.TEXT_MAP,
new HttpServletRequestInjectAdapter((HttpServletRequest) request));
}
final String path;
if (args.length == 1 && args[0] instanceof String) {
path = (String) args[0];
} else if (args.length == 2 && args[1] instanceof String) {
path = (String) args[1];
} else {
path = "true";
}
span.setTag("servlet.dispatch", path);
span.finish();
}
return true;
}
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void exit(@Advice.Enter final boolean topLevel) {
if (topLevel) {
CallDepthThreadLocalMap.reset(AsyncContext.class);
}
}
}
}

View File

@ -15,7 +15,19 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class FilterChain3Instrumentation extends AbstractServlet3Instrumentation {
public final class FilterChain3Instrumentation extends Instrumenter.Default {
public FilterChain3Instrumentation() {
super("servlet", "servlet-3");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.servlet3.HttpServletRequestExtractAdapter",
"datadog.trace.instrumentation.servlet3.HttpServletRequestExtractAdapter$MultivaluedMapFlatIterator",
"datadog.trace.instrumentation.servlet3.TagSettingAsyncListener"
};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {

View File

@ -15,7 +15,19 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(Instrumenter.class)
public final class HttpServlet3Instrumentation extends AbstractServlet3Instrumentation {
public final class HttpServlet3Instrumentation extends Instrumenter.Default {
public HttpServlet3Instrumentation() {
super("servlet", "servlet-3");
}
@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.servlet3.HttpServletRequestExtractAdapter",
"datadog.trace.instrumentation.servlet3.HttpServletRequestExtractAdapter$MultivaluedMapFlatIterator",
"datadog.trace.instrumentation.servlet3.TagSettingAsyncListener"
};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {

View File

@ -3,6 +3,7 @@ package datadog.trace.instrumentation.servlet3;
import io.opentracing.propagation.TextMap;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@ -52,6 +53,20 @@ public class HttpServletRequestExtractAdapter implements TextMap {
headersResult.put(headerName, valuesList);
}
/*
* Read from the attributes and override the headers.
* This is used by HttpServletRequestInjectAdapter when a request is async-dispatched.
*/
final Enumeration<String> attributeNamesIt = httpServletRequest.getAttributeNames();
while (attributeNamesIt.hasMoreElements()) {
final String attributeName = attributeNamesIt.nextElement();
final Object valuesIt = httpServletRequest.getAttribute(attributeName);
if (valuesIt instanceof String) {
headersResult.put(attributeName, Collections.singletonList((String) valuesIt));
}
}
return headersResult;
}

View File

@ -0,0 +1,27 @@
package datadog.trace.instrumentation.servlet3;
import io.opentracing.propagation.TextMap;
import java.util.Iterator;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
/** Inject into request attributes since the request headers can't be modified. */
public class HttpServletRequestInjectAdapter implements TextMap {
private final HttpServletRequest httpServletRequest;
public HttpServletRequestInjectAdapter(final HttpServletRequest httpServletRequest) {
this.httpServletRequest = httpServletRequest;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException(
"This class should be used only with Tracer.extract()!");
}
@Override
public void put(final String key, final String value) {
httpServletRequest.setAttribute(key, value);
}
}

View File

@ -21,11 +21,13 @@ import javax.servlet.http.HttpServletResponse;
import net.bytebuddy.asm.Advice;
public class Servlet3Advice {
public static final String SERVLET_SPAN = "datadog.servlet.span";
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
@Advice.This final Object servlet, @Advice.Argument(0) final ServletRequest req) {
if (GlobalTracer.get().activeSpan() != null || !(req instanceof HttpServletRequest)) {
final Object spanAttr = req.getAttribute(SERVLET_SPAN);
if (!(req instanceof HttpServletRequest) || spanAttr != null) {
// Tracing might already be applied by the FilterChain. If so ignore this.
return null;
}
@ -53,6 +55,8 @@ public class Servlet3Advice {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
req.setAttribute(SERVLET_SPAN, scope.span());
return scope;
}
@ -63,13 +67,11 @@ public class Servlet3Advice {
@Advice.Enter final Scope scope,
@Advice.Thrown final Throwable throwable) {
// Set user.principal regardless of who created this span.
final Span currentSpan = GlobalTracer.get().activeSpan();
if (currentSpan != null) {
if (request instanceof HttpServletRequest) {
final Principal principal = ((HttpServletRequest) request).getUserPrincipal();
if (principal != null) {
currentSpan.setTag(DDTags.USER_NAME, principal.getName());
}
final Object spanAttr = request.getAttribute(SERVLET_SPAN);
if (spanAttr instanceof Span && request instanceof HttpServletRequest) {
final Principal principal = ((HttpServletRequest) request).getUserPrincipal();
if (principal != null) {
((Span) spanAttr).setTag(DDTags.USER_NAME, principal.getName());
}
}
@ -90,19 +92,23 @@ public class Servlet3Advice {
((TraceScope) scope).setAsyncPropagation(false);
}
scope.close();
req.removeAttribute(SERVLET_SPAN);
span.finish(); // Finish the span manually since finishSpanOnClose was false
} else if (req.isAsyncStarted()) {
final AtomicBoolean activated = new AtomicBoolean(false);
// what if async is already finished? This would not be called
req.getAsyncContext().addListener(new TagSettingAsyncListener(activated, span));
scope.close();
} else {
Tags.HTTP_STATUS.set(span, resp.getStatus());
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
final AtomicBoolean activated = new AtomicBoolean(false);
if (req.isAsyncStarted()) {
req.getAsyncContext().addListener(new TagSettingAsyncListener(activated, span));
}
// Check again in case the request finished before adding the listener.
if (!req.isAsyncStarted() && activated.compareAndSet(false, true)) {
Tags.HTTP_STATUS.set(span, resp.getStatus());
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}
req.removeAttribute(SERVLET_SPAN);
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
scope.close();
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
}

View File

@ -10,6 +10,7 @@ import io.opentracing.util.GlobalTracer;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletResponse;
@ -72,6 +73,12 @@ public class TagSettingAsyncListener implements AsyncListener {
}
}
/** Re-attach listener for dispatch. */
@Override
public void onStartAsync(final AsyncEvent event) throws IOException {}
public void onStartAsync(final AsyncEvent event) {
final AsyncContext eventAsyncContext = event.getAsyncContext();
if (eventAsyncContext != null) {
eventAsyncContext.addListener(this, event.getSuppliedRequest(), event.getSuppliedResponse());
}
}
}

View File

@ -49,6 +49,10 @@ class JettyServlet3Test extends AgentTestRunner {
servletContext.addServlet(TestServlet3.Sync, "/auth/sync")
servletContext.addServlet(TestServlet3.Async, "/async")
servletContext.addServlet(TestServlet3.Async, "/auth/async")
servletContext.addServlet(TestServlet3.BlockingAsync, "/blocking")
servletContext.addServlet(TestServlet3.DispatchSync, "/dispatch/sync")
servletContext.addServlet(TestServlet3.DispatchAsync, "/dispatch/async")
servletContext.addServlet(TestServlet3.FakeAsync, "/fake")
jettyServer.setHandler(servletContext)
jettyServer.start()
@ -111,40 +115,134 @@ class JettyServlet3Test extends AgentTestRunner {
}
where:
path | expectedResponse | auth | origin | distributedTracing
"async" | "Hello Async" | false | "Async" | false
"sync" | "Hello Sync" | false | "Sync" | false
"auth/async" | "Hello Async" | true | "Async" | false
"auth/sync" | "Hello Sync" | true | "Sync" | false
"async" | "Hello Async" | false | "Async" | true
"sync" | "Hello Sync" | false | "Sync" | true
"auth/async" | "Hello Async" | true | "Async" | true
"auth/sync" | "Hello Sync" | true | "Sync" | true
path | expectedResponse | auth | origin | distributedTracing
"async" | "Hello Async" | false | "Async" | false
"sync" | "Hello Sync" | false | "Sync" | false
"auth/async" | "Hello Async" | true | "Async" | false
"auth/sync" | "Hello Sync" | true | "Sync" | false
"blocking" | "Hello BlockingAsync" | false | "BlockingAsync" | false
"fake" | "Hello FakeAsync" | false | "FakeAsync" | false
"async" | "Hello Async" | false | "Async" | true
"sync" | "Hello Sync" | false | "Sync" | true
"auth/async" | "Hello Async" | true | "Async" | true
"auth/sync" | "Hello Sync" | true | "Sync" | true
"blocking" | "Hello BlockingAsync" | false | "BlockingAsync" | true
"fake" | "Hello FakeAsync" | false | "FakeAsync" | true
}
def "test dispatch #path"() {
setup:
def requestBuilder = new Request.Builder()
.url("http://localhost:$port/dispatch/$path")
.get()
if (distributedTracing) {
requestBuilder.header("x-datadog-trace-id", "123")
requestBuilder.header("x-datadog-parent-id", "456")
}
def response = client.newCall(requestBuilder.build()).execute()
expect:
response.body().string().trim() == "Hello $type"
assertTraces(2) {
trace(0, 1) {
span(0) {
if (distributedTracing) {
traceId "123"
parentId "456"
} else {
parent()
}
serviceName "unnamed-java-app"
operationName "servlet.request"
resourceName "GET /dispatch/$path"
spanType DDSpanTypes.WEB_SERVLET
errored false
tags {
"http.url" "http://localhost:$port/dispatch/$path"
"http.method" "GET"
"span.kind" "server"
"component" "java-web-servlet"
"span.origin.type" "TestServlet3\$Dispatch$type"
"span.type" DDSpanTypes.WEB_SERVLET
"http.status_code" 200
"servlet.dispatch" "/$path"
defaultTags(distributedTracing)
}
}
}
trace(1, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "servlet.request"
resourceName "GET /$path"
spanType DDSpanTypes.WEB_SERVLET
errored false
tags {
"http.url" "http://localhost:$port/$path"
"http.method" "GET"
"span.kind" "server"
"component" "java-web-servlet"
"span.origin.type" "TestServlet3\$$type"
"span.type" DDSpanTypes.WEB_SERVLET
"http.status_code" 200
defaultTags(true)
}
}
}
}
where:
path | distributedTracing
"sync" | true
"sync" | false
"async" | true
"async" | false
type = path.capitalize()
}
def "servlet instrumentation clears state after async request"() {
setup:
def request = new Request.Builder()
.url("http://localhost:$port/async")
.url("http://localhost:$port/$path")
.get()
.build()
def numTraces = 5
def numTraces = 10
for (int i = 0; i < numTraces; ++i) {
client.newCall(request).execute()
}
expect:
assertTraces(numTraces) {
for (int i = 0; i < numTraces; ++i) {
trace(i, 1) {
assertTraces(dispatched ? numTraces * 2 : numTraces) {
for (int i = 0; (dispatched ? i + 1 : i) < TEST_WRITER.size(); i += (dispatched ? 2 : 1)) {
if (dispatched) {
trace(i, 1) {
span(0) {
operationName "servlet.request"
resourceName "GET /dispatch/async"
parent()
}
}
}
trace(dispatched ? i + 1 : i, 1) {
span(0) {
serviceName "unnamed-java-app"
operationName "servlet.request"
resourceName "GET /async"
if (dispatched) {
childOf TEST_WRITER[i][0]
} else {
parent()
}
}
}
}
}
where:
path | dispatched
"async" | false
"dispatch/async" | true
}
def "test #path error servlet call"() {

View File

@ -3,6 +3,7 @@ import groovy.servlet.AbstractHttpServlet
import javax.servlet.annotation.WebServlet
import javax.servlet.http.HttpServletRequest
import javax.servlet.http.HttpServletResponse
import java.util.concurrent.CountDownLatch
class TestServlet3 {
@ -25,11 +26,58 @@ class TestServlet3 {
static class Async extends AbstractHttpServlet {
@Override
void doGet(HttpServletRequest req, HttpServletResponse resp) {
def latch = new CountDownLatch(1)
def context = req.startAsync()
context.start {
latch.await()
resp.writer.print("Hello Async")
context.complete()
}
latch.countDown()
}
}
@WebServlet(asyncSupported = true)
static class BlockingAsync extends AbstractHttpServlet {
@Override
void doGet(HttpServletRequest req, HttpServletResponse resp) {
def latch = new CountDownLatch(1)
def context = req.startAsync()
context.start {
resp.writer.print("Hello BlockingAsync")
context.complete()
latch.countDown()
}
latch.await()
}
}
@WebServlet(asyncSupported = true)
static class DispatchSync extends AbstractHttpServlet {
@Override
void doGet(HttpServletRequest req, HttpServletResponse resp) {
req.startAsync().dispatch("/sync")
}
}
@WebServlet(asyncSupported = true)
static class DispatchAsync extends AbstractHttpServlet {
@Override
void doGet(HttpServletRequest req, HttpServletResponse resp) {
def context = req.startAsync()
context.start {
context.dispatch("/async")
}
}
}
@WebServlet(asyncSupported = true)
static class FakeAsync extends AbstractHttpServlet {
@Override
void doGet(HttpServletRequest req, HttpServletResponse resp) {
def context = req.startAsync()
resp.writer.print("Hello FakeAsync")
context.complete()
}
}
}

View File

@ -52,6 +52,18 @@ class TomcatServlet3Test extends AgentTestRunner {
Tomcat.addServlet(appContext, "asyncServlet", new TestServlet3.Async())
appContext.addServletMappingDecoded("/async", "asyncServlet")
Tomcat.addServlet(appContext, "blockingServlet", new TestServlet3.BlockingAsync())
appContext.addServletMappingDecoded("/blocking", "blockingServlet")
Tomcat.addServlet(appContext, "dispatchServlet", new TestServlet3.DispatchSync())
appContext.addServletMappingDecoded("/dispatch/sync", "dispatchServlet")
Tomcat.addServlet(appContext, "dispatchAsyncServlet", new TestServlet3.DispatchAsync())
appContext.addServletMappingDecoded("/dispatch/async", "dispatchAsyncServlet")
Tomcat.addServlet(appContext, "fakeServlet", new TestServlet3.FakeAsync())
appContext.addServletMappingDecoded("/fake", "fakeServlet")
tomcatServer.start()
System.out.println(
"Tomcat server: http://" + tomcatServer.getHost().getName() + ":" + port + "/")
@ -62,7 +74,7 @@ class TomcatServlet3Test extends AgentTestRunner {
tomcatServer.destroy()
}
def "test #path servlet call (distributed tracing: #distributedTracing)"() {
def "test #path servlet call (auth: #auth, distributed tracing: #distributedTracing)"() {
setup:
def requestBuilder = new Request.Builder()
.url("http://localhost:$port/my-context/$path")
@ -106,11 +118,133 @@ class TomcatServlet3Test extends AgentTestRunner {
}
where:
path | expectedResponse | distributedTracing
"async" | "Hello Async" | false
"sync" | "Hello Sync" | false
"async" | "Hello Async" | true
"sync" | "Hello Sync" | true
path | expectedResponse | origin | distributedTracing
"async" | "Hello Async" | "Async" | false
"sync" | "Hello Sync" | "Sync" | false
"blocking" | "Hello BlockingAsync" | "BlockingAsync" | false
"fake" | "Hello FakeAsync" | "FakeAsync" | false
"async" | "Hello Async" | "Async" | true
"sync" | "Hello Sync" | "Sync" | true
"blocking" | "Hello BlockingAsync" | "BlockingAsync" | true
"fake" | "Hello FakeAsync" | "FakeAsync" | true
}
def "test dispatch #path"() {
setup:
def requestBuilder = new Request.Builder()
.url("http://localhost:$port/my-context/dispatch/$path")
.get()
if (distributedTracing) {
requestBuilder.header("x-datadog-trace-id", "123")
requestBuilder.header("x-datadog-parent-id", "456")
}
def response = client.newCall(requestBuilder.build()).execute()
expect:
response.body().string().trim() == "Hello $type"
assertTraces(2) {
trace(0, 1) {
span(0) {
if (distributedTracing) {
traceId "123"
parentId "456"
} else {
parent()
}
serviceName "my-context"
operationName "servlet.request"
resourceName "GET /my-context/dispatch/$path"
spanType DDSpanTypes.WEB_SERVLET
errored false
tags {
"http.url" "http://localhost:$port/my-context/dispatch/$path"
"http.method" "GET"
"span.kind" "server"
"component" "java-web-servlet"
"span.origin.type" "org.apache.catalina.core.ApplicationFilterChain"
"span.type" DDSpanTypes.WEB_SERVLET
"http.status_code" 200
"servlet.context" "/my-context"
"servlet.dispatch" "/$path"
defaultTags(distributedTracing)
}
}
}
trace(1, 1) {
span(0) {
serviceName "my-context"
operationName "servlet.request"
resourceName "GET /my-context/$path"
spanType DDSpanTypes.WEB_SERVLET
errored false
childOf TEST_WRITER[0][0]
tags {
"http.url" "http://localhost:$port/my-context/$path"
"http.method" "GET"
"span.kind" "server"
"component" "java-web-servlet"
"span.origin.type" "org.apache.catalina.core.ApplicationFilterChain"
"span.type" DDSpanTypes.WEB_SERVLET
"http.status_code" 200
"servlet.context" "/my-context"
defaultTags(true)
}
}
}
}
where:
path | distributedTracing
"sync" | true
"sync" | false
"async" | true
"async" | false
type = path.capitalize()
}
def "servlet instrumentation clears state after async request"() {
setup:
def request = new Request.Builder()
.url("http://localhost:$port/my-context/$path")
.get()
.build()
def numTraces = 10
for (int i = 0; i < numTraces; ++i) {
client.newCall(request).execute()
}
expect:
assertTraces(dispatched ? numTraces * 2 : numTraces) {
for (int i = 0; (dispatched ? i + 1 : i) < TEST_WRITER.size(); i += (dispatched ? 2 : 1)) {
if (dispatched) {
trace(i, 1) {
span(0) {
operationName "servlet.request"
resourceName "GET /my-context/dispatch/async"
parent()
}
}
}
trace(dispatched ? i + 1 : i, 1) {
span(0) {
operationName "servlet.request"
resourceName "GET /my-context/async"
if (dispatched) {
childOf TEST_WRITER[i][0]
} else {
parent()
}
}
}
}
}
where:
path | dispatched
"async" | false
"dispatch/async" | true
}
def "test #path error servlet call"() {
@ -155,7 +289,7 @@ class TomcatServlet3Test extends AgentTestRunner {
"sync" | "Hello Sync"
}
def "test #path error servlet call for non-throwing error"() {
def "test #path non-throwing-error servlet call"() {
setup:
def request = new Request.Builder()
.url("http://localhost:$port/my-context/$path?non-throwing-error=true")

View File

@ -62,6 +62,10 @@ public class HTTPCodec implements Codec<TextMap> {
final String key = entry.getKey().toLowerCase();
final String val = entry.getValue();
if (val == null) {
continue;
}
if (TRACE_ID_KEY.equalsIgnoreCase(key)) {
traceId = validateUInt64BitsID(val);
} else if (SPAN_ID_KEY.equalsIgnoreCase(key)) {