diff --git a/android-interop-testing/build.gradle b/android-interop-testing/build.gradle index e40507e6a3..f399210aa4 100644 --- a/android-interop-testing/build.gradle +++ b/android-interop-testing/build.gradle @@ -74,6 +74,10 @@ dependencies { exclude group: 'org.apache.httpcomponents' } + implementation (project(':grpc-services')) { + exclude group: 'com.google.protobuf' + } + compileOnly libraries.javax_annotation androidTestImplementation 'androidx.test.ext:junit:1.1.3', diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 0ed3816ac8..970b2d1e83 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -92,6 +92,7 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest; import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.grpc.testing.integration.Messages.TestOrcaReport; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; import io.opencensus.stats.Measure; import io.opencensus.stats.Measure.MeasureDouble; @@ -188,6 +189,11 @@ public abstract class AbstractInteropTest { private final LinkedBlockingQueue serverStreamTracers = new LinkedBlockingQueue<>(); + static final CallOptions.Key> + ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report"); + static final CallOptions.Key> + ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report"); + private static final class ServerStreamTracerInfo { final String fullMethodName; final InteropServerStreamTracer tracer; @@ -1731,6 +1737,49 @@ public abstract class AbstractInteropTest { assertNotNull(obtainLocalClientAddr()); } + /** + * Test backend metrics per query reporting: expect the test client LB policy to receive load + * reports. + */ + public void testOrcaPerRpc() throws Exception { + AtomicReference reportHolder = new AtomicReference<>(); + TestOrcaReport answer = TestOrcaReport.newBuilder() + .setCpuUtilization(0.8210) + .setMemoryUtilization(0.5847) + .putRequestCost("cost", 3456.32) + .putUtilization("util", 0.30499) + .build(); + blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall( + SimpleRequest.newBuilder().setOrcaPerRpcReport(answer).build()); + assertThat(reportHolder.get()).isEqualTo(answer); + } + + /** + * Test backend metrics OOB reporting: expect the test client LB policy to receive load reports. + */ + public void testOrcaOob() throws Exception { + AtomicReference reportHolder = new AtomicReference<>(); + TestOrcaReport answer = TestOrcaReport.newBuilder() + .setCpuUtilization(0.8210) + .setMemoryUtilization(0.5847) + .putUtilization("util", 0.30499) + .build(); + blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build()); + Thread.sleep(1500); + blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); + assertThat(reportHolder.get()).isEqualTo(answer); + + answer = TestOrcaReport.newBuilder() + .setCpuUtilization(0.29309) + .setMemoryUtilization(0.2) + .putUtilization("util", 100.2039) + .build(); + blockingStub.unaryCall(SimpleRequest.newBuilder().setOrcaOobReport(answer).build()); + Thread.sleep(1500); + blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); + assertThat(reportHolder.get()).isEqualTo(answer); + } + /** Sends a large unary rpc with service account credentials. */ public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) throws Exception { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java b/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java new file mode 100644 index 0000000000..fc5b5f8e95 --- /dev/null +++ b/interop-testing/src/main/java/io/grpc/testing/integration/CustomBackendMetricsLoadBalancerProvider.java @@ -0,0 +1,154 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static io.grpc.testing.integration.AbstractInteropTest.ORCA_OOB_REPORT_KEY; +import static io.grpc.testing.integration.AbstractInteropTest.ORCA_RPC_REPORT_KEY; + +import io.grpc.ConnectivityState; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.testing.integration.Messages.TestOrcaReport; +import io.grpc.util.ForwardingLoadBalancer; +import io.grpc.util.ForwardingLoadBalancerHelper; +import io.grpc.xds.orca.OrcaOobUtil; +import io.grpc.xds.orca.OrcaPerRequestUtil; +import io.grpc.xds.shaded.com.github.xds.data.orca.v3.OrcaLoadReport; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Implements a test LB policy that receives ORCA load reports. + */ +final class CustomBackendMetricsLoadBalancerProvider extends LoadBalancerProvider { + + static final String TEST_ORCA_LB_POLICY_NAME = "test_backend_metrics_load_balancer"; + private volatile TestOrcaReport latestOobReport; + + @Override + public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) { + return new CustomBackendMetricsLoadBalancer(helper); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public String getPolicyName() { + return TEST_ORCA_LB_POLICY_NAME; + } + + private final class CustomBackendMetricsLoadBalancer extends ForwardingLoadBalancer { + private LoadBalancer delegate; + + public CustomBackendMetricsLoadBalancer(Helper helper) { + this.delegate = LoadBalancerRegistry.getDefaultRegistry() + .getProvider("pick_first") + .newLoadBalancer(new CustomBackendMetricsLoadBalancerHelper(helper)); + } + + @Override + public LoadBalancer delegate() { + return delegate; + } + + private final class CustomBackendMetricsLoadBalancerHelper + extends ForwardingLoadBalancerHelper { + private final Helper orcaHelper; + + public CustomBackendMetricsLoadBalancerHelper(Helper helper) { + this.orcaHelper = OrcaOobUtil.newOrcaReportingHelper(helper); + } + + @Override + public Subchannel createSubchannel(CreateSubchannelArgs args) { + Subchannel subchannel = super.createSubchannel(args); + OrcaOobUtil.setListener(subchannel, new OrcaOobUtil.OrcaOobReportListener() { + @Override + public void onLoadReport(OrcaLoadReport orcaLoadReport) { + latestOobReport = fromOrcaLoadReport(orcaLoadReport); + } + }, + OrcaOobUtil.OrcaReportingConfig.newBuilder() + .setReportInterval(1, TimeUnit.SECONDS) + .build() + ); + return subchannel; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + delegate().updateBalancingState(newState, new MayReportLoadPicker(newPicker)); + } + + @Override + public Helper delegate() { + return orcaHelper; + } + } + + private final class MayReportLoadPicker extends SubchannelPicker { + private SubchannelPicker delegate; + + public MayReportLoadPicker(SubchannelPicker delegate) { + this.delegate = delegate; + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult result = delegate.pickSubchannel(args); + if (result.getSubchannel() == null) { + return result; + } + AtomicReference reportRef = + args.getCallOptions().getOption(ORCA_OOB_REPORT_KEY); + if (reportRef != null) { + reportRef.set(latestOobReport); + } + + return PickResult.withSubchannel( + result.getSubchannel(), + OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory( + new OrcaPerRequestUtil.OrcaPerRequestReportListener() { + @Override + public void onLoadReport(OrcaLoadReport orcaLoadReport) { + AtomicReference reportRef = + args.getCallOptions().getOption(ORCA_RPC_REPORT_KEY); + reportRef.set(fromOrcaLoadReport(orcaLoadReport)); + } + })); + } + } + } + + private static TestOrcaReport fromOrcaLoadReport(OrcaLoadReport orcaLoadReport) { + return TestOrcaReport.newBuilder() + .setCpuUtilization(orcaLoadReport.getCpuUtilization()) + .setMemoryUtilization(orcaLoadReport.getMemUtilization()) + .putAllRequestCost(orcaLoadReport.getRequestCostMap()) + .putAllUtilization(orcaLoadReport.getUtilizationMap()) + .build(); + } +} diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index 39afaa99d6..85e5c31a4c 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -56,7 +56,9 @@ public enum TestCases { VERY_LARGE_REQUEST("very large request"), PICK_FIRST_UNARY("all requests are sent to one server despite multiple servers are resolved"), RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"), - CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"); + CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"), + ORCA_PER_RPC("report backend metrics per query"), + ORCA_OOB("report backend metrics out-of-band"); private final String description; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 914db12e5a..2e04c7bc3d 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -22,6 +22,8 @@ import io.grpc.ChannelCredentials; import io.grpc.Grpc; import io.grpc.InsecureChannelCredentials; import io.grpc.InsecureServerCredentials; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.ServerBuilder; @@ -60,6 +62,8 @@ public class TestServiceClient { TestUtils.installConscryptIfAvailable(); final TestServiceClient client = new TestServiceClient(); client.parseArgs(args); + customBackendMetricsLoadBalancerProvider = new CustomBackendMetricsLoadBalancerProvider(); + LoadBalancerRegistry.getDefaultRegistry().register(customBackendMetricsLoadBalancerProvider); client.setUp(); try { @@ -91,6 +95,7 @@ public class TestServiceClient { private int soakPerIterationMaxAcceptableLatencyMs = 1000; private int soakOverallTimeoutSeconds = soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000; + private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider; private Tester tester = new Tester(); @@ -239,6 +244,10 @@ public class TestServiceClient { private synchronized void tearDown() { try { tester.tearDown(); + if (customBackendMetricsLoadBalancerProvider != null) { + LoadBalancerRegistry.getDefaultRegistry() + .deregister(customBackendMetricsLoadBalancerProvider); + } } catch (RuntimeException ex) { throw ex; } catch (Exception ex) { @@ -460,6 +469,17 @@ public class TestServiceClient { soakPerIterationMaxAcceptableLatencyMs, soakOverallTimeoutSeconds); break; + + } + + case ORCA_PER_RPC: { + tester.testOrcaPerRpc(); + break; + } + + case ORCA_OOB: { + tester.testOrcaOob(); + break; } default: diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 5fe7248b2b..bb2c845378 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -26,6 +26,8 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; import io.grpc.internal.LogExceptionRunnable; +import io.grpc.services.CallMetricRecorder; +import io.grpc.services.MetricRecorder; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; import io.grpc.testing.integration.Messages.Payload; @@ -36,10 +38,13 @@ import io.grpc.testing.integration.Messages.StreamingInputCallRequest; import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.grpc.testing.integration.Messages.TestOrcaReport; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Random; import java.util.Set; @@ -57,13 +62,19 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private final ScheduledExecutorService executor; private final ByteString compressableBuffer; + private final MetricRecorder metricRecorder; /** * Constructs a controller using the given executor for scheduling response stream chunks. */ - public TestServiceImpl(ScheduledExecutorService executor) { + public TestServiceImpl(ScheduledExecutorService executor, MetricRecorder metricRecorder) { this.executor = executor; this.compressableBuffer = ByteString.copyFrom(new byte[1024]); + this.metricRecorder = metricRecorder; + } + + public TestServiceImpl(ScheduledExecutorService executor) { + this(executor, MetricRecorder.newInstance()); } @Override @@ -112,10 +123,33 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return; } + echoCallMetricsFromPayload(req.getOrcaPerRpcReport()); + echoMetricsFromPayload(req.getOrcaOobReport()); responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } + private static void echoCallMetricsFromPayload(TestOrcaReport report) { + CallMetricRecorder recorder = CallMetricRecorder.getCurrent() + .recordCpuUtilizationMetric(report.getCpuUtilization()) + .recordMemoryUtilizationMetric(report.getMemoryUtilization()); + for (Map.Entry entry : report.getUtilizationMap().entrySet()) { + recorder.recordUtilizationMetric(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : report.getRequestCostMap().entrySet()) { + recorder.recordCallMetric(entry.getKey(), entry.getValue()); + } + } + + private void echoMetricsFromPayload(TestOrcaReport report) { + metricRecorder.setCpuUtilizationMetric(report.getCpuUtilization()); + metricRecorder.setMemoryUtilizationMetric(report.getMemoryUtilization()); + metricRecorder.setAllUtilizationMetrics(new HashMap<>()); + for (Map.Entry entry : report.getUtilizationMap().entrySet()) { + metricRecorder.putUtilizationMetric(entry.getKey(), entry.getValue()); + } + } + /** * Given a request that specifies chunk size and interval between responses, creates and schedules * the response stream. diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index 19946ec4a7..a2966685f3 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -18,6 +18,7 @@ package io.grpc.testing.integration; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.BindableService; import io.grpc.Grpc; import io.grpc.InsecureServerCredentials; import io.grpc.Server; @@ -26,6 +27,9 @@ import io.grpc.ServerInterceptors; import io.grpc.TlsServerCredentials; import io.grpc.alts.AltsServerCredentials; import io.grpc.internal.testing.TestUtils; +import io.grpc.services.MetricRecorder; +import io.grpc.xds.orca.OrcaMetricReportingServerInterceptor; +import io.grpc.xds.orca.OrcaServiceImpl; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -151,11 +155,16 @@ public class TestServiceServer { } else { serverCreds = InsecureServerCredentials.create(); } + MetricRecorder metricRecorder = MetricRecorder.newInstance(); + BindableService orcaOobService = + OrcaServiceImpl.createService(executor, metricRecorder, 1, TimeUnit.SECONDS); server = Grpc.newServerBuilderForPort(port, serverCreds) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .addService( ServerInterceptors.intercept( - new TestServiceImpl(executor), TestServiceImpl.interceptors())) + new TestServiceImpl(executor, metricRecorder), TestServiceImpl.interceptors())) + .addService(orcaOobService) + .intercept(OrcaMetricReportingServerInterceptor.getInstance()) .build() .start(); } diff --git a/interop-testing/src/main/proto/grpc/testing/messages.proto b/interop-testing/src/main/proto/grpc/testing/messages.proto index c37d66f658..d71ac13282 100644 --- a/interop-testing/src/main/proto/grpc/testing/messages.proto +++ b/interop-testing/src/main/proto/grpc/testing/messages.proto @@ -100,6 +100,19 @@ message SimpleRequest { // Whether SimpleResponse should include grpclb_route_type. bool fill_grpclb_route_type = 10; + + // Whether server should update per-rpc metrics. + TestOrcaReport orca_per_rpc_report = 11; + + // Whether server should update OOB metrics. + TestOrcaReport orca_oob_report = 12; +} + +message TestOrcaReport { + double cpu_utilization = 1; + double memory_utilization = 2; + map request_cost = 3; + map utilization = 4; } // Unary response, as configured by the request. diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index 14a9851491..ab32d584e7 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -65,7 +65,9 @@ public class TestCasesTest { "unimplemented_service", "cancel_after_begin", "cancel_after_first_response", - "timeout_on_sleeping_server" + "timeout_on_sleeping_server", + "orca_per_rpc", + "orca_oob" }; // additional test cases diff --git a/services/src/main/java/io/grpc/services/CallMetricRecorder.java b/services/src/main/java/io/grpc/services/CallMetricRecorder.java index 073cfa7a9c..da4283eba8 100644 --- a/services/src/main/java/io/grpc/services/CallMetricRecorder.java +++ b/services/src/main/java/io/grpc/services/CallMetricRecorder.java @@ -56,8 +56,8 @@ public final class CallMetricRecorder { * Create a report for all backend metrics. */ CallMetricReport(double cpuUtilization, double memoryUtilization, - Map requestCostMetrics, - Map utilizationMetrics) { + Map requestCostMetrics, + Map utilizationMetrics) { this.cpuUtilization = cpuUtilization; this.memoryUtilization = memoryUtilization; this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");