Remove ThreadlessExecutor from BlockingServerStream (#10496)

* Remove ThreadlessExecutor from BlockingServerStream

fixes #10490
This commit is contained in:
Larry Safran 2023-08-18 10:16:43 -07:00 committed by GitHub
parent eb18cba062
commit 55c5040cb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 14 additions and 49 deletions

View File

@ -133,9 +133,7 @@ public final class ClientCalls {
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) { public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try { try {
return getUnchecked(futureUnaryCall(call, req)); return getUnchecked(futureUnaryCall(call, req));
} catch (RuntimeException e) { } catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e); throw cancelThrow(call, e);
} }
} }
@ -167,10 +165,7 @@ public final class ClientCalls {
} }
executor.shutdown(); executor.shutdown();
return getUnchecked(responseFuture); return getUnchecked(responseFuture);
} catch (RuntimeException e) { } catch (RuntimeException | Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} catch (Error e) {
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose(). // Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e); throw cancelThrow(call, e);
} finally { } finally {
@ -206,14 +201,12 @@ public final class ClientCalls {
* *
* @return an iterator over the response stream. * @return an iterator over the response stream.
*/ */
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) { Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, ClientCall<ReqT, RespT> call = channel.newCall(method,
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING) callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING));
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor); BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call);
asyncUnaryRequestCall(call, req, result.listener()); asyncUnaryRequestCall(call, req, result.listener());
return result; return result;
} }
@ -288,8 +281,7 @@ public final class ClientCalls {
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) { private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
try { try {
call.cancel(null, t); call.cancel(null, t);
} catch (Throwable e) { } catch (RuntimeException | Error e) {
assert e instanceof RuntimeException || e instanceof Error;
logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e); logger.log(Level.SEVERE, "RuntimeException encountered while closing call", e);
} }
if (t instanceof RuntimeException) { if (t instanceof RuntimeException) {
@ -320,9 +312,7 @@ public final class ClientCalls {
try { try {
call.sendMessage(req); call.sendMessage(req);
call.halfClose(); call.halfClose();
} catch (RuntimeException e) { } catch (RuntimeException | Error e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e); throw cancelThrow(call, e);
} }
} }
@ -597,20 +587,12 @@ public final class ClientCalls {
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3); private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<>(3);
private final StartableListener<T> listener = new QueuingListener(); private final StartableListener<T> listener = new QueuingListener();
private final ClientCall<?, T> call; private final ClientCall<?, T> call;
/** May be null. */
private final ThreadlessExecutor threadless;
// Only accessed when iterating. // Only accessed when iterating.
private Object last; private Object last;
// Non private to avoid synthetic class // Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call) { BlockingResponseStream(ClientCall<?, T> call) {
this(call, null);
}
// Non private to avoid synthetic class
BlockingResponseStream(ClientCall<?, T> call, ThreadlessExecutor threadless) {
this.call = call; this.call = call;
this.threadless = threadless;
} }
StartableListener<T> listener() { StartableListener<T> listener() {
@ -620,7 +602,6 @@ public final class ClientCalls {
private Object waitForNext() { private Object waitForNext() {
boolean interrupt = false; boolean interrupt = false;
try { try {
if (threadless == null) {
while (true) { while (true) {
try { try {
return buffer.take(); return buffer.take();
@ -630,22 +611,6 @@ public final class ClientCalls {
// Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill // Now wait for onClose() to be called, to guarantee BlockingQueue doesn't fill
} }
} }
} else {
Object next;
while ((next = buffer.poll()) == null) {
try {
threadless.waitAndDrain();
} catch (InterruptedException ie) {
interrupt = true;
call.cancel("Thread interrupted", ie);
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
}
return next;
}
} finally { } finally {
if (interrupt) { if (interrupt) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();