/* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package io.netty.buffer; import static io.netty.buffer.PoolChunk.isSubpage; import static java.lang.Math.max; import io.netty.util.internal.LongCounter; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; abstract class PoolArena extends SizeClasses implements PoolArenaMetric { static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); enum SizeClass { Small, Normal } final PooledByteBufAllocator parent; final int numSmallSubpagePools; final int directMemoryCacheAlignment; final int directMemoryCacheAlignmentMask; private final PoolSubpage[] smallSubpagePools; private final PoolChunkList q050; private final PoolChunkList q025; private final PoolChunkList q000; private final PoolChunkList qInit; private final PoolChunkList q075; private final PoolChunkList q100; private final List chunkListMetrics; // Metrics for allocations and deallocations private long allocationsNormal; // We need to use the LongCounter here as this is not guarded via synchronized block. private final LongCounter allocationsSmall = PlatformDependent.newLongCounter(); private final LongCounter allocationsHuge = PlatformDependent.newLongCounter(); private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter(); private long deallocationsSmall; private long deallocationsNormal; // We need to use the LongCounter here as this is not guarded via synchronized block. private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter(); // Number of thread caches backed by this arena. final AtomicInteger numThreadCaches = new AtomicInteger(); // TODO: Test if adding padding helps under contention // private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; protected PoolArena( PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize, int cacheAlignment) { super(pageSize, pageShifts, chunkSize, cacheAlignment); this.parent = parent; directMemoryCacheAlignment = cacheAlignment; directMemoryCacheAlignmentMask = cacheAlignment - 1; numSmallSubpagePools = nSubpages; smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools); for (int i = 0; i < smallSubpagePools.length; i++) { smallSubpagePools[i] = newSubpagePoolHead(); } q100 = new PoolChunkList(this, null, 100, Integer.MAX_VALUE, chunkSize); q075 = new PoolChunkList(this, q100, 75, 100, chunkSize); q050 = new PoolChunkList(this, q075, 50, 100, chunkSize); q025 = new PoolChunkList(this, q050, 25, 75, chunkSize); q000 = new PoolChunkList(this, q025, 1, 50, chunkSize); qInit = new PoolChunkList(this, q000, Integer.MIN_VALUE, 25, chunkSize); q100.prevList(q075); q075.prevList(q050); q050.prevList(q025); q025.prevList(q000); q000.prevList(null); qInit.prevList(qInit); List metrics = new ArrayList(6); metrics.add(qInit); metrics.add(q000); metrics.add(q025); metrics.add(q050); metrics.add(q075); metrics.add(q100); chunkListMetrics = Collections.unmodifiableList(metrics); } private PoolSubpage newSubpagePoolHead() { PoolSubpage head = new PoolSubpage(); head.prev = head; head.next = head; return head; } @SuppressWarnings("unchecked") private PoolSubpage[] newSubpagePoolArray(int size) { return new PoolSubpage[size]; } abstract boolean isDirect(); PooledByteBuf allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf buf = newByteBuf(maxCapacity); allocate(cache, buf, reqCapacity); return buf; } private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { final int sizeIdx = size2SizeIdx(reqCapacity); if (sizeIdx <= smallMaxSizeIdx) { tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); } else if (sizeIdx < nSizes) { tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); } else { int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(reqCapacity) : reqCapacity; // Huge allocations are never served via the cache so just call allocateHuge allocateHuge(buf, normCapacity); } } private void tcacheAllocateSmall( PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity, final int sizeIdx) { if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { // was able to allocate out of the cache so move on return; } /** * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and {@link * PoolChunk#free(long)} may modify the doubly linked list as well. */ final PoolSubpage head = smallSubpagePools[sizeIdx]; final boolean needsNormalAllocation; synchronized (head) { final PoolSubpage s = head.next; needsNormalAllocation = s == head; if (!needsNormalAllocation) { assert s.doNotDestroy && s.elemSize == sizeIdx2size(sizeIdx); long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); } } if (needsNormalAllocation) { synchronized (this) { allocateNormal(buf, reqCapacity, sizeIdx, cache); } } incSmallAllocation(); } private void tcacheAllocateNormal( PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity, final int sizeIdx) { if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) { // was able to allocate out of the cache so move on return; } synchronized (this) { allocateNormal(buf, reqCapacity, sizeIdx, cache); ++allocationsNormal; } } // Method must be called inside synchronized(this) { ... } block private void allocateNormal( PooledByteBuf buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) { if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { return; } // Add a new chunk. PoolChunk c = newChunk(pageSize, nPSizes, pageShifts, chunkSize); boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); assert success; qInit.add(c); } private void incSmallAllocation() { allocationsSmall.increment(); } private void allocateHuge(PooledByteBuf buf, int reqCapacity) { PoolChunk chunk = newUnpooledChunk(reqCapacity); activeBytesHuge.add(chunk.chunkSize()); buf.initUnpooled(chunk, reqCapacity); allocationsHuge.increment(); } void free( PoolChunk chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { int size = chunk.chunkSize(); destroyChunk(chunk); activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { SizeClass sizeClass = sizeClass(handle); if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) { // cached so not free it. return; } freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false); } } private SizeClass sizeClass(long handle) { return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal; } void freeChunk( PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) { final boolean destroyChunk; synchronized (this) { // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as // otherwise this // may fail due lazy class-loading in for example tomcat. if (!finalizer) { switch (sizeClass) { case Normal: ++deallocationsNormal; break; case Small: ++deallocationsSmall; break; default: throw new Error(); } } destroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer); } if (destroyChunk) { // destroyChunk not need to be called while holding the synchronized lock. destroyChunk(chunk); } } PoolSubpage findSubpagePoolHead(int sizeIdx) { return smallSubpagePools[sizeIdx]; } void reallocate(PooledByteBuf buf, int newCapacity, boolean freeOldMemory) { assert newCapacity >= 0 && newCapacity <= buf.maxCapacity(); int oldCapacity = buf.length; if (oldCapacity == newCapacity) { return; } PoolChunk oldChunk = buf.chunk; ByteBuffer oldNioBuffer = buf.tmpNioBuf; long oldHandle = buf.handle; T oldMemory = buf.memory; int oldOffset = buf.offset; int oldMaxLength = buf.maxLength; // This does not touch buf's reader/writer indices allocate(parent.threadCache(), buf, newCapacity); int bytesToCopy; if (newCapacity > oldCapacity) { bytesToCopy = oldCapacity; } else { buf.trimIndicesToCapacity(newCapacity); bytesToCopy = newCapacity; } memoryCopy(oldMemory, oldOffset, buf, bytesToCopy); if (freeOldMemory) { free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache); } } @Override public int numThreadCaches() { return numThreadCaches.get(); } @Override public int numTinySubpages() { return 0; } @Override public int numSmallSubpages() { return smallSubpagePools.length; } @Override public int numChunkLists() { return chunkListMetrics.size(); } @Override public List tinySubpages() { return Collections.emptyList(); } @Override public List smallSubpages() { return subPageMetricList(smallSubpagePools); } @Override public List chunkLists() { return chunkListMetrics; } private static List subPageMetricList(PoolSubpage[] pages) { List metrics = new ArrayList(); for (PoolSubpage head : pages) { if (head.next == head) { continue; } PoolSubpage s = head.next; for (; ; ) { metrics.add(s); s = s.next; if (s == head) { break; } } } return metrics; } @Override public long numAllocations() { final long allocsNormal; synchronized (this) { allocsNormal = allocationsNormal; } return allocationsSmall.value() + allocsNormal + allocationsHuge.value(); } @Override public long numTinyAllocations() { return 0; } @Override public long numSmallAllocations() { return allocationsSmall.value(); } @Override public synchronized long numNormalAllocations() { return allocationsNormal; } @Override public long numDeallocations() { final long deallocs; synchronized (this) { deallocs = deallocationsSmall + deallocationsNormal; } return deallocs + deallocationsHuge.value(); } @Override public long numTinyDeallocations() { return 0; } @Override public synchronized long numSmallDeallocations() { return deallocationsSmall; } @Override public synchronized long numNormalDeallocations() { return deallocationsNormal; } @Override public long numHugeAllocations() { return allocationsHuge.value(); } @Override public long numHugeDeallocations() { return deallocationsHuge.value(); } @Override public long numActiveAllocations() { long val = allocationsSmall.value() + allocationsHuge.value() - deallocationsHuge.value(); synchronized (this) { val += allocationsNormal - (deallocationsSmall + deallocationsNormal); } return max(val, 0); } @Override public long numActiveTinyAllocations() { return 0; } @Override public long numActiveSmallAllocations() { return max(numSmallAllocations() - numSmallDeallocations(), 0); } @Override public long numActiveNormalAllocations() { final long val; synchronized (this) { val = allocationsNormal - deallocationsNormal; } return max(val, 0); } @Override public long numActiveHugeAllocations() { return max(numHugeAllocations() - numHugeDeallocations(), 0); } @Override public long numActiveBytes() { long val = activeBytesHuge.value(); synchronized (this) { for (int i = 0; i < chunkListMetrics.size(); i++) { for (PoolChunkMetric m : chunkListMetrics.get(i)) { val += m.chunkSize(); } } } return max(0, val); } protected abstract PoolChunk newChunk( int pageSize, int maxPageIdx, int pageShifts, int chunkSize); protected abstract PoolChunk newUnpooledChunk(int capacity); protected abstract PooledByteBuf newByteBuf(int maxCapacity); protected abstract void memoryCopy(T src, int srcOffset, PooledByteBuf dst, int length); protected abstract void destroyChunk(PoolChunk chunk); @Override public synchronized String toString() { StringBuilder buf = new StringBuilder() .append("Chunk(s) at 0~25%:") .append(StringUtil.NEWLINE) .append(qInit) .append(StringUtil.NEWLINE) .append("Chunk(s) at 0~50%:") .append(StringUtil.NEWLINE) .append(q000) .append(StringUtil.NEWLINE) .append("Chunk(s) at 25~75%:") .append(StringUtil.NEWLINE) .append(q025) .append(StringUtil.NEWLINE) .append("Chunk(s) at 50~100%:") .append(StringUtil.NEWLINE) .append(q050) .append(StringUtil.NEWLINE) .append("Chunk(s) at 75~100%:") .append(StringUtil.NEWLINE) .append(q075) .append(StringUtil.NEWLINE) .append("Chunk(s) at 100%:") .append(StringUtil.NEWLINE) .append(q100) .append(StringUtil.NEWLINE) .append("small subpages:"); appendPoolSubPages(buf, smallSubpagePools); buf.append(StringUtil.NEWLINE); return buf.toString(); } private static void appendPoolSubPages(StringBuilder buf, PoolSubpage[] subpages) { for (int i = 0; i < subpages.length; i++) { PoolSubpage head = subpages[i]; if (head.next == head) { continue; } buf.append(StringUtil.NEWLINE).append(i).append(": "); PoolSubpage s = head.next; for (; ; ) { buf.append(s); s = s.next; if (s == head) { break; } } } } @Override protected final void finalize() throws Throwable { try { super.finalize(); } finally { destroyPoolSubPages(smallSubpagePools); destroyPoolChunkLists(qInit, q000, q025, q050, q075, q100); } } private static void destroyPoolSubPages(PoolSubpage[] pages) { for (PoolSubpage page : pages) { page.destroy(); } } private void destroyPoolChunkLists(PoolChunkList... chunkLists) { for (PoolChunkList chunkList : chunkLists) { chunkList.destroy(this); } } static final class HeapArena extends PoolArena { HeapArena( PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); } private static byte[] newByteArray(int size) { return PlatformDependent.allocateUninitializedArray(size); } @Override boolean isDirect() { return false; } @Override protected PoolChunk newChunk( int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { return new PoolChunk( this, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); } @Override protected PoolChunk newUnpooledChunk(int capacity) { return new PoolChunk(this, newByteArray(capacity), capacity, 0); } @Override protected void destroyChunk(PoolChunk chunk) { // Rely on GC. } @Override protected PooledByteBuf newByteBuf(int maxCapacity) { return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity) : PooledHeapByteBuf.newInstance(maxCapacity); } @Override protected void memoryCopy(byte[] src, int srcOffset, PooledByteBuf dst, int length) { if (length == 0) { return; } System.arraycopy(src, srcOffset, dst.memory, dst.offset, length); } } static final class DirectArena extends PoolArena { DirectArena( PooledByteBufAllocator parent, int pageSize, int pageShifts, int chunkSize, int directMemoryCacheAlignment) { super(parent, pageSize, pageShifts, chunkSize, directMemoryCacheAlignment); } @Override boolean isDirect() { return true; } // mark as package-private, only for unit test int offsetCacheLine(ByteBuffer memory) { // We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) // will // throw an NPE. int remainder = HAS_UNSAFE ? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask) : 0; // offset = alignment - address & (alignment - 1) return directMemoryCacheAlignment - remainder; } @Override protected PoolChunk newChunk( int pageSize, int maxPageIdx, int pageShifts, int chunkSize) { if (directMemoryCacheAlignment == 0) { return new PoolChunk( this, allocateDirect(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx, 0); } final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment); return new PoolChunk( this, memory, pageSize, pageShifts, chunkSize, maxPageIdx, offsetCacheLine(memory)); } @Override protected PoolChunk newUnpooledChunk(int capacity) { if (directMemoryCacheAlignment == 0) { return new PoolChunk(this, allocateDirect(capacity), capacity, 0); } final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment); return new PoolChunk(this, memory, capacity, offsetCacheLine(memory)); } private static ByteBuffer allocateDirect(int capacity) { return PlatformDependent.useDirectBufferNoCleaner() ? PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity); } @Override protected void destroyChunk(PoolChunk chunk) { if (PlatformDependent.useDirectBufferNoCleaner()) { PlatformDependent.freeDirectNoCleaner(chunk.memory); } else { PlatformDependent.freeDirectBuffer(chunk.memory); } } @Override protected PooledByteBuf newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } @Override protected void memoryCopy( ByteBuffer src, int srcOffset, PooledByteBuf dstBuf, int length) { if (length == 0) { return; } if (HAS_UNSAFE) { PlatformDependent.copyMemory( PlatformDependent.directBufferAddress(src) + srcOffset, PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length); } else { // We must duplicate the NIO buffers because they may be accessed by other Netty buffers. src = src.duplicate(); ByteBuffer dst = dstBuf.internalNioBuffer(); src.position(srcOffset).limit(srcOffset + length); dst.position(dstBuf.offset); dst.put(src); } } } }