mirror of https://github.com/grpc/grpc-java.git
services,orca: update backend metrics support to allow for server-wide metrics recording (per-call and OOB) (#9902)
Also added input range validation.
This commit is contained in:
parent
11a1f9e3e8
commit
5201e49ce1
|
|
@ -63,7 +63,7 @@ public class CustomBackendMetricsServer {
|
||||||
// Enable OOB custom backend metrics reporting.
|
// Enable OOB custom backend metrics reporting.
|
||||||
.addService(orcaOobService)
|
.addService(orcaOobService)
|
||||||
// Enable per-query custom backend metrics reporting.
|
// Enable per-query custom backend metrics reporting.
|
||||||
.intercept(OrcaMetricReportingServerInterceptor.getInstance())
|
.intercept(OrcaMetricReportingServerInterceptor.create(metricRecorder))
|
||||||
.build()
|
.build()
|
||||||
.start();
|
.start();
|
||||||
logger.info("Server started, listening on " + port);
|
logger.info("Server started, listening on " + port);
|
||||||
|
|
|
||||||
|
|
@ -1777,7 +1777,7 @@ public abstract class AbstractInteropTest {
|
||||||
final TestOrcaReport answer2 = TestOrcaReport.newBuilder()
|
final TestOrcaReport answer2 = TestOrcaReport.newBuilder()
|
||||||
.setCpuUtilization(0.29309)
|
.setCpuUtilization(0.29309)
|
||||||
.setMemoryUtilization(0.2)
|
.setMemoryUtilization(0.2)
|
||||||
.putUtilization("util", 100.2039)
|
.putUtilization("util", 0.2039)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
final int retryLimit = 5;
|
final int retryLimit = 5;
|
||||||
|
|
|
||||||
|
|
@ -164,7 +164,7 @@ public class TestServiceServer {
|
||||||
ServerInterceptors.intercept(
|
ServerInterceptors.intercept(
|
||||||
new TestServiceImpl(executor, metricRecorder), TestServiceImpl.interceptors()))
|
new TestServiceImpl(executor, metricRecorder), TestServiceImpl.interceptors()))
|
||||||
.addService(orcaOobService)
|
.addService(orcaOobService)
|
||||||
.intercept(OrcaMetricReportingServerInterceptor.getInstance())
|
.intercept(OrcaMetricReportingServerInterceptor.create(metricRecorder))
|
||||||
.build()
|
.build()
|
||||||
.start();
|
.start();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ java_library(
|
||||||
"src/main/java/io/grpc/services/CallMetricRecorder.java",
|
"src/main/java/io/grpc/services/CallMetricRecorder.java",
|
||||||
"src/main/java/io/grpc/services/MetricRecorder.java",
|
"src/main/java/io/grpc/services/MetricRecorder.java",
|
||||||
"src/main/java/io/grpc/services/MetricReport.java",
|
"src/main/java/io/grpc/services/MetricReport.java",
|
||||||
|
"src/main/java/io/grpc/services/MetricRecorderHelper.java",
|
||||||
],
|
],
|
||||||
deps = [
|
deps = [
|
||||||
"//api",
|
"//api",
|
||||||
|
|
|
||||||
|
|
@ -65,8 +65,8 @@ public final class CallMetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a call metric measurement for utilization.
|
* Records a call metric measurement for utilization in the range [0, 1]. Values outside the valid
|
||||||
* If RPC has already finished, this method is no-op.
|
* range are ignored. If RPC has already finished, this method is no-op.
|
||||||
*
|
*
|
||||||
* <p>A latter record will overwrite its former name-sakes.
|
* <p>A latter record will overwrite its former name-sakes.
|
||||||
*
|
*
|
||||||
|
|
@ -74,7 +74,7 @@ public final class CallMetricRecorder {
|
||||||
* @since 1.23.0
|
* @since 1.23.0
|
||||||
*/
|
*/
|
||||||
public CallMetricRecorder recordUtilizationMetric(String name, double value) {
|
public CallMetricRecorder recordUtilizationMetric(String name, double value) {
|
||||||
if (disabled) {
|
if (disabled || !MetricRecorderHelper.isUtilizationValid(value)) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
if (utilizationMetrics.get() == null) {
|
if (utilizationMetrics.get() == null) {
|
||||||
|
|
@ -126,8 +126,8 @@ public final class CallMetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a call metric measurement for CPU utilization.
|
* Records a call metric measurement for CPU utilization in the range [0, 1]. Values outside the
|
||||||
* If RPC has already finished, this method is no-op.
|
* valid range are ignored. If RPC has already finished, this method is no-op.
|
||||||
*
|
*
|
||||||
* <p>A latter record will overwrite its former name-sakes.
|
* <p>A latter record will overwrite its former name-sakes.
|
||||||
*
|
*
|
||||||
|
|
@ -135,7 +135,7 @@ public final class CallMetricRecorder {
|
||||||
* @since 1.47.0
|
* @since 1.47.0
|
||||||
*/
|
*/
|
||||||
public CallMetricRecorder recordCpuUtilizationMetric(double value) {
|
public CallMetricRecorder recordCpuUtilizationMetric(double value) {
|
||||||
if (disabled) {
|
if (disabled || !MetricRecorderHelper.isUtilizationValid(value)) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
cpuUtilizationMetric = value;
|
cpuUtilizationMetric = value;
|
||||||
|
|
@ -143,8 +143,8 @@ public final class CallMetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a call metric measurement for memory utilization.
|
* Records a call metric measurement for memory utilization in the range [0, 1]. Values outside
|
||||||
* If RPC has already finished, this method is no-op.
|
* the valid range are ignored. If RPC has already finished, this method is no-op.
|
||||||
*
|
*
|
||||||
* <p>A latter record will overwrite its former name-sakes.
|
* <p>A latter record will overwrite its former name-sakes.
|
||||||
*
|
*
|
||||||
|
|
@ -152,7 +152,7 @@ public final class CallMetricRecorder {
|
||||||
* @since 1.47.0
|
* @since 1.47.0
|
||||||
*/
|
*/
|
||||||
public CallMetricRecorder recordMemoryUtilizationMetric(double value) {
|
public CallMetricRecorder recordMemoryUtilizationMetric(double value) {
|
||||||
if (disabled) {
|
if (disabled || !MetricRecorderHelper.isUtilizationValid(value)) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
memoryUtilizationMetric = value;
|
memoryUtilizationMetric = value;
|
||||||
|
|
@ -160,8 +160,8 @@ public final class CallMetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Records a call metric measurement for qps.
|
* Records a call metric measurement for qps in the range [0, inf). Values outside the valid range
|
||||||
* If RPC has already finished, this method is no-op.
|
* are ignored. If RPC has already finished, this method is no-op.
|
||||||
*
|
*
|
||||||
* <p>A latter record will overwrite its former name-sakes.
|
* <p>A latter record will overwrite its former name-sakes.
|
||||||
*
|
*
|
||||||
|
|
@ -169,7 +169,7 @@ public final class CallMetricRecorder {
|
||||||
* @since 1.54.0
|
* @since 1.54.0
|
||||||
*/
|
*/
|
||||||
public CallMetricRecorder recordQpsMetric(double value) {
|
public CallMetricRecorder recordQpsMetric(double value) {
|
||||||
if (disabled) {
|
if (disabled || !MetricRecorderHelper.isQpsValid(value)) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
qps = value;
|
qps = value;
|
||||||
|
|
|
||||||
|
|
@ -39,14 +39,18 @@ public final class MetricRecorder {
|
||||||
private MetricRecorder() {}
|
private MetricRecorder() {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the metrics value corresponding to the specified key.
|
* Update the metrics value in the range [0, 1] corresponding to the specified key. Values outside
|
||||||
|
* the valid range are ignored.
|
||||||
*/
|
*/
|
||||||
public void putUtilizationMetric(String key, double value) {
|
public void putUtilizationMetric(String key, double value) {
|
||||||
|
if (!MetricRecorderHelper.isUtilizationValid(value)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
metricsData.put(key, value);
|
metricsData.put(key, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Replace the whole metrics data using the specified map.
|
* Replace the whole metrics data using the specified map. No range validation.
|
||||||
*/
|
*/
|
||||||
public void setAllUtilizationMetrics(Map<String, Double> metrics) {
|
public void setAllUtilizationMetrics(Map<String, Double> metrics) {
|
||||||
metricsData = new ConcurrentHashMap<>(metrics);
|
metricsData = new ConcurrentHashMap<>(metrics);
|
||||||
|
|
@ -60,9 +64,13 @@ public final class MetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the CPU utilization metrics data.
|
* Update the CPU utilization metrics data in the range [0, 1]. Values outside the valid range are
|
||||||
|
* ignored.
|
||||||
*/
|
*/
|
||||||
public void setCpuUtilizationMetric(double value) {
|
public void setCpuUtilizationMetric(double value) {
|
||||||
|
if (!MetricRecorderHelper.isUtilizationValid(value)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
cpuUtilization = value;
|
cpuUtilization = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,9 +82,13 @@ public final class MetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the memory utilization metrics data.
|
* Update the memory utilization metrics data in the range [0, 1]. Values outside the valid range
|
||||||
|
* are ignored.
|
||||||
*/
|
*/
|
||||||
public void setMemoryUtilizationMetric(double value) {
|
public void setMemoryUtilizationMetric(double value) {
|
||||||
|
if (!MetricRecorderHelper.isUtilizationValid(value)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
memoryUtilization = value;
|
memoryUtilization = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -88,9 +100,12 @@ public final class MetricRecorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the QPS metrics data.
|
* Update the QPS metrics data in the range [0, inf). Values outside the valid range are ignored.
|
||||||
*/
|
*/
|
||||||
public void setQps(double value) {
|
public void setQps(double value) {
|
||||||
|
if (!MetricRecorderHelper.isQpsValid(value)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
qps = value;
|
qps = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2023 The gRPC Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc.services;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility helper class to check whether values for {@link CallMetricRecorder} and
|
||||||
|
* {@link MetricRecorder} are inside the valid range.
|
||||||
|
*/
|
||||||
|
final class MetricRecorderHelper {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if the utilization value is in the range [0, 1] and false otherwise.
|
||||||
|
*/
|
||||||
|
static boolean isUtilizationValid(double utilization) {
|
||||||
|
return utilization >= 0.0 && utilization <= 1.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if the qps value is in the range [0, inf) and false otherwise.
|
||||||
|
*/
|
||||||
|
static boolean isQpsValid(double qps) {
|
||||||
|
return qps >= 0.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prevent instantiation.
|
||||||
|
private MetricRecorderHelper() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -38,33 +38,62 @@ public class CallMetricRecorderTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void dumpDumpsAllSavedMetricValues() {
|
public void dumpDumpsAllSavedMetricValues() {
|
||||||
recorder.recordUtilizationMetric("util1", 154353.423);
|
recorder.recordUtilizationMetric("util1", 0.154353423);
|
||||||
recorder.recordUtilizationMetric("util2", 0.1367);
|
recorder.recordUtilizationMetric("util2", 0.1367);
|
||||||
recorder.recordUtilizationMetric("util3", 1437.34);
|
recorder.recordUtilizationMetric("util3", 0.143734);
|
||||||
recorder.recordRequestCostMetric("cost1", 37465.12);
|
recorder.recordRequestCostMetric("cost1", 37465.12);
|
||||||
recorder.recordRequestCostMetric("cost2", 10293.0);
|
recorder.recordRequestCostMetric("cost2", 10293.0);
|
||||||
recorder.recordRequestCostMetric("cost3", 1.0);
|
recorder.recordRequestCostMetric("cost3", 1.0);
|
||||||
recorder.recordCpuUtilizationMetric(0.1928);
|
recorder.recordCpuUtilizationMetric(0.1928);
|
||||||
recorder.recordMemoryUtilizationMetric(47.4);
|
recorder.recordMemoryUtilizationMetric(0.474);
|
||||||
recorder.recordQpsMetric(2522.54);
|
recorder.recordQpsMetric(2522.54);
|
||||||
|
|
||||||
MetricReport dump = recorder.finalizeAndDump2();
|
MetricReport dump = recorder.finalizeAndDump2();
|
||||||
Truth.assertThat(dump.getUtilizationMetrics())
|
Truth.assertThat(dump.getUtilizationMetrics())
|
||||||
.containsExactly("util1", 154353.423, "util2", 0.1367, "util3", 1437.34);
|
.containsExactly("util1", 0.154353423, "util2", 0.1367, "util3", 0.143734);
|
||||||
Truth.assertThat(dump.getRequestCostMetrics())
|
Truth.assertThat(dump.getRequestCostMetrics())
|
||||||
.containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0);
|
.containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0);
|
||||||
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928);
|
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928);
|
||||||
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(47.4);
|
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474);
|
||||||
Truth.assertThat(dump.getQps()).isEqualTo(2522.54);
|
Truth.assertThat(dump.getQps()).isEqualTo(2522.54);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void noMetricsRecordedAfterSnapshot() {
|
public void noMetricsRecordedAfterSnapshot() {
|
||||||
Map<String, Double> initDump = recorder.finalizeAndDump();
|
Map<String, Double> initDump = recorder.finalizeAndDump();
|
||||||
recorder.recordUtilizationMetric("cost", 154353.423);
|
recorder.recordUtilizationMetric("cost", 0.154353423);
|
||||||
assertThat(recorder.finalizeAndDump()).isEqualTo(initDump);
|
assertThat(recorder.finalizeAndDump()).isEqualTo(initDump);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noMetricsRecordedIfUtilizationIsGreaterThanUpperBound() {
|
||||||
|
recorder.recordCpuUtilizationMetric(1.001);
|
||||||
|
recorder.recordMemoryUtilizationMetric(1.001);
|
||||||
|
recorder.recordUtilizationMetric("util1", 1.001);
|
||||||
|
|
||||||
|
MetricReport dump = recorder.finalizeAndDump2();
|
||||||
|
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
|
||||||
|
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0);
|
||||||
|
Truth.assertThat(dump.getQps()).isEqualTo(0);
|
||||||
|
Truth.assertThat(dump.getUtilizationMetrics()).isEmpty();
|
||||||
|
Truth.assertThat(dump.getRequestCostMetrics()).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void noMetricsRecordedIfUtilizationAndQpsAreLessThanLowerBound() {
|
||||||
|
recorder.recordCpuUtilizationMetric(-0.001);
|
||||||
|
recorder.recordMemoryUtilizationMetric(-0.001);
|
||||||
|
recorder.recordQpsMetric(-0.001);
|
||||||
|
recorder.recordUtilizationMetric("util1", -0.001);
|
||||||
|
|
||||||
|
MetricReport dump = recorder.finalizeAndDump2();
|
||||||
|
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
|
||||||
|
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0);
|
||||||
|
Truth.assertThat(dump.getQps()).isEqualTo(0);
|
||||||
|
Truth.assertThat(dump.getUtilizationMetrics()).isEmpty();
|
||||||
|
Truth.assertThat(dump.getRequestCostMetrics()).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void lastValueWinForMetricsWithSameName() {
|
public void lastValueWinForMetricsWithSameName() {
|
||||||
recorder.recordRequestCostMetric("cost1", 3412.5435);
|
recorder.recordRequestCostMetric("cost1", 3412.5435);
|
||||||
|
|
@ -72,20 +101,20 @@ public class CallMetricRecorderTest {
|
||||||
recorder.recordRequestCostMetric("cost1", 6441.341);
|
recorder.recordRequestCostMetric("cost1", 6441.341);
|
||||||
recorder.recordRequestCostMetric("cost1", 4654.67);
|
recorder.recordRequestCostMetric("cost1", 4654.67);
|
||||||
recorder.recordRequestCostMetric("cost2", 75.83);
|
recorder.recordRequestCostMetric("cost2", 75.83);
|
||||||
recorder.recordMemoryUtilizationMetric(1.3);
|
recorder.recordMemoryUtilizationMetric(0.13);
|
||||||
recorder.recordMemoryUtilizationMetric(3.1);
|
recorder.recordMemoryUtilizationMetric(0.31);
|
||||||
recorder.recordUtilizationMetric("util1", 28374.21);
|
recorder.recordUtilizationMetric("util1", 0.2837421);
|
||||||
recorder.recordMemoryUtilizationMetric(9384.0);
|
recorder.recordMemoryUtilizationMetric(0.93840);
|
||||||
recorder.recordUtilizationMetric("util1", 84323.3);
|
recorder.recordUtilizationMetric("util1", 0.843233);
|
||||||
recorder.recordQpsMetric(1928.3);
|
recorder.recordQpsMetric(1928.3);
|
||||||
recorder.recordQpsMetric(100.8);
|
recorder.recordQpsMetric(100.8);
|
||||||
|
|
||||||
MetricReport dump = recorder.finalizeAndDump2();
|
MetricReport dump = recorder.finalizeAndDump2();
|
||||||
Truth.assertThat(dump.getRequestCostMetrics())
|
Truth.assertThat(dump.getRequestCostMetrics())
|
||||||
.containsExactly("cost1", 4654.67, "cost2", 75.83);
|
.containsExactly("cost1", 4654.67, "cost2", 75.83);
|
||||||
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(9384.0);
|
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.93840);
|
||||||
Truth.assertThat(dump.getUtilizationMetrics())
|
Truth.assertThat(dump.getUtilizationMetrics())
|
||||||
.containsExactly("util1", 84323.3);
|
.containsExactly("util1", 0.843233);
|
||||||
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
|
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
|
||||||
Truth.assertThat(dump.getQps()).isEqualTo(100.8);
|
Truth.assertThat(dump.getQps()).isEqualTo(100.8);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,10 @@ import io.grpc.Status;
|
||||||
import io.grpc.protobuf.ProtoUtils;
|
import io.grpc.protobuf.ProtoUtils;
|
||||||
import io.grpc.services.CallMetricRecorder;
|
import io.grpc.services.CallMetricRecorder;
|
||||||
import io.grpc.services.InternalCallMetricRecorder;
|
import io.grpc.services.InternalCallMetricRecorder;
|
||||||
|
import io.grpc.services.InternalMetricRecorder;
|
||||||
|
import io.grpc.services.MetricRecorder;
|
||||||
import io.grpc.services.MetricReport;
|
import io.grpc.services.MetricReport;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link ServerInterceptor} that intercepts a {@link ServerCall} by running server-side RPC
|
* A {@link ServerInterceptor} that intercepts a {@link ServerCall} by running server-side RPC
|
||||||
|
|
@ -45,7 +48,7 @@ import io.grpc.services.MetricReport;
|
||||||
public final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
|
public final class OrcaMetricReportingServerInterceptor implements ServerInterceptor {
|
||||||
|
|
||||||
private static final OrcaMetricReportingServerInterceptor INSTANCE =
|
private static final OrcaMetricReportingServerInterceptor INSTANCE =
|
||||||
new OrcaMetricReportingServerInterceptor();
|
new OrcaMetricReportingServerInterceptor(null);
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY =
|
static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY =
|
||||||
|
|
@ -53,14 +56,29 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce
|
||||||
"endpoint-load-metrics-bin",
|
"endpoint-load-metrics-bin",
|
||||||
ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance()));
|
ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance()));
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
private final MetricRecorder metricRecorder;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
OrcaMetricReportingServerInterceptor() {
|
OrcaMetricReportingServerInterceptor(@Nullable MetricRecorder metricRecorder) {
|
||||||
|
this.metricRecorder = metricRecorder;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static OrcaMetricReportingServerInterceptor getInstance() {
|
public static OrcaMetricReportingServerInterceptor getInstance() {
|
||||||
return INSTANCE;
|
return INSTANCE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link OrcaMetricReportingServerInterceptor} instance with the given
|
||||||
|
* {@link MetricRecorder}. When both {@link CallMetricRecorder} and {@link MetricRecorder} exist,
|
||||||
|
* the metrics are merged such that per-request metrics from {@link CallMetricRecorder} takes a
|
||||||
|
* higher precedence compared to metrics from {@link MetricRecorder}.
|
||||||
|
*/
|
||||||
|
public static OrcaMetricReportingServerInterceptor create(
|
||||||
|
@Nullable MetricRecorder metricRecorder) {
|
||||||
|
return new OrcaMetricReportingServerInterceptor(metricRecorder);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <ReqT, RespT> Listener<ReqT> interceptCall(
|
public <ReqT, RespT> Listener<ReqT> interceptCall(
|
||||||
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
|
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
|
||||||
|
|
@ -75,8 +93,12 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce
|
||||||
new SimpleForwardingServerCall<ReqT, RespT>(call) {
|
new SimpleForwardingServerCall<ReqT, RespT>(call) {
|
||||||
@Override
|
@Override
|
||||||
public void close(Status status, Metadata trailers) {
|
public void close(Status status, Metadata trailers) {
|
||||||
OrcaLoadReport report = fromInternalReport(
|
OrcaLoadReport.Builder reportBuilder = metricRecorder != null ? fromInternalReport(
|
||||||
|
InternalMetricRecorder.getMetricReport(metricRecorder))
|
||||||
|
: OrcaLoadReport.newBuilder();
|
||||||
|
mergeMetrics(reportBuilder,
|
||||||
InternalCallMetricRecorder.finalizeAndDump2(finalCallMetricRecorder));
|
InternalCallMetricRecorder.finalizeAndDump2(finalCallMetricRecorder));
|
||||||
|
OrcaLoadReport report = reportBuilder.build();
|
||||||
if (!report.equals(OrcaLoadReport.getDefaultInstance())) {
|
if (!report.equals(OrcaLoadReport.getDefaultInstance())) {
|
||||||
trailers.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, report);
|
trailers.put(ORCA_ENDPOINT_LOAD_METRICS_KEY, report);
|
||||||
}
|
}
|
||||||
|
|
@ -90,12 +112,42 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce
|
||||||
next);
|
next);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static OrcaLoadReport fromInternalReport(MetricReport internalReport) {
|
private static OrcaLoadReport.Builder fromInternalReport(MetricReport internalReport) {
|
||||||
return OrcaLoadReport.newBuilder()
|
return OrcaLoadReport.newBuilder()
|
||||||
.setCpuUtilization(internalReport.getCpuUtilization())
|
.setCpuUtilization(internalReport.getCpuUtilization())
|
||||||
.setMemUtilization(internalReport.getMemoryUtilization())
|
.setMemUtilization(internalReport.getMemoryUtilization())
|
||||||
|
.setRpsFractional(internalReport.getQps())
|
||||||
.putAllUtilization(internalReport.getUtilizationMetrics())
|
.putAllUtilization(internalReport.getUtilizationMetrics())
|
||||||
.putAllRequestCost(internalReport.getRequestCostMetrics())
|
.putAllRequestCost(internalReport.getRequestCostMetrics());
|
||||||
.build();
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Modify the given {@link OrcaLoadReport.Builder} containing metrics for {@link MetricRecorder}
|
||||||
|
* such that metrics from the given {@link MetricReport} for {@link CallMetricRecorder} takes a
|
||||||
|
* higher precedence.
|
||||||
|
*/
|
||||||
|
private static void mergeMetrics(
|
||||||
|
OrcaLoadReport.Builder metricRecorderReportBuilder,
|
||||||
|
MetricReport callMetricRecorderReport
|
||||||
|
) {
|
||||||
|
metricRecorderReportBuilder.putAllUtilization(callMetricRecorderReport.getUtilizationMetrics())
|
||||||
|
.putAllRequestCost(callMetricRecorderReport.getRequestCostMetrics());
|
||||||
|
// Overwrite only if the values from the given MetricReport for CallMetricRecorder are set
|
||||||
|
double cpu = callMetricRecorderReport.getCpuUtilization();
|
||||||
|
if (isReportValueSet(cpu)) {
|
||||||
|
metricRecorderReportBuilder.setCpuUtilization(cpu);
|
||||||
|
}
|
||||||
|
double mem = callMetricRecorderReport.getMemoryUtilization();
|
||||||
|
if (isReportValueSet(mem)) {
|
||||||
|
metricRecorderReportBuilder.setMemUtilization(mem);
|
||||||
|
}
|
||||||
|
double rps = callMetricRecorderReport.getQps();
|
||||||
|
if (isReportValueSet(rps)) {
|
||||||
|
metricRecorderReportBuilder.setRpsFractional(rps);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isReportValueSet(double value) {
|
||||||
|
return value != 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ import io.grpc.inprocess.InProcessChannelBuilder;
|
||||||
import io.grpc.inprocess.InProcessServerBuilder;
|
import io.grpc.inprocess.InProcessServerBuilder;
|
||||||
import io.grpc.services.CallMetricRecorder;
|
import io.grpc.services.CallMetricRecorder;
|
||||||
import io.grpc.services.InternalCallMetricRecorder;
|
import io.grpc.services.InternalCallMetricRecorder;
|
||||||
|
import io.grpc.services.MetricRecorder;
|
||||||
import io.grpc.stub.ClientCalls;
|
import io.grpc.stub.ClientCalls;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.testing.GrpcCleanupRule;
|
import io.grpc.testing.GrpcCleanupRule;
|
||||||
|
|
@ -73,6 +74,8 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
private final Map<String, Double> applicationCostMetrics = new HashMap<>();
|
private final Map<String, Double> applicationCostMetrics = new HashMap<>();
|
||||||
private double cpuUtilizationMetrics = 0;
|
private double cpuUtilizationMetrics = 0;
|
||||||
private double memoryUtilizationMetrics = 0;
|
private double memoryUtilizationMetrics = 0;
|
||||||
|
private double qpsMetrics = 0;
|
||||||
|
private MetricRecorder metricRecorder;
|
||||||
|
|
||||||
private final AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
|
private final AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
|
||||||
|
|
||||||
|
|
@ -95,6 +98,7 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
}
|
}
|
||||||
CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics);
|
CallMetricRecorder.getCurrent().recordCpuUtilizationMetric(cpuUtilizationMetrics);
|
||||||
CallMetricRecorder.getCurrent().recordMemoryUtilizationMetric(memoryUtilizationMetrics);
|
CallMetricRecorder.getCurrent().recordMemoryUtilizationMetric(memoryUtilizationMetrics);
|
||||||
|
CallMetricRecorder.getCurrent().recordQpsMetric(qpsMetrics);
|
||||||
SimpleResponse response =
|
SimpleResponse response =
|
||||||
SimpleResponse.newBuilder().setResponseMessage("Simple response").build();
|
SimpleResponse.newBuilder().setResponseMessage("Simple response").build();
|
||||||
responseObserver.onNext(response);
|
responseObserver.onNext(response);
|
||||||
|
|
@ -102,7 +106,9 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ServerInterceptor metricReportingServerInterceptor = new OrcaMetricReportingServerInterceptor();
|
metricRecorder = MetricRecorder.newInstance();
|
||||||
|
ServerInterceptor metricReportingServerInterceptor = new OrcaMetricReportingServerInterceptor(
|
||||||
|
metricRecorder);
|
||||||
String serverName = InProcessServerBuilder.generateName();
|
String serverName = InProcessServerBuilder.generateName();
|
||||||
grpcCleanupRule.register(
|
grpcCleanupRule.register(
|
||||||
InProcessServerBuilder
|
InProcessServerBuilder
|
||||||
|
|
@ -149,7 +155,8 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
ServerInterceptor metricReportingServerInterceptor = new OrcaMetricReportingServerInterceptor();
|
ServerInterceptor metricReportingServerInterceptor =
|
||||||
|
OrcaMetricReportingServerInterceptor.getInstance();
|
||||||
String serverName = InProcessServerBuilder.generateName();
|
String serverName = InProcessServerBuilder.generateName();
|
||||||
grpcCleanupRule.register(
|
grpcCleanupRule.register(
|
||||||
InProcessServerBuilder
|
InProcessServerBuilder
|
||||||
|
|
@ -177,7 +184,7 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void responseTrailersContainAllReportedMetrics() {
|
public void responseTrailersContainAllReportedMetricsFromCallMetricRecorder() {
|
||||||
applicationCostMetrics.put("cost1", 1231.4543);
|
applicationCostMetrics.put("cost1", 1231.4543);
|
||||||
applicationCostMetrics.put("cost2", 0.1367);
|
applicationCostMetrics.put("cost2", 0.1367);
|
||||||
applicationCostMetrics.put("cost3", 7614.145);
|
applicationCostMetrics.put("cost3", 7614.145);
|
||||||
|
|
@ -186,6 +193,7 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
applicationUtilizationMetrics.put("util3", 0.5342);
|
applicationUtilizationMetrics.put("util3", 0.5342);
|
||||||
cpuUtilizationMetrics = 0.3465;
|
cpuUtilizationMetrics = 0.3465;
|
||||||
memoryUtilizationMetrics = 0.764;
|
memoryUtilizationMetrics = 0.764;
|
||||||
|
qpsMetrics = 3.1415926535;
|
||||||
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
|
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
|
||||||
Metadata receivedTrailers = trailersCapture.get();
|
Metadata receivedTrailers = trailersCapture.get();
|
||||||
OrcaLoadReport report =
|
OrcaLoadReport report =
|
||||||
|
|
@ -196,6 +204,54 @@ public class OrcaMetricReportingServerInterceptorTest {
|
||||||
.containsExactly("cost1", 1231.4543, "cost2", 0.1367, "cost3", 7614.145);
|
.containsExactly("cost1", 1231.4543, "cost2", 0.1367, "cost3", 7614.145);
|
||||||
assertThat(report.getCpuUtilization()).isEqualTo(0.3465);
|
assertThat(report.getCpuUtilization()).isEqualTo(0.3465);
|
||||||
assertThat(report.getMemUtilization()).isEqualTo(0.764);
|
assertThat(report.getMemUtilization()).isEqualTo(0.764);
|
||||||
|
assertThat(report.getRpsFractional()).isEqualTo(3.1415926535);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricRecorder() {
|
||||||
|
applicationUtilizationMetrics.put("util1", 0.1482);
|
||||||
|
applicationUtilizationMetrics.put("util2", 0.4036);
|
||||||
|
applicationUtilizationMetrics.put("util3", 0.5742);
|
||||||
|
cpuUtilizationMetrics = 0.3465;
|
||||||
|
memoryUtilizationMetrics = 0.967;
|
||||||
|
metricRecorder.setMemoryUtilizationMetric(0.764);
|
||||||
|
metricRecorder.setQps(1.618);
|
||||||
|
metricRecorder.putUtilizationMetric("serverUtil1", 0.7467);
|
||||||
|
metricRecorder.putUtilizationMetric("serverUtil2", 0.2233);
|
||||||
|
metricRecorder.putUtilizationMetric("util1", 0.01);
|
||||||
|
metricRecorder.putUtilizationMetric("util3", 0.99);
|
||||||
|
|
||||||
|
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
|
||||||
|
Metadata receivedTrailers = trailersCapture.get();
|
||||||
|
OrcaLoadReport report =
|
||||||
|
receivedTrailers.get(OrcaMetricReportingServerInterceptor.ORCA_ENDPOINT_LOAD_METRICS_KEY);
|
||||||
|
|
||||||
|
assertThat(report.getUtilizationMap())
|
||||||
|
.containsExactly("util1", 0.1482, "util2", 0.4036, "util3", 0.5742, "serverUtil1", 0.7467,
|
||||||
|
"serverUtil2", 0.2233);
|
||||||
|
assertThat(report.getRequestCostMap()).isEmpty();
|
||||||
|
assertThat(report.getCpuUtilization()).isEqualTo(0.3465);
|
||||||
|
assertThat(report.getMemUtilization()).isEqualTo(0.967);
|
||||||
|
assertThat(report.getRpsFractional()).isEqualTo(1.618);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void responseTrailersContainMergedMetricsFromCallMetricRecorderAndMetricRecorderNoMap() {
|
||||||
|
qpsMetrics = 5142.77;
|
||||||
|
metricRecorder.setCpuUtilizationMetric(0.314159);
|
||||||
|
metricRecorder.setMemoryUtilizationMetric(0.764);
|
||||||
|
metricRecorder.setQps(1.618);
|
||||||
|
|
||||||
|
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
|
||||||
|
Metadata receivedTrailers = trailersCapture.get();
|
||||||
|
OrcaLoadReport report =
|
||||||
|
receivedTrailers.get(OrcaMetricReportingServerInterceptor.ORCA_ENDPOINT_LOAD_METRICS_KEY);
|
||||||
|
|
||||||
|
assertThat(report.getUtilizationMap()).isEmpty();
|
||||||
|
assertThat(report.getRequestCostMap()).isEmpty();
|
||||||
|
assertThat(report.getCpuUtilization()).isEqualTo(0.314159);
|
||||||
|
assertThat(report.getMemUtilization()).isEqualTo(0.764);
|
||||||
|
assertThat(report.getRpsFractional()).isEqualTo(5142.77);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class TrailersCapturingClientInterceptor implements ClientInterceptor {
|
private static final class TrailersCapturingClientInterceptor implements ClientInterceptor {
|
||||||
|
|
|
||||||
|
|
@ -150,7 +150,7 @@ public class OrcaServiceImplTest {
|
||||||
call.halfClose();
|
call.halfClose();
|
||||||
call.request(1);
|
call.request(1);
|
||||||
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2)
|
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2)
|
||||||
.setRpsFractional(1.9).build();
|
.setRpsFractional(1.9).build();
|
||||||
verify(listener).onMessage(eq(expect));
|
verify(listener).onMessage(eq(expect));
|
||||||
reset(listener);
|
reset(listener);
|
||||||
defaultTestService.removeUtilizationMetric("buffer0");
|
defaultTestService.removeUtilizationMetric("buffer0");
|
||||||
|
|
@ -215,12 +215,12 @@ public class OrcaServiceImplTest {
|
||||||
public void testMultipleClients() {
|
public void testMultipleClients() {
|
||||||
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
|
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
|
||||||
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
|
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
|
||||||
defaultTestService.putUtilizationMetric("omg", 100);
|
defaultTestService.putUtilizationMetric("omg", 1.00);
|
||||||
call.start(listener, new Metadata());
|
call.start(listener, new Metadata());
|
||||||
call.sendMessage(OrcaLoadReportRequest.newBuilder().build());
|
call.sendMessage(OrcaLoadReportRequest.newBuilder().build());
|
||||||
call.halfClose();
|
call.halfClose();
|
||||||
call.request(1);
|
call.request(1);
|
||||||
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("omg", 100).build();
|
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("omg", 1.00).build();
|
||||||
verify(listener).onMessage(eq(expect));
|
verify(listener).onMessage(eq(expect));
|
||||||
defaultTestService.setMemoryUtilizationMetric(0.5);
|
defaultTestService.setMemoryUtilizationMetric(0.5);
|
||||||
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call2 = channel.newCall(
|
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call2 = channel.newCall(
|
||||||
|
|
@ -275,6 +275,16 @@ public class OrcaServiceImplTest {
|
||||||
fakeClock.forwardTime(1, TimeUnit.SECONDS);
|
fakeClock.forwardTime(1, TimeUnit.SECONDS);
|
||||||
assertThat(reports.next()).isEqualTo(goldenReport);
|
assertThat(reports.next()).isEqualTo(goldenReport);
|
||||||
|
|
||||||
|
defaultTestService.setCpuUtilizationMetric(-0.001);
|
||||||
|
defaultTestService.setCpuUtilizationMetric(1.001);
|
||||||
|
defaultTestService.setMemoryUtilizationMetric(-0.001);
|
||||||
|
defaultTestService.setMemoryUtilizationMetric(1.001);
|
||||||
|
defaultTestService.setQps(-0.001);
|
||||||
|
defaultTestService.putUtilizationMetric("util-out-of-range", -0.001);
|
||||||
|
defaultTestService.putUtilizationMetric("util-out-of-range", 1.001);
|
||||||
|
fakeClock.forwardTime(1, TimeUnit.SECONDS);
|
||||||
|
assertThat(reports.next()).isEqualTo(goldenReport);
|
||||||
|
|
||||||
CyclicBarrier barrier = new CyclicBarrier(2);
|
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||||
new Thread(new Runnable() {
|
new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue