From d854da63d9e508d97d5e5a008d614472aa112c2c Mon Sep 17 00:00:00 2001 From: mfcripps Date: Fri, 19 Aug 2016 12:46:20 -0700 Subject: [PATCH] interop-tests: reduce flakiness of proxy tests --- .../grpc/testing/integration/ProxyTest.java | 94 ++++++++++--------- .../integration/TrafficControlProxy.java | 5 +- 2 files changed, 55 insertions(+), 44 deletions(-) diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java index f5c4ab27cd..bf26d37fd1 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/ProxyTest.java @@ -33,6 +33,9 @@ package io.grpc.testing.integration; import static org.junit.Assert.assertEquals; +import io.netty.util.concurrent.DefaultThreadFactory; + +import org.junit.After; import org.junit.AfterClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -55,41 +58,53 @@ public class ProxyTest { private int serverPort = 5001; private int proxyPort = 5050; - private String loopBack = "127.0.0.1"; + private static TrafficControlProxy proxy; + private static Socket client; + private static Server server; + private static ThreadPoolExecutor executor = - new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue()); + new ThreadPoolExecutor(8, 8, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(), + new DefaultThreadFactory("proxy-test-pool", true)); @AfterClass public static void stopExecutor() { executor.shutdown(); } + @After + public void shutdownTest() throws IOException { + proxy.shutDown(); + server.shutDown(); + client.close(); + } + @Test public void smallLatency() throws UnknownHostException, IOException, InterruptedException, ExecutionException { - Server server = new Server(); - Thread serverThread = new Thread(server); - serverThread.start(); + server = new Server(); + executor.submit(server); - int latency = (int) TimeUnit.MILLISECONDS.toNanos(10); - TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); - startProxy(p).get(); - Socket client = new Socket(loopBack, proxyPort); + int latency = (int) TimeUnit.MILLISECONDS.toNanos(50); + proxy = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); + startProxy(proxy).get(); + client = new Socket("localhost", proxyPort); client.setReuseAddress(true); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream()); byte[] message = new byte[1]; + // warmup + for (int i = 0; i < 5; i++) { + clientOut.write(message, 0, 1); + } + clientIn.readFully(new byte[5]); + // test long start = System.nanoTime(); clientOut.write(message, 0, 1); clientIn.read(message); long stop = System.nanoTime(); - p.shutDown(); - server.shutDown(); - client.close(); - long rtt = (stop - start); assertEquals(latency, rtt, latency); } @@ -97,28 +112,29 @@ public class ProxyTest { @Test public void bigLatency() throws UnknownHostException, IOException, InterruptedException, ExecutionException { - Server server = new Server(); - Thread serverThread = new Thread(server); - serverThread.start(); + server = new Server(); + executor.submit(server); int latency = (int) TimeUnit.MILLISECONDS.toNanos(250); - TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); - startProxy(p).get(); - Socket client = new Socket(loopBack, proxyPort); + proxy = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); + startProxy(proxy).get(); + client = new Socket("localhost", proxyPort); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream()); byte[] message = new byte[1]; + // warmup + for (int i = 0; i < 5; i++) { + clientOut.write(message, 0, 1); + } + clientIn.readFully(new byte[5]); + // test long start = System.nanoTime(); clientOut.write(message, 0, 1); clientIn.read(message); long stop = System.nanoTime(); - p.shutDown(); - server.shutDown(); - client.close(); - long rtt = (stop - start); assertEquals(latency, rtt, latency); } @@ -126,15 +142,15 @@ public class ProxyTest { @Test public void smallBandwidth() throws UnknownHostException, IOException, InterruptedException, ExecutionException { - Server server = new Server(); + server = new Server(); server.setMode("stream"); - (new Thread(server)).start(); + executor.submit(server); assertEquals(server.mode(), "stream"); int bandwidth = 64 * 1024; - TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); - startProxy(p).get(); - Socket client = new Socket(loopBack, proxyPort); + proxy = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); + startProxy(proxy).get(); + client = new Socket("localhost", proxyPort); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream()); @@ -144,10 +160,6 @@ public class ProxyTest { clientIn.readFully(new byte[5 * bandwidth]); long stop = System.nanoTime(); - p.shutDown(); - server.shutDown(); - client.close(); - long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); assertEquals(bandwidth, bandUsed, .5 * bandwidth); } @@ -155,14 +167,14 @@ public class ProxyTest { @Test public void largeBandwidth() throws UnknownHostException, IOException, InterruptedException, ExecutionException { - Server server = new Server(); + server = new Server(); server.setMode("stream"); - (new Thread(server)).start(); + executor.submit(server); assertEquals(server.mode(), "stream"); int bandwidth = 10 * 1024 * 1024; - TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); - startProxy(p).get(); - Socket client = new Socket(loopBack, proxyPort); + proxy = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); + startProxy(proxy).get(); + client = new Socket("localhost", proxyPort); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream()); @@ -172,10 +184,6 @@ public class ProxyTest { clientIn.readFully(new byte[5 * bandwidth]); long stop = System.nanoTime(); - p.shutDown(); - server.shutDown(); - client.close(); - long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); assertEquals(bandwidth, bandUsed, .5 * bandwidth); } @@ -210,8 +218,8 @@ public class ProxyTest { public void shutDown() { try { - rcv.close(); server.close(); + rcv.close(); shutDown = true; } catch (IOException e) { shutDown = true; @@ -244,7 +252,7 @@ public class ProxyTest { System.out.println("Unknown mode: use 'echo' or 'stream'"); } } catch (IOException e) { - e.printStackTrace(); + throw new RuntimeException(e); } } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java b/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java index 4e6a7b11b4..45abcf3d91 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TrafficControlProxy.java @@ -33,6 +33,8 @@ package io.grpc.testing.integration; import static com.google.common.base.Preconditions.checkArgument; +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -66,7 +68,8 @@ public final class TrafficControlProxy { private Socket serverSock; private Socket clientSock; private final ThreadPoolExecutor executor = - new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue()); + new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(), + new DefaultThreadFactory("proxy-pool", true)); /** * Returns a new TrafficControlProxy with default bandwidth and latency.