From f0c934e4c1f4c725fcf1fd2cfbb2345ba1000345 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Tue, 21 Jun 2022 12:41:33 -0700 Subject: [PATCH] interop-test: fix import, no cast to ServerStreamCallObserver (#9299) --- .../testing/integration/TestServiceImpl.java | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 923aca75f3..0b18494fc0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -202,10 +202,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { public StreamObserver fullDuplexCall( final StreamObserver responseObserver) { final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); - ServerCallStreamObserver autoUnlockResponseObserver = - (ServerCallStreamObserver) responseObserver; - - class MayBlockStreamObserver implements StreamObserver { + return new StreamObserver() { boolean oobTestLocked; @Override @@ -217,7 +214,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { try { lock.acquire(); } catch (InterruptedException ex) { - autoUnlockResponseObserver.onError(new StatusRuntimeException( + responseObserver.onError(new StatusRuntimeException( Status.ABORTED.withDescription("server service interrupted").withCause(ex))); return; } @@ -237,6 +234,10 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public void onCompleted() { + if (oobTestLocked) { + lock.release(); + oobTestLocked = false; + } if (!dispatcher.isCancelled()) { // Tell the dispatcher that all input has been received. dispatcher.completeInput(); @@ -245,21 +246,13 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { @Override public void onError(Throwable cause) { - dispatcher.onError(cause); - } - - void cleanup() { if (oobTestLocked) { lock.release(); oobTestLocked = false; } + dispatcher.onError(cause); } - } - - MayBlockStreamObserver mayBlockObserver = new MayBlockStreamObserver(); - autoUnlockResponseObserver.setOnCancelHandler(mayBlockObserver::cleanup); - autoUnlockResponseObserver.setOnCloseHandler(mayBlockObserver::cleanup); - return mayBlockObserver; + }; } /**