Fix Ratpack server context propagation and enable its concurrency test (#2910)
This commit is contained in:
parent
5670024178
commit
a568daaf0a
|
@ -74,13 +74,12 @@ public class JavaExecutorInstrumentation extends AbstractExecutorInstrumentation
|
|||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
|
||||
task = newTask;
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
|
||||
task = RunnableWrapper.wrapIfNeeded(task);
|
||||
ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(
|
||||
contextStore, newTask, Java8BytecodeBridge.currentContext());
|
||||
contextStore, task, Java8BytecodeBridge.currentContext());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -118,13 +117,12 @@ public class JavaExecutorInstrumentation extends AbstractExecutorInstrumentation
|
|||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
|
||||
Runnable newTask = RunnableWrapper.wrapIfNeeded(task);
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
|
||||
task = newTask;
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
|
||||
task = RunnableWrapper.wrapIfNeeded(task);
|
||||
ContextStore<Runnable, State> contextStore =
|
||||
InstrumentationContext.get(Runnable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(
|
||||
contextStore, newTask, Java8BytecodeBridge.currentContext());
|
||||
contextStore, task, Java8BytecodeBridge.currentContext());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -148,13 +146,12 @@ public class JavaExecutorInstrumentation extends AbstractExecutorInstrumentation
|
|||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static State enterJobSubmit(
|
||||
@Advice.Argument(value = 0, readOnly = false) Callable task) {
|
||||
Callable newTask = CallableWrapper.wrapIfNeeded(task);
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(newTask)) {
|
||||
task = newTask;
|
||||
if (ExecutorInstrumentationUtils.shouldAttachStateToTask(task)) {
|
||||
task = CallableWrapper.wrapIfNeeded(task);
|
||||
ContextStore<Callable, State> contextStore =
|
||||
InstrumentationContext.get(Callable.class, State.class);
|
||||
return ExecutorInstrumentationUtils.setupState(
|
||||
contextStore, newTask, Java8BytecodeBridge.currentContext());
|
||||
contextStore, task, Java8BytecodeBridge.currentContext());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import io.netty.util.Attribute;
|
|||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
import ratpack.handling.Context;
|
||||
import ratpack.handling.Handler;
|
||||
|
||||
|
@ -23,9 +24,14 @@ public final class TracingHandler implements Handler {
|
|||
ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_SPAN);
|
||||
io.opentelemetry.context.Context serverSpanContext = spanAttribute.get();
|
||||
|
||||
// Relying on executor instrumentation to assume the netty span is in context as the parent.
|
||||
// Must use context from channel, as executor instrumentation is not accurate - Ratpack
|
||||
// internally queues events and then drains them in batches, causing executor instrumentation to
|
||||
// attach the same context to a batch of events from different requests.
|
||||
io.opentelemetry.context.Context parentContext =
|
||||
serverSpanContext != null ? serverSpanContext : Java8BytecodeBridge.currentContext();
|
||||
|
||||
io.opentelemetry.context.Context ratpackContext =
|
||||
tracer().startSpan("ratpack.handler", SpanKind.INTERNAL);
|
||||
tracer().startSpan(parentContext, "ratpack.handler", SpanKind.INTERNAL);
|
||||
ctx.getExecution().add(ratpackContext);
|
||||
|
||||
ctx.getResponse()
|
||||
|
|
|
@ -7,11 +7,13 @@ package server
|
|||
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import ratpack.exec.Promise
|
||||
import ratpack.groovy.test.embed.GroovyEmbeddedApp
|
||||
import ratpack.test.embed.EmbeddedApp
|
||||
|
@ -40,6 +42,18 @@ class RatpackAsyncHttpServerTest extends RatpackHttpServerTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
prefix(INDEXED_CHILD.rawPath()) {
|
||||
all {
|
||||
Promise.sync {
|
||||
INDEXED_CHILD
|
||||
} then {
|
||||
controller(INDEXED_CHILD) {
|
||||
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
|
||||
context.response.status(INDEXED_CHILD.status).send()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prefix(QUERY_PARAM.rawPath()) {
|
||||
all {
|
||||
Promise.sync {
|
||||
|
|
|
@ -7,11 +7,13 @@ package server
|
|||
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import ratpack.exec.Promise
|
||||
import ratpack.groovy.test.embed.GroovyEmbeddedApp
|
||||
import ratpack.test.embed.EmbeddedApp
|
||||
|
@ -40,6 +42,18 @@ class RatpackForkedHttpServerTest extends RatpackHttpServerTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
prefix(INDEXED_CHILD.rawPath()) {
|
||||
all {
|
||||
Promise.sync {
|
||||
INDEXED_CHILD
|
||||
}.fork().then {
|
||||
controller(INDEXED_CHILD) {
|
||||
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
|
||||
context.response.status(INDEXED_CHILD.status).send()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prefix(QUERY_PARAM.rawPath()) {
|
||||
all {
|
||||
Promise.sync {
|
||||
|
|
|
@ -8,11 +8,13 @@ package server
|
|||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.ERROR
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.EXCEPTION
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.INDEXED_CHILD
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.PATH_PARAM
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.QUERY_PARAM
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.REDIRECT
|
||||
import static io.opentelemetry.instrumentation.test.base.HttpServerTest.ServerEndpoint.SUCCESS
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import io.opentelemetry.api.trace.StatusCode
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
|
||||
|
@ -43,6 +45,14 @@ class RatpackHttpServerTest extends HttpServerTest<EmbeddedApp> implements Agent
|
|||
}
|
||||
}
|
||||
}
|
||||
prefix(INDEXED_CHILD.rawPath()) {
|
||||
all {
|
||||
controller(INDEXED_CHILD) {
|
||||
Span.current().setAttribute("test.request.id", request.queryParams.get("id") as long)
|
||||
context.response.status(INDEXED_CHILD.status).send()
|
||||
}
|
||||
}
|
||||
}
|
||||
prefix(QUERY_PARAM.rawPath()) {
|
||||
all {
|
||||
controller(QUERY_PARAM) {
|
||||
|
@ -108,6 +118,11 @@ class RatpackHttpServerTest extends HttpServerTest<EmbeddedApp> implements Agent
|
|||
true
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean testConcurrency() {
|
||||
true
|
||||
}
|
||||
|
||||
@Override
|
||||
void handlerSpan(TraceAssert trace, int index, Object parent, String method = "GET", ServerEndpoint endpoint = SUCCESS) {
|
||||
trace.span(index) {
|
||||
|
|
|
@ -92,6 +92,15 @@ public class ExecutorInstrumentationUtils {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (taskClass.getName().startsWith("ratpack.exec.internal.")) {
|
||||
// Context is passed through Netty channels in Ratpack as executor instrumentation is
|
||||
// not suitable. As the context that would be propagated via executor would be
|
||||
// incorrect, skip the propagation. Not checking for concrete class names as this covers
|
||||
// anonymous classes from ratpack.exec.internal.DefaultExecution and
|
||||
// ratpack.exec.internal.DefaultExecController.
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue