Ensure OTEL version from examples aligns with SDK and ITs (#1403)

Signed-off-by: siri-varma <siri.varma@outlook.com>
This commit is contained in:
artur-ciocanu 2025-06-05 23:35:49 +03:00 committed by siri-varma
parent 0cbf38e1b0
commit 05e56ec5ad
7 changed files with 68 additions and 44 deletions

View File

@ -22,7 +22,12 @@
<maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target> <maven.compiler.target>${java.version}</maven.compiler.target>
<spotbugs.fail>false</spotbugs.fail> <spotbugs.fail>false</spotbugs.fail>
<<<<<<< HEAD
<opentelemetry.version>0.14.0</opentelemetry.version> <opentelemetry.version>0.14.0</opentelemetry.version>
=======
<opentelemetry.version>1.41.0</opentelemetry.version>
<zipkin.version>3.4.0</zipkin.version>
>>>>>>> 3a8fd611 (Ensure OTEL version from examples aligns with SDK and ITs (#1403))
</properties> </properties>
<dependencies> <dependencies>
@ -78,11 +83,29 @@
<version>${opentelemetry.version}</version> <version>${opentelemetry.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-metrics</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<<<<<<< HEAD
=======
<groupId>io.opentelemetry</groupId> <groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-zipkin</artifactId> <artifactId>opentelemetry-exporter-zipkin</artifactId>
<version>${opentelemetry.version}</version> <version>${opentelemetry.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-urlconnection</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
>>>>>>> 3a8fd611 (Ensure OTEL version from examples aligns with SDK and ITs (#1403))
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId> <artifactId>junit-jupiter</artifactId>
<scope>test</scope> <scope>test</scope>

View File

@ -20,9 +20,10 @@ import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -59,18 +60,23 @@ public class OpenTelemetryConfig {
* Creates an opentelemetry instance. * Creates an opentelemetry instance.
* @return OpenTelemetry. * @return OpenTelemetry.
*/ */
public static OpenTelemetry createOpenTelemetry() { public static OpenTelemetrySdk createOpenTelemetry() {
// Only exports to Zipkin if it is up. Otherwise, ignore it. // Only exports to Zipkin if it is up. Otherwise, ignore it.
// This is helpful to avoid exceptions for examples that do not require Zipkin. // This is helpful to avoid exceptions for examples that do not require Zipkin.
if (isZipkinUp()) { if (isZipkinUp()) {
Resource serviceResource = Resource.getDefault()
.toBuilder()
.put("service.name", InvokeClient.class.getName()) // Use ResourceAttributes constant
.build();
String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT); String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT);
ZipkinSpanExporter zipkinExporter = ZipkinSpanExporter zipkinExporter =
ZipkinSpanExporter.builder() ZipkinSpanExporter.builder()
.setEndpoint(httpUrl + ENDPOINT_V2_SPANS) .setEndpoint(httpUrl + ENDPOINT_V2_SPANS)
.setServiceName(InvokeClient.class.getName())
.build(); .build();
SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.setResource(serviceResource)
.addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter)) .addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter))
.build(); .build();
@ -100,7 +106,7 @@ public class OpenTelemetryConfig {
*/ */
public static reactor.util.context.Context getReactorContext(Context context) { public static reactor.util.context.Context getReactorContext(Context context) {
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
TextMapPropagator.Setter<Map<String, String>> setter = TextMapSetter<Map<String, String>> setter =
(carrier, key, value) -> map.put(key, value); (carrier, key, value) -> map.put(key, value);
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter); GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter);

View File

@ -15,6 +15,7 @@ package io.dapr.examples;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapPropagator;
import jakarta.servlet.DispatcherType; import jakarta.servlet.DispatcherType;
import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletRequest;
@ -34,8 +35,8 @@ public class OpenTelemetryInterceptor implements HandlerInterceptor {
@Autowired @Autowired
private OpenTelemetry openTelemetry; private OpenTelemetry openTelemetry;
private static final TextMapPropagator.Getter<HttpServletRequest> HTTP_SERVLET_REQUEST_GETTER = private static final TextMapGetter<HttpServletRequest> HTTP_SERVLET_REQUEST_GETTER =
new TextMapPropagator.Getter<>() { new TextMapGetter<>() {
@Override @Override
public Iterable<String> keys(HttpServletRequest carrier) { public Iterable<String> keys(HttpServletRequest carrier) {
return Collections.list(carrier.getHeaderNames()); return Collections.list(carrier.getHeaderNames());

View File

@ -19,8 +19,8 @@ import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry; import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig; import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.OpenTelemetrySdk;
@ -55,28 +55,41 @@ public class BulkPublisher {
* @throws Exception any exception * @throws Exception any exception
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName()); Tracer tracer = openTelemetrySdk.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan(); Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client; DaprClient c = (DaprClient) client;
c.waitForSidecar(10000); c.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) { try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client..."); System.out.println("Using preview client...");
List<String> messages = new ArrayList<>(); List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish"); System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i); String message = String.format("This is message #%d", i);
messages.add(message); messages.add(message);
System.out.println("Going to publish message : " + message); System.out.println("Going to publish message : " + message);
} }
BulkPublishResponse<?> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
BulkPublishResponse<?> res = client
.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages)
.contextWrite(getReactorContext()).block(); .contextWrite(getReactorContext()).block();
System.out.println("Published the set of messages in a single call to Dapr"); System.out.println("Published the set of messages in a single call to Dapr");
if (res != null) { if (res != null) {
if (res.getFailedEntries().size() > 0) { if (res.getFailedEntries().size() > 0) {
// Ideally this condition will not happen in examples // Ideally this condition will not happen in examples
System.out.println("Some events failed to be published"); System.out.println("Some events failed to be published");
for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) { for (BulkPublishResponseFailedEntry<?> entry : res.getFailedEntries()) {
System.out.println("EntryId : " + entry.getEntry().getEntryId() System.out.println("EntryId : " + entry.getEntry().getEntryId()
+ " Error message : " + entry.getErrorMessage()); + " Error message : " + entry.getErrorMessage());
@ -86,16 +99,11 @@ public class BulkPublisher {
throw new Exception("null response from dapr"); throw new Exception("null response from dapr");
} }
} }
// Close the span.
span.end(); span.end();
// Allow plenty of time for Dapr to export all relevant spans to the tracing infra.
Thread.sleep(10000); Thread.sleep(10000);
// Shutdown the OpenTelemetry tracer. openTelemetrySdk.getSdkTracerProvider().shutdown();
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
System.out.println("Done"); System.out.println("Done");
} }
} }
} }

View File

@ -16,8 +16,8 @@ package io.dapr.examples.pubsub;
import io.dapr.client.DaprClient; import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprClientBuilder;
import io.dapr.examples.OpenTelemetryConfig; import io.dapr.examples.OpenTelemetryConfig;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.OpenTelemetrySdk;
@ -51,19 +51,21 @@ public class PublisherWithTracing {
* @throws Exception A startup Exception. * @throws Exception A startup Exception.
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(PublisherWithTracing.class.getCanonicalName()); Tracer tracer = openTelemetrySdk.getTracer(PublisherWithTracing.class.getCanonicalName());
Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan(); Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan();
try (DaprClient client = new DaprClientBuilder().build()) { try (DaprClient client = new DaprClientBuilder().build()) {
try (Scope scope = span.makeCurrent()) { try (Scope scope = span.makeCurrent()) {
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i); String message = String.format("This is message #%d", i);
// Publishing messages, notice the use of subscriberContext() for tracing. // Publishing messages, notice the use of subscriberContext() for tracing.
client.publishEvent( client.publishEvent(
PUBSUB_NAME, PUBSUB_NAME,
TOPIC_NAME, TOPIC_NAME,
message).contextWrite(getReactorContext()).block(); message).contextWrite(getReactorContext()).block();
System.out.println("Published message: " + message); System.out.println("Published message: " + message);
try { try {
@ -71,19 +73,14 @@ public class PublisherWithTracing {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return; return;
} }
} }
} }
// Close the span.
span.end(); span.end();
openTelemetrySdk.getSdkTracerProvider().shutdown();
// Shutdown the OpenTelemetry tracer.
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done."); System.out.println("Done.");
} }
} }

View File

@ -19,8 +19,8 @@ import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.examples.OpenTelemetryConfig; import io.dapr.examples.OpenTelemetryConfig;
import io.dapr.utils.TypeRef; import io.dapr.utils.TypeRef;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope; import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.OpenTelemetrySdk;
@ -48,10 +48,10 @@ public class InvokeClient {
* @param args Messages to be sent as request for the invoke API. * @param args Messages to be sent as request for the invoke API.
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry();
final Tracer tracer = openTelemetry.getTracer(InvokeClient.class.getCanonicalName()); Tracer tracer = openTelemetrySdk.getTracer(InvokeClient.class.getCanonicalName());
Span span = tracer.spanBuilder("Example's Main").setSpanKind(SpanKind.CLIENT).startSpan();
Span span = tracer.spanBuilder("Example's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprClient client = (new DaprClientBuilder()).build()) { try (DaprClient client = (new DaprClientBuilder()).build()) {
for (String message : args) { for (String message : args) {
try (Scope scope = span.makeCurrent()) { try (Scope scope = span.makeCurrent()) {
@ -77,14 +77,5 @@ public class InvokeClient {
System.out.println("Done"); System.out.println("Done");
System.exit(0); System.exit(0);
} }
span.end();
shutdown();
System.out.println("Done");
} }
private static void shutdown() throws Exception {
OpenTelemetrySdk.getGlobalTracerManagement().shutdown();
Validation.validate();
}
} }

View File

@ -1346,11 +1346,9 @@ public class DaprClientImpl extends AbstractDaprClient {
} }
if (scheduleJobRequest.getFailurePolicy() != null) { if (scheduleJobRequest.getFailurePolicy() != null) {
scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy())); jobBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy()));
} }
scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite());
Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono = Mono<DaprProtos.ScheduleJobResponse> scheduleJobResponseMono =
Mono.deferContextual(context -> this.createMono( Mono.deferContextual(context -> this.createMono(
it -> intercept(context, asyncStub) it -> intercept(context, asyncStub)