integration test: add a flowcontrolling proxy and integration tests

This commit is contained in:
mfcripps 2016-08-17 15:33:13 -07:00 committed by Carl Mastrangelo
parent 0d89bb4942
commit 09d663faf1
2 changed files with 517 additions and 0 deletions

View File

@ -0,0 +1,251 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.testing.integration;
import static org.junit.Assert.assertEquals;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@RunWith(JUnit4.class)
public class ProxyTest {
private int serverPort = 5001;
private int proxyPort = 5050;
private String loopBack = "127.0.0.1";
private static ThreadPoolExecutor executor =
new ThreadPoolExecutor(1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
@AfterClass
public static void stopExecutor() {
executor.shutdown();
}
@Test
public void smallLatency()
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server();
Thread serverThread = new Thread(server);
serverThread.start();
int latency = (int) TimeUnit.MILLISECONDS.toNanos(10);
TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS);
startProxy(p).get();
Socket client = new Socket(loopBack, proxyPort);
client.setReuseAddress(true);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream());
byte[] message = new byte[1];
// test
long start = System.nanoTime();
clientOut.write(message, 0, 1);
clientIn.read(message);
long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long rtt = (stop - start);
assertEquals(latency, rtt, latency);
}
@Test
public void bigLatency()
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server();
Thread serverThread = new Thread(server);
serverThread.start();
int latency = (int) TimeUnit.MILLISECONDS.toNanos(250);
TrafficControlProxy p = new TrafficControlProxy(1024 * 1024, latency, TimeUnit.NANOSECONDS);
startProxy(p).get();
Socket client = new Socket(loopBack, proxyPort);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream());
byte[] message = new byte[1];
// test
long start = System.nanoTime();
clientOut.write(message, 0, 1);
clientIn.read(message);
long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long rtt = (stop - start);
assertEquals(latency, rtt, latency);
}
@Test
public void smallBandwidth()
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server();
server.setMode("stream");
(new Thread(server)).start();
assertEquals(server.mode(), "stream");
int bandwidth = 64 * 1024;
TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS);
startProxy(p).get();
Socket client = new Socket(loopBack, proxyPort);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream());
clientOut.write(new byte[1]);
clientIn.readFully(new byte[100 * 1024]);
long start = System.nanoTime();
clientIn.readFully(new byte[5 * bandwidth]);
long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
assertEquals(bandwidth, bandUsed, .5 * bandwidth);
}
@Test
public void largeBandwidth()
throws UnknownHostException, IOException, InterruptedException, ExecutionException {
Server server = new Server();
server.setMode("stream");
(new Thread(server)).start();
assertEquals(server.mode(), "stream");
int bandwidth = 10 * 1024 * 1024;
TrafficControlProxy p = new TrafficControlProxy(bandwidth, 200, TimeUnit.MILLISECONDS);
startProxy(p).get();
Socket client = new Socket(loopBack, proxyPort);
DataOutputStream clientOut = new DataOutputStream(client.getOutputStream());
DataInputStream clientIn = new DataInputStream(client.getInputStream());
clientOut.write(new byte[1]);
clientIn.readFully(new byte[100 * 1024]);
long start = System.nanoTime();
clientIn.readFully(new byte[5 * bandwidth]);
long stop = System.nanoTime();
p.shutDown();
server.shutDown();
client.close();
long bandUsed = ((5 * bandwidth) / ((stop - start) / TimeUnit.SECONDS.toNanos(1)));
assertEquals(bandwidth, bandUsed, .5 * bandwidth);
}
private Future<?> startProxy(final TrafficControlProxy p) {
return executor.submit(new Runnable() {
@Override
public void run() {
try {
p.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
// server with echo and streaming modes
private class Server implements Runnable {
private ServerSocket server;
private Socket rcv;
private boolean shutDown;
private String mode = "echo";
public void setMode(String mode) {
this.mode = mode;
}
public String mode() {
return mode;
}
public void shutDown() {
try {
rcv.close();
server.close();
shutDown = true;
} catch (IOException e) {
shutDown = true;
}
}
@Override
public void run() {
try {
server = new ServerSocket(serverPort);
rcv = server.accept();
DataInputStream serverIn = new DataInputStream(rcv.getInputStream());
DataOutputStream serverOut = new DataOutputStream(rcv.getOutputStream());
byte[] response = new byte[1024];
if (mode.equals("echo")) {
while (!shutDown) {
int readable = serverIn.read(response);
serverOut.write(response, 0, readable);
}
} else if (mode.equals("stream")) {
serverIn.read(response);
byte[] message = new byte[16 * 1024];
while (!shutDown) {
serverOut.write(message, 0, message.length);
}
serverIn.close();
serverOut.close();
rcv.close();
} else {
System.out.println("Unknown mode: use 'echo' or 'stream'");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,266 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.testing.integration;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
public final class TrafficControlProxy {
private static final int DEFAULT_BAND_BPS = 1024 * 1024;
private static final int DEFAULT_DELAY_NANOS = 200 * 1000 * 1000;
private static final Logger logger = Logger.getLogger(TrafficControlProxy.class.getName());
// TODO: make host and ports arguments
private String localhost = "127.0.0.1";
private int proxyPort = 5050;
private int serverPort = 5001;
private int queueLength;
private int chunkSize;
private int bandwidth;
private long latency;
private volatile boolean shutDown;
private ServerSocket clientAcceptor;
private Socket serverSock;
private Socket clientSock;
private final ThreadPoolExecutor executor =
new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
/**
* Returns a new TrafficControlProxy with default bandwidth and latency.
*/
public TrafficControlProxy() {
this(DEFAULT_BAND_BPS, DEFAULT_DELAY_NANOS, TimeUnit.NANOSECONDS);
}
/**
* Returns a new TrafficControlProxy with bandwidth set to targetBPS, and latency set to
* targetLatency in latencyUnits.
*/
public TrafficControlProxy(int targetBps, int targetLatency, TimeUnit latencyUnits) {
checkArgument(targetBps > 0);
checkArgument(targetLatency > 0);
bandwidth = targetBps;
// divide by 2 because latency is applied in both directions
latency = latencyUnits.toNanos(targetLatency) / 2;
queueLength = (int) Math.max(bandwidth * latency / TimeUnit.SECONDS.toNanos(1), 1);
chunkSize = Math.max(1, queueLength);
}
/**
* Starts a new thread that waits for client and server and start reader/writer threads.
*/
public void start() throws IOException {
// ClientAcceptor uses a ServerSocket server so that the client can connect to the proxy as it
// normally would a server. serverSock then connects the server using a regular Socket as a
// client normally would.
logger.info("Starting new proxy on port " + proxyPort + " with Queue Length " + queueLength);
clientAcceptor = new ServerSocket();
clientAcceptor.setReuseAddress(true);
clientAcceptor.bind(new InetSocketAddress(localhost, proxyPort));
executor.submit(new Runnable() {
@Override
public void run() {
try {
clientSock = clientAcceptor.accept();
serverSock = new Socket();
serverSock.connect(new InetSocketAddress(localhost, serverPort));
startWorkers();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
/** Interrupt all workers and close sockets. */
public void shutDown() throws IOException {
// TODO: Handle case where a socket fails to close, therefore blocking the others from closing
logger.info("Proxy shutting down... ");
shutDown = true;
executor.shutdown();
clientAcceptor.close();
clientSock.close();
serverSock.close();
logger.info("Shutdown Complete");
}
private void startWorkers() throws IOException {
DataInputStream clientIn = new DataInputStream(clientSock.getInputStream());
DataOutputStream clientOut = new DataOutputStream(serverSock.getOutputStream());
DataInputStream serverIn = new DataInputStream(serverSock.getInputStream());
DataOutputStream serverOut = new DataOutputStream(clientSock.getOutputStream());
MessageQueue clientPipe = new MessageQueue(clientIn, clientOut);
MessageQueue serverPipe = new MessageQueue(serverIn, serverOut);
executor.submit(new Thread(new Reader(clientPipe)));
executor.submit(new Thread(new Writer(clientPipe)));
executor.submit(new Thread(new Reader(serverPipe)));
executor.submit(new Thread(new Writer(serverPipe)));
}
private final class Reader implements Runnable {
private final MessageQueue queue;
Reader(MessageQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (!shutDown) {
try {
queue.readIn();
} catch (IOException e) {
shutDown = true;
} catch (InterruptedException e) {
shutDown = true;
}
}
}
}
private final class Writer implements Runnable {
private final MessageQueue queue;
Writer(MessageQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (!shutDown) {
try {
queue.writeOut();
} catch (IOException e) {
shutDown = true;
} catch (InterruptedException e) {
shutDown = true;
}
}
}
}
/**
* A Delay Queue that counts by number of bytes instead of the number of elements.
*/
private class MessageQueue {
DataInputStream inStream;
DataOutputStream outStream;
int bytesQueued;
BlockingQueue<Message> queue = new DelayQueue<Message>();
MessageQueue(DataInputStream inputStream, DataOutputStream outputStream) {
inStream = inputStream;
outStream = outputStream;
}
/**
* Take a message off the queue and write it to an endpoint. Blocks until a message becomes
* available.
*/
void writeOut() throws InterruptedException, IOException {
Message next = queue.take();
outStream.write(next.message, 0, next.messageLength);
incrementBytes(-next.messageLength);
}
/**
* Read bytes from an endpoint and add them as a message to the queue. Blocks if the queue is
* full.
*/
void readIn() throws InterruptedException, IOException {
byte[] request = new byte[getNextChunk()];
int readableBytes = inStream.read(request);
long sendTime = System.nanoTime() + latency;
queue.put(new Message(sendTime, request, readableBytes));
incrementBytes(readableBytes);
}
/**
* Block until space on the queue becomes available. Returns how many bytes can be read on to
* the queue
*/
synchronized int getNextChunk() throws InterruptedException {
while (bytesQueued == queueLength) {
wait();
}
return Math.max(0, Math.min(chunkSize, queueLength - bytesQueued));
}
synchronized void incrementBytes(int delta) {
bytesQueued += delta;
if (bytesQueued < queueLength) {
notifyAll();
}
}
}
private static class Message implements Delayed {
long sendTime;
byte[] message;
int messageLength;
Message(long sendTime, byte[] message, int messageLength) {
this.sendTime = sendTime;
this.message = message;
this.messageLength = messageLength;
}
@Override
public int compareTo(Delayed o) {
return ((Long) sendTime).compareTo(((Message) o).sendTime);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(sendTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
}