interop-test: add orca test case (#9079)

add interop testing `orca_per_rpc` and `orca_oob`
This commit is contained in:
yifeizhuang 2022-05-24 13:34:01 -07:00 committed by GitHub
parent 62119b2a2e
commit 01ab821a06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 293 additions and 6 deletions

View File

@ -74,6 +74,10 @@ dependencies {
exclude group: 'org.apache.httpcomponents' exclude group: 'org.apache.httpcomponents'
} }
implementation (project(':grpc-services')) {
exclude group: 'com.google.protobuf'
}
compileOnly libraries.javax_annotation compileOnly libraries.javax_annotation
androidTestImplementation 'androidx.test.ext:junit:1.1.3', androidTestImplementation 'androidx.test.ext:junit:1.1.3',

View File

@ -92,6 +92,7 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.stats.Measure; import io.opencensus.stats.Measure;
import io.opencensus.stats.Measure.MeasureDouble; import io.opencensus.stats.Measure.MeasureDouble;
@ -188,6 +189,11 @@ public abstract class AbstractInteropTest {
private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers = private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
new LinkedBlockingQueue<>(); new LinkedBlockingQueue<>();
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report");
static final CallOptions.Key<AtomicReference<TestOrcaReport>>
ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report");
private static final class ServerStreamTracerInfo { private static final class ServerStreamTracerInfo {
final String fullMethodName; final String fullMethodName;
final InteropServerStreamTracer tracer; final InteropServerStreamTracer tracer;
@ -1731,6 +1737,49 @@ public abstract class AbstractInteropTest {
assertNotNull(obtainLocalClientAddr()); assertNotNull(obtainLocalClientAddr());
} }
/**
* Test backend metrics per query reporting: expect the test client LB policy to receive load
* reports.
*/
public void testOrcaPerRpc() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putRequestCost("cost", 3456.32)
.putUtilization("util", 0.30499)
.build();
blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall(
SimpleRequest.newBuilder().setOrcaPerRpcReport(answer).build());
assertThat(reportHolder.get()).isEqualTo(answer);
}
/**
* Test backend metrics OOB reporting: expect the test client LB policy to receive load reports.
*/
public void testOrcaOob() throws Exception {
AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
TestOrcaReport answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.8210)
.setMemoryUtilization(0.5847)
.putUtilization("util", 0.30499)
.build();
blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build());
Thread.sleep(1500);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
assertThat(reportHolder.get()).isEqualTo(answer);
answer = TestOrcaReport.newBuilder()
.setCpuUtilization(0.29309)
.setMemoryUtilization(0.2)
.putUtilization("util", 100.2039)
.build();
blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build());
Thread.sleep(1500);
blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
assertThat(reportHolder.get()).isEqualTo(answer);
}
/** Sends a large unary rpc with service account credentials. */ /** Sends a large unary rpc with service account credentials. */
public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)
throws Exception { throws Exception {

View File

@ -0,0 +1,154 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY;
import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.grpc.util.ForwardingLoadBalancer;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.orca.OrcaOobUtil;
import io.grpc.xds.orca.OrcaPerRequestUtil;
import io.grpc.xds.shaded.com.github.xds.data.orca.v3.OrcaLoadReport;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* Implements a test LB policy that receives ORCA load reports.
*/
final class CustomBackendMetricsLoadBalancerProvider extends LoadBalancerProvider {
static final String TEST_ORCA_LB_POLICY_NAME = "test_backend_metrics_load_balancer";
private volatile TestOrcaReport latestOobReport;
@Override
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
return new CustomBackendMetricsLoadBalancer(helper);
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 0;
}
@Override
public String getPolicyName() {
return TEST_ORCA_LB_POLICY_NAME;
}
private final class CustomBackendMetricsLoadBalancer extends ForwardingLoadBalancer {
private LoadBalancer delegate;
public CustomBackendMetricsLoadBalancer(Helper helper) {
this.delegate = LoadBalancerRegistry.getDefaultRegistry()
.getProvider("pick_first")
.newLoadBalancer(new CustomBackendMetricsLoadBalancerHelper(helper));
}
@Override
public LoadBalancer delegate() {
return delegate;
}
private final class CustomBackendMetricsLoadBalancerHelper
extends ForwardingLoadBalancerHelper {
private final Helper orcaHelper;
public CustomBackendMetricsLoadBalancerHelper(Helper helper) {
this.orcaHelper = OrcaOobUtil.newOrcaReportingHelper(helper);
}
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
Subchannel subchannel = super.createSubchannel(args);
OrcaOobUtil.setListener(subchannel, new OrcaOobUtil.OrcaOobReportListener() {
@Override
public void onLoadReport(OrcaLoadReport orcaLoadReport) {
latestOobReport = fromOrcaLoadReport(orcaLoadReport);
}
},
OrcaOobUtil.OrcaReportingConfig.newBuilder()
.setReportInterval(1, TimeUnit.SECONDS)
.build()
);
return subchannel;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
delegate().updateBalancingState(newState, new MayReportLoadPicker(newPicker));
}
@Override
public Helper delegate() {
return orcaHelper;
}
}
private final class MayReportLoadPicker extends SubchannelPicker {
private SubchannelPicker delegate;
public MayReportLoadPicker(SubchannelPicker delegate) {
this.delegate = delegate;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
PickResult result = delegate.pickSubchannel(args);
if (result.getSubchannel() == null) {
return result;
}
AtomicReference<TestOrcaReport> reportRef =
args.getCallOptions().getOption(ORCA_OOB_REPORT_KEY);
if (reportRef != null) {
reportRef.set(latestOobReport);
}
return PickResult.withSubchannel(
result.getSubchannel(),
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
new OrcaPerRequestUtil.OrcaPerRequestReportListener() {
@Override
public void onLoadReport(OrcaLoadReport orcaLoadReport) {
AtomicReference<TestOrcaReport> reportRef =
args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY);
reportRef.set(fromOrcaLoadReport(orcaLoadReport));
}
}));
}
}
}
private static TestOrcaReport fromOrcaLoadReport(OrcaLoadReport orcaLoadReport) {
return TestOrcaReport.newBuilder()
.setCpuUtilization(orcaLoadReport.getCpuUtilization())
.setMemoryUtilization(orcaLoadReport.getMemUtilization())
.putAllRequestCost(orcaLoadReport.getRequestCostMap())
.putAllUtilization(orcaLoadReport.getUtilizationMap())
.build();
}
}

View File

@ -56,7 +56,9 @@ public enum TestCases {
VERY_LARGE_REQUEST("very large request"), VERY_LARGE_REQUEST("very large request"),
PICK_FIRST_UNARY("all requests are sent to one server despite multiple servers are resolved"), PICK_FIRST_UNARY("all requests are sent to one server despite multiple servers are resolved"),
RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"),
CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"); CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"),
ORCA_PER_RPC("report backend metrics per query"),
ORCA_OOB("report backend metrics out-of-band");
private final String description; private final String description;

View File

@ -22,6 +22,8 @@ import io.grpc.ChannelCredentials;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials; import io.grpc.InsecureServerCredentials;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
@ -60,6 +62,8 @@ public class TestServiceClient {
TestUtils.installConscryptIfAvailable(); TestUtils.installConscryptIfAvailable();
final TestServiceClient client = new TestServiceClient(); final TestServiceClient client = new TestServiceClient();
client.parseArgs(args); client.parseArgs(args);
customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider();
LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider);
client.setUp(); client.setUp();
try { try {
@ -91,6 +95,7 @@ public class TestServiceClient {
private int soakPerIterationMaxAcceptableLatencyMs = 1000; private int soakPerIterationMaxAcceptableLatencyMs = 1000;
private int soakOverallTimeoutSeconds = private int soakOverallTimeoutSeconds =
soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000; soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000;
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;
private Tester tester = new Tester(); private Tester tester = new Tester();
@ -239,6 +244,10 @@ public class TestServiceClient {
private synchronized void tearDown() { private synchronized void tearDown() {
try { try {
tester.tearDown(); tester.tearDown();
if (customBackendMetricsLoadBalancerProvider != null) {
LoadBalancerRegistry.getDefaultRegistry()
.deregister(customBackendMetricsLoadBalancerProvider);
}
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
throw ex; throw ex;
} catch (Exception ex) { } catch (Exception ex) {
@ -460,6 +469,17 @@ public class TestServiceClient {
soakPerIterationMaxAcceptableLatencyMs, soakPerIterationMaxAcceptableLatencyMs,
soakOverallTimeoutSeconds); soakOverallTimeoutSeconds);
break; break;
}
case ORCA_PER_RPC: {
tester.testOrcaPerRpc();
break;
}
case ORCA_OOB: {
tester.testOrcaOob();
break;
} }
default: default:

View File

@ -26,6 +26,8 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.LogExceptionRunnable; import io.grpc.internal.LogExceptionRunnable;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.MetricRecorder;
import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.Payload; import io.grpc.testing.integration.Messages.Payload;
@ -36,10 +38,13 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -57,13 +62,19 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private final ScheduledExecutorService executor; private final ScheduledExecutorService executor;
private final ByteString compressableBuffer; private final ByteString compressableBuffer;
private final MetricRecorder metricRecorder;
/** /**
* Constructs a controller using the given executor for scheduling response stream chunks. * Constructs a controller using the given executor for scheduling response stream chunks.
*/ */
public TestServiceImpl(ScheduledExecutorService executor) { public TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder) {
this.executor = executor; this.executor = executor;
this.compressableBuffer = ByteString.copyFrom(new byte[1024]); this.compressableBuffer = ByteString.copyFrom(new byte[1024]);
this.metricRecorder = metricRecorder;
}
public TestServiceImpl(ScheduledExecutorService executor) {
this(executor, MetricRecorder.newInstance());
} }
@Override @Override
@ -112,10 +123,33 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
return; return;
} }
echoCallMetricsFromPayload(req.getOrcaPerRpcReport());
echoMetricsFromPayload(req.getOrcaOobReport());
responseObserver.onNext(responseBuilder.build()); responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted(); responseObserver.onCompleted();
} }
private static void echoCallMetricsFromPayload(TestOrcaReport report) {
CallMetricRecorder recorder = CallMetricRecorder.getCurrent()
.recordCpuUtilizationMetric(report.getCpuUtilization())
.recordMemoryUtilizationMetric(report.getMemoryUtilization());
for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) {
recorder.recordUtilizationMetric(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, Double> entry : report.getRequestCostMap().entrySet()) {
recorder.recordCallMetric(entry.getKey(), entry.getValue());
}
}
private void echoMetricsFromPayload(TestOrcaReport report) {
metricRecorder.setCpuUtilizationMetric(report.getCpuUtilization());
metricRecorder.setMemoryUtilizationMetric(report.getMemoryUtilization());
metricRecorder.setAllUtilizationMetrics(new HashMap<>());
for (Map.Entry<String, Double> entry : report.getUtilizationMap().entrySet()) {
metricRecorder.putUtilizationMetric(entry.getKey(), entry.getValue());
}
}
/** /**
* Given a request that specifies chunk size and interval between responses, creates and schedules * Given a request that specifies chunk size and interval between responses, creates and schedules
* the response stream. * the response stream.

View File

@ -18,6 +18,7 @@ package io.grpc.testing.integration;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.BindableService;
import io.grpc.Grpc; import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials; import io.grpc.InsecureServerCredentials;
import io.grpc.Server; import io.grpc.Server;
@ -26,6 +27,9 @@ import io.grpc.ServerInterceptors;
import io.grpc.TlsServerCredentials; import io.grpc.TlsServerCredentials;
import io.grpc.alts.AltsServerCredentials; import io.grpc.alts.AltsServerCredentials;
import io.grpc.internal.testing.TestUtils; import io.grpc.internal.testing.TestUtils;
import io.grpc.services.MetricRecorder;
import io.grpc.xds.orca.OrcaMetricReportingServerInterceptor;
import io.grpc.xds.orca.OrcaServiceImpl;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -151,11 +155,16 @@ public class TestServiceServer {
} else { } else {
serverCreds = InsecureServerCredentials.create(); serverCreds = InsecureServerCredentials.create();
} }
MetricRecorder metricRecorder = MetricRecorder.newInstance();
BindableService orcaOobService =
OrcaServiceImpl.createService(executor, metricRecorder, 1, TimeUnit.SECONDS);
server = Grpc.newServerBuilderForPort(port, serverCreds) server = Grpc.newServerBuilderForPort(port, serverCreds)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.addService( .addService(
ServerInterceptors.intercept( ServerInterceptors.intercept(
new TestServiceImpl(executor), TestServiceImpl.interceptors())) new TestServiceImpl(executor, metricRecorder), TestServiceImpl.interceptors()))
.addService(orcaOobService)
.intercept(OrcaMetricReportingServerInterceptor.getInstance())
.build() .build()
.start(); .start();
} }

View File

@ -100,6 +100,19 @@ message SimpleRequest {
// Whether SimpleResponse should include grpclb_route_type. // Whether SimpleResponse should include grpclb_route_type.
bool fill_grpclb_route_type = 10; bool fill_grpclb_route_type = 10;
// Whether server should update per-rpc metrics.
TestOrcaReport orca_per_rpc_report = 11;
// Whether server should update OOB metrics.
TestOrcaReport orca_oob_report = 12;
}
message TestOrcaReport {
double cpu_utilization = 1;
double memory_utilization = 2;
map<string, double> request_cost = 3;
map<string, double> utilization = 4;
} }
// Unary response, as configured by the request. // Unary response, as configured by the request.

View File

@ -65,7 +65,9 @@ public class TestCasesTest {
"unimplemented_service", "unimplemented_service",
"cancel_after_begin", "cancel_after_begin",
"cancel_after_first_response", "cancel_after_first_response",
"timeout_on_sleeping_server" "timeout_on_sleeping_server",
"orca_per_rpc",
"orca_oob"
}; };
// additional test cases // additional test cases