Migrate Spring Webflux HTTP client instrumentation to Instrumenter API (#4517)

* Migrate Spring Webflux HTTP client instrumentation to Instrumenter API

* Update instrumentation/spring/spring-webflux-5.0/library/src/main/java/io/opentelemetry/instrumentation/spring/webflux/client/SpringWebfluxHttpAttributesExtractor.java

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Mateusz Rzeszutek 2021-10-27 21:52:02 +02:00 committed by GitHub
parent 73ff98904e
commit 270c4b4e12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 407 additions and 191 deletions

View File

@ -6,12 +6,9 @@
package io.opentelemetry.instrumentation.spring.autoconfigure.httpclients.webclient;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter;
import java.util.List;
import java.util.function.Consumer;
import io.opentelemetry.instrumentation.spring.webflux.client.SpringWebfluxTracing;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
/**
@ -44,18 +41,10 @@ final class WebClientBeanPostProcessor implements BeanPostProcessor {
OpenTelemetry openTelemetry = openTelemetryProvider.getIfUnique();
if (openTelemetry != null) {
return webClientBuilder.filters(webClientFilterFunctionConsumer(openTelemetry));
SpringWebfluxTracing instrumentation = SpringWebfluxTracing.create(openTelemetry);
return webClientBuilder.filters(instrumentation::addClientTracingFilter);
} else {
return webClientBuilder;
}
}
private static Consumer<List<ExchangeFilterFunction>> webClientFilterFunctionConsumer(
OpenTelemetry openTelemetry) {
return functions -> {
if (functions.stream().noneMatch(filter -> filter instanceof WebClientTracingFilter)) {
WebClientTracingFilter.addFilter(openTelemetry, functions);
}
};
}
}

View File

@ -12,7 +12,6 @@ import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@ -20,6 +19,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
@ExtendWith(MockitoExtension.class)
@ -86,10 +86,7 @@ class WebClientBeanPostProcessorTest {
.mutate()
.filters(
functions ->
assertThat(
functions.stream()
.filter(wctf -> wctf instanceof WebClientTracingFilter)
.count())
assertThat(functions.stream().filter(wctf -> isOtelExchangeFilter(wctf)).count())
.isEqualTo(1));
verify(openTelemetryProvider).getIfUnique();
@ -110,10 +107,7 @@ class WebClientBeanPostProcessorTest {
.mutate()
.filters(
functions ->
assertThat(
functions.stream()
.filter(wctf -> wctf instanceof WebClientTracingFilter)
.count())
assertThat(functions.stream().filter(wctf -> isOtelExchangeFilter(wctf)).count())
.isEqualTo(0));
verify(openTelemetryProvider).getIfUnique();
@ -135,12 +129,13 @@ class WebClientBeanPostProcessorTest {
webClientBuilder.filters(
functions ->
assertThat(
functions.stream()
.filter(wctf -> wctf instanceof WebClientTracingFilter)
.count())
assertThat(functions.stream().filter(wctf -> isOtelExchangeFilter(wctf)).count())
.isEqualTo(1));
verify(openTelemetryProvider, times(3)).getIfUnique();
}
private static boolean isOtelExchangeFilter(ExchangeFilterFunction wctf) {
return wctf.getClass().getName().startsWith("io.opentelemetry.instrumentation");
}
}

View File

@ -6,13 +6,16 @@
package io.opentelemetry.javaagent.instrumentation.spring.webflux.client;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter;
import io.opentelemetry.instrumentation.spring.webflux.client.SpringWebfluxTracing;
import java.util.List;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
public class WebClientHelper {
private static final SpringWebfluxTracing INSTRUMENTATION =
SpringWebfluxTracing.create(GlobalOpenTelemetry.get());
public static void addFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
WebClientTracingFilter.addFilter(GlobalOpenTelemetry.get(), exchangeFilterFunctions);
INSTRUMENTATION.addClientTracingFilter(exchangeFilterFunctions);
}
}

View File

@ -6,10 +6,12 @@
package client
import io.netty.channel.ChannelOption
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.springframework.http.HttpMethod
import org.springframework.http.client.reactive.ReactorClientHttpConnector
import org.springframework.web.reactive.function.client.WebClient
@ -78,6 +80,13 @@ class SpringWebfluxHttpClientTest extends HttpClientTest<WebClient.RequestBodySp
false
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
def attributes = super.httpAttributes(uri)
attributes.remove(SemanticAttributes.HTTP_FLAVOR)
return attributes
}
@Override
SingleConnection createSingleConnection(String host, int port) {
return new SpringWebFluxSingleConnection(isOldVersion(), host, port)

View File

@ -1,6 +1,6 @@
# Manual Instrumentation for Spring Webflux
Provides OpenTelemetry instrumentation for Spring's WebClient.
Provides OpenTelemetry instrumentation for Spring's `WebClient`.
## Quickstart
@ -10,8 +10,7 @@ Replace `SPRING_VERSION` with the version of spring you're using.
`Minimum version: 5.0`
Replace `OPENTELEMETRY_VERSION` with the latest stable [release](https://mvnrepository.com/artifact/io.opentelemetry).
`Minimum version: 0.8.0`
`Minimum version: 1.8.0`
For Maven add to your `pom.xml`:
@ -60,17 +59,18 @@ implementation("org.springframework:spring-webflux:SPRING_VERSION")
### Features
#### WebClientTracingFilter
#### `SpringWebfluxTracing`
WebClientTracingFilter adds OpenTelemetry client spans to requests sent using WebClient by implementing the [ExchangeFilterFunction](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/function/client/ExchangeFilterFunction.html)
`SpringWebfluxTracing` emits client span for each request sent using `WebClient` by implementing
the [ExchangeFilterFunction](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/reactive/function/client/ExchangeFilterFunction.html)
interface. An example is shown below:
##### Usage
```java
import io.opentelemetry.instrumentation.spring.webflux.client.WebClientTracingFilter
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.spring.webflux.client.SpringWebfluxTracing;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@ -81,12 +81,12 @@ import org.springframework.web.reactive.function.client.WebClient;
public class WebClientConfig {
@Bean
public WebClient.Builder webClient(Tracer tracer) {
public WebClient.Builder webClient(OpenTelemetry openTelemetry) {
WebClient webClient = WebClient.create();
WebClientTracingFilter webClientTracingFilter = new WebClientTracingFilter(tracer);
SpringWebfluxTracing instrumentation = SpringWebfluxTracing.create(openTelemetry);
return webClient.mutate().filter(webClientTracingFilter);
return webClient.mutate().filter(instrumentation::addClientTracingFilter);
}
}
```

View File

@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.spring.webflux.client;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import javax.annotation.Nullable;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
final class SpringWebfluxExperimentalAttributesExtractor
implements AttributesExtractor<ClientRequest, ClientResponse> {
private static final AttributeKey<String> SPRING_WEBFLUX_EVENT =
stringKey("spring-webflux.event");
private static final AttributeKey<String> SPRING_WEBFLUX_MESSAGE =
stringKey("spring-webflux.message");
public static boolean enabled() {
return Config.get()
.getBoolean("otel.instrumentation.spring-webflux.experimental-span-attributes", false);
}
@Override
public void onStart(AttributesBuilder attributes, ClientRequest request) {}
@Override
public void onEnd(
AttributesBuilder attributes,
ClientRequest request,
@Nullable ClientResponse response,
@Nullable Throwable error) {
// no response and no error means that the request has been cancelled
if (response == null && error == null) {
set(attributes, SPRING_WEBFLUX_EVENT, "cancelled");
set(attributes, SPRING_WEBFLUX_MESSAGE, "The subscription was cancelled");
}
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.spring.webflux.client;
import static java.util.Collections.emptyList;
import io.opentelemetry.instrumentation.api.instrumenter.http.CapturedHttpHeaders;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.List;
import javax.annotation.Nullable;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
final class SpringWebfluxHttpAttributesExtractor
extends HttpClientAttributesExtractor<ClientRequest, ClientResponse> {
private static final MethodHandle RAW_STATUS_CODE = findRawStatusCode();
// rawStatusCode() method was introduced in webflux 5.1
// prior to this method, the best we can get is HttpStatus enum, which only covers standard status
// codes (see usage below)
private static MethodHandle findRawStatusCode() {
try {
return MethodHandles.publicLookup()
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
} catch (IllegalAccessException | NoSuchMethodException e) {
return null;
}
}
SpringWebfluxHttpAttributesExtractor(CapturedHttpHeaders capturedHttpHeaders) {
super(capturedHttpHeaders);
}
@Override
protected String url(ClientRequest request) {
return request.url().toString();
}
@Nullable
@Override
protected String flavor(ClientRequest request, @Nullable ClientResponse response) {
return null;
}
@Override
protected String method(ClientRequest request) {
return request.method().name();
}
@Override
protected List<String> requestHeader(ClientRequest request, String name) {
return request.headers().getOrDefault(name, emptyList());
}
@Nullable
@Override
protected Long requestContentLength(ClientRequest request, @Nullable ClientResponse response) {
return null;
}
@Nullable
@Override
protected Long requestContentLengthUncompressed(
ClientRequest request, @Nullable ClientResponse response) {
return null;
}
@Override
protected Integer statusCode(ClientRequest request, ClientResponse response) {
if (RAW_STATUS_CODE != null) {
// rawStatusCode() method was introduced in webflux 5.1
try {
return (int) RAW_STATUS_CODE.invokeExact(response);
} catch (Throwable ignored) {
// Ignore
}
}
// prior to webflux 5.1, the best we can get is HttpStatus enum, which only covers standard
// status codes
return response.statusCode().value();
}
@Nullable
@Override
protected Long responseContentLength(ClientRequest request, ClientResponse response) {
return null;
}
@Nullable
@Override
protected Long responseContentLengthUncompressed(ClientRequest request, ClientResponse response) {
return null;
}
@Override
protected List<String> responseHeader(
ClientRequest request, ClientResponse response, String name) {
return response.headers().header(name);
}
}

View File

@ -1,105 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.spring.webflux.client;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.net.URI;
import java.util.List;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
class SpringWebfluxHttpClientTracer
extends HttpClientTracer<ClientRequest, ClientRequest.Builder, ClientResponse> {
private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
Config.get()
.getBoolean("otel.instrumentation.spring-webflux.experimental-span-attributes", false);
SpringWebfluxHttpClientTracer(OpenTelemetry openTelemetry) {
super(openTelemetry, new NetPeerAttributes());
}
private static final MethodHandle RAW_STATUS_CODE = findRawStatusCode();
void onCancel(Context context) {
if (captureExperimentalSpanAttributes()) {
Span span = Span.fromContext(context);
span.setAttribute("spring-webflux.event", "cancelled");
span.setAttribute("spring-webflux.message", "The subscription was cancelled");
}
}
@Override
protected String method(ClientRequest httpRequest) {
return httpRequest.method().name();
}
@Override
protected URI url(ClientRequest httpRequest) {
return httpRequest.url();
}
@Override
protected Integer status(ClientResponse httpResponse) {
if (RAW_STATUS_CODE != null) {
// rawStatusCode() method was introduced in webflux 5.1
try {
return (int) RAW_STATUS_CODE.invokeExact(httpResponse);
} catch (Throwable ignored) {
// Ignore
}
}
// prior to webflux 5.1, the best we can get is HttpStatus enum, which only covers standard
// status codes
return httpResponse.statusCode().value();
}
@Override
protected String requestHeader(ClientRequest clientRequest, String name) {
return clientRequest.headers().getFirst(name);
}
@Override
protected String responseHeader(ClientResponse clientResponse, String name) {
List<String> headers = clientResponse.headers().header(name);
return !headers.isEmpty() ? headers.get(0) : null;
}
@Override
protected TextMapSetter<ClientRequest.Builder> getSetter() {
return HttpHeadersSetter.INSTANCE;
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.spring-webflux-5.0";
}
// rawStatusCode() method was introduced in webflux 5.1
// prior to this method, the best we can get is HttpStatus enum, which only covers standard status
// codes (see usage above)
private static MethodHandle findRawStatusCode() {
try {
return MethodHandles.publicLookup()
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
} catch (IllegalAccessException | NoSuchMethodException e) {
return null;
}
}
private static boolean captureExperimentalSpanAttributes() {
return CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.spring.webflux.client;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
final class SpringWebfluxNetAttributesExtractor
extends NetClientAttributesExtractor<ClientRequest, ClientResponse> {
@Override
public String transport(ClientRequest request, @Nullable ClientResponse response) {
return SemanticAttributes.NetTransportValues.IP_TCP;
}
@Nullable
@Override
public String peerName(ClientRequest request, @Nullable ClientResponse response) {
return request.url().getHost();
}
@Override
public Integer peerPort(ClientRequest request, @Nullable ClientResponse response) {
return request.url().getPort();
}
@Nullable
@Override
public String peerIp(ClientRequest request, @Nullable ClientResponse response) {
return null;
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.spring.webflux.client;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
/** Entrypoint for tracing Spring Webflux HTTP clients. */
public final class SpringWebfluxTracing {
/** Returns a new {@link SpringWebfluxTracing} configured with the given {@link OpenTelemetry}. */
public static SpringWebfluxTracing create(OpenTelemetry openTelemetry) {
return builder(openTelemetry).build();
}
/**
* Returns a new {@link SpringWebfluxTracingBuilder} configured with the given {@link
* OpenTelemetry}.
*/
public static SpringWebfluxTracingBuilder builder(OpenTelemetry openTelemetry) {
return new SpringWebfluxTracingBuilder(openTelemetry);
}
private final Instrumenter<ClientRequest, ClientResponse> instrumenter;
private final ContextPropagators propagators;
SpringWebfluxTracing(
Instrumenter<ClientRequest, ClientResponse> instrumenter, ContextPropagators propagators) {
this.instrumenter = instrumenter;
this.propagators = propagators;
}
public void addClientTracingFilter(List<ExchangeFilterFunction> exchangeFilterFunctions) {
for (ExchangeFilterFunction filterFunction : exchangeFilterFunctions) {
if (filterFunction instanceof WebClientTracingFilter) {
return;
}
}
exchangeFilterFunctions.add(0, new WebClientTracingFilter(instrumenter, propagators));
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.spring.webflux.client;
import static io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor.alwaysClient;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.PeerServiceAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.CapturedHttpHeaders;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor;
import java.util.ArrayList;
import java.util.List;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
/** A builder of {@link SpringWebfluxTracing}. */
public final class SpringWebfluxTracingBuilder {
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ClientRequest, ClientResponse>> additionalExtractors =
new ArrayList<>();
private CapturedHttpHeaders capturedHttpHeaders = CapturedHttpHeaders.client(Config.get());
SpringWebfluxTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
/**
* Adds an additional {@link AttributesExtractor} to invoke to set attributes to instrumented
* items.
*/
public SpringWebfluxTracingBuilder addAttributesExtractor(
AttributesExtractor<ClientRequest, ClientResponse> attributesExtractor) {
additionalExtractors.add(attributesExtractor);
return this;
}
/**
* Configure the instrumentation to capture chosen HTTP request and response headers as span
* attributes.
*
* @param capturedHttpHeaders An instance of {@link CapturedHttpHeaders} containing the configured
* HTTP request and response names.
*/
public SpringWebfluxTracingBuilder captureHttpHeaders(CapturedHttpHeaders capturedHttpHeaders) {
this.capturedHttpHeaders = capturedHttpHeaders;
return this;
}
/**
* Returns a new {@link SpringWebfluxTracing} with the settings of this {@link
* SpringWebfluxTracingBuilder}.
*/
public SpringWebfluxTracing build() {
SpringWebfluxHttpAttributesExtractor httpAttributesExtractor =
new SpringWebfluxHttpAttributesExtractor(capturedHttpHeaders);
SpringWebfluxNetAttributesExtractor netAttributesExtractor =
new SpringWebfluxNetAttributesExtractor();
InstrumenterBuilder<ClientRequest, ClientResponse> builder =
Instrumenter.<ClientRequest, ClientResponse>builder(
openTelemetry,
"io.opentelemetry.spring-webflux-5.0",
HttpSpanNameExtractor.create(httpAttributesExtractor))
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor))
.addAttributesExtractor(httpAttributesExtractor)
.addAttributesExtractor(netAttributesExtractor)
.addAttributesExtractor(PeerServiceAttributesExtractor.create(netAttributesExtractor))
.addAttributesExtractors(additionalExtractors)
.addRequestMetrics(HttpClientMetrics.get());
if (SpringWebfluxExperimentalAttributesExtractor.enabled()) {
builder.addAttributesExtractor(new SpringWebfluxExperimentalAttributesExtractor());
}
// headers are injected elsewhere; ClientRequest is immutable
Instrumenter<ClientRequest, ClientResponse> instrumenter =
builder.newInstrumenter(alwaysClient());
return new SpringWebfluxTracing(instrumenter, openTelemetry.getPropagators());
}
}

View File

@ -5,8 +5,11 @@
package io.opentelemetry.instrumentation.spring.webflux.client;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.reactivestreams.Subscription;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import reactor.core.CoreSubscriber;
@ -16,22 +19,22 @@ import reactor.core.CoreSubscriber;
*/
final class TraceWebClientSubscriber implements CoreSubscriber<ClientResponse> {
private final SpringWebfluxHttpClientTracer tracer;
private final Instrumenter<ClientRequest, ClientResponse> instrumenter;
private final ClientRequest request;
private final CoreSubscriber<? super ClientResponse> actual;
private final reactor.util.context.Context context;
private final io.opentelemetry.context.Context tracingContext;
private final reactor.util.context.Context reactorContext;
private final io.opentelemetry.context.Context otelContext;
TraceWebClientSubscriber(
SpringWebfluxHttpClientTracer tracer,
Instrumenter<ClientRequest, ClientResponse> instrumenter,
ClientRequest request,
CoreSubscriber<? super ClientResponse> actual,
io.opentelemetry.context.Context tracingContext) {
this.tracer = tracer;
Context otelContext) {
this.instrumenter = instrumenter;
this.request = request;
this.actual = actual;
this.tracingContext = tracingContext;
this.context = actual.currentContext();
this.otelContext = otelContext;
this.reactorContext = actual.currentContext();
}
@Override
@ -41,31 +44,31 @@ final class TraceWebClientSubscriber implements CoreSubscriber<ClientResponse> {
@Override
public void onNext(ClientResponse response) {
try (Scope ignored = tracingContext.makeCurrent()) {
try (Scope ignored = otelContext.makeCurrent()) {
this.actual.onNext(response);
} finally {
tracer.end(tracingContext, response);
instrumenter.end(otelContext, request, response, null);
}
}
@Override
public void onError(Throwable t) {
try (Scope ignored = tracingContext.makeCurrent()) {
try (Scope ignored = otelContext.makeCurrent()) {
this.actual.onError(t);
} finally {
tracer.endExceptionally(tracingContext, t);
instrumenter.end(otelContext, request, null, t);
}
}
@Override
public void onComplete() {
try (Scope ignored = tracingContext.makeCurrent()) {
try (Scope ignored = otelContext.makeCurrent()) {
this.actual.onComplete();
}
}
@Override
public reactor.util.context.Context currentContext() {
return this.context;
return this.reactorContext;
}
}

View File

@ -5,10 +5,10 @@
package io.opentelemetry.instrumentation.spring.webflux.client;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.List;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
@ -20,39 +20,28 @@ import reactor.core.publisher.Mono;
* Based on Spring Sleuth's Reactor instrumentation.
* https://github.com/spring-cloud/spring-cloud-sleuth/blob/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/web/client/TraceWebClientBeanPostProcessor.java
*/
public class WebClientTracingFilter implements ExchangeFilterFunction {
class WebClientTracingFilter implements ExchangeFilterFunction {
private final SpringWebfluxHttpClientTracer tracer;
private final Instrumenter<ClientRequest, ClientResponse> instrumenter;
private final ContextPropagators propagators;
private WebClientTracingFilter(SpringWebfluxHttpClientTracer tracer) {
this.tracer = tracer;
}
public static void addFilter(
OpenTelemetry openTelemetry, List<ExchangeFilterFunction> exchangeFilterFunctions) {
for (ExchangeFilterFunction filterFunction : exchangeFilterFunctions) {
if (filterFunction instanceof WebClientTracingFilter) {
return;
}
}
exchangeFilterFunctions.add(
0, new WebClientTracingFilter(new SpringWebfluxHttpClientTracer(openTelemetry)));
public WebClientTracingFilter(
Instrumenter<ClientRequest, ClientResponse> instrumenter, ContextPropagators propagators) {
this.instrumenter = instrumenter;
this.propagators = propagators;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return new MonoWebClientTrace(tracer, request, next);
return new MonoWebClientTrace(request, next);
}
private static final class MonoWebClientTrace extends Mono<ClientResponse> {
private final class MonoWebClientTrace extends Mono<ClientResponse> {
private final SpringWebfluxHttpClientTracer tracer;
private final ExchangeFunction next;
private final ClientRequest request;
private MonoWebClientTrace(
SpringWebfluxHttpClientTracer tracer, ClientRequest request, ExchangeFunction next) {
this.tracer = tracer;
private MonoWebClientTrace(ClientRequest request, ExchangeFunction next) {
this.next = next;
this.request = request;
}
@ -60,22 +49,23 @@ public class WebClientTracingFilter implements ExchangeFilterFunction {
@Override
public void subscribe(CoreSubscriber<? super ClientResponse> subscriber) {
Context parentContext = Context.current();
if (!tracer.shouldStartSpan(parentContext)) {
if (!instrumenter.shouldStart(parentContext, request)) {
next.exchange(request).subscribe(subscriber);
return;
}
Context context = instrumenter.start(parentContext, request);
ClientRequest.Builder builder = ClientRequest.from(request);
Context context = tracer.startSpan(parentContext, request, builder);
propagators.getTextMapPropagator().inject(context, builder, HttpHeadersSetter.INSTANCE);
try (Scope ignored = context.makeCurrent()) {
this.next
.exchange(builder.build())
.doOnCancel(
() -> {
tracer.onCancel(context);
tracer.end(context);
})
.subscribe(new TraceWebClientSubscriber(tracer, subscriber, context));
// no response and no error means that the request has been cancelled
() -> instrumenter.end(context, request, null, null))
.subscribe(new TraceWebClientSubscriber(instrumenter, request, subscriber, context));
}
}
}