Migrate HttpClientTest test server to Armeria (#3225)
* Use Armeria for test HTTP server. * Continue * Migrate test http server to Armeria. * Finish * Use junit extension * Remove unused. * Use localhost for net peer name. * Block for full response in recator-netty tests. * Handle split responses in netty41 and akka * Typo
This commit is contained in:
parent
70c8a0c5e2
commit
7ae23fc694
|
@ -1,12 +1,5 @@
|
|||
apply from: "$rootDir/gradle/instrumentation.gradle"
|
||||
apply from: "$rootDir/gradle/test-with-scala.gradle"
|
||||
apply plugin: 'org.unbroken-dome.test-sets'
|
||||
|
||||
testSets {
|
||||
version101Test {
|
||||
dirName = 'test'
|
||||
}
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
|
@ -50,17 +43,6 @@ muzzle {
|
|||
dependencies {
|
||||
library "com.typesafe.akka:akka-http_2.11:10.0.0"
|
||||
library "com.typesafe.akka:akka-stream_2.11:2.4.14"
|
||||
|
||||
// There are some internal API changes in 10.1 that we would like to test separately for
|
||||
version101TestImplementation "com.typesafe.akka:akka-http_2.11:10.1.0"
|
||||
version101TestImplementation "com.typesafe.akka:akka-stream_2.11:2.5.11"
|
||||
}
|
||||
|
||||
test.dependsOn version101Test
|
||||
|
||||
compileVersion101TestGroovy {
|
||||
classpath = classpath.plus(files(compileVersion101TestScala.destinationDir))
|
||||
dependsOn compileVersion101TestScala
|
||||
}
|
||||
|
||||
tasks.withType(Test).configureEach {
|
||||
|
|
|
@ -10,11 +10,13 @@ import akka.actor.ActorSystem
|
|||
import akka.http.javadsl.Http
|
||||
import akka.http.javadsl.model.HttpMethods
|
||||
import akka.http.javadsl.model.HttpRequest
|
||||
import akka.http.javadsl.model.HttpResponse
|
||||
import akka.http.javadsl.model.headers.RawHeader
|
||||
import akka.stream.ActorMaterializer
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.base.HttpClientTest
|
||||
import io.opentelemetry.instrumentation.test.base.SingleConnection
|
||||
import java.util.concurrent.TimeUnit
|
||||
import spock.lang.Shared
|
||||
|
||||
class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> implements AgentTestTrait {
|
||||
|
@ -33,17 +35,20 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> impl
|
|||
|
||||
@Override
|
||||
int sendRequest(HttpRequest request, String method, URI uri, Map<String, String> headers) {
|
||||
return Http.get(system)
|
||||
HttpResponse response = Http.get(system)
|
||||
.singleRequest(request, materializer)
|
||||
.toCompletableFuture()
|
||||
.get()
|
||||
.status()
|
||||
.intValue()
|
||||
.get(10, TimeUnit.SECONDS)
|
||||
|
||||
response.discardEntityBytes(materializer)
|
||||
|
||||
return response.status().intValue()
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
|
||||
Http.get(system).singleRequest(request, materializer).whenComplete {response, throwable ->
|
||||
response.discardEntityBytes(materializer)
|
||||
requestResult.complete({ response.status().intValue() }, throwable)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ abstract class AbstractGoogleHttpClientTest extends HttpClientTest<HttpRequest>
|
|||
|
||||
def "error traces when exception is not thrown"() {
|
||||
given:
|
||||
def uri = server.address.resolve("/error")
|
||||
def uri = resolveAddress("/error")
|
||||
|
||||
when:
|
||||
def responseCode = doRequest(method, uri)
|
||||
|
@ -85,7 +85,7 @@ abstract class AbstractGoogleHttpClientTest extends HttpClientTest<HttpRequest>
|
|||
"${SemanticAttributes.HTTP_FLAVOR.key}" "1.1"
|
||||
}
|
||||
}
|
||||
server.distributedRequestSpan(it, 1, span(0))
|
||||
serverSpan(it, 1, span(0))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -69,7 +69,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
@Unroll
|
||||
def "trace request (useCaches: #useCaches)"() {
|
||||
setup:
|
||||
def url = server.address.resolve("/success").toURL()
|
||||
def url = resolveAddress("/success").toURL()
|
||||
runUnderTrace("someTrace") {
|
||||
HttpURLConnection connection = url.openConnection()
|
||||
connection.useCaches = useCaches
|
||||
|
@ -109,7 +109,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.address.port
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.HTTP_URL.key}" "$url"
|
||||
"${SemanticAttributes.HTTP_METHOD.key}" "GET"
|
||||
"${SemanticAttributes.HTTP_STATUS_CODE.key}" STATUS
|
||||
|
@ -130,7 +130,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.address.port
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.HTTP_URL.key}" "$url"
|
||||
"${SemanticAttributes.HTTP_METHOD.key}" "GET"
|
||||
"${SemanticAttributes.HTTP_STATUS_CODE.key}" STATUS
|
||||
|
@ -153,7 +153,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
|
||||
def "test broken API usage"() {
|
||||
setup:
|
||||
def url = server.address.resolve("/success").toURL()
|
||||
def url = resolveAddress("/success").toURL()
|
||||
HttpURLConnection connection = runUnderTrace("someTrace") {
|
||||
HttpURLConnection connection = url.openConnection()
|
||||
connection.setRequestProperty("Connection", "close")
|
||||
|
@ -177,7 +177,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
childOf span(0)
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.address.port
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.HTTP_URL.key}" "$url"
|
||||
"${SemanticAttributes.HTTP_METHOD.key}" "GET"
|
||||
|
@ -198,7 +198,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
|
||||
def "test post request"() {
|
||||
setup:
|
||||
def url = server.address.resolve("/success").toURL()
|
||||
def url = resolveAddress("/success").toURL()
|
||||
runUnderTrace("someTrace") {
|
||||
HttpURLConnection connection = url.openConnection()
|
||||
connection.setRequestMethod("POST")
|
||||
|
@ -236,7 +236,7 @@ class HttpUrlConnectionTest extends HttpClientTest<HttpURLConnection> implements
|
|||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.address.port
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort()
|
||||
"${SemanticAttributes.HTTP_URL.key}" "$url"
|
||||
"${SemanticAttributes.HTTP_METHOD.key}" "POST"
|
||||
"${SemanticAttributes.HTTP_STATUS_CODE.key}" STATUS
|
||||
|
|
|
@ -3,25 +3,23 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.base.HttpClientTest
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.net.http.HttpClient
|
||||
import java.net.http.HttpRequest
|
||||
import java.net.http.HttpResponse
|
||||
import java.time.Duration
|
||||
import java.time.temporal.ChronoUnit
|
||||
import spock.lang.Requires
|
||||
import spock.lang.Shared
|
||||
|
||||
class JdkHttpClientTest extends HttpClientTest<HttpRequest> implements AgentTestTrait {
|
||||
|
||||
@Shared
|
||||
def client = HttpClient.newBuilder().connectTimeout(Duration.of(CONNECT_TIMEOUT_MS,
|
||||
ChronoUnit.MILLIS)).followRedirects(HttpClient.Redirect.NORMAL).build()
|
||||
def client = HttpClient.newBuilder()
|
||||
.version(HttpClient.Version.HTTP_1_1)
|
||||
.connectTimeout(Duration.of(CONNECT_TIMEOUT_MS, ChronoUnit.MILLIS))
|
||||
.followRedirects(HttpClient.Redirect.NORMAL)
|
||||
.build()
|
||||
|
||||
@Override
|
||||
HttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
|
@ -64,40 +62,4 @@ class JdkHttpClientTest extends HttpClientTest<HttpRequest> implements AgentTest
|
|||
boolean testErrorWithCallback() {
|
||||
return false
|
||||
}
|
||||
|
||||
@Requires({ !System.getProperty("java.vm.name").contains("IBM J9 VM") })
|
||||
def "test https request"() {
|
||||
given:
|
||||
def uri = new URI("https://www.google.com/")
|
||||
|
||||
when:
|
||||
def responseCode = doRequest(method, uri)
|
||||
|
||||
then:
|
||||
responseCode == 200
|
||||
assertTraces(1) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
hasNoParent()
|
||||
name expectedOperationName(method)
|
||||
kind CLIENT
|
||||
attributes {
|
||||
"${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP
|
||||
"${SemanticAttributes.NET_PEER_NAME.key}" uri.host
|
||||
"${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" }
|
||||
// Optional
|
||||
"${SemanticAttributes.NET_PEER_PORT.key}" uri.port > 0 ? uri.port : { it == null || it == 443 }
|
||||
"${SemanticAttributes.HTTP_URL.key}" { it == "${uri}" || it == "${removeFragment(uri)}" }
|
||||
"${SemanticAttributes.HTTP_METHOD.key}" method
|
||||
"${SemanticAttributes.HTTP_FLAVOR.key}" "2.0"
|
||||
"${SemanticAttributes.HTTP_STATUS_CODE.key}" responseCode
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
where:
|
||||
method = "HEAD"
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ abstract class JaxRsClientTest extends HttpClientTest<Invocation.Builder> implem
|
|||
def "should properly convert HTTP status #statusCode to span error status"() {
|
||||
given:
|
||||
def method = "GET"
|
||||
def uri = server.address.resolve(path)
|
||||
def uri = resolveAddress(path)
|
||||
|
||||
when:
|
||||
def actualStatusCode = doRequest(method, uri)
|
||||
|
|
|
@ -23,7 +23,7 @@ class ResteasyProxyClientTest extends HttpClientTest<ResteasyProxyResource> impl
|
|||
ResteasyProxyResource buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
return new ResteasyClientBuilder()
|
||||
.build()
|
||||
.target(new ResteasyUriBuilder().uri(server.address))
|
||||
.target(new ResteasyUriBuilder().uri(resolveAddress("")))
|
||||
.proxy(ResteasyProxyResource)
|
||||
}
|
||||
|
||||
|
|
|
@ -5,8 +5,11 @@
|
|||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpObject;
|
||||
import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.util.AttributeKey;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/*
|
||||
|
@ -15,6 +18,10 @@ When request initiated by a test gets a response, calls a given callback and com
|
|||
future with response's status code.
|
||||
*/
|
||||
public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
|
||||
|
||||
private static final AttributeKey<HttpResponse> HTTP_RESPONSE =
|
||||
AttributeKey.valueOf(ClientHandler.class, "http-response");
|
||||
|
||||
private final CompletableFuture<Integer> responseCode;
|
||||
|
||||
public ClientHandler(CompletableFuture<Integer> responseCode) {
|
||||
|
@ -23,11 +30,16 @@ public class ClientHandler extends SimpleChannelInboundHandler<HttpObject> {
|
|||
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
|
||||
if (msg instanceof HttpResponse) {
|
||||
if (msg instanceof FullHttpResponse) {
|
||||
ctx.pipeline().remove(this);
|
||||
|
||||
HttpResponse response = (HttpResponse) msg;
|
||||
FullHttpResponse response = (FullHttpResponse) msg;
|
||||
responseCode.complete(response.getStatus().code());
|
||||
} else if (msg instanceof HttpResponse) {
|
||||
// Headers before body have been received, store them to use when finishing the span.
|
||||
ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg);
|
||||
} else if (msg instanceof LastHttpContent) {
|
||||
ctx.pipeline().remove(this);
|
||||
responseCode.complete(ctx.channel().attr(HTTP_RESPONSE).get().getStatus().code());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -145,14 +145,14 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
|
|||
pipeline.addLast(new HttpClientCodec())
|
||||
}
|
||||
})
|
||||
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, server.address.resolve("/success").toString(), Unpooled.EMPTY_BUFFER)
|
||||
request.headers().set(HttpHeaderNames.HOST, server.address.host)
|
||||
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, resolveAddress("/success").toString(), Unpooled.EMPTY_BUFFER)
|
||||
request.headers().set(HttpHeaderNames.HOST, "localhost")
|
||||
Channel ch = null
|
||||
|
||||
when:
|
||||
// note that this is a purely asynchronous request
|
||||
runUnderTrace("parent1") {
|
||||
ch = b.connect(server.address.host, server.address.port).sync().channel()
|
||||
ch = b.connect("localhost", server.httpPort()).sync().channel()
|
||||
ch.write(request)
|
||||
ch.flush()
|
||||
}
|
||||
|
@ -285,7 +285,7 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
|
|||
}
|
||||
}
|
||||
clientSpan(it, 2, span(1), method)
|
||||
server.distributedRequestSpan(it, 3, span(2))
|
||||
serverSpan(it, 3, span(2))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -295,8 +295,9 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
|
|||
|
||||
class TracedClass {
|
||||
int tracedMethod(String method) {
|
||||
def uri = resolveAddress("/success")
|
||||
runUnderTrace("tracedMethod") {
|
||||
doRequest(method, server.address.resolve("/success"))
|
||||
doRequest(method, uri)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ class RatpackHttpClientTest extends HttpClientTest<Void> implements AgentTestTra
|
|||
return new SingleConnection() {
|
||||
@Override
|
||||
int doRequest(String path, Map<String, String> headers) throws ExecutionException, InterruptedException, TimeoutException {
|
||||
def uri = server.address.resolve(path)
|
||||
def uri = resolveAddress(path)
|
||||
return exec.yield {
|
||||
internalSendRequest(singleConnectionClient, "GET", uri, headers)
|
||||
}.valueOrThrow
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
|
||||
|
||||
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
||||
|
@ -40,19 +42,27 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
return createHttpClient()
|
||||
.followRedirect(true)
|
||||
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
|
||||
.baseUrl(server.address.toString())
|
||||
.baseUrl(resolveAddress("").toString())
|
||||
."${method.toLowerCase()}"()
|
||||
.uri(uri.toString())
|
||||
}
|
||||
|
||||
@Override
|
||||
int sendRequest(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers) {
|
||||
return request.response().block().status().code()
|
||||
return request.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map {
|
||||
resp
|
||||
}
|
||||
}.block().status().code()
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
|
||||
request.response().subscribe({
|
||||
request.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}.subscribe({
|
||||
requestResult.complete(it.status().code())
|
||||
}, { throwable ->
|
||||
requestResult.complete(throwable)
|
||||
|
@ -112,10 +122,13 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
|
||||
when:
|
||||
runUnderTrace("parent") {
|
||||
httpClient.baseUrl(server.address.toString())
|
||||
httpClient.baseUrl(resolveAddress("").toString())
|
||||
.get()
|
||||
.uri("/success")
|
||||
.response()
|
||||
.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
}
|
||||
|
||||
|
@ -126,7 +139,7 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
def nettyClientSpan = span(1)
|
||||
|
||||
basicSpan(it, 0, "parent")
|
||||
clientSpan(it, 1, parentSpan, "GET", server.address.resolve("/success"))
|
||||
clientSpan(it, 1, parentSpan, "GET", resolveAddress("/success"))
|
||||
serverSpan(it, 2, nettyClientSpan)
|
||||
|
||||
assertSameSpan(parentSpan, onRequestSpan)
|
||||
|
@ -148,7 +161,10 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
runUnderTrace("parent") {
|
||||
httpClient.get()
|
||||
.uri("http://localhost:$UNUSABLE_PORT/")
|
||||
.response()
|
||||
.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
}
|
||||
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
|
||||
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.opentelemetry.instrumentation.test.base.SingleConnection
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
@ -38,9 +40,11 @@ class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
|
|||
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
|
||||
.get()
|
||||
.uri(path)
|
||||
.response()
|
||||
.block()
|
||||
.status().code()
|
||||
.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block().status().code()
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9
|
||||
|
||||
import io.netty.channel.ChannelOption
|
||||
import reactor.netty.http.client.HttpClient
|
||||
import reactor.netty.tcp.TcpClient
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
|
||||
|
||||
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
||||
|
@ -45,19 +47,27 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
return createHttpClient()
|
||||
.followRedirect(true)
|
||||
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
|
||||
.baseUrl(server.address.toString())
|
||||
.baseUrl(resolveAddress("").toString())
|
||||
."${method.toLowerCase()}"()
|
||||
.uri(uri.toString())
|
||||
}
|
||||
|
||||
@Override
|
||||
int sendRequest(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers) {
|
||||
return request.response().block().status().code()
|
||||
return request.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map {
|
||||
resp
|
||||
}
|
||||
}.block().status().code()
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(HttpClient.ResponseReceiver request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
|
||||
request.response().subscribe({
|
||||
request.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}.subscribe({
|
||||
requestResult.complete(it.status().code())
|
||||
}, { throwable ->
|
||||
requestResult.complete(throwable)
|
||||
|
@ -121,10 +131,13 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
|
||||
when:
|
||||
runUnderTrace("parent") {
|
||||
httpClient.baseUrl(server.address.toString())
|
||||
httpClient.baseUrl(resolveAddress("").toString())
|
||||
.get()
|
||||
.uri("/success")
|
||||
.response()
|
||||
.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
}
|
||||
|
||||
|
@ -135,7 +148,7 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
|
|||
def nettyClientSpan = span(1)
|
||||
|
||||
basicSpan(it, 0, "parent")
|
||||
clientSpan(it, 1, parentSpan, "GET", server.address.resolve("/success"))
|
||||
clientSpan(it, 1, parentSpan, "GET", resolveAddress("/success"))
|
||||
serverSpan(it, 2, nettyClientSpan)
|
||||
|
||||
assertSameSpan(parentSpan, onRequestSpan)
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
|
||||
|
||||
import io.netty.channel.ChannelOption
|
||||
import io.opentelemetry.instrumentation.test.base.SingleConnection
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
@ -32,7 +34,10 @@ class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest {
|
|||
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
|
||||
.get()
|
||||
.uri(path)
|
||||
.response()
|
||||
.responseSingle {resp, content ->
|
||||
// Make sure to consume content since that's when we close the span.
|
||||
content.map { resp }
|
||||
}
|
||||
.block()
|
||||
.status().code()
|
||||
}
|
|
@ -3,6 +3,8 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0
|
||||
|
||||
import io.netty.channel.ChannelOption
|
||||
import reactor.netty.http.client.HttpClient
|
||||
import reactor.netty.tcp.TcpClient
|
|
@ -72,6 +72,7 @@ include ':javaagent-api'
|
|||
include ':dependencyManagement'
|
||||
include ':testing:agent-exporter'
|
||||
include ':testing:agent-for-testing'
|
||||
include ':testing:armeria-shaded-for-testing'
|
||||
include ':testing-common'
|
||||
include ':testing-common:integration-tests'
|
||||
include ':testing-common:library-for-integration-tests'
|
||||
|
|
|
@ -8,13 +8,13 @@ package io.opentelemetry.instrumentation.test.base
|
|||
import static io.opentelemetry.api.trace.SpanKind.CLIENT
|
||||
import static io.opentelemetry.api.trace.SpanKind.SERVER
|
||||
import static io.opentelemetry.api.trace.StatusCode.ERROR
|
||||
import static io.opentelemetry.instrumentation.test.server.http.TestHttpServer.httpServer
|
||||
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicClientSpan
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderParentClientSpan
|
||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
|
||||
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP
|
||||
import static io.opentelemetry.testing.armeria.common.MediaType.PLAIN_TEXT_UTF_8
|
||||
import static org.junit.Assume.assumeTrue
|
||||
|
||||
import groovy.transform.stc.ClosureParams
|
||||
|
@ -22,19 +22,34 @@ import groovy.transform.stc.SimpleType
|
|||
import io.opentelemetry.api.GlobalOpenTelemetry
|
||||
import io.opentelemetry.api.common.AttributeKey
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import io.opentelemetry.api.trace.SpanBuilder
|
||||
import io.opentelemetry.api.trace.Tracer
|
||||
import io.opentelemetry.context.Context
|
||||
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.asserts.AttributesAssert
|
||||
import io.opentelemetry.instrumentation.test.asserts.SpanAssert
|
||||
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
|
||||
import io.opentelemetry.instrumentation.test.server.http.RequestContextGetter
|
||||
import io.opentelemetry.sdk.trace.data.SpanData
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import io.opentelemetry.testing.armeria.common.HttpData
|
||||
import io.opentelemetry.testing.armeria.common.HttpRequest
|
||||
import io.opentelemetry.testing.armeria.common.HttpResponse
|
||||
import io.opentelemetry.testing.armeria.common.HttpStatus
|
||||
import io.opentelemetry.testing.armeria.common.ResponseHeaders
|
||||
import io.opentelemetry.testing.armeria.common.ResponseHeadersBuilder
|
||||
import io.opentelemetry.testing.armeria.server.DecoratingHttpServiceFunction
|
||||
import io.opentelemetry.testing.armeria.server.HttpService
|
||||
import io.opentelemetry.testing.armeria.server.ServerBuilder
|
||||
import io.opentelemetry.testing.armeria.server.ServiceRequestContext
|
||||
import io.opentelemetry.testing.armeria.server.logging.LoggingService
|
||||
import io.opentelemetry.testing.armeria.testing.junit5.server.ServerExtension
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.function.Supplier
|
||||
import spock.lang.AutoCleanup
|
||||
import spock.lang.Requires
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Unroll
|
||||
|
@ -46,52 +61,79 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
protected static final BASIC_AUTH_KEY = "custom-authorization-header"
|
||||
protected static final BASIC_AUTH_VAL = "plain text auth token"
|
||||
|
||||
@AutoCleanup
|
||||
@Shared
|
||||
def server = httpServer {
|
||||
handlers {
|
||||
prefix("success") {
|
||||
handleDistributedRequest()
|
||||
String msg = "Hello."
|
||||
response.status(200).id(request.getHeader("test-request-id")).send(msg)
|
||||
}
|
||||
prefix("client-error") {
|
||||
handleDistributedRequest()
|
||||
String msg = "Invalid RQ"
|
||||
response.status(400).send(msg)
|
||||
}
|
||||
prefix("error") {
|
||||
handleDistributedRequest()
|
||||
String msg = "Sorry."
|
||||
response.status(500).send(msg)
|
||||
}
|
||||
prefix("redirect") {
|
||||
handleDistributedRequest()
|
||||
redirect(server.address.resolve("/success").toURL().toString())
|
||||
}
|
||||
prefix("another-redirect") {
|
||||
handleDistributedRequest()
|
||||
redirect(server.address.resolve("/redirect").toURL().toString())
|
||||
}
|
||||
prefix("circular-redirect") {
|
||||
handleDistributedRequest()
|
||||
redirect(server.address.resolve("/circular-redirect").toURL().toString())
|
||||
}
|
||||
prefix("secured") {
|
||||
handleDistributedRequest()
|
||||
if (request.headers.get(BASIC_AUTH_KEY) == BASIC_AUTH_VAL) {
|
||||
response.status(200).send("secured string under basic auth")
|
||||
} else {
|
||||
response.status(401).send("Unauthorized")
|
||||
Tracer tracer = openTelemetry.getTracer("test")
|
||||
|
||||
@Shared
|
||||
def server = new ServerExtension(false) {
|
||||
@Override
|
||||
protected void configure(ServerBuilder sb) throws Exception {
|
||||
sb.http(0)
|
||||
.service("/success") {ctx, req ->
|
||||
ResponseHeadersBuilder headers = ResponseHeaders.builder(HttpStatus.OK)
|
||||
def testRequestId = req.headers().get("test-request-id")
|
||||
if (testRequestId != null) {
|
||||
headers.set("test-request-id", testRequestId)
|
||||
}
|
||||
HttpResponse.of(headers.build(), HttpData.ofAscii("Hello."))
|
||||
}
|
||||
}
|
||||
prefix("to-secured") {
|
||||
handleDistributedRequest()
|
||||
redirect(server.address.resolve("/secured").toURL().toString())
|
||||
}
|
||||
.service("/client-error") {ctx, req ->
|
||||
HttpResponse.of(HttpStatus.BAD_REQUEST, PLAIN_TEXT_UTF_8, "Invalid RQ")
|
||||
}
|
||||
.service("/error") {ctx, req ->
|
||||
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, PLAIN_TEXT_UTF_8, "Sorry.")
|
||||
}
|
||||
.service("/redirect") {ctx, req ->
|
||||
HttpResponse.ofRedirect(HttpStatus.FOUND, "/success")
|
||||
}
|
||||
.service("/another-redirect") {ctx, req ->
|
||||
HttpResponse.ofRedirect(HttpStatus.FOUND, "/redirect")
|
||||
}
|
||||
.service("/circular-redirect") {ctx, req ->
|
||||
HttpResponse.ofRedirect(HttpStatus.FOUND, "/circular-redirect")
|
||||
}
|
||||
.service("/secured") {ctx, req ->
|
||||
if (req.headers().get(BASIC_AUTH_KEY) == BASIC_AUTH_VAL) {
|
||||
return HttpResponse.of(HttpStatus.OK, PLAIN_TEXT_UTF_8, "secured string under basic auth")
|
||||
}
|
||||
return HttpResponse.of(HttpStatus.UNAUTHORIZED, PLAIN_TEXT_UTF_8, "Unauthorized")
|
||||
}
|
||||
.service("/to-secured") {ctx, req ->
|
||||
HttpResponse.ofRedirect(HttpStatus.FOUND, "/secured")
|
||||
}
|
||||
.decorator(new DecoratingHttpServiceFunction() {
|
||||
@Override
|
||||
HttpResponse serve(HttpService delegate, ServiceRequestContext ctx, HttpRequest req) {
|
||||
for (String field : openTelemetry.propagators.textMapPropagator.fields()) {
|
||||
if (req.headers().getAll(field).size() > 1) {
|
||||
throw new AssertionError((Object) ("more than one " + field + " header present"))
|
||||
}
|
||||
}
|
||||
SpanBuilder span = tracer.spanBuilder("test-http-server")
|
||||
.setSpanKind(SERVER)
|
||||
.setParent(openTelemetry.propagators.textMapPropagator.extract(Context.current(), ctx, RequestContextGetter.INSTANCE))
|
||||
|
||||
def traceRequestId = req.headers().get("test-request-id")
|
||||
if (traceRequestId != null) {
|
||||
span.setAttribute("test.request.id", Integer.parseInt(traceRequestId))
|
||||
}
|
||||
span.startSpan().end()
|
||||
|
||||
return delegate.serve(ctx, req)
|
||||
}
|
||||
})
|
||||
.decorator(LoggingService.newDecorator())
|
||||
}
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
server.start()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
server.stop()
|
||||
}
|
||||
|
||||
// ideally private, but then groovy closures in this class cannot find them
|
||||
final int doRequest(String method, URI uri, Map<String, String> headers = [:]) {
|
||||
def request = buildRequest(method, uri, headers)
|
||||
|
@ -287,13 +329,14 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
path << ["/success", "/success?with=params"]
|
||||
|
||||
method = "GET"
|
||||
url = server.address.resolve(path)
|
||||
url = resolveAddress(path)
|
||||
}
|
||||
|
||||
def "basic #method request with parent"() {
|
||||
when:
|
||||
def uri = resolveAddress("/success")
|
||||
def responseCode = runUnderTrace("parent") {
|
||||
doRequest(method, server.address.resolve("/success"))
|
||||
doRequest(method, uri)
|
||||
}
|
||||
|
||||
then:
|
||||
|
@ -315,8 +358,9 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
assumeTrue(testWithClientParent())
|
||||
|
||||
when:
|
||||
def uri = resolveAddress("/success")
|
||||
def responseCode = runUnderParentClientSpan {
|
||||
doRequest(method, server.address.resolve("/success"))
|
||||
doRequest(method, uri)
|
||||
}
|
||||
|
||||
then:
|
||||
|
@ -347,8 +391,9 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
assumeTrue(testCallbackWithParent())
|
||||
|
||||
when:
|
||||
def uri = resolveAddress("/success")
|
||||
def requestResult = runUnderTrace("parent") {
|
||||
doRequestWithCallback(method, server.address.resolve("/success")) {
|
||||
doRequestWithCallback(method, uri) {
|
||||
runUnderTrace("child") {}
|
||||
}
|
||||
}
|
||||
|
@ -374,7 +419,8 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
assumeTrue(testCallback())
|
||||
|
||||
when:
|
||||
def requestResult = doRequestWithCallback(method, server.address.resolve("/success")) {
|
||||
def uri = resolveAddress("/success")
|
||||
def requestResult = doRequestWithCallback(method, uri) {
|
||||
runUnderTrace("callback") {
|
||||
}
|
||||
}
|
||||
|
@ -402,7 +448,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
|
||||
given:
|
||||
assumeTrue(testRedirects())
|
||||
def uri = server.address.resolve("/redirect")
|
||||
def uri = resolveAddress("/redirect")
|
||||
|
||||
when:
|
||||
def responseCode = doRequest(method, uri)
|
||||
|
@ -424,7 +470,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
def "basic #method request with 2 redirects"() {
|
||||
given:
|
||||
assumeTrue(testRedirects())
|
||||
def uri = server.address.resolve("/another-redirect")
|
||||
def uri = resolveAddress("/another-redirect")
|
||||
|
||||
when:
|
||||
def responseCode = doRequest(method, uri)
|
||||
|
@ -447,7 +493,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
def "basic #method request with circular redirects"() {
|
||||
given:
|
||||
assumeTrue(testRedirects() && testCircularRedirects())
|
||||
def uri = server.address.resolve("/circular-redirect")
|
||||
def uri = resolveAddress("/circular-redirect")
|
||||
|
||||
when:
|
||||
doRequest(method, uri)
|
||||
|
@ -473,7 +519,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
def "redirect #method to secured endpoint copies auth header"() {
|
||||
given:
|
||||
assumeTrue(testRedirects())
|
||||
def uri = server.address.resolve("/to-secured")
|
||||
def uri = resolveAddress("/to-secured")
|
||||
|
||||
when:
|
||||
|
||||
|
@ -494,7 +540,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
}
|
||||
|
||||
def "error span"() {
|
||||
def uri = server.address.resolve("/error")
|
||||
def uri = resolveAddress("/error")
|
||||
when:
|
||||
runUnderTrace("parent") {
|
||||
try {
|
||||
|
@ -539,7 +585,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
where:
|
||||
path = "/success"
|
||||
method = "GET"
|
||||
url = server.address.resolve(path)
|
||||
url = resolveAddress(path)
|
||||
}
|
||||
|
||||
// this test verifies two things:
|
||||
|
@ -564,7 +610,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
where:
|
||||
path = "/success"
|
||||
method = "GET"
|
||||
url = server.address.resolve(path)
|
||||
url = resolveAddress(path)
|
||||
}
|
||||
|
||||
def "connection error (unopened port)"() {
|
||||
|
@ -710,7 +756,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
assumeTrue(testCausality())
|
||||
int count = 50
|
||||
def method = 'GET'
|
||||
def url = server.address.resolve("/success")
|
||||
def url = resolveAddress("/success")
|
||||
|
||||
def latch = new CountDownLatch(1)
|
||||
|
||||
|
@ -757,7 +803,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
|
||||
int count = 50
|
||||
def method = 'GET'
|
||||
def url = server.address.resolve("/success")
|
||||
def url = resolveAddress("/success")
|
||||
|
||||
def latch = new CountDownLatch(1)
|
||||
|
||||
|
@ -805,12 +851,12 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
*/
|
||||
def "high concurrency test on single connection"() {
|
||||
setup:
|
||||
def singleConnection = createSingleConnection(server.address.host, server.address.port)
|
||||
def singleConnection = createSingleConnection("localhost", server.httpPort())
|
||||
assumeTrue(singleConnection != null)
|
||||
int count = 50
|
||||
def method = 'GET'
|
||||
def path = "/success"
|
||||
def url = server.address.resolve(path)
|
||||
def url = resolveAddress(path)
|
||||
|
||||
def latch = new CountDownLatch(1)
|
||||
def pool = Executors.newFixedThreadPool(4)
|
||||
|
@ -855,7 +901,7 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
}
|
||||
|
||||
// parent span must be cast otherwise it breaks debugging classloading (junit loads it early)
|
||||
void clientSpan(TraceAssert trace, int index, Object parentSpan, String method = "GET", URI uri = server.address.resolve("/success"), Integer responseCode = 200, Throwable exception = null, String httpFlavor = "1.1") {
|
||||
void clientSpan(TraceAssert trace, int index, Object parentSpan, String method = "GET", URI uri = resolveAddress("/success"), Integer responseCode = 200, Throwable exception = null, String httpFlavor = "1.1") {
|
||||
def userAgent = userAgent()
|
||||
def httpClientAttributes = httpAttributes(uri)
|
||||
trace.span(index) {
|
||||
|
@ -1019,4 +1065,8 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
|||
URI removeFragment(URI uri) {
|
||||
return new URI(uri.scheme, null, uri.host, uri.port, uri.path, uri.query, null)
|
||||
}
|
||||
|
||||
protected URI resolveAddress(String path) {
|
||||
return URI.create("http://localhost:${server.httpPort()}${path}")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.test.server.http;
|
||||
|
||||
import io.opentelemetry.context.propagation.TextMapGetter;
|
||||
import io.opentelemetry.testing.armeria.server.ServiceRequestContext;
|
||||
import io.opentelemetry.testing.netty.util.AsciiString;
|
||||
import java.util.Collections;
|
||||
import java.util.stream.Collectors;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
public enum RequestContextGetter implements TextMapGetter<ServiceRequestContext> {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public Iterable<String> keys(@Nullable ServiceRequestContext carrier) {
|
||||
if (carrier == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return carrier.request().headers().names().stream()
|
||||
.map(AsciiString::toString)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String get(@Nullable ServiceRequestContext carrier, String key) {
|
||||
if (carrier == null) {
|
||||
return null;
|
||||
}
|
||||
return carrier.request().headers().get(key);
|
||||
}
|
||||
}
|
|
@ -15,6 +15,8 @@ dependencies {
|
|||
api "io.opentelemetry:opentelemetry-sdk-metrics"
|
||||
api "io.opentelemetry:opentelemetry-sdk-testing"
|
||||
|
||||
api project(path: ":testing:armeria-shaded-for-testing", configuration: "shadow")
|
||||
|
||||
implementation("io.opentelemetry:opentelemetry-proto") {
|
||||
// Only need the proto, not gRPC.
|
||||
exclude group: 'io.grpc'
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
plugins {
|
||||
id "com.github.johnrengelman.shadow"
|
||||
}
|
||||
|
||||
apply from: "$rootDir/gradle/java.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation("com.linecorp.armeria:armeria-junit5:1.8.0") {
|
||||
// Native dependencies don't get shaded so exclude them to avoid conflicts.
|
||||
exclude group: "io.netty", module: "netty-tcnative-boringssl-static"
|
||||
exclude group: "io.netty", module: "netty-transport-native-epoll"
|
||||
exclude group: "io.netty", module: "netty-transport-native-kqueue"
|
||||
exclude group: "io.netty", module: "netty-transport-native-unix-common"
|
||||
}
|
||||
}
|
||||
|
||||
shadowJar {
|
||||
// Ensures tests are not affected by Armeria instrumentation
|
||||
relocate "com.linecorp.armeria", "io.opentelemetry.testing.armeria"
|
||||
// Allows tests of Netty instrumentations which would otherwise conflict.
|
||||
relocate "io.netty", "io.opentelemetry.testing.netty"
|
||||
}
|
Loading…
Reference in New Issue