diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index e066018249..19bdbbc788 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -157,8 +157,7 @@ public abstract class AbstractStream implements Stream { maxMessageSize, statsTraceCtx, transportTracer); - // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break. - deframer = rawDeframer; + deframer = new MigratingThreadDeframer(this, this, rawDeframer); } final void optimizeForDirectExecutor() { diff --git a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java index f820e7ff15..e146ff21cd 100644 --- a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java +++ b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java @@ -47,11 +47,14 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { /** * {@code true} means decoding on transport thread. * - *

Invariant: if there are outstanding requests, then deframerOnTransportThread=true. Otherwise - * deframerOnTransportThread=false. + *

Invariant: if there are outstanding requests, then deframerOnTransportThread=true. If there + * is buffered data, then deframerOnTransportThread=false. + * + *

Start on transport thread since our stubs will generally request(1) immediately, and to be + * compatible with RetriableStream. */ @GuardedBy("lock") - private boolean deframerOnTransportThread; + private boolean deframerOnTransportThread = true; @GuardedBy("lock") private final Queue opQueue = new ArrayDeque<>(); @GuardedBy("lock") @@ -65,8 +68,7 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener")); this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor"); this.appListener = new ApplicationThreadDeframerListener(transportListener, transportExecutor); - // Starts on app thread - this.migratingListener = new MigratingDeframerListener(appListener); + this.migratingListener = new MigratingDeframerListener(transportListener); deframer.setListener(migratingListener); this.deframer = deframer; }