From 165bc1e492aa52ac8ff1a8503b20baa42761e41a Mon Sep 17 00:00:00 2001 From: Ago Allikmaa Date: Thu, 27 May 2021 19:43:09 +0300 Subject: [PATCH] Akka propagation fix and concurrency tests (#3099) --- .../api/internal/ContextPropagationDebug.java | 33 +++++++++++++------ .../AkkaHttpClientInstrumentationTest.groovy | 7 ++-- .../AkkaHttpServerInstrumentationTest.groovy | 5 +++ .../scala/AkkaHttpTestAsyncWebServer.scala | 8 ++++- .../scala/AkkaHttpTestSyncWebServer.scala | 8 ++++- .../ExecutorInstrumentationUtils.java | 10 +++++- 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java index 6efdf0ecf9..89ff4abb0c 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ContextPropagationDebug.java @@ -20,7 +20,7 @@ public final class ContextPropagationDebug { // locations where the context was propagated to another thread (tracking multiple steps is // helpful in akka where there is so much recursive async spawning of new work) - private static final ContextKey> THREAD_PROPAGATION_LOCATIONS = + private static final ContextKey> THREAD_PROPAGATION_LOCATIONS = ContextKey.named("thread-propagation-locations"); private static final boolean THREAD_PROPAGATION_DEBUGGER = @@ -36,13 +36,14 @@ public final class ContextPropagationDebug { return THREAD_PROPAGATION_DEBUGGER; } - public static Context appendLocations(Context context, StackTraceElement[] locations) { - List currentLocations = ContextPropagationDebug.getLocations(context); + public static Context appendLocations( + Context context, StackTraceElement[] locations, Object carrier) { + List currentLocations = ContextPropagationDebug.getPropagations(context); if (currentLocations == null) { currentLocations = new CopyOnWriteArrayList<>(); context = context.with(THREAD_PROPAGATION_LOCATIONS, currentLocations); } - currentLocations.add(0, locations); + currentLocations.add(0, new Propagation(carrier.getClass().getName(), locations)); return context; } @@ -67,18 +68,20 @@ public final class ContextPropagationDebug { } } - private static List getLocations(Context context) { + private static List getPropagations(Context context) { return context.get(THREAD_PROPAGATION_LOCATIONS); } private static void debugContextPropagation(Context context) { - List locations = getLocations(context); - if (locations != null) { + List propagations = getPropagations(context); + if (propagations != null) { StringBuilder sb = new StringBuilder(); - Iterator i = locations.iterator(); + Iterator i = propagations.iterator(); while (i.hasNext()) { - for (StackTraceElement ste : i.next()) { - sb.append("\n"); + Propagation entry = i.next(); + sb.append("\ncarrier of type: ").append(entry.carrierClassName); + for (StackTraceElement ste : entry.location) { + sb.append("\n "); sb.append(ste); } if (i.hasNext()) { @@ -89,5 +92,15 @@ public final class ContextPropagationDebug { } } + private static class Propagation { + public final String carrierClassName; + public final StackTraceElement[] location; + + public Propagation(String carrierClassName, StackTraceElement[] location) { + this.carrierClassName = carrierClassName; + this.location = location; + } + } + private ContextPropagationDebug() {} } diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy index a04ef5bca7..42aa0d26fd 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy +++ b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpClientInstrumentationTest.groovy @@ -14,6 +14,7 @@ import akka.http.javadsl.model.headers.RawHeader import akka.stream.ActorMaterializer import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.base.HttpClientTest +import io.opentelemetry.instrumentation.test.base.SingleConnection import spock.lang.Shared class AkkaHttpClientInstrumentationTest extends HttpClientTest implements AgentTestTrait { @@ -67,8 +68,10 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest impl } @Override - boolean testCausality() { - false + SingleConnection createSingleConnection(String host, int port) { + // singleConnection test would require instrumentation to support requests made through pools + // (newHostConnectionPool, superPool, etc), which is currently not supported. + return null } def "singleRequest exception trace"() { diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy index 77dac38131..044c43c906 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy +++ b/instrumentation/akka-http-10.0/javaagent/src/test/groovy/AkkaHttpServerInstrumentationTest.groovy @@ -23,6 +23,11 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest String expectedServerSpanName(ServerEndpoint endpoint) { return "akka.request" } + + @Override + boolean testConcurrency() { + return true + } } class AkkaHttpServerInstrumentationTestSync extends AkkaHttpServerInstrumentationTest { diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala index 9b7e695142..bdebd6b470 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala +++ b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestAsyncWebServer.scala @@ -31,7 +31,13 @@ object AkkaHttpTestAsyncWebServer { def doCall(): HttpResponse = { val resp = HttpResponse(status = endpoint.getStatus) //.withHeaders(headers.Type)resp.contentType = "text/plain" endpoint match { - case SUCCESS => resp.withEntity(endpoint.getBody) + case SUCCESS => resp.withEntity(endpoint.getBody) + case INDEXED_CHILD => + INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider { + override def getParameter(name: String): String = + uri.query().get(name).orNull + }) + resp.withEntity("") case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) case REDIRECT => resp.withHeaders(headers.Location(endpoint.getBody)) diff --git a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala index 010a820d50..bae57cd00a 100644 --- a/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala +++ b/instrumentation/akka-http-10.0/javaagent/src/test/scala/AkkaHttpTestSyncWebServer.scala @@ -29,7 +29,13 @@ object AkkaHttpTestSyncWebServer { def doCall(): HttpResponse = { val resp = HttpResponse(status = endpoint.getStatus) endpoint match { - case SUCCESS => resp.withEntity(endpoint.getBody) + case SUCCESS => resp.withEntity(endpoint.getBody) + case INDEXED_CHILD => + INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider { + override def getParameter(name: String): String = + uri.query().get(name).orNull + }) + resp.withEntity("") case QUERY_PARAM => resp.withEntity(uri.queryString().orNull) case REDIRECT => resp.withHeaders(headers.Location(endpoint.getBody)) diff --git a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java index b7ea1014af..3c7db80798 100644 --- a/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java +++ b/javaagent-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/concurrent/ExecutorInstrumentationUtils.java @@ -52,6 +52,13 @@ public class ExecutorInstrumentationUtils { return false; } + // This is a Mailbox created by akka.dispatch.Dispatcher#createMailbox. We must not add + // a context to it as context should only be carried by individual envelopes in the queue + // of this mailbox. + if (taskClass.getName().equals("akka.dispatch.Dispatcher$$anon$1")) { + return false; + } + Class enclosingClass = taskClass.getEnclosingClass(); if (enclosingClass != null) { // Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to @@ -144,7 +151,8 @@ public class ExecutorInstrumentationUtils { public static State setupState(ContextStore contextStore, T task, Context context) { State state = contextStore.putIfAbsent(task, State.FACTORY); if (ContextPropagationDebug.isThreadPropagationDebuggerEnabled()) { - context = ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace()); + context = + ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace(), task); } state.setParentContext(context); return state;