Support remote endpoint. (#877)

* Support remote endpoint.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Use GRPC_ENDPOINT and HTTP_ENDPOINT in integration tests.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix happy path for waiting for sidecar test.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
This commit is contained in:
Artur Souza 2023-06-18 23:34:09 -07:00 committed by GitHub
parent d8dbb0e16d
commit 201dbc5344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 293 additions and 127 deletions

View File

@ -140,6 +140,10 @@ public class DaprRun implements Stoppable {
System.getProperties().setProperty(
Properties.API_METHOD_INVOCATION_PROTOCOL.getName(),
DaprApiProtocol.GRPC.name());
System.getProperties().setProperty(
Properties.GRPC_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getGrpcPort());
System.getProperties().setProperty(
Properties.HTTP_ENDPOINT.getName(), "http://127.0.0.1:" + this.ports.getHttpPort());
}
public void switchToGRPC() {

View File

@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.net.URI;
/**
* A builder for the DaprClient,
@ -162,19 +163,28 @@ public class DaprClientBuilder {
* @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
*/
private DaprClient buildDaprClientGrpc() {
int port = Properties.GRPC_PORT.get();
if (port <= 0) {
throw new IllegalArgumentException("Invalid port.");
}
ManagedChannel channel = ManagedChannelBuilder.forAddress(
Properties.SIDECAR_IP.get(), port).usePlaintext().userAgent(Version.getSdkVersion()).build();
Closeable closeableChannel = () -> {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
};
final ManagedChannel channel = buildGrpcManagedChanel();
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
return new DaprClientGrpc(closeableChannel, asyncStub, this.objectSerializer, this.stateSerializer);
return new DaprClientGrpc(channelFacade, asyncStub, this.objectSerializer, this.stateSerializer);
}
private ManagedChannel buildGrpcManagedChanel() {
String host = Properties.SIDECAR_IP.get();
int port = Properties.GRPC_PORT.get();
boolean insecure = true;
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
URI uri = URI.create(grpcEndpoint);
insecure = uri.getScheme().equalsIgnoreCase("http");
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
}
ManagedChannelBuilder builder = ManagedChannelBuilder.forAddress(host, port)
.userAgent(Version.getSdkVersion());
if (insecure) {
builder = builder.usePlaintext();
}
return builder.build();
}
/**

View File

@ -50,7 +50,6 @@ import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.DefaultContentTypeConverter;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprGrpc;
@ -69,7 +68,6 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.ContextView;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -92,7 +90,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
/**
* The GRPC managed channel to be used.
*/
private Closeable channel;
private final GrpcChannelFacade channel;
/**
* The async gRPC stub.
@ -102,19 +100,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param closeableChannel A closeable for a Managed GRPC channel
* @param channel Facade for the managed GRPC channel
* @param asyncStub async gRPC stub
* @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects.
* @see DaprClientBuilder
*/
DaprClientGrpc(
Closeable closeableChannel,
GrpcChannelFacade channel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.channel = closeableChannel;
this.channel = channel;
this.asyncStub = intercept(asyncStub);
}
@ -145,13 +143,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
*/
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
return this.channel.waitForChannelReady(timeoutInMilliseconds);
}
/**
@ -193,7 +185,6 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
/**
*
* {@inheritDoc}
*/
@Override

View File

@ -32,6 +32,7 @@ import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@ -141,14 +142,9 @@ public class DaprHttp implements AutoCloseable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Hostname used to communicate to Dapr's HTTP endpoint.
* Endpoint used to communicate to Dapr's HTTP endpoint.
*/
private final String hostname;
/**
* Port used to communicate to Dapr's HTTP endpoint.
*/
private final int port;
private final URI uri;
/**
* Http client used for all API calls.
@ -163,8 +159,18 @@ public class DaprHttp implements AutoCloseable {
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
this.hostname = hostname;
this.port = port;
this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port);
this.httpClient = httpClient;
}
/**
* Creates a new instance of {@link DaprHttp}.
*
* @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com")
* @param httpClient RestClient used for all API calls in this new instance.
*/
DaprHttp(String uri, OkHttpClient httpClient) {
this.uri = URI.create(uri);
this.httpClient = httpClient;
}
@ -273,9 +279,14 @@ public class DaprHttp implements AutoCloseable {
body = RequestBody.Companion.create(content, mediaType);
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
.host(this.hostname)
.port(this.port);
urlBuilder.scheme(uri.getScheme())
.host(uri.getHost());
if (uri.getPort() > 0) {
urlBuilder.port(uri.getPort());
}
if (uri.getPath() != null) {
urlBuilder.addPathSegments(uri.getPath());
}
for (String pathSegment : pathSegments) {
urlBuilder.addPathSegment(pathSegment);
}

View File

@ -85,6 +85,11 @@ public class DaprHttpBuilder {
}
}
String endpoint = Properties.HTTP_ENDPOINT.get();
if ((endpoint != null) && !endpoint.isEmpty()) {
return new DaprHttp(endpoint, OK_HTTP_CLIENT);
}
return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
}
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import io.dapr.v1.DaprGrpc;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
/**
* Facade for common operations on gRPC channel.
*
* @see DaprGrpc
* @see DaprClient
*/
class GrpcChannelFacade implements Closeable {
/**
* The GRPC managed channel to be used.
*/
private final ManagedChannel channel;
/**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
*
* @param channel A Managed GRPC channel
* @see DaprClientBuilder
*/
GrpcChannelFacade(ManagedChannel channel) {
this.channel = channel;
}
@Override
public void close() throws IOException {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
}
public Mono<Void> waitForChannelReady(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
boolean isReady = false;
long startTime = System.currentTimeMillis();
while (!isReady && System.currentTimeMillis() - startTime < timeoutInMilliseconds) {
isReady = this.channel.getState(true) == ConnectivityState.READY;
if (!isReady) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Waiting for gRPC channel ready interrupted.", e);
}
}
}
if (!isReady) {
throw new RuntimeException("Timeout waiting for gRPC channel to be ready.");
}
});
}
}

View File

@ -99,6 +99,22 @@ public class Properties {
"DAPR_GRPC_PORT",
DEFAULT_GRPC_PORT);
/**
* GRPC endpoint for remote sidecar connectivity.
*/
public static final Property<String> GRPC_ENDPOINT = new StringProperty(
"dapr.grpc.endpoint",
"DAPR_GRPC_ENDPOINT",
null);
/**
* GRPC endpoint for remote sidecar connectivity.
*/
public static final Property<String> HTTP_ENDPOINT = new StringProperty(
"dapr.http.endpoint",
"DAPR_HTTP_ENDPOINT",
null);
/**
* Determines if Dapr client will use gRPC or HTTP to talk to Dapr's side car.
* @deprecated This attribute will be deleted at SDK version 1.10.

View File

@ -163,14 +163,9 @@ public class DaprClientGrpcTelemetryTest {
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
Closeable closeableChannel = () -> {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
};
DaprGrpc.DaprStub asyncStub = DaprGrpc.newStub(channel);
client = new DaprClientGrpc(
closeableChannel, asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
new GrpcChannelFacade(channel), asyncStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}
@Test

View File

@ -45,6 +45,7 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import reactor.core.publisher.Mono;
@ -91,53 +92,40 @@ public class DaprClientGrpcTest {
private static final String SECRET_STORE_NAME = "MySecretStore";
private Closeable closeable;
private GrpcChannelFacade channel;
private DaprGrpc.DaprStub daprStub;
private DaprClient client;
private ObjectSerializer serializer;
@Before
public void setup() throws IOException {
closeable = mock(Closeable.class);
channel = mock(GrpcChannelFacade.class);
daprStub = mock(DaprGrpc.DaprStub.class);
when(daprStub.withInterceptors(any())).thenReturn(daprStub);
DaprClient grpcClient = new DaprClientGrpc(
closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
channel, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
client = new DaprClientProxy(grpcClient);
serializer = new ObjectSerializer();
doNothing().when(closeable).close();
doNothing().when(channel).close();
}
@After
public void tearDown() throws Exception {
client.close();
verify(closeable).close();
verifyNoMoreInteractions(closeable);
verify(channel).close();
}
@Test
public void waitForSidecarTimeout() throws Exception {
int port = findFreePort();
System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port));
public void waitForSidecarTimeout() {
Mockito.doReturn(Mono.error(new RuntimeException())).when(channel).waitForChannelReady(1);
assertThrows(RuntimeException.class, () -> client.waitForSidecar(1).block());
}
@Test
public void waitForSidecarTimeoutOK() throws Exception {
try (ServerSocket serverSocket = new ServerSocket(0)) {
final int port = serverSocket.getLocalPort();
System.setProperty(Properties.GRPC_PORT.getName(), Integer.toString(port));
Thread t = new Thread(() -> {
try {
try (Socket socket = serverSocket.accept()) {
}
} catch (IOException e) {
}
});
t.start();
public void waitForSidecarOK() {
Mockito.doReturn(Mono.empty()).when(channel).waitForChannelReady(10000);
client.waitForSidecar(10000).block();
}
}
@Test
public void publishEventExceptionThrownTest() {
@ -172,7 +160,7 @@ public class DaprClientGrpcTest {
@Test
public void publishEventSerializeException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
client = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer());
client = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer());
doAnswer((Answer<Void>) invocation -> {
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
observer.onNext(Empty.getDefaultInstance());
@ -290,7 +278,7 @@ public class DaprClientGrpcTest {
@Test
public void invokeBindingSerializeException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
client = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer());
client = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer());
doAnswer((Answer<Void>) invocation -> {
StreamObserver<Empty> observer = (StreamObserver<Empty>) invocation.getArguments()[1];
observer.onNext(Empty.getDefaultInstance());
@ -1451,7 +1439,7 @@ public class DaprClientGrpcTest {
@Test
public void executeTransactionSerializerExceptionTest() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
client = new DaprClientGrpc(closeable, daprStub, mockSerializer, mockSerializer);
client = new DaprClientGrpc(channel, daprStub, mockSerializer, mockSerializer);
String etag = "ETag1";
String key = "key1";
String data = "my data";

View File

@ -69,25 +69,25 @@ public class DaprPreviewClientGrpcTest {
private static final String TOPIC_NAME = "testTopic";
private Closeable closeable;
private GrpcChannelFacade channel;
private DaprGrpc.DaprStub daprStub;
private DaprPreviewClient previewClient;
@Before
public void setup() throws IOException {
closeable = mock(Closeable.class);
channel = mock(GrpcChannelFacade.class);
daprStub = mock(DaprGrpc.DaprStub.class);
when(daprStub.withInterceptors(any())).thenReturn(daprStub);
previewClient = new DaprClientGrpc(
closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
doNothing().when(closeable).close();
channel, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
doNothing().when(channel).close();
}
@After
public void tearDown() throws Exception {
previewClient.close();
verify(closeable).close();
verifyNoMoreInteractions(closeable);
verify(channel).close();
verifyNoMoreInteractions(channel);
}
@Test
@ -143,7 +143,7 @@ public class DaprPreviewClientGrpcTest {
@Test
public void publishEventsSerializeException() throws IOException {
DaprObjectSerializer mockSerializer = mock(DaprObjectSerializer.class);
previewClient = new DaprClientGrpc(closeable, daprStub, mockSerializer, new DefaultObjectSerializer());
previewClient = new DaprClientGrpc(channel, daprStub, mockSerializer, new DefaultObjectSerializer());
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.BulkPublishResponse> observer =
(StreamObserver<DaprProtos.BulkPublishResponse>) invocation.getArguments()[1];

View File

@ -0,0 +1,70 @@
/*
* Copyright 2023 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client;
import io.dapr.v1.DaprGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import static io.dapr.utils.TestUtils.findFreePort;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class GrpcChannelFacadeTest {
private static int port;
public static Server server;
@BeforeAll
public static void setup() throws IOException {
port = findFreePort();
server = ServerBuilder.forPort(port)
.addService(new DaprGrpc.DaprImplBase() {
})
.build();
server.start();
}
@AfterAll
public static void teardown() throws InterruptedException {
server.shutdown();
server.awaitTermination();
}
@Test
public void waitForSidecarTimeout() throws Exception {
int unusedPort = findFreePort();
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", unusedPort)
.usePlaintext().build();
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
assertThrows(RuntimeException.class, () -> channelFacade.waitForChannelReady(1).block());
}
@Test
public void waitForSidecarOK() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
.usePlaintext().build();
final GrpcChannelFacade channelFacade = new GrpcChannelFacade(channel);
channelFacade.waitForChannelReady(10000).block();
}
}