Fix http concurrency test with large responses (#5648)

* Debug http client concurrency test failures

* debugging

* context porpagation to callbacks isn't really implemented

* verify that request succeeds in single connection concurrency test

* spotless

* verify request status in http client concurrency test

* update comment

* remove large response

* Trigger Build

* Update instrumentation/netty/netty-3.8/javaagent/src/test/groovy/Netty38ClientTest.groovy

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Lauri Tulmin 2022-03-25 19:03:22 +02:00 committed by GitHub
parent b668e73e13
commit 5267462b06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 30 deletions

View File

@ -161,8 +161,7 @@ public abstract class AbstractExecutorServiceTest<T extends ExecutorService, U e
}); });
// Just check there is a single trace, this test is primarily to make sure that scopes aren't // Just check there is a single trace, this test is primarily to make sure that scopes aren't
// leak on // leaked on cancellation.
// cancellation.
testing.waitAndAssertTraces(trace -> {}); testing.waitAndAssertTraces(trace -> {});
} }
} }

View File

@ -10,6 +10,8 @@ import com.ning.http.client.Request
import com.ning.http.client.RequestBuilder import com.ning.http.client.RequestBuilder
import com.ning.http.client.Response import com.ning.http.client.Response
import io.opentelemetry.api.common.AttributeKey import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope
import io.opentelemetry.instrumentation.test.AgentTestTrait import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
@ -70,17 +72,23 @@ class Netty38ClientTest extends HttpClientTest<Request> implements AgentTestTrai
@Override @Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) { void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, AbstractHttpClientTest.RequestResult requestResult) {
// TODO: context is not automatically propagated into callbacks
Context context = Context.current()
// TODO(anuraaga): Do we also need to test ListenableFuture callback? // TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(request, new AsyncCompletionHandler<Void>() { client.executeRequest(request, new AsyncCompletionHandler<Void>() {
@Override @Override
Void onCompleted(Response response) throws Exception { Void onCompleted(Response response) throws Exception {
requestResult.complete(response.statusCode) try (Scope scope = context.makeCurrent()) {
requestResult.complete(response.statusCode)
}
return null return null
} }
@Override @Override
void onThrowable(Throwable throwable) { void onThrowable(Throwable throwable) {
requestResult.complete(throwable) try (Scope scope = context.makeCurrent()) {
requestResult.complete(throwable)
}
} }
}) })
} }

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeoutException
class SpringWebFluxSingleConnection implements SingleConnection { class SpringWebFluxSingleConnection implements SingleConnection {
private final ReactorClientHttpConnector connector private final ReactorClientHttpConnector connector
private final WebClient webClient
private final String host private final String host
private final int port private final int port
@ -28,7 +29,7 @@ class SpringWebFluxSingleConnection implements SingleConnection {
if (isOldVersion) { if (isOldVersion) {
connector = new ReactorClientHttpConnector({ HttpClientOptions.Builder clientOptions -> connector = new ReactorClientHttpConnector({ HttpClientOptions.Builder clientOptions ->
clientOptions.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, HttpClientTest.CONNECT_TIMEOUT_MS) clientOptions.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, HttpClientTest.CONNECT_TIMEOUT_MS)
clientOptions.poolResources(PoolResources.fixed("pool", 1, HttpClientTest.CONNECT_TIMEOUT_MS)) clientOptions.poolResources(PoolResources.fixed("pool", 1))
}) })
} else { } else {
def httpClient = HttpClient.create().tcpConfiguration({ tcpClient -> def httpClient = HttpClient.create().tcpConfiguration({ tcpClient ->
@ -40,6 +41,7 @@ class SpringWebFluxSingleConnection implements SingleConnection {
this.host = host this.host = host
this.port = port this.port = port
this.webClient = WebClient.builder().clientConnector(connector).build()
} }
@Override @Override
@ -53,11 +55,13 @@ class SpringWebFluxSingleConnection implements SingleConnection {
throw new ExecutionException(e) throw new ExecutionException(e)
} }
def request = WebClient.builder().clientConnector(connector).build().method(HttpMethod.GET) def request = webClient.method(HttpMethod.GET)
.uri(uri) .uri(uri)
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) } .headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
def response = request.exchange().block() def response = request.exchange().block()
// read response body, this seems to be needed to ensure that the connection can be reused
response.bodyToMono(String).block()
String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER) String responseId = response.headers().asHttpHeaders().getFirst(REQUEST_ID_HEADER)
if (requestId != responseId) { if (requestId != responseId) {

View File

@ -680,17 +680,22 @@ public abstract class AbstractHttpClientTest<REQUEST> {
throw new AssertionError(e); throw new AssertionError(e);
} }
try { try {
testing.runWithSpan( Integer result =
"Parent span " + index, testing.runWithSpan(
() -> { "Parent span " + index,
Span.current().setAttribute("test.request.id", index); () -> {
doRequest( Span.current().setAttribute("test.request.id", index);
method, return doRequest(
uri, method,
Collections.singletonMap("test-request-id", String.valueOf(index))); uri,
}); Collections.singletonMap("test-request-id", String.valueOf(index)));
} catch (Exception e) { });
throw new AssertionError(e); assertThat(result).isEqualTo(200);
} catch (Throwable throwable) {
if (throwable instanceof AssertionError) {
throw (AssertionError) throwable;
}
throw new AssertionError(throwable);
} }
}; };
pool.submit(job); pool.submit(job);
@ -832,19 +837,23 @@ public abstract class AbstractHttpClientTest<REQUEST> {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new AssertionError(e); throw new AssertionError(e);
} }
testing.runWithSpan( try {
"Parent span " + index, Integer result =
() -> { testing.runWithSpan(
Span.current().setAttribute("test.request.id", index); "Parent span " + index,
try { () -> {
singleConnection.doRequest( Span.current().setAttribute("test.request.id", index);
path, Collections.singletonMap("test-request-id", String.valueOf(index))); return singleConnection.doRequest(
} catch (InterruptedException e) { path,
throw new AssertionError(e); Collections.singletonMap("test-request-id", String.valueOf(index)));
} catch (Exception e) { });
throw new AssertionError(e); assertThat(result).isEqualTo(200);
} } catch (Throwable throwable) {
}); if (throwable instanceof AssertionError) {
throw (AssertionError) throwable;
}
throw new AssertionError(throwable);
}
}; };
pool.submit(job); pool.submit(job);
} }