Webflux instrumentation fix (#7251)
When a webflux filter is added which throws an exception, the
instrumentation does not currently capture the `http.status_code`.
The fix is to move `WebClientTracingFilter` from the first to the last
filter in the chain, which I think(?) is the general strategy we've
taken for other client instrumentation, e.g. so that if a filter makes
another http call it won't be suppressed.
I don't love the test coverage I added, so let me know if you have any
better suggestions?
EDIT: btw, I did archaeology to confirm that behavior (adding to the
beginning of the chain) has been in place since the webflux
instrumentation was added originally
6f472a62a0 (diff-493ad89b5bde807c90387aa2bb67eb10d3bcef6b6a388bd31e11796a6d01ac38R36)
This commit is contained in:
parent
d8251d1fea
commit
05471b053b
|
@ -14,7 +14,9 @@ import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
|
||||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||||
import org.springframework.http.HttpMethod
|
import org.springframework.http.HttpMethod
|
||||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector
|
import org.springframework.http.client.reactive.ReactorClientHttpConnector
|
||||||
|
import org.springframework.web.reactive.function.client.ExchangeFilterFunction
|
||||||
import org.springframework.web.reactive.function.client.WebClient
|
import org.springframework.web.reactive.function.client.WebClient
|
||||||
|
import reactor.core.publisher.Mono
|
||||||
|
|
||||||
class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySpec> implements AgentTestTrait {
|
class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySpec> implements AgentTestTrait {
|
||||||
|
|
||||||
|
@ -31,7 +33,12 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
|
||||||
})
|
})
|
||||||
connector = new ReactorClientHttpConnector(httpClient)
|
connector = new ReactorClientHttpConnector(httpClient)
|
||||||
}
|
}
|
||||||
return WebClient.builder().clientConnector(connector).build().method(HttpMethod.resolve(method))
|
return WebClient.builder()
|
||||||
|
.filter(ExchangeFilterFunction.ofResponseProcessor(
|
||||||
|
clientResponse -> {
|
||||||
|
return Mono.error(new TestException(clientResponse.statusCode().value()))
|
||||||
|
}))
|
||||||
|
.clientConnector(connector).build().method(HttpMethod.resolve(method))
|
||||||
.uri(uri)
|
.uri(uri)
|
||||||
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
|
.headers { h -> headers.forEach({ key, value -> h.add(key, value) }) }
|
||||||
}
|
}
|
||||||
|
@ -47,7 +54,11 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
int sendRequest(WebClient.RequestBodySpec request, String method, URI uri, Map<String, String> headers) {
|
int sendRequest(WebClient.RequestBodySpec request, String method, URI uri, Map<String, String> headers) {
|
||||||
return request.exchange().block().statusCode().value()
|
try {
|
||||||
|
return request.exchange().block().statusCode().value()
|
||||||
|
} catch (TestException e) {
|
||||||
|
return e.getStatusCode()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -80,11 +91,6 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
boolean testCallbackWithImplicitParent() {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
Set<AttributeKey<?>> httpAttributes(URI uri) {
|
Set<AttributeKey<?>> httpAttributes(URI uri) {
|
||||||
def attributes = super.httpAttributes(uri)
|
def attributes = super.httpAttributes(uri)
|
||||||
|
@ -96,4 +102,13 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
|
||||||
SingleConnection createSingleConnection(String host, int port) {
|
SingleConnection createSingleConnection(String host, int port) {
|
||||||
return new SpringWebFluxSingleConnection(isOldVersion(), host, port)
|
return new SpringWebFluxSingleConnection(isOldVersion(), host, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class TestException extends RuntimeException {
|
||||||
|
|
||||||
|
int statusCode
|
||||||
|
|
||||||
|
TestException(int statusCode) {
|
||||||
|
this.statusCode = statusCode
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,6 +46,6 @@ public final class SpringWebfluxTelemetry {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
exchangeFilterFunctions.add(0, new WebClientTracingFilter(instrumenter, propagators));
|
exchangeFilterFunctions.add(new WebClientTracingFilter(instrumenter, propagators));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,14 +285,20 @@ abstract class HttpClientTest<REQUEST> extends InstrumentationSpecification {
|
||||||
assumeTrue(testCallback())
|
assumeTrue(testCallback())
|
||||||
assumeTrue(testCallbackWithParent())
|
assumeTrue(testCallbackWithParent())
|
||||||
expect:
|
expect:
|
||||||
junitTest.requestWithCallbackAndParent()
|
try {
|
||||||
|
junitTest.requestWithCallbackAndParent()
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def "trace request with callback and no parent"() {
|
def "trace request with callback and no parent"() {
|
||||||
assumeTrue(testCallback())
|
assumeTrue(testCallback())
|
||||||
assumeFalse(testCallbackWithImplicitParent())
|
assumeFalse(testCallbackWithImplicitParent())
|
||||||
expect:
|
expect:
|
||||||
junitTest.requestWithCallbackAndNoParent()
|
try {
|
||||||
|
junitTest.requestWithCallbackAndNoParent()
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def "trace request with callback and implicit parent"() {
|
def "trace request with callback and implicit parent"() {
|
||||||
|
|
Loading…
Reference in New Issue