interop-tests: reduce flakiness of proxy tests

This commit is contained in:
mfcripps 2016-08-19 12:46:20 -07:00 committed by Eric Anderson
parent 1f7fb044ab
commit d854da63d9
2 changed files with 55 additions and 44 deletions

View File

@ -33,6 +33,9 @@ package io.grpc.testing.integration;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -55,41 +58,53 @@ public class ProxyTest {
private int serverPort = 5001; private int serverPort = 5001;
private int proxyPort = 5050; private int proxyPort = 5050;
private String loopBack = "127.0.0.1"; private static TrafficControlProxy proxy;
private static Socket client;
private static Server server;
private static ThreadPoolExecutor executor = private static ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); new ThreadPoolExecutor(8, 8, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("proxy-test-pool", true));
@AfterClass @AfterClass
public static void stopExecutor() { public static void stopExecutor() {
executor.shutdown(); executor.shutdown();
} }
@After
public void shutdownTest() throws IOException {
proxy.shutDown();
server.shutDown();
client.close();
}
@Test @Test
public void smallLatency() public void smallLatency()
throws UnknownHostException, IOException, InterruptedException, ExecutionException { throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server(); server = new Server();
Thread serverThread = new Thread(server); executor.submit(server);
serverThread.start();
int latency = (int) TimeUnit.MILLISECONDS.toNanos(10); int latency = (int) TimeUnit.MILLISECONDS.toNanos(50);
TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); proxy = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS);
startProxy(p).get(); startProxy(proxy).get();
Socket client = new Socket(loopBack, proxyPort); client = new Socket("localhost", proxyPort);
client.setReuseAddress(true); client.setReuseAddress(true);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream());
byte[] message = new byte[1]; byte[] message = new byte[1];
// warmup
for (int i = 0; i < 5; i++) {
clientOut.write(message, 0, 1);
}
clientIn.readFully(new byte[5]);
// test // test
long start = System.nanoTime(); long start = System.nanoTime();
clientOut.write(message, 0, 1); clientOut.write(message, 0, 1);
clientIn.read(message); clientIn.read(message);
long stop = System.nanoTime(); long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long rtt = (stop - start); long rtt = (stop - start);
assertEquals(latency, rtt, latency); assertEquals(latency, rtt, latency);
} }
@ -97,28 +112,29 @@ public class ProxyTest {
@Test @Test
public void bigLatency() public void bigLatency()
throws UnknownHostException, IOException, InterruptedException, ExecutionException { throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server(); server = new Server();
Thread serverThread = new Thread(server); executor.submit(server);
serverThread.start();
int latency = (int) TimeUnit.MILLISECONDS.toNanos(250); int latency = (int) TimeUnit.MILLISECONDS.toNanos(250);
TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS); proxy = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS);
startProxy(p).get(); startProxy(proxy).get();
Socket client = new Socket(loopBack, proxyPort); client = new Socket("localhost", proxyPort);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream());
byte[] message = new byte[1]; byte[] message = new byte[1];
// warmup
for (int i = 0; i < 5; i++) {
clientOut.write(message, 0, 1);
}
clientIn.readFully(new byte[5]);
// test // test
long start = System.nanoTime(); long start = System.nanoTime();
clientOut.write(message, 0, 1); clientOut.write(message, 0, 1);
clientIn.read(message); clientIn.read(message);
long stop = System.nanoTime(); long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long rtt = (stop - start); long rtt = (stop - start);
assertEquals(latency, rtt, latency); assertEquals(latency, rtt, latency);
} }
@ -126,15 +142,15 @@ public class ProxyTest {
@Test @Test
public void smallBandwidth() public void smallBandwidth()
throws UnknownHostException, IOException, InterruptedException, ExecutionException { throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server(); server = new Server();
server.setMode("stream"); server.setMode("stream");
(new Thread(server)).start(); executor.submit(server);
assertEquals(server.mode(), "stream"); assertEquals(server.mode(), "stream");
int bandwidth = 64 * 1024; int bandwidth = 64 * 1024;
TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); proxy = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS);
startProxy(p).get(); startProxy(proxy).get();
Socket client = new Socket(loopBack, proxyPort); client = new Socket("localhost", proxyPort);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream());
@ -144,10 +160,6 @@ public class ProxyTest {
clientIn.readFully(new byte[5 * bandwidth]); clientIn.readFully(new byte[5 * bandwidth]);
long stop = System.nanoTime(); long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
assertEquals(bandwidth, bandUsed, .5 * bandwidth); assertEquals(bandwidth, bandUsed, .5 * bandwidth);
} }
@ -155,14 +167,14 @@ public class ProxyTest {
@Test @Test
public void largeBandwidth() public void largeBandwidth()
throws UnknownHostException, IOException, InterruptedException, ExecutionException { throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server(); server = new Server();
server.setMode("stream"); server.setMode("stream");
(new Thread(server)).start(); executor.submit(server);
assertEquals(server.mode(), "stream"); assertEquals(server.mode(), "stream");
int bandwidth = 10 * 1024 * 1024; int bandwidth = 10 * 1024 * 1024;
TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS); proxy = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS);
startProxy(p).get(); startProxy(proxy).get();
Socket client = new Socket(loopBack, proxyPort); client = new Socket("localhost", proxyPort);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream()); DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream()); DataInputStream clientIn = new DataInputStream(client.getInputStream());
@ -172,10 +184,6 @@ public class ProxyTest {
clientIn.readFully(new byte[5 * bandwidth]); clientIn.readFully(new byte[5 * bandwidth]);
long stop = System.nanoTime(); long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1))); long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
assertEquals(bandwidth, bandUsed, .5 * bandwidth); assertEquals(bandwidth, bandUsed, .5 * bandwidth);
} }
@ -210,8 +218,8 @@ public class ProxyTest {
public void shutDown() { public void shutDown() {
try { try {
rcv.close();
server.close(); server.close();
rcv.close();
shutDown = true; shutDown = true;
} catch (IOException e) { } catch (IOException e) {
shutDown = true; shutDown = true;
@ -244,7 +252,7 @@ public class ProxyTest {
System.out.println("Unknown mode: use 'echo' or 'stream'"); System.out.println("Unknown mode: use 'echo' or 'stream'");
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); throw new RuntimeException(e);
} }
} }
} }

View File

@ -33,6 +33,8 @@ package io.grpc.testing.integration;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -66,7 +68,8 @@ public final class TrafficControlProxy {
private Socket serverSock; private Socket serverSock;
private Socket clientSock; private Socket clientSock;
private final ThreadPoolExecutor executor = private final ThreadPoolExecutor executor =
new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory("proxy-pool", true));
/** /**
* Returns a new TrafficControlProxy with default bandwidth and latency. * Returns a new TrafficControlProxy with default bandwidth and latency.