Integration test stabilization + update proto and Dapr runtime to 1.12 RC. (#917)

* Update Dapr runtime to 1.12 RC.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add socket wait for ToxiProxy client.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Changing ambiguity in service Id for TracingIT tests.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* More assertions to SDKResiliencyIT.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add sidecar wait for TracingIT.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Wait for Zipkin endpoint.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Tacking flakiness on MethodInvokeIT and TracingIT.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Test again with 1.11.0 CLI.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Start bisect 1.12 regression.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Bisect step for runtime at 34c5102600b5a39704089b72db565ca95943269d

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* bisect step for runtime at ad5618711830510617e1b8fb2d0c6758f14ff6b9

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* bisect step for runtime at 4e5c51be62d8a7319ab25a401681ba82bd3ef7e7

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* git bisect step for runtime at 3143f2fcb679e585d978e9b9d706cce72088fecc

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* git bisect step for runtime at d243d1c6a17b019e0435ebbc10abb00810beb29a

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* git bisect step for runtime at 2ddb99be5ae61abd9f7f3616987fcfda0cedbc6e

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* git bisect step for runtime at 0fdab70eb32a63dd2630c39c4b1bc623212cd099

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* git bisect step for runtime at b42319279f21ee5c63d5cd04bfef6e94897cf34f

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Verify first bad commit at runtime: 0fdab70eb32a63dd2630c39c4b1bc623212cd099

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Trying artursouza/dapr at 37ca4c81041de11677ddea101298ca3c2ad8585d

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add delay of 10s on MethodInvokeService for IT.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Trying dapr at artursouza's fork at 65e117433284388535f5a967456611a72773cf6b

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Change MethodInvokeService to use AppHealthCheck.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Change TracingIT to use AppHealthCheck.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Change to use artursouza's fork at f770694b3fbf7222c162a4ee4e13818a5afc3b01

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Remove use of artursouza's fork for runtime.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix MethodInvokeIT to actually use AppHealthCheck.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Try fix in runtime: artursouza/dapr at 72b6f7374a670c3054427aa2b01d2374e3bc3329

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Trying runtime as per artursouza at 64d19957059d134378603630165b3e473bdde388

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Increase delay to avoid race condition for app health checks.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Try runtime at artursouza at d2eb15d1707ff8539ff1561a1fd67750d205854b

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update to 1.12.0-rc.5

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Artur Souza 2023-10-02 09:15:26 -07:00 committed by GitHub
parent 96c0418521
commit 7f76d5b2de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 194 additions and 79 deletions

View File

@ -43,9 +43,9 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.11.0
DAPR_RUNTIME_VER: 1.11.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0/install/install.sh
DAPR_CLI_VER: 1.12.0-rc.1
DAPR_RUNTIME_VER: 1.12.0-rc.5
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.12.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
TOXIPROXY_URL: https://github.com/Shopify/toxiproxy/releases/download/v2.5.0/toxiproxy-server-linux-amd64

View File

@ -37,9 +37,9 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.11.0
DAPR_RUNTIME_VER: 1.11.3
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.11.0/install/install.sh
DAPR_CLI_VER: 1.12.0-rc.1
DAPR_RUNTIME_VER: 1.12.0-rc.5
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.12.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
steps:

View File

@ -16,7 +16,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.42.1</grpc.version>
<protobuf.version>3.17.3</protobuf.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.11.0-rc.5/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.12.0-rc.2/dapr/proto</dapr.proto.baseurl>
<dapr.sdk-workflows.version>0.10.0-SNAPSHOT</dapr.sdk-workflows.version>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>

View File

@ -13,11 +13,19 @@ limitations under the License.
package io.dapr.it;
import com.google.protobuf.Empty;
import io.dapr.client.DaprApiProtocol;
import io.dapr.config.Properties;
import io.dapr.v1.AppCallbackHealthCheckGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@ -40,6 +48,8 @@ public class DaprRun implements Stoppable {
private final String appName;
private final DaprApiProtocol appProtocol;
private final int maxWaitMilliseconds;
private final AtomicBoolean started;
@ -50,6 +60,8 @@ public class DaprRun implements Stoppable {
private final Command stopCommand;
private final boolean hasAppHealthCheck;
private DaprRun(String testName,
DaprPorts ports,
String successMessage,
@ -61,6 +73,7 @@ public class DaprRun implements Stoppable {
this.appName = serviceClass == null ?
testName.toLowerCase() :
String.format("%s-%s", testName, serviceClass.getSimpleName()).toLowerCase();
this.appProtocol = appProtocol;
this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports, protocol, appProtocol));
this.listCommand = new Command(
@ -72,6 +85,7 @@ public class DaprRun implements Stoppable {
this.ports = ports;
this.maxWaitMilliseconds = maxWaitMilliseconds;
this.started = new AtomicBoolean(false);
this.hasAppHealthCheck = isAppHealthCheckEnabled(serviceClass);
}
public void start() throws InterruptedException, IOException {
@ -156,6 +170,63 @@ public class DaprRun implements Stoppable {
System.getProperties().setProperty(Properties.API_METHOD_INVOCATION_PROTOCOL.getName(), protocol.name());
}
public void waitForAppHealth(int maxWaitMilliseconds) throws InterruptedException {
if (!this.hasAppHealthCheck) {
return;
}
if (DaprApiProtocol.GRPC.equals(this.appProtocol)) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", this.getAppPort())
.usePlaintext()
.build();
try {
AppCallbackHealthCheckGrpc.AppCallbackHealthCheckBlockingStub stub =
AppCallbackHealthCheckGrpc.newBlockingStub(channel);
long maxWait = System.currentTimeMillis() + maxWaitMilliseconds;
while (System.currentTimeMillis() <= maxWait) {
try {
stub.healthCheck(Empty.getDefaultInstance());
// artursouza: workaround due to race condition with runtime's probe on app's health.
Thread.sleep(5000);
return;
} catch (Exception e) {
Thread.sleep(1000);
}
}
throw new RuntimeException("timeout: gRPC service is not healthy.");
} finally {
channel.shutdown();
}
} else {
// Create an OkHttpClient instance with a custom timeout
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(maxWaitMilliseconds, TimeUnit.MILLISECONDS)
.readTimeout(maxWaitMilliseconds, TimeUnit.MILLISECONDS)
.build();
// Define the URL to probe
String url = "http://127.0.0.1:" + this.getAppPort() + "/health"; // Change to your specific URL
// Create a request to the URL
Request request = new Request.Builder()
.url(url)
.build();
// Execute the request
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new RuntimeException("error: HTTP service is not healthy.");
}
} catch (IOException e) {
throw new RuntimeException("exception: HTTP service is not healthy.");
}
// artursouza: workaround due to race condition with runtime's probe on app's health.
Thread.sleep(5000);
}
}
public Integer getGrpcPort() {
return ports.getGrpcPort();
}
@ -198,6 +269,8 @@ public class DaprRun implements Stoppable {
.append(ports.getAppPort() != null ? " --app-port " + ports.getAppPort() : "")
.append(ports.getHttpPort() != null ? " --dapr-http-port " + ports.getHttpPort() : "")
.append(ports.getGrpcPort() != null ? " --dapr-grpc-port " + ports.getGrpcPort() : "")
.append(isAppHealthCheckEnabled(serviceClass) ?
" --enable-app-health-check --app-health-probe-interval=1" : "")
.append(serviceClass == null ? "" :
String.format(DAPR_COMMAND, serviceClass.getCanonicalName(),
ports.getAppPort() != null ? ports.getAppPort().toString() : "",
@ -206,6 +279,17 @@ public class DaprRun implements Stoppable {
return stringBuilder.toString();
}
private static boolean isAppHealthCheckEnabled(Class serviceClass) {
if (serviceClass != null) {
DaprRunConfig daprRunConfig = (DaprRunConfig) serviceClass.getAnnotation(DaprRunConfig.class);
if (daprRunConfig != null) {
return daprRunConfig.enableAppHealthCheck();
}
}
return false;
}
private static void assertListeningOnPort(int port) {
System.out.printf("Checking port %d ...\n", port);

View File

@ -0,0 +1,29 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Customizes an app run for Dapr.
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface DaprRunConfig {
boolean enableAppHealthCheck() default false;
}

View File

@ -16,6 +16,7 @@ package io.dapr.it;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import io.dapr.utils.NetworkUtils;
import java.io.IOException;
import java.time.Duration;
@ -53,6 +54,7 @@ public class ToxiProxyRun implements Stoppable {
public void start() throws IOException, InterruptedException {
this.toxiProxyServer.run();
NetworkUtils.waitForSocket("127.0.0.1", this.toxiProxyPorts.getAppPort(), 10000);
this.toxiproxyClient = new ToxiproxyClient("127.0.0.1", this.toxiProxyPorts.getAppPort());
if (this.daprRun.getGrpcPort() != null) {

View File

@ -35,19 +35,19 @@ public class MethodInvokeIT extends BaseIT {
@BeforeEach
public void init() throws Exception {
daprRun = startDaprApp(
MethodInvokeIT.class.getSimpleName(),
MethodInvokeIT.class.getSimpleName() + "grpc",
MethodInvokeService.SUCCESS_MESSAGE,
MethodInvokeService.class,
DaprApiProtocol.GRPC, // appProtocol
60000);
daprRun.switchToGRPC();
// Wait since service might be ready even after port is available.
Thread.sleep(2000);
daprRun.waitForAppHealth(20000);
}
@Test
public void testInvoke() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
@ -94,6 +94,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeTimeout() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
long started = System.currentTimeMillis();
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
String message = assertThrows(IllegalStateException.class, () ->
@ -108,6 +109,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeException() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
SleepRequest req = SleepRequest.newBuilder().setSeconds(-9).build();
DaprException exception = assertThrows(DaprException.class, () ->
client.invokeMethod(daprRun.getAppName(), "sleep", req.toByteArray(), HttpExtension.POST).block());

View File

@ -15,6 +15,7 @@ package io.dapr.it.methodinvoke.grpc;
import com.google.protobuf.Any;
import io.dapr.grpc.GrpcHealthCheckService;
import io.dapr.it.DaprRunConfig;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.grpc.Server;
@ -35,8 +36,13 @@ import static io.dapr.it.MethodInvokeServiceProtos.PostMessageResponse;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse;
@DaprRunConfig(
enableAppHealthCheck = true
)
public class MethodInvokeService {
private static final long STARTUP_DELAY_SECONDS = 10;
public static final String SUCCESS_MESSAGE = "application discovered on port ";
/**
@ -94,6 +100,7 @@ public class MethodInvokeService {
@Override
public void onInvoke(CommonProtos.InvokeRequest request,
StreamObserver<CommonProtos.InvokeResponse> responseObserver) {
System.out.println("Server: received " + request.getMethod() + " ...");
try {
if ("postMessage".equals(request.getMethod())) {
PostMessageRequest req = PostMessageRequest.parseFrom(request.getData().getValue().toByteArray());
@ -163,8 +170,12 @@ public class MethodInvokeService {
public static void main(String[] args) throws Exception {
int port = Integer.parseInt(args[0]);
System.out.printf("Service starting on port %d ...\n", port);
System.out.printf("Service to start on port %d ...\n", port);
// The artificial delay is useful to detect bugs in app health, where the app is invoked too soon.
System.out.printf("Artificial delay of %d seconds ...\n", STARTUP_DELAY_SECONDS);
Thread.sleep(STARTUP_DELAY_SECONDS * 1000);
System.out.printf("Now starting ...\n", STARTUP_DELAY_SECONDS);
final MyDaprService service = new MyDaprService();
service.start(port);
service.awaitTermination();

View File

@ -35,14 +35,13 @@ public class MethodInvokeIT extends BaseIT {
@BeforeEach
public void init() throws Exception {
daprRun = startDaprApp(
MethodInvokeIT.class.getSimpleName(),
MethodInvokeIT.class.getSimpleName() + "http",
MethodInvokeService.SUCCESS_MESSAGE,
MethodInvokeService.class,
true,
30000);
daprRun.switchToHTTP();
// Wait since service might be ready even after port is available.
Thread.sleep(2000);
daprRun.waitForAppHealth(20000);
}
@Test
@ -51,6 +50,7 @@ public class MethodInvokeIT extends BaseIT {
// At this point, it is guaranteed that the service above is running and all ports being listened to.
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i);
//Publishing messages
@ -76,6 +76,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeWithObjects() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
for (int i = 0; i < NUM_MESSAGES; i++) {
Person person = new Person();
person.setName(String.format("Name %d", i));
@ -111,6 +112,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeTimeout() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
long started = System.currentTimeMillis();
String message = assertThrows(IllegalStateException.class, () -> {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
@ -125,6 +127,7 @@ public class MethodInvokeIT extends BaseIT {
@Test
public void testInvokeException() throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
MethodInvokeServiceProtos.SleepRequest req = MethodInvokeServiceProtos.SleepRequest.newBuilder().setSeconds(-9).build();
DaprException exception = assertThrows(DaprException.class, () ->
client.invokeMethod(daprRun.getAppName(), "sleep", -9, HttpExtension.POST).block());

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.it.methodinvoke.http;
import io.dapr.it.DaprRunConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -20,15 +21,23 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for subscriber.
*/
@DaprRunConfig(
enableAppHealthCheck = true
)
@SpringBootApplication
public class MethodInvokeService {
private static final long STARTUP_DELAY_SECONDS = 10;
public static final String SUCCESS_MESSAGE = "Completed initialization in";
public static void main(String[] args) {
public static void main(String[] args) throws InterruptedException {
int port = Integer.parseInt(args[0]);
System.out.printf("Service starting on port %d ...\n", port);
System.out.printf("Service to start on port %d ...\n", port);
System.out.printf("Artificial delay of %d seconds ...\n", STARTUP_DELAY_SECONDS);
Thread.sleep(STARTUP_DELAY_SECONDS * 1000);
System.out.printf("Now starting ...\n", STARTUP_DELAY_SECONDS);
// Start Dapr's callback endpoint.
start(port);

View File

@ -38,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class SdkResiliencytIT extends BaseIT {
private static final int NUM_ITERATIONS = 25;
private static final int NUM_ITERATIONS = 30;
private static final Duration TIMEOUT = Duration.ofMillis(100);
@ -139,6 +139,9 @@ public class SdkResiliencytIT extends BaseIT {
assertEquals(value, savedValue);
}
// Asserts that we had at least one success per client.
assertTrue(toxiClientErrorCount.get() < NUM_ITERATIONS);
assertTrue(retryOneClientErrorCount.get() < NUM_ITERATIONS);
// This assertion makes sure that toxicity is on
assertTrue(toxiClientErrorCount.get() > 0);
assertTrue(retryOneClientErrorCount.get() > 0);

View File

@ -13,8 +13,8 @@ limitations under the License.
package io.dapr.it.tracing;
import io.dapr.utils.NetworkUtils;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
@ -23,15 +23,7 @@ import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
@ -46,30 +38,23 @@ public class OpenTelemetry {
* @param serviceName Name of the service in Zipkin
* @return OpenTelemetry.
*/
public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) {
// Only exports to Zipkin if it is up. Otherwise, ignore it.
// This is helpful to avoid exceptions for examples that do not require Zipkin.
if (isZipkinUp()) {
String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
ZipkinSpanExporter zipkinExporter =
ZipkinSpanExporter.builder()
.setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
.setServiceName(serviceName)
.build();
public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) throws InterruptedException {
waitForZipkin();
String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
ZipkinSpanExporter zipkinExporter =
ZipkinSpanExporter.builder()
.setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
.setServiceName(serviceName)
.build();
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
.build();
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
.build();
return OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
} else {
System.out.println("WARNING: Zipkin is not available.");
}
return null;
return OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
}
/**
@ -98,11 +83,7 @@ public class OpenTelemetry {
return reactorContext;
}
private static boolean isZipkinUp() {
try (Socket ignored = new Socket("localhost", ZIPKIN_PORT)) {
return true;
} catch (IOException ignored) {
return false;
}
private static void waitForZipkin() throws InterruptedException {
NetworkUtils.waitForSocket("127.0.0.1", ZIPKIN_PORT, 10000);
}
}

View File

@ -15,6 +15,7 @@ package io.dapr.it.tracing.grpc;
import com.google.protobuf.Any;
import io.dapr.grpc.GrpcHealthCheckService;
import io.dapr.it.DaprRunConfig;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.grpc.Server;
@ -26,6 +27,9 @@ import java.io.IOException;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse;
@DaprRunConfig(
enableAppHealthCheck = true
)
public class Service {
public static final String SUCCESS_MESSAGE = "application discovered on port ";

View File

@ -12,24 +12,14 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.junit.Before;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
public class TracingIT extends BaseIT {
@ -40,7 +30,7 @@ public class TracingIT extends BaseIT {
public void setup(boolean useGrpc) throws Exception {
daprRun = startDaprApp(
TracingIT.class.getSimpleName(),
TracingIT.class.getSimpleName() + "grpc",
Service.SUCCESS_MESSAGE,
Service.class,
DaprApiProtocol.GRPC, // appProtocol
@ -52,8 +42,7 @@ public class TracingIT extends BaseIT {
daprRun.switchToHTTP();
}
// Wait since service might be ready even after port is available.
Thread.sleep(2000);
daprRun.waitForAppHealth(10000);
}
@ParameterizedTest
@ -68,6 +57,7 @@ public class TracingIT extends BaseIT {
Span span = tracer.spanBuilder(spanName).setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
try (Scope scope = span.makeCurrent()) {
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST)
@ -78,6 +68,6 @@ public class TracingIT extends BaseIT {
span.end();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
Validation.validate(spanName, "calllocal/tracingit-service/sleepovergrpc");
Validation.validate(spanName, "calllocal/tracingitgrpc-service/sleepovergrpc");
}
}

View File

@ -31,7 +31,7 @@ public class OpenTelemetryConfig {
public static final String SERVICE_NAME = "integration testing service over http";
@Bean
public OpenTelemetry initOpenTelemetry() {
public OpenTelemetry initOpenTelemetry() throws InterruptedException {
return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry(SERVICE_NAME);
}

View File

@ -13,6 +13,7 @@ limitations under the License.
package io.dapr.it.tracing.http;
import io.dapr.it.DaprRunConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -20,6 +21,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for subscriber.
*/
@DaprRunConfig(
enableAppHealthCheck = true
)
@SpringBootApplication
public class Service {

View File

@ -11,21 +11,13 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.junit.Before;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
public class TracingIT extends BaseIT {
@ -36,7 +28,7 @@ public class TracingIT extends BaseIT {
public void setup(boolean useGrpc) throws Exception {
daprRun = startDaprApp(
TracingIT.class.getSimpleName(),
TracingIT.class.getSimpleName() + "http",
Service.SUCCESS_MESSAGE,
Service.class,
true,
@ -64,6 +56,7 @@ public class TracingIT extends BaseIT {
Span span = tracer.spanBuilder(spanName).setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
client.waitForSidecar(10000).block();
try (Scope scope = span.makeCurrent()) {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
.contextWrite(getReactorContext())
@ -73,7 +66,7 @@ public class TracingIT extends BaseIT {
span.end();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
Validation.validate(spanName, "calllocal/tracingit-service/sleep");
Validation.validate(spanName, "calllocal/tracingithttp-service/sleep");
}
}