diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/StressTestClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/StressTestClient.java index d2a5f6474f..82326237cb 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/StressTestClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/StressTestClient.java @@ -55,6 +55,8 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -85,6 +87,7 @@ public class StressTestClient { try { client.startMetricsService(); client.runStressTest(); + client.startMetricsLogging(); client.blockUntilStressTestComplete(); } catch (Exception e) { log.log(Level.WARNING, "The stress test client encountered an error!", e); @@ -106,6 +109,7 @@ public class StressTestClient { private int channelsPerServer = 1; private int stubsPerChannel = 1; private int metricsPort = 8081; + private int metricsLogRateSecs = -1; private Server metricsServer; private final Map gauges = @@ -120,6 +124,7 @@ public class StressTestClient { new ArrayList<>(); private final List channels = new ArrayList<>(); private ListeningExecutorService threadpool; + private ScheduledExecutorService metricsLoggingThreadpool; @VisibleForTesting void parseArgs(String[] args) { @@ -162,6 +167,8 @@ public class StressTestClient { stubsPerChannel = Integer.valueOf(value); } else if ("metrics_port".equals(key)) { metricsPort = Integer.valueOf(value); + } else if ("metrics_log_rate_secs".equals(key)) { + metricsLogRateSecs = Integer.valueOf(value); } else { System.err.println("Unknown argument: " + key); usage = true; @@ -198,6 +205,8 @@ public class StressTestClient { + "\n --num_stubs_per_channel=INT Default: " + c.stubsPerChannel + "\n --metrics_port=PORT Listening port of the metrics server." + " Default: " + c.metricsPort + + "\n --metrics_log_rate_secs=INT The rate (in secs) to log collected metrics" + + " Default: " + c.metricsLogRateSecs ); System.exit(1); } @@ -213,6 +222,24 @@ public class StressTestClient { .start(); } + /** Starts logging gauge information at a given rate (if rate > -1). */ + @SuppressWarnings("FutureReturnValueIgnored") + void startMetricsLogging() { + if (metricsLogRateSecs > -1) { + metricsLoggingThreadpool = Executors.newScheduledThreadPool(1); + metricsLoggingThreadpool.scheduleAtFixedRate(() -> { + long totalQps = 0; + for (Metrics.GaugeResponse gauge : gauges.values()) { + log.info("GAUGE: " + gauge); + if (gauge.getName().endsWith("/qps")) { + totalQps += gauge.getLongValue(); + } + } + log.info("TOTAL QPS: " + totalQps); + }, metricsLogRateSecs, metricsLogRateSecs, SECONDS); + } + } + @VisibleForTesting void runStressTest() throws Exception { Preconditions.checkState(!shutdown, "client was shutdown."); @@ -283,6 +310,14 @@ public class StressTestClient { } catch (Throwable t) { log.log(Level.WARNING, "Error shutting down threadpool.", t); } + + try { + if (metricsLoggingThreadpool != null) { + metricsLoggingThreadpool.shutdownNow(); + } + } catch (Throwable t) { + log.log(Level.WARNING, "Error shutting down metrics logging threadpool.", t); + } } @VisibleForTesting @@ -661,4 +696,9 @@ public class StressTestClient { int metricsPort() { return metricsPort; } + + @VisibleForTesting + int metricsLogRateSecs() { + return metricsLogRateSecs; + } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/StressTestClientTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/StressTestClientTest.java index 66a3f2da88..a56ca3d620 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/StressTestClientTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/StressTestClientTest.java @@ -79,7 +79,8 @@ public class StressTestClientTest { "--metrics_port=9090", "--server_host_override=foo.test.google.fr", "--use_tls=true", - "--use_test_ca=true" + "--use_test_ca=true", + "--metrics_log_rate_secs=60" }); List addresses = Arrays.asList(new InetSocketAddress("localhost", 8080), @@ -99,6 +100,7 @@ public class StressTestClientTest { assertEquals(10, client.channelsPerServer()); assertEquals(5, client.stubsPerChannel()); assertEquals(9090, client.metricsPort()); + assertEquals(60, client.metricsLogRateSecs()); } @Test