From d3856ad1d7ca901bea9040f9d4cfcaa2680fbfd9 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 28 Jun 2021 11:42:38 +0300 Subject: [PATCH] Context propagation for ratpack Execution.fork() (#3416) --- .../DefaultExecStarterInstrumentation.java | 43 ++++++++++++++++++ .../ratpack/RatpackInstrumentationModule.java | 1 + .../server/RatpackForkedHttpServerTest.groovy | 44 +++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/DefaultExecStarterInstrumentation.java diff --git a/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/DefaultExecStarterInstrumentation.java b/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/DefaultExecStarterInstrumentation.java new file mode 100644 index 0000000000..00df9f1536 --- /dev/null +++ b/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/DefaultExecStarterInstrumentation.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.ratpack; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import ratpack.func.Action; + +public class DefaultExecStarterInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return nameStartsWith("ratpack.exec.internal.DefaultExecController$") + .and(implementsInterface(named("ratpack.exec.ExecStarter"))); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("onComplete") + .or(named("onError")) + .and(takesArgument(0, named("ratpack.func.Action"))), + DefaultExecStarterInstrumentation.class.getName() + "$WrapActionAdvice"); + } + + public static class WrapActionAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void wrapAction(@Advice.Argument(value = 0, readOnly = false) Action action) { + action = ActionWrapper.wrapIfNeeded(action); + } + } +} diff --git a/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java b/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java index 68f6a0ffba..506fb6d438 100644 --- a/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java +++ b/instrumentation/ratpack-1.4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/ratpack/RatpackInstrumentationModule.java @@ -23,6 +23,7 @@ public class RatpackInstrumentationModule extends InstrumentationModule { return asList( new ContinuationInstrumentation(), new DefaultExecutionInstrumentation(), + new DefaultExecStarterInstrumentation(), new ServerErrorHandlerInstrumentation(), new ServerRegistryInstrumentation()); } diff --git a/instrumentation/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy b/instrumentation/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy index 2c04b684df..4eb688cf8c 100644 --- a/instrumentation/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy +++ b/instrumentation/ratpack-1.4/javaagent/src/test/groovy/server/RatpackForkedHttpServerTest.groovy @@ -12,9 +12,17 @@ import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEn import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicServerSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpResponse +import io.opentelemetry.testing.internal.armeria.common.HttpMethod import ratpack.error.ServerErrorHandler +import ratpack.exec.Execution import ratpack.exec.Promise +import ratpack.exec.Result +import ratpack.exec.util.ParallelBatch import ratpack.server.RatpackServer class RatpackForkedHttpServerTest extends RatpackHttpServerTest { @@ -109,6 +117,22 @@ class RatpackForkedHttpServerTest extends RatpackHttpServerTest { } } } + it.prefix("fork_and_yieldAll") { + it.all {context -> + def promise = Promise.async { upstream -> + Execution.fork().start({ + upstream.accept(Result.success(SUCCESS)) + }) + } + ParallelBatch.of(promise).yieldAll().flatMap { list -> + Promise.sync { list.get(0).value } + } then { endpoint -> + controller(endpoint) { + context.response.status(endpoint.status).send(endpoint.body) + } + } + } + } } } @@ -116,4 +140,24 @@ class RatpackForkedHttpServerTest extends RatpackHttpServerTest { assert ratpack.bindHost == 'localhost' return ratpack } + + def "test fork and yieldAll"() { + setup: + def url = address.resolve("fork_and_yieldAll").toString() + url = url.replace("http://", "h1c://") + def request = AggregatedHttpRequest.of(HttpMethod.GET, url) + AggregatedHttpResponse response = client.execute(request).aggregate().join() + + expect: + response.status().code() == SUCCESS.status + response.contentUtf8() == SUCCESS.body + + assertTraces(1) { + trace(0, 3) { + basicServerSpan(it, 0, "/fork_and_yieldAll") + basicSpan(it, 1, "/fork_and_yieldAll", span(0)) + basicSpan(it, 2, "controller", span(1)) + } + } + } }