Akka propagation fix and concurrency tests (#3099)

This commit is contained in:
Ago Allikmaa 2021-05-27 19:43:09 +03:00 committed by GitHub
parent 79e42ad665
commit 165bc1e492
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 15 deletions

View File

@ -20,7 +20,7 @@ public final class ContextPropagationDebug {
// locations where the context was propagated to another thread (tracking multiple steps is // locations where the context was propagated to another thread (tracking multiple steps is
// helpful in akka where there is so much recursive async spawning of new work) // helpful in akka where there is so much recursive async spawning of new work)
private static final ContextKey<List<StackTraceElement[]>> THREAD_PROPAGATION_LOCATIONS = private static final ContextKey<List<Propagation>> THREAD_PROPAGATION_LOCATIONS =
ContextKey.named("thread-propagation-locations"); ContextKey.named("thread-propagation-locations");
private static final boolean THREAD_PROPAGATION_DEBUGGER = private static final boolean THREAD_PROPAGATION_DEBUGGER =
@ -36,13 +36,14 @@ public final class ContextPropagationDebug {
return THREAD_PROPAGATION_DEBUGGER; return THREAD_PROPAGATION_DEBUGGER;
} }
public static Context appendLocations(Context context, StackTraceElement[] locations) { public static Context appendLocations(
List<StackTraceElement[]> currentLocations = ContextPropagationDebug.getLocations(context); Context context, StackTraceElement[] locations, Object carrier) {
List<Propagation> currentLocations = ContextPropagationDebug.getPropagations(context);
if (currentLocations == null) { if (currentLocations == null) {
currentLocations = new CopyOnWriteArrayList<>(); currentLocations = new CopyOnWriteArrayList<>();
context = context.with(THREAD_PROPAGATION_LOCATIONS, currentLocations); context = context.with(THREAD_PROPAGATION_LOCATIONS, currentLocations);
} }
currentLocations.add(0, locations); currentLocations.add(0, new Propagation(carrier.getClass().getName(), locations));
return context; return context;
} }
@ -67,18 +68,20 @@ public final class ContextPropagationDebug {
} }
} }
private static List<StackTraceElement[]> getLocations(Context context) { private static List<Propagation> getPropagations(Context context) {
return context.get(THREAD_PROPAGATION_LOCATIONS); return context.get(THREAD_PROPAGATION_LOCATIONS);
} }
private static void debugContextPropagation(Context context) { private static void debugContextPropagation(Context context) {
List<StackTraceElement[]> locations = getLocations(context); List<Propagation> propagations = getPropagations(context);
if (locations != null) { if (propagations != null) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Iterator<StackTraceElement[]> i = locations.iterator(); Iterator<Propagation> i = propagations.iterator();
while (i.hasNext()) { while (i.hasNext()) {
for (StackTraceElement ste : i.next()) { Propagation entry = i.next();
sb.append("\n"); sb.append("\ncarrier of type: ").append(entry.carrierClassName);
for (StackTraceElement ste : entry.location) {
sb.append("\n ");
sb.append(ste); sb.append(ste);
} }
if (i.hasNext()) { if (i.hasNext()) {
@ -89,5 +92,15 @@ public final class ContextPropagationDebug {
} }
} }
private static class Propagation {
public final String carrierClassName;
public final StackTraceElement[] location;
public Propagation(String carrierClassName, StackTraceElement[] location) {
this.carrierClassName = carrierClassName;
this.location = location;
}
}
private ContextPropagationDebug() {} private ContextPropagationDebug() {}
} }

View File

@ -14,6 +14,7 @@ import akka.http.javadsl.model.headers.RawHeader
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import spock.lang.Shared import spock.lang.Shared
class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> implements AgentTestTrait { class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> implements AgentTestTrait {
@ -67,8 +68,10 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> impl
} }
@Override @Override
boolean testCausality() { SingleConnection createSingleConnection(String host, int port) {
false // singleConnection test would require instrumentation to support requests made through pools
// (newHostConnectionPool, superPool, etc), which is currently not supported.
return null
} }
def "singleRequest exception trace"() { def "singleRequest exception trace"() {

View File

@ -23,6 +23,11 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest<Object>
String expectedServerSpanName(ServerEndpoint endpoint) { String expectedServerSpanName(ServerEndpoint endpoint) {
return "akka.request" return "akka.request"
} }
@Override
boolean testConcurrency() {
return true
}
} }
class AkkaHttpServerInstrumentationTestSync extends AkkaHttpServerInstrumentationTest { class AkkaHttpServerInstrumentationTestSync extends AkkaHttpServerInstrumentationTest {

View File

@ -32,6 +32,12 @@ object AkkaHttpTestAsyncWebServer {
val resp = HttpResponse(status = endpoint.getStatus) //.withHeaders(headers.Type)resp.contentType = "text/plain" val resp = HttpResponse(status = endpoint.getStatus) //.withHeaders(headers.Type)resp.contentType = "text/plain"
endpoint match { endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody) case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
uri.query().get(name).orNull
})
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT => case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody)) resp.withHeaders(headers.Location(endpoint.getBody))

View File

@ -30,6 +30,12 @@ object AkkaHttpTestSyncWebServer {
val resp = HttpResponse(status = endpoint.getStatus) val resp = HttpResponse(status = endpoint.getStatus)
endpoint match { endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody) case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
uri.query().get(name).orNull
})
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT => case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody)) resp.withHeaders(headers.Location(endpoint.getBody))

View File

@ -52,6 +52,13 @@ public class ExecutorInstrumentationUtils {
return false; return false;
} }
// This is a Mailbox created by akka.dispatch.Dispatcher#createMailbox. We must not add
// a context to it as context should only be carried by individual envelopes in the queue
// of this mailbox.
if (taskClass.getName().equals("akka.dispatch.Dispatcher$$anon$1")) {
return false;
}
Class<?> enclosingClass = taskClass.getEnclosingClass(); Class<?> enclosingClass = taskClass.getEnclosingClass();
if (enclosingClass != null) { if (enclosingClass != null) {
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to // Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
@ -144,7 +151,8 @@ public class ExecutorInstrumentationUtils {
public static <T> State setupState(ContextStore<T, State> contextStore, T task, Context context) { public static <T> State setupState(ContextStore<T, State> contextStore, T task, Context context) {
State state = contextStore.putIfAbsent(task, State.FACTORY); State state = contextStore.putIfAbsent(task, State.FACTORY);
if (ContextPropagationDebug.isThreadPropagationDebuggerEnabled()) { if (ContextPropagationDebug.isThreadPropagationDebuggerEnabled()) {
context = ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace()); context =
ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace(), task);
} }
state.setParentContext(context); state.setParentContext(context);
return state; return state;