diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index c65955dcf..02b158aea 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -43,9 +43,9 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
- DAPR_CLI_VER: 1.11.0
- DAPR_RUNTIME_VER: 1.11.3
- DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0/install/install.sh
+ DAPR_CLI_VER: 1.12.0-rc.1
+ DAPR_RUNTIME_VER: 1.12.0-rc.5
+ DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.12.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64
diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml
index 13a7ce3fb..676b25fc7 100644
--- a/.github/workflows/validate.yml
+++ b/.github/workflows/validate.yml
@@ -37,9 +37,9 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
- DAPR_CLI_VER: 1.11.0
- DAPR_RUNTIME_VER: 1.11.3
- DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0/install/install.sh
+ DAPR_CLI_VER: 1.12.0-rc.1
+ DAPR_RUNTIME_VER: 1.12.0-rc.5
+ DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.12.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
steps:
diff --git a/pom.xml b/pom.xml
index 5026de175..5c84ee176 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
UTF-8
1.42.1
3.17.3
- https://raw.githubusercontent.com/dapr/dapr/v1.11.0-rc.5/dapr/proto
+ https://raw.githubusercontent.com/dapr/dapr/v1.12.0-rc.2/dapr/proto
0.10.0-SNAPSHOT
1.6.2
3.1.1
diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java
index 33beeaa07..c0181b1db 100644
--- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java
+++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java
@@ -13,11 +13,19 @@ limitations under the License.
package io.dapr.it;
+import com.google.protobuf.Empty;
import io.dapr.client.DaprApiProtocol;
import io.dapr.config.Properties;
+import io.dapr.v1.AppCallbackHealthCheckGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -40,6 +48,8 @@ public class DaprRun implements Stoppable {
private final String appName;
+ private final DaprApiProtocol appProtocol;
+
private final int maxWaitMilliseconds;
private final AtomicBoolean started;
@@ -50,6 +60,8 @@ public class DaprRun implements Stoppable {
private final Command stopCommand;
+ private final boolean hasAppHealthCheck;
+
private DaprRun(String testName,
DaprPorts ports,
String successMessage,
@@ -61,6 +73,7 @@ public class DaprRun implements Stoppable {
this.appName = serviceClass == null ?
testName.toLowerCase() :
String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase();
+ this.appProtocol = appProtocol;
this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol, appProtocol));
this.listCommand = new Command(
@@ -72,6 +85,7 @@ public class DaprRun implements Stoppable {
this.ports = ports;
this.maxWaitMilliseconds = maxWaitMilliseconds;
this.started = new AtomicBoolean(false);
+ this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass);
}
public void start() throws InterruptedException, IOException {
@@ -156,6 +170,63 @@ public class DaprRun implements Stoppable {
System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name());
}
+ public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException {
+ if (!this.hasAppHealthCheck) {
+ return;
+ }
+
+ if (DaprApiProtocol.GRPC.equals(this.appProtocol)) {
+ ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", this.getAppPort())
+ .usePlaintext()
+ .build();
+ try {
+ AppCallbackHealthCheckGrpc.AppCallbackHealthCheckBlockingStub stub =
+ AppCallbackHealthCheckGrpc.newBlockingStub(channel);
+ long maxWait = System.currentTimeMillis() + maxWaitMilliseconds;
+ while (System.currentTimeMillis() <= maxWait) {
+ try {
+ stub.healthCheck(Empty.getDefaultInstance());
+ // artursouza: workaround due to race condition with runtime's probe on app's health.
+ Thread.sleep(5000);
+ return;
+ } catch (Exception e) {
+ Thread.sleep(1000);
+ }
+ }
+
+ throw new RuntimeException("timeout: gRPC service is not healthy.");
+ } finally {
+ channel.shutdown();
+ }
+ } else {
+ // Create an OkHttpClient instance with a custom timeout
+ OkHttpClient client = new OkHttpClient.Builder()
+ .connectTimeout(maxWaitMilliseconds, TimeUnit.MILLISECONDS)
+ .readTimeout(maxWaitMilliseconds, TimeUnit.MILLISECONDS)
+ .build();
+
+ // Define the URL to probe
+ String url = "http://127.0.0.1:" + this.getAppPort() + "/health"; // Change to your specific URL
+
+ // Create a request to the URL
+ Request request = new Request.Builder()
+ .url(url)
+ .build();
+
+ // Execute the request
+ try (Response response = client.newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ throw new RuntimeException("error: HTTP service is not healthy.");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("exception: HTTP service is not healthy.");
+ }
+
+ // artursouza: workaround due to race condition with runtime's probe on app's health.
+ Thread.sleep(5000);
+ }
+ }
+
public Integer getGrpcPort() {
return ports.getGrpcPort();
}
@@ -198,6 +269,8 @@ public class DaprRun implements Stoppable {
.append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "")
.append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "")
.append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "")
+ .append(isAppHealthCheckEnabled(serviceClass) ?
+ " --enable-app-health-check --app-health-probe-interval=1" : "")
.append(serviceClass == null ? "" :
String.format(DAPR_COMMAND, serviceClass.getCanonicalName(),
ports.getAppPort() != null ? ports.getAppPort().toString() : "",
@@ -206,6 +279,17 @@ public class DaprRun implements Stoppable {
return stringBuilder.toString();
}
+ private static boolean isAppHealthCheckEnabled(Class serviceClass) {
+ if (serviceClass != null) {
+ DaprRunConfig daprRunConfig = (DaprRunConfig) serviceClass.getAnnotation(DaprRunConfig.class);
+ if (daprRunConfig != null) {
+ return daprRunConfig.enableAppHealthCheck();
+ }
+ }
+
+ return false;
+ }
+
private static void assertListeningOnPort(int port) {
System.out.printf("Checking port %d ...\n", port);
diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java b/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java
new file mode 100644
index 000000000..205eaf73d
--- /dev/null
+++ b/sdk-tests/src/test/java/io/dapr/it/DaprRunConfig.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2021 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.it;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Customizes an app run for Dapr.
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface DaprRunConfig {
+
+ boolean enableAppHealthCheck() default false;
+}
diff --git a/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java b/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java
index 3a0d2f9dd..1f592fd61 100644
--- a/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java
+++ b/sdk-tests/src/test/java/io/dapr/it/ToxiProxyRun.java
@@ -16,6 +16,7 @@ package io.dapr.it;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
+import io.dapr.utils.NetworkUtils;
import java.io.IOException;
import java.time.Duration;
@@ -53,6 +54,7 @@ public class ToxiProxyRun implements Stoppable {
public void start() throws IOException, InterruptedException {
this.toxiProxyServer.run();
+ NetworkUtils.waitForSocket("127.0.0.1", this.toxiProxyPorts.getAppPort(), 10000);
this.toxiproxyClient = new ToxiproxyClient("127.0.0.1", this.toxiProxyPorts.getAppPort());
if (this.daprRun.getGrpcPort() != null) {
diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java
index c3b29b748..b3437d565 100644
--- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeIT.java
@@ -35,19 +35,19 @@ public class MethodInvokeIT extends BaseIT {
@BeforeEach
public void init() throws Exception {
daprRun = startDaprApp(
- MethodInvokeIT.class.getSimpleName(),
+ MethodInvokeIT.class.getSimpleName() + "grpc",
MethodInvokeService.SUCCESS_MESSAGE,
MethodInvokeService.class,
DaprApiProtocol.GRPC, // appProtocol
60000);
daprRun.switchToGRPC();
- // Wait since service might be ready even after port is available.
- Thread.sleep(2000);
+ daprRun.waitForAppHealth(20000);
}
@Test
public void testInvoke() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
@@ -94,6 +94,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeTimeout() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
long started = System.currentTimeMillis();
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
String message = assertThrows(IllegalStateException.class, () ->
@@ -108,6 +109,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeException() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
SleepRequest req = SleepRequest.newBuilder().setSeconds(-9).build();
DaprException exception = assertThrows(DaprException.class, () ->
client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block());
diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java
index f8c3b62c7..45648075f 100644
--- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java
+++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/grpc/MethodInvokeService.java
@@ -15,6 +15,7 @@ package io.dapr.it.methodinvoke.grpc;
import com.google.protobuf.Any;
import io.dapr.grpc.GrpcHealthCheckService;
+import io.dapr.it.DaprRunConfig;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.grpc.Server;
@@ -35,8 +36,13 @@ import static io.dapr.it.MethodInvokeServiceProtos.PostMessageResponse;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse;
+@DaprRunConfig(
+ enableAppHealthCheck = true
+)
public class MethodInvokeService {
+ private static final long STARTUP_DELAY_SECONDS = 10;
+
public static final String SUCCESS_MESSAGE = "application discovered on port ";
/**
@@ -94,6 +100,7 @@ public class MethodInvokeService {
@Override
public void onInvoke(CommonProtos.InvokeRequest request,
StreamObserver responseObserver) {
+ System.out.println("Server: received " + request.getMethod() + " ...");
try {
if ("postMessage".equals(request.getMethod())) {
PostMessageRequest req = PostMessageRequest.parseFrom(request.getData().getValue().toByteArray());
@@ -163,8 +170,12 @@ public class MethodInvokeService {
public static void main(String[] args) throws Exception {
int port = Integer.parseInt(args[0]);
- System.out.printf("Service starting on port %d ...\n", port);
+ System.out.printf("Service to start on port %d ...\n", port);
+ // The artificial delay is useful to detect bugs in app health, where the app is invoked too soon.
+ System.out.printf("Artificial delay of %d seconds ...\n", STARTUP_DELAY_SECONDS);
+ Thread.sleep(STARTUP_DELAY_SECONDS * 1000);
+ System.out.printf("Now starting ...\n", STARTUP_DELAY_SECONDS);
final MyDaprService service = new MyDaprService();
service.start(port);
service.awaitTermination();
diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java
index eb8872765..3687bd805 100644
--- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeIT.java
@@ -35,14 +35,13 @@ public class MethodInvokeIT extends BaseIT {
@BeforeEach
public void init() throws Exception {
daprRun = startDaprApp(
- MethodInvokeIT.class.getSimpleName(),
+ MethodInvokeIT.class.getSimpleName() + "http",
MethodInvokeService.SUCCESS_MESSAGE,
MethodInvokeService.class,
true,
30000);
daprRun.switchToHTTP();
- // Wait since service might be ready even after port is available.
- Thread.sleep(2000);
+ daprRun.waitForAppHealth(20000);
}
@Test
@@ -51,6 +50,7 @@ public class MethodInvokeIT extends BaseIT {
// At this point, it is guaranteed that the service above is running and all ports being listened to.
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
@@ -76,6 +76,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeWithObjects() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
for (int i = 0; i < NUM_MESSAGES; i++) {
Person person = new Person();
person.setName(String.format("Name %d", i));
@@ -111,6 +112,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeTimeout() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
long started = System.currentTimeMillis();
String message = assertThrows(IllegalStateException.class, () -> {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
@@ -125,6 +127,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeException() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
MethodInvokeServiceProtos.SleepRequest req = MethodInvokeServiceProtos.SleepRequest.newBuilder().setSeconds(-9).build();
DaprException exception = assertThrows(DaprException.class, () ->
client.invokeMethod(daprRun.getAppName(), "sleep", -9, HttpExtension.POST).block());
diff --git a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeService.java b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeService.java
index 1ce6834d5..8b4858fc2 100644
--- a/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeService.java
+++ b/sdk-tests/src/test/java/io/dapr/it/methodinvoke/http/MethodInvokeService.java
@@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.it.methodinvoke.http;
+import io.dapr.it.DaprRunConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -20,15 +21,23 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for subscriber.
*/
+@DaprRunConfig(
+ enableAppHealthCheck = true
+)
@SpringBootApplication
public class MethodInvokeService {
+ private static final long STARTUP_DELAY_SECONDS = 10;
+
public static final String SUCCESS_MESSAGE = "Completed initialization in";
- public static void main(String[] args) {
+ public static void main(String[] args) throws InterruptedException {
int port = Integer.parseInt(args[0]);
- System.out.printf("Service starting on port %d ...\n", port);
+ System.out.printf("Service to start on port %d ...\n", port);
+ System.out.printf("Artificial delay of %d seconds ...\n", STARTUP_DELAY_SECONDS);
+ Thread.sleep(STARTUP_DELAY_SECONDS * 1000);
+ System.out.printf("Now starting ...\n", STARTUP_DELAY_SECONDS);
// Start Dapr's callback endpoint.
start(port);
diff --git a/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java
index 8be2c1885..c7ae603e9 100644
--- a/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/resiliency/SdkResiliencytIT.java
@@ -38,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class SdkResiliencytIT extends BaseIT {
- private static final int NUM_ITERATIONS = 25;
+ private static final int NUM_ITERATIONS = 30;
private static final Duration TIMEOUT = Duration.ofMillis(100);
@@ -139,6 +139,9 @@ public class SdkResiliencytIT extends BaseIT {
assertEquals(value, savedValue);
}
+ // Asserts that we had at least one success per client.
+ assertTrue(toxiClientErrorCount.get() < NUM_ITERATIONS);
+ assertTrue(retryOneClientErrorCount.get() < NUM_ITERATIONS);
// This assertion makes sure that toxicity is on
assertTrue(toxiClientErrorCount.get() > 0);
assertTrue(retryOneClientErrorCount.get() > 0);
diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java b/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java
index 6b9f09a5f..bae97e7a6 100644
--- a/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java
+++ b/sdk-tests/src/test/java/io/dapr/it/tracing/OpenTelemetry.java
@@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.it.tracing;
+import io.dapr.utils.NetworkUtils;
import io.opentelemetry.api.GlobalOpenTelemetry;
-import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
@@ -23,15 +23,7 @@ import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
-import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import java.io.IOException;
-import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
@@ -46,30 +38,23 @@ public class OpenTelemetry {
* @param serviceName Name of the service in Zipkin
* @return OpenTelemetry.
*/
- public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) {
- // Only exports to Zipkin if it is up. Otherwise, ignore it.
- // This is helpful to avoid exceptions for examples that do not require Zipkin.
- if (isZipkinUp()) {
- String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
- ZipkinSpanExporter zipkinExporter =
- ZipkinSpanExporter.builder()
- .setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
- .setServiceName(serviceName)
- .build();
+ public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) throws InterruptedException {
+ waitForZipkin();
+ String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
+ ZipkinSpanExporter zipkinExporter =
+ ZipkinSpanExporter.builder()
+ .setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
+ .setServiceName(serviceName)
+ .build();
- SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
- .addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
- .build();
+ SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
+ .build();
- return OpenTelemetrySdk.builder()
- .setTracerProvider(sdkTracerProvider)
- .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
- .buildAndRegisterGlobal();
- } else {
- System.out.println("WARNING: Zipkin is not available.");
- }
-
- return null;
+ return OpenTelemetrySdk.builder()
+ .setTracerProvider(sdkTracerProvider)
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .buildAndRegisterGlobal();
}
/**
@@ -98,11 +83,7 @@ public class OpenTelemetry {
return reactorContext;
}
- private static boolean isZipkinUp() {
- try (Socket ignored = new Socket("localhost", ZIPKIN_PORT)) {
- return true;
- } catch (IOException ignored) {
- return false;
- }
+ private static void waitForZipkin() throws InterruptedException {
+ NetworkUtils.waitForSocket("127.0.0.1", ZIPKIN_PORT, 10000);
}
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/Service.java b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/Service.java
index fc2e59e05..1aa660d88 100644
--- a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/Service.java
+++ b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/Service.java
@@ -15,6 +15,7 @@ package io.dapr.it.tracing.grpc;
import com.google.protobuf.Any;
import io.dapr.grpc.GrpcHealthCheckService;
+import io.dapr.it.DaprRunConfig;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.grpc.Server;
@@ -26,6 +27,9 @@ import java.io.IOException;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse;
+@DaprRunConfig(
+ enableAppHealthCheck = true
+)
public class Service {
public static final String SUCCESS_MESSAGE = "application discovered on port ";
diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java
index ea043d248..e82aa248c 100644
--- a/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/tracing/grpc/TracingIT.java
@@ -12,24 +12,14 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
-import org.junit.Before;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.UUID;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
public class TracingIT extends BaseIT {
@@ -40,7 +30,7 @@ public class TracingIT extends BaseIT {
public void setup(boolean useGrpc) throws Exception {
daprRun = startDaprApp(
- TracingIT.class.getSimpleName(),
+ TracingIT.class.getSimpleName() + "grpc",
Service.SUCCESS_MESSAGE,
Service.class,
DaprApiProtocol.GRPC, // appProtocol
@@ -52,8 +42,7 @@ public class TracingIT extends BaseIT {
daprRun.switchToHTTP();
}
- // Wait since service might be ready even after port is available.
- Thread.sleep(2000);
+ daprRun.waitForAppHealth(10000);
}
@ParameterizedTest
@@ -68,6 +57,7 @@ public class TracingIT extends BaseIT {
Span span = tracer.spanBuilder(spanName).setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
try (Scope scope = span.makeCurrent()) {
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST)
@@ -78,6 +68,6 @@ public class TracingIT extends BaseIT {
span.end();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
- Validation.validate(spanName, "calllocal/tracingit-service/sleepovergrpc");
+ Validation.validate(spanName, "calllocal/tracingitgrpc-service/sleepovergrpc");
}
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java b/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java
index 808f49ca3..8a976f93d 100644
--- a/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java
+++ b/sdk-tests/src/test/java/io/dapr/it/tracing/http/OpenTelemetryConfig.java
@@ -31,7 +31,7 @@ public class OpenTelemetryConfig {
public static final String SERVICE_NAME = "integration testing service over http";
@Bean
- public OpenTelemetry initOpenTelemetry() {
+ public OpenTelemetry initOpenTelemetry() throws InterruptedException {
return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry(SERVICE_NAME);
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/http/Service.java b/sdk-tests/src/test/java/io/dapr/it/tracing/http/Service.java
index 124483d26..ba4317305 100644
--- a/sdk-tests/src/test/java/io/dapr/it/tracing/http/Service.java
+++ b/sdk-tests/src/test/java/io/dapr/it/tracing/http/Service.java
@@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.it.tracing.http;
+import io.dapr.it.DaprRunConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -20,6 +21,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for subscriber.
*/
+@DaprRunConfig(
+ enableAppHealthCheck = true
+)
@SpringBootApplication
public class Service {
diff --git a/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java b/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java
index 2ea036c74..96b669ef7 100644
--- a/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/tracing/http/TracingIT.java
@@ -11,21 +11,13 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
-import org.junit.Before;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.UUID;
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
public class TracingIT extends BaseIT {
@@ -36,7 +28,7 @@ public class TracingIT extends BaseIT {
public void setup(boolean useGrpc) throws Exception {
daprRun = startDaprApp(
- TracingIT.class.getSimpleName(),
+ TracingIT.class.getSimpleName() + "http",
Service.SUCCESS_MESSAGE,
Service.class,
true,
@@ -64,6 +56,7 @@ public class TracingIT extends BaseIT {
Span span = tracer.spanBuilder(spanName).setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
+ client.waitForSidecar(10000).block();
try (Scope scope = span.makeCurrent()) {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
.contextWrite(getReactorContext())
@@ -73,7 +66,7 @@ public class TracingIT extends BaseIT {
span.end();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
- Validation.validate(spanName, "calllocal/tracingit-service/sleep");
+ Validation.validate(spanName, "calllocal/tracingithttp-service/sleep");
}
}