xds: add OrcaServiceImpl (#8993)

This commit is contained in:
yifeizhuang 2022-03-29 08:48:02 -07:00 committed by GitHub
parent 6554061076
commit 72ae95792c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 559 additions and 0 deletions

View File

@ -0,0 +1,121 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.BindableService;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Implements the service/APIs for Out-of-Band metrics reporting, only for utilization metrics.
* Register the returned service {@link #getService()} to the server, then a client can request
* for periodic load reports. A user should use the public set-APIs to update the server machine's
* utilization metrics data.
*/
@io.grpc.ExperimentalApi("https://github.com/grpc/grpc-java/issues/9006")
public final class OrcaOobService {
/**
* Empty or invalid (non-positive) minInterval config in will be treated to this default value.
*/
public static final long DEFAULT_MIN_REPORT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(30);
private final OrcaServiceImpl orcaService;
/**
* Construct an OOB metrics reporting service.
*
* @param minInterval configures the minimum metrics reporting interval for the service. Bad
* configuration (non-positive) will be overridden to service default (30s).
* Minimum metrics reporting interval means, if the setting in the client's
* request is invalid (non-positive) or below this value, they will be treated
* as this value.
*/
public OrcaOobService(long minInterval, TimeUnit timeUnit,
ScheduledExecutorService timeService) {
this.orcaService = new OrcaServiceImpl(minInterval > 0 ? timeUnit.toNanos(minInterval)
: DEFAULT_MIN_REPORT_INTERVAL_NANOS, checkNotNull(timeService));
}
public OrcaOobService(ScheduledExecutorService timeService) {
this(DEFAULT_MIN_REPORT_INTERVAL_NANOS, TimeUnit.NANOSECONDS, timeService);
}
/**
* Returns the service instance to be bound to the server for ORCA OOB functionality.
*/
public BindableService getService() {
return orcaService;
}
@VisibleForTesting
int getClientsCount() {
return orcaService.clientCount.get();
}
/**
* Update the metrics value corresponding to the specified key.
*/
public void setUtilizationMetric(String key, double value) {
orcaService.setUtilizationMetric(key, value);
}
/**
* Replace the whole metrics data using the specified map.
*/
public void setAllUtilizationMetrics(Map<String, Double> metrics) {
orcaService.setAllUtilizationMetrics(metrics);
}
/**
* Remove the metrics data entry corresponding to the specified key.
*/
public void deleteUtilizationMetric(String key) {
orcaService.deleteUtilizationMetric(key);
}
/**
* Update the CPU utilization metrics data.
*/
public void setCpuUtilizationMetric(double value) {
orcaService.setCpuUtilizationMetric(value);
}
/**
* Clear the CPU utilization metrics data.
*/
public void deleteCpuUtilizationMetric() {
orcaService.deleteCpuUtilizationMetric();
}
/**
* Update the memory utilization metrics data.
*/
public void setMemoryUtilizationMetric(double value) {
orcaService.setMemoryUtilizationMetric(value);
}
/**
* Clear the memory utilization metrics data.
*/
public void deleteMemoryUtilizationMetric() {
orcaService.deleteMemoryUtilizationMetric();
}
}

View File

@ -0,0 +1,141 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkNotNull;
import com.github.xds.data.orca.v3.OrcaLoadReport;
import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
import com.github.xds.service.orca.v3.OrcaLoadReportRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Durations;
import io.grpc.SynchronizationContext;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
final class OrcaServiceImpl extends OpenRcaServiceGrpc.OpenRcaServiceImplBase {
private static final Logger logger = Logger.getLogger(OrcaServiceImpl.class.getName());
private final long minReportIntervalNanos;
private final ScheduledExecutorService timeService;
private volatile ConcurrentHashMap<String, Double> metricsData = new ConcurrentHashMap<>();
private volatile double cpuUtilization;
private volatile double memoryUtilization;
@VisibleForTesting
final AtomicInteger clientCount = new AtomicInteger(0);
public OrcaServiceImpl(long minReportIntervalNanos, ScheduledExecutorService timeService) {
this.minReportIntervalNanos = minReportIntervalNanos;
this.timeService = checkNotNull(timeService);
}
@Override
public void streamCoreMetrics(
OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) {
OrcaClient client = new OrcaClient(request, responseObserver);
client.run();
clientCount.getAndIncrement();
}
private final class OrcaClient implements Runnable {
final OrcaLoadReportRequest request;
final ServerCallStreamObserver<OrcaLoadReport> responseObserver;
SynchronizationContext.ScheduledHandle periodicReportTimer;
final long reportIntervalNanos;
final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.log(Level.SEVERE, "Exception!" + e);
}
});
OrcaClient(OrcaLoadReportRequest request, StreamObserver<OrcaLoadReport> responseObserver) {
this.request = checkNotNull(request);
this.reportIntervalNanos = Math.max(Durations.toNanos(request.getReportInterval()),
minReportIntervalNanos);
this.responseObserver = (ServerCallStreamObserver<OrcaLoadReport>) responseObserver;
this.responseObserver.setOnCancelHandler(new Runnable() {
@Override
public void run() {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (periodicReportTimer != null) {
periodicReportTimer.cancel();
}
clientCount.getAndDecrement();
}
});
}
});
}
@Override
public void run() {
if (periodicReportTimer != null && periodicReportTimer.isPending()) {
return;
}
OrcaLoadReport report = generateMetricsReport();
responseObserver.onNext(report);
periodicReportTimer = syncContext.schedule(OrcaClient.this, reportIntervalNanos,
TimeUnit.NANOSECONDS, timeService);
}
}
private OrcaLoadReport generateMetricsReport() {
return OrcaLoadReport.newBuilder().setCpuUtilization(cpuUtilization)
.setMemUtilization(memoryUtilization)
.putAllUtilization(metricsData)
.build();
}
void setUtilizationMetric(String key, double value) {
metricsData.put(key, value);
}
void setAllUtilizationMetrics(Map<String, Double> metrics) {
metricsData = new ConcurrentHashMap<>(metrics);
}
void deleteUtilizationMetric(String key) {
metricsData.remove(key);
}
void setCpuUtilizationMetric(double value) {
cpuUtilization = value;
}
void deleteCpuUtilizationMetric() {
cpuUtilization = 0;
}
void setMemoryUtilizationMetric(double value) {
memoryUtilization = value;
}
void deleteMemoryUtilizationMetric() {
memoryUtilization = 0;
}
}

View File

@ -0,0 +1,297 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import com.github.xds.data.orca.v3.OrcaLoadReport;
import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
import com.github.xds.service.orca.v3.OrcaLoadReportRequest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Duration;
import io.grpc.BindableService;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.FakeClock;
import io.grpc.testing.GrpcCleanupRule;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class OrcaServiceImplTest {
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
private ManagedChannel channel;
private Server oobServer;
private final FakeClock fakeClock = new FakeClock();
private OrcaOobService defaultTestService;
private final Random random = new Random();
@Mock
ClientCall.Listener<OrcaLoadReport> listener;
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
defaultTestService = new OrcaOobService(1, TimeUnit.SECONDS,
fakeClock.getScheduledExecutorService());
startServerAndGetChannel(defaultTestService.getService());
}
@After
public void teardown() throws Exception {
channel.shutdownNow();
}
private void startServerAndGetChannel(BindableService orcaService) throws Exception {
oobServer = grpcCleanup.register(
InProcessServerBuilder.forName("orca-service-test")
.addService(orcaService)
.directExecutor()
.build()
.start());
channel = grpcCleanup.register(
InProcessChannelBuilder.forName("orca-service-test")
.directExecutor().build());
}
@Test
public void testReportingLifeCycle() {
defaultTestService.setCpuUtilizationMetric(0.1);
Iterator<OrcaLoadReport> reports = OpenRcaServiceGrpc.newBlockingStub(channel)
.streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build());
assertThat(reports.next()).isEqualTo(
OrcaLoadReport.newBuilder().setCpuUtilization(0.1).build());
assertThat(defaultTestService.getClientsCount()).isEqualTo(1);
assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1);
assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1);
assertThat(reports.next()).isEqualTo(
OrcaLoadReport.newBuilder().setCpuUtilization(0.1).build());
assertThat(fakeClock.getPendingTasks().size()).isEqualTo(1);
channel.shutdownNow();
assertThat(defaultTestService.getClientsCount()).isEqualTo(0);
assertThat(fakeClock.getPendingTasks().size()).isEqualTo(0);
}
@Test
@SuppressWarnings("unchecked")
public void testReportingLifeCycle_serverShutdown() {
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
defaultTestService.setUtilizationMetric("buffer", 0.2);
call.start(listener, new Metadata());
call.sendMessage(OrcaLoadReportRequest.newBuilder()
.setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build());
call.halfClose();
call.request(1);
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
assertThat(defaultTestService.getClientsCount()).isEqualTo(1);
verify(listener).onMessage(eq(expect));
reset(listener);
oobServer.shutdownNow();
assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(0);
assertThat(defaultTestService.getClientsCount()).isEqualTo(0);
ArgumentCaptor<Status> callCloseCaptor = ArgumentCaptor.forClass(null);
verify(listener).onClose(callCloseCaptor.capture(), any());
assertThat(callCloseCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
}
@Test
@SuppressWarnings("unchecked")
public void testRequestIntervalLess() {
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
defaultTestService.setUtilizationMetric("buffer", 0.2);
call.start(listener, new Metadata());
call.sendMessage(OrcaLoadReportRequest.newBuilder()
.setReportInterval(Duration.newBuilder().setSeconds(0).setNanos(500).build()).build());
call.halfClose();
call.request(1);
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
verify(listener).onMessage(eq(expect));
reset(listener);
defaultTestService.deleteUtilizationMetric("buffer0");
assertThat(fakeClock.forwardTime(500, TimeUnit.NANOSECONDS)).isEqualTo(0);
verifyNoInteractions(listener);
assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(1);
call.request(1);
verify(listener).onMessage(eq(expect));
}
@Test
@SuppressWarnings("unchecked")
public void testRequestIntervalGreater() {
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
defaultTestService.setUtilizationMetric("buffer", 0.2);
call.start(listener, new Metadata());
call.sendMessage(OrcaLoadReportRequest.newBuilder()
.setReportInterval(Duration.newBuilder().setSeconds(10).build()).build());
call.halfClose();
call.request(1);
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
verify(listener).onMessage(eq(expect));
reset(listener);
defaultTestService.deleteUtilizationMetric("buffer0");
assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(0);
verifyNoInteractions(listener);
assertThat(fakeClock.forwardTime(9, TimeUnit.SECONDS)).isEqualTo(1);
call.request(1);
verify(listener).onMessage(eq(expect));
}
@Test
@SuppressWarnings("unchecked")
public void testRequestIntervalDefault() throws Exception {
defaultTestService = new OrcaOobService(fakeClock.getScheduledExecutorService());
oobServer.shutdownNow();
startServerAndGetChannel(defaultTestService.getService());
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
defaultTestService.setUtilizationMetric("buffer", 0.2);
call.start(listener, new Metadata());
call.sendMessage(OrcaLoadReportRequest.newBuilder()
.setReportInterval(Duration.newBuilder().setSeconds(10).build()).build());
call.halfClose();
call.request(1);
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("buffer", 0.2).build();
verify(listener).onMessage(eq(expect));
reset(listener);
defaultTestService.deleteUtilizationMetric("buffer0");
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(0);
verifyNoInteractions(listener);
assertThat(fakeClock.forwardTime(20, TimeUnit.SECONDS)).isEqualTo(1);
call.request(1);
verify(listener).onMessage(eq(expect));
}
@Test
public void testMultipleClients() {
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call = channel.newCall(
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
defaultTestService.setUtilizationMetric("omg", 100);
call.start(listener, new Metadata());
call.sendMessage(OrcaLoadReportRequest.newBuilder().build());
call.halfClose();
call.request(1);
OrcaLoadReport expect = OrcaLoadReport.newBuilder().putUtilization("omg", 100).build();
verify(listener).onMessage(eq(expect));
defaultTestService.setMemoryUtilizationMetric(0.5);
ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call2 = channel.newCall(
OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
call2.start(listener, new Metadata());
call2.sendMessage(OrcaLoadReportRequest.newBuilder().build());
call2.halfClose();
call2.request(1);
expect = OrcaLoadReport.newBuilder(expect).setMemUtilization(0.5).build();
verify(listener).onMessage(eq(expect));
assertThat(defaultTestService.getClientsCount()).isEqualTo(2);
assertThat(fakeClock.getPendingTasks().size()).isEqualTo(2);
channel.shutdownNow();
assertThat(fakeClock.forwardTime(1, TimeUnit.SECONDS)).isEqualTo(0);
assertThat(defaultTestService.getClientsCount()).isEqualTo(0);
ArgumentCaptor<Status> callCloseCaptor = ArgumentCaptor.forClass(null);
verify(listener, times(2)).onClose(callCloseCaptor.capture(), any());
assertThat(callCloseCaptor.getValue().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
}
@Test
public void testApis() throws Exception {
Map<String, Double> firstUtilization = ImmutableMap.of("util", 0.1);
OrcaLoadReport goldenReport = OrcaLoadReport.newBuilder()
.setCpuUtilization(random.nextDouble())
.setMemUtilization(random.nextDouble())
.putAllUtilization(firstUtilization)
.putUtilization("queue", 1.0)
.build();
defaultTestService.setCpuUtilizationMetric(goldenReport.getCpuUtilization());
defaultTestService.setMemoryUtilizationMetric(goldenReport.getMemUtilization());
defaultTestService.setAllUtilizationMetrics(firstUtilization);
defaultTestService.setUtilizationMetric("queue", 1.0);
Iterator<OrcaLoadReport> reports = OpenRcaServiceGrpc.newBlockingStub(channel)
.streamCoreMetrics(OrcaLoadReportRequest.newBuilder().build());
assertThat(reports.next()).isEqualTo(goldenReport);
defaultTestService.deleteCpuUtilizationMetric();
defaultTestService.deleteMemoryUtilizationMetric();
fakeClock.forwardTime(1, TimeUnit.SECONDS);
goldenReport = OrcaLoadReport.newBuilder()
.putAllUtilization(firstUtilization)
.putUtilization("queue", 1.0)
.putUtilization("util", 0.1)
.build();
assertThat(reports.next()).isEqualTo(goldenReport);
defaultTestService.deleteUtilizationMetric("util-not-exist");
defaultTestService.deleteUtilizationMetric("queue-not-exist");
fakeClock.forwardTime(1, TimeUnit.SECONDS);
assertThat(reports.next()).isEqualTo(goldenReport);
CyclicBarrier barrier = new CyclicBarrier(2);
new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.await();
} catch (Exception ex) {
throw new AssertionError(ex);
}
defaultTestService.deleteUtilizationMetric("util");
defaultTestService.setMemoryUtilizationMetric(0.4);
defaultTestService.setAllUtilizationMetrics(firstUtilization);
try {
barrier.await();
} catch (Exception ex) {
throw new AssertionError(ex);
}
}
}).start();
barrier.await();
defaultTestService.setMemoryUtilizationMetric(0.4);
defaultTestService.deleteUtilizationMetric("util");
defaultTestService.setAllUtilizationMetrics(firstUtilization);
barrier.await();
goldenReport = OrcaLoadReport.newBuilder()
.putAllUtilization(firstUtilization)
.setMemUtilization(0.4)
.build();
fakeClock.forwardTime(1, TimeUnit.SECONDS);
assertThat(reports.next()).isEqualTo(goldenReport);
}
}