Add connection reuse tests for Reactor Netty libraries (#2647)

This commit is contained in:
Nikita Salnikov-Tarnovski 2021-03-29 07:41:09 +03:00 committed by GitHub
parent 62a929f511
commit c918d05aa0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 4 deletions

View File

@ -44,8 +44,8 @@ public class TracingSubscriber<T> implements CoreSubscriber<T> {
Context ctx,
io.opentelemetry.context.Context contextToPropagate) {
this.subscriber = subscriber;
this.traceContext = contextToPropagate;
this.context = ctx;
this.traceContext = contextToPropagate;
}
@Override

View File

@ -19,4 +19,5 @@ dependencies {
testInstrumentation project(':instrumentation:reactor-netty-1.0:javaagent')
testInstrumentation project(':instrumentation:netty:netty-4.1:javaagent')
testInstrumentation project(':instrumentation:reactor-3.1:javaagent')
}

View File

@ -64,7 +64,7 @@ public class ReactorNettyInstrumentationModule extends InstrumentationModule {
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isStatic().and(named("create")),
isStatic().and(named("create").or(named("newConnection"))),
ReactorNettyInstrumentationModule.class.getName() + "$CreateAdvice");
}
}

View File

@ -5,6 +5,9 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.HttpClientResponse
@ -45,4 +48,33 @@ class ReactorNettyHttpClientTest extends HttpClientTest implements AgentTestTrai
}
return resp.status().code()
}
@Override
SingleConnection createSingleConnection(String host, int port) {
String url
try {
url = new URL("http", host, port, "").toString()
} catch (MalformedURLException e) {
throw new ExecutionException(e)
}
def httpClient = HttpClient
.newConnection()
.baseUrl(url)
return new SingleConnection() {
@Override
int doRequest(String path, Map<String, String> headers) throws ExecutionException, InterruptedException, TimeoutException {
return httpClient
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
.get()
.uri(path)
.response()
.block()
.status().code()
}
}
}
}

View File

@ -20,4 +20,5 @@ dependencies {
testInstrumentation project(':instrumentation:reactor-netty-0.9:javaagent')
testInstrumentation project(':instrumentation:netty:netty-4.1:javaagent')
testInstrumentation project(':instrumentation:reactor-3.1:javaagent')
}

View File

@ -63,7 +63,7 @@ public class ReactorNettyInstrumentationModule extends InstrumentationModule {
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isStatic().and(named("create")),
isStatic().and(named("create").or(named("newConnection"))),
ReactorNettyInstrumentationModule.class.getName() + "$CreateAdvice");
}
}

View File

@ -5,6 +5,9 @@
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException
import reactor.netty.http.client.HttpClient
import reactor.netty.http.client.HttpClientResponse
@ -29,7 +32,7 @@ class ReactorNettyHttpClientTest extends HttpClientTest implements AgentTestTrai
String userAgent() {
return "ReactorNetty"
}
@Override
int doRequest(String method, URI uri, Map<String, String> headers = [:], Closure callback = null) {
HttpClientResponse resp = HttpClient.create()
@ -45,4 +48,26 @@ class ReactorNettyHttpClientTest extends HttpClientTest implements AgentTestTrai
}
return resp.status().code()
}
@Override
SingleConnection createSingleConnection(String host, int port) {
def httpClient = HttpClient
.newConnection()
.host(host)
.port(port)
return new SingleConnection() {
@Override
int doRequest(String path, Map<String, String> headers) throws ExecutionException, InterruptedException, TimeoutException {
return httpClient
.headers({ h -> headers.each { k, v -> h.add(k, v) } })
.get()
.uri(path)
.response()
.block()
.status().code()
}
}
}
}