interop-testing: add orca_oob lock to allow only one test client at a time to perform the test case (#9260)

This commit is contained in:
yifeizhuang 2022-06-17 10:41:30 -07:00 committed by GitHub
parent 8e7793652b
commit 630d63b7cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 25 deletions

View File

@ -117,6 +117,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
@ -1759,37 +1760,67 @@ public abstract class AbstractInteropTest {
*/
public void testOrcaOob() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
TestOrcaReport answer = TestOrcaReport.newBuilder()
final TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putUtilization("util", 0.30499)
.build();
blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build());
int i;
int retryLimit = 5;
for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (reportHolder.get().equals(answer)) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
answer = TestOrcaReport.newBuilder()
final TestOrcaReport answer2 = TestOrcaReport.newBuilder()
.setCpuUtilization(0.29309)
.setMemoryUtilization(0.2)
.putUtilization("util", 100.2039)
.build();
blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build());
for (i = 0; i < retryLimit; i++) {
final int retryLimit = 5;
BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
final Object lastItem = new Object();
StreamObserver<StreamingOutputCallRequest> streamObserver =
asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}
@Override
public void onError(Throwable t) {
queue.add(t);
}
@Override
public void onCompleted() {
queue.add(lastItem);
}
});
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
int i = 0;
for (; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (reportHolder.get().equals(answer)) {
if (answer.equals(reportHolder.get())) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer2)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
if (reportHolder.get().equals(answer2)) {
break;
}
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onCompleted();
assertThat(queue.take()).isSameInstanceAs(lastItem);
}
/** Sends a large unary rpc with service account credentials. */

View File

@ -25,6 +25,7 @@ import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.MetricRecorder;
@ -50,6 +51,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
@ -63,6 +65,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private final ScheduledExecutorService executor;
private final ByteString compressableBuffer;
private final MetricRecorder metricRecorder;
final Semaphore lock = new Semaphore(1);
/**
* Constructs a controller using the given executor for scheduling response stream chunks.
@ -126,9 +129,6 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
if (req.hasOrcaPerQueryReport()) {
echoCallMetricsFromPayload(req.getOrcaPerQueryReport());
}
if (req.hasOrcaOobReport()) {
echoMetricsFromPayload(req.getOrcaOobReport());
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
@ -202,9 +202,29 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
public StreamObserver<Messages.StreamingOutputCallRequest> fullDuplexCall(
final StreamObserver<Messages.StreamingOutputCallResponse> responseObserver) {
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
return new StreamObserver<StreamingOutputCallRequest>() {
ServerCallStreamObserver<Messages.StreamingOutputCallResponse> autoUnlockResponseObserver =
(ServerCallStreamObserver<Messages.StreamingOutputCallResponse>) responseObserver;
class MayBlockStreamObserver implements StreamObserver<StreamingOutputCallRequest> {
boolean oobTestLocked;
@Override
public void onNext(StreamingOutputCallRequest request) {
// to facilitate multiple clients running orca_oob test in parallel, the server allows
// only one orca_oob test client to run at a time to avoid conflicts.
if (request.hasOrcaOobReport()) {
if (!oobTestLocked) {
try {
lock.acquire();
} catch (InterruptedException ex) {
autoUnlockResponseObserver.onError(new StatusRuntimeException(
Status.ABORTED.withDescription("server service interrupted").withCause(ex)));
return;
}
oobTestLocked = true;
}
echoMetricsFromPayload(request.getOrcaOobReport());
}
if (request.hasResponseStatus()) {
dispatcher.cancel();
dispatcher.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
@ -227,7 +247,19 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
public void onError(Throwable cause) {
dispatcher.onError(cause);
}
};
void cleanup() {
if (oobTestLocked) {
lock.release();
oobTestLocked = false;
}
}
}
MayBlockStreamObserver mayBlockObserver = new MayBlockStreamObserver();
autoUnlockResponseObserver.setOnCancelHandler(mayBlockObserver::cleanup);
autoUnlockResponseObserver.setOnCloseHandler(mayBlockObserver::cleanup);
return mayBlockObserver;
}
/**

View File

@ -103,9 +103,6 @@ message SimpleRequest {
// If set the server should record this metrics report data for the current RPC.
TestOrcaReport orca_per_query_report = 11;
// If set the server should update this metrics report data at the OOB server.
TestOrcaReport orca_oob_report = 12;
}
// Unary response, as configured by the request.
@ -180,6 +177,9 @@ message StreamingOutputCallRequest {
// Whether server should return a given status
EchoStatus response_status = 7;
// If set the server should update this metrics report data at the OOB server.
TestOrcaReport orca_oob_report = 8;
}
// Server-streaming response, as configured by the request and parameters.