w3c tracing integration test (#522)

* w3c tracing integration test

* Addresses PR comments.
This commit is contained in:
Artur Souza 2021-03-23 14:30:34 -07:00 committed by GitHub
parent 9662265426
commit 589a352450
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 686 additions and 0 deletions

View File

@ -30,6 +30,7 @@
<protobuf.input.directory>${project.basedir}/proto</protobuf.input.directory>
<grpc.version>1.33.1</grpc.version>
<protobuf.version>3.13.0</protobuf.version>
<opentelemetry.version>0.14.0</opentelemetry.version>
</properties>
<dependencies>
@ -63,6 +64,21 @@
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.10.1</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-logging</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-zipkin</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.dapr</groupId>
<artifactId>dapr-sdk</artifactId>
@ -105,6 +121,12 @@
<version>2.3.5.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
<scope>test</scope>
</dependency>
<dependency>
<!-- This is need for us to programmatically add secrets in integration tests. -->
<groupId>com.bettercloud</groupId>

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class OpenTelemetry {
private static final int ZIPKIN_PORT = 9411;
private static final String ENDPOINT_V2_SPANS = "/api/v2/spans";
/**
* Creates an opentelemetry instance.
* @param serviceName Name of the service in Zipkin
* @return OpenTelemetry.
*/
public static io.opentelemetry.api.OpenTelemetry createOpenTelemetry(String serviceName) {
// Only exports to Zipkin if it is up. Otherwise, ignore it.
// This is helpful to avoid exceptions for examples that do not require Zipkin.
if (isZipkinUp()) {
String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
ZipkinSpanExporter zipkinExporter =
ZipkinSpanExporter.builder()
.setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
.setServiceName(serviceName)
.build();
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
.build();
return OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
} else {
System.out.println("WARNING: Zipkin is not available.");
}
return null;
}
/**
* Converts current OpenTelemetry's context into Reactor's context.
* @return Reactor's context.
*/
public static reactor.util.context.Context getReactorContext() {
return getReactorContext(Context.current());
}
/**
* Converts given OpenTelemetry's context into Reactor's context.
* @param context OpenTelemetry's context.
* @return Reactor's context.
*/
public static reactor.util.context.Context getReactorContext(Context context) {
Map<String, String> map = new HashMap<>();
TextMapPropagator.Setter<Map<String, String>> setter =
(carrier, key, value) -> map.put(key, value);
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter);
reactor.util.context.Context reactorContext = reactor.util.context.Context.empty();
for (Map.Entry<String, String> entry : map.entrySet()) {
reactorContext = reactorContext.put(entry.getKey(), entry.getValue());
}
return reactorContext;
}
private static boolean isZipkinUp() {
try (Socket ignored = new Socket("localhost", ZIPKIN_PORT)) {
return true;
} catch (IOException ignored) {
return false;
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import net.minidev.json.JSONArray;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Class used to verify that traces are present as expected.
*
* Checks for the main span and then checks for its child span for `sleep` API call.
*/
public final class Validation {
private static final OkHttpClient HTTP_CLIENT = new OkHttpClient();
/**
* JSON Path for main span Id.
*/
public static final String JSONPATH_MAIN_SPAN_ID = "$..[?(@.name == \"%s\")]['id']";
/**
* JSON Path for child span Id where duration is greater than 1s.
*/
public static final String JSONPATH_SLEEP_SPAN_ID =
"$..[?(@.parentId=='%s' && @.duration > 1000000 && @.name=='%s')]['id']";
public static void validate(String spanName, String sleepSpanName) throws Exception {
// Must wait for some time to make sure Zipkin receives all spans.
Thread.sleep(5000);
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme("http")
.host("localhost")
.port(9411)
.addPathSegments("api/v2/traces");
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build());
requestBuilder.method("GET", null);
Request request = requestBuilder.build();
Response response = HTTP_CLIENT.newCall(request).execute();
DocumentContext documentContext = JsonPath.parse(response.body().string());
String mainSpanId = readOne(documentContext, String.format(JSONPATH_MAIN_SPAN_ID, spanName)).toString();
assertNotNull(mainSpanId);
String sleepSpanId = readOne(documentContext, String.format(JSONPATH_SLEEP_SPAN_ID, mainSpanId, sleepSpanName))
.toString();
assertNotNull(sleepSpanId);
}
private static Object readOne(DocumentContext documentContext, String path) {
JSONArray arr = documentContext.read(path);
assertTrue(arr.size() > 0);
return arr.get(0);
}
}

View File

@ -0,0 +1,133 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing.grpc;
import com.google.protobuf.Any;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageRequest;
import static io.dapr.it.MethodInvokeServiceProtos.DeleteMessageResponse;
import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesRequest;
import static io.dapr.it.MethodInvokeServiceProtos.GetMessagesResponse;
import static io.dapr.it.MethodInvokeServiceProtos.PostMessageRequest;
import static io.dapr.it.MethodInvokeServiceProtos.PostMessageResponse;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.MethodInvokeServiceProtos.SleepResponse;
public class Service {
public static final String SUCCESS_MESSAGE = "application discovered on port ";
/**
* Server mode: class that encapsulates all server-side logic for Grpc.
*/
private static class MyDaprService extends AppCallbackGrpc.AppCallbackImplBase {
/**
* Server mode: Grpc server.
*/
private Server server;
/**
* Server mode: starts listening on given port.
*
* @param port Port to listen on.
* @throws IOException Errors while trying to start service.
*/
private void start(int port) throws IOException {
this.server = ServerBuilder
.forPort(port)
.addService(this)
.build()
.start();
System.out.printf("Server: started listening on port %d\n", port);
// Now we handle ctrl+c (or any other JVM shutdown)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Server: shutting down gracefully ...");
MyDaprService.this.server.shutdown();
System.out.println("Server: Bye.");
}));
}
/**
* Server mode: waits for shutdown trigger.
*
* @throws InterruptedException Propagated interrupted exception.
*/
private void awaitTermination() throws InterruptedException {
if (this.server != null) {
this.server.awaitTermination();
}
}
/**
* Server mode: this is the Dapr method to receive Invoke operations via Grpc.
*
* @param request Dapr envelope request,
* @param responseObserver Dapr envelope response.
*/
@Override
public void onInvoke(CommonProtos.InvokeRequest request,
StreamObserver<CommonProtos.InvokeResponse> responseObserver) {
try {
if ("sleepOverGRPC".equals(request.getMethod())) {
SleepRequest req = SleepRequest.parseFrom(request.getData().getValue().toByteArray());
SleepResponse res = this.sleep(req);
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
responseBuilder.setData(Any.pack(res));
responseObserver.onNext(responseBuilder.build());
}
} catch (Exception e) {
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}
}
public SleepResponse sleep(SleepRequest request) {
if (request.getSeconds() < 0) {
throw new IllegalArgumentException("Sleep time cannot be negative.");
}
try {
Thread.sleep(request.getSeconds() * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
new RuntimeException(e);
}
// Now respond with current timestamp.
return SleepResponse.newBuilder().build();
}
}
/**
* This is the main method of this app.
* @param args The port to listen on.
* @throws Exception An Exception.
*/
public static void main(String[] args) throws Exception {
int port = Integer.parseInt(args[0]);
System.out.printf("Service starting on port %d ...\n", port);
final MyDaprService service = new MyDaprService();
service.start(port);
service.awaitTermination();
}
}

View File

@ -0,0 +1,91 @@
package io.dapr.it.tracing.grpc;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.tracing.Validation;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import static io.dapr.it.MethodInvokeServiceProtos.SleepRequest;
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TracingIT extends BaseIT {
/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}
/**
* Run of a Dapr application.
*/
private DaprRun daprRun = null;
@Parameter
public boolean useGrpc;
@Before
public void init() throws Exception {
daprRun = startDaprApp(
TracingIT.class.getSimpleName(),
Service.SUCCESS_MESSAGE,
Service.class,
DaprApiProtocol.GRPC, // appProtocol
60000);
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
// Wait since service might be ready even after port is available.
Thread.sleep(2000);
}
@Test
public void testInvoke() throws Exception {
final OpenTelemetry openTelemetry = createOpenTelemetry("service over grpc");
final Tracer tracer = openTelemetry.getTracer("grpc integration test tracer");
final String spanName = UUID.randomUUID().toString();
Span span = tracer.spanBuilder(spanName).setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
try (Scope scope = span.makeCurrent()) {
SleepRequest req = SleepRequest.newBuilder().setSeconds(1).build();
client.invokeMethod(daprRun.getAppName(), "sleepOverGRPC", req.toByteArray(), HttpExtension.POST)
.subscriberContext(getReactorContext())
.block();
}
}
span.end();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
Validation.validate(spanName, "calllocal/tracingit_service/sleepovergrpc");
}
}

View File

@ -0,0 +1,20 @@
package io.dapr.it.tracing.http;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* SpringBoot Controller to handle input binding.
*/
@RestController
public class Controller {
@PostMapping(path = "/sleep")
public void sleep(@RequestBody int seconds) throws InterruptedException {
if (seconds < 0) {
throw new IllegalArgumentException("Sleep time cannot be negative.");
}
Thread.sleep(seconds * 1000);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing.http;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class OpenTelemetryConfig {
public static final String TRACER_NAME = "integration testing tracer";
public static final String SERVICE_NAME = "integration testing service over http";
@Bean
public OpenTelemetry initOpenTelemetry() {
return io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry(SERVICE_NAME);
}
@Bean
public Tracer initTracer(@Autowired OpenTelemetry openTelemetry) {
return openTelemetry.getTracer(TRACER_NAME);
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing.http;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.DispatcherType;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Collections;
@Component
public class OpenTelemetryInterceptor implements HandlerInterceptor {
@Autowired
private OpenTelemetry openTelemetry;
private static final TextMapPropagator.Getter<HttpServletRequest> HTTP_SERVLET_REQUEST_GETTER =
new TextMapPropagator.Getter<>() {
@Override
public Iterable<String> keys(HttpServletRequest carrier) {
return Collections.list(carrier.getHeaderNames());
}
@Nullable
@Override
public String get(@Nullable HttpServletRequest carrier, String key) {
return carrier.getHeader(key);
}
};
@Override
public boolean preHandle(
HttpServletRequest request, HttpServletResponse response, Object handler) {
final TextMapPropagator textFormat = openTelemetry.getPropagators().getTextMapPropagator();
// preHandle is called twice for asynchronous request. For more information, read:
// https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/servlet/AsyncHandlerInterceptor.html
if (request.getDispatcherType() == DispatcherType.ASYNC) {
return true;
}
Context context = textFormat.extract(Context.current(), request, HTTP_SERVLET_REQUEST_GETTER);
request.setAttribute("opentelemetry-context", context);
return true;
}
@Override
public void postHandle(
HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) {
// There is no global context to be changed in post handle since it is done in preHandle on a new call.
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing.http;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
@Component
public class OpenTelemetryInterceptorConfig extends WebMvcConfigurationSupport {
@Autowired
OpenTelemetryInterceptor interceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(interceptor);
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright (c) Microsoft Corporation and Dapr Contributors.
* Licensed under the MIT License.
*/
package io.dapr.it.tracing.http;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for subscriber.
*/
@SpringBootApplication
public class Service {
public static final String SUCCESS_MESSAGE = "Completed initialization in";
public static void main(String[] args) {
int port = Integer.parseInt(args[0]);
System.out.printf("Service starting on port %d ...\n", port);
// Start Dapr's callback endpoint.
start(port);
}
/**
* Starts Dapr's callback in a given port.
*
* @param port Port to listen to.
*/
private static void start(int port) {
SpringApplication app = new SpringApplication(Service.class);
app.run(String.format("--server.port=%d", port));
}
}

View File

@ -0,0 +1,89 @@
package io.dapr.it.tracing.http;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import io.dapr.it.tracing.Validation;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import static io.dapr.it.tracing.OpenTelemetry.createOpenTelemetry;
import static io.dapr.it.tracing.OpenTelemetry.getReactorContext;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TracingIT extends BaseIT {
/**
* Parameters for this test.
* Param #1: useGrpc.
* @return Collection of parameter tuples.
*/
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] { { false }, { true } });
}
/**
* Run of a Dapr application.
*/
private DaprRun daprRun = null;
@Parameter
public boolean useGrpc;
@Before
public void init() throws Exception {
daprRun = startDaprApp(
TracingIT.class.getSimpleName(),
Service.SUCCESS_MESSAGE,
Service.class,
true,
30000);
if (this.useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}
// Wait since service might be ready even after port is available.
Thread.sleep(2000);
}
@Test
public void testInvoke() throws Exception {
final OpenTelemetry openTelemetry = createOpenTelemetry(OpenTelemetryConfig.SERVICE_NAME);
final Tracer tracer = openTelemetry.getTracer(OpenTelemetryConfig.TRACER_NAME);
final String spanName = UUID.randomUUID().toString();
Span span = tracer.spanBuilder(spanName).setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) {
try (Scope scope = span.makeCurrent()) {
client.invokeMethod(daprRun.getAppName(), "sleep", 1, HttpExtension.POST)
.subscriberContext(getReactorContext())
.block();
}
}
span.end();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
Validation.validate(spanName, "calllocal/tracingit_service/sleep");
}
}