From 791b48e79df1e3240a47d5f7f0001a0f400ae033 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Fri, 10 Jul 2020 08:43:30 -0700 Subject: [PATCH] Revert "core: Fix migrating deframer compatibility with RetriableStream" This reverts commit bcb287b in #7195. --- .../main/java/io/grpc/internal/AbstractStream.java | 3 ++- .../io/grpc/internal/MigratingThreadDeframer.java | 12 +++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java index 19bdbbc788..e066018249 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream.java @@ -157,7 +157,8 @@ public abstract class AbstractStream implements Stream { maxMessageSize, statsTraceCtx, transportTracer); - deframer = new MigratingThreadDeframer(this, this, rawDeframer); + // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break. + deframer = rawDeframer; } final void optimizeForDirectExecutor() { diff --git a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java index e146ff21cd..f820e7ff15 100644 --- a/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java +++ b/core/src/main/java/io/grpc/internal/MigratingThreadDeframer.java @@ -47,14 +47,11 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { /** * {@code true} means decoding on transport thread. * - *

Invariant: if there are outstanding requests, then deframerOnTransportThread=true. If there - * is buffered data, then deframerOnTransportThread=false. - * - *

Start on transport thread since our stubs will generally request(1) immediately, and to be - * compatible with RetriableStream. + *

Invariant: if there are outstanding requests, then deframerOnTransportThread=true. Otherwise + * deframerOnTransportThread=false. */ @GuardedBy("lock") - private boolean deframerOnTransportThread = true; + private boolean deframerOnTransportThread; @GuardedBy("lock") private final Queue opQueue = new ArrayDeque<>(); @GuardedBy("lock") @@ -68,7 +65,8 @@ final class MigratingThreadDeframer implements ThreadOptimizedDeframer { new SquelchLateMessagesAvailableDeframerListener(checkNotNull(listener, "listener")); this.transportExecutor = checkNotNull(transportExecutor, "transportExecutor"); this.appListener = new ApplicationThreadDeframerListener(transportListener, transportExecutor); - this.migratingListener = new MigratingDeframerListener(transportListener); + // Starts on app thread + this.migratingListener = new MigratingDeframerListener(appListener); deframer.setListener(migratingListener); this.deframer = deframer; }