Reimplement finding open ports (#2629)

This commit is contained in:
Lauri Tulmin 2021-03-25 19:41:20 +02:00 committed by GitHub
parent 2a2d4ae2f8
commit 4ad9ed5c32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 268 additions and 67 deletions

View File

@ -33,7 +33,7 @@ class RestCamelTest extends AgentInstrumentationSpecification implements RetryOn
}
def setupSpecUnderRetry() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
def app = new SpringApplication(RestConfig)
app.setDefaultProperties(["restServer.port": port])
server = app.run()

View File

@ -38,7 +38,7 @@ class SingleServiceCamelTest extends AgentInstrumentationSpecification implement
}
def setupSpecUnderRetry() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
address = new URI("http://localhost:$port/")
def app = new SpringApplication(SingleServiceConfig)
app.setDefaultProperties(["camelService.port": port])

View File

@ -46,7 +46,7 @@ class SqsCamelTest extends AgentInstrumentationSpecification {
void initialize(AbstractApplicationContext applicationContext) {applicationContext.getBeanFactory().registerSingleton("localStack", sqs)}})
server = app.run()**/
sqsPort = PortUtils.randomOpenPort()
sqsPort = PortUtils.findOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"

View File

@ -39,8 +39,8 @@ class TwoServicesWithDirectClientCamelTest extends AgentInstrumentationSpecifica
}
def setupSpecUnderRetry() {
portOne = PortUtils.randomOpenPort()
portTwo = PortUtils.randomOpenPort()
portOne = PortUtils.findOpenPort()
portTwo = PortUtils.findOpenPort()
def app = new SpringApplication(TwoServicesConfig)
app.setDefaultProperties(["service.one.port": portOne, "service.two.port": portTwo])
server = app.run()

View File

@ -52,7 +52,7 @@ abstract class AbstractDubboTest extends InstrumentationSpecification {
def "test apache dubbo base #dubbo"() {
setup:
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
protocolConfig.setPort(port)
DubboBootstrap bootstrap = DubboBootstrap.getInstance()
@ -121,7 +121,7 @@ abstract class AbstractDubboTest extends InstrumentationSpecification {
def "test apache dubbo test #dubbo"() {
setup:
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
protocolConfig.setPort(port)
DubboBootstrap bootstrap = DubboBootstrap.getInstance()

View File

@ -34,7 +34,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
def setupSpec() {
sqsPort = PortUtils.randomOpenPort()
sqsPort = PortUtils.findOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"

View File

@ -31,7 +31,7 @@ abstract class AbstractCouchbaseTest extends AgentInstrumentationSpecification {
static final PASSWORD = "password"
@Shared
private int port = PortUtils.randomOpenPort()
private int port = PortUtils.findOpenPort()
@Shared
private String testBucketName = this.getClass().simpleName

View File

@ -38,7 +38,7 @@ class DropwizardTest extends HttpServerTest<DropwizardTestSupport> implements Ag
def testSupport = new DropwizardTestSupport(testApp(),
null,
ConfigOverride.config("server.applicationConnectors[0].port", "$port"),
ConfigOverride.config("server.adminConnectors[0].port", PortUtils.randomOpenPort().toString()))
ConfigOverride.config("server.adminConnectors[0].port", PortUtils.findOpenPort().toString()))
testSupport.before()
return testSupport
}

View File

@ -65,7 +65,7 @@ abstract class AbstractGrpcStreamingTest extends InstrumentationSpecification {
}
}
}
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))

View File

@ -58,7 +58,7 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
responseObserver.onCompleted()
}
}
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
@ -141,7 +141,7 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
responseObserver.onError(error)
}
}
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
@ -227,7 +227,7 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
throw error
}
}
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
Server server = configureServer(ServerBuilder.forPort(port).addService(greeter)).build().start()
ManagedChannelBuilder channelBuilder = configureClient(ManagedChannelBuilder.forAddress("localhost", port))
@ -325,7 +325,7 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
responseObserver.onCompleted()
}
}
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
Server server
server = configureServer(ServerBuilder.forPort(port)
.addService(greeter)
@ -466,7 +466,7 @@ abstract class AbstractGrpcTest extends InstrumentationSpecification {
responseObserver.onNext(Helloworld.Response.getDefaultInstance())
}
}
def port = PortUtils.randomOpenPort()
def port = PortUtils.findOpenPort()
Server server
server = configureServer(ServerBuilder.forPort(port)
.addService(greeter))

View File

@ -15,7 +15,7 @@ import spock.lang.Shared
class JedisClientTest extends AgentInstrumentationSpecification {
@Shared
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
@Shared
RedisServer redisServer = RedisServer.builder()

View File

@ -15,7 +15,7 @@ import spock.lang.Shared
class Jedis30ClientTest extends AgentInstrumentationSpecification {
@Shared
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
@Shared
RedisServer redisServer = RedisServer.builder()

View File

@ -44,7 +44,7 @@ class JspInstrumentationBasicTests extends AgentInstrumentationSpecification {
baseDir = Files.createTempDirectory("jsp").toFile()
baseDir.deleteOnExit()
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
tomcatServer = new Tomcat()
tomcatServer.setBaseDir(baseDir.getAbsolutePath())

View File

@ -41,7 +41,7 @@ class JspInstrumentationForwardTests extends AgentInstrumentationSpecification {
baseDir = Files.createTempDirectory("jsp").toFile()
baseDir.deleteOnExit()
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
tomcatServer = new Tomcat()
tomcatServer.setBaseDir(baseDir.getAbsolutePath())

View File

@ -63,8 +63,8 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
incorrectPort = PortUtils.findOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent

View File

@ -50,8 +50,8 @@ class LettuceSyncClientTest extends AgentInstrumentationSpecification {
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
incorrectPort = PortUtils.findOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent

View File

@ -66,8 +66,8 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
incorrectPort = PortUtils.findOpenPort()
dbAddr = PEER_NAME + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = PEER_NAME + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent

View File

@ -39,7 +39,7 @@ class LettuceReactiveClientTest extends AgentInstrumentationSpecification {
RedisCommands<String, ?> syncCommands
def setupSpec() {
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
String dbAddr = PEER_HOST + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr

View File

@ -52,8 +52,8 @@ class LettuceSyncClientTest extends AgentInstrumentationSpecification {
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
incorrectPort = PortUtils.findOpenPort()
dbAddr = PEER_NAME + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = PEER_NAME + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent

View File

@ -63,8 +63,8 @@ abstract class AbstractLettuceAsyncClientTest extends InstrumentationSpecificati
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
incorrectPort = PortUtils.findOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent

View File

@ -40,7 +40,7 @@ abstract class AbstractLettuceReactiveClientTest extends InstrumentationSpecific
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
String dbAddr = HOST + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr

View File

@ -35,7 +35,7 @@ abstract class AbstractLettuceSyncClientAuthTest extends InstrumentationSpecific
RedisClient redisClient
def setupSpec() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr
password = "password"

View File

@ -56,8 +56,8 @@ abstract class AbstractLettuceSyncClientTest extends InstrumentationSpecificatio
RedisCommands<String, ?> syncCommands
def setupSpec() {
port = PortUtils.randomOpenPort()
incorrectPort = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
incorrectPort = PortUtils.findOpenPort()
dbAddr = HOST + ":" + port + "/" + DB_INDEX
dbAddrNonExistent = HOST + ":" + incorrectPort + "/" + DB_INDEX
dbUriNonExistent = "redis://" + dbAddrNonExistent

View File

@ -30,7 +30,7 @@ class MongoBaseTest extends AgentInstrumentationSpecification {
private static final MongodStarter STARTER = MongodStarter.getDefaultInstance()
@Shared
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
@Shared
MongodExecutable mongodExe
@Shared

View File

@ -22,7 +22,7 @@ import spock.lang.Shared
class RediscalaClientTest extends AgentInstrumentationSpecification {
@Shared
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
@Shared
RedisServer redisServer = RedisServer.builder()

View File

@ -23,7 +23,7 @@ import spock.lang.Shared
class RedissonAsyncClientTest extends AgentInstrumentationSpecification {
@Shared
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
@Shared
RedisServer redisServer = RedisServer.builder()

View File

@ -28,7 +28,7 @@ import spock.lang.Shared
class RedissonClientTest extends AgentInstrumentationSpecification {
@Shared
int port = PortUtils.randomOpenPort()
int port = PortUtils.findOpenPort()
@Shared
RedisServer redisServer = RedisServer.builder()

View File

@ -18,7 +18,7 @@ import rmi.app.Server
import rmi.app.ServerLegacy
class RmiTest extends AgentInstrumentationSpecification {
def registryPort = PortUtils.randomOpenPort()
def registryPort = PortUtils.findOpenPort()
def serverRegistry = LocateRegistry.createRegistry(registryPort)
def clientRegistry = LocateRegistry.getRegistry("localhost", registryPort)

View File

@ -5,6 +5,7 @@
package base;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@ -12,7 +13,6 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
@ -37,13 +37,6 @@ public class IntegrationTestBase {
protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<>();
protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100;
protected static final int INDEX_NUM = 1000;
private static final AtomicInteger port = new AtomicInteger(40000);
public static synchronized int nextPort() {
return port.addAndGet(random.nextInt(10) + 10);
}
protected static final Random random = new Random();
private static String createTempDir() {
String path = null;
@ -77,7 +70,9 @@ public class IntegrationTestBase {
namesrvConfig.setKvConfigPath(kvConfigPath.toString());
namesrvConfig.setConfigStorePath(namesrvPath.toString());
nameServerNettyServerConfig.setListenPort(nextPort());
// find 3 consecutive open ports and use the last one of them
// rocketmq will also bind to given port - 2
nameServerNettyServerConfig.setListenPort(PortUtils.findOpenPorts(3) + 2);
NamesrvController namesrvController =
new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
try {
@ -113,8 +108,8 @@ public class IntegrationTestBase {
MessageStoreConfig storeConfig, BrokerConfig brokerConfig) {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyServerConfig.setListenPort(nextPort());
storeConfig.setHaListenPort(nextPort());
nettyServerConfig.setListenPort(PortUtils.findOpenPort());
storeConfig.setHaListenPort(PortUtils.findOpenPort());
BrokerController brokerController =
new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
try {

View File

@ -22,7 +22,7 @@ class SparkJavaBasedTest extends AgentInstrumentationSpecification {
OkHttpClient client = OkHttpUtils.client()
def setupSpec() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
TestSparkJavaApplication.initSpark(port)
}

View File

@ -28,7 +28,7 @@ class VertxReactivePropagationTest extends AgentInstrumentationSpecification {
Vertx server
def setupSpec() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
server = VertxReactiveWebServer.start(port)
}

View File

@ -40,7 +40,7 @@ trait HttpServerTestTrait<SERVER> implements RetryOnAddressAlreadyInUseTrait {
}
def setupSpecUnderRetry() {
port = PortUtils.randomOpenPort()
port = PortUtils.findOpenPort()
address = buildAddress()
server = startServer(port)
println getClass().name + " http server started at: http://localhost:$port" + getContextPath()

View File

@ -0,0 +1,129 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.test.utils;
import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
/**
* Helper class for finding open ports that test servers can bind. Allocator splits allocation range
* to chunks and binds to the first port in chunk to claim it. If first port of chunk is already in
* use allocator assumes that some other process has already claimed that chunk and moves to next
* chunk. This should let us as run tests in parallel without them interfering with each other.
*/
class PortAllocator {
static final int CHUNK_SIZE = 100;
static final int RANGE_MIN = 11000;
// end of allocator port range, should be below ephemeral port range
static final int RANGE_MAX = 32768;
private final PortBinder portBinder;
private final List<Closeable> sockets = new ArrayList<>();
// next candidate port
private int next = RANGE_MIN;
private int nextChunkStart = RANGE_MIN;
PortAllocator() {
this(PortBinder.INSTANCE);
}
PortAllocator(PortBinder portBinder) {
this.portBinder = portBinder;
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
for (Closeable socket : sockets) {
try {
socket.close();
} catch (IOException ignored) {
}
}
}));
}
/** Find open port. */
int getPort() {
return getPorts(1);
}
/** Find consecutive range of open ports, returning the first one in the range. */
synchronized int getPorts(int count) {
// as we bind to first port in each chunk the max amount of
// consecutive ports that we can find is CHUNK_SIZE - 1
if (count < 1 || count >= CHUNK_SIZE) {
throw new IllegalStateException("Invalid count " + count);
}
while (next + count - 1 <= RANGE_MAX) {
// if current chunk doesn't have enough ports move to next chunk
if (next + count - 1 >= nextChunkStart) {
reserveNextChunk();
}
// find requested amount of consecutive ports
while (next + count - 1 < nextChunkStart && next + count - 1 <= RANGE_MAX) {
// result is the lowest port in consecutive range
int result = next;
for (int i = 0; i < count; i++) {
int port = next;
next++;
if (!portBinder.canBind(port)) {
// someone has allocated a port in our port range, ignore it and try with
// the next port
break;
} else if (i == count - 1) {
return result;
}
}
}
}
// give up when port range is exhausted
throw new IllegalStateException("Failed to find suitable port");
}
private void reserveNextChunk() {
while (nextChunkStart < RANGE_MAX) {
// reserve a chunk, if binding to first port of chunk fails
// move to next chunk
Closeable serverSocket = portBinder.bind(nextChunkStart);
if (serverSocket != null) {
sockets.add(serverSocket);
next = nextChunkStart + 1;
nextChunkStart += CHUNK_SIZE;
return;
}
nextChunkStart += CHUNK_SIZE;
}
// give up when port range is exhausted
throw new IllegalStateException("Failed to reserve suitable port range");
}
static class PortBinder {
static PortBinder INSTANCE = new PortBinder();
Closeable bind(int port) {
try {
return new ServerSocket(port);
} catch (IOException exception) {
return null;
}
}
boolean canBind(int port) {
try {
ServerSocket socket = new ServerSocket(port);
socket.close();
return true;
} catch (IOException exception) {
return false;
}
}
}
}

View File

@ -6,7 +6,6 @@
package io.opentelemetry.instrumentation.test.utils;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
@ -14,18 +13,16 @@ public class PortUtils {
public static int UNUSABLE_PORT = 61;
/** Open up a random, reusable port. */
public static int randomOpenPort() {
ServerSocket socket;
try {
socket = new ServerSocket(0);
socket.setReuseAddress(true);
socket.close();
return socket.getLocalPort();
} catch (IOException ioe) {
ioe.printStackTrace();
return -1;
}
private static final PortAllocator portAllocator = new PortAllocator();
/** Find consecutive open ports, returning the first one in the range. */
public static int findOpenPorts(int count) {
return portAllocator.getPorts(count);
}
/** Find open port. */
public static int findOpenPort() {
return portAllocator.getPort();
}
private static boolean isPortOpen(int port) {

View File

@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.test.utils;
import java.io.Closeable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class PortAllocatorTest {
@Test
public void testSimple() {
PortAllocator portAllocator = getPortAllocator((port) -> true);
int next = PortAllocator.RANGE_MIN + 1;
for (int i = 0; i < 1000; i++) {
Assertions.assertEquals(next, portAllocator.getPort());
next++;
if (next % PortAllocator.CHUNK_SIZE == 0) {
next++;
}
}
Assertions.assertEquals(next, portAllocator.getPorts(10));
Assertions.assertEquals(12101, portAllocator.getPorts(PortAllocator.CHUNK_SIZE - 1));
try {
Assertions.assertEquals(next, portAllocator.getPorts(PortAllocator.CHUNK_SIZE + 1));
Assertions.fail("should not be able to allocate more than PORT_RANGE_STEP consecutive ports");
} catch (IllegalStateException ignored) {
}
}
@Test
public void testEven() {
PortAllocator portAllocator = getPortAllocator((port) -> port % 2 == 0);
int next = PortAllocator.RANGE_MIN + 2;
for (int i = 0; i < 1000; i++) {
Assertions.assertEquals(next, portAllocator.getPort());
next += 2;
if (next % PortAllocator.CHUNK_SIZE == 0) {
next += 2;
}
}
try {
Assertions.assertEquals(next, portAllocator.getPorts(2));
Assertions.fail("should not be able to allocate consecutive ports");
} catch (IllegalStateException ignored) {
}
}
private static PortAllocator getPortAllocator(PortTest portTest) {
return new PortAllocator(new TestPortBinder(portTest));
}
private interface PortTest {
boolean test(int port);
}
private static class TestPortBinder extends PortAllocator.PortBinder {
private final PortTest portTest;
TestPortBinder(PortTest portTest) {
this.portTest = portTest;
}
@Override
Closeable bind(int port) {
if (canBind(port)) {
return () -> {};
}
return null;
}
@Override
boolean canBind(int port) {
return portTest.test(port);
}
}
}