interop-testing: Logging of stress test metrics

Introduces a flag to enable the logging of collected metrics. In some
cases it can be more practical to analyze the metrics from a log file
after the fact, instead of connecting to the metrics server to collect
the metrics as the client is running.
This commit is contained in:
Terry Wilson 2023-10-05 09:14:18 -07:00 committed by Terry Wilson
parent bf9ccc68f5
commit 8a247ad16a
2 changed files with 43 additions and 1 deletions

View File

@ -55,6 +55,8 @@ import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -85,6 +87,7 @@ public class StressTestClient {
try { try {
client.startMetricsService(); client.startMetricsService();
client.runStressTest(); client.runStressTest();
client.startMetricsLogging();
client.blockUntilStressTestComplete(); client.blockUntilStressTestComplete();
} catch (Exception e) { } catch (Exception e) {
log.log(Level.WARNING, "The stress test client encountered an error!", e); log.log(Level.WARNING, "The stress test client encountered an error!", e);
@ -106,6 +109,7 @@ public class StressTestClient {
private int channelsPerServer = 1; private int channelsPerServer = 1;
private int stubsPerChannel = 1; private int stubsPerChannel = 1;
private int metricsPort = 8081; private int metricsPort = 8081;
private int metricsLogRateSecs = -1;
private Server metricsServer; private Server metricsServer;
private final Map<String, Metrics.GaugeResponse> gauges = private final Map<String, Metrics.GaugeResponse> gauges =
@ -120,6 +124,7 @@ public class StressTestClient {
new ArrayList<>(); new ArrayList<>();
private final List<ManagedChannel> channels = new ArrayList<>(); private final List<ManagedChannel> channels = new ArrayList<>();
private ListeningExecutorService threadpool; private ListeningExecutorService threadpool;
private ScheduledExecutorService metricsLoggingThreadpool;
@VisibleForTesting @VisibleForTesting
void parseArgs(String[] args) { void parseArgs(String[] args) {
@ -162,6 +167,8 @@ public class StressTestClient {
stubsPerChannel = Integer.valueOf(value); stubsPerChannel = Integer.valueOf(value);
} else if ("metrics_port".equals(key)) { } else if ("metrics_port".equals(key)) {
metricsPort = Integer.valueOf(value); metricsPort = Integer.valueOf(value);
} else if ("metrics_log_rate_secs".equals(key)) {
metricsLogRateSecs = Integer.valueOf(value);
} else { } else {
System.err.println("Unknown argument: " + key); System.err.println("Unknown argument: " + key);
usage = true; usage = true;
@ -198,6 +205,8 @@ public class StressTestClient {
+ "\n --num_stubs_per_channel=INT Default: " + c.stubsPerChannel + "\n --num_stubs_per_channel=INT Default: " + c.stubsPerChannel
+ "\n --metrics_port=PORT Listening port of the metrics server." + "\n --metrics_port=PORT Listening port of the metrics server."
+ " Default: " + c.metricsPort + " Default: " + c.metricsPort
+ "\n --metrics_log_rate_secs=INT The rate (in secs) to log collected metrics"
+ " Default: " + c.metricsLogRateSecs
); );
System.exit(1); System.exit(1);
} }
@ -213,6 +222,24 @@ public class StressTestClient {
.start(); .start();
} }
/** Starts logging gauge information at a given rate (if rate > -1). */
@SuppressWarnings("FutureReturnValueIgnored")
void startMetricsLogging() {
if (metricsLogRateSecs > -1) {
metricsLoggingThreadpool = Executors.newScheduledThreadPool(1);
metricsLoggingThreadpool.scheduleAtFixedRate(() -> {
long totalQps = 0;
for (Metrics.GaugeResponse gauge : gauges.values()) {
log.info("GAUGE: " + gauge);
if (gauge.getName().endsWith("/qps")) {
totalQps += gauge.getLongValue();
}
}
log.info("TOTAL QPS: " + totalQps);
}, metricsLogRateSecs, metricsLogRateSecs, SECONDS);
}
}
@VisibleForTesting @VisibleForTesting
void runStressTest() throws Exception { void runStressTest() throws Exception {
Preconditions.checkState(!shutdown, "client was shutdown."); Preconditions.checkState(!shutdown, "client was shutdown.");
@ -283,6 +310,14 @@ public class StressTestClient {
} catch (Throwable t) { } catch (Throwable t) {
log.log(Level.WARNING, "Error shutting down threadpool.", t); log.log(Level.WARNING, "Error shutting down threadpool.", t);
} }
try {
if (metricsLoggingThreadpool != null) {
metricsLoggingThreadpool.shutdownNow();
}
} catch (Throwable t) {
log.log(Level.WARNING, "Error shutting down metrics logging threadpool.", t);
}
} }
@VisibleForTesting @VisibleForTesting
@ -661,4 +696,9 @@ public class StressTestClient {
int metricsPort() { int metricsPort() {
return metricsPort; return metricsPort;
} }
@VisibleForTesting
int metricsLogRateSecs() {
return metricsLogRateSecs;
}
} }

View File

@ -79,7 +79,8 @@ public class StressTestClientTest {
"--metrics_port=9090", "--metrics_port=9090",
"--server_host_override=foo.test.google.fr", "--server_host_override=foo.test.google.fr",
"--use_tls=true", "--use_tls=true",
"--use_test_ca=true" "--use_test_ca=true",
"--metrics_log_rate_secs=60"
}); });
List<InetSocketAddress> addresses = Arrays.asList(new InetSocketAddress("localhost", 8080), List<InetSocketAddress> addresses = Arrays.asList(new InetSocketAddress("localhost", 8080),
@ -99,6 +100,7 @@ public class StressTestClientTest {
assertEquals(10, client.channelsPerServer()); assertEquals(10, client.channelsPerServer());
assertEquals(5, client.stubsPerChannel()); assertEquals(5, client.stubsPerChannel());
assertEquals(9090, client.metricsPort()); assertEquals(9090, client.metricsPort());
assertEquals(60, client.metricsLogRateSecs());
} }
@Test @Test