From 7040417eeecb69aa4f012e6ee01da249abbca6d4 Mon Sep 17 00:00:00 2001 From: MV Shiva Date: Wed, 6 Aug 2025 12:24:33 +0530 Subject: [PATCH] stub: use the closedTrailers in StatusException (#12259) --- .../java/io/grpc/stub/BlockingClientCall.java | 53 +++++++++++-------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/BlockingClientCall.java b/stub/src/main/java/io/grpc/stub/BlockingClientCall.java index b62bd4322a..2c896c7208 100644 --- a/stub/src/main/java/io/grpc/stub/BlockingClientCall.java +++ b/stub/src/main/java/io/grpc/stub/BlockingClientCall.java @@ -29,6 +29,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -69,7 +70,7 @@ public final class BlockingClientCall { private final ThreadSafeThreadlessExecutor executor; private boolean writeClosed; - private volatile Status closedStatus; // null if not closed + private AtomicReference closeState = new AtomicReference<>(); BlockingClientCall(ClientCall call, ThreadSafeThreadlessExecutor executor) { this.call = call; @@ -120,22 +121,22 @@ public final class BlockingClientCall { logger.finer("Client Blocking read had value: " + bufferedValue); } - Status currentClosedStatus; + CloseState currentCloseState; if (bufferedValue != null) { call.request(1); return bufferedValue; - } else if ((currentClosedStatus = closedStatus) == null) { + } else if ((currentCloseState = closeState.get()) == null) { throw new IllegalStateException( "The message disappeared... are you reading from multiple threads?"); - } else if (!currentClosedStatus.isOk()) { - throw currentClosedStatus.asException(); + } else if (!currentCloseState.status.isOk()) { + throw currentCloseState.status.asException(currentCloseState.trailers); } else { return null; } } boolean skipWaitingForRead() { - return closedStatus != null || !buffer.isEmpty(); + return closeState.get() != null || !buffer.isEmpty(); } /** @@ -148,11 +149,11 @@ public final class BlockingClientCall { * @throws StatusException If the stream was closed in an error state */ public boolean hasNext() throws InterruptedException, StatusException { - executor.waitAndDrain((x) -> !x.buffer.isEmpty() || x.closedStatus != null, this); + executor.waitAndDrain((x) -> !x.buffer.isEmpty() || x.closeState.get() != null, this); - Status currentClosedStatus = closedStatus; - if (currentClosedStatus != null && !currentClosedStatus.isOk()) { - throw currentClosedStatus.asException(); + CloseState currentCloseState = closeState.get(); + if (currentCloseState != null && !currentCloseState.status.isOk()) { + throw currentCloseState.status.asException(currentCloseState.trailers); } return !buffer.isEmpty(); @@ -221,17 +222,16 @@ public final class BlockingClientCall { } Predicate> predicate = - (x) -> x.call.isReady() || x.closedStatus != null; + (x) -> x.call.isReady() || x.closeState.get() != null; executor.waitAndDrainWithTimeout(waitForever, endNanoTime, predicate, this); - Status savedClosedStatus = closedStatus; - if (savedClosedStatus == null) { + CloseState savedCloseState = closeState.get(); + if (savedCloseState == null || savedCloseState.status == null) { call.sendMessage(request); return true; - } else if (savedClosedStatus.isOk()) { + } else if (savedCloseState.status.isOk()) { return false; } else { - // Propagate any errors returned from the server - throw savedClosedStatus.asException(); + throw savedCloseState.status.asException(savedCloseState.trailers); } } @@ -274,7 +274,8 @@ public final class BlockingClientCall { @VisibleForTesting Status getClosedStatus() { drainQuietly(); - return closedStatus; + CloseState state = closeState.get(); + return (state == null) ? null : state.status; } /** @@ -317,7 +318,7 @@ public final class BlockingClientCall { * @return True if writes haven't been closed and the server hasn't closed the stream */ private boolean isWriteLegal() { - return !writeClosed && closedStatus == null; + return !writeClosed && closeState.get() == null; } ClientCall.Listener getListener() { @@ -335,15 +336,25 @@ public final class BlockingClientCall { private final class QueuingListener extends ClientCall.Listener { @Override public void onMessage(RespT value) { - Preconditions.checkState(closedStatus == null, "ClientCall already closed"); + Preconditions.checkState(closeState.get() == null, "ClientCall already closed"); buffer.add(value); } @Override public void onClose(Status status, Metadata trailers) { - Preconditions.checkState(closedStatus == null, "ClientCall already closed"); - closedStatus = status; + CloseState newCloseState = new CloseState(status, trailers); + boolean wasSet = closeState.compareAndSet(null, newCloseState); + Preconditions.checkState(wasSet, "ClientCall already closed"); } } + private static final class CloseState { + final Status status; + final Metadata trailers; + + CloseState(Status status, Metadata trailers) { + this.status = Preconditions.checkNotNull(status, "status"); + this.trailers = trailers; + } + } }