Revert "core: Fix migrating deframer compatibility with RetriableStream"

This reverts commit bcb287b in #7195.
This commit is contained in:
ZHANG Dapeng 2020-07-10 08:43:30 -07:00 committed by GitHub
parent dc30d85765
commit 791b48e79d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 7 additions and 8 deletions

View File

@ -157,7 +157,8 @@ public abstract class AbstractStream implements Stream {
maxMessageSize, maxMessageSize,
statsTraceCtx, statsTraceCtx,
transportTracer); transportTracer);
deframer = new MigratingThreadDeframer(this, this, rawDeframer); // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
deframer = rawDeframer;
} }
final void optimizeForDirectExecutor() { final void optimizeForDirectExecutor() {

View File

@ -47,14 +47,11 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
/** /**
* {@code true} means decoding on transport thread. * {@code true} means decoding on transport thread.
* *
* <p>Invariant: if there are outstanding requests, then deframerOnTransportThread=true. If there * <p>Invariant: if there are outstanding requests, then deframerOnTransportThread=true. Otherwise
* is buffered data, then deframerOnTransportThread=false. * deframerOnTransportThread=false.
*
* <p>Start on transport thread since our stubs will generally request(1) immediately, and to be
* compatible with RetriableStream.
*/ */
@GuardedBy("lock") @GuardedBy("lock")
private boolean deframerOnTransportThread = true; private boolean deframerOnTransportThread;
@GuardedBy("lock") @GuardedBy("lock")
private final Queue<Op> opQueue = new ArrayDeque<>(); private final Queue<Op> opQueue = new ArrayDeque<>();
@GuardedBy("lock") @GuardedBy("lock")
@ -68,7 +65,8 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer {
new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener")); new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener"));
this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor"); this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor");
this.appListener = new ApplicationThreadDeframerListener(transportListener, transportExecutor); this.appListener = new ApplicationThreadDeframerListener(transportListener, transportExecutor);
this.migratingListener = new MigratingDeframerListener(transportListener); // Starts on app thread
this.migratingListener = new MigratingDeframerListener(appListener);
deframer.setListener(migratingListener); deframer.setListener(migratingListener);
this.deframer = deframer; this.deframer = deframer;
} }