netty: Touch ByteBuf when message framing has been decoded

When a memory leak occurs, it is really helpful to have access records
to understand where the buffer was being held when it leaked. retain()
when we create the NettyReadableBuffer already creates an access record
the ByteBuf, so here we track when the ByteBuf is passed to another
thread.

See #8330
This commit is contained in:
Eric Anderson 2023-09-06 15:24:29 -07:00
parent 131a0047c4
commit d4a163461a
4 changed files with 19 additions and 0 deletions

View File

@ -97,6 +97,11 @@ public abstract class ForwardingReadableBuffer implements ReadableBuffer {
return buf.arrayOffset(); return buf.arrayOffset();
} }
@Override
public void touch() {
buf.touch();
}
@Override @Override
public boolean markSupported() { public boolean markSupported() {
return buf.markSupported(); return buf.markSupported();

View File

@ -409,6 +409,7 @@ public class MessageDeframer implements Closeable, Deframer {
statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1); statsTraceCtx.inboundMessageRead(currentMessageSeqNo, inboundBodyWireSize, -1);
inboundBodyWireSize = 0; inboundBodyWireSize = 0;
InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody(); InputStream stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
nextFrame.touch();
nextFrame = null; nextFrame = null;
listener.messagesAvailable(new SingleMessageProducer(stream)); listener.messagesAvailable(new SingleMessageProducer(stream));

View File

@ -124,6 +124,14 @@ public interface ReadableBuffer extends Closeable {
*/ */
int arrayOffset(); int arrayOffset();
/**
* Note that the current callsite has access to this buffer, or do nothing. This is only useful
* when the buffer has leak detection and intrumentation to record usages before the buffer was
* leaked. That can make it much easier to track down where the buffer was leaked. If this isn't
* such a buffer, the method does nothing.
*/
default void touch() {}
/** /**
* Indicates whether or not {@link #mark} operation is supported for this buffer. * Indicates whether or not {@link #mark} operation is supported for this buffer.
*/ */

View File

@ -94,6 +94,11 @@ class NettyReadableBuffer extends AbstractReadableBuffer {
return buffer.arrayOffset() + buffer.readerIndex(); return buffer.arrayOffset() + buffer.readerIndex();
} }
@Override
public void touch() {
buffer.touch();
}
@Override @Override
public boolean markSupported() { public boolean markSupported() {
return true; return true;