diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java index ad33a0b841..8816163bad 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancerProvider.java @@ -33,15 +33,15 @@ import java.util.Map; * down the address list and sticks to the first that works. */ public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider { - public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS = - "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS"; + public static final String GRPC_PF_USE_HAPPY_EYEBALLS = "GRPC_PF_USE_HAPPY_EYEBALLS"; private static final String SHUFFLE_ADDRESS_LIST_KEY = "shuffleAddressList"; static boolean enableNewPickFirst = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false); public static boolean isEnabledHappyEyeballs() { - return GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, false); + + return GrpcUtil.getFlag(GRPC_PF_USE_HAPPY_EYEBALLS, false); } @VisibleForTesting diff --git a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java index 91c8672150..8d701098be 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java @@ -142,8 +142,8 @@ public class PickFirstLeafLoadBalancerTest { @Before public void setUp() { originalHappyEyeballsEnabledValue = - System.getProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS); - System.setProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, + System.getProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS); + System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS, enableHappyEyeballs ? "true" : "false"); for (int i = 1; i <= 5; i++) { @@ -173,9 +173,9 @@ public class PickFirstLeafLoadBalancerTest { @After public void tearDown() { if (originalHappyEyeballsEnabledValue == null) { - System.clearProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS); + System.clearProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS); } else { - System.setProperty(PickFirstLoadBalancerProvider.GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, + System.setProperty(PickFirstLoadBalancerProvider.GRPC_PF_USE_HAPPY_EYEBALLS, originalHappyEyeballsEnabledValue); } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java index f2f5b43ea1..01cccf9804 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java @@ -22,14 +22,20 @@ import io.grpc.BindableService; import io.grpc.Grpc; import io.grpc.InsecureServerCredentials; import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.ServerCredentials; import io.grpc.ServerInterceptors; import io.grpc.TlsServerCredentials; import io.grpc.alts.AltsServerCredentials; +import io.grpc.netty.NettyServerBuilder; import io.grpc.services.MetricRecorder; import io.grpc.testing.TlsTesting; import io.grpc.xds.orca.OrcaMetricReportingServerInterceptor; import io.grpc.xds.orca.OrcaServiceImpl; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import java.util.Locale; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -73,6 +79,7 @@ public class TestServiceServer { private ScheduledExecutorService executor; private Server server; private int localHandshakerPort = -1; + private Util.AddressType addressType = Util.AddressType.IPV4_IPV6; @VisibleForTesting void parseArgs(String[] args) { @@ -103,6 +110,8 @@ public class TestServiceServer { useAlts = Boolean.parseBoolean(value); } else if ("local_handshaker_port".equals(key)) { localHandshakerPort = Integer.parseInt(value); + } else if ("address_type".equals(key)) { + addressType = Util.AddressType.valueOf(value.toUpperCase(Locale.ROOT)); } else if ("grpc_version".equals(key)) { if (!"2".equals(value)) { System.err.println("Only grpc version 2 is supported"); @@ -130,11 +139,14 @@ public class TestServiceServer { + "\n --local_handshaker_port=PORT" + "\n Use local ALTS handshaker service on the specified port " + "\n for testing. Only effective when --use_alts=true." + + "\n --address_type=IPV4|IPV6|IPV4_IPV6" + + "\n What type of addresses to listen on. Default IPV4_IPV6" ); System.exit(1); } } + @SuppressWarnings("AddressSelection") @VisibleForTesting void start() throws Exception { executor = Executors.newSingleThreadScheduledExecutor(); @@ -156,7 +168,36 @@ public class TestServiceServer { MetricRecorder metricRecorder = MetricRecorder.newInstance(); BindableService orcaOobService = OrcaServiceImpl.createService(executor, metricRecorder, 1, TimeUnit.SECONDS); - server = Grpc.newServerBuilderForPort(port, serverCreds) + + // Create ServerBuilder with appropriate addresses + // - IPV4_IPV6: bind to wildcard which covers all addresses on all interfaces of both families + // - IPV4: bind to v4 address for local hostname + v4 localhost + // - IPV6: bind to all v6 addresses for local hostname + v6 localhost + ServerBuilder serverBuilder; + switch (addressType) { + case IPV4_IPV6: + serverBuilder = Grpc.newServerBuilderForPort(port, serverCreds); + break; + case IPV4: + SocketAddress v4Address = Util.getV4Address(port); + serverBuilder = + NettyServerBuilder.forAddress(new InetSocketAddress("127.0.0.1", port), serverCreds); + if (v4Address == null) { + ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); + } + break; + case IPV6: + List v6Addresses = Util.getV6Addresses(port); + serverBuilder = + NettyServerBuilder.forAddress(new InetSocketAddress("::1", port), serverCreds); + for (SocketAddress address : v6Addresses) { + ((NettyServerBuilder)serverBuilder).addListenAddress(address); + } + break; + default: + throw new AssertionError("Unknown address type: " + addressType); + } + server = serverBuilder .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .addService( ServerInterceptors.intercept( @@ -187,4 +228,5 @@ public class TestServiceServer { server.awaitTermination(); } } + } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/Util.java b/interop-testing/src/main/java/io/grpc/testing/integration/Util.java index b66114f12c..50da0a6373 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/Util.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/Util.java @@ -16,10 +16,17 @@ package io.grpc.testing.integration; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.MessageLite; import com.google.protobuf.StringValue; import io.grpc.Metadata; import io.grpc.protobuf.lite.ProtoLiteUtils; +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.List; import org.junit.Assert; @@ -66,4 +73,51 @@ public class Util { } } } + + static List getV6Addresses(int port) throws UnknownHostException { + List v6addresses = new ArrayList<>(); + InetAddress[] addresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostName()); + for (InetAddress address : addresses) { + if (address.getAddress().length != 4) { + v6addresses.add(new java.net.InetSocketAddress(address, port)); + } + } + return v6addresses; + } + + static SocketAddress getV4Address(int port) throws UnknownHostException { + InetAddress[] addresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostName()); + for (InetAddress address : addresses) { + if (address.getAddress().length == 4) { + return new java.net.InetSocketAddress(address, port); + } + } + return null; // means it is v6 only + } + + + /** + * Picks a port that is not used right at this moment. + * Warning: Not thread safe. May see "BindException: Address already in use: bind" if using the + * returned port to create a new server socket when other threads/processes are concurrently + * creating new sockets without a specific port. + */ + public static int pickUnusedPort() { + try { + ServerSocket serverSocket = new ServerSocket(0); + int port = serverSocket.getLocalPort(); + serverSocket.close(); + return port; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + + @VisibleForTesting + enum AddressType { + IPV4, + IPV6, + IPV4_IPV6 + } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java index 287aae8b31..6a42dd62bb 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/XdsTestServer.java @@ -25,13 +25,16 @@ import io.grpc.Grpc; import io.grpc.InsecureServerCredentials; import io.grpc.Metadata; import io.grpc.Server; +import io.grpc.ServerBuilder; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; +import io.grpc.ServerCredentials; import io.grpc.ServerInterceptor; import io.grpc.ServerInterceptors; import io.grpc.Status; import io.grpc.gcp.csm.observability.CsmObservability; import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.HealthStatusManager; import io.grpc.protobuf.services.ProtoReflectionService; import io.grpc.services.AdminInterface; @@ -43,9 +46,12 @@ import io.grpc.xds.XdsServerBuilder; import io.grpc.xds.XdsServerCredentials; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; +import java.util.Locale; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -81,6 +87,7 @@ public final class XdsTestServer { private Server server; private Server maintenanceServer; private String host; + private Util.AddressType addressType = Util.AddressType.IPV4_IPV6; private CsmObservability csmObservability; /** @@ -108,7 +115,7 @@ public final class XdsTestServer { server.blockUntilShutdown(); } - private void parseArgs(String[] args) { + void parseArgs(String[] args) { boolean usage = false; for (String arg : args) { if (!arg.startsWith("--")) { @@ -138,6 +145,8 @@ public final class XdsTestServer { enableCsmObservability = Boolean.valueOf(value); } else if ("server_id".equals(key)) { serverId = value; + } else if ("address_type".equals(key)) { + addressType = Util.AddressType.valueOf(value.toUpperCase(Locale.ROOT)); } else { System.err.println("Unknown argument: " + key); usage = true; @@ -173,12 +182,16 @@ public final class XdsTestServer { + s.enableCsmObservability + "\n --server_id=STRING server ID for response." + "\n Default: " - + s.serverId); + + s.serverId + + "\n --address_type=STRING type of IP address to bind to (IPV4|IPV6|IPV4_IPV6)." + + "\n Default: " + + s.addressType); System.exit(1); } } - private void start() throws Exception { + @SuppressWarnings("AddressSelection") + void start() throws Exception { if (enableCsmObservability) { csmObservability = CsmObservability.newBuilder() .sdk(AutoConfiguredOpenTelemetrySdk.builder() @@ -199,6 +212,9 @@ public final class XdsTestServer { } health = new HealthStatusManager(); if (secureMode) { + if (addressType != Util.AddressType.IPV4_IPV6) { + throw new IllegalArgumentException("Secure mode only supports IPV4_IPV6 address type"); + } maintenanceServer = Grpc.newServerBuilderForPort(maintenancePort, InsecureServerCredentials.create()) .addService(new XdsUpdateHealthServiceImpl(health)) @@ -216,8 +232,36 @@ public final class XdsTestServer { .build(); server.start(); } else { + ServerBuilder serverBuilder; + ServerCredentials insecureServerCreds = InsecureServerCredentials.create(); + switch (addressType) { + case IPV4_IPV6: + serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds); + break; + case IPV4: + SocketAddress v4Address = Util.getV4Address(port); + serverBuilder = NettyServerBuilder.forAddress( + new InetSocketAddress("127.0.0.1", port), insecureServerCreds); + if (v4Address != null) { + ((NettyServerBuilder) serverBuilder).addListenAddress(v4Address); + } + break; + case IPV6: + List v6Addresses = Util.getV6Addresses(port); + serverBuilder = NettyServerBuilder.forAddress( + new InetSocketAddress("::1", port), insecureServerCreds); + for (SocketAddress address : v6Addresses) { + ((NettyServerBuilder)serverBuilder).addListenAddress(address); + } + break; + default: + throw new AssertionError("Unknown address type: " + addressType); + } + + logger.info("Starting server on port " + port + " with address type " + addressType); + server = - Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + serverBuilder .addService( ServerInterceptors.intercept( new TestServiceImpl(serverId, host), new TestInfoInterceptor(host))) @@ -232,7 +276,7 @@ public final class XdsTestServer { health.setStatus("", ServingStatus.SERVING); } - private void stop() throws Exception { + void stop() throws Exception { server.shutdownNow(); if (maintenanceServer != null) { maintenanceServer.shutdownNow(); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/XdsTestServerTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/XdsTestServerTest.java new file mode 100644 index 0000000000..7bfa1a2cd7 --- /dev/null +++ b/interop-testing/src/test/java/io/grpc/testing/integration/XdsTestServerTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2024 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.testing.integration; + +import static org.junit.Assert.assertEquals; + +import io.grpc.Channel; +import io.grpc.ChannelCredentials; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.testing.GrpcCleanupRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests to make sure that the {@link XdsTestServer} is working as expected. + * Specifically, that for dualstack communication is handled correctly across address families + * and that the test server is correctly handling the address_type flag. + */ +@RunWith(JUnit4.class) +public class XdsTestServerTest { + protected static final EmptyProtos.Empty EMPTY = EmptyProtos.Empty.getDefaultInstance(); + + @Rule + public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule(); + + + @Test + public void check_ipv4() throws Exception { + checkConnectionWorks("127.0.0.1", "--address_type=IPV4"); + } + + @Test + public void check_ipv6() throws Exception { + checkConnectionWorks("::1", "--address_type=IPV6"); + } + + @Test + public void check_ipv4_ipv6() throws Exception { + checkConnectionWorks("localhost", "--address_type=IPV4_IPV6"); + } + + @Test + public void checkNoAddressType() throws Exception { + // This ensures that all of the other xds tests aren't broken by the address_type argument. + checkConnectionWorks("localhost", null); + } + + // Simple test to ensure that communication with the server works which includes starting and + // stopping the server, creating a channel and doing a unary rpc. + private void checkConnectionWorks(String targetServer, String addressTypeArg) + throws Exception { + + int port = Util.pickUnusedPort(); + + XdsTestServer server = getAndStartTestServiceServer(port, addressTypeArg); + + try { + ManagedChannel realChannel = createChannel(port, targetServer); + Channel channel = cleanupRule.register(realChannel); + TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel); + + assertEquals(EMPTY, stub.emptyCall(EMPTY)); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + server.stop(); + } + } + + private static ManagedChannel createChannel(int port, String target) { + ChannelCredentials creds = InsecureChannelCredentials.create(); + + ManagedChannelBuilder builder; + if (port == 0) { + builder = Grpc.newChannelBuilder(target, creds); + } else { + builder = Grpc.newChannelBuilderForAddress(target, port, creds); + } + + builder.overrideAuthority("foo.test.google.fr"); + return builder.build(); + } + + private static XdsTestServer getAndStartTestServiceServer(int port, String addressTypeArg) + throws Exception { + XdsTestServer server = new XdsTestServer(); + String[] args = addressTypeArg != null + ? new String[]{"--port=" + port, addressTypeArg} + : new String[]{"--port=" + port}; + server.parseArgs(args); + server.start(); + return server; + } + +} diff --git a/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java b/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java index 0c7e8f46bc..3ed68ac9b7 100644 --- a/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsEndpointResource.java @@ -49,8 +49,9 @@ import javax.annotation.Nullable; class XdsEndpointResource extends XdsResourceType { static final String ADS_TYPE_URL_EDS = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; - static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS = - "grpc.experimental.xdsDualstackEndpoints"; + + public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS = + "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS"; private static final XdsEndpointResource instance = new XdsEndpointResource(); @@ -201,6 +202,7 @@ class XdsEndpointResource extends XdsResourceType { } List addresses = new ArrayList<>(); addresses.add(getInetSocketAddress(endpoint.getEndpoint().getAddress())); + if (isEnabledXdsDualStack()) { for (Endpoint.AdditionalAddress additionalAddress : endpoint.getEndpoint().getAdditionalAddressesList()) {