jetty-httpclient-12: send method must pass along implemented response listeners (#12326)
Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
parent
c6c0bb1041
commit
b00354140f
|
@ -9,6 +9,7 @@ import io.opentelemetry.context.Context;
|
|||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.jetty.httpclient.v12_0.internal.JettyClientTracingListener;
|
||||
import io.opentelemetry.instrumentation.jetty.httpclient.v12_0.internal.JettyClientWrapUtil;
|
||||
import java.net.URI;
|
||||
import java.nio.ByteBuffer;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
|
@ -34,14 +35,9 @@ class TracingHttpRequest extends HttpRequest {
|
|||
@Override
|
||||
public void send(Response.CompleteListener listener) {
|
||||
parentContext = Context.current();
|
||||
// start span and attach listeners.
|
||||
// start span and attach listeners - must handle all listeners, not just CompleteListener.
|
||||
JettyClientTracingListener.handleRequest(parentContext, this, instrumenter);
|
||||
super.send(
|
||||
result -> {
|
||||
try (Scope scope = openScope()) {
|
||||
listener.onComplete(result);
|
||||
}
|
||||
});
|
||||
super.send(JettyClientWrapUtil.wrapTheListener(listener, parentContext));
|
||||
}
|
||||
|
||||
private Scope openScope() {
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.jetty.httpclient.v12_0.internal;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.eclipse.jetty.client.Response;
|
||||
|
||||
/**
|
||||
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
|
||||
* any time.
|
||||
*/
|
||||
public final class JettyClientWrapUtil {
|
||||
private static final Class<?>[] LISTENER_INTERFACES = {
|
||||
Response.CompleteListener.class,
|
||||
Response.FailureListener.class,
|
||||
Response.SuccessListener.class,
|
||||
Response.AsyncContentListener.class,
|
||||
Response.ContentSourceListener.class,
|
||||
Response.ContentListener.class,
|
||||
Response.HeadersListener.class,
|
||||
Response.HeaderListener.class,
|
||||
Response.BeginListener.class
|
||||
};
|
||||
|
||||
private JettyClientWrapUtil() {}
|
||||
|
||||
/**
|
||||
* Utility to wrap the response listeners only, this includes the important CompleteListener.
|
||||
*
|
||||
* @param context top level context that is above the Jetty client span context
|
||||
* @param listener listener passed to Jetty client send() method
|
||||
* @return wrapped listener
|
||||
*/
|
||||
public static Response.CompleteListener wrapTheListener(
|
||||
Response.CompleteListener listener, Context context) {
|
||||
if (listener == null) {
|
||||
return listener;
|
||||
}
|
||||
|
||||
Class<?> listenerClass = listener.getClass();
|
||||
List<Class<?>> interfaces = new ArrayList<>();
|
||||
for (Class<?> type : LISTENER_INTERFACES) {
|
||||
if (type.isInstance(listener)) {
|
||||
interfaces.add(type);
|
||||
}
|
||||
}
|
||||
if (interfaces.isEmpty()) {
|
||||
return listener;
|
||||
}
|
||||
|
||||
return (Response.CompleteListener)
|
||||
Proxy.newProxyInstance(
|
||||
listenerClass.getClassLoader(),
|
||||
interfaces.toArray(new Class<?>[0]),
|
||||
(proxy, method, args) -> {
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
return method.invoke(listener, args);
|
||||
} catch (InvocationTargetException exception) {
|
||||
throw exception.getCause();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
|
@ -5,6 +5,9 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.jetty.httpclient.v12_0;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
|
||||
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
|
||||
|
@ -15,13 +18,16 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.FutureResponseListener;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.Request;
|
||||
import org.eclipse.jetty.client.Response;
|
||||
import org.eclipse.jetty.client.Result;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public abstract class AbstractJettyClient12Test extends AbstractHttpClientTest<Request> {
|
||||
|
||||
|
@ -108,6 +114,54 @@ public abstract class AbstractJettyClient12Test extends AbstractHttpClientTest<R
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void callbacksCalled() throws InterruptedException, ExecutionException {
|
||||
URI uri = resolveAddress("/success");
|
||||
Request request = client.newRequest(uri).method("GET");
|
||||
FutureResponseListener responseListener =
|
||||
new FutureResponseListener(request) {
|
||||
@Override
|
||||
public void onHeaders(Response response) {
|
||||
testing.runWithSpan("onHeaders", () -> super.onHeaders(response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSuccess(Response response) {
|
||||
testing.runWithSpan("onSuccess", () -> super.onSuccess(response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Result result) {
|
||||
testing.runWithSpan("onComplete", () -> super.onComplete(result));
|
||||
}
|
||||
};
|
||||
testing.runWithSpan("parent", () -> request.send(responseListener));
|
||||
Response response = responseListener.get();
|
||||
|
||||
assertThat(response.getStatus()).isEqualTo(200);
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
|
||||
span -> span.hasName("GET").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0)),
|
||||
span ->
|
||||
span.hasName("test-http-server")
|
||||
.hasKind(SpanKind.SERVER)
|
||||
.hasParent(trace.getSpan(1)),
|
||||
span ->
|
||||
span.hasName("onHeaders")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0)),
|
||||
span ->
|
||||
span.hasName("onSuccess")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0)),
|
||||
span ->
|
||||
span.hasName("onComplete")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0))));
|
||||
}
|
||||
|
||||
private static class JettyClientListener
|
||||
implements Request.FailureListener, Response.FailureListener {
|
||||
volatile Throwable failure;
|
||||
|
|
Loading…
Reference in New Issue