Implements full spec of gRPC endpoint. (#1042)

* Implements full spec of gRPC endpoint.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix test container dependencies.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add support for /// in gRPC endpoint.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update binding example and test to use confluentinc images.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
This commit is contained in:
Artur Souza 2024-05-28 14:31:01 -07:00 committed by GitHub
parent a074310ec6
commit faa3af5bbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 242 additions and 46 deletions

View File

@ -72,10 +72,8 @@ docker-compose -f ./src/main/java/io/dapr/examples/bindings/http/docker-compose-
2. Run `docker ps` to see the container running locally:
```bash
342d3522ca14 kafka-docker_kafka "start-kafka.sh" 14 hours ago Up About
a minute 0.0.0.0:9092->9092/tcp kafka-docker_kafka_1
0cd69dbe5e65 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 8 days ago Up About
a minute 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka-docker_zookeeper_1
26966aaabd82 confluentinc/cp-kafka:7.4.4 "/etc/confluent/dock…" About a minute ago Up About a minute 9092/tcp, 0.0.0.0:29092->29092/tcp deploy-kafka-1
b95e7ad31707 confluentinc/cp-zookeeper:7.4.4 "/etc/confluent/dock…" 5 days ago Up 14 minutes 2888/tcp, 3888/tcp, 0.0.0.0:22181->2181/tcp deploy-zookeeper-1
```
Click [here](https://github.com/wurstmeister/kafka-docker) for more information about the kafka broker server.

View File

@ -1,14 +1,23 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
image: confluentinc/cp-zookeeper:7.4.4
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "sample:1:1"
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

View File

@ -1,19 +1,27 @@
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
image: confluentinc/cp-zookeeper:7.4.4
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
- 22181:2181
kafka:
image: confluentinc/cp-kafka:7.4.4
depends_on:
- zookeeper
image: wurstmeister/kafka:latest
ports:
- "9092:9092"
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "sample:1:1"
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
mongo:
image: mongo
ports:

View File

@ -58,7 +58,7 @@ public class DaprPorts {
if (this.grpcPort != null) {
System.getProperties().setProperty(Properties.GRPC_PORT.getName(), String.valueOf(this.grpcPort));
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.grpcPort);
Properties.GRPC_ENDPOINT.getName(), "127.0.0.1:" + this.grpcPort);
}
}

View File

@ -22,8 +22,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.regex.Pattern;
/**
* Utility methods for network, internal to Dapr SDK.
@ -32,13 +31,56 @@ public final class NetworkUtils {
private static final long RETRY_WAIT_MILLISECONDS = 1000;
// Thanks to https://ihateregex.io/expr/ipv6/
private static final String IPV6_REGEX = "(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|"
+ "([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|"
+ "([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|"
+ "([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|"
+ "([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|"
+ ":((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|"
+ "::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|(2[0-4]|"
+ "1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|"
+ "(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\\.){3,3}(25[0-5]|"
+ "(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))";
private static final Pattern IPV6_PATTERN = Pattern.compile(IPV6_REGEX, Pattern.CASE_INSENSITIVE);
// Don't accept "?" to avoid ambiguity with ?tls=true
private static final String GRPC_ENDPOINT_FILENAME_REGEX_PART = "[^\0\\?]+";
private static final String GRPC_ENDPOINT_HOSTNAME_REGEX_PART = "(([A-Za-z0-9_\\-\\.]+)|(\\[" + IPV6_REGEX + "\\]))";
private static final String GRPC_ENDPOINT_DNS_AUTHORITY_REGEX_PART =
"(?<dnsWithAuthority>dns://)(?<authorityEndpoint>" + GRPC_ENDPOINT_HOSTNAME_REGEX_PART + ":[0-9]+)?/";
private static final String GRPC_ENDPOINT_PARAM_REGEX_PART = "(\\?(?<param>tls\\=((true)|(false))))?";
private static final String GRPC_ENDPOINT_SOCKET_REGEX_PART =
"(?<socket>((unix:)|(unix://)|(unix-abstract:))" + GRPC_ENDPOINT_FILENAME_REGEX_PART + ")";
private static final String GRPC_ENDPOINT_VSOCKET_REGEX_PART =
"(?<vsocket>vsock:" + GRPC_ENDPOINT_HOSTNAME_REGEX_PART + ":[0-9]+)";
private static final String GRPC_ENDPOINT_HOST_REGEX_PART =
"((?<http>http://)|(?<https>https://)|(?<dns>dns:)|(" + GRPC_ENDPOINT_DNS_AUTHORITY_REGEX_PART + "))?"
+ "(?<hostname>" + GRPC_ENDPOINT_HOSTNAME_REGEX_PART + ")?+"
+ "(:(?<port>[0-9]+))?";
private static final String GRPC_ENDPOINT_REGEX = "^("
+ "(" + GRPC_ENDPOINT_HOST_REGEX_PART + ")|"
+ "(" + GRPC_ENDPOINT_SOCKET_REGEX_PART + ")|"
+ "(" + GRPC_ENDPOINT_VSOCKET_REGEX_PART + ")"
+ ")" + GRPC_ENDPOINT_PARAM_REGEX_PART + "$";
private static final Pattern GRPC_ENDPOINT_PATTERN = Pattern.compile(GRPC_ENDPOINT_REGEX, Pattern.CASE_INSENSITIVE);
private NetworkUtils() {
}
/**
* Tries to connect to a socket, retrying every 1 second.
* @param host Host to connect to.
* @param port Port to connect to.
*
* @param host Host to connect to.
* @param port Port to connect to.
* @param timeoutInMilliseconds Timeout in milliseconds to give up trying.
* @throws InterruptedException If retry is interrupted.
*/
@ -60,26 +102,15 @@ public final class NetworkUtils {
/**
* Creates a GRPC managed channel.
*
* @param interceptors Optional interceptors to add to the channel.
* @return GRPC managed channel to communicate with the sidecar.
*/
public static ManagedChannel buildGrpcManagedChannel(ClientInterceptor... interceptors) {
String address = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
boolean insecure = true;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
URI uri = URI.create(grpcEndpoint);
insecure = uri.getScheme().equalsIgnoreCase("http");
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
address = uri.getHost();
if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
address += uri.getPath();
}
}
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
var settings = GrpcEndpointSettings.parse();
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(settings.endpoint)
.userAgent(Version.getSdkVersion());
if (insecure) {
if (!settings.secure) {
builder = builder.usePlaintext();
}
if (interceptors != null && interceptors.length > 0) {
@ -88,6 +119,73 @@ public final class NetworkUtils {
return builder.build();
}
// Not private to allow unit testing
static final class GrpcEndpointSettings {
final String endpoint;
final boolean secure;
private GrpcEndpointSettings(String endpoint, boolean secure) {
this.endpoint = endpoint;
this.secure = secure;
}
static GrpcEndpointSettings parse() {
String address = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
boolean secure = false;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
var matcher = GRPC_ENDPOINT_PATTERN.matcher(grpcEndpoint);
if (!matcher.matches()) {
throw new IllegalArgumentException("Illegal gRPC endpoint: " + grpcEndpoint);
}
var parsedHost = matcher.group("hostname");
if (parsedHost != null) {
address = parsedHost;
}
var https = matcher.group("https") != null;
var http = matcher.group("http") != null;
secure = https;
String parsedPort = matcher.group("port");
if (parsedPort != null) {
port = Integer.parseInt(parsedPort);
} else {
// This implements default port as 80 for http for backwards compatibility.
port = http ? 80 : 443;
}
String parsedParam = matcher.group("param");
if ((http || https) && (parsedParam != null)) {
throw new IllegalArgumentException("Query params is not supported in HTTP URI for gRPC endpoint.");
}
if (parsedParam != null) {
secure = parsedParam.equalsIgnoreCase("tls=true");
}
var authorityEndpoint = matcher.group("authorityEndpoint");
if (authorityEndpoint != null) {
return new GrpcEndpointSettings(String.format("dns://%s/%s:%d", authorityEndpoint, address, port), secure);
}
var socket = matcher.group("socket");
if (socket != null) {
return new GrpcEndpointSettings(socket, secure);
}
var vsocket = matcher.group("vsocket");
if (vsocket != null) {
return new GrpcEndpointSettings(vsocket, secure);
}
}
return new GrpcEndpointSettings(String.format("dns:///%s:%d", address, port), secure);
}
}
private static void callWithRetry(Runnable function, long retryTimeoutMilliseconds) throws InterruptedException {
long started = System.currentTimeMillis();
while (true) {
@ -104,7 +202,7 @@ public final class NetworkUtils {
long elapsed = System.currentTimeMillis() - started;
if (elapsed >= retryTimeoutMilliseconds) {
if (exception instanceof RuntimeException) {
throw (RuntimeException)exception;
throw (RuntimeException) exception;
}
throw new RuntimeException(exception);
@ -117,9 +215,14 @@ public final class NetworkUtils {
/**
* Retrieve loopback address for the host.
*
* @return The loopback address String
*/
public static String getHostLoopbackAddress() {
return InetAddress.getLoopbackAddress().getHostAddress();
}
static boolean isIPv6(String ip) {
return IPV6_PATTERN.matcher(ip).matches();
}
}

View File

@ -2,14 +2,12 @@ package io.dapr.utils;
import io.dapr.config.Properties;
import io.grpc.ManagedChannel;
import io.grpc.testing.GrpcCleanupRule;
import org.junit.Rule;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.mockito.Mockito.mock;
public class NetworkUtilsTest {
private final int defaultGrpcPort = 4000;
@ -74,4 +72,85 @@ public class NetworkUtilsTest {
String expectedAuthority = "example.com:3000";
Assertions.assertEquals(expectedAuthority, channel.authority());
}
@Test
public void testGrpcEndpointParsing() {
testGrpcEndpointParsingScenario(":5000", "dns:///127.0.0.1:5000", false);
testGrpcEndpointParsingScenario(":5000?tls=true", "dns:///127.0.0.1:5000", true);
testGrpcEndpointParsingScenario(":5000?tls=false", "dns:///127.0.0.1:5000", false);
testGrpcEndpointParsingScenario("myhost:5000", "dns:///myhost:5000", false);
testGrpcEndpointParsingScenario("myhost:5000?tls=true", "dns:///myhost:5000", true);
testGrpcEndpointParsingScenario("myhost:5000?tls=false", "dns:///myhost:5000", false);
testGrpcEndpointParsingScenario("myhost", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("myhost?tls=true", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("myhost?tls=false", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("dns:myhost", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("dns:myhost?tls=true", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("dns:myhost?tls=false", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("http://myhost", "dns:///myhost:80", false);
testGrpcEndpointParsingScenario("http://myhost:443", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("http://myhost:5000", "dns:///myhost:5000", false);
testGrpcEndpointParsingScenario("http://myhost:8080", "dns:///myhost:8080", false);
testGrpcEndpointParsingScenario("https://myhost", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("https://myhost:443", "dns:///myhost:443", true);
testGrpcEndpointParsingScenario("https://myhost:5000", "dns:///myhost:5000", true);
testGrpcEndpointParsingScenario("dns:///myhost", "dns:///myhost:443", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/myhost", "dns://myauthority:53/myhost:443", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/myhost?tls=false", "dns://myauthority:53/myhost:443", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/myhost?tls=true", "dns://myauthority:53/myhost:443", true);
testGrpcEndpointParsingScenario("unix:my.sock", "unix:my.sock", false);
testGrpcEndpointParsingScenario("unix:my.sock?tls=true", "unix:my.sock", true);
testGrpcEndpointParsingScenario("unix://my.sock", "unix://my.sock", false);
testGrpcEndpointParsingScenario("unix://my.sock?tls=true", "unix://my.sock", true);
testGrpcEndpointParsingScenario("unix-abstract:my.sock", "unix-abstract:my.sock", false);
testGrpcEndpointParsingScenario("unix-abstract:my.sock?tls=true", "unix-abstract:my.sock", true);
testGrpcEndpointParsingScenario("vsock:mycid:5000", "vsock:mycid:5000", false);
testGrpcEndpointParsingScenario("vsock:mycid:5000?tls=true", "vsock:mycid:5000", true);
testGrpcEndpointParsingScenario("[2001:db8:1f70::999:de8:7648:6e8]", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:443", false);
testGrpcEndpointParsingScenario("dns:[2001:db8:1f70::999:de8:7648:6e8]:5000", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:5000", false);
testGrpcEndpointParsingScenario("dns://myauthority:53/[2001:db8:1f70::999:de8:7648:6e8]", "dns://myauthority:53/[2001:db8:1f70::999:de8:7648:6e8]:443", false);
testGrpcEndpointParsingScenario("https://[2001:db8:1f70::999:de8:7648:6e8]", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:443", true);
testGrpcEndpointParsingScenario("https://[2001:db8:1f70::999:de8:7648:6e8]:5000", "dns:///[2001:db8:1f70::999:de8:7648:6e8]:5000", true);
}
@Test
public void testGrpcEndpointParsingError() {
testGrpcEndpointParsingErrorScenario("http://myhost?tls=true");
testGrpcEndpointParsingErrorScenario("http://myhost?tls=false");
testGrpcEndpointParsingErrorScenario("http://myhost:8080?tls=true");
testGrpcEndpointParsingErrorScenario("http://myhost:443?tls=false");
testGrpcEndpointParsingErrorScenario("https://myhost?tls=true");
testGrpcEndpointParsingErrorScenario("https://myhost?tls=false");
testGrpcEndpointParsingErrorScenario("https://myhost:8080?tls=true");
testGrpcEndpointParsingErrorScenario("https://myhost:443?tls=false");
testGrpcEndpointParsingErrorScenario("dns://myhost");
testGrpcEndpointParsingErrorScenario("dns:[2001:db8:1f70::999:de8:7648:6e8]:5000?abc=[]");
testGrpcEndpointParsingErrorScenario("dns:[2001:db8:1f70::999:de8:7648:6e8]:5000?abc=123");
testGrpcEndpointParsingErrorScenario("host:5000/v1/dapr");
testGrpcEndpointParsingErrorScenario("host:5000/?a=1");
testGrpcEndpointParsingErrorScenario("inv-scheme://myhost");
testGrpcEndpointParsingErrorScenario("inv-scheme:myhost:5000");
}
private static void testGrpcEndpointParsingScenario(
String grpcEndpointEnvValue,
String expectedEndpoint,
boolean expectSecure
) {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
var settings = NetworkUtils.GrpcEndpointSettings.parse();
Assertions.assertEquals(expectedEndpoint, settings.endpoint);
Assertions.assertEquals(expectSecure, settings.secure);
}
private static void testGrpcEndpointParsingErrorScenario(String grpcEndpointEnvValue) {
try {
System.setProperty(Properties.GRPC_ENDPOINT.getName(), grpcEndpointEnvValue);
NetworkUtils.GrpcEndpointSettings.parse();
Assert.fail();
} catch (IllegalArgumentException e) {
// Expected
}
}
}

View File

@ -17,7 +17,6 @@ import io.dapr.exceptions.DaprErrorDetails;
import io.dapr.exceptions.DaprException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.function.Executable;
import org.apache.commons.validator.routines.InetAddressValidator;
import java.io.IOException;
import java.net.ServerSocket;
@ -83,7 +82,7 @@ public final class TestUtils {
public static String formatIpAddress(final String ipAddress) {
String formattedIpAddress = ipAddress;
if(InetAddressValidator.getInstance().isValidInet6Address(ipAddress)) {
if(NetworkUtils.isIPv6(ipAddress)) {
formattedIpAddress = "[" + ipAddress + "]"; // per URL spec https://url.spec.whatwg.org/#host-writing
}
return formattedIpAddress;