mirror of https://github.com/grpc/grpc-java.git
netty:Fix Netty composite buffer merging to be compatible with Netty 4.1.111 (#11294)
* Use addComponent instead of addFlattenedComponent and do not append to components that are composites.
This commit is contained in:
parent
d57f2719b2
commit
0fea7dd32e
|
|
@ -23,6 +23,18 @@ import io.netty.buffer.ByteBufAllocator;
|
||||||
import io.netty.buffer.CompositeByteBuf;
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
|
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
|
||||||
|
* compose strategies.
|
||||||
|
* <br><br>
|
||||||
|
*
|
||||||
|
* <p><b><font color="red">Avoid using</font></b>
|
||||||
|
* {@link CompositeByteBuf#addFlattenedComponents(boolean, ByteBuf)} as it can lead
|
||||||
|
* to corruption, where the components' readable area are not equal to the Composite's capacity
|
||||||
|
* (see https://github.com/netty/netty/issues/12844).
|
||||||
|
*/
|
||||||
|
|
||||||
class NettyAdaptiveCumulator implements Cumulator {
|
class NettyAdaptiveCumulator implements Cumulator {
|
||||||
private final int composeMinSize;
|
private final int composeMinSize;
|
||||||
|
|
||||||
|
|
@ -83,8 +95,7 @@ class NettyAdaptiveCumulator implements Cumulator {
|
||||||
composite.capacity(composite.writerIndex());
|
composite.capacity(composite.writerIndex());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
composite = alloc.compositeBuffer(Integer.MAX_VALUE)
|
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addComponent(true, cumulation);
|
||||||
.addFlattenedComponents(true, cumulation);
|
|
||||||
}
|
}
|
||||||
addInput(alloc, composite, in);
|
addInput(alloc, composite, in);
|
||||||
in = null;
|
in = null;
|
||||||
|
|
@ -104,7 +115,7 @@ class NettyAdaptiveCumulator implements Cumulator {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
|
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
|
||||||
if (shouldCompose(composite, in, composeMinSize)) {
|
if (shouldCompose(composite, in, composeMinSize)) {
|
||||||
composite.addFlattenedComponents(true, in);
|
composite.addComponent(true, in);
|
||||||
} else {
|
} else {
|
||||||
// The total size of the new data and the last component are below the threshold. Merge them.
|
// The total size of the new data and the last component are below the threshold. Merge them.
|
||||||
mergeWithCompositeTail(alloc, composite, in);
|
mergeWithCompositeTail(alloc, composite, in);
|
||||||
|
|
@ -150,31 +161,13 @@ class NettyAdaptiveCumulator implements Cumulator {
|
||||||
ByteBuf tail = composite.component(tailComponentIndex);
|
ByteBuf tail = composite.component(tailComponentIndex);
|
||||||
ByteBuf newTail = null;
|
ByteBuf newTail = null;
|
||||||
try {
|
try {
|
||||||
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
|
if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()
|
||||||
|
&& !isCompositeOrWrappedComposite(tail)) {
|
||||||
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.
|
// Ideal case: the tail isn't shared, and can be expanded to the required capacity.
|
||||||
|
|
||||||
// Take ownership of the tail.
|
// Take ownership of the tail.
|
||||||
newTail = tail.retain();
|
newTail = tail.retain();
|
||||||
|
|
||||||
// TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
|
|
||||||
// the issue fixed.
|
|
||||||
// In certain cases, removing the CompositeByteBuf component, and then adding it back
|
|
||||||
// isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
|
|
||||||
// This happens because the buffer returned by composite.component() has out-of-sync
|
|
||||||
// indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
|
|
||||||
// buffer, but doesn't set the indexes.
|
|
||||||
//
|
|
||||||
// To get the right indexes we use the fact that composite.internalComponent() returns
|
|
||||||
// the slice() into the readable portion of the underlying buffer.
|
|
||||||
// We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
|
|
||||||
// and combine it with the fact that SlicedByteBuf duplicates have their indexes
|
|
||||||
// adjusted so they correspond to the to the readable portion of the slice.
|
|
||||||
//
|
|
||||||
// Hence composite.internalComponent().duplicate() returns a buffer with the
|
|
||||||
// indexes that should've been on the composite.component() in the first place.
|
|
||||||
// Until the issue is fixed, we manually adjust the indexes of the removed component.
|
|
||||||
ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
|
|
||||||
newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
|
* The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
|
||||||
*
|
*
|
||||||
|
|
@ -188,20 +181,26 @@ class NettyAdaptiveCumulator implements Cumulator {
|
||||||
* as pronounced because the capacity is doubled with each reallocation.
|
* as pronounced because the capacity is doubled with each reallocation.
|
||||||
*/
|
*/
|
||||||
newTail.writeBytes(in);
|
newTail.writeBytes(in);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
|
// The tail satisfies one or more criteria:
|
||||||
|
// - Shared
|
||||||
|
// - Not expandable
|
||||||
|
// - Composite
|
||||||
|
// - Wrapped Composite
|
||||||
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
|
newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
|
||||||
newTail.setBytes(0, composite, tailStart, tailSize)
|
newTail.setBytes(0, composite, tailStart, tailSize)
|
||||||
.setBytes(tailSize, in, in.readerIndex(), inputSize)
|
.setBytes(tailSize, in, in.readerIndex(), inputSize)
|
||||||
.writerIndex(newTailSize);
|
.writerIndex(newTailSize);
|
||||||
in.readerIndex(in.writerIndex());
|
in.readerIndex(in.writerIndex());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store readerIndex to avoid out of bounds writerIndex during component replacement.
|
// Store readerIndex to avoid out of bounds writerIndex during component replacement.
|
||||||
int prevReader = composite.readerIndex();
|
int prevReader = composite.readerIndex();
|
||||||
// Remove the old tail, reset writer index.
|
// Remove the old tail, reset writer index.
|
||||||
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
|
composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
|
||||||
// Add back the new tail.
|
// Add back the new tail.
|
||||||
composite.addFlattenedComponents(true, newTail);
|
composite.addComponent(true, newTail);
|
||||||
// New tail's ownership transferred to the composite buf.
|
// New tail's ownership transferred to the composite buf.
|
||||||
newTail = null;
|
newTail = null;
|
||||||
composite.readerIndex(prevReader);
|
composite.readerIndex(prevReader);
|
||||||
|
|
@ -216,4 +215,12 @@ class NettyAdaptiveCumulator implements Cumulator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isCompositeOrWrappedComposite(ByteBuf tail) {
|
||||||
|
ByteBuf cur = tail;
|
||||||
|
while (cur.unwrap() != null) {
|
||||||
|
cur = cur.unwrap();
|
||||||
|
}
|
||||||
|
return cur instanceof CompositeByteBuf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
@Override
|
@Override
|
||||||
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
|
void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
|
||||||
// To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
|
// To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
|
||||||
composite.addFlattenedComponents(true, in);
|
composite.addComponent(true, in);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -208,8 +208,8 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
in = ByteBufUtil.writeAscii(alloc, inData);
|
in = ByteBufUtil.writeAscii(alloc, inData);
|
||||||
tail = ByteBufUtil.writeAscii(alloc, tailData);
|
tail = ByteBufUtil.writeAscii(alloc, tailData);
|
||||||
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
|
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
|
||||||
// Note that addFlattenedComponents() will not add a new component when tail is not readable.
|
// Note that addComponent() will not add a new component when tail is not readable.
|
||||||
composite.addFlattenedComponents(true, tail);
|
composite.addComponent(true, tail);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
@ -345,7 +345,7 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
assertThat(in.readableBytes()).isAtMost(tail.writableBytes());
|
assertThat(in.readableBytes()).isAtMost(tail.writableBytes());
|
||||||
|
|
||||||
// All fits, so tail capacity must stay the same.
|
// All fits, so tail capacity must stay the same.
|
||||||
composite.addFlattenedComponents(true, tail);
|
composite.addComponent(true, tail);
|
||||||
assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
|
assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -362,7 +362,7 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);
|
alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);
|
||||||
|
|
||||||
// Tail capacity is extended to its fast capacity.
|
// Tail capacity is extended to its fast capacity.
|
||||||
composite.addFlattenedComponents(true, tail);
|
composite.addComponent(true, tail);
|
||||||
assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
|
assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -372,7 +372,7 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
@SuppressWarnings("InlineMeInliner") // Requires Java 11
|
@SuppressWarnings("InlineMeInliner") // Requires Java 11
|
||||||
String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
|
String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
|
||||||
int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length();
|
int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length();
|
||||||
composite.addFlattenedComponents(true, tail);
|
composite.addComponent(true, tail);
|
||||||
|
|
||||||
// Make input larger than tailFastCapacity
|
// Make input larger than tailFastCapacity
|
||||||
in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
|
in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
|
||||||
|
|
@ -435,21 +435,21 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
@SuppressWarnings("InlineMeInliner") // Requires Java 11
|
@SuppressWarnings("InlineMeInliner") // Requires Java 11
|
||||||
String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
|
String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
|
||||||
tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
|
tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
|
||||||
composite.addFlattenedComponents(true, tail);
|
composite.addComponent(true, tail);
|
||||||
assertTailReplaced();
|
assertTailReplaced();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mergeWithCompositeTail_tailNotExpandable_shared() {
|
public void mergeWithCompositeTail_tailNotExpandable_shared() {
|
||||||
tail.retain();
|
tail.retain();
|
||||||
composite.addFlattenedComponents(true, tail);
|
composite.addComponent(true, tail);
|
||||||
assertTailReplaced();
|
assertTailReplaced();
|
||||||
tail.release();
|
tail.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
|
public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
|
||||||
composite.addFlattenedComponents(true, tail.asReadOnly());
|
composite.addComponent(true, tail.asReadOnly());
|
||||||
assertTailReplaced();
|
assertTailReplaced();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -527,8 +527,7 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
|
CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
|
||||||
tail) {
|
tail) {
|
||||||
@Override
|
@Override
|
||||||
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
|
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
|
||||||
ByteBuf buffer) {
|
|
||||||
throw expectedError;
|
throw expectedError;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -561,8 +560,7 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
|
CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
|
||||||
tail.asReadOnly()) {
|
tail.asReadOnly()) {
|
||||||
@Override
|
@Override
|
||||||
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
|
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
|
||||||
ByteBuf buffer) {
|
|
||||||
throw expectedError;
|
throw expectedError;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -616,14 +614,14 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));
|
ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));
|
||||||
|
|
||||||
// Start with a regular cumulation and add the buf as the only component.
|
// Start with a regular cumulation and add the buf as the only component.
|
||||||
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf);
|
CompositeByteBuf composite1 = alloc.compositeBuffer(8).addComponent(true, buf);
|
||||||
// Read composite1 buf to the beginning of the numbers.
|
// Read composite1 buf to the beginning of the numbers.
|
||||||
assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");
|
assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");
|
||||||
|
|
||||||
// Wrap composite1 into another cumulation. This is similar to
|
// Wrap composite1 into another cumulation. This is similar to
|
||||||
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
|
// what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
|
||||||
CompositeByteBuf composite2 =
|
CompositeByteBuf composite2 =
|
||||||
alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
|
alloc.compositeBuffer(8).addComponent(true, composite1);
|
||||||
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");
|
assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");
|
||||||
|
|
||||||
// The previous operation does not adjust the read indexes of the underlying buffers,
|
// The previous operation does not adjust the read indexes of the underlying buffers,
|
||||||
|
|
@ -639,13 +637,27 @@ public class NettyAdaptiveCumulatorTest {
|
||||||
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
|
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
|
||||||
ByteBufUtil.writeAscii(alloc, "56789"));
|
ByteBufUtil.writeAscii(alloc, "56789"));
|
||||||
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
|
assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
|
||||||
|
}
|
||||||
|
|
||||||
// Correctness check: we still have a single component, and this component is still the
|
@Test
|
||||||
// original underlying buffer.
|
public void mergeWithNonCompositeTail() {
|
||||||
assertThat(cumulation.numComponents()).isEqualTo(1);
|
NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);
|
||||||
// Replace '2' with '*', and '8' with '$'.
|
ByteBufAllocator alloc = new PooledByteBufAllocator();
|
||||||
buf.setByte(5, '*').setByte(11, '$');
|
ByteBuf buf = alloc.buffer().writeBytes("tail".getBytes(US_ASCII));
|
||||||
assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
|
ByteBuf in = alloc.buffer().writeBytes("-012345".getBytes(US_ASCII));
|
||||||
|
|
||||||
|
CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, buf);
|
||||||
|
|
||||||
|
CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite, in);
|
||||||
|
|
||||||
|
assertEquals("tail-012345", cumulation.toString(US_ASCII));
|
||||||
|
assertEquals(0, in.refCnt());
|
||||||
|
assertEquals(1, cumulation.numComponents());
|
||||||
|
|
||||||
|
buf.setByte(2, '*').setByte(7, '$');
|
||||||
|
assertEquals("ta*l-01$345", cumulation.toString(US_ASCII));
|
||||||
|
|
||||||
|
composite.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue