core: Fix migrating deframer compatibility with RetriableStream

Fixes #7168
This commit is contained in:
Eric Anderson 2020-07-09 09:17:33 -07:00 committed by Eric Anderson
parent cb5ceaaaa0
commit bcb287bb15
2 changed files with 8 additions and 7 deletions

View File

@ -157,8 +157,7 @@ public abstract class AbstractStream implements Stream {
maxMessageSize,
statsTraceCtx,
transportTracer);
// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
deframer = rawDeframer;
deframer = new MigratingThreadDeframer(this, this, rawDeframer);
}
final void optimizeForDirectExecutor() {

View File

@ -47,11 +47,14 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
/**
* {@code true} means decoding on transport thread.
*
* <p>Invariant: if there are outstanding requests, then deframerOnTransportThread=true. Otherwise
* deframerOnTransportThread=false.
* <p>Invariant: if there are outstanding requests, then deframerOnTransportThread=true. If there
* is buffered data, then deframerOnTransportThread=false.
*
* <p>Start on transport thread since our stubs will generally request(1) immediately, and to be
* compatible with RetriableStream.
*/
@GuardedBy("lock")
private boolean deframerOnTransportThread;
private boolean deframerOnTransportThread = true;
@GuardedBy("lock")
private final Queue<Op> opQueue = new ArrayDeque<>();
@GuardedBy("lock")
@ -65,8 +68,7 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener"));
this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor");
this.appListener = new ApplicationThreadDeframerListener(transportListener, transportExecutor);
// Starts on app thread
this.migratingListener = new MigratingDeframerListener(appListener);
this.migratingListener = new MigratingDeframerListener(transportListener);
deframer.setListener(migratingListener);
this.deframer = deframer;
}