Split HttpClientTest execution methods for sync and callback (#2675)

This commit is contained in:
Anuraag Agrawal 2021-04-07 07:26:01 +09:00 committed by GitHub
parent d0d4168d62
commit d8f6018ba6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 859 additions and 1077 deletions

View File

@ -9,10 +9,13 @@ import akka.actor.ActorSystem
import akka.http.javadsl.Http import akka.http.javadsl.Http
import akka.http.javadsl.model.HttpMethods import akka.http.javadsl.model.HttpMethods
import akka.http.javadsl.model.HttpRequest import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.http.javadsl.model.headers.RawHeader import akka.http.javadsl.model.headers.RawHeader
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletionStage
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class AkkaHttpClientInstrumentationTest extends HttpClientTest implements AgentTestTrait { class AkkaHttpClientInstrumentationTest extends HttpClientTest implements AgentTestTrait {
@ -23,21 +26,32 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest implements AgentT
ActorMaterializer materializer = ActorMaterializer.create(system) ActorMaterializer materializer = ActorMaterializer.create(system)
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
return sendRequest(method, uri, headers).toCompletableFuture().get().status().intValue()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers).thenAccept {
callback.accept(it.status().intValue())
}
}
private CompletionStage<HttpResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
def request = HttpRequest.create(uri.toString()) def request = HttpRequest.create(uri.toString())
.withMethod(HttpMethods.lookup(method).get()) .withMethod(HttpMethods.lookup(method).get())
.addHeaders(headers.collect { RawHeader.create(it.key, it.value) }) .addHeaders(headers.collect { RawHeader.create(it.key, it.value) })
def response = Http.get(system) return Http.get(system)
.singleRequest(request, materializer) .singleRequest(request, materializer)
//.whenComplete { result, error -> }
// FIXME: Callback should be here instead.
// callback?.call() // TODO(anuraaga): Context leak seems to prevent us from running asynchronous tests in a row.
//} // Disable for now.
.toCompletableFuture() // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2639
.get() @Override
callback?.call() boolean testCallback() {
return response.status().intValue() false
} }
@Override @Override

View File

@ -1,77 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletableFuture
import org.apache.http.HttpResponse
import org.apache.http.client.config.RequestConfig
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.HttpAsyncClients
import org.apache.http.message.BasicHeader
import spock.lang.AutoCleanup
import spock.lang.Shared
class ApacheHttpAsyncClientCallbackTest extends HttpClientTest implements AgentTestTrait {
@Shared
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT_MS)
.build()
@AutoCleanup
@Shared
def client = HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build()
def setupSpec() {
client.start()
}
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def request = new HttpUriRequest(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
def responseFuture = new CompletableFuture<>()
client.execute(request, new FutureCallback<HttpResponse>() {
@Override
void completed(HttpResponse result) {
callback?.call()
responseFuture.complete(result.statusLine.statusCode)
}
@Override
void failed(Exception ex) {
responseFuture.completeExceptionally(ex)
}
@Override
void cancelled() {
responseFuture.cancel(true)
}
})
return responseFuture.get()
}
@Override
Integer statusOnRedirectError() {
return 302
}
@Override
boolean testRemoteConnection() {
false // otherwise SocketTimeoutException for https requests
}
@Override
boolean testCausality() {
false
}
}

View File

@ -1,62 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.Future
import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.nio.client.HttpAsyncClients
import org.apache.http.message.BasicHeader
import spock.lang.AutoCleanup
import spock.lang.Shared
class ApacheHttpAsyncClientNullCallbackTest extends HttpClientTest implements AgentTestTrait {
@Shared
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT_MS)
.build()
@AutoCleanup
@Shared
def client = HttpAsyncClients.custom().setDefaultRequestConfig(requestConfig).build()
def setupSpec() {
client.start()
}
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def request = new HttpUriRequest(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
// The point here is to test case when callback is null - fire-and-forget style
// So to make sure request is done we start request, wait for future to finish
// and then call callback if present.
Future future = client.execute(request, null)
future.get()
if (callback != null) {
callback()
}
return future.get().statusLine.statusCode
}
@Override
Integer statusOnRedirectError() {
return 302
}
@Override
boolean testRemoteConnection() {
false // otherwise SocketTimeoutException for https requests
}
@Override
boolean testCausality() {
false
}
}

View File

@ -5,7 +5,8 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CountDownLatch import java.util.concurrent.CancellationException
import java.util.function.Consumer
import org.apache.http.HttpResponse import org.apache.http.HttpResponse
import org.apache.http.client.config.RequestConfig import org.apache.http.client.config.RequestConfig
import org.apache.http.concurrent.FutureCallback import org.apache.http.concurrent.FutureCallback
@ -30,38 +31,36 @@ class ApacheHttpAsyncClientTest extends HttpClientTest implements AgentTestTrait
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def request = new HttpUriRequest(method, uri) return client.execute(buildRequest(method, uri, headers), null).get().statusLine.statusCode
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
def latch = new CountDownLatch(callback == null ? 0 : 1)
def handler = callback == null ? null : new FutureCallback<HttpResponse>() {
@Override
void completed(HttpResponse result) {
callback()
latch.countDown()
} }
@Override @Override
void failed(Exception ex) { void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
latch.countDown() client.execute(buildRequest(method, uri, headers), new FutureCallback<HttpResponse>() {
@Override
void completed(HttpResponse httpResponse) {
callback.accept(httpResponse.statusLine.statusCode)
}
@Override
void failed(Exception e) {
throw e
} }
@Override @Override
void cancelled() { void cancelled() {
latch.countDown() throw new CancellationException()
} }
})
} }
def future = client.execute(request, handler) private static HttpUriRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def response = future.get() def request = new HttpUriRequest(method, uri)
response.entity?.content?.close() // Make sure the connection is closed. headers.entrySet().each {
latch.await() request.addHeader(new BasicHeader(it.key, it.value))
response.statusLine.statusCode }
return request
} }
@Override @Override
@ -76,6 +75,6 @@ class ApacheHttpAsyncClientTest extends HttpClientTest implements AgentTestTrait
@Override @Override
boolean testCausality() { boolean testCausality() {
return false false
} }
} }

View File

@ -30,7 +30,7 @@ class CommonsHttpClientTest extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
HttpMethod httpMethod HttpMethod httpMethod
switch (method) { switch (method) {
@ -63,7 +63,6 @@ class CommonsHttpClientTest extends HttpClientTest implements AgentTestTrait {
try { try {
client.executeMethod(httpMethod) client.executeMethod(httpMethod)
callback?.call()
return httpMethod.getStatusCode() return httpMethod.getStatusCode()
} finally { } finally {
httpMethod.releaseConnection() httpMethod.releaseConnection()
@ -75,4 +74,9 @@ class CommonsHttpClientTest extends HttpClientTest implements AgentTestTrait {
// Generates 4 spans // Generates 4 spans
false false
} }
@Override
boolean testCallback() {
false
}
} }

View File

@ -1,53 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import org.apache.http.HttpResponse
import org.apache.http.client.ResponseHandler
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.message.BasicHeader
import org.apache.http.params.HttpConnectionParams
import org.apache.http.params.HttpParams
import spock.lang.Shared
class ApacheHttpClientResponseHandlerTest extends HttpClientTest implements AgentTestTrait {
@Shared
def client = new DefaultHttpClient()
@Shared
def handler = new ResponseHandler<Integer>() {
@Override
Integer handleResponse(HttpResponse response) {
return response.statusLine.statusCode
}
}
def setupSpec() {
HttpParams httpParams = client.getParams()
HttpConnectionParams.setConnectionTimeout(httpParams, CONNECT_TIMEOUT_MS)
}
@Override
boolean testCausality() {
return false
}
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def request = new HttpUriRequest(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
def status = client.execute(request, handler)
// handler execution is included within the client span, so we can't call the callback there.
callback?.call()
return status
}
}

View File

@ -5,6 +5,7 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.apache.http.HttpHost import org.apache.http.HttpHost
import org.apache.http.HttpRequest import org.apache.http.HttpRequest
import org.apache.http.HttpResponse import org.apache.http.HttpResponse
@ -31,21 +32,36 @@ abstract class ApacheHttpClientTest<T extends HttpRequest> extends HttpClientTes
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def request = createRequest(method, uri) def request = createRequest(method, uri)
headers.entrySet().each { headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value)) request.addHeader(new BasicHeader(it.key, it.value))
} }
def response = executeRequest(request, uri, callback) def response = executeRequest(request, uri)
response.entity?.content?.close() // Make sure the connection is closed. response.entity?.content?.close() // Make sure the connection is closed.
return response.statusLine.statusCode return response.statusLine.statusCode
} }
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
def request = createRequest(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
executeRequestWithCallback(request, uri) {
it.entity?.content?.close() // Make sure the connection is closed.
callback.accept(it.statusLine.statusCode)
}
}
abstract T createRequest(String method, URI uri) abstract T createRequest(String method, URI uri)
abstract HttpResponse executeRequest(T request, URI uri, Closure callback) abstract HttpResponse executeRequest(T request, URI uri)
abstract void executeRequestWithCallback(T request, URI uri, Consumer<HttpResponse> callback)
static String fullPathFromURI(URI uri) { static String fullPathFromURI(URI uri) {
StringBuilder builder = new StringBuilder() StringBuilder builder = new StringBuilder()
@ -73,10 +89,15 @@ class ApacheClientHostRequest extends ApacheHttpClientTest<BasicHttpRequest> {
} }
@Override @Override
HttpResponse executeRequest(BasicHttpRequest request, URI uri, Closure callback) { HttpResponse executeRequest(BasicHttpRequest request, URI uri) {
def response = client.execute(new HttpHost(uri.getHost(), uri.getPort()), request) return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request)
callback?.call() }
return response
@Override
void executeRequestWithCallback(BasicHttpRequest request, URI uri, Consumer<HttpResponse> callback) {
client.execute(new HttpHost(uri.getHost(), uri.getPort()), request) {
callback.accept(it)
}
} }
@Override @Override
@ -92,49 +113,14 @@ class ApacheClientHostRequestContext extends ApacheHttpClientTest<BasicHttpReque
} }
@Override @Override
HttpResponse executeRequest(BasicHttpRequest request, URI uri, Closure callback) { HttpResponse executeRequest(BasicHttpRequest request, URI uri) {
def response = client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, new BasicHttpContext()) return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, new BasicHttpContext())
callback?.call()
return response
} }
@Override @Override
boolean testRemoteConnection() { void executeRequestWithCallback(BasicHttpRequest request, URI uri, Consumer<HttpResponse> callback) {
return false client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, {
} callback.accept(it)
}
class ApacheClientHostRequestResponseHandler extends ApacheHttpClientTest<BasicHttpRequest> {
@Override
BasicHttpRequest createRequest(String method, URI uri) {
return new BasicHttpRequest(method, fullPathFromURI(uri))
}
@Override
HttpResponse executeRequest(BasicHttpRequest request, URI uri, Closure callback) {
return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, {
callback?.call()
return it
})
}
@Override
boolean testRemoteConnection() {
return false
}
}
class ApacheClientHostRequestResponseHandlerContext extends ApacheHttpClientTest<BasicHttpRequest> {
@Override
BasicHttpRequest createRequest(String method, URI uri) {
return new BasicHttpRequest(method, fullPathFromURI(uri))
}
@Override
HttpResponse executeRequest(BasicHttpRequest request, URI uri, Closure callback) {
return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, {
callback?.call()
return it
}, new BasicHttpContext()) }, new BasicHttpContext())
} }
@ -151,10 +137,15 @@ class ApacheClientUriRequest extends ApacheHttpClientTest<HttpUriRequest> {
} }
@Override @Override
HttpResponse executeRequest(HttpUriRequest request, URI uri, Closure callback) { HttpResponse executeRequest(HttpUriRequest request, URI uri) {
def response = client.execute(request) return client.execute(request)
callback?.call() }
return response
@Override
void executeRequestWithCallback(HttpUriRequest request, URI uri, Consumer<HttpResponse> callback) {
client.execute(request) {
callback.accept(it)
}
} }
} }
@ -165,39 +156,14 @@ class ApacheClientUriRequestContext extends ApacheHttpClientTest<HttpUriRequest>
} }
@Override @Override
HttpResponse executeRequest(HttpUriRequest request, URI uri, Closure callback) { HttpResponse executeRequest(HttpUriRequest request, URI uri) {
def response = client.execute(request, new BasicHttpContext()) return client.execute(request, new BasicHttpContext())
callback?.call()
return response
}
}
class ApacheClientUriRequestResponseHandler extends ApacheHttpClientTest<HttpUriRequest> {
@Override
HttpUriRequest createRequest(String method, URI uri) {
return new HttpUriRequest(method, uri)
} }
@Override @Override
HttpResponse executeRequest(HttpUriRequest request, URI uri, Closure callback) { void executeRequestWithCallback(HttpUriRequest request, URI uri, Consumer<HttpResponse> callback) {
return client.execute(request, { client.execute(request, {
callback?.call() callback.accept(it)
it
})
}
}
class ApacheClientUriRequestResponseHandlerContext extends ApacheHttpClientTest<HttpUriRequest> {
@Override
HttpUriRequest createRequest(String method, URI uri) {
return new HttpUriRequest(method, uri)
}
@Override
HttpResponse executeRequest(HttpUriRequest request, URI uri, Closure callback) {
return client.execute(request, {
callback?.call()
it
}, new BasicHttpContext()) }, new BasicHttpContext())
} }
} }

View File

@ -1,96 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase
import org.apache.hc.client5.http.config.RequestConfig
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder
import org.apache.hc.client5.http.impl.classic.HttpClients
import org.apache.hc.core5.http.ClassicHttpRequest
import org.apache.hc.core5.http.ClassicHttpResponse
import org.apache.hc.core5.http.HttpHost
import org.apache.hc.core5.http.HttpRequest
import org.apache.hc.core5.http.io.HttpClientResponseHandler
import org.apache.hc.core5.http.message.BasicHeader
import org.apache.hc.core5.http.protocol.BasicHttpContext
import spock.lang.AutoCleanup
import spock.lang.Shared
abstract class ApacheHttpClientResponseHandlerTest<T extends HttpRequest> extends HttpClientTest implements AgentTestTrait {
@Shared
@AutoCleanup
def client
@Shared
def handler = new HttpClientResponseHandler<Integer>() {
@Override
Integer handleResponse(ClassicHttpResponse response) {
return response.code
}
}
def setupSpec() {
HttpClientBuilder builder = HttpClients.custom()
builder.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.build())
client = builder.build()
}
abstract int executeRequest(T request)
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def request = new HttpUriRequestBase(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
def status = executeRequest(request)
// handler execution is included within the client span, so we can't call the callback there.
callback?.call()
return status
}
}
class ApacheClientHandlerRequest extends ApacheHttpClientResponseHandlerTest<ClassicHttpRequest> {
@Override
int executeRequest(ClassicHttpRequest request) {
return client.execute(request, handler)
}
}
class ApacheClientContextHandlerRequest extends ApacheHttpClientResponseHandlerTest<ClassicHttpRequest> {
@Override
int executeRequest(ClassicHttpRequest request) {
return client.execute(request, new BasicHttpContext(), handler)
}
}
class ApacheClientHostHandlerRequest extends ApacheHttpClientResponseHandlerTest<ClassicHttpRequest> {
@Override
int executeRequest(ClassicHttpRequest request) {
URI uri = request.getUri()
return client.execute(new HttpHost(uri.getScheme(), uri.getHost(), uri.getPort()), request, handler)
}
}
class ApacheClientHostAndContextHandlerRequest extends ApacheHttpClientResponseHandlerTest<ClassicHttpRequest> {
@Override
int executeRequest(ClassicHttpRequest request) {
URI uri = request.getUri()
return client.execute(new HttpHost(uri.getScheme(), uri.getHost(), uri.getPort()), request, new BasicHttpContext(), handler)
}
}

View File

@ -6,8 +6,10 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase
import org.apache.hc.client5.http.config.RequestConfig import org.apache.hc.client5.http.config.RequestConfig
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder import org.apache.hc.client5.http.impl.classic.HttpClientBuilder
import org.apache.hc.client5.http.impl.classic.HttpClients import org.apache.hc.client5.http.impl.classic.HttpClients
import org.apache.hc.core5.http.ClassicHttpRequest import org.apache.hc.core5.http.ClassicHttpRequest
@ -23,7 +25,7 @@ import spock.lang.Shared
abstract class ApacheHttpClientTest<T extends HttpRequest> extends HttpClientTest implements AgentTestTrait { abstract class ApacheHttpClientTest<T extends HttpRequest> extends HttpClientTest implements AgentTestTrait {
@Shared @Shared
@AutoCleanup @AutoCleanup
def client CloseableHttpClient client
def setupSpec() { def setupSpec() {
HttpClientBuilder builder = HttpClients.custom() HttpClientBuilder builder = HttpClients.custom()
@ -35,21 +37,36 @@ abstract class ApacheHttpClientTest<T extends HttpRequest> extends HttpClientTes
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def request = createRequest(method, uri) def request = createRequest(method, uri)
headers.entrySet().each { headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value)) request.addHeader(new BasicHeader(it.key, it.value))
} }
def response = executeRequest(request, uri, callback) def response = executeRequest(request, uri)
response.close() // Make sure the connection is closed. response.close() // Make sure the connection is closed.
return response.code return response.code
} }
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
def request = createRequest(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
}
executeRequestWithCallback(request, uri) {
it.close() // Make sure the connection is closed.
callback.accept(it.code)
}
}
abstract T createRequest(String method, URI uri) abstract T createRequest(String method, URI uri)
abstract ClassicHttpResponse executeRequest(T request, URI uri, Closure callback) abstract ClassicHttpResponse executeRequest(T request, URI uri)
abstract void executeRequestWithCallback(T request, URI uri, Consumer<ClassicHttpResponse> callback)
static String fullPathFromURI(URI uri) { static String fullPathFromURI(URI uri) {
StringBuilder builder = new StringBuilder() StringBuilder builder = new StringBuilder()
@ -77,10 +94,15 @@ class ApacheClientHostRequest extends ApacheHttpClientTest<ClassicHttpRequest> {
} }
@Override @Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) { ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri) {
def response = client.execute(new HttpHost(uri.getHost(), uri.getPort()), request) return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request)
callback?.call() }
return response
@Override
void executeRequestWithCallback(ClassicHttpRequest request, URI uri, Consumer<ClassicHttpResponse> callback) {
client.execute(new HttpHost(uri.getHost(), uri.getPort()), request) {
callback.accept(it)
}
} }
@Override @Override
@ -96,52 +118,17 @@ class ApacheClientHostRequestContext extends ApacheHttpClientTest<ClassicHttpReq
} }
@Override @Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) { ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri) {
def response = client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, new BasicHttpContext()) return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, new BasicHttpContext())
callback?.call()
return response
} }
@Override @Override
boolean testRemoteConnection() { void executeRequestWithCallback(ClassicHttpRequest request, URI uri, Consumer<ClassicHttpResponse> callback) {
return false client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, new BasicHttpContext()) {
callback.accept(it)
} }
} }
class ApacheClientHostRequestResponseHandler extends ApacheHttpClientTest<ClassicHttpRequest> {
@Override
ClassicHttpRequest createRequest(String method, URI uri) {
return new BasicClassicHttpRequest(method, fullPathFromURI(uri))
}
@Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) {
return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, {
callback?.call()
return it
})
}
@Override
boolean testRemoteConnection() {
return false
}
}
class ApacheClientHostRequestResponseHandlerContext extends ApacheHttpClientTest<ClassicHttpRequest> {
@Override
ClassicHttpRequest createRequest(String method, URI uri) {
return new BasicClassicHttpRequest(method, fullPathFromURI(uri))
}
@Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) {
return client.execute(new HttpHost(uri.getHost(), uri.getPort()), request, new BasicHttpContext(), {
callback?.call()
return it
})
}
@Override @Override
boolean testRemoteConnection() { boolean testRemoteConnection() {
return false return false
@ -155,10 +142,15 @@ class ApacheClientUriRequest extends ApacheHttpClientTest<ClassicHttpRequest> {
} }
@Override @Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) { ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri) {
def response = client.execute(request) return client.execute(request)
callback?.call() }
return response
@Override
void executeRequestWithCallback(ClassicHttpRequest request, URI uri, Consumer<ClassicHttpResponse> callback) {
client.execute(request) {
callback.accept(it)
}
} }
} }
@ -169,39 +161,14 @@ class ApacheClientUriRequestContext extends ApacheHttpClientTest<ClassicHttpRequ
} }
@Override @Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) { ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri) {
def response = client.execute(request, new BasicHttpContext()) return client.execute(request, new BasicHttpContext())
callback?.call()
return response
}
}
class ApacheClientUriRequestResponseHandler extends ApacheHttpClientTest<ClassicHttpRequest> {
@Override
ClassicHttpRequest createRequest(String method, URI uri) {
return new HttpUriRequestBase(method, uri)
} }
@Override @Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) { void executeRequestWithCallback(ClassicHttpRequest request, URI uri, Consumer<ClassicHttpResponse> callback) {
return client.execute(request, { client.execute(request, new BasicHttpContext()) {
callback?.call() callback.accept(it)
it
})
} }
} }
class ApacheClientUriRequestResponseHandlerContext extends ApacheHttpClientTest<ClassicHttpRequest> {
@Override
ClassicHttpRequest createRequest(String method, URI uri) {
return new HttpUriRequestBase(method, uri)
}
@Override
ClassicHttpResponse executeRequest(ClassicHttpRequest request, URI uri, Closure callback) {
return client.execute(request, {
callback?.call()
it
})
}
} }

View File

@ -19,4 +19,11 @@ class ArmeriaHttpClientTest extends AbstractArmeriaHttpClientTest implements Lib
boolean testWithClientParent() { boolean testWithClientParent() {
false false
} }
// Agent users have automatic propagation through executor instrumentation, but library users
// should do manually using Armeria patterns.
@Override
boolean testCallbackWithParent() {
false
}
} }

View File

@ -5,7 +5,6 @@
package io.opentelemetry.instrumentation.armeria.v1_3 package io.opentelemetry.instrumentation.armeria.v1_3
import com.google.common.util.concurrent.MoreExecutors
import com.linecorp.armeria.client.WebClient import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.client.WebClientBuilder import com.linecorp.armeria.client.WebClientBuilder
import com.linecorp.armeria.common.AggregatedHttpResponse import com.linecorp.armeria.common.AggregatedHttpResponse
@ -13,13 +12,10 @@ import com.linecorp.armeria.common.HttpMethod
import com.linecorp.armeria.common.HttpRequest import com.linecorp.armeria.common.HttpRequest
import com.linecorp.armeria.common.RequestHeaders import com.linecorp.armeria.common.RequestHeaders
import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch import java.util.concurrent.CompletionException
import java.util.concurrent.TimeUnit import java.util.function.Consumer
import java.util.concurrent.atomic.AtomicReference
import java.util.function.BiConsumer
import spock.lang.Shared import spock.lang.Shared
abstract class AbstractArmeriaHttpClientTest extends HttpClientTest { abstract class AbstractArmeriaHttpClientTest extends HttpClientTest {
@ -30,33 +26,28 @@ abstract class AbstractArmeriaHttpClientTest extends HttpClientTest {
def client = configureClient(WebClient.builder()).build() def client = configureClient(WebClient.builder()).build()
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
HttpRequest request = HttpRequest.of( AggregatedHttpResponse response
try {
response = client.execute(buildRequest(method, uri, headers)).aggregate().join()
} catch(CompletionException e) {
throw e.cause
}
return response.status().code()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
client.execute(buildRequest(method, uri, headers)).aggregate().thenAccept {
callback.accept(it.status().code())
}
}
private static HttpRequest buildRequest(String method, URI uri, Map<String, String> headers = [:]) {
return HttpRequest.of(
RequestHeaders.builder(HttpMethod.valueOf(method), uri.toString()) RequestHeaders.builder(HttpMethod.valueOf(method), uri.toString())
.set(headers.entrySet()) .set(headers.entrySet())
.build()) .build())
AtomicReference<AggregatedHttpResponse> responseRef = new AtomicReference<>()
AtomicReference<Throwable> exRef = new AtomicReference<>()
def latch = new CountDownLatch(1)
client.execute(request).aggregate().whenCompleteAsync(new BiConsumer<AggregatedHttpResponse, Throwable>() {
@Override
void accept(AggregatedHttpResponse aggregatedHttpResponse, Throwable throwable) {
if (throwable != null) {
exRef.set(throwable)
} else {
responseRef.set(aggregatedHttpResponse)
}
callback?.call()
latch.countDown()
}
}, Context.current().wrap(MoreExecutors.directExecutor()))
latch.await(30, TimeUnit.SECONDS)
if (exRef.get() != null) {
throw exRef.get()
}
return responseRef.get().status().code()
} }
// Not supported yet: https://github.com/line/armeria/issues/2489 // Not supported yet: https://github.com/line/armeria/issues/2489

View File

@ -11,6 +11,7 @@ import com.ning.http.client.Response
import com.ning.http.client.uri.Uri import com.ning.http.client.uri.Uri
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
import spock.lang.Shared import spock.lang.Shared
@ -21,27 +22,29 @@ class AsyncHttpClientTest extends HttpClientTest implements AgentTestTrait {
def client = new AsyncHttpClient() def client = new AsyncHttpClient()
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
return client.executeRequest(buildRequest(method, uri, headers)).get().statusCode
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(buildRequest(method, uri, headers), new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
return null
}
})
}
private static Request buildRequest(String method, URI uri, Map<String, String> headers) {
RequestBuilder requestBuilder = new RequestBuilder(method) RequestBuilder requestBuilder = new RequestBuilder(method)
.setUri(Uri.create(uri.toString())) .setUri(Uri.create(uri.toString()))
headers.entrySet().each { headers.entrySet().each {
requestBuilder.addHeader(it.key, it.value) requestBuilder.addHeader(it.key, it.value)
} }
Request request = requestBuilder.build() return requestBuilder.build()
def handler = new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
if (callback != null) {
callback()
}
return response
}
}
def response = client.executeRequest(request, handler).get()
response.statusCode
} }
@Override @Override

View File

@ -5,6 +5,7 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.asynchttpclient.AsyncCompletionHandler import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.Dsl import org.asynchttpclient.Dsl
import org.asynchttpclient.Request import org.asynchttpclient.Request
@ -21,27 +22,28 @@ class AsyncHttpClientTest extends HttpClientTest implements AgentTestTrait {
def client = Dsl.asyncHttpClient() def client = Dsl.asyncHttpClient()
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
return client.executeRequest(buildRequest(method, uri, headers)).get().statusCode
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
client.executeRequest(buildRequest(method, uri, headers), new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
return null
}
})
}
private static Request buildRequest(String method, URI uri, Map<String, String> headers) {
RequestBuilder requestBuilder = new RequestBuilder(method) RequestBuilder requestBuilder = new RequestBuilder(method)
.setUri(Uri.create(uri.toString())) .setUri(Uri.create(uri.toString()))
headers.entrySet().each { headers.entrySet().each {
requestBuilder.addHeader(it.key, it.value) requestBuilder.addHeader(it.key, it.value)
} }
Request request = requestBuilder.build() return requestBuilder.build()
def handler = new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
if (callback != null) {
callback()
}
return response
}
}
def response = client.executeRequest(request, handler).get()
response.statusCode
} }
//TODO see https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2347 //TODO see https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2347

View File

@ -20,11 +20,20 @@ abstract class AbstractGoogleHttpClientTest extends HttpClientTest implements Ag
def requestFactory = new NetHttpTransport().createRequestFactory() def requestFactory = new NetHttpTransport().createRequestFactory()
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { boolean testCallback() {
doRequest(method, uri, headers, callback, false) // executeAsync does not actually allow asynchronous execution since it returns a standard
// Future which cannot have callbacks attached. We instrument execute and executeAsync
// differently so test both but do not need to run our normal asynchronous tests, which check
// context propagation, as there is no possible context propagation.
return false
} }
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback, boolean throwExceptionOnError) { @Override
int doRequest(String method, URI uri, Map<String, String> headers) {
doRequest(method, uri, headers, false)
}
int doRequest(String method, URI uri, Map<String, String> headers, boolean throwExceptionOnError) {
GenericUrl genericUrl = new GenericUrl(uri) GenericUrl genericUrl = new GenericUrl(uri)
HttpRequest request = requestFactory.buildRequest(method, genericUrl, null) HttpRequest request = requestFactory.buildRequest(method, genericUrl, null)
@ -40,8 +49,6 @@ abstract class AbstractGoogleHttpClientTest extends HttpClientTest implements Ag
request.setThrowExceptionOnExecuteError(throwExceptionOnError) request.setThrowExceptionOnExecuteError(throwExceptionOnError)
HttpResponse response = executeRequest(request) HttpResponse response = executeRequest(request)
callback?.call()
return response.getStatusCode() return response.getStatusCode()
} }

View File

@ -9,7 +9,7 @@ import io.opentelemetry.instrumentation.test.base.HttpClientTest
class HttpUrlConnectionResponseCodeOnlyTest extends HttpClientTest implements AgentTestTrait { class HttpUrlConnectionResponseCodeOnlyTest extends HttpClientTest implements AgentTestTrait {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
HttpURLConnection connection = uri.toURL().openConnection() HttpURLConnection connection = uri.toURL().openConnection()
try { try {
connection.setRequestMethod(method) connection.setRequestMethod(method)
@ -18,7 +18,6 @@ class HttpUrlConnectionResponseCodeOnlyTest extends HttpClientTest implements Ag
connection.setRequestProperty("Connection", "close") connection.setRequestProperty("Connection", "close")
return connection.getResponseCode() return connection.getResponseCode()
} finally { } finally {
callback?.call()
connection.disconnect() connection.disconnect()
} }
} }
@ -32,4 +31,9 @@ class HttpUrlConnectionResponseCodeOnlyTest extends HttpClientTest implements Ag
Integer statusOnRedirectError() { Integer statusOnRedirectError() {
return 302 return 302
} }
@Override
boolean testCallback() {
return false
}
} }

View File

@ -22,7 +22,7 @@ class HttpUrlConnectionTest extends HttpClientTest implements AgentTestTrait {
static final STATUS = 200 static final STATUS = 200
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
HttpURLConnection connection = uri.toURL().openConnection() HttpURLConnection connection = uri.toURL().openConnection()
try { try {
connection.setRequestMethod(method) connection.setRequestMethod(method)
@ -35,7 +35,6 @@ class HttpUrlConnectionTest extends HttpClientTest implements AgentTestTrait {
assert Span.current() == parentSpan assert Span.current() == parentSpan
stream.readLines() stream.readLines()
stream.close() stream.close()
callback?.call()
return connection.getResponseCode() return connection.getResponseCode()
} finally { } finally {
connection.disconnect() connection.disconnect()
@ -52,6 +51,11 @@ class HttpUrlConnectionTest extends HttpClientTest implements AgentTestTrait {
return 302 return 302
} }
@Override
boolean testCallback() {
return false
}
@Unroll @Unroll
def "trace request with propagation (useCaches: #useCaches)"() { def "trace request with propagation (useCaches: #useCaches)"() {
setup: setup:

View File

@ -10,7 +10,7 @@ import io.opentelemetry.instrumentation.test.base.HttpClientTest
class HttpUrlConnectionUseCachesFalseTest extends HttpClientTest implements AgentTestTrait { class HttpUrlConnectionUseCachesFalseTest extends HttpClientTest implements AgentTestTrait {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
HttpURLConnection connection = uri.toURL().openConnection() HttpURLConnection connection = uri.toURL().openConnection()
try { try {
connection.setRequestMethod(method) connection.setRequestMethod(method)
@ -23,7 +23,6 @@ class HttpUrlConnectionUseCachesFalseTest extends HttpClientTest implements Agen
assert Span.current() == parentSpan assert Span.current() == parentSpan
stream.readLines() stream.readLines()
stream.close() stream.close()
callback?.call()
return connection.getResponseCode() return connection.getResponseCode()
} finally { } finally {
connection.disconnect() connection.disconnect()
@ -39,4 +38,9 @@ class HttpUrlConnectionUseCachesFalseTest extends HttpClientTest implements Agen
Integer statusOnRedirectError() { Integer statusOnRedirectError() {
return 302 return 302
} }
@Override
boolean testCallback() {
return false
}
} }

View File

@ -5,6 +5,7 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.springframework.http.HttpEntity import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod import org.springframework.http.HttpMethod
@ -27,19 +28,27 @@ class SpringRestTemplateTest extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
try { try {
def httpHeaders = new HttpHeaders() def httpHeaders = new HttpHeaders()
headers.each { httpHeaders.put(it.key, [it.value]) } headers.each { httpHeaders.put(it.key, [it.value]) }
def request = new HttpEntity<String>(httpHeaders) def request = new HttpEntity<String>(httpHeaders)
ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.resolve(method), request, String) ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.valueOf(method), request, String)
callback?.call()
return response.statusCode.value() return response.statusCode.value()
} catch (ResourceAccessException exception) { } catch (ResourceAccessException exception) {
throw exception.getCause() throw exception.getCause()
} }
} }
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
restTemplate.execute(uri, HttpMethod.valueOf(method), { request ->
headers.forEach(request.getHeaders().&add)
}, { response ->
callback.accept(response.statusCode.value())
})
}
@Override @Override
int maxRedirects() { int maxRedirects() {
20 20

View File

@ -1,15 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import java.net.http.HttpRequest
import java.net.http.HttpResponse
class JdkHttpClientAsyncTest extends JdkHttpClientTest {
@Override
HttpResponse send(HttpRequest request) {
return client.sendAsync(request, HttpResponse.BodyHandlers.ofString()).get()
}
}

View File

@ -1,16 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import java.net.http.HttpRequest
import java.net.http.HttpResponse
class JdkHttpClientSyncTest extends JdkHttpClientTest {
@Override
HttpResponse send(HttpRequest request) {
return client.send(request, HttpResponse.BodyHandlers.ofString())
}
}

View File

@ -13,32 +13,39 @@ import java.net.http.HttpRequest
import java.net.http.HttpResponse import java.net.http.HttpResponse
import java.time.Duration import java.time.Duration
import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit
import java.util.function.Consumer
import spock.lang.Requires import spock.lang.Requires
import spock.lang.Shared import spock.lang.Shared
abstract class JdkHttpClientTest extends HttpClientTest implements AgentTestTrait { class JdkHttpClientTest extends HttpClientTest implements AgentTestTrait {
@Shared @Shared
def client = HttpClient.newBuilder().connectTimeout(Duration.of(CONNECT_TIMEOUT_MS, def client = HttpClient.newBuilder().connectTimeout(Duration.of(CONNECT_TIMEOUT_MS,
ChronoUnit.MILLIS)).followRedirects(HttpClient.Redirect.NORMAL).build() ChronoUnit.MILLIS)).followRedirects(HttpClient.Redirect.NORMAL).build()
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
return client.send(buildRequest(method, uri, headers), HttpResponse.BodyHandlers.ofString())
.statusCode()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
client.sendAsync(buildRequest(method, uri, headers), HttpResponse.BodyHandlers.ofString())
.thenAccept {
callback.accept(it.statusCode())
}
}
private static HttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def builder = HttpRequest.newBuilder().uri(uri).method(method, HttpRequest.BodyPublishers.noBody()) def builder = HttpRequest.newBuilder().uri(uri).method(method, HttpRequest.BodyPublishers.noBody())
headers.entrySet().each { headers.entrySet().each {
builder.header(it.key, it.value) builder.header(it.key, it.value)
} }
def request = builder.build() return builder.build()
def resp = send(request)
callback?.call()
return resp.statusCode()
} }
abstract HttpResponse send(HttpRequest request)
@Override @Override
boolean testCircularRedirects() { boolean testCircularRedirects() {
return false return false

View File

@ -24,17 +24,22 @@ class JaxRsClientV1Test extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def resource = client.resource(uri).requestBuilder def resource = client.resource(uri).requestBuilder
headers.each { resource.header(it.key, it.value) } headers.each { resource.header(it.key, it.value) }
def body = BODY_METHODS.contains(method) ? "" : null def body = BODY_METHODS.contains(method) ? "" : null
ClientResponse response = resource.method(method, ClientResponse, body) ClientResponse response = resource.method(method, ClientResponse, body)
callback?.call()
return response.status return response.status
} }
@Override
boolean testCircularRedirects() { boolean testCircularRedirects() {
false false
} }
@Override
boolean testCallback() {
false
}
} }

View File

@ -1,96 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import javax.ws.rs.client.AsyncInvoker
import javax.ws.rs.client.Client
import javax.ws.rs.client.ClientBuilder
import javax.ws.rs.client.Entity
import javax.ws.rs.client.InvocationCallback
import javax.ws.rs.client.WebTarget
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
import org.apache.cxf.jaxrs.client.spec.ClientBuilderImpl
import org.glassfish.jersey.client.ClientConfig
import org.glassfish.jersey.client.ClientProperties
import org.glassfish.jersey.client.JerseyClientBuilder
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder
abstract class JaxRsClientAsyncTest extends HttpClientTest implements AgentTestTrait {
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
Client client = builder().build()
WebTarget service = client.target(uri)
def builder = service.request(MediaType.TEXT_PLAIN)
headers.each { builder.header(it.key, it.value) }
AsyncInvoker request = builder.async()
def body = BODY_METHODS.contains(method) ? Entity.text("") : null
def latch = new CountDownLatch(1)
Response response = request.method(method, (Entity) body, new InvocationCallback<Response>() {
@Override
void completed(Response s) {
callback?.call()
latch.countDown()
}
@Override
void failed(Throwable throwable) {
latch.countDown()
}
}).get()
response.close()
// need to wait for callback to complete in case test is expecting span from it
latch.await()
return response.status
}
abstract ClientBuilder builder()
}
class JerseyClientAsyncTest extends JaxRsClientAsyncTest {
@Override
ClientBuilder builder() {
ClientConfig config = new ClientConfig()
config.property(ClientProperties.CONNECT_TIMEOUT, CONNECT_TIMEOUT_MS)
return new JerseyClientBuilder().withConfig(config)
}
boolean testCircularRedirects() {
false
}
}
class ResteasyClientAsyncTest extends JaxRsClientAsyncTest {
@Override
ClientBuilder builder() {
return new ResteasyClientBuilder()
.establishConnectionTimeout(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
boolean testRedirects() {
false
}
}
class CxfClientAsyncTest extends JaxRsClientAsyncTest {
@Override
ClientBuilder builder() {
return new ClientBuilderImpl()
.property("http.connection.timeout", (long) CONNECT_TIMEOUT_MS)
}
boolean testRedirects() {
false
}
}

View File

@ -9,10 +9,12 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import javax.ws.rs.client.Client import javax.ws.rs.client.Client
import javax.ws.rs.client.ClientBuilder import javax.ws.rs.client.ClientBuilder
import javax.ws.rs.client.Entity import javax.ws.rs.client.Entity
import javax.ws.rs.client.Invocation import javax.ws.rs.client.Invocation
import javax.ws.rs.client.InvocationCallback
import javax.ws.rs.client.WebTarget import javax.ws.rs.client.WebTarget
import javax.ws.rs.core.MediaType import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response import javax.ws.rs.core.Response
@ -26,17 +28,37 @@ import spock.lang.Unroll
abstract class JaxRsClientTest extends HttpClientTest implements AgentTestTrait { abstract class JaxRsClientTest extends HttpClientTest implements AgentTestTrait {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def request = buildRequest(uri, headers)
def body = BODY_METHODS.contains(method) ? Entity.text("") : null
Response response = request.method(method, (Entity) body)
return response.status
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
def request = buildRequest(uri, headers).async()
def body = BODY_METHODS.contains(method) ? Entity.text("") : null
request.method(method, (Entity) body, new InvocationCallback<Response>() {
@Override
void completed(Response response) {
callback.accept(response.status)
}
@Override
void failed(Throwable throwable) {
throw throwable
}
})
}
private Invocation.Builder buildRequest(URI uri, Map<String, String> headers) {
Client client = builder().build() Client client = builder().build()
WebTarget service = client.target(uri) WebTarget service = client.target(uri)
Invocation.Builder request = service.request(MediaType.TEXT_PLAIN) Invocation.Builder request = service.request(MediaType.TEXT_PLAIN)
headers.each { request.header(it.key, it.value) } headers.each { request.header(it.key, it.value) }
def body = BODY_METHODS.contains(method) ? Entity.text("") : null return request
Response response = request.method(method, (Entity) body)
callback?.call()
return response.status
} }
abstract ClientBuilder builder() abstract ClientBuilder builder()

View File

@ -19,7 +19,7 @@ import org.jboss.resteasy.specimpl.ResteasyUriBuilder
class ResteasyProxyClientTest extends HttpClientTest implements AgentTestTrait { class ResteasyProxyClientTest extends HttpClientTest implements AgentTestTrait {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
def proxyMethodName = "${method}_${uri.path}".toLowerCase() def proxyMethodName = "${method}_${uri.path}".toLowerCase()
.replace("/", "") .replace("/", "")
.replace('-', '_') .replace('-', '_')
@ -38,8 +38,6 @@ class ResteasyProxyClientTest extends HttpClientTest implements AgentTestTrait {
def response = proxy."$proxyMethodName"(param, isTestServer) def response = proxy."$proxyMethodName"(param, isTestServer)
callback?.call()
return response.status return response.status
} }
@ -63,6 +61,11 @@ class ResteasyProxyClientTest extends HttpClientTest implements AgentTestTrait {
false false
} }
@Override
boolean testCallback() {
false
}
} }
@Path("") @Path("")

View File

@ -10,14 +10,11 @@ import khttp.KHttp
class KHttpClientTest extends HttpClientTest implements AgentTestTrait { class KHttpClientTest extends HttpClientTest implements AgentTestTrait {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
headers.put("User-Agent", "khttp") headers.put("User-Agent", "khttp")
// khttp applies the same timeout for both connect and read // khttp applies the same timeout for both connect and read
def timeoutSeconds = CONNECT_TIMEOUT_MS / 1000 def timeoutSeconds = CONNECT_TIMEOUT_MS / 1000
def response = KHttp.request(method, uri.toString(), headers, Collections.emptyMap(), null, null, null, null, timeoutSeconds) def response = KHttp.request(method, uri.toString(), headers, Collections.emptyMap(), null, null, null, null, timeoutSeconds)
if (callback != null) {
callback.call()
}
return response.statusCode return response.statusCode
} }
@ -26,6 +23,11 @@ class KHttpClientTest extends HttpClientTest implements AgentTestTrait {
return false return false
} }
@Override
boolean testCallback() {
return false
}
@Override @Override
String userAgent() { String userAgent() {
return "khttp" return "khttp"

View File

@ -11,16 +11,22 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra
import com.ning.http.client.AsyncCompletionHandler import com.ning.http.client.AsyncCompletionHandler
import com.ning.http.client.AsyncHttpClient import com.ning.http.client.AsyncHttpClient
import com.ning.http.client.AsyncHttpClientConfig import com.ning.http.client.AsyncHttpClientConfig
import com.ning.http.client.Request
import com.ning.http.client.RequestBuilder
import com.ning.http.client.Response import com.ning.http.client.Response
import com.ning.http.client.uri.Uri
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
import spock.lang.Shared import spock.lang.Shared
class Netty38ClientTest extends HttpClientTest implements AgentTestTrait { class Netty38ClientTest extends HttpClientTest implements AgentTestTrait {
// NB: Copied from AsyncHttpClientTest.
@Shared @Shared
def clientConfig = new AsyncHttpClientConfig.Builder() def clientConfig = new AsyncHttpClientConfig.Builder()
.setRequestTimeout(TimeUnit.SECONDS.toMillis(10).toInteger()) .setRequestTimeout(TimeUnit.SECONDS.toMillis(10).toInteger())
@ -28,21 +34,32 @@ class Netty38ClientTest extends HttpClientTest implements AgentTestTrait {
@Shared @Shared
@AutoCleanup @AutoCleanup
AsyncHttpClient asyncHttpClient = new AsyncHttpClient(clientConfig) AsyncHttpClient client = new AsyncHttpClient(clientConfig)
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def methodName = "prepare" + method.toLowerCase().capitalize() return client.executeRequest(buildRequest(method, uri, headers)).get().statusCode
def requestBuilder = asyncHttpClient."$methodName"(uri.toString())
headers.each { requestBuilder.setHeader(it.key, it.value) }
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
callback?.call()
return response
} }
}).get()
return response.statusCode @Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(buildRequest(method, uri, headers), new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
return null
}
})
}
private static Request buildRequest(String method, URI uri, Map<String, String> headers) {
RequestBuilder requestBuilder = new RequestBuilder(method)
.setUri(Uri.create(uri.toString()))
headers.entrySet().each {
requestBuilder.addHeader(it.key, it.value)
}
return requestBuilder.build()
} }
@Override @Override

View File

@ -11,11 +11,14 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra
import com.ning.http.client.AsyncCompletionHandler import com.ning.http.client.AsyncCompletionHandler
import com.ning.http.client.AsyncHttpClient import com.ning.http.client.AsyncHttpClient
import com.ning.http.client.AsyncHttpClientConfig import com.ning.http.client.AsyncHttpClientConfig
import com.ning.http.client.Request
import com.ning.http.client.RequestBuilder
import com.ning.http.client.Response import com.ning.http.client.Response
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
import spock.lang.Shared import spock.lang.Shared
@ -28,21 +31,32 @@ class Netty38ClientTest extends HttpClientTest implements AgentTestTrait {
@Shared @Shared
@AutoCleanup @AutoCleanup
AsyncHttpClient asyncHttpClient = new AsyncHttpClient(clientConfig) AsyncHttpClient client = new AsyncHttpClient(clientConfig)
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def methodName = "prepare" + method.toLowerCase().capitalize() return client.executeRequest(buildRequest(method, uri, headers)).get().statusCode
def requestBuilder = asyncHttpClient."$methodName"(uri.toString())
headers.each { requestBuilder.setHeader(it.key, it.value) }
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
callback?.call()
return response
} }
}).get()
return response.statusCode @Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(buildRequest(method, uri, headers), new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
callback.accept(response.statusCode)
return null
}
})
}
private static Request buildRequest(String method, URI uri, Map<String, String> headers) {
RequestBuilder requestBuilder = new RequestBuilder(method)
.setUrl(uri.toString())
headers.entrySet().each {
requestBuilder.addHeader(it.key, it.value)
}
return requestBuilder.build()
} }
@Override @Override

View File

@ -3,12 +3,12 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import groovy.lang.Closure;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/* /*
Bridges from async Netty world to the sync world of our http client tests. Bridges from async Netty world to the sync world of our http client tests.
@ -16,10 +16,10 @@ When request initiated by a test gets a response, calls a given callback and com
future with response's status code. future with response's status code.
*/ */
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> { public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final Closure<Void> callback; private final Consumer<Integer> callback;
private final CompletableFuture<Integer> responseCode; private final CompletableFuture<Integer> responseCode;
public ClientHandler(Closure<Void> callback, CompletableFuture<Integer> responseCode) { public ClientHandler(Consumer<Integer> callback, CompletableFuture<Integer> responseCode) {
this.callback = callback; this.callback = callback;
this.responseCode = responseCode; this.responseCode = responseCode;
} }
@ -30,7 +30,7 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
ctx.pipeline().remove(this); ctx.pipeline().remove(this);
if (callback != null) { if (callback != null) {
callback.call(); callback.accept(((HttpResponse) msg).getStatus().code());
} }
HttpResponse response = (HttpResponse) msg; HttpResponse response = (HttpResponse) msg;

View File

@ -27,6 +27,7 @@ import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class Netty40ClientTest extends HttpClientTest implements AgentTestTrait { class Netty40ClientTest extends HttpClientTest implements AgentTestTrait {
@ -49,18 +50,33 @@ class Netty40ClientTest extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel() Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>() def result = new CompletableFuture<Integer>()
ch.pipeline().addLast(new ClientHandler(callback, result)) ch.pipeline().addLast(new ClientHandler(null, result))
def request = buildRequest(method, uri, headers)
ch.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS)
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
ch.pipeline().addLast(new ClientHandler(callback, CompletableFuture.completedFuture(0)))
def request = buildRequest(method, uri, headers)
ch.writeAndFlush(request)
}
private DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER) def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER)
HttpHeaders.setHost(request, uri.host) HttpHeaders.setHost(request, uri.host)
request.headers().set("user-agent", userAgent()) request.headers().set("user-agent", userAgent())
headers.each { k, v -> request.headers().set(k, v) } headers.each { k, v -> request.headers().set(k, v) }
return request
ch.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS)
} }
@Override @Override

View File

@ -3,12 +3,12 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import groovy.lang.Closure;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/* /*
Bridges from async Netty world to the sync world of our http client tests. Bridges from async Netty world to the sync world of our http client tests.
@ -16,10 +16,10 @@ When request initiated by a test gets a response, calls a given callback and com
future with response's status code. future with response's status code.
*/ */
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> { public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final Closure<Void> callback; private final Consumer<Integer> callback;
private final CompletableFuture<Integer> responseCode; private final CompletableFuture<Integer> responseCode;
public ClientHandler(Closure<Void> callback, CompletableFuture<Integer> responseCode) { public ClientHandler(Consumer<Integer> callback, CompletableFuture<Integer> responseCode) {
this.callback = callback; this.callback = callback;
this.responseCode = responseCode; this.responseCode = responseCode;
} }
@ -30,7 +30,7 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
ctx.pipeline().remove(this); ctx.pipeline().remove(this);
if (callback != null) { if (callback != null) {
callback.call(); callback.accept(((HttpResponse) msg).status().code());
} }
HttpResponse response = (HttpResponse) msg; HttpResponse response = (HttpResponse) msg;

View File

@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class Netty41ClientTest extends HttpClientTest implements AgentTestTrait { class Netty41ClientTest extends HttpClientTest implements AgentTestTrait {
@ -57,19 +58,34 @@ class Netty41ClientTest extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel() Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
def result = new CompletableFuture<Integer>() def result = new CompletableFuture<Integer>()
ch.pipeline().addLast(new ClientHandler(callback, result)) ch.pipeline().addLast(new ClientHandler(null, result))
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER) def request = buildRequest(method, uri, headers)
request.headers().set(HttpHeaderNames.HOST, uri.host)
headers.each { k, v -> request.headers().set(k, v) }
ch.writeAndFlush(request).get() ch.writeAndFlush(request).get()
return result.get(20, TimeUnit.SECONDS) return result.get(20, TimeUnit.SECONDS)
} }
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
Channel ch = bootstrap.connect(uri.host, uri.port).sync().channel()
ch.pipeline().addLast(new ClientHandler(callback, CompletableFuture.completedFuture(0)))
def request = buildRequest(method, uri, headers)
ch.writeAndFlush(request)
}
private static DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER)
request.headers().set(HttpHeaderNames.HOST, uri.host)
headers.each { k, v -> request.headers().set(k, v) }
return request
}
@Override @Override
boolean testRedirects() { boolean testRedirects() {
false false

View File

@ -1,55 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static java.util.concurrent.TimeUnit.SECONDS
import com.squareup.okhttp.Callback
import com.squareup.okhttp.Headers
import com.squareup.okhttp.MediaType
import com.squareup.okhttp.Request
import com.squareup.okhttp.RequestBody
import com.squareup.okhttp.Response
import com.squareup.okhttp.internal.http.HttpMethod
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
class OkHttp2AsyncTest extends OkHttp2Test {
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def body = HttpMethod.requiresRequestBody(method) ? RequestBody.create(MediaType.parse("text/plain"), "") : null
def request = new Request.Builder()
.url(uri.toURL())
.method(method, body)
.headers(Headers.of(HeadersUtil.headersToArray(headers)))
.build()
AtomicReference<Response> responseRef = new AtomicReference()
AtomicReference<Exception> exRef = new AtomicReference()
def latch = new CountDownLatch(1)
client.newCall(request).enqueue(new Callback() {
void onResponse(Response response) {
responseRef.set(response)
callback?.call()
latch.countDown()
}
void onFailure(Request req, IOException e) {
exRef.set(e)
latch.countDown()
}
})
latch.await(20, SECONDS)
if (exRef.get() != null) {
throw exRef.get()
}
return responseRef.get().code()
}
@Override
boolean testCausality() {
false
}
}

View File

@ -3,15 +3,18 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import com.squareup.okhttp.Callback
import com.squareup.okhttp.Headers import com.squareup.okhttp.Headers
import com.squareup.okhttp.MediaType import com.squareup.okhttp.MediaType
import com.squareup.okhttp.OkHttpClient import com.squareup.okhttp.OkHttpClient
import com.squareup.okhttp.Request import com.squareup.okhttp.Request
import com.squareup.okhttp.RequestBody import com.squareup.okhttp.RequestBody
import com.squareup.okhttp.Response
import com.squareup.okhttp.internal.http.HttpMethod import com.squareup.okhttp.internal.http.HttpMethod
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class OkHttp2Test extends HttpClientTest implements AgentTestTrait { class OkHttp2Test extends HttpClientTest implements AgentTestTrait {
@ -23,17 +26,33 @@ class OkHttp2Test extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
def body = HttpMethod.requiresRequestBody(method) ? RequestBody.create(MediaType.parse("text/plain"), "") : null def response = client.newCall(buildRequest(method, uri, headers)).execute()
return response.code()
}
def request = new Request.Builder() @Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
client.newCall(buildRequest(method, uri, headers)).enqueue(new Callback() {
@Override
void onFailure(Request request, IOException e) {
throw e
}
@Override
void onResponse(Response response) throws IOException {
callback.accept(response.code())
}
})
}
private static Request buildRequest(String method, URI uri, Map<String, String> headers) {
def body = HttpMethod.requiresRequestBody(method) ? RequestBody.create(MediaType.parse("text/plain"), "") : null
return new Request.Builder()
.url(uri.toURL()) .url(uri.toURL())
.method(method, body) .method(method, body)
.headers(Headers.of(HeadersUtil.headersToArray(headers))) .headers(Headers.of(HeadersUtil.headersToArray(headers)))
.build() .build()
def response = client.newCall(request).execute()
callback?.call()
return response.code()
} }
boolean testRedirects() { boolean testRedirects() {

View File

@ -1,17 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.okhttp.v3_0
import io.opentelemetry.instrumentation.okhttp.v3_0.AbstractOkHttp3AsyncTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
import okhttp3.OkHttpClient
class OkHttp3AsyncTest extends AbstractOkHttp3AsyncTest implements AgentTestTrait {
@Override
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
return clientBuilder
}
}

View File

@ -1,25 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.okhttp.v3_0
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import okhttp3.OkHttpClient
import spock.lang.Ignore
// Async test relies on javaagent concurrency instrumentation currently.
@Ignore
class OkHttp3AsyncTest extends AbstractOkHttp3AsyncTest implements LibraryTestTrait {
@Override
OkHttpClient.Builder configureClient(OkHttpClient.Builder clientBuilder) {
return clientBuilder.addInterceptor(OkHttpTracing.create(getOpenTelemetry()).newInterceptor())
}
// library instrumentation doesn't have a good way of suppressing nested CLIENT spans yet
@Override
boolean testWithClientParent() {
false
}
}

View File

@ -19,4 +19,11 @@ class OkHttp3Test extends AbstractOkHttp3Test implements LibraryTestTrait {
boolean testWithClientParent() { boolean testWithClientParent() {
false false
} }
// TODO(anuraaga): Enable after https://github.com/open-telemetry/opentelemetry-java/blob/main/context/src/main/java/io/opentelemetry/context/Context.java#L128
// is released.
@Override
boolean testCallback() {
false
}
} }

View File

@ -1,61 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.okhttp.v3_0
import static java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers
import okhttp3.MediaType
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.Response
import okhttp3.internal.http.HttpMethod
abstract class AbstractOkHttp3AsyncTest extends AbstractOkHttp3Test {
@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def body = HttpMethod.requiresRequestBody(method) ? RequestBody.create(MediaType.parse("text/plain"), "") : null
def request = new Request.Builder()
.url(uri.toURL())
.method(method, body)
.headers(Headers.of(headers))
.build()
AtomicReference<Response> responseRef = new AtomicReference()
AtomicReference<Exception> exRef = new AtomicReference()
def latch = new CountDownLatch(1)
client.newCall(request).enqueue(new Callback() {
void onResponse(Call call, Response response) {
responseRef.set(response)
callback?.call()
latch.countDown()
}
void onFailure(Call call, IOException e) {
exRef.set(e)
latch.countDown()
}
})
// need to wait a while for tests of the connection timeout (20 seconds led to failures in CI)
latch.await(30, SECONDS)
if (exRef.get() != null) {
throw exRef.get()
}
return responseRef.get().code()
}
@Override
boolean testCausality() {
false
}
}

View File

@ -7,11 +7,15 @@ package io.opentelemetry.instrumentation.okhttp.v3_0
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers import okhttp3.Headers
import okhttp3.MediaType import okhttp3.MediaType
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Request import okhttp3.Request
import okhttp3.RequestBody import okhttp3.RequestBody
import okhttp3.Response
import okhttp3.internal.http.HttpMethod import okhttp3.internal.http.HttpMethod
import spock.lang.Shared import spock.lang.Shared
@ -27,15 +31,34 @@ abstract class AbstractOkHttp3Test extends HttpClientTest {
.build() .build()
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
def request = buildRequest(method, uri, headers)
def response = client.newCall(request).execute()
return response.code()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
def request = buildRequest(method, uri, headers)
client.newCall(request).enqueue(new Callback() {
@Override
void onFailure(Call call, IOException e) {
throw e
}
@Override
void onResponse(Call call, Response response) throws IOException {
callback.accept(response.code())
}
})
}
private static Request buildRequest(String method, URI uri, Map<String, String> headers) {
def body = HttpMethod.requiresRequestBody(method) ? RequestBody.create(MediaType.parse("text/plain"), "") : null def body = HttpMethod.requiresRequestBody(method) ? RequestBody.create(MediaType.parse("text/plain"), "") : null
def request = new Request.Builder() return new Request.Builder()
.url(uri.toURL()) .url(uri.toURL())
.method(method, body) .method(method, body)
.headers(Headers.of(headers)).build() .headers(Headers.of(headers)).build()
def response = client.newCall(request).execute()
callback?.call()
return response.code()
} }
boolean testRedirects() { boolean testRedirects() {

View File

@ -3,16 +3,20 @@
* SPDX-License-Identifier: Apache-2.0 * SPDX-License-Identifier: Apache-2.0
*/ */
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import play.libs.ws.StandaloneWSClient import play.libs.ws.StandaloneWSClient
import play.libs.ws.StandaloneWSRequest import play.libs.ws.StandaloneWSRequest
import play.libs.ws.StandaloneWSResponse import play.libs.ws.StandaloneWSResponse
import play.libs.ws.ahc.StandaloneAhcWSClient import play.libs.ws.ahc.StandaloneAhcWSClient
import scala.Function1
import scala.collection.JavaConverters import scala.collection.JavaConverters
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.util.Try
import spock.lang.Shared import spock.lang.Shared
class PlayJavaWsClientTestBase extends PlayWsClientTestBaseBase { class PlayJavaWsClientTestBase extends PlayWsClientTestBaseBase {
@ -20,16 +24,22 @@ class PlayJavaWsClientTestBase extends PlayWsClientTestBaseBase {
StandaloneWSClient wsClient StandaloneWSClient wsClient
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
return sendRequest(method, uri, headers).toCompletableFuture().get().status
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers).thenAccept {
callback.accept(it.status)
}
}
private CompletionStage<StandaloneWSResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
StandaloneWSRequest wsRequest = wsClient.url(uri.toURL().toString()).setFollowRedirects(true) StandaloneWSRequest wsRequest = wsClient.url(uri.toURL().toString()).setFollowRedirects(true)
headers.entrySet().each { entry -> wsRequest.addHeader(entry.getKey(), entry.getValue()) } headers.entrySet().each { entry -> wsRequest.addHeader(entry.getKey(), entry.getValue()) }
StandaloneWSResponse wsResponse = wsRequest.setMethod(method).execute() return wsRequest.setMethod(method).execute()
.whenComplete({ response, throwable ->
callback?.call()
}).toCompletableFuture().get(5, TimeUnit.SECONDS)
return wsResponse.getStatus()
} }
def setupSpec() { def setupSpec() {
@ -46,19 +56,30 @@ class PlayJavaStreamedWsClientTestBase extends PlayWsClientTestBaseBase {
StandaloneWSClient wsClient StandaloneWSClient wsClient
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
return sendRequest(method, uri, headers).toCompletableFuture().get().status
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers).thenAccept {
callback.accept(it.status)
}
}
private CompletionStage<StandaloneWSResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
StandaloneWSRequest wsRequest = wsClient.url(uri.toURL().toString()).setFollowRedirects(true) StandaloneWSRequest wsRequest = wsClient.url(uri.toURL().toString()).setFollowRedirects(true)
headers.entrySet().each { entry -> wsRequest.addHeader(entry.getKey(), entry.getValue()) } headers.entrySet().each { entry -> wsRequest.addHeader(entry.getKey(), entry.getValue()) }
StandaloneWSResponse wsResponse = wsRequest.setMethod(method).stream() CompletionStage<StandaloneWSResponse> stream = wsRequest.setMethod(method).stream()
.whenComplete({ response, throwable ->
callback?.call()
}).toCompletableFuture().get(5, TimeUnit.SECONDS)
// The status can be ready before the body so explicitly call wait for body to be ready // The status can be ready before the body so explicitly call wait for body to be ready
wsResponse.getBodyAsSource().runFold("", { acc, out -> "" }, materializer) return stream
.toCompletableFuture().get(5, TimeUnit.SECONDS) .thenCompose { StandaloneWSResponse response ->
return wsResponse.getStatus() response.getBodyAsSource().runFold("", { acc, out -> "" }, materializer)
}
.thenCombine(stream) { String body, StandaloneWSResponse response ->
response
}
} }
def setupSpec() { def setupSpec() {
@ -75,20 +96,30 @@ class PlayScalaWsClientTestBase extends PlayWsClientTestBaseBase {
play.api.libs.ws.StandaloneWSClient wsClient play.api.libs.ws.StandaloneWSClient wsClient
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = wsClient.url(uri.toURL().toString()) Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = sendRequest(method, uri, headers)
play.api.libs.ws.StandaloneWSResponse wsResponse = Await.result(futureResponse, Duration.apply(5, TimeUnit.SECONDS))
return wsResponse.status()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = sendRequest(method, uri, headers)
futureResponse.onComplete(new Function1<Try<play.api.libs.ws.StandaloneWSResponse>, Void>() {
@Override
Void apply(Try<play.api.libs.ws.StandaloneWSResponse> response) {
callback.accept(response.get().status())
return null
}
}, ExecutionContext.global())
}
private Future<play.api.libs.ws.StandaloneWSResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
return wsClient.url(uri.toURL().toString())
.withMethod(method) .withMethod(method)
.withFollowRedirects(true) .withFollowRedirects(true)
.withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq()) .withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq())
.execute() .execute()
.transform({ theTry ->
callback?.call()
theTry
}, ExecutionContext.global())
play.api.libs.ws.StandaloneWSResponse wsResponse = Await.result(futureResponse, Duration.apply(5, TimeUnit.SECONDS))
return wsResponse.status()
} }
def setupSpec() { def setupSpec() {
@ -105,24 +136,43 @@ class PlayScalaStreamedWsClientTestBase extends PlayWsClientTestBaseBase {
play.api.libs.ws.StandaloneWSClient wsClient play.api.libs.ws.StandaloneWSClient wsClient
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = sendRequest(method, uri, headers)
play.api.libs.ws.StandaloneWSResponse wsResponse = Await.result(futureResponse, Duration.apply(5, TimeUnit.SECONDS))
return wsResponse.status()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = sendRequest(method, uri, headers)
futureResponse.onComplete(new Function1<Try<play.api.libs.ws.StandaloneWSResponse>, Void>() {
@Override
Void apply(Try<play.api.libs.ws.StandaloneWSResponse> response) {
callback.accept(response.get().status())
return null
}
}, ExecutionContext.global())
}
private Future<play.api.libs.ws.StandaloneWSResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = wsClient.url(uri.toURL().toString()) Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = wsClient.url(uri.toURL().toString())
.withMethod(method) .withMethod(method)
.withFollowRedirects(true) .withFollowRedirects(true)
.withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq()) .withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq())
.stream() .stream()
.transform({ theTry ->
callback?.call()
theTry
}, ExecutionContext.global())
play.api.libs.ws.StandaloneWSResponse wsResponse = Await.result(futureResponse, Duration.apply(5, TimeUnit.SECONDS))
// The status can be ready before the body so explicitly call wait for body to be ready // The status can be ready before the body so explicitly call wait for body to be ready
Await.result( Future<String> bodyResponse = futureResponse.flatMap(new Function1<play.api.libs.ws.StandaloneWSResponse, Future<String>>() {
wsResponse.bodyAsSource().runFold("", { acc, out -> "" }, materializer), @Override
Duration.apply(5, TimeUnit.SECONDS)) Future<String> apply(play.api.libs.ws.StandaloneWSResponse wsResponse) {
return wsResponse.status() return wsResponse.bodyAsSource().runFold("", { acc, out -> "" }, materializer)
}
}, ExecutionContext.global())
return bodyResponse.flatMap(new Function1<String, Future<play.api.libs.ws.StandaloneWSResponse>>() {
@Override
Future<play.api.libs.ws.StandaloneWSResponse> apply(String v1) {
return futureResponse
}
}, ExecutionContext.global())
} }
def setupSpec() { def setupSpec() {

View File

@ -8,8 +8,12 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.function.Consumer
import play.GlobalSettings import play.GlobalSettings
import play.libs.F
import play.libs.ws.WS import play.libs.ws.WS
import play.libs.ws.WSClient
import play.libs.ws.WSResponse
import play.test.FakeApplication import play.test.FakeApplication
import play.test.Helpers import play.test.Helpers
import spock.lang.Shared import spock.lang.Shared
@ -27,7 +31,7 @@ class PlayWsClientTest extends HttpClientTest implements AgentTestTrait {
) )
@Shared @Shared
def client WSClient client
def setupSpec() { def setupSpec() {
Helpers.start(application) Helpers.start(application)
@ -39,19 +43,23 @@ class PlayWsClientTest extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
return sendRequest(method, uri, headers).get(1, TimeUnit.SECONDS).status
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers).onRedeem {
callback.accept(it.status)
}
}
private F.Promise<WSResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
def request = client.url(uri.toString()) def request = client.url(uri.toString())
headers.entrySet().each { headers.entrySet().each {
request.setHeader(it.key, it.value) request.setHeader(it.key, it.value)
} }
return request.execute(method)
def status = request.execute(method).map({
callback?.call()
it
}).map({
it.status
})
return status.get(1, TimeUnit.SECONDS)
} }
@Override @Override

View File

@ -7,7 +7,10 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.concurrent.CompletionStage
import java.util.function.Consumer
import play.libs.ws.WS import play.libs.ws.WS
import play.libs.ws.WSResponse
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
import spock.lang.Shared import spock.lang.Shared
import spock.lang.Subject import spock.lang.Subject
@ -21,19 +24,24 @@ class PlayWsClientTest extends HttpClientTest implements AgentTestTrait {
def client = WS.newClient(-1) def client = WS.newClient(-1)
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
return sendRequest(method, uri, headers).toCompletableFuture().get().status
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers).thenAccept {
callback.accept(it.status)
}
}
private CompletionStage<WSResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
def request = client.url(uri.toString()) def request = client.url(uri.toString())
headers.entrySet().each { headers.entrySet().each {
request.setHeader(it.key, it.value) request.setHeader(it.key, it.value)
} }
def status = request.execute(method).thenApply { return request.execute(method)
callback?.call()
it
}.thenApply {
it.status
}
return status.toCompletableFuture().get()
} }
//TODO see https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2347 //TODO see https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2347

View File

@ -5,14 +5,29 @@
package client package client
import java.util.function.Consumer
import ratpack.exec.ExecResult import ratpack.exec.Operation
import ratpack.exec.Promise
class RatpackForkedHttpClientTest extends RatpackHttpClientTest { class RatpackForkedHttpClientTest extends RatpackHttpClientTest {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
ExecResult<Integer> result = exec.yield { exec.yield {
sendRequest(method, uri, headers)
}.value
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
exec.execute(Operation.of {
sendRequest(method, uri, headers).result {
callback.accept(it.value)
}
})
}
private Promise<Integer> sendRequest(String method, URI uri, Map<String, String> headers) {
def resp = client.request(uri) { spec -> def resp = client.request(uri) { spec ->
spec.method(method) spec.method(method)
spec.headers { headersSpec -> spec.headers { headersSpec ->
@ -22,10 +37,7 @@ class RatpackForkedHttpClientTest extends RatpackHttpClientTest {
} }
} }
return resp.fork().map { return resp.fork().map {
callback?.call()
it.status.code it.status.code
} }
} }
return result.value
}
} }

View File

@ -8,7 +8,9 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.time.Duration import java.time.Duration
import ratpack.exec.ExecResult import java.util.function.Consumer
import ratpack.exec.Operation
import ratpack.exec.Promise
import ratpack.http.client.HttpClient import ratpack.http.client.HttpClient
import ratpack.test.exec.ExecHarness import ratpack.test.exec.ExecHarness
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
@ -27,8 +29,22 @@ class RatpackHttpClientTest extends HttpClientTest implements AgentTestTrait {
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
ExecResult<Integer> result = exec.yield { return exec.yield {
sendRequest(method, uri, headers)
}.value
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
exec.execute(Operation.of {
sendRequest(method, uri, headers).result {
callback.accept(it.value)
}
})
}
private Promise<Integer> sendRequest(String method, URI uri, Map<String, String> headers) {
def resp = client.request(uri) { spec -> def resp = client.request(uri) { spec ->
spec.connectTimeout(Duration.ofSeconds(2)) spec.connectTimeout(Duration.ofSeconds(2))
spec.method(method) spec.method(method)
@ -39,12 +55,9 @@ class RatpackHttpClientTest extends HttpClientTest implements AgentTestTrait {
} }
} }
return resp.map { return resp.map {
callback?.call()
it.status.code it.status.code
} }
} }
return result.value
}
@Override @Override
boolean testRedirects() { boolean testRedirects() {

View File

@ -5,6 +5,8 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.HttpClientResponse import reactor.netty.http.client.HttpClientResponse
@ -31,19 +33,27 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest impleme
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers = [:], Closure callback = null) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
HttpClientResponse resp = createHttpClient() Mono<HttpClientResponse> response = sendRequest(method, uri, headers)
return response.block().status().code()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
Mono<HttpClientResponse> response = sendRequest(method, uri, headers)
response.subscribe {
callback.accept(it.status().code())
}
}
Mono<HttpClientResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
return createHttpClient()
.followRedirect(true) .followRedirect(true)
.headers({ h -> headers.each { k, v -> h.add(k, v) } }) .headers({ h -> headers.each { k, v -> h.add(k, v) } })
.baseUrl(server.address.toString()) .baseUrl(server.address.toString())
."${method.toLowerCase()}"() ."${method.toLowerCase()}"()
.uri(uri.toString()) .uri(uri.toString())
.response() .response()
.block()
if (callback != null) {
callback.call()
}
return resp.status().code()
} }
abstract HttpClient createHttpClient() abstract HttpClient createHttpClient()

View File

@ -5,6 +5,8 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.HttpClientResponse import reactor.netty.http.client.HttpClientResponse
@ -31,19 +33,27 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest impleme
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers = [:], Closure callback = null) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
HttpClientResponse resp = createHttpClient() Mono<HttpClientResponse> response = sendRequest(method, uri, headers)
return response.block().status().code()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
Mono<HttpClientResponse> response = sendRequest(method, uri, headers)
response.subscribe {
callback.accept(it.status().code())
}
}
Mono<HttpClientResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
return createHttpClient()
.followRedirect(true) .followRedirect(true)
.headers({ h -> headers.each { k, v -> h.add(k, v) } }) .headers({ h -> headers.each { k, v -> h.add(k, v) } })
.baseUrl(server.address.toString()) .baseUrl(server.address.toString())
."${method.toLowerCase()}"() ."${method.toLowerCase()}"()
.uri(uri.toString()) .uri(uri.toString())
.response() .response()
.block()
if (callback != null) {
callback.call()
}
return resp.status().code()
} }
abstract HttpClient createHttpClient() abstract HttpClient createHttpClient()

View File

@ -6,7 +6,11 @@
import io.opentelemetry.instrumentation.spring.httpclients.RestTemplateInterceptor import io.opentelemetry.instrumentation.spring.httpclients.RestTemplateInterceptor
import io.opentelemetry.instrumentation.test.LibraryTestTrait import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod import org.springframework.http.HttpMethod
import org.springframework.http.ResponseEntity
import org.springframework.web.client.ResourceAccessException import org.springframework.web.client.ResourceAccessException
import org.springframework.web.client.RestTemplate import org.springframework.web.client.RestTemplate
import spock.lang.Shared import spock.lang.Shared
@ -23,19 +27,27 @@ class RestTemplateInstrumentationTest extends HttpClientTest implements LibraryT
} }
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
try { try {
return restTemplate.execute(uri, HttpMethod.valueOf(method), { request -> def httpHeaders = new HttpHeaders()
headers.forEach(request.getHeaders().&add) headers.each { httpHeaders.put(it.key, [it.value]) }
}, { response -> def request = new HttpEntity<String>(httpHeaders)
callback?.call() ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.valueOf(method), request, String)
response.statusCode.value() return response.statusCode.value()
})
} catch (ResourceAccessException exception) { } catch (ResourceAccessException exception) {
throw exception.getCause() throw exception.getCause()
} }
} }
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
restTemplate.execute(uri, HttpMethod.valueOf(method), { request ->
headers.forEach(request.getHeaders().&add)
}, { response ->
callback.accept(response.statusCode.value())
})
}
@Override @Override
boolean testCircularRedirects() { boolean testCircularRedirects() {
false false

View File

@ -7,24 +7,31 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import java.util.function.Consumer
import org.springframework.http.HttpMethod import org.springframework.http.HttpMethod
import org.springframework.web.reactive.function.client.ClientResponse import org.springframework.web.reactive.function.client.ClientResponse
import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Mono
class SpringWebfluxHttpClientTest extends HttpClientTest implements AgentTestTrait { class SpringWebfluxHttpClientTest extends HttpClientTest implements AgentTestTrait {
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
ClientResponse response = WebClient.builder().build().method(HttpMethod.resolve(method)) return sendRequest(method, uri, headers).block().statusCode().value()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers).subscribe {
callback.accept(it.statusCode().value())
}
}
private static Mono<ClientResponse> sendRequest(String method, URI uri, Map<String, String> headers) {
return WebClient.builder().build().method(HttpMethod.resolve(method))
.uri(uri) .uri(uri)
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) } .headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
.exchange() .exchange()
.doAfterSuccessOrError { res, ex ->
callback?.call()
}
.block()
response.statusCode().value()
} }
boolean testRedirects() { boolean testRedirects() {

View File

@ -13,8 +13,10 @@ import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.client.WebClientOptions import io.vertx.ext.web.client.WebClientOptions
import io.vertx.reactivex.circuitbreaker.CircuitBreaker import io.vertx.reactivex.circuitbreaker.CircuitBreaker
import io.vertx.reactivex.core.Vertx import io.vertx.reactivex.core.Vertx
import io.vertx.reactivex.ext.web.client.HttpRequest
import io.vertx.reactivex.ext.web.client.WebClient import io.vertx.reactivex.ext.web.client.WebClient
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class VertxRxCircuitBreakerWebClientTest extends HttpClientTest implements AgentTestTrait { class VertxRxCircuitBreakerWebClientTest extends HttpClientTest implements AgentTestTrait {
@ -32,12 +34,19 @@ class VertxRxCircuitBreakerWebClientTest extends HttpClientTest implements Agent
) )
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
def request = client.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri") // VertxRx doesn't seem to provide a synchronous API at all for circuit breaker. Bridge through
headers.each { request.putHeader(it.key, it.value) } // a callback.
CompletableFuture<Integer> future = new CompletableFuture<>()
def future = new CompletableFuture<Integer>() doRequestWithCallback(method, uri, headers) {
future.complete(it)
}
return future.get()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
def request = buildRequest(method, uri, headers)
breaker.executeCommand({command -> breaker.executeCommand({command ->
request.rxSend().doOnSuccess { request.rxSend().doOnSuccess {
command.complete(it) command.complete(it)
@ -45,14 +54,14 @@ class VertxRxCircuitBreakerWebClientTest extends HttpClientTest implements Agent
command.fail(it) command.fail(it)
}.subscribe() }.subscribe()
}, { }, {
callback?.call() callback.accept(it.result().statusCode())
if (it.succeeded()) {
future.complete(it.result().statusCode())
} else {
future.completeExceptionally(it.cause())
}
}) })
return future.get() }
private HttpRequest<?> buildRequest(String method, URI uri, Map<String, String> headers) {
def request = client.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri")
headers.each { request.putHeader(it.key, it.value) }
return request
} }
@Override @Override

View File

@ -7,11 +7,14 @@ package client
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.reactivex.Single
import io.vertx.core.VertxOptions import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpMethod import io.vertx.core.http.HttpMethod
import io.vertx.ext.web.client.WebClientOptions import io.vertx.ext.web.client.WebClientOptions
import io.vertx.reactivex.core.Vertx import io.vertx.reactivex.core.Vertx
import io.vertx.reactivex.ext.web.client.HttpResponse
import io.vertx.reactivex.ext.web.client.WebClient import io.vertx.reactivex.ext.web.client.WebClient
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class VertxRxWebClientTest extends HttpClientTest implements AgentTestTrait { class VertxRxWebClientTest extends HttpClientTest implements AgentTestTrait {
@ -24,15 +27,25 @@ class VertxRxWebClientTest extends HttpClientTest implements AgentTestTrait {
WebClient client = WebClient.create(vertx, clientOptions) WebClient client = WebClient.create(vertx, clientOptions)
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
return sendRequest(method, uri, headers).blockingGet().statusCode()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
sendRequest(method, uri, headers)
.subscribe(new io.reactivex.functions.Consumer<HttpResponse<?>>() {
@Override
void accept(HttpResponse<?> httpResponse) throws Exception {
callback.accept(httpResponse.statusCode())
}
})
}
private Single<HttpResponse<?>> sendRequest(String method, URI uri, Map<String, String> headers) {
def request = client.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri") def request = client.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri")
headers.each { request.putHeader(it.key, it.value) } headers.each { request.putHeader(it.key, it.value) }
return request return request.rxSend()
.rxSend()
.doOnSuccess { response -> callback?.call() }
.map { it.statusCode() }
.toObservable()
.blockingFirst()
} }
@Override @Override

View File

@ -11,9 +11,9 @@ import io.opentelemetry.instrumentation.test.base.SingleConnection
import io.vertx.core.Vertx import io.vertx.core.Vertx
import io.vertx.core.VertxOptions import io.vertx.core.VertxOptions
import io.vertx.core.http.HttpClientOptions import io.vertx.core.http.HttpClientOptions
import io.vertx.core.http.HttpClientResponse
import io.vertx.core.http.HttpMethod import io.vertx.core.http.HttpMethod
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import spock.lang.Shared import spock.lang.Shared
class VertxHttpClientTest extends HttpClientTest implements AgentTestTrait { class VertxHttpClientTest extends HttpClientTest implements AgentTestTrait {
@ -26,17 +26,23 @@ class VertxHttpClientTest extends HttpClientTest implements AgentTestTrait {
def httpClient = vertx.createHttpClient(clientOptions) def httpClient = vertx.createHttpClient(clientOptions)
@Override @Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) { int doRequest(String method, URI uri, Map<String, String> headers) {
CompletableFuture<HttpClientResponse> future = new CompletableFuture<>() // Vertx doesn't seem to provide any synchronous API so bridge through a callback
CompletableFuture<Integer> future = new CompletableFuture<>()
doRequestWithCallback(method, uri, headers) {
future.complete(it)
}
return future.get()
}
@Override
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
def request = httpClient.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri") def request = httpClient.request(HttpMethod.valueOf(method), uri.port, uri.host, "$uri")
headers.each { request.putHeader(it.key, it.value) } headers.each { request.putHeader(it.key, it.value) }
request.handler { response -> request.handler { response ->
callback?.call() callback.accept(response.statusCode())
future.complete(response)
} }
request.end() request.end()
return future.get().statusCode()
} }
@Override @Override

View File

@ -27,10 +27,12 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.function.Consumer
import spock.lang.AutoCleanup import spock.lang.AutoCleanup
import spock.lang.Requires import spock.lang.Requires
import spock.lang.Shared import spock.lang.Shared
import spock.lang.Unroll import spock.lang.Unroll
import spock.util.concurrent.BlockingVariable
@Unroll @Unroll
abstract class HttpClientTest extends InstrumentationSpecification { abstract class HttpClientTest extends InstrumentationSpecification {
@ -86,11 +88,60 @@ abstract class HttpClientTest extends InstrumentationSpecification {
} }
/** /**
* Make the request and return the status code response * Make the request and return the status code of the response synchronously. Some clients, e.g.,
* @param method * HTTPUrlConnection only support synchronous execution without callbacks, and many offer a
* @return * dedicated API for invoking synchronously, such as OkHttp's execute method. When implementing
* this method, such an API should be used and the HTTP status code of the response returned,
* for example:
*
* @Override
* int doRequest(String method, URI uri, Map<String, String headers = [:]) {
* HttpResponse response = client.execute(new Request(method, uri, headers))
* return response.statusCode()
* }
*
* If there is no synchronous API available at all, for example as in Vert.X, a CompletableFuture
* can be used to block on a result, for example:
*
* @Override
* int doRequest(String method, URI uri, Map<String, String headers = [:]) {
* CompletableFuture<Integer> result = new CompletableFuture<>()
* doRequestWithCallback(method, uri, headers) {
* result.complete(it.statusCode())
* }
* return result.join()
* }
*/ */
abstract int doRequest(String method, URI uri, Map<String, String> headers = [:], Closure callback = null) abstract int doRequest(String method, URI uri, Map<String, String> headers = [:])
/**
* Maks the request and return the status code of the response through the callback. This method
* should be implemented if the client offers any request execution methods that accept a callback
* which receives the response. This will generally be an API for asynchronous execution of a
* request, such as OkHttp's enqueue method, but may also be a callback executed synchronously,
* such as ApacheHttpClient's response handler callbacks. This method is used in tests to verify
* the context is propagated correctly to such callbacks.
*
* @Override
* void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
* // Hypothetical client accepting a callback
* client.executeAsync(new Request(method, uri, headers)) {
* callback.accept(it.statusCode())
* }
*
* // Hypothetical client returning a CompletableFuture
* client.executeAsync(new Request(method, uri, headers)).thenAccept {
* callback.accept(it.statusCode())
* }
* }
*
* If the client offers no APIs that accept callbacks, then this method should not be implemented
* and instead, {@link #testCallback} should be implemented to return false.
*/
void doRequestWithCallback(String method, URI uri, Map<String, String> headers = [:], Consumer<Integer> callback) {
// Must be implemented if testAsync is true
throw new UnsupportedOperationException()
}
Integer statusOnRedirectError() { Integer statusOnRedirectError() {
return null return null
@ -195,17 +246,21 @@ abstract class HttpClientTest extends InstrumentationSpecification {
def "trace request with callback and parent"() { def "trace request with callback and parent"() {
given: given:
assumeTrue(testCallback())
assumeTrue(testCallbackWithParent()) assumeTrue(testCallbackWithParent())
def status = new BlockingVariable<Integer>()
when: when:
def status = runUnderTrace("parent") { runUnderTrace("parent") {
doRequest(method, server.address.resolve("/success"), ["is-test-server": "false"]) { doRequestWithCallback(method, server.address.resolve("/success"), ["is-test-server": "false"]) {
runUnderTrace("child") {} runUnderTrace("child") {}
status.set(it)
} }
} }
then: then:
status == 200 status.get() == 200
// only one trace (client). // only one trace (client).
assertTraces(1) { assertTraces(1) {
trace(0, 3 + extraClientSpans()) { trace(0, 3 + extraClientSpans()) {
@ -220,14 +275,20 @@ abstract class HttpClientTest extends InstrumentationSpecification {
} }
def "trace request with callback and no parent"() { def "trace request with callback and no parent"() {
given:
assumeTrue(testCallback())
def status = new BlockingVariable<Integer>()
when: when:
def status = doRequest(method, server.address.resolve("/success"), ["is-test-server": "false"]) { doRequestWithCallback(method, server.address.resolve("/success"), ["is-test-server": "false"]) {
runUnderTrace("callback") { runUnderTrace("callback") {
} }
status.set(it)
} }
then: then:
status == 200 status.get() == 200
// only one trace (client). // only one trace (client).
assertTraces(2) { assertTraces(2) {
trace(0, 1 + extraClientSpans()) { trace(0, 1 + extraClientSpans()) {
@ -656,6 +717,10 @@ abstract class HttpClientTest extends InstrumentationSpecification {
true true
} }
boolean testCallback() {
return true
}
boolean testCallbackWithParent() { boolean testCallbackWithParent() {
// FIXME: this hack is here because callback with parent is broken in play-ws when the stream() // FIXME: this hack is here because callback with parent is broken in play-ws when the stream()
// function is used. There is no way to stop a test from a derived class hence the flag // function is used. There is no way to stop a test from a derived class hence the flag