diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index a9c47fbc4d..f69a2a9c64 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -117,6 +117,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -1759,37 +1760,67 @@ public abstract class AbstractInteropTest { */ public void testOrcaOob() throws Exception { AtomicReference reportHolder = new AtomicReference<>(); - TestOrcaReport answer = TestOrcaReport.newBuilder() + final TestOrcaReport answer = TestOrcaReport.newBuilder() .setCpuUtilization(0.8210) .setMemoryUtilization(0.5847) .putUtilization("util", 0.30499) .build(); - blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build()); - int i; - int retryLimit = 5; - for (i = 0; i < retryLimit; i++) { - Thread.sleep(1000); - blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); - if (reportHolder.get().equals(answer)) { - break; - } - } - assertThat(i).isLessThan(retryLimit); - - answer = TestOrcaReport.newBuilder() + final TestOrcaReport answer2 = TestOrcaReport.newBuilder() .setCpuUtilization(0.29309) .setMemoryUtilization(0.2) .putUtilization("util", 100.2039) .build(); - blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build()); - for (i = 0; i < retryLimit; i++) { + + final int retryLimit = 5; + BlockingQueue queue = new LinkedBlockingQueue<>(); + final Object lastItem = new Object(); + StreamObserver streamObserver = + asyncStub.fullDuplexCall(new StreamObserver() { + + @Override + public void onNext(StreamingOutputCallResponse value) { + queue.add(value); + } + + @Override + public void onError(Throwable t) { + queue.add(t); + } + + @Override + public void onCompleted() { + queue.add(lastItem); + } + }); + + streamObserver.onNext(StreamingOutputCallRequest.newBuilder() + .setOrcaOobReport(answer) + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + int i = 0; + for (; i < retryLimit; i++) { Thread.sleep(1000); blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); - if (reportHolder.get().equals(answer)) { + if (answer.equals(reportHolder.get())) { break; } } assertThat(i).isLessThan(retryLimit); + streamObserver.onNext(StreamingOutputCallRequest.newBuilder() + .setOrcaOobReport(answer2) + .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); + assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); + + for (i = 0; i < retryLimit; i++) { + Thread.sleep(1000); + blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); + if (reportHolder.get().equals(answer2)) { + break; + } + } + assertThat(i).isLessThan(retryLimit); + streamObserver.onCompleted(); + assertThat(queue.take()).isSameInstanceAs(lastItem); } /** Sends a large unary rpc with service account credentials. */ diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 7d9c4777bf..923aca75f3 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -25,6 +25,7 @@ import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.internal.LogExceptionRunnable; import io.grpc.services.CallMetricRecorder; import io.grpc.services.MetricRecorder; @@ -50,6 +51,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; @@ -63,6 +65,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private final ScheduledExecutorService executor; private final ByteString compressableBuffer; private final MetricRecorder metricRecorder; + final Semaphore lock = new Semaphore(1); /** * Constructs a controller using the given executor for scheduling response stream chunks. @@ -126,9 +129,6 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { if (req.hasOrcaPerQueryReport()) { echoCallMetricsFromPayload(req.getOrcaPerQueryReport()); } - if (req.hasOrcaOobReport()) { - echoMetricsFromPayload(req.getOrcaOobReport()); - } responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } @@ -202,9 +202,29 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { public StreamObserver fullDuplexCall( final StreamObserver responseObserver) { final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); - return new StreamObserver() { + ServerCallStreamObserver autoUnlockResponseObserver = + (ServerCallStreamObserver) responseObserver; + + class MayBlockStreamObserver implements StreamObserver { + boolean oobTestLocked; + @Override public void onNext(StreamingOutputCallRequest request) { + // to facilitate multiple clients running orca_oob test in parallel, the server allows + // only one orca_oob test client to run at a time to avoid conflicts. + if (request.hasOrcaOobReport()) { + if (!oobTestLocked) { + try { + lock.acquire(); + } catch (InterruptedException ex) { + autoUnlockResponseObserver.onError(new StatusRuntimeException( + Status.ABORTED.withDescription("server service interrupted").withCause(ex))); + return; + } + oobTestLocked = true; + } + echoMetricsFromPayload(request.getOrcaOobReport()); + } if (request.hasResponseStatus()) { dispatcher.cancel(); dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode()) @@ -227,7 +247,19 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { public void onError(Throwable cause) { dispatcher.onError(cause); } - }; + + void cleanup() { + if (oobTestLocked) { + lock.release(); + oobTestLocked = false; + } + } + } + + MayBlockStreamObserver mayBlockObserver = new MayBlockStreamObserver(); + autoUnlockResponseObserver.setOnCancelHandler(mayBlockObserver::cleanup); + autoUnlockResponseObserver.setOnCloseHandler(mayBlockObserver::cleanup); + return mayBlockObserver; } /** diff --git a/interop-testing/src/main/proto/grpc/testing/messages.proto b/interop-testing/src/main/proto/grpc/testing/messages.proto index 2016115462..fbcb6b4ce9 100644 --- a/interop-testing/src/main/proto/grpc/testing/messages.proto +++ b/interop-testing/src/main/proto/grpc/testing/messages.proto @@ -103,9 +103,6 @@ message SimpleRequest { // If set the server should record this metrics report data for the current RPC. TestOrcaReport orca_per_query_report = 11; - - // If set the server should update this metrics report data at the OOB server. - TestOrcaReport orca_oob_report = 12; } // Unary response, as configured by the request. @@ -180,6 +177,9 @@ message StreamingOutputCallRequest { // Whether server should return a given status EchoStatus response_status = 7; + + // If set the server should update this metrics report data at the OOB server. + TestOrcaReport orca_oob_report = 8; } // Server-streaming response, as configured by the request and parameters.