From d2d67b4b51a278b99d659effc072760744d9ab62 Mon Sep 17 00:00:00 2001 From: iosmanthus Date: Wed, 19 Jan 2022 14:46:11 +0800 Subject: [PATCH] add netty mempool metrics Signed-off-by: iosmanthus --- src/main/java/io/netty/buffer/PoolArena.java | 741 +++++++++++++++++++ 1 file changed, 741 insertions(+) create mode 100644 src/main/java/io/netty/buffer/PoolArena.java diff --git a/src/main/java/io/netty/buffer/PoolArena.java b/src/main/java/io/netty/buffer/PoolArena.java new file mode 100644 index 0000000000..66ac2ac85d --- /dev/null +++ b/src/main/java/io/netty/buffer/PoolArena.java @@ -0,0 +1,741 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import static io.netty.buffer.PoolChunk.isSubpage; +import static java.lang.Math.max; + +import io.netty.util.internal.LongCounter; +import io.netty.util.internal.PlatformDependent; +import io.netty.util.internal.StringUtil; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +abstract class PoolArena extends SizeClasses implements PoolArenaMetric { + + public static final Counter poolArenaAllocations = + Counter.build() + .name("netty_buffer_pool_arena_allocations") + .help("Number of times a pool arena was allocated") + .labelNames("type") + .register(); + + public static final Histogram poolArenaAllocationsDuration = + Histogram.build() + .name("netty_buffer_pool_arena_allocations_duration_seconds") + .help("Duration of a pool arena allocation") + .labelNames("type") + .register(); + + static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); + + enum SizeClass { + Small, + Normal + } + + final PooledByteBufAllocator parent; + + final int numSmallSubpagePools; + final int directMemoryCacheAlignment; + final int directMemoryCacheAlignmentMask; + private final PoolSubpage[] smallSubpagePools; + + private final PoolChunkList q050; + private final PoolChunkList q025; + private final PoolChunkList q000; + private final PoolChunkList qInit; + private final PoolChunkList q075; + private final PoolChunkList q100; + + private final List chunkListMetrics; + + // Metrics for allocations and deallocations + private long allocationsNormal; + // We need to use the LongCounter here as this is not guarded via synchronized block. + private final LongCounter allocationsSmall = PlatformDependent.newLongCounter(); + private final LongCounter allocationsHuge = PlatformDependent.newLongCounter(); + private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter(); + + private long deallocationsSmall; + private long deallocationsNormal; + + // We need to use the LongCounter here as this is not guarded via synchronized block. + private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter(); + + // Number of thread caches backed by this arena. + final AtomicInteger numThreadCaches = new AtomicInteger(); + + // TODO: Test if adding padding helps under contention + // private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; + + protected PoolArena( + PooledByteBufAllocator parent, + int pageSize, + int pageShifts, + int chunkSize, + int cacheAlignment) { + super(pageSize, pageShifts, chunkSize, cacheAlignment); + this.parent = parent; + directMemoryCacheAlignment = cacheAlignment; + directMemoryCacheAlignmentMask = cacheAlignment - 1; + + numSmallSubpagePools = nSubpages; + smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); + for (int i = 0; i < smallSubpagePools.length; i++) { + smallSubpagePools[i] = newSubpagePoolHead(); + } + + q100 = new PoolChunkList(this, null, 100, Integer.MAX_VALUE, chunkSize); + q075 = new PoolChunkList(this, q100, 75, 100, chunkSize); + q050 = new PoolChunkList(this, q075, 50, 100, chunkSize); + q025 = new PoolChunkList(this, q050, 25, 75, chunkSize); + q000 = new PoolChunkList(this, q025, 1, 50, chunkSize); + qInit = new PoolChunkList(this, q000, Integer.MIN_VALUE, 25, chunkSize); + + q100.prevList(q075); + q075.prevList(q050); + q050.prevList(q025); + q025.prevList(q000); + q000.prevList(null); + qInit.prevList(qInit); + + List metrics = new ArrayList(6); + metrics.add(qInit); + metrics.add(q000); + metrics.add(q025); + metrics.add(q050); + metrics.add(q075); + metrics.add(q100); + chunkListMetrics = Collections.unmodifiableList(metrics); + } + + private PoolSubpage newSubpagePoolHead() { + PoolSubpage head = new PoolSubpage(); + head.prev = head; + head.next = head; + return head; + } + + @SuppressWarnings("unchecked") + private PoolSubpage[] newSubpagePoolArray(int size) { + return new PoolSubpage[size]; + } + + abstract boolean isDirect(); + + PooledByteBuf allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { + PooledByteBuf buf = newByteBuf(maxCapacity); + allocate(cache, buf, reqCapacity); + return buf; + } + + private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { + final int sizeIdx = size2SizeIdx(reqCapacity); + + if (sizeIdx <= smallMaxSizeIdx) { + Histogram.Timer smallAllocationTimer = + poolArenaAllocationsDuration.labels("small").startTimer(); + tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); + smallAllocationTimer.observeDuration(); + poolArenaAllocations.labels("small").inc(); + } else if (sizeIdx < nSizes) { + Histogram.Timer normalAllocationTimer = + poolArenaAllocationsDuration.labels("normal").startTimer(); + tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); + normalAllocationTimer.observeDuration(); + poolArenaAllocations.labels("normal").inc(); + } else { + Histogram.Timer hugeAllocationTimer = + poolArenaAllocationsDuration.labels("huge").startTimer(); + int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(reqCapacity) : reqCapacity; + // Huge allocations are never served via the cache so just call allocateHuge + allocateHuge(buf, normCapacity); + hugeAllocationTimer.observeDuration(); + poolArenaAllocations.labels("huge").inc(); + } + } + + private void tcacheAllocateSmall( + PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity, final int sizeIdx) { + + if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { + // was able to allocate out of the cache so move on + return; + } + + /** + * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link + * PoolChunk#free(long)} may modify the doubly linked list as well. + */ + final PoolSubpage head = smallSubpagePools[sizeIdx]; + final boolean needsNormalAllocation; + synchronized (head) { + final PoolSubpage s = head.next; + needsNormalAllocation = s == head; + if (!needsNormalAllocation) { + assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); + long handle = s.allocate(); + assert handle >= 0; + s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); + } + } + + if (needsNormalAllocation) { + synchronized (this) { + allocateNormal(buf, reqCapacity, sizeIdx, cache); + } + } + + incSmallAllocation(); + } + + private void tcacheAllocateNormal( + PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity, final int sizeIdx) { + if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) { + // was able to allocate out of the cache so move on + return; + } + synchronized (this) { + allocateNormal(buf, reqCapacity, sizeIdx, cache); + ++allocationsNormal; + } + } + + // Method must be called inside synchronized(this) { ... } block + private void allocateNormal( + PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { + if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) + || q025.allocate(buf, reqCapacity, sizeIdx, threadCache) + || q000.allocate(buf, reqCapacity, sizeIdx, threadCache) + || qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) + || q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { + return; + } + + // Add a new chunk. + PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); + boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); + assert success; + qInit.add(c); + } + + private void incSmallAllocation() { + allocationsSmall.increment(); + } + + private void allocateHuge(PooledByteBuf buf, int reqCapacity) { + PoolChunk chunk = newUnpooledChunk(reqCapacity); + activeBytesHuge.add(chunk.chunkSize()); + buf.initUnpooled(chunk, reqCapacity); + allocationsHuge.increment(); + } + + void free( + PoolChunk chunk, + ByteBuffer nioBuffer, + long handle, + int normCapacity, + PoolThreadCache cache) { + if (chunk.unpooled) { + int size = chunk.chunkSize(); + destroyChunk(chunk); + activeBytesHuge.add(-size); + deallocationsHuge.increment(); + } else { + SizeClass sizeClass = sizeClass(handle); + if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { + // cached so not free it. + return; + } + + freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false); + } + } + + private SizeClass sizeClass(long handle) { + return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal; + } + + void freeChunk( + PoolChunk chunk, + long handle, + int normCapacity, + SizeClass sizeClass, + ByteBuffer nioBuffer, + boolean finalizer) { + final boolean destroyChunk; + synchronized (this) { + // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as + // otherwise this + // may fail due lazy class-loading in for example tomcat. + if (!finalizer) { + switch (sizeClass) { + case Normal: + ++deallocationsNormal; + break; + case Small: + ++deallocationsSmall; + break; + default: + throw new Error(); + } + } + destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer); + } + if (destroyChunk) { + // destroyChunk not need to be called while holding the synchronized lock. + destroyChunk(chunk); + } + } + + PoolSubpage findSubpagePoolHead(int sizeIdx) { + return smallSubpagePools[sizeIdx]; + } + + void reallocate(PooledByteBuf buf, int newCapacity, boolean freeOldMemory) { + assert newCapacity >= 0 && newCapacity <= buf.maxCapacity(); + + int oldCapacity = buf.length; + if (oldCapacity == newCapacity) { + return; + } + + PoolChunk oldChunk = buf.chunk; + ByteBuffer oldNioBuffer = buf.tmpNioBuf; + long oldHandle = buf.handle; + T oldMemory = buf.memory; + int oldOffset = buf.offset; + int oldMaxLength = buf.maxLength; + + // This does not touch buf's reader/writer indices + allocate(parent.threadCache(), buf, newCapacity); + int bytesToCopy; + if (newCapacity > oldCapacity) { + bytesToCopy = oldCapacity; + } else { + buf.trimIndicesToCapacity(newCapacity); + bytesToCopy = newCapacity; + } + memoryCopy(oldMemory, oldOffset, buf, bytesToCopy); + if (freeOldMemory) { + free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache); + } + } + + @Override + public int numThreadCaches() { + return numThreadCaches.get(); + } + + @Override + public int numTinySubpages() { + return 0; + } + + @Override + public int numSmallSubpages() { + return smallSubpagePools.length; + } + + @Override + public int numChunkLists() { + return chunkListMetrics.size(); + } + + @Override + public List tinySubpages() { + return Collections.emptyList(); + } + + @Override + public List smallSubpages() { + return subPageMetricList(smallSubpagePools); + } + + @Override + public List chunkLists() { + return chunkListMetrics; + } + + private static List subPageMetricList(PoolSubpage[] pages) { + List metrics = new ArrayList(); + for (PoolSubpage head : pages) { + if (head.next == head) { + continue; + } + PoolSubpage s = head.next; + for (; ; ) { + metrics.add(s); + s = s.next; + if (s == head) { + break; + } + } + } + return metrics; + } + + @Override + public long numAllocations() { + final long allocsNormal; + synchronized (this) { + allocsNormal = allocationsNormal; + } + return allocationsSmall.value() + allocsNormal + allocationsHuge.value(); + } + + @Override + public long numTinyAllocations() { + return 0; + } + + @Override + public long numSmallAllocations() { + return allocationsSmall.value(); + } + + @Override + public synchronized long numNormalAllocations() { + return allocationsNormal; + } + + @Override + public long numDeallocations() { + final long deallocs; + synchronized (this) { + deallocs = deallocationsSmall + deallocationsNormal; + } + return deallocs + deallocationsHuge.value(); + } + + @Override + public long numTinyDeallocations() { + return 0; + } + + @Override + public synchronized long numSmallDeallocations() { + return deallocationsSmall; + } + + @Override + public synchronized long numNormalDeallocations() { + return deallocationsNormal; + } + + @Override + public long numHugeAllocations() { + return allocationsHuge.value(); + } + + @Override + public long numHugeDeallocations() { + return deallocationsHuge.value(); + } + + @Override + public long numActiveAllocations() { + long val = allocationsSmall.value() + allocationsHuge.value() - deallocationsHuge.value(); + synchronized (this) { + val += allocationsNormal - (deallocationsSmall + deallocationsNormal); + } + return max(val, 0); + } + + @Override + public long numActiveTinyAllocations() { + return 0; + } + + @Override + public long numActiveSmallAllocations() { + return max(numSmallAllocations() - numSmallDeallocations(), 0); + } + + @Override + public long numActiveNormalAllocations() { + final long val; + synchronized (this) { + val = allocationsNormal - deallocationsNormal; + } + return max(val, 0); + } + + @Override + public long numActiveHugeAllocations() { + return max(numHugeAllocations() - numHugeDeallocations(), 0); + } + + @Override + public long numActiveBytes() { + long val = activeBytesHuge.value(); + synchronized (this) { + for (int i = 0; i < chunkListMetrics.size(); i++) { + for (PoolChunkMetric m : chunkListMetrics.get(i)) { + val += m.chunkSize(); + } + } + } + return max(0, val); + } + + protected abstract PoolChunk newChunk( + int pageSize, int maxPageIdx, int pageShifts, int chunkSize); + + protected abstract PoolChunk newUnpooledChunk(int capacity); + + protected abstract PooledByteBuf newByteBuf(int maxCapacity); + + protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf dst, int length); + + protected abstract void destroyChunk(PoolChunk chunk); + + @Override + public synchronized String toString() { + StringBuilder buf = + new StringBuilder() + .append("Chunk(s) at 0~25%:") + .append(StringUtil.NEWLINE) + .append(qInit) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 0~50%:") + .append(StringUtil.NEWLINE) + .append(q000) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 25~75%:") + .append(StringUtil.NEWLINE) + .append(q025) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 50~100%:") + .append(StringUtil.NEWLINE) + .append(q050) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 75~100%:") + .append(StringUtil.NEWLINE) + .append(q075) + .append(StringUtil.NEWLINE) + .append("Chunk(s) at 100%:") + .append(StringUtil.NEWLINE) + .append(q100) + .append(StringUtil.NEWLINE) + .append("small subpages:"); + appendPoolSubPages(buf, smallSubpagePools); + buf.append(StringUtil.NEWLINE); + + return buf.toString(); + } + + private static void appendPoolSubPages(StringBuilder buf, PoolSubpage[] subpages) { + for (int i = 0; i < subpages.length; i++) { + PoolSubpage head = subpages[i]; + if (head.next == head) { + continue; + } + + buf.append(StringUtil.NEWLINE).append(i).append(": "); + PoolSubpage s = head.next; + for (; ; ) { + buf.append(s); + s = s.next; + if (s == head) { + break; + } + } + } + } + + @Override + protected final void finalize() throws Throwable { + try { + super.finalize(); + } finally { + destroyPoolSubPages(smallSubpagePools); + destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100); + } + } + + private static void destroyPoolSubPages(PoolSubpage[] pages) { + for (PoolSubpage page : pages) { + page.destroy(); + } + } + + private void destroyPoolChunkLists(PoolChunkList... chunkLists) { + for (PoolChunkList chunkList : chunkLists) { + chunkList.destroy(this); + } + } + + static final class HeapArena extends PoolArena { + + HeapArena( + PooledByteBufAllocator parent, + int pageSize, + int pageShifts, + int chunkSize, + int directMemoryCacheAlignment) { + super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); + } + + private static byte[] newByteArray(int size) { + return PlatformDependent.allocateUninitializedArray(size); + } + + @Override + boolean isDirect() { + return false; + } + + @Override + protected PoolChunk newChunk( + int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { + return new PoolChunk( + this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); + } + + @Override + protected PoolChunk newUnpooledChunk(int capacity) { + return new PoolChunk(this, newByteArray(capacity), capacity, 0); + } + + @Override + protected void destroyChunk(PoolChunk chunk) { + // Rely on GC. + } + + @Override + protected PooledByteBuf newByteBuf(int maxCapacity) { + return HAS_UNSAFE + ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity) + : PooledHeapByteBuf.newInstance(maxCapacity); + } + + @Override + protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf dst, int length) { + if (length == 0) { + return; + } + + System.arraycopy(src, srcOffset, dst.memory, dst.offset, length); + } + } + + static final class DirectArena extends PoolArena { + + DirectArena( + PooledByteBufAllocator parent, + int pageSize, + int pageShifts, + int chunkSize, + int directMemoryCacheAlignment) { + super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); + } + + @Override + boolean isDirect() { + return true; + } + + // mark as package-private, only for unit test + int offsetCacheLine(ByteBuffer memory) { + // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) + // will + // throw an NPE. + int remainder = + HAS_UNSAFE + ? (int) + (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask) + : 0; + + // offset = alignment - address & (alignment - 1) + return directMemoryCacheAlignment - remainder; + } + + @Override + protected PoolChunk newChunk( + int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { + if (directMemoryCacheAlignment == 0) { + return new PoolChunk( + this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); + } + final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); + return new PoolChunk( + this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory)); + } + + @Override + protected PoolChunk newUnpooledChunk(int capacity) { + if (directMemoryCacheAlignment == 0) { + return new PoolChunk(this, allocateDirect(capacity), capacity, 0); + } + final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment); + return new PoolChunk(this, memory, capacity, offsetCacheLine(memory)); + } + + private static ByteBuffer allocateDirect(int capacity) { + return PlatformDependent.useDirectBufferNoCleaner() + ? PlatformDependent.allocateDirectNoCleaner(capacity) + : ByteBuffer.allocateDirect(capacity); + } + + @Override + protected void destroyChunk(PoolChunk chunk) { + if (PlatformDependent.useDirectBufferNoCleaner()) { + PlatformDependent.freeDirectNoCleaner(chunk.memory); + } else { + PlatformDependent.freeDirectBuffer(chunk.memory); + } + } + + @Override + protected PooledByteBuf newByteBuf(int maxCapacity) { + if (HAS_UNSAFE) { + return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); + } else { + return PooledDirectByteBuf.newInstance(maxCapacity); + } + } + + @Override + protected void memoryCopy( + ByteBuffer src, int srcOffset, PooledByteBuf dstBuf, int length) { + if (length == 0) { + return; + } + + if (HAS_UNSAFE) { + PlatformDependent.copyMemory( + PlatformDependent.directBufferAddress(src) + srcOffset, + PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, + length); + } else { + // We must duplicate the NIO buffers because they may be accessed by other Netty buffers. + src = src.duplicate(); + ByteBuffer dst = dstBuf.internalNioBuffer(); + src.position(srcOffset).limit(srcOffset + length); + dst.position(dstBuf.offset); + dst.put(src); + } + } + } +}