diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 923aca75f3..0b18494fc0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -202,10 +202,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { public StreamObserver fullDuplexCall( final StreamObserver responseObserver) { final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); - ServerCallStreamObserver autoUnlockResponseObserver = - (ServerCallStreamObserver) responseObserver; - - class MayBlockStreamObserver implements StreamObserver { + return new StreamObserver() { boolean oobTestLocked; @Override @@ -217,7 +214,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { try { lock.acquire(); } catch (InterruptedException ex) { - autoUnlockResponseObserver.onError(new StatusRuntimeException( + responseObserver.onError(new StatusRuntimeException( Status.ABORTED.withDescription("server service interrupted").withCause(ex))); return; } @@ -237,6 +234,10 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public void onCompleted() { + if (oobTestLocked) { + lock.release(); + oobTestLocked = false; + } if (!dispatcher.isCancelled()) { // Tell the dispatcher that all input has been received. dispatcher.completeInput(); @@ -245,21 +246,13 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public void onError(Throwable cause) { - dispatcher.onError(cause); - } - - void cleanup() { if (oobTestLocked) { lock.release(); oobTestLocked = false; } + dispatcher.onError(cause); } - } - - MayBlockStreamObserver mayBlockObserver = new MayBlockStreamObserver(); - autoUnlockResponseObserver.setOnCancelHandler(mayBlockObserver::cleanup); - autoUnlockResponseObserver.setOnCloseHandler(mayBlockObserver::cleanup); - return mayBlockObserver; + }; } /**