Fix NPE happening when .headersWhen() is used (reactor-netty) (#9511)

This commit is contained in:
Mateusz Rzeszutek 2023-09-21 19:17:10 +02:00 committed by GitHub
parent edb0db39d7
commit 7bbd7a48ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 151 additions and 2 deletions

View File

@ -0,0 +1,53 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientConfigBuddy;
public class HttpClientConnectInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("reactor.netty.http.client.HttpClientConnect");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("connect").and(returns(named("reactor.core.publisher.Mono"))),
this.getClass().getName() + "$ConnectAdvice");
}
@SuppressWarnings("unused")
public static class ConnectAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.Return(readOnly = false) Mono<? extends Connection> connection,
@Advice.This HttpClient httpClient) {
HttpClientConfig config = httpClient.configuration();
// reactor-netty 1.0.x has a bug: the .mapConnect() function is not applied when deferred
// configuration is used
// we're fixing this bug here, so that our instrumentation can safely add its own
// .mapConnect() listener
if (HttpClientConfigBuddy.hasDeferredConfig(config)) {
connection = HttpClientConfigBuddy.getConnector(config).apply(connection);
}
}
}
}

View File

@ -13,12 +13,18 @@ import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
final class InstrumentationContexts {
private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
parentContextUpdater =
AtomicReferenceFieldUpdater.newUpdater(
InstrumentationContexts.class, Context.class, "parentContext");
private volatile Context parentContext;
private volatile Timer timer;
// on retries, reactor-netty starts the next resend attempt before it ends the previous one (i.e.
@ -27,8 +33,11 @@ final class InstrumentationContexts {
private final Queue<RequestAndContext> clientContexts = new LinkedBlockingQueue<>();
void initialize(Context parentContext) {
this.parentContext = HttpClientResendCount.initialize(parentContext);
timer = Timer.start();
Context parentContextWithResends = HttpClientResendCount.initialize(parentContext);
// make sure initialization happens only once
if (parentContextUpdater.compareAndSet(this, null, parentContextWithResends)) {
timer = Timer.start();
}
}
Context getParentContext() {

View File

@ -36,10 +36,16 @@ public class ReactorNettyInstrumentationModule extends InstrumentationModule {
return hasClassesNamed("reactor.netty.transport.AddressUtils");
}
@Override
public boolean isHelperClass(String className) {
return className.startsWith("reactor.netty.http.client.HttpClientConfigBuddy");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new HttpClientInstrumentation(),
new HttpClientConnectInstrumentation(),
new ResponseReceiverInstrumentation(),
new TransportConnectorInstrumentation());
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package reactor.netty.http.client;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
// class in reactor package to access package-private code
public final class HttpClientConfigBuddy {
public static boolean hasDeferredConfig(HttpClientConfig config) {
return config.deferredConf != null;
}
public static Function<? super Mono<? extends Connection>, ? extends Mono<? extends Connection>>
getConnector(HttpClientConfig config) {
return config.connector == null ? Function.identity() : config.connector;
}
private HttpClientConfigBuddy() {}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;
import io.netty.channel.ChannelOption;
import io.netty.handler.codec.http.HttpMethod;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames;
import java.net.URI;
import java.util.Map;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
class ReactorNettyHttpClientDeferredHeadersTest extends AbstractReactorNettyHttpClientTest {
@Override
protected HttpClient createHttpClient() {
int connectionTimeoutMillis = (int) CONNECTION_TIMEOUT.toMillis();
return HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMillis)
.resolver(getAddressResolverGroup())
.headers(headers -> headers.set(HttpHeaderNames.USER_AGENT, USER_AGENT));
}
@Override
public HttpClient.ResponseReceiver<?> buildRequest(
String method, URI uri, Map<String, String> headers) {
HttpClient client =
createHttpClient()
.followRedirect(true)
.headersWhen(
h -> {
headers.forEach(h::add);
return Mono.just(h);
})
.baseUrl(resolveAddress("").toString());
if (uri.toString().contains("/read-timeout")) {
client = client.responseTimeout(READ_TIMEOUT);
}
return client.request(HttpMethod.valueOf(method)).uri(uri.toString());
}
@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
super.configure(optionsBuilder);
// these scenarios don't work because deferred config does not apply the doOnRequestError()
// callback
optionsBuilder.disableTestReadTimeout();
optionsBuilder.disableTestConnectionFailure();
optionsBuilder.disableTestRemoteConnection();
}
}