Test failing async http request (#2812)

This commit is contained in:
Lauri Tulmin 2021-04-20 00:11:27 +03:00 committed by GitHub
parent bd829a6494
commit 9464134ffd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 331 additions and 171 deletions

View File

@ -14,7 +14,6 @@ import akka.http.javadsl.model.headers.RawHeader
import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import spock.lang.Shared
class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> implements AgentTestTrait {
@ -42,9 +41,9 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> impl
}
@Override
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
Http.get(system).singleRequest(request, materializer).thenAccept {
callback.accept(it.status().intValue())
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
Http.get(system).singleRequest(request, materializer).whenComplete {response, throwable ->
requestResult.complete({ response.status().intValue() }, throwable)
}
}

View File

@ -6,7 +6,6 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CancellationException
import java.util.function.Consumer
import org.apache.http.HttpResponse
import org.apache.http.client.config.RequestConfig
import org.apache.http.concurrent.FutureCallback
@ -45,16 +44,16 @@ class ApacheHttpAsyncClientTest extends HttpClientTest<HttpUriRequest> implement
}
@Override
void sendRequestWithCallback(HttpUriRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(HttpUriRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
client.execute(request, new FutureCallback<HttpResponse>() {
@Override
void completed(HttpResponse httpResponse) {
callback.accept(httpResponse.statusLine.statusCode)
requestResult.complete(httpResponse.statusLine.statusCode)
}
@Override
void failed(Exception e) {
throw e
requestResult.complete(e)
}
@Override

View File

@ -48,10 +48,14 @@ abstract class ApacheHttpClientTest<T extends HttpRequest> extends HttpClientTes
}
// compilation fails with @Override annotation on this method (groovy quirk?)
void sendRequestWithCallback(T request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
executeRequestWithCallback(request, uri) {
it.entity?.content?.close() // Make sure the connection is closed.
callback.accept(it.statusLine.statusCode)
void sendRequestWithCallback(T request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
try {
executeRequestWithCallback(request, uri) {
it.entity?.content?.close() // Make sure the connection is closed.
requestResult.complete(it.statusLine.statusCode)
}
} catch (Throwable throwable) {
requestResult.complete(throwable)
}
}

View File

@ -53,10 +53,14 @@ abstract class ApacheHttpClientTest<T extends HttpRequest> extends HttpClientTes
}
// compilation fails with @Override annotation on this method (groovy quirk?)
void sendRequestWithCallback(T request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
executeRequestWithCallback(request, uri) {
it.close() // Make sure the connection is closed.
callback.accept(it.code)
void sendRequestWithCallback(T request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
try {
executeRequestWithCallback(request, uri) {
it.close() // Make sure the connection is closed.
requestResult.complete(it.code)
}
} catch (Throwable throwable) {
requestResult.complete(throwable)
}
}

View File

@ -14,7 +14,6 @@ import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CompletionException
import java.util.function.Consumer
import spock.lang.Shared
abstract class AbstractArmeriaHttpClientTest extends HttpClientTest<HttpRequest> {
@ -46,9 +45,9 @@ abstract class AbstractArmeriaHttpClientTest extends HttpClientTest<HttpRequest>
}
@Override
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
client.execute(request).aggregate().thenAccept {
callback.accept(it.status().code())
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
client.execute(request).aggregate().whenComplete {response, throwable ->
requestResult.complete({ response.status().code() }, throwable)
}
}
@ -70,6 +69,12 @@ abstract class AbstractArmeriaHttpClientTest extends HttpClientTest<HttpRequest>
false
}
// TODO: context not propagated to callback
@Override
boolean testErrorWithCallback() {
return false
}
@Override
List<AttributeKey<?>> extraAttributes() {
[

View File

@ -11,7 +11,6 @@ import com.ning.http.client.Response
import com.ning.http.client.uri.Uri
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import spock.lang.AutoCleanup
import spock.lang.Shared
@ -37,14 +36,19 @@ class AsyncHttpClientTest extends HttpClientTest<Request> implements AgentTestTr
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(request, new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
requestResult.complete(response.statusCode)
return null
}
@Override
void onThrowable(Throwable throwable) {
requestResult.complete(throwable)
}
})
}

View File

@ -5,7 +5,6 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.Dsl
import org.asynchttpclient.Request
@ -37,13 +36,18 @@ class AsyncHttpClientTest extends HttpClientTest<Request> implements AgentTestTr
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
client.executeRequest(request, new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
requestResult.complete(response.statusCode)
return null
}
@Override
void onThrowable(Throwable throwable) {
requestResult.complete(throwable)
}
})
}

View File

@ -5,7 +5,6 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
@ -45,12 +44,16 @@ class SpringRestTemplateTest extends HttpClientTest<HttpEntity<String>> implemen
}
@Override
void sendRequestWithCallback(HttpEntity<String> request, String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
restTemplate.execute(uri, HttpMethod.valueOf(method), { req ->
req.getHeaders().putAll(request.getHeaders())
}, { response ->
callback.accept(response.statusCode.value())
})
void sendRequestWithCallback(HttpEntity<String> request, String method, URI uri, Map<String, String> headers = [:], RequestResult requestResult) {
try {
restTemplate.execute(uri, HttpMethod.valueOf(method), { req ->
req.getHeaders().putAll(request.getHeaders())
}, { response ->
requestResult.complete(response.statusCode.value())
})
} catch (ResourceAccessException exception) {
requestResult.complete(exception.getCause())
}
}
@Override

View File

@ -13,7 +13,6 @@ import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.function.Consumer
import spock.lang.Requires
import spock.lang.Shared
@ -40,10 +39,10 @@ class JdkHttpClientTest extends HttpClientTest<HttpRequest> implements AgentTest
}
@Override
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenAccept {
callback.accept(it.statusCode())
.whenComplete {response, throwable ->
requestResult.complete({ response.statusCode() }, throwable?.getCause())
}
}
@ -65,6 +64,12 @@ class JdkHttpClientTest extends HttpClientTest<HttpRequest> implements AgentTest
false
}
// TODO: context not propagated to callback
@Override
boolean testErrorWithCallback() {
return false
}
@Requires({ !System.getProperty("java.vm.name").contains("IBM J9 VM") })
def "test https request"() {
given:

View File

@ -11,6 +11,7 @@ import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import java.net.URI;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientResponseContext;
@ -56,6 +57,14 @@ public class JaxRsClientTracer
return SETTER;
}
@Override
protected Throwable unwrapThrowable(Throwable throwable) {
if (throwable instanceof ProcessingException) {
throwable = throwable.getCause();
}
return super.unwrapThrowable(throwable);
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.jaxrs-client-2.0";

View File

@ -10,7 +10,7 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import javax.ws.rs.ProcessingException
import javax.ws.rs.client.ClientBuilder
import javax.ws.rs.client.Entity
import javax.ws.rs.client.Invocation
@ -33,25 +33,32 @@ abstract class JaxRsClientTest extends HttpClientTest<Invocation.Builder> implem
@Override
int sendRequest(Invocation.Builder request, String method, URI uri, Map<String, String> headers) {
def body = BODY_METHODS.contains(method) ? Entity.text("") : null
def response = request.build(method, body).invoke()
response.close()
return response.status
try {
def body = BODY_METHODS.contains(method) ? Entity.text("") : null
def response = request.build(method, body).invoke()
response.close()
return response.status
} catch (ProcessingException exception) {
throw exception.getCause()
}
}
@Override
void sendRequestWithCallback(Invocation.Builder request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Invocation.Builder request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
def body = BODY_METHODS.contains(method) ? Entity.text("") : null
request.async().method(method, (Entity) body, new InvocationCallback<Response>() {
@Override
void completed(Response response) {
callback.accept(response.status)
requestResult.complete(response.status)
}
@Override
void failed(Throwable throwable) {
throw throwable
if (throwable instanceof ProcessingException) {
throwable = throwable.getCause()
}
requestResult.complete(throwable)
}
})
}
@ -117,18 +124,8 @@ class JerseyClientTest extends JaxRsClientTest {
}
@Override
boolean testCircularRedirects() {
false
}
// TODO jaxrs client instrumentation captures the (default) user-agent on the second (reused)
// request, which then fails the test verification
// ideally the instrumentation would capture the default user-agent on the first request,
// and the test http server would verify that the user-agent was sent and matches what was
// captured from the client instrumentation
@Override
boolean testReusedRequest() {
false
int maxRedirects() {
20
}
}

View File

@ -11,6 +11,7 @@ import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import java.net.URI;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.Response;
import org.jboss.resteasy.client.jaxrs.internal.ClientInvocation;
@ -56,6 +57,14 @@ public class ResteasyClientTracer
return SETTER;
}
@Override
protected Throwable unwrapThrowable(Throwable throwable) {
if (throwable instanceof ProcessingException) {
throwable = throwable.getCause();
}
return super.unwrapThrowable(throwable);
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.jaxrs-client-2.0-resteasy-2.0";

View File

@ -20,7 +20,6 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.AutoCleanup
import spock.lang.Shared
@ -53,14 +52,19 @@ class Netty38ClientTest extends HttpClientTest<Request> implements AgentTestTrai
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(request, new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
requestResult.complete(response.statusCode)
return null
}
@Override
void onThrowable(Throwable throwable) {
requestResult.complete(throwable)
}
})
}

View File

@ -19,7 +19,6 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.AutoCleanup
import spock.lang.Shared
@ -50,14 +49,19 @@ class Netty38ClientTest extends HttpClientTest<Request> implements AgentTestTrai
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(request, new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
requestResult.complete(response.statusCode)
return null
}
@Override
void onThrowable(Throwable throwable) {
requestResult.complete(throwable)
}
})
}

View File

@ -8,7 +8,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/*
Bridges from async Netty world to the sync world of our http client tests.
@ -16,11 +15,9 @@ When request initiated by a test gets a response, calls a given callback and com
future with response's status code.
*/
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final Consumer<Integer> callback;
private final CompletableFuture<Integer> responseCode;
public ClientHandler(Consumer<Integer> callback, CompletableFuture<Integer> responseCode) {
this.callback = callback;
public ClientHandler(CompletableFuture<Integer> responseCode) {
this.responseCode = responseCode;
}
@ -29,10 +26,6 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
if (msg instanceof HttpResponse) {
ctx.pipeline().remove(this);
if (callback != null) {
callback.accept(((HttpResponse) msg).getStatus().code());
}
HttpResponse response = (HttpResponse) msg;
responseCode.complete(response.getStatus().code());
}
@ -40,7 +33,7 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
responseCode.completeExceptionally(cause);
ctx.close();
}
}

View File

@ -28,7 +28,6 @@ import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.Shared
class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implements AgentTestTrait {
@ -63,15 +62,19 @@ class Netty40ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
int sendRequest(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers) {
def channel = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(null, result))
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS)
}
@Override
void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
ch.pipeline().addLast(new ClientHandler(callback, CompletableFuture.completedFuture(0)))
def result = new CompletableFuture<Integer>()
result.whenComplete { status, throwable ->
requestResult.complete({ status }, throwable)
}
ch.pipeline().addLast(new ClientHandler(result))
ch.writeAndFlush(request)
}

View File

@ -8,7 +8,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/*
Bridges from async Netty world to the sync world of our http client tests.
@ -16,11 +15,9 @@ When request initiated by a test gets a response, calls a given callback and com
future with response's status code.
*/
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final Consumer<Integer> callback;
private final CompletableFuture<Integer> responseCode;
public ClientHandler(Consumer<Integer> callback, CompletableFuture<Integer> responseCode) {
this.callback = callback;
public ClientHandler(CompletableFuture<Integer> responseCode) {
this.responseCode = responseCode;
}
@ -29,10 +26,6 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
if (msg instanceof HttpResponse) {
ctx.pipeline().remove(this);
if (callback != null) {
callback.accept(((HttpResponse) msg).status().code());
}
HttpResponse response = (HttpResponse) msg;
responseCode.complete(response.getStatus().code());
}
@ -40,7 +33,7 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
responseCode.completeExceptionally(cause);
ctx.close();
}
}

View File

@ -35,7 +35,6 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.function.Consumer
import spock.lang.Shared
class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implements AgentTestTrait {
@ -69,15 +68,19 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
int sendRequest(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers) {
def channel = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>()
channel.pipeline().addLast(new ClientHandler(null, result))
channel.pipeline().addLast(new ClientHandler(result))
channel.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS)
}
@Override
void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(DefaultFullHttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
ch.pipeline().addLast(new ClientHandler(callback, CompletableFuture.completedFuture(0)))
def result = new CompletableFuture<Integer>()
result.whenComplete { status, throwable ->
requestResult.complete({ status }, throwable)
}
ch.pipeline().addLast(new ClientHandler(result))
ch.writeAndFlush(request)
}

View File

@ -73,7 +73,7 @@ public class SingleNettyConnection implements SingleConnection {
throws ExecutionException, InterruptedException, TimeoutException {
CompletableFuture<Integer> result = new CompletableFuture<>();
channel.pipeline().addLast(new ClientHandler(null, result));
channel.pipeline().addLast(new ClientHandler(result));
String url;
try {

View File

@ -14,7 +14,6 @@ import com.squareup.okhttp.internal.http.HttpMethod
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.Shared
class OkHttp2Test extends HttpClientTest<Request> implements AgentTestTrait {
@ -41,16 +40,16 @@ class OkHttp2Test extends HttpClientTest<Request> implements AgentTestTrait {
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
client.newCall(request).enqueue(new Callback() {
@Override
void onFailure(Request req, IOException e) {
throw e
requestResult.complete(e)
}
@Override
void onResponse(Response response) throws IOException {
callback.accept(response.code())
requestResult.complete(response.code())
}
})
}

View File

@ -7,7 +7,6 @@ package io.opentelemetry.instrumentation.okhttp.v3_0
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers
@ -45,16 +44,16 @@ abstract class AbstractOkHttp3Test extends HttpClientTest<Request> {
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
client.newCall(request).enqueue(new Callback() {
@Override
void onFailure(Call call, IOException e) {
throw e
requestResult.complete(e)
}
@Override
void onResponse(Call call, Response response) throws IOException {
callback.accept(response.code())
requestResult.complete(response.code())
}
})
}

View File

@ -3,9 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import play.libs.ws.StandaloneWSClient
import play.libs.ws.StandaloneWSRequest
import play.libs.ws.StandaloneWSResponse
@ -36,9 +36,9 @@ class PlayJavaWsClientTestBase extends PlayWsClientTestBaseBase<StandaloneWSRequ
}
@Override
void sendRequestWithCallback(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
request.execute().thenAccept {
callback.accept(it.status)
void sendRequestWithCallback(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientTest.RequestResult requestResult) {
request.execute().whenComplete { response, throwable ->
requestResult.complete({ response.status }, throwable)
}
}
@ -69,9 +69,9 @@ class PlayJavaStreamedWsClientTestBase extends PlayWsClientTestBaseBase<Standalo
}
@Override
void sendRequestWithCallback(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
internalSendRequest(request).thenAccept {
callback.accept(it.status)
void sendRequestWithCallback(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientTest.RequestResult requestResult) {
internalSendRequest(request).whenComplete { response, throwable ->
requestResult.complete({ response.status }, throwable?.getCause())
}
}
@ -116,11 +116,15 @@ class PlayScalaWsClientTestBase extends PlayWsClientTestBaseBase<play.api.libs.w
}
@Override
void sendRequestWithCallback(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientTest.RequestResult requestResult) {
request.execute().onComplete(new Function1<Try<play.api.libs.ws.StandaloneWSResponse>, Void>() {
@Override
Void apply(Try<play.api.libs.ws.StandaloneWSResponse> response) {
callback.accept(response.get().status())
if (response.isSuccess()) {
requestResult.complete(response.get().status())
} else {
requestResult.complete(response.failed().get())
}
return null
}
}, ExecutionContext.global())
@ -153,11 +157,15 @@ class PlayScalaStreamedWsClientTestBase extends PlayWsClientTestBaseBase<play.ap
}
@Override
void sendRequestWithCallback(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientTest.RequestResult requestResult) {
internalSendRequest(request).onComplete(new Function1<Try<play.api.libs.ws.StandaloneWSResponse>, Void>() {
@Override
Void apply(Try<play.api.libs.ws.StandaloneWSResponse> response) {
callback.accept(response.get().status())
if (response.isSuccess()) {
requestResult.complete(response.get().status())
} else {
requestResult.complete(response.failed().get())
}
return null
}
}, ExecutionContext.global())

View File

@ -8,7 +8,6 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletionStage
import java.util.function.Consumer
import play.libs.ws.WS
import play.libs.ws.WSRequest
import play.libs.ws.WSResponse
@ -39,9 +38,9 @@ class PlayWsClientTest extends HttpClientTest<WSRequest> implements AgentTestTra
}
@Override
void sendRequestWithCallback(WSRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
internalSendRequest(request, method).thenAccept {
callback.accept(it.status)
void sendRequestWithCallback(WSRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
internalSendRequest(request, method).whenComplete {response, throwable ->
requestResult.complete({ response.status }, throwable)
}
}

View File

@ -8,7 +8,6 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.time.Duration
import java.util.function.Consumer
import ratpack.exec.Operation
import ratpack.exec.Promise
import ratpack.http.client.HttpClient
@ -41,10 +40,10 @@ class RatpackHttpClientTest extends HttpClientTest<Void> implements AgentTestTra
}
@Override
void sendRequestWithCallback(Void request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(Void request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
exec.execute(Operation.of {
internalSendRequest(method, uri, headers).result {
callback.accept(it.value)
requestResult.complete(it.value)
}
})
}

View File

@ -5,7 +5,6 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import reactor.netty.http.client.HttpClient
abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpClient.ResponseReceiver> implements AgentTestTrait {
@ -46,9 +45,9 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
}
@Override
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
request.response().subscribe {
callback.accept(it.status().code())
requestResult.complete(it.status().code())
}
}

View File

@ -5,7 +5,6 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import reactor.netty.http.client.HttpClient
abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpClient.ResponseReceiver> implements AgentTestTrait {
@ -46,9 +45,9 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
}
@Override
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
request.response().subscribe {
callback.accept(it.status().code())
requestResult.complete(it.status().code())
}
}

View File

@ -6,7 +6,6 @@
import io.opentelemetry.instrumentation.spring.httpclients.RestTemplateInterceptor
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
@ -44,12 +43,16 @@ class RestTemplateInstrumentationTest extends HttpClientTest<HttpEntity<String>>
}
@Override
void sendRequestWithCallback(HttpEntity<String> request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
restTemplate.execute(uri, HttpMethod.valueOf(method), { req ->
headers.forEach(req.getHeaders().&add)
}, { response ->
callback.accept(response.statusCode.value())
})
void sendRequestWithCallback(HttpEntity<String> request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
try {
restTemplate.execute(uri, HttpMethod.valueOf(method), { req ->
headers.forEach(req.getHeaders().&add)
}, { response ->
requestResult.complete(response.statusCode.value())
})
} catch (ResourceAccessException exception) {
requestResult.complete(exception.getCause())
}
}
@Override

View File

@ -7,7 +7,6 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.springframework.http.HttpMethod
import org.springframework.web.reactive.function.client.WebClient
@ -26,10 +25,12 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
}
@Override
void sendRequestWithCallback(WebClient.RequestBodySpec request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
request.exchange().subscribe {
callback.accept(it.statusCode().value())
}
void sendRequestWithCallback(WebClient.RequestBodySpec request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
request.exchange().subscribe({
requestResult.complete(it.statusCode().value())
}, {
requestResult.complete(it)
})
}
@Override

View File

@ -8,6 +8,7 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.vertx.circuitbreaker.CircuitBreakerOptions
import io.vertx.core.AsyncResult
import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.client.WebClientOptions
@ -45,14 +46,17 @@ class VertxRxCircuitBreakerWebClientTest extends HttpClientTest<HttpRequest<?>>
// VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through
// a callback.
CompletableFuture<Integer> future = new CompletableFuture<>()
sendRequestWithCallback(request, method, uri, headers) {
future.complete(it)
sendRequestWithCallback(request) {
if (it.succeeded()) {
future.complete(it.result().statusCode())
} else {
future.completeExceptionally(it.cause())
}
}
return future.get()
}
@Override
void sendRequestWithCallback(HttpRequest<?> request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(HttpRequest<?> request, Consumer<AsyncResult> consumer) {
breaker.executeCommand({ command ->
request.rxSend().doOnSuccess {
command.complete(it)
@ -60,10 +64,21 @@ class VertxRxCircuitBreakerWebClientTest extends HttpClientTest<HttpRequest<?>>
command.fail(it)
}.subscribe()
}, {
callback.accept(it.result().statusCode())
consumer.accept(it)
})
}
@Override
void sendRequestWithCallback(HttpRequest<?> request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
sendRequestWithCallback(request) {
if (it.succeeded()) {
requestResult.complete(it.result().statusCode())
} else {
requestResult.complete(it.cause())
}
}
}
@Override
String userAgent() {
return "Vert.x-WebClient"

View File

@ -40,12 +40,17 @@ class VertxRxWebClientTest extends HttpClientTest<HttpRequest<Buffer>> implement
}
@Override
void sendRequestWithCallback(HttpRequest<Buffer> request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(HttpRequest<Buffer> request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
request.rxSend()
.subscribe(new io.reactivex.functions.Consumer<HttpResponse<?>>() {
@Override
void accept(HttpResponse<?> httpResponse) throws Exception {
callback.accept(httpResponse.statusCode())
requestResult.complete(httpResponse.statusCode())
}
}, new io.reactivex.functions.Consumer<Throwable>() {
@Override
void accept(Throwable throwable) throws Exception {
requestResult.complete(throwable)
}
})
}

View File

@ -14,7 +14,6 @@ import io.vertx.core.http.HttpClientOptions
import io.vertx.core.http.HttpClientRequest
import io.vertx.core.http.HttpMethod
import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import spock.lang.Shared
class VertxHttpClientTest extends HttpClientTest<HttpClientRequest> implements AgentTestTrait {
@ -33,22 +32,30 @@ class VertxHttpClientTest extends HttpClientTest<HttpClientRequest> implements A
return request
}
@Override
int sendRequest(HttpClientRequest request, String method, URI uri, Map<String, String> headers) {
// Vertx doesn't seem to provide any synchronous API so bridge through a callback
CompletableFuture<Integer> sendRequest(HttpClientRequest request) {
CompletableFuture<Integer> future = new CompletableFuture<>()
sendRequestWithCallback(request, method, uri, headers) {
future.complete(it)
request.handler { response ->
future.complete(response.statusCode())
}.exceptionHandler {throwable ->
future.completeExceptionally(throwable)
}
return future.get()
request.end()
return future
}
@Override
void sendRequestWithCallback(HttpClientRequest request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
request.handler { response ->
callback.accept(response.statusCode())
int sendRequest(HttpClientRequest request, String method, URI uri, Map<String, String> headers) {
// Vertx doesn't seem to provide any synchronous API so bridge through a callback
return sendRequest(request).get()
}
@Override
void sendRequestWithCallback(HttpClientRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
sendRequest(request).whenComplete { status, throwable ->
requestResult.complete({ status }, throwable)
}
request.end()
}
@Override

View File

@ -28,12 +28,13 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.function.Consumer
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.function.Supplier
import spock.lang.AutoCleanup
import spock.lang.Requires
import spock.lang.Shared
import spock.lang.Unroll
import spock.util.concurrent.BlockingVariable
@Unroll
abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
@ -102,9 +103,56 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
}
// ideally private, but then groovy closures in this class cannot find them
final void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
final RequestResult doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:],
Runnable callback) {
def request = buildRequest(method, uri, headers)
sendRequestWithCallback(request, method, uri, headers, callback)
def requestResult = new RequestResult(callback)
sendRequestWithCallback(request, method, uri, headers, requestResult)
return requestResult
}
/**
* Helper class for capturing result of asynchronous request and running a callback when result
* is received.
*/
static class RequestResult {
private static final long timeout = 10_000
private final CountDownLatch valueReady = new CountDownLatch(1)
private final Runnable callback
private int status
private Throwable throwable
RequestResult(Runnable callback) {
this.callback = callback
}
void complete(int status) {
complete({ status }, null)
}
void complete(Throwable throwable) {
complete(null, throwable)
}
void complete(Supplier<Integer> status, Throwable throwable) {
if (throwable != null) {
this.throwable = throwable
} else {
this.status = status.get()
}
callback.run()
valueReady.countDown()
}
int get() {
if (!valueReady.await(timeout, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for response in " + timeout + "ms")
}
if (throwable != null) {
throw throwable
}
return status
}
}
/**
@ -155,22 +203,28 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
* the context is propagated correctly to such callbacks.
*
* @Override
* void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
* void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
* // Hypothetical client accepting a callback
* client.executeAsync(request) {
* callback.accept(it.statusCode())
* void success(Response response) {
* requestResult.complete(response.statusCode())
* }
* void failure(Throwable throwable) {
* requestResult.complete(throwable)
* }
* }
*
* // Hypothetical client returning a CompletableFuture
* client.executeAsync(request).thenAccept {
* callback.accept(it.statusCode())
* client.executeAsync(request).whenComplete { response, throwable ->
* requestResult.complete({ response.statusCode() }, throwable)
* }
* }
*
* If the client offers no APIs that accept callbacks, then this method should not be implemented
* and instead, {@link #testCallback} should be implemented to return false.
*/
void sendRequestWithCallback(REQUEST request, String method, URI uri, Map<String, String> headers, Consumer<Integer> callback) {
void sendRequestWithCallback(REQUEST request, String method, URI uri, Map<String, String> headers,
RequestResult requestResult) {
// Must be implemented if testAsync is true
throw new UnsupportedOperationException()
}
@ -283,18 +337,15 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
assumeTrue(testCallback())
assumeTrue(testCallbackWithParent())
def responseCode = new BlockingVariable<Integer>()
when:
runUnderTrace("parent") {
def requestResult = runUnderTrace("parent") {
doRequestWithCallback(method, server.address.resolve("/success"), ["is-test-server": "false"]) {
runUnderTrace("child") {}
responseCode.set(it)
}
}
then:
responseCode.get() == 200
requestResult.get() == 200
// only one trace (client).
assertTraces(1) {
trace(0, 3 + extraClientSpans()) {
@ -312,17 +363,14 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
given:
assumeTrue(testCallback())
def responseCode = new BlockingVariable<Integer>()
when:
doRequestWithCallback(method, server.address.resolve("/success"), ["is-test-server": "false"]) {
def requestResult = doRequestWithCallback(method, server.address.resolve("/success"), ["is-test-server": "false"]) {
runUnderTrace("callback") {
}
responseCode.set(it)
}
then:
responseCode.get() == 200
requestResult.get() == 200
// only one trace (client).
assertTraces(2) {
trace(0, 1 + extraClientSpans()) {
@ -487,6 +535,39 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
method = "GET"
}
def "connection error (unopened port) with callback"() {
given:
assumeTrue(testConnectionFailure())
assumeTrue(testCallback())
assumeTrue(testErrorWithCallback())
def uri = new URI("http://localhost:$UNUSABLE_PORT/")
when:
def requestResult = runUnderTrace("parent") {
doRequestWithCallback(method, uri, [:]) {
runUnderTrace("callback") {
}
}
}
requestResult.get()
then:
def ex = thrown(Exception)
def thrownException = ex instanceof ExecutionException ? ex.cause : ex
and:
assertTraces(1) {
trace(0, 3 + extraClientSpans()) {
basicSpan(it, 0, "parent")
clientSpan(it, 1, span(0), method, uri, null, thrownException)
basicSpan(it, 2, "callback", span(0))
}
}
where:
method = "GET"
}
def "connection error dropped request"() {
given:
assumeTrue(testRemoteConnection())
@ -790,6 +871,10 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
true
}
boolean testErrorWithCallback() {
return true
}
URI removeFragment(URI uri) {
return new URI(uri.scheme, null, uri.host, uri.port, uri.path, uri.query, null)
}