diff --git a/xds/src/main/java/io/grpc/xds/OrcaOobService.java b/xds/src/main/java/io/grpc/xds/OrcaOobService.java
new file mode 100644
index 0000000000..d5c287cc5d
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/OrcaOobService.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2022 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.BindableService;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implements the service/APIs for Out-of-Band metrics reporting, only for utilization metrics.
+ * Register the service returned by {@link #getService()} with the server; a client can then
+ * request periodic load reports. A user should use the public set-APIs to update the server
+ * machine's utilization metrics data.
+ */
+@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/9006")
+public final class OrcaOobService {
+  /**
+   * Default minimum reporting interval (30 seconds, expressed in nanoseconds). An empty or
+   * invalid (non-positive) {@code minInterval} configuration is treated as this value.
+   */
+  public static final long DEFAULT_MIN_REPORT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(30);
+
+  private final OrcaServiceImpl orcaService;
+
+  /**
+   * Constructs an OOB metrics reporting service.
+   *
+   * @param minInterval configures the minimum metrics reporting interval for the service. Bad
+   *        configuration (non-positive) will be overridden to service default (30s).
+   *        Minimum metrics reporting interval means, if the setting in the client's
+   *        request is invalid (non-positive) or below this value, they will be treated
+   *        as this value.
+   * @param timeUnit the unit of {@code minInterval}; only consulted when {@code minInterval}
+   *        is positive.
+   * @param timeService the executor used to schedule the periodic reports; must not be null.
+   */
+  public OrcaOobService(long minInterval, TimeUnit timeUnit,
+                        ScheduledExecutorService timeService) {
+    checkNotNull(timeService, "timeService");
+    long minIntervalNanos =
+        minInterval > 0 ? timeUnit.toNanos(minInterval) : DEFAULT_MIN_REPORT_INTERVAL_NANOS;
+    this.orcaService = new OrcaServiceImpl(minIntervalNanos, timeService);
+  }
+
+  /**
+   * Constructs an OOB metrics reporting service that uses
+   * {@link #DEFAULT_MIN_REPORT_INTERVAL_NANOS} as its minimum reporting interval.
+   *
+   * @param timeService the executor used to schedule the periodic reports; must not be null.
+   */
+  public OrcaOobService(ScheduledExecutorService timeService) {
+    this(DEFAULT_MIN_REPORT_INTERVAL_NANOS, TimeUnit.NANOSECONDS, timeService);
+  }
+
+  /**
+   * Returns the service instance to be bound to the server for ORCA OOB functionality.
+   */
+  public BindableService getService() {
+    return orcaService;
+  }
+
+  /**
+   * Returns the current value of the underlying service's client counter.
+   */
+  @VisibleForTesting
+  int getClientsCount() {
+    return orcaService.clientCount.get();
+  }
+
+  /**
+   * Update the metrics value corresponding to the specified key.
+   */
+  public void setUtilizationMetric(String key, double value) {
+    orcaService.setUtilizationMetric(key, value);
+  }
+
+  /**
+   * Replace the whole metrics data using the specified map.
+   *
+   * @param metrics the new utilization values keyed by metric name.
+   */
+  public void setAllUtilizationMetrics(Map<String, Double> metrics) {
+    orcaService.setAllUtilizationMetrics(metrics);
+  }
+
+  /**
+   * Remove the metrics data entry corresponding to the specified key.
+   */
+  public void deleteUtilizationMetric(String key) {
+    orcaService.deleteUtilizationMetric(key);
+  }
+
+  /**
+   * Update the CPU utilization metrics data.
+   */
+  public void setCpuUtilizationMetric(double value) {
+    orcaService.setCpuUtilizationMetric(value);
+  }
+
+  /**
+   * Clear the CPU utilization metrics data.
+   */
+  public void deleteCpuUtilizationMetric() {
+    orcaService.deleteCpuUtilizationMetric();
+  }
+
+  /**
+   * Update the memory utilization metrics data.
+   */
+  public void setMemoryUtilizationMetric(double value) {
+    orcaService.setMemoryUtilizationMetric(value);
+  }
+
+  /**
+   * Clear the memory utilization metrics data.
+   */
+  public void deleteMemoryUtilizationMetric() {
+    orcaService.deleteMemoryUtilizationMetric();
+  }
+}
diff --git a/xds/src/main/java/io/grpc/xds/OrcaServiceImpl.java b/xds/src/main/java/io/grpc/xds/OrcaServiceImpl.java
new file mode 100644
index 0000000000..818ca3601e
--- /dev/null
+++ b/xds/src/main/java/io/grpc/xds/OrcaServiceImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2022 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.github.xds.data.orca.v3.OrcaLoadReport;
+import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
+import com.github.xds.service.orca.v3.OrcaLoadReportRequest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.util.Durations;
+import io.grpc.SynchronizationContext;
+import io.grpc.stub.ServerCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/** Serves ORCA out-of-band streams: one report immediately, then one per reporting interval. */
+final class OrcaServiceImpl extends OpenRcaServiceGrpc.OpenRcaServiceImplBase {
+  private static final Logger logger = Logger.getLogger(OrcaServiceImpl.class.getName());
+
+  private final long minReportIntervalNanos;
+  private final ScheduledExecutorService timeService;
+  private volatile ConcurrentHashMap<String, Double> metricsData = new ConcurrentHashMap<>();
+  private volatile double cpuUtilization;
+  private volatile double memoryUtilization;
+  @VisibleForTesting
+  final AtomicInteger clientCount = new AtomicInteger(0);
+
+  public OrcaServiceImpl(long minReportIntervalNanos, ScheduledExecutorService timeService) {
+    this.minReportIntervalNanos = minReportIntervalNanos;
+    this.timeService = checkNotNull(timeService);
+  }
+
+  /** Sends the first report right away; the client's timer then drives the later ones. */
+  @Override
+  public void streamCoreMetrics(
+      OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) {
+    new OrcaClient(request, responseObserver).run();
+    clientCount.getAndIncrement();
+  }
+
+  /** State of one stream: each run sends a report and schedules the next run. */
+  private final class OrcaClient implements Runnable {
+    final ServerCallStreamObserver<OrcaLoadReport> responseObserver;
+    SynchronizationContext.ScheduledHandle periodicReportTimer;
+    final long reportIntervalNanos;
+    final SynchronizationContext syncContext = new SynchronizationContext(
+        new Thread.UncaughtExceptionHandler() {
+          @Override
+          public void uncaughtException(Thread t, Throwable e) {
+            logger.log(Level.SEVERE, "Exception!", e);  // logs the stack trace too
+          }
+        });
+
+    OrcaClient(OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) {
+      // A requested interval below the service minimum is raised to the minimum.
+      this.reportIntervalNanos = Math.max(
+          Durations.toNanos(checkNotNull(request).getReportInterval()), minReportIntervalNanos);
+      this.responseObserver = (ServerCallStreamObserver<OrcaLoadReport>) responseObserver;
+      this.responseObserver.setOnCancelHandler(new Runnable() {
+        @Override
+        public void run() {
+          syncContext.execute(new Runnable() {
+            @Override
+            public void run() {
+              if (periodicReportTimer != null) {
+                periodicReportTimer.cancel();
+              }
+              clientCount.getAndDecrement();
+            }
+          });
+        }
+      });
+    }
+
+    @Override
+    public void run() {
+      if (periodicReportTimer != null && periodicReportTimer.isPending()) {
+        return;
+      }
+      responseObserver.onNext(generateMetricsReport());
+      periodicReportTimer = syncContext.schedule(OrcaClient.this, reportIntervalNanos,
+          TimeUnit.NANOSECONDS, timeService);
+    }
+  }
+
+  /** Builds a report from the current CPU, memory and named utilization values. */
+  private OrcaLoadReport generateMetricsReport() {
+    return OrcaLoadReport.newBuilder().setCpuUtilization(cpuUtilization)
+        .setMemUtilization(memoryUtilization).putAllUtilization(metricsData).build();
+  }
+
+  void setUtilizationMetric(String key, double value) {
+    metricsData.put(key, value);
+  }
+
+  /** Replaces the whole metrics map with a copy of {@code metrics}. */
+  void setAllUtilizationMetrics(Map<String, Double> metrics) {
+    metricsData = new ConcurrentHashMap<>(metrics);
+  }
+
+  void deleteUtilizationMetric(String key) {
+    metricsData.remove(key);
+  }
+
+  void setCpuUtilizationMetric(double value) {
+    cpuUtilization = value;
+  }
+
+  void deleteCpuUtilizationMetric() {
+    cpuUtilization = 0;
+  }
+
+  void setMemoryUtilizationMetric(double value) {
+    memoryUtilization = value;
+  }
+
+  void deleteMemoryUtilizationMetric() {
+    memoryUtilization = 0;
+  }
+}
diff --git 
a/xds/src/test/java/io/grpc/xds/OrcaServiceImplTest.java b/xds/src/test/java/io/grpc/xds/OrcaServiceImplTest.java
new file mode 100644
index 0000000000..42e90ac60d
--- /dev/null
+++ b/xds/src/test/java/io/grpc/xds/OrcaServiceImplTest.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2022 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.xds;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.github.xds.data.orca.v3.OrcaLoadReport;
+import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
+import com.github.xds.service.orca.v3.OrcaLoadReportRequest;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.Duration;
+import io.grpc.BindableService;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.internal.FakeClock;
+import io.grpc.testing.GrpcCleanupRule;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link OrcaOobService}, driven through an in-process server and a fake clock. */
+@RunWith(JUnit4.class)
+public class OrcaServiceImplTest {
+  @Rule
+  public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  private ManagedChannel channel;
+  private Server oobServer;
+  private final FakeClock fakeClock = new FakeClock();
+  private OrcaOobService defaultTestService;
+  private final Random random = new Random();
+  @Mock
+  ClientCall.Listener<OrcaLoadReport> listener;
+
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    // The service under test enforces a 1 second minimum reporting interval.
+    defaultTestService = new OrcaOobService(1, TimeUnit.SECONDS,
+        fakeClock.getScheduledExecutorService());
+    startServerAndGetChannel(defaultTestService.getService());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    channel.shutdownNow();
+  }
+
+  /** Starts an in-process server hosting {@code orcaService} and opens a channel to it. */
+  private void startServerAndGetChannel(BindableService orcaService) throws Exception {
+    oobServer = grpcCleanup.register(
+        InProcessServerBuilder.forName("orca-service-test")
+            .addService(orcaService)
+            .directExecutor()
+            .build()
+            .start());
+    channel = grpcCleanup.register(
+        InProcessChannelBuilder.forName("orca-service-test")
+            .directExecutor().build());
+  }
+
+  @Test
+  public void testReportingLifeCycle() {
+    defaultTestService.setCpuUtilizationMetric(0.1);
+    Iterator<OrcaLoadReport> reports = OpenRcaServiceGrpc.newBlockingStub(channel)
+        .streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build());
+    assertThat(reports.next()).isEqualTo(
+        OrcaLoadReport.newBuilder().setCpuUtilization(0.1).build());
+    assertThat(defaultTestService.getClientsCount()).isEqualTo(1);
+    assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1);
+    assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1);
+    assertThat(reports.next()).isEqualTo(
+        OrcaLoadReport.newBuilder().setCpuUtilization(0.1).build());
+    assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1);
+    // Closing the channel cancels the stream: the client is dropped and its timer cancelled.
+    channel.shutdownNow();
+    assertThat(defaultTestService.getClientsCount()).isEqualTo(0);
+    assertThat(fakeClock.getPendingTasks().size()).isEqualTo(0);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testReportingLifeCycle_serverShutdown() {
+    ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
+        OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
+    defaultTestService.setUtilizationMetric("buffer", 0.2);
+    call.start(listener, new Metadata());
+    call.sendMessage(OrcaLoadReportRequest.newBuilder()
+        .setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build());
+    call.halfClose();
+    call.request(1);
+    OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
+    assertThat(defaultTestService.getClientsCount()).isEqualTo(1);
+    verify(listener).onMessage(eq(expect));
+    reset(listener);
+    oobServer.shutdownNow();
+    assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(0);
+    assertThat(defaultTestService.getClientsCount()).isEqualTo(0);
+    ArgumentCaptor<Status> callCloseCaptor = ArgumentCaptor.forClass(Status.class);
+    verify(listener).onClose(callCloseCaptor.capture(), any());
+    assertThat(callCloseCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRequestIntervalLess() {
+    ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
+        OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
+    defaultTestService.setUtilizationMetric("buffer", 0.2);
+    call.start(listener, new Metadata());
+    // Requests 500ns, below the 1s service minimum, so the minimum must be used instead.
+    call.sendMessage(OrcaLoadReportRequest.newBuilder()
+        .setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build());
+    call.halfClose();
+    call.request(1);
+    OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
+    verify(listener).onMessage(eq(expect));
+    reset(listener);
+    // Deleting a key that was never set must leave the report unchanged.
+    defaultTestService.deleteUtilizationMetric("buffer0");
+    assertThat(fakeClock.forwardTime(500, TimeUnit.NANOSECONDS)).isEqualTo(0);
+    verifyNoInteractions(listener);
+    assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1);
+    call.request(1);
+    verify(listener).onMessage(eq(expect));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRequestIntervalGreater() {
+    ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
+        OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
+    defaultTestService.setUtilizationMetric("buffer", 0.2);
+    call.start(listener, new Metadata());
+    // Requests 10s, above the 1s service minimum, so the requested interval is honored.
+    call.sendMessage(OrcaLoadReportRequest.newBuilder()
+        .setReportInterval(Duration.newBuilder().setSeconds(10).build()).build());
+    call.halfClose();
+    call.request(1);
+    OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
+    verify(listener).onMessage(eq(expect));
+    reset(listener);
+    defaultTestService.deleteUtilizationMetric("buffer0");
+    assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(0);
+    verifyNoInteractions(listener);
+    assertThat(fakeClock.forwardTime(9, TimeUnit.SECONDS)).isEqualTo(1);
+    call.request(1);
+    verify(listener).onMessage(eq(expect));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testRequestIntervalDefault() throws Exception {
+    // Rebuild the service with the default (30s) minimum interval and restart the server.
+    defaultTestService = new OrcaOobService(fakeClock.getScheduledExecutorService());
+    oobServer.shutdownNow();
+    startServerAndGetChannel(defaultTestService.getService());
+    ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
+        OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
+    defaultTestService.setUtilizationMetric("buffer", 0.2);
+    call.start(listener, new Metadata());
+    call.sendMessage(OrcaLoadReportRequest.newBuilder()
+        .setReportInterval(Duration.newBuilder().setSeconds(10).build()).build());
+    call.halfClose();
+    call.request(1);
+    OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
+    verify(listener).onMessage(eq(expect));
+    reset(listener);
+    defaultTestService.deleteUtilizationMetric("buffer0");
+    // The requested 10s is below the 30s default, so no report is due until 30s have passed.
+    assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(0);
+    verifyNoInteractions(listener);
+    assertThat(fakeClock.forwardTime(20, TimeUnit.SECONDS)).isEqualTo(1);
+    call.request(1);
+    verify(listener).onMessage(eq(expect));
+  }
+
+  @Test
+  public void testMultipleClients() {
+    ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
+        OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
+    defaultTestService.setUtilizationMetric("omg", 100);
+    call.start(listener, new Metadata());
+    call.sendMessage(OrcaLoadReportRequest.newBuilder().build());
+    call.halfClose();
+    call.request(1);
+    OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("omg", 100).build();
+    verify(listener).onMessage(eq(expect));
+    defaultTestService.setMemoryUtilizationMetric(0.5);
+    ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call2 = channel.newCall(
+        OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
+    call2.start(listener, new Metadata());
+    call2.sendMessage(OrcaLoadReportRequest.newBuilder().build());
+    call2.halfClose();
+    call2.request(1);
+    expect = OrcaLoadReport.newBuilder(expect).setMemUtilization(0.5).build();
+    verify(listener).onMessage(eq(expect));
+    assertThat(defaultTestService.getClientsCount()).isEqualTo(2);
+    assertThat(fakeClock.getPendingTasks().size()).isEqualTo(2);
+    channel.shutdownNow();
+    assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(0);
+    assertThat(defaultTestService.getClientsCount()).isEqualTo(0);
+    ArgumentCaptor<Status> callCloseCaptor = ArgumentCaptor.forClass(Status.class);
+    verify(listener, times(2)).onClose(callCloseCaptor.capture(), any());
+    assertThat(callCloseCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
+  }
+
+  @Test
+  public void testApis() throws Exception {
+    Map<String, Double> firstUtilization = ImmutableMap.of("util", 0.1);
+    OrcaLoadReport goldenReport = OrcaLoadReport.newBuilder()
+        .setCpuUtilization(random.nextDouble())
+        .setMemUtilization(random.nextDouble())
+        .putAllUtilization(firstUtilization)
+        .putUtilization("queue", 1.0)
+        .build();
+    defaultTestService.setCpuUtilizationMetric(goldenReport.getCpuUtilization());
+    defaultTestService.setMemoryUtilizationMetric(goldenReport.getMemUtilization());
+    defaultTestService.setAllUtilizationMetrics(firstUtilization);
+    defaultTestService.setUtilizationMetric("queue", 1.0);
+    Iterator<OrcaLoadReport> reports = OpenRcaServiceGrpc.newBlockingStub(channel)
+        .streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build());
+    assertThat(reports.next()).isEqualTo(goldenReport);
+
+    defaultTestService.deleteCpuUtilizationMetric();
+    defaultTestService.deleteMemoryUtilizationMetric();
+    fakeClock.forwardTime(1, TimeUnit.SECONDS);
+    goldenReport = OrcaLoadReport.newBuilder()
+        .putAllUtilization(firstUtilization)
+        .putUtilization("queue", 1.0)
+        .putUtilization("util", 0.1)
+        .build();
+    assertThat(reports.next()).isEqualTo(goldenReport);
+    defaultTestService.deleteUtilizationMetric("util-not-exist");
+    defaultTestService.deleteUtilizationMetric("queue-not-exist");
+    fakeClock.forwardTime(1, TimeUnit.SECONDS);
+    assertThat(reports.next()).isEqualTo(goldenReport);
+
+    // Apply the same updates from two threads at once; the barrier lines both threads up.
+    CyclicBarrier barrier = new CyclicBarrier(2);
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          barrier.await();
+        } catch (Exception ex) {
+          throw new AssertionError(ex);
+        }
+        defaultTestService.deleteUtilizationMetric("util");
+        defaultTestService.setMemoryUtilizationMetric(0.4);
+        defaultTestService.setAllUtilizationMetrics(firstUtilization);
+        try {
+          barrier.await();
+        } catch (Exception ex) {
+          throw new AssertionError(ex);
+        }
+      }
+    }).start();
+    barrier.await();
+    defaultTestService.setMemoryUtilizationMetric(0.4);
+    defaultTestService.deleteUtilizationMetric("util");
+    defaultTestService.setAllUtilizationMetrics(firstUtilization);
+    barrier.await();
+    goldenReport = OrcaLoadReport.newBuilder()
+        .putAllUtilization(firstUtilization)
+        .setMemUtilization(0.4)
+        .build();
+    fakeClock.forwardTime(1, TimeUnit.SECONDS);
+    assertThat(reports.next()).isEqualTo(goldenReport);
+  }
+}