Fix netty memory leak (#12003)

This commit is contained in:
Lauri Tulmin 2024-08-16 03:30:21 +03:00 committed by GitHub
parent b231d34c44
commit 888accf474
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 106 additions and 59 deletions

View File

@ -18,12 +18,11 @@ import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField; import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler; import io.opentelemetry.instrumentation.netty.v4_1.internal.client.HttpClientTracingHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler; import io.opentelemetry.instrumentation.netty.v4_1.internal.server.HttpServerTracingHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyHttpServerResponseBeforeCommitHandler; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyHttpServerResponseBeforeCommitHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.NettyServerSingletons;
import java.util.Deque;
public final class Helpers { public final class Helpers {
@ -42,9 +41,8 @@ public final class Helpers {
// the parent channel is the original http/1.1 channel and has the contexts stored in it; // the parent channel is the original http/1.1 channel and has the contexts stored in it;
// we assign to this new channel as the old one will not be evaluated in the upgraded h2c // we assign to this new channel as the old one will not be evaluated in the upgraded h2c
// chain // chain
Deque<ServerContext> serverContexts = ServerContexts serverContexts = ServerContexts.get(channel.parent());
channel.parent().attr(AttributeKeys.SERVER_CONTEXT).get(); channel.attr(AttributeKeys.SERVER_CONTEXTS).set(serverContexts);
channel.attr(AttributeKeys.SERVER_CONTEXT).set(serverContexts);
// todo add way to propagate the protocol version override up to the netty instrumentation; // todo add way to propagate the protocol version override up to the netty instrumentation;
// why: the netty instrumentation extracts the http protocol version from the HttpRequest // why: the netty instrumentation extracts the http protocol version from the HttpRequest

View File

@ -18,9 +18,9 @@ import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys; import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Deque;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@ -61,8 +61,7 @@ public class AbstractChannelHandlerContextInstrumentation implements TypeInstrum
instrumenter().end(clientContext, request, null, throwable); instrumenter().end(clientContext, request, null, throwable);
return; return;
} }
Deque<ServerContext> serverContexts = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).get(); ServerContext serverContext = ServerContexts.peekFirst(ctx.channel());
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
if (serverContext != null) { if (serverContext != null) {
NettyErrorHolder.set(serverContext.context(), throwable); NettyErrorHolder.set(serverContext.context(), throwable);
} }

View File

@ -7,7 +7,6 @@ package io.opentelemetry.instrumentation.netty.v4_1.internal;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import java.util.Deque;
/** /**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at * This class is internal and is hence not for public use. Its APIs are unstable and can change at
@ -17,9 +16,9 @@ public final class AttributeKeys {
// this is the context that has the server span // this is the context that has the server span
// //
// note: this attribute key is also used by ratpack instrumentation // note: this attribute key is also used by finagle instrumentation
public static final AttributeKey<Deque<ServerContext>> SERVER_CONTEXT = public static final AttributeKey<ServerContexts> SERVER_CONTEXTS =
AttributeKey.valueOf(AttributeKeys.class, "server-context"); AttributeKey.valueOf(AttributeKeys.class, "server-contexts");
public static final AttributeKey<Context> CLIENT_CONTEXT = public static final AttributeKey<Context> CLIENT_CONTEXT =
AttributeKey.valueOf(AttributeKeys.class, "client-context"); AttributeKey.valueOf(AttributeKeys.class, "client-context");

View File

@ -0,0 +1,79 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.netty.v4_1.internal;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import java.util.ArrayDeque;
import java.util.Deque;
/**
* A helper class for keeping track of incoming requests and spans associated with them.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class ServerContexts {
  // Upper bound on the number of in-flight (unanswered) requests tracked per channel before the
  // channel is considered broken and tracking stops.
  private static final int PIPELINING_LIMIT = 1000;

  // With http pipelining multiple requests can be sent on the same connection. Responses should be
  // sent in the same order the requests came in. We use this deque to store the request context
  // and pop elements as responses are sent.
  // NOTE(review): ArrayDeque is not thread-safe; presumably this is only touched from the
  // channel's event loop - confirm against callers.
  private final Deque<ServerContext> serverContexts = new ArrayDeque<>();

  // Set once the pipelining limit has been exceeded; afterwards addLast() becomes a no-op.
  private volatile boolean broken = false;

  private ServerContexts() {}

  /**
   * Returns the {@code ServerContexts} stored in the channel's {@link
   * AttributeKeys#SERVER_CONTEXTS} attribute, or {@code null} when none has been set.
   */
  public static ServerContexts get(Channel channel) {
    return channel.attr(AttributeKeys.SERVER_CONTEXTS).get();
  }

  /**
   * Returns the {@code ServerContexts} stored in the channel's {@link
   * AttributeKeys#SERVER_CONTEXTS} attribute, creating and storing a new instance when the
   * attribute is currently empty.
   */
  public static ServerContexts getOrCreate(Channel channel) {
    Attribute<ServerContexts> attribute = channel.attr(AttributeKeys.SERVER_CONTEXTS);
    ServerContexts result = attribute.get();
    if (result == null) {
      result = new ServerContexts();
      attribute.set(result);
    }
    return result;
  }

  /**
   * Returns the oldest tracked context of the given channel without removing it, or {@code null}
   * when the channel has no {@code ServerContexts} attached or no context is tracked.
   */
  public static ServerContext peekFirst(Channel channel) {
    ServerContexts serverContexts = get(channel);
    return serverContexts != null ? serverContexts.peekFirst() : null;
  }

  /** Returns the oldest tracked context without removing it, or {@code null} when empty. */
  public ServerContext peekFirst() {
    return serverContexts.peekFirst();
  }

  /**
   * Returns the most recently added context without removing it, or {@code null} when empty.
   */
  public ServerContext peekLast() {
    // Must look at the tail of the deque: with pipelined requests the head is the oldest
    // request, not the one that was added last.
    return serverContexts.peekLast();
  }

  /** Removes and returns the oldest tracked context, or {@code null} when empty. */
  public ServerContext pollFirst() {
    return serverContexts.pollFirst();
  }

  /** Removes and returns the most recently added context, or {@code null} when empty. */
  public ServerContext pollLast() {
    return serverContexts.pollLast();
  }

  /**
   * Appends a context for a newly received request.
   *
   * <p>Does nothing once this instance has been marked as broken. When the number of tracked
   * contexts already exceeds the pipelining limit, this instance is marked as broken and all
   * previously tracked contexts are dropped; the context passed to this call is still added.
   */
  public void addLast(ServerContext context) {
    if (broken) {
      return;
    }
    // If the pipelining limit is exceeded we'll stop tracing and mark the channel as broken.
    // Exceeding the limit indicates that there is a good chance that server contexts are not
    // removed from the deque and there could be a memory leak. This could happen when the http
    // server decides not to send a response to some requests, for example see
    // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11942
    if (serverContexts.size() > PIPELINING_LIMIT) {
      broken = true;
      serverContexts.clear();
    }
    serverContexts.addLast(context);
  }
}

View File

@ -10,16 +10,12 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import java.util.ArrayDeque; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import java.util.Deque;
/** /**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at * This class is internal and is hence not for public use. Its APIs are unstable and can change at
@ -37,7 +33,7 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel(); Channel channel = ctx.channel();
Deque<ServerContext> serverContexts = getOrCreate(channel, AttributeKeys.SERVER_CONTEXT); ServerContexts serverContexts = ServerContexts.getOrCreate(channel);
if (!(msg instanceof HttpRequest)) { if (!(msg instanceof HttpRequest)) {
ServerContext serverContext = serverContexts.peekLast(); ServerContext serverContext = serverContexts.peekLast();
@ -66,8 +62,10 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
// the span is ended normally in HttpServerResponseTracingHandler // the span is ended normally in HttpServerResponseTracingHandler
} catch (Throwable throwable) { } catch (Throwable throwable) {
// make sure to remove the server context on end() call // make sure to remove the server context on end() call
ServerContext serverContext = serverContexts.removeLast(); ServerContext serverContext = serverContexts.pollLast();
instrumenter.end(serverContext.context(), serverContext.request(), null, throwable); if (serverContext != null) {
instrumenter.end(serverContext.context(), serverContext.request(), null, throwable);
}
throw throwable; throw throwable;
} }
} }
@ -75,8 +73,7 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// connection was closed, close all remaining requests // connection was closed, close all remaining requests
Attribute<Deque<ServerContext>> contextAttr = ctx.channel().attr(AttributeKeys.SERVER_CONTEXT); ServerContexts serverContexts = ServerContexts.get(ctx.channel());
Deque<ServerContext> serverContexts = contextAttr.get();
if (serverContexts == null) { if (serverContexts == null) {
super.channelInactive(ctx); super.channelInactive(ctx);
@ -89,14 +86,4 @@ public class HttpServerRequestTracingHandler extends ChannelInboundHandlerAdapte
} }
super.channelInactive(ctx); super.channelInactive(ctx);
} }
private static <T> Deque<T> getOrCreate(Channel channel, AttributeKey<Deque<T>> key) {
Attribute<Deque<T>> attribute = channel.attr(key);
Deque<T> deque = attribute.get();
if (deque == null) {
deque = new ArrayDeque<>();
attribute.set(deque);
}
return deque;
}
} }

View File

@ -13,18 +13,16 @@ import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder; import io.opentelemetry.instrumentation.netty.common.internal.NettyErrorHolder;
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel; import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler; import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolEventHandler;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent; import io.opentelemetry.instrumentation.netty.v4_1.internal.ProtocolSpecificEvent;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import java.util.Deque; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -51,12 +49,8 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) throws Exception {
Attribute<Deque<ServerContext>> serverContextAttr = ServerContexts serverContexts = ServerContexts.get(ctx.channel());
ctx.channel().attr(AttributeKeys.SERVER_CONTEXT);
Deque<ServerContext> serverContexts = serverContextAttr.get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null; ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
if (serverContext == null) { if (serverContext == null) {
super.write(ctx, msg, prm); super.write(ctx, msg, prm);
return; return;
@ -86,7 +80,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
} else { } else {
// Headers and body all sent together, we have the response information in the msg. // Headers and body all sent together, we have the response information in the msg.
beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg); beforeCommitHandler.handle(serverContext.context(), (HttpResponse) msg);
serverContexts.removeFirst(); serverContexts.pollFirst();
writePromise.addListener( writePromise.addListener(
future -> future ->
end( end(
@ -102,7 +96,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
// Body sent after headers. We stored the response information in the context when // Body sent after headers. We stored the response information in the context when
// encountering HttpResponse (which was not FullHttpResponse since it's not // encountering HttpResponse (which was not FullHttpResponse since it's not
// LastHttpContent). // LastHttpContent).
serverContexts.removeFirst(); serverContexts.pollFirst();
HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null); HttpResponse response = ctx.channel().attr(HTTP_SERVER_RESPONSE).getAndSet(null);
writePromise.addListener( writePromise.addListener(
future -> future ->
@ -130,7 +124,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap
try (Scope ignored = serverContext.context().makeCurrent()) { try (Scope ignored = serverContext.context().makeCurrent()) {
super.write(ctx, msg, writePromise); super.write(ctx, msg, writePromise);
} catch (Throwable throwable) { } catch (Throwable throwable) {
serverContexts.removeFirst(); serverContexts.pollFirst();
end(serverContext.context(), serverContext.request(), null, throwable); end(serverContext.context(), serverContext.request(), null, throwable);
throw throwable; throw throwable;
} }

View File

@ -9,12 +9,10 @@ import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingleto
import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateServerSpanName; import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateServerSpanName;
import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateSpanNames; import static io.opentelemetry.javaagent.instrumentation.ratpack.RatpackSingletons.updateSpanNames;
import io.netty.util.Attribute;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import java.util.Deque;
import ratpack.handling.Context; import ratpack.handling.Context;
import ratpack.handling.Handler; import ratpack.handling.Handler;
@ -26,10 +24,8 @@ public final class TracingHandler implements Handler {
@Override @Override
public void handle(Context ctx) { public void handle(Context ctx) {
Attribute<Deque<ServerContext>> serverContextAttribute = ServerContext serverContext =
ctx.getDirectChannelAccess().getChannel().attr(AttributeKeys.SERVER_CONTEXT); ServerContexts.peekFirst(ctx.getDirectChannelAccess().getChannel());
Deque<ServerContext> serverContexts = serverContextAttribute.get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
// Must use context from channel, as executor instrumentation is not accurate - Ratpack // Must use context from channel, as executor instrumentation is not accurate - Ratpack
// internally queues events and then drains them in batches, causing executor instrumentation to // internally queues events and then drains them in batches, causing executor instrumentation to

View File

@ -10,11 +10,10 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Deque;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@ -39,8 +38,7 @@ public class ContextHandlerInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class) @Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope onEnter(@Advice.Argument(0) Channel channel) { public static Scope onEnter(@Advice.Argument(0) Channel channel) {
// set context to the first unprocessed request // set context to the first unprocessed request
Deque<ServerContext> serverContextx = channel.attr(AttributeKeys.SERVER_CONTEXT).get(); ServerContext serverContext = ServerContexts.peekFirst(channel);
ServerContext serverContext = serverContextx != null ? serverContextx.peekFirst() : null;
if (serverContext != null) { if (serverContext != null) {
return serverContext.context().makeCurrent(); return serverContext.context().makeCurrent();
} }

View File

@ -10,11 +10,10 @@ import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.internal.AttributeKeys;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext; import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContext;
import io.opentelemetry.instrumentation.netty.v4_1.internal.ServerContexts;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Deque;
import net.bytebuddy.asm.Advice; import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher; import net.bytebuddy.matcher.ElementMatcher;
@ -40,9 +39,7 @@ public class HttpTrafficHandlerInstrumentation implements TypeInstrumentation {
public static Scope onEnter( public static Scope onEnter(
@Advice.FieldValue("ctx") ChannelHandlerContext channelHandlerContext) { @Advice.FieldValue("ctx") ChannelHandlerContext channelHandlerContext) {
// set context to the first unprocessed request // set context to the first unprocessed request
Deque<ServerContext> serverContexts = ServerContext serverContext = ServerContexts.peekFirst(channelHandlerContext.channel());
channelHandlerContext.channel().attr(AttributeKeys.SERVER_CONTEXT).get();
ServerContext serverContext = serverContexts != null ? serverContexts.peekFirst() : null;
if (serverContext != null) { if (serverContext != null) {
return serverContext.context().makeCurrent(); return serverContext.context().makeCurrent();
} }