mirror of https://github.com/grpc/grpc-java.git
core: Move client-side decompressor selection to stream
Previously ClientCallImpl's stream listener would call stream.setDecompressor(), but this has always been a bit strange as the only case where the call listener calls the stream and forms a bit of a loop. It also turned out to be racy in the presence of DelayedClientStream since DelayedClientStream does not guarantee that the listener has processed before returning. Now we let the stream handle decompressor selection itself. Compressor selection on client and server and decompressor selection on server remain unchanged. Nothing prevents them from being changed, other than it is currently unnecessary to fix the severe compressionTest flake. Fixes #2865 Fixes #2157
This commit is contained in:
parent
4e31ccba4c
commit
a3ac64a883
|
|
@ -22,6 +22,7 @@ import io.grpc.Attributes;
|
|||
import io.grpc.CallOptions;
|
||||
import io.grpc.Compressor;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.Grpc;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
|
|
@ -569,7 +570,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
|
|||
public void setCompressor(Compressor compressor) {}
|
||||
|
||||
@Override
|
||||
public void setDecompressor(Decompressor decompressor) {}
|
||||
public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
|
||||
|
||||
@Override
|
||||
public void setMaxInboundMessageSize(int maxSize) {}
|
||||
|
|
|
|||
|
|
@ -16,9 +16,14 @@
|
|||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.Codec;
|
||||
import io.grpc.Compressor;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import java.io.InputStream;
|
||||
|
|
@ -111,6 +116,11 @@ public abstract class AbstractClientStream extends AbstractStream
|
|||
transportState().setMaxInboundMessageSize(maxSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
|
||||
transportState().setDecompressorRegistry(decompressorRegistry);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@Override
|
||||
protected abstract TransportState transportState();
|
||||
|
|
@ -172,6 +182,7 @@ public abstract class AbstractClientStream extends AbstractStream
|
|||
private final StatsTraceContext statsTraceCtx;
|
||||
private boolean listenerClosed;
|
||||
private ClientStreamListener listener;
|
||||
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
|
||||
|
||||
private Runnable deliveryStalledTask;
|
||||
|
||||
|
|
@ -186,6 +197,12 @@ public abstract class AbstractClientStream extends AbstractStream
|
|||
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
|
||||
}
|
||||
|
||||
private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
|
||||
Preconditions.checkState(this.listener == null, "Already called start");
|
||||
this.decompressorRegistry =
|
||||
Preconditions.checkNotNull(decompressorRegistry, "decompressorRegistry");
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public final void setListener(ClientStreamListener listener) {
|
||||
Preconditions.checkState(this.listener == null, "Already called setListener");
|
||||
|
|
@ -218,6 +235,19 @@ public abstract class AbstractClientStream extends AbstractStream
|
|||
protected void inboundHeadersReceived(Metadata headers) {
|
||||
Preconditions.checkState(!statusReported, "Received headers on closed stream");
|
||||
statsTraceCtx.clientInboundHeaders();
|
||||
|
||||
Decompressor decompressor = Codec.Identity.NONE;
|
||||
String encoding = headers.get(MESSAGE_ENCODING_KEY);
|
||||
if (encoding != null) {
|
||||
decompressor = decompressorRegistry.lookupDecompressor(encoding);
|
||||
if (decompressor == null) {
|
||||
deframeFailed(Status.INTERNAL.withDescription(
|
||||
String.format("Can't find decompressor for %s", encoding)).asRuntimeException());
|
||||
return;
|
||||
}
|
||||
}
|
||||
setDecompressor(decompressor);
|
||||
|
||||
listener().headersRead(headers);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package io.grpc.internal;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.InternalStatus;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
|
|
@ -150,6 +151,11 @@ public abstract class AbstractServerStream extends AbstractStream
|
|||
return super.isReady();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void setDecompressor(Decompressor decompressor) {
|
||||
transportState().setDecompressor(Preconditions.checkNotNull(decompressor, "decompressor"));
|
||||
}
|
||||
|
||||
@Override public Attributes getAttributes() {
|
||||
return Attributes.EMPTY;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,11 +73,6 @@ public abstract class AbstractStream implements Stream {
|
|||
framer().setCompressor(checkNotNull(compressor, "compressor"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void setDecompressor(Decompressor decompressor) {
|
||||
transportState().setDecompressor(checkNotNull(decompressor, "decompressor"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReady() {
|
||||
if (framer().isClosed()) {
|
||||
|
|
@ -207,7 +202,7 @@ public abstract class AbstractStream implements Stream {
|
|||
return statsTraceCtx;
|
||||
}
|
||||
|
||||
private void setDecompressor(Decompressor decompressor) {
|
||||
protected final void setDecompressor(Decompressor decompressor) {
|
||||
if (deframer.isClosed()) {
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,6 @@ import io.grpc.Compressor;
|
|||
import io.grpc.CompressorRegistry;
|
||||
import io.grpc.Context;
|
||||
import io.grpc.Deadline;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.InternalDecompressorRegistry;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
|
|
@ -222,6 +221,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
|
|||
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
|
||||
}
|
||||
stream.setCompressor(compressor);
|
||||
stream.setDecompressorRegistry(decompressorRegistry);
|
||||
stream.start(new ClientStreamListenerImpl(observer));
|
||||
|
||||
// Delay any sources of cancellation after start(), because most of the transports are broken if
|
||||
|
|
@ -429,18 +429,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT>
|
|||
|
||||
@Override
|
||||
public void headersRead(final Metadata headers) {
|
||||
Decompressor decompressor = Codec.Identity.NONE;
|
||||
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
|
||||
String encoding = headers.get(MESSAGE_ENCODING_KEY);
|
||||
decompressor = decompressorRegistry.lookupDecompressor(encoding);
|
||||
if (decompressor == null) {
|
||||
stream.cancel(Status.INTERNAL.withDescription(
|
||||
String.format("Can't find decompressor for %s", encoding)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
stream.setDecompressor(decompressor);
|
||||
|
||||
class HeadersRead extends ContextRunnable {
|
||||
HeadersRead() {
|
||||
super(context);
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package io.grpc.internal;
|
||||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.Status;
|
||||
|
||||
/**
|
||||
|
|
@ -51,6 +52,14 @@ public interface ClientStream extends Stream {
|
|||
*/
|
||||
void setAuthority(String authority);
|
||||
|
||||
/**
|
||||
* Sets the registry to find a decompressor for the framer. May only be called before {@link
|
||||
* #start}. If the transport does not support compression, this may do nothing.
|
||||
*
|
||||
* @param decompressorRegistry the registry of decompressors for decoding responses
|
||||
*/
|
||||
void setDecompressorRegistry(DecompressorRegistry decompressorRegistry);
|
||||
|
||||
/**
|
||||
* Starts stream. This method may only be called once. It is safe to do latent initialization of
|
||||
* the stream up until {@link #start} is called.
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Compressor;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import java.io.InputStream;
|
||||
|
|
@ -303,16 +303,14 @@ class DelayedStream implements ClientStream {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setDecompressor(Decompressor decompressor) {
|
||||
checkNotNull(decompressor, "decompressor");
|
||||
// This method being called only makes sense after setStream() has been called (but not
|
||||
// necessarily returned), but there is not necessarily a happens-before relationship. This
|
||||
// synchronized block creates one.
|
||||
synchronized (this) { }
|
||||
checkState(realStream != null, "How did we receive a reply before the request is sent?");
|
||||
// ClientStreamListenerImpl (in ClientCallImpl) requires setDecompressor to be set immediately,
|
||||
// since messages may be processed immediately after this method returns.
|
||||
realStream.setDecompressor(decompressor);
|
||||
public void setDecompressorRegistry(final DecompressorRegistry decompressorRegistry) {
|
||||
checkNotNull(decompressorRegistry, "decompressorRegistry");
|
||||
delayOrExecute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
realStream.setDecompressorRegistry(decompressorRegistry);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ package io.grpc.internal;
|
|||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Compressor;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.Status;
|
||||
import java.io.InputStream;
|
||||
|
||||
|
|
@ -68,7 +68,7 @@ public class NoopClientStream implements ClientStream {
|
|||
public void setCompressor(Compressor compressor) {}
|
||||
|
||||
@Override
|
||||
public void setDecompressor(Decompressor decompressor) {}
|
||||
public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {}
|
||||
|
||||
@Override
|
||||
public void setMaxInboundMessageSize(int maxSize) {}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package io.grpc.internal;
|
||||
|
||||
import io.grpc.Attributes;
|
||||
import io.grpc.Decompressor;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import javax.annotation.Nullable;
|
||||
|
|
@ -58,6 +59,14 @@ public interface ServerStream extends Stream {
|
|||
*/
|
||||
void cancel(Status status);
|
||||
|
||||
/**
|
||||
* Sets the decompressor on the deframer. If the transport does not support compression, this may
|
||||
* do nothing.
|
||||
*
|
||||
* @param decompressor the decompressor to use.
|
||||
*/
|
||||
void setDecompressor(Decompressor decompressor);
|
||||
|
||||
/**
|
||||
* Attributes describing stream. This is inherited from the transport attributes, and used
|
||||
* as the basis of {@link io.grpc.ServerCall#getAttributes}.
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
package io.grpc.internal;
|
||||
|
||||
import io.grpc.Compressor;
|
||||
import io.grpc.Decompressor;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
|
|
@ -72,13 +71,6 @@ public interface Stream {
|
|||
*/
|
||||
void setCompressor(Compressor compressor);
|
||||
|
||||
/**
|
||||
* Sets the decompressor on the deframer.
|
||||
*
|
||||
* @param decompressor the decompressor to use.
|
||||
*/
|
||||
void setDecompressor(Decompressor decompressor);
|
||||
|
||||
/**
|
||||
* Enables per-message compression, if an encoding type has been negotiated. If no message
|
||||
* encoding has been negotiated, this is a no-op. By default per-message compression is enabled,
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
|
|||
import io.grpc.Attributes;
|
||||
import io.grpc.Attributes.Key;
|
||||
import io.grpc.Codec;
|
||||
import io.grpc.DecompressorRegistry;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.Status;
|
||||
import java.io.ByteArrayInputStream;
|
||||
|
|
@ -87,16 +88,11 @@ public class DelayedStreamTest {
|
|||
stream.start(mock(ClientStreamListener.class));
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void setDecompressor_beforeSetStream() {
|
||||
stream.start(listener);
|
||||
stream.setDecompressor(Codec.Identity.NONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setStream_sendsAllMessages() {
|
||||
stream.start(listener);
|
||||
stream.setCompressor(Codec.Identity.NONE);
|
||||
stream.setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
|
||||
|
||||
stream.setMessageCompression(true);
|
||||
InputStream message = new ByteArrayInputStream(new byte[]{'a'});
|
||||
|
|
@ -105,10 +101,9 @@ public class DelayedStreamTest {
|
|||
stream.writeMessage(message);
|
||||
|
||||
stream.setStream(realStream);
|
||||
stream.setDecompressor(Codec.Identity.NONE);
|
||||
|
||||
verify(realStream).setCompressor(Codec.Identity.NONE);
|
||||
verify(realStream).setDecompressor(Codec.Identity.NONE);
|
||||
verify(realStream).setDecompressorRegistry(DecompressorRegistry.getDefaultInstance());
|
||||
|
||||
verify(realStream).setMessageCompression(true);
|
||||
verify(realStream).setMessageCompression(false);
|
||||
|
|
|
|||
Loading…
Reference in New Issue