Log warning message when server side gets exception writing message to stream and allow multiple closes (#10513)

* Use internalClose instead of close when sendMessage has a RuntimeException.
* Change argument to internalClose to a Throwable instead of a Status.
* Rename internalClose to handleInternalError
This commit is contained in:
Larry Safran 2023-08-24 16:40:23 -07:00 committed by GitHub
parent 9424f8b4c7
commit 92174be3df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 9 deletions

View File

@ -41,6 +41,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
@ -157,7 +158,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
checkState(!closeCalled, "call is closed");
if (method.getType().serverSendsOneMessage() && messageSent) {
internalClose(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES));
handleInternalError(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES).asRuntimeException());
return;
}
@ -169,7 +170,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
stream.flush();
}
} catch (RuntimeException e) {
close(Status.fromThrowable(e), new Metadata());
handleInternalError(e);
} catch (Error e) {
close(
Status.CANCELLED.withDescription("Server sendMessage() failed with Error"),
@ -214,7 +215,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
closeCalled = true;
if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
handleInternalError(Status.INTERNAL.withDescription(MISSING_RESPONSE).asRuntimeException());
return;
}
@ -263,10 +264,14 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
* run until completion, but silently ignore interactions with the {@link ServerStream} from now
* on.
*/
private void internalClose(Status internalError) {
log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
stream.cancel(internalError);
serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false
private void handleInternalError(Throwable internalError) {
log.log(Level.WARNING, "Cancelling the stream because of internal error", internalError);
Status status = (internalError instanceof StatusRuntimeException)
? ((StatusRuntimeException) internalError).getStatus()
: Status.INTERNAL.withCause(internalError)
.withDescription("Internal error so cancelling stream.");
stream.cancel(status);
serverCallTracer.reportCallEnded(false); // error so always false
}
/**

View File

@ -47,7 +47,6 @@ import io.grpc.SecurityLevel;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
import io.grpc.internal.SingleMessageProducer;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@ -220,7 +219,7 @@ public class ServerCallImplTest {
call.sendMessage(1234L);
verify(stream).close(isA(Status.class), isA(Metadata.class));
verify(stream).cancel(isA(Status.class));
}
@Test