interop-test: fix import, no cast to ServerStreamCallObserver (#9299)

This commit is contained in:
yifeizhuang 2022-06-21 12:41:33 -07:00 committed by GitHub
parent 12984db6a7
commit f0c934e4c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 8 additions and 15 deletions

View File

@ -202,10 +202,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall( public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) { final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
ServerCallStreamObserver<Messages.StreamingOutputCallResponse> autoUnlockResponseObserver = return new StreamObserver<StreamingOutputCallRequest>() {
(ServerCallStreamObserver<Messages.StreamingOutputCallResponse>) responseObserver;
class MayBlockStreamObserver implements StreamObserver<StreamingOutputCallRequest> {
boolean oobTestLocked; boolean oobTestLocked;
@Override @Override
@ -217,7 +214,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
try { try {
lock.acquire(); lock.acquire();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
autoUnlockResponseObserver.onError(new StatusRuntimeException( responseObserver.onError(new StatusRuntimeException(
Status.ABORTED.withDescription("server service interrupted").withCause(ex))); Status.ABORTED.withDescription("server service interrupted").withCause(ex)));
return; return;
} }
@ -237,6 +234,10 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
@Override @Override
public void onCompleted() { public void onCompleted() {
if (oobTestLocked) {
lock.release();
oobTestLocked = false;
}
if (!dispatcher.isCancelled()) { if (!dispatcher.isCancelled()) {
// Tell the dispatcher that all input has been received. // Tell the dispatcher that all input has been received.
dispatcher.completeInput(); dispatcher.completeInput();
@ -245,21 +246,13 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
@Override @Override
public void onError(Throwable cause) { public void onError(Throwable cause) {
dispatcher.onError(cause);
}
void cleanup() {
if (oobTestLocked) { if (oobTestLocked) {
lock.release(); lock.release();
oobTestLocked = false; oobTestLocked = false;
} }
dispatcher.onError(cause);
} }
} };
MayBlockStreamObserver mayBlockObserver = new MayBlockStreamObserver();
autoUnlockResponseObserver.setOnCancelHandler(mayBlockObserver::cleanup);
autoUnlockResponseObserver.setOnCloseHandler(mayBlockObserver::cleanup);
return mayBlockObserver;
} }
/** /**