Switch Jaeger remote sampler to use grpc lite (#4043)

* Switch Jaeger remote sampler to gRPC lite

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Abstract impl

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Service done

* Working example

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Handle gzip compression

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Clean dependencies

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Remove dependency on proto

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* remove stdout

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Fix

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Fix logger

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* jcmpAPI failing

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Fix

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Fix

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Fix

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Switch logger

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Fix

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Support grpc netty too

* NPE test

* update after merge main

* Switch Jaeger remote sampler to gRPC lite

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* some tests

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* some tests

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* tls test

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* tls for default

Signed-off-by: Pavol Loffay <p.loffay@gmail.com>

* Workaround logger reference issue

* Use a better logger workaround

* Wait for two polls since first poll may not have been processed by client in time for assertion.

* Don't depend on poll count for assertions.

* shutdownNow

* Cleanups

* Update logunit

* Don't stop at body read failure

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
This commit is contained in:
Pavol Loffay 2022-01-19 04:26:47 +01:00 committed by GitHub
parent 28405b4f9c
commit 5d521f5c21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 2363 additions and 229 deletions

View File

@ -80,7 +80,7 @@ val DEPENDENCIES = listOf(
// TODO(anuraaga): Skip 1.8 because of https://github.com/rohanpadhye/JQF/issues/172
"edu.berkeley.cs.jqf:jqf-fuzz:1.7",
"eu.rekawek.toxiproxy:toxiproxy-java:2.1.5",
"io.github.netmikey.logunit:logunit-jul:1.1.2",
"io.github.netmikey.logunit:logunit-jul:1.1.3",
"io.jaegertracing:jaeger-client:1.7.0",
"io.opentelemetry.proto:opentelemetry-proto:0.11.0-alpha",
"io.opentracing:opentracing-api:0.33.0",

View File

@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSamplerBuilder setTrustedCertificates(byte[])

View File

@ -99,7 +99,7 @@ public final class CodedInputStream {
if (size <= 0) {
throw newNegativeException();
}
throw newTrucatedException();
throw newTruncatedException();
}
/** Skips a field. */
@ -126,7 +126,8 @@ public final class CodedInputStream {
return pos == limit;
}
private int readRawVarint32() throws IOException {
/** Read varint32. */
public int readRawVarint32() throws IOException {
// See implementation notes for readRawVarint64
fastpath:
{
@ -182,7 +183,7 @@ public final class CodedInputStream {
private byte readRawByte() throws IOException {
if (pos == limit) {
throw newTrucatedException();
throw newTruncatedException();
}
return buffer[pos++];
}
@ -195,6 +196,29 @@ public final class CodedInputStream {
}
}
public double readDouble() throws IOException {
return Double.longBitsToDouble(readRawLittleEndian64());
}
private long readRawLittleEndian64() throws IOException {
int tempPos = pos;
if (limit - tempPos < FIXED64_SIZE) {
throw newTruncatedException();
}
final byte[] buffer = this.buffer;
pos = tempPos + FIXED64_SIZE;
return ((buffer[tempPos] & 0xffL)
| ((buffer[tempPos + 1] & 0xffL) << 8)
| ((buffer[tempPos + 2] & 0xffL) << 16)
| ((buffer[tempPos + 3] & 0xffL) << 24)
| ((buffer[tempPos + 4] & 0xffL) << 32)
| ((buffer[tempPos + 5] & 0xffL) << 40)
| ((buffer[tempPos + 6] & 0xffL) << 48)
| ((buffer[tempPos + 7] & 0xffL) << 56));
}
private void skipRawVarintFastPath() throws IOException {
for (int i = 0; i < MAX_VARINT_SIZE; i++) {
if (buffer[pos++] >= 0) {
@ -223,7 +247,7 @@ public final class CodedInputStream {
if (length < 0) {
throw newNegativeException();
}
throw newTrucatedException();
throw newTruncatedException();
}
private static IOException newNegativeException() {
@ -232,7 +256,7 @@ public final class CodedInputStream {
+ "which claimed to have negative size.");
}
private static IOException newTrucatedException() {
private static IOException newTruncatedException() {
return new IOException(
"While parsing a protocol message, the input ended unexpectedly "
+ "in the middle of a field. This could mean either that the "

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Utilities for exporter builders.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class ExporterBuilderUtil {
/** Validate OTLP endpoint. */
public static URI validateEndpoint(String endpoint) {
URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
}
if (uri.getScheme() == null
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
throw new IllegalArgumentException(
"Invalid endpoint, must start with http:// or https://: " + uri);
}
return uri;
}
private ExporterBuilderUtil() {}
}

View File

@ -14,11 +14,11 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ -81,20 +81,7 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
@Override
public DefaultGrpcExporterBuilder<T> setEndpoint(String endpoint) {
URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
}
if (uri.getScheme() == null
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
throw new IllegalArgumentException(
"Invalid endpoint, must start with http:// or https://: " + uri);
}
this.endpoint = uri;
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}

View File

@ -21,7 +21,7 @@ import okio.Okio;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
final class GrpcRequestBody extends RequestBody {
public final class GrpcRequestBody extends RequestBody {
private static final int HEADER_LENGTH = 5;
@ -36,7 +36,7 @@ final class GrpcRequestBody extends RequestBody {
private final boolean compressed;
/** Creates a new {@link GrpcRequestBody}. */
GrpcRequestBody(Marshaler marshaler, boolean compressed) {
public GrpcRequestBody(Marshaler marshaler, boolean compressed) {
this.marshaler = marshaler;
this.compressed = compressed;

View File

@ -212,7 +212,8 @@ public final class OkHttpGrpcExporter<T extends Marshaler> implements GrpcExport
return CompletableResultCode.ofSuccess();
}
static boolean isRetryable(Response response) {
/** Whether response is retriable or not. */
public static boolean isRetryable(Response response) {
// Only retry on gRPC codes which will always come with an HTTP success
if (!response.isSuccessful()) {
return false;

View File

@ -7,13 +7,13 @@ package io.opentelemetry.exporter.otlp.internal.grpc;
import io.grpc.ManagedChannel;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@ -73,20 +73,7 @@ public final class OkHttpGrpcExporterBuilder<T extends Marshaler>
@Override
public OkHttpGrpcExporterBuilder<T> setEndpoint(String endpoint) {
URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
}
if (uri.getScheme() == null
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
throw new IllegalArgumentException(
"Invalid endpoint, must start with http:// or https://: " + uri);
}
this.endpoint = uri;
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}

View File

@ -6,13 +6,13 @@
package io.opentelemetry.exporter.otlp.internal.okhttp;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@ -58,20 +58,8 @@ public final class OkHttpExporterBuilder<T extends Marshaler> {
}
public OkHttpExporterBuilder<T> setEndpoint(String endpoint) {
URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
}
if (uri.getScheme() == null
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
throw new IllegalArgumentException(
"Invalid endpoint, must start with http:// or https://: " + uri);
}
this.endpoint = endpoint;
URI uri = ExporterBuilderUtil.validateEndpoint(endpoint);
this.endpoint = uri.toString();
return this;
}

View File

@ -10,13 +10,13 @@ The sampler configuration is received from collector's gRPC endpoint.
The following example shows initialization and installation of the sampler:
```java
Builder remoteSamplerBuilder = JaegerRemoteSampler.builder()
.setChannel(grpcChannel)
.setServiceName("my-service");
TraceConfig traceConfig = provider.getActiveTraceConfig()
.toBuilder().setSampler(remoteSamplerBuilder.build())
JaegerRemoteSampler sampler = JaegerRemoteSampler.builder()
.setServiceName("my-service")
.build();
return SdkTracerProvider.builder()
...
.setSampler(sampler)
.build();
provider.updateActiveTraceConfig(traceConfig);
```
[javadoc-image]: https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-sdk-extension-jaeger-remote-sampler.svg

View File

@ -3,6 +3,8 @@ plugins {
id("otel.publish-conventions")
id("otel.animalsniffer-conventions")
id("com.squareup.wire")
}
description = "OpenTelemetry - Jaeger Remote sampler"
@ -13,16 +15,56 @@ dependencies {
compileOnly(project(":sdk-extensions:autoconfigure"))
implementation(project(":sdk:all"))
implementation("io.grpc:grpc-api")
implementation("io.grpc:grpc-protobuf")
implementation("io.grpc:grpc-stub")
implementation("com.google.protobuf:protobuf-java")
// TODO(anuraaga): otlp-common has a lot of code not specific to OTLP now. As it's just internal
// code, this mysterious dependency is possibly still OK but we may need a rename or splitting.
implementation(project(":exporters:otlp:common"))
implementation("com.squareup.okhttp3:okhttp")
compileOnly("io.grpc:grpc-api")
compileOnly("io.grpc:grpc-protobuf")
compileOnly("io.grpc:grpc-stub")
testImplementation(project(":sdk:testing"))
testImplementation(project(":sdk-extensions:autoconfigure"))
testImplementation("io.grpc:grpc-testing")
testImplementation("com.google.protobuf:protobuf-java-util")
testImplementation("com.linecorp.armeria:armeria-junit5")
testImplementation("com.linecorp.armeria:armeria-grpc-protocol")
testImplementation("org.testcontainers:junit-jupiter")
testRuntimeOnly("io.grpc:grpc-netty-shaded")
}
testing {
suites {
val testGrpcNetty by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":sdk:testing"))
implementation("com.google.protobuf:protobuf-java-util")
implementation("com.linecorp.armeria:armeria-junit5")
implementation("com.linecorp.armeria:armeria-grpc-protocol")
implementation("org.testcontainers:junit-jupiter")
implementation("io.grpc:grpc-netty")
implementation("io.grpc:grpc-stub")
implementation(project(":exporters:otlp:common"))
}
}
}
}
tasks {
check {
dependsOn(testing.suites)
}
}
wire {
custom {
customHandlerClass = "io.opentelemetry.gradle.ProtoFieldsWireHandler"
}
}
sourceSets {
main {
java.srcDir("$buildDir/generated/source/wire")
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import com.google.common.util.concurrent.Futures;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
final class DefaultGrpcService<ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler>
implements GrpcService<ReqMarshalerT, ResUnMarshalerT> {
private static final Logger logger = Logger.getLogger(DefaultGrpcService.class.getName());
private final String type;
private final ManagedChannel managedChannel;
private final MarshalerServiceStub<ReqMarshalerT, ResUnMarshalerT, ?> stub;
private final long timeoutNanos;
/** Creates a new {@link DefaultGrpcService}. */
DefaultGrpcService(
String type,
ManagedChannel channel,
MarshalerServiceStub<ReqMarshalerT, ResUnMarshalerT, ?> stub,
long timeoutNanos) {
this.type = type;
this.managedChannel = channel;
this.timeoutNanos = timeoutNanos;
this.stub = stub;
}
@Override
public ResUnMarshalerT execute(
ReqMarshalerT exportRequest, ResUnMarshalerT responseUnmarshaller) {
MarshalerServiceStub<ReqMarshalerT, ResUnMarshalerT, ?> stub = this.stub;
if (timeoutNanos > 0) {
stub = stub.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
}
try {
return Futures.getUnchecked(stub.export(exportRequest));
} catch (Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode().equals(Status.Code.UNIMPLEMENTED)) {
logger.log(
Level.SEVERE,
"Failed to execute "
+ type
+ "s. Server responded with UNIMPLEMENTED. "
+ "Full error message: "
+ status.getDescription());
} else if (status.getCode().equals(Status.Code.UNAVAILABLE)) {
logger.log(
Level.SEVERE,
"Failed to execute "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your service is running and reachable from this network. "
+ "Full error message:"
+ status.getDescription());
} else {
logger.log(
Level.WARNING,
"Failed to execute "
+ type
+ "s. Server responded with gRPC status code "
+ status.getCode().value()
+ ". Error message: "
+ status.getDescription());
}
}
return responseUnmarshaller;
}
@Override
public CompletableResultCode shutdown() {
if (managedChannel.isTerminated()) {
return CompletableResultCode.ofSuccess();
}
return ManagedChannelUtil.shutdownChannel(managedChannel);
}
}

View File

@ -0,0 +1,167 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil.toServiceConfig;
import static java.util.Objects.requireNonNull;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.otlp.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
final class DefaultGrpcServiceBuilder<ReqT extends Marshaler, ResT extends UnMarshaler>
implements GrpcServiceBuilder<ReqT, ResT> {
private final String type;
private final Function<ManagedChannel, MarshalerServiceStub<ReqT, ResT, ?>> stubFactory;
private final String grpcServiceName;
@Nullable private ManagedChannel channel;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;
@Nullable private RetryPolicy retryPolicy;
/** Creates a new {@link OkHttpGrpcExporterBuilder}. */
// Visible for testing
DefaultGrpcServiceBuilder(
String type,
Function<ManagedChannel, MarshalerServiceStub<ReqT, ResT, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint,
String grpcServiceName) {
this.type = type;
this.stubFactory = stubFactory;
this.grpcServiceName = grpcServiceName;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> setChannel(ManagedChannel channel) {
requireNonNull(channel, "channel");
this.channel = channel;
return this;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> setTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
timeoutNanos = unit.toNanos(timeout);
return this;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> setTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
checkArgument(!timeout.isNegative(), "timeout must be non-negative");
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint");
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
this.compressionEnabled = true;
return this;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> setTrustedCertificates(
byte[] trustedCertificatesPem) {
requireNonNull(trustedCertificatesPem, "trustedCertificatesPem");
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> addHeader(String key, String value) {
requireNonNull(key, "key");
requireNonNull(value, "value");
if (metadata == null) {
metadata = new Metadata();
}
metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value);
return this;
}
@Override
public DefaultGrpcServiceBuilder<ReqT, ResT> addRetryPolicy(RetryPolicy retryPolicy) {
requireNonNull(retryPolicy, "retryPolicy");
this.retryPolicy = retryPolicy;
return this;
}
@Override
public GrpcService<ReqT, ResT> build() {
ManagedChannel channel = this.channel;
if (channel == null) {
ManagedChannelBuilder<?> managedChannelBuilder =
ManagedChannelBuilder.forTarget(endpoint.getAuthority());
if (endpoint.getScheme().equals("https")) {
managedChannelBuilder.useTransportSecurity();
} else {
managedChannelBuilder.usePlaintext();
}
if (metadata != null) {
managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
}
if (trustedCertificatesPem != null) {
try {
ManagedChannelUtil.setTrustedCertificatesPem(
managedChannelBuilder, trustedCertificatesPem);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates for gRPC TLS connection, are they valid "
+ "X.509 in PEM format?",
e);
}
}
if (retryPolicy != null) {
managedChannelBuilder.defaultServiceConfig(toServiceConfig(grpcServiceName, retryPolicy));
}
channel = managedChannelBuilder.build();
}
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
MarshalerServiceStub<ReqT, ResT, ?> stub =
stubFactory.apply(channel).withCompression(codec.getMessageEncoding());
return new DefaultGrpcService<>(type, channel, stub, timeoutNanos);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;
interface GrpcService<ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler> {
/** Returns a new {@link GrpcExporterBuilder}. */
static <ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler>
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> builder(
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<
Function<ManagedChannel, MarshalerServiceStub<ReqMarshalerT, ResUnMarshalerT, ?>>>
stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
return GrpcServiceUtil.serviceBuilder(
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcServiceName, grpcEndpointPath);
}
/**
* Exports the {@code exportRequest} which is a request {@link Marshaler} for {@code numItems}
* items.
*/
ResUnMarshalerT execute(ReqMarshalerT request, ResUnMarshalerT response);
/** Shuts the exporter down. */
CompletableResultCode shutdown();
}

View File

@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
interface GrpcServiceBuilder<ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler> {
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setChannel(ManagedChannel channel);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setTimeout(long timeout, TimeUnit unit);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setTimeout(Duration timeout);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setEndpoint(String endpoint);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setCompression(String compressionMethod);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setTrustedCertificates(
byte[] trustedCertificatesPem);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> addHeader(String key, String value);
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> addRetryPolicy(RetryPolicy retryPolicy);
GrpcService<ReqMarshalerT, ResUnMarshalerT> build();
}

View File

@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub;
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;
final class GrpcServiceUtil {
private static final boolean USE_OKHTTP;
static {
boolean useOkhttp = true;
// Use the OkHttp exporter unless grpc-stub is on the classpath.
try {
Class.forName("io.grpc.stub.AbstractStub");
useOkhttp = false;
} catch (ClassNotFoundException e) {
// Fall through
}
USE_OKHTTP = useOkhttp;
}
static <ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler>
GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> serviceBuilder(
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<
Function<ManagedChannel, MarshalerServiceStub<ReqMarshalerT, ResUnMarshalerT, ?>>>
stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
if (USE_OKHTTP) {
return new OkHttpGrpcServiceBuilder<>(
type, grpcEndpointPath, defaultTimeoutSecs, defaultEndpoint);
} else {
return new DefaultGrpcServiceBuilder<>(
type, stubFactory.get(), defaultTimeoutSecs, defaultEndpoint, grpcServiceName);
}
}
private GrpcServiceUtil() {}
}

View File

@ -5,15 +5,9 @@
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.grpc.ManagedChannel;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.PerOperationSamplingStrategies;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.SamplingStrategyParameters;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.SamplingStrategyResponse;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.SamplingManagerGrpc;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.SamplingManagerGrpc.SamplingManagerBlockingStub;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
@ -36,25 +30,24 @@ public final class JaegerRemoteSampler implements Sampler, Closeable {
JaegerRemoteSampler.class.getSimpleName() + "_WorkerThread";
private final String serviceName;
private final ManagedChannel channel;
private final SamplingManagerBlockingStub stub;
private final boolean closeChannel;
private final ScheduledExecutorService pollExecutor;
private final ScheduledFuture<?> pollFuture;
private volatile Sampler sampler;
private final GrpcService<
SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
delegate;
JaegerRemoteSampler(
GrpcService<SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
delegate,
@Nullable String serviceName,
ManagedChannel channel,
int pollingIntervalMs,
Sampler initialSampler,
boolean closeChannel) {
this.channel = channel;
this.closeChannel = closeChannel;
Sampler initialSampler) {
this.serviceName = serviceName != null ? serviceName : "";
this.stub = SamplingManagerGrpc.newBlockingStub(channel);
this.delegate = delegate;
this.sampler = initialSampler;
pollExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(WORKER_THREAD_NAME));
pollFuture =
@ -75,31 +68,35 @@ public final class JaegerRemoteSampler implements Sampler, Closeable {
private void getAndUpdateSampler() {
try {
SamplingStrategyParameters params =
SamplingStrategyParameters.newBuilder().setServiceName(this.serviceName).build();
SamplingStrategyResponse response = stub.getSamplingStrategy(params);
this.sampler = updateSampler(response);
} catch (RuntimeException e) { // keep the timer thread alive
SamplingStrategyResponseUnMarshaler samplingStrategyResponseUnMarshaler =
delegate.execute(
SamplingStrategyParametersMarshaler.create(this.serviceName),
new SamplingStrategyResponseUnMarshaler());
SamplingStrategyResponse response = samplingStrategyResponseUnMarshaler.get();
if (response != null) {
this.sampler = updateSampler(response);
}
} catch (Throwable e) { // keep the timer thread alive
logger.log(Level.WARNING, "Failed to update sampler", e);
}
}
private static Sampler updateSampler(SamplingStrategyResponse response) {
PerOperationSamplingStrategies operationSampling = response.getOperationSampling();
if (operationSampling.getPerOperationStrategiesList().size() > 0) {
SamplingStrategyResponse.PerOperationSamplingStrategies operationSampling =
response.perOperationSamplingStrategies;
if (operationSampling.strategies.size() > 0) {
Sampler defaultSampler =
Sampler.traceIdRatioBased(operationSampling.getDefaultSamplingProbability());
Sampler.traceIdRatioBased(operationSampling.defaultSamplingProbability);
return Sampler.parentBased(
new PerOperationSampler(
defaultSampler, operationSampling.getPerOperationStrategiesList()));
new PerOperationSampler(defaultSampler, operationSampling.strategies));
}
switch (response.getStrategyType()) {
switch (response.strategyType) {
case PROBABILISTIC:
return Sampler.parentBased(
Sampler.traceIdRatioBased(response.getProbabilisticSampling().getSamplingRate()));
Sampler.traceIdRatioBased(response.probabilisticSamplingStrategy.samplingRate));
case RATE_LIMITING:
return Sampler.parentBased(
new RateLimitingSampler(response.getRateLimitingSampling().getMaxTracesPerSecond()));
new RateLimitingSampler(response.rateLimitingSamplingStrategy.maxTracesPerSecond));
case UNRECOGNIZED:
throw new AssertionError("unrecognized sampler type");
}
@ -128,9 +125,7 @@ public final class JaegerRemoteSampler implements Sampler, Closeable {
@Override
public void close() {
pollFuture.cancel(true);
pollExecutor.shutdown();
if (closeChannel) {
channel.shutdown();
}
pollExecutor.shutdownNow();
delegate.shutdown();
}
}

View File

@ -8,26 +8,34 @@ package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static java.util.Objects.requireNonNull;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.api.internal.Utils;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/** A builder for {@link JaegerRemoteSampler}. */
public final class JaegerRemoteSamplerBuilder {
private static final String DEFAULT_ENDPOINT = "localhost:14250";
private static final String GRPC_SERVICE_NAME = "jaeger.api_v2.SamplingManager";
// Visible for testing
static final String GRPC_ENDPOINT_PATH = "/" + GRPC_SERVICE_NAME + "/GetSamplingStrategy";
private static final String DEFAULT_ENDPOINT_URL = "http://localhost:14250";
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
private static final int DEFAULT_POLLING_INTERVAL_MILLIS = 60000;
private static final Sampler INITIAL_SAMPLER =
Sampler.parentBased(Sampler.traceIdRatioBased(0.001));
private String endpoint = DEFAULT_ENDPOINT;
@Nullable private ManagedChannel channel;
@Nullable private String serviceName;
private Sampler initialSampler = INITIAL_SAMPLER;
private int pollingIntervalMillis = DEFAULT_POLLING_INTERVAL_MILLIS;
private boolean closeChannel = true;
private static final long DEFAULT_TIMEOUT_SECS = 10;
private final GrpcServiceBuilder<
SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
delegate;
/**
* Sets the service name to be used by this exporter. Required.
@ -41,24 +49,19 @@ public final class JaegerRemoteSamplerBuilder {
return this;
}
/** Sets the Jaeger endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT}. */
/**
* Sets the Jaeger endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}.
*/
public JaegerRemoteSamplerBuilder setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint");
this.endpoint = endpoint;
delegate.setEndpoint(endpoint);
return this;
}
/**
* Sets the managed channel to use when communicating with the backend. Takes precedence over
* {@link #setEndpoint(String)} if both are called.
*
* <p>Note: if you use this option, the provided channel will *not* be closed when {@code close()}
* is called on the resulting {@link JaegerRemoteSampler}.
*/
public JaegerRemoteSamplerBuilder setChannel(ManagedChannel channel) {
requireNonNull(channel, "channel");
this.channel = channel;
closeChannel = false;
/** Sets trusted certificate. */
public JaegerRemoteSamplerBuilder setTrustedCertificates(byte[] trustedCertificatesPem) {
requireNonNull(trustedCertificatesPem, "trustedCertificatesPem");
delegate.setTrustedCertificates(trustedCertificatesPem);
return this;
}
@ -92,19 +95,40 @@ public final class JaegerRemoteSamplerBuilder {
return this;
}
/**
* Sets the managed channel to use when communicating with the backend. Takes precedence over
* {@link #setEndpoint(String)} if both are called.
*/
public JaegerRemoteSamplerBuilder setChannel(ManagedChannel channel) {
requireNonNull(channel, "channel");
delegate.setChannel(channel);
return this;
}
/**
* Builds the {@link JaegerRemoteSampler}.
*
* @return the remote sampler instance.
*/
public JaegerRemoteSampler build() {
ManagedChannel channel = this.channel;
if (channel == null) {
channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
}
return new JaegerRemoteSampler(
serviceName, channel, pollingIntervalMillis, initialSampler, closeChannel);
delegate.build(), serviceName, pollingIntervalMillis, initialSampler);
}
JaegerRemoteSamplerBuilder() {}
JaegerRemoteSamplerBuilder() {
delegate =
GrpcService.builder(
"remoteSampling",
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshallerRemoteSamplerServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}
// Visible for testing
GrpcServiceBuilder<SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
getDelegate() {
return delegate;
}
}

View File

@ -0,0 +1,131 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static io.grpc.MethodDescriptor.generateFullMethodName;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerInputStream;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerServiceStub;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
class MarshallerRemoteSamplerServiceGrpc {
private static final String SERVICE_NAME = "jaeger.api_v2.SamplingManager";
private static final MethodDescriptor.Marshaller<SamplingStrategyParametersMarshaler>
REQUEST_MARSHALLER =
new MethodDescriptor.Marshaller<SamplingStrategyParametersMarshaler>() {
@Override
public InputStream stream(SamplingStrategyParametersMarshaler value) {
return new MarshalerInputStream(value);
}
@Override
public SamplingStrategyParametersMarshaler parse(InputStream stream) {
throw new UnsupportedOperationException("Only for serializing");
}
};
private static final MethodDescriptor.Marshaller<SamplingStrategyResponseUnMarshaler>
RESPONSE_MARSHALLER =
new MethodDescriptor.Marshaller<SamplingStrategyResponseUnMarshaler>() {
@Override
public InputStream stream(SamplingStrategyResponseUnMarshaler value) {
throw new UnsupportedOperationException("Only for parsing");
}
@Override
public SamplingStrategyResponseUnMarshaler parse(InputStream stream) {
SamplingStrategyResponseUnMarshaler unmarshaller =
new SamplingStrategyResponseUnMarshaler();
try {
unmarshaller.read(readAllBytes(stream));
} catch (IOException e) {
// could not parse response
throw new IllegalStateException(
"could not parse jaeger remote sampling response", e);
}
return unmarshaller;
}
};
private static final MethodDescriptor<
SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
getPostSpansMethod =
MethodDescriptor
.<SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "GetSamplingStrategy"))
.setRequestMarshaller(REQUEST_MARSHALLER)
.setResponseMarshaller(RESPONSE_MARSHALLER)
.build();
static SamplingManagerFutureStub newFutureStub(Channel channel) {
return SamplingManagerFutureStub.newStub(SamplingManagerFutureStub::new, channel);
}
static final class SamplingManagerFutureStub
extends MarshalerServiceStub<
SamplingStrategyParametersMarshaler,
SamplingStrategyResponseUnMarshaler,
SamplingManagerFutureStub> {
private SamplingManagerFutureStub(Channel channel, CallOptions callOptions) {
super(channel, callOptions);
}
@Override
protected SamplingManagerFutureStub build(Channel channel, CallOptions callOptions) {
return new SamplingManagerFutureStub(channel, callOptions);
}
@Override
public ListenableFuture<SamplingStrategyResponseUnMarshaler> export(
SamplingStrategyParametersMarshaler request) {
return ClientCalls.futureUnaryCall(
getChannel().newCall(getPostSpansMethod, getCallOptions()), request);
}
}
private static byte[] readAllBytes(InputStream inputStream) throws IOException {
int bufLen = 4 * 0x400; // 4KB
byte[] buf = new byte[bufLen];
int readLen;
IOException exception = null;
try {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
while ((readLen = inputStream.read(buf, 0, bufLen)) != -1) {
outputStream.write(buf, 0, readLen);
}
return outputStream.toByteArray();
}
} catch (IOException e) {
exception = e;
throw e;
} finally {
if (exception == null) {
inputStream.close();
} else {
try {
inputStream.close();
} catch (IOException e) {
exception.addSuppressed(e);
}
}
}
}
private MarshallerRemoteSamplerServiceGrpc() {}
}

View File

@ -0,0 +1,223 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcRequestBody;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.Buffer;
import okio.GzipSource;
import okio.Okio;
final class OkHttpGrpcService<ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler>
implements GrpcService<ReqMarshalerT, ResUnMarshalerT> {
private static final String GRPC_STATUS = "grpc-status";
private static final String GRPC_MESSAGE = "grpc-message";
private static final Logger logger = Logger.getLogger(OkHttpGrpcService.class.getName());
private final String type;
private final OkHttpClient client;
private final String endpoint;
private final Headers headers;
private final boolean compressionEnabled;
/** Creates a new {@link OkHttpGrpcService}. */
OkHttpGrpcService(
String type,
OkHttpClient client,
String endpoint,
Headers headers,
boolean compressionEnabled) {
this.type = type;
this.client = client;
this.endpoint = endpoint;
this.headers = headers;
this.compressionEnabled = compressionEnabled;
}
@Override
public ResUnMarshalerT execute(
ReqMarshalerT exportRequest, ResUnMarshalerT responseUnmarshaller) {
Request.Builder requestBuilder = new Request.Builder().url(endpoint).headers(headers);
RequestBody requestBody = new GrpcRequestBody(exportRequest, compressionEnabled);
requestBuilder.post(requestBody);
try {
Response response = client.newCall(requestBuilder.build()).execute();
byte[] bodyBytes = new byte[0];
try {
bodyBytes = response.body().bytes();
} catch (IOException ignored) {
// It's unlikely a transport exception would actually be useful in debugging. There may
// be gRPC status information available handled below though, so ignore this exception
// and continue through gRPC error handling logic. In the worst case we will record the
// HTTP error.
}
String status = grpcStatus(response);
if ("0".equals(status)) {
if (bodyBytes.length > 5) {
ByteArrayInputStream bodyStream = new ByteArrayInputStream(bodyBytes);
bodyStream.skip(5);
if (bodyBytes[0] == '1') {
Buffer buffer = new Buffer();
buffer.readFrom(bodyStream);
GzipSource gzipSource = new GzipSource(buffer);
bodyBytes = Okio.buffer(gzipSource).getBuffer().readByteArray();
} else {
bodyBytes = Arrays.copyOfRange(bodyBytes, 5, bodyBytes.length);
}
responseUnmarshaller.read(bodyBytes);
return responseUnmarshaller;
}
return responseUnmarshaller;
}
// handle non OK status codes
String codeMessage =
status != null ? "gRPC status code " + status : "HTTP status code " + response.code();
String errorMessage = grpcMessage(response);
if (GrpcStatusUtil.GRPC_STATUS_UNIMPLEMENTED.equals(status)) {
logger.log(
Level.SEVERE,
"Failed to execute "
+ type
+ "s. Server responded with UNIMPLEMENTED. "
+ "Full error message: "
+ errorMessage);
} else if (GrpcStatusUtil.GRPC_STATUS_UNAVAILABLE.equals(status)) {
logger.log(
Level.SEVERE,
"Failed to execute "
+ type
+ "s. Server is UNAVAILABLE. "
+ "Make sure your service is running and reachable from this network. "
+ "Full error message:"
+ errorMessage);
} else {
logger.log(
Level.WARNING,
"Failed to execute "
+ type
+ "s. Server responded with "
+ codeMessage
+ ". Error message: "
+ errorMessage);
}
} catch (IOException e) {
logger.log(
Level.SEVERE,
"Failed to execute "
+ type
+ "s. The request could not be executed. Full error message: "
+ e.getMessage());
}
return responseUnmarshaller;
}
@Nullable
private static String grpcStatus(Response response) {
// Status can either be in the headers or trailers depending on error
String grpcStatus = response.header(GRPC_STATUS);
if (grpcStatus == null) {
try {
grpcStatus = response.trailers().get(GRPC_STATUS);
} catch (IOException e) {
// Could not read a status, this generally means the HTTP status is the error.
return null;
}
}
return grpcStatus;
}
private static String grpcMessage(Response response) {
String message = response.header(GRPC_MESSAGE);
if (message == null) {
try {
message = response.trailers().get(GRPC_MESSAGE);
} catch (IOException e) {
// Fall through
}
}
if (message != null) {
return unescape(message);
}
// Couldn't get message for some reason, use the HTTP status.
return response.message();
}
@Override
public CompletableResultCode shutdown() {
client.dispatcher().cancelAll();
client.dispatcher().executorService().shutdownNow();
client.connectionPool().evictAll();
return CompletableResultCode.ofSuccess();
}
static boolean isRetryable(Response response) {
// Only retry on gRPC codes which will always come with an HTTP success
if (!response.isSuccessful()) {
return false;
}
// We don't check trailers for retry since retryable error codes always come with response
// headers, not trailers, in practice.
String grpcStatus = response.header(GRPC_STATUS);
return RetryUtil.retryableGrpcStatusCodes().contains(grpcStatus);
}
// From grpc-java
/** Unescape the provided ascii to a unicode {@link String}. */
private static String unescape(String value) {
for (int i = 0; i < value.length(); i++) {
char c = value.charAt(i);
if (c < ' ' || c >= '~' || (c == '%' && i + 2 < value.length())) {
return doUnescape(value.getBytes(StandardCharsets.US_ASCII));
}
}
return value;
}
private static String doUnescape(byte[] value) {
ByteBuffer buf = ByteBuffer.allocate(value.length);
for (int i = 0; i < value.length; ) {
if (value[i] == '%' && i + 2 < value.length) {
try {
buf.put((byte) Integer.parseInt(new String(value, i + 1, 2, StandardCharsets.UTF_8), 16));
i += 3;
continue;
} catch (NumberFormatException e) {
// ignore, fall through, just push the bytes.
}
}
buf.put(value[i]);
i += 1;
}
return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
}
}

View File

@ -0,0 +1,154 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import javax.net.ssl.X509TrustManager;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
final class OkHttpGrpcServiceBuilder<
ReqMarshalerT extends Marshaler, ResUnMarshalerT extends UnMarshaler>
implements GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> {
private final String type;
private final String grpcEndpointPath;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
private final Headers.Builder headers = new Headers.Builder();
@Nullable private byte[] trustedCertificatesPem;
@Nullable private RetryPolicy retryPolicy;
OkHttpGrpcServiceBuilder(
String type, String grpcEndpointPath, long defaultTimeoutSecs, URI defaultEndpoint) {
this.type = type;
this.grpcEndpointPath = grpcEndpointPath;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}
@Override
public GrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setChannel(ManagedChannel channel) {
throw new UnsupportedOperationException("Only available on DefaultGrpcService");
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setTimeout(
long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
timeoutNanos = unit.toNanos(timeout);
return this;
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
checkArgument(!timeout.isNegative(), "timeout must be non-negative");
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setEndpoint(String endpoint) {
requireNonNull(endpoint, "endpoint");
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setCompression(
String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
this.compressionEnabled = true;
return this;
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> setTrustedCertificates(
byte[] trustedCertificatesPem) {
requireNonNull(trustedCertificatesPem, "trustedCertificatesPem");
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> addHeader(
String key, String value) {
requireNonNull(key, "key");
requireNonNull(value, "value");
headers.add(key, value);
return this;
}
@Override
public OkHttpGrpcServiceBuilder<ReqMarshalerT, ResUnMarshalerT> addRetryPolicy(
RetryPolicy retryPolicy) {
requireNonNull(retryPolicy, "retryPolicy");
this.retryPolicy = retryPolicy;
return this;
}
@Override
public GrpcService<ReqMarshalerT, ResUnMarshalerT> build() {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder().dispatcher(OkHttpUtil.newDispatcher());
clientBuilder.callTimeout(Duration.ofNanos(timeoutNanos));
if (trustedCertificatesPem != null) {
try {
X509TrustManager trustManager = TlsUtil.trustManager(trustedCertificatesPem);
clientBuilder.sslSocketFactory(TlsUtil.sslSocketFactory(trustManager), trustManager);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates, are they valid X.509 in PEM format?", e);
}
}
String endpoint = this.endpoint.resolve(grpcEndpointPath).toString();
if (endpoint.startsWith("http://")) {
clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
} else {
clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
}
headers.add("te", "trailers");
if (compressionEnabled) {
headers.add("grpc-encoding", "gzip");
}
if (retryPolicy != null) {
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcExporter::isRetryable));
}
return new OkHttpGrpcService<>(
type, clientBuilder.build(), endpoint, headers.build(), compressionEnabled);
}
}

View File

@ -8,7 +8,6 @@ package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.OperationSamplingStrategy;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
@ -23,14 +22,15 @@ class PerOperationSampler implements Sampler {
private final Map<String, Sampler> perOperationSampler;
PerOperationSampler(
Sampler defaultSampler, List<OperationSamplingStrategy> perOperationSampling) {
Sampler defaultSampler,
List<SamplingStrategyResponse.OperationSamplingStrategy> perOperationSampling) {
this.defaultSampler = defaultSampler;
this.perOperationSampler = new LinkedHashMap<>(perOperationSampling.size());
for (OperationSamplingStrategy opSamplingStrategy : perOperationSampling) {
for (SamplingStrategyResponse.OperationSamplingStrategy opSamplingStrategy :
perOperationSampling) {
this.perOperationSampler.put(
opSamplingStrategy.getOperation(),
Sampler.traceIdRatioBased(
opSamplingStrategy.getProbabilisticSampling().getSamplingRate()));
opSamplingStrategy.operation,
Sampler.traceIdRatioBased(opSamplingStrategy.probabilisticSamplingStrategy.samplingRate));
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.opentelemetry.exporter.otlp.internal.MarshalerUtil;
import io.opentelemetry.exporter.otlp.internal.MarshalerWithSize;
import io.opentelemetry.exporter.otlp.internal.Serializer;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.internal.SamplingStrategyParameters;
import java.io.IOException;
final class SamplingStrategyParametersMarshaler extends MarshalerWithSize {
private final byte[] serviceNameUtf8;
static SamplingStrategyParametersMarshaler create(String serviceName) {
return new SamplingStrategyParametersMarshaler(MarshalerUtil.toBytes(serviceName));
}
private SamplingStrategyParametersMarshaler(byte[] serviceName) {
super(calculateSize(serviceName));
this.serviceNameUtf8 = serviceName;
}
@Override
protected void writeTo(Serializer output) throws IOException {
output.serializeString(SamplingStrategyParameters.SERVICENAME, serviceNameUtf8);
}
private static int calculateSize(byte[] serviceName) {
return MarshalerUtil.sizeBytes(SamplingStrategyParameters.SERVICENAME, serviceName);
}
}

View File

@ -0,0 +1,186 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
class SamplingStrategyResponse {
enum SamplingStrategyType {
PROBABILISTIC,
RATE_LIMITING,
UNRECOGNIZED,
}
static class RateLimitingSamplingStrategy {
final int maxTracesPerSecond;
private RateLimitingSamplingStrategy(Builder builder) {
this.maxTracesPerSecond = builder.maxTracesPerSecond;
}
static class Builder {
private int maxTracesPerSecond;
Builder setMaxTracesPerSecond(int maxTracesPerSecond) {
this.maxTracesPerSecond = maxTracesPerSecond;
return this;
}
RateLimitingSamplingStrategy build() {
return new RateLimitingSamplingStrategy(this);
}
}
}
static class ProbabilisticSamplingStrategy {
final double samplingRate;
ProbabilisticSamplingStrategy(Builder builder) {
this.samplingRate = builder.samplingRate;
}
static class Builder {
private double samplingRate;
Builder setSamplingRate(double samplingRate) {
this.samplingRate = samplingRate;
return this;
}
ProbabilisticSamplingStrategy build() {
return new ProbabilisticSamplingStrategy(this);
}
}
}
static class PerOperationSamplingStrategies {
final double defaultSamplingProbability;
final double defaultLowerBoundTracesPerSecond;
final double defaultUpperBoundTracesPerSecond;
final List<OperationSamplingStrategy> strategies;
private PerOperationSamplingStrategies(Builder builder) {
this.defaultSamplingProbability = builder.defaultSamplingProbability;
this.defaultLowerBoundTracesPerSecond = builder.defaultLowerBoundTracesPerSecond;
this.defaultUpperBoundTracesPerSecond = builder.defaultUpperBoundTracesPerSecond;
this.strategies = Collections.unmodifiableList(builder.strategies);
}
static class Builder {
private double defaultSamplingProbability;
private double defaultLowerBoundTracesPerSecond;
private double defaultUpperBoundTracesPerSecond;
private final List<OperationSamplingStrategy> strategies = new ArrayList<>();
Builder setDefaultSamplingProbability(double defaultSamplingProbability) {
this.defaultSamplingProbability = defaultSamplingProbability;
return this;
}
Builder setDefaultLowerBoundTracesPerSecond(double defaultLowerBoundTracesPerSecond) {
this.defaultLowerBoundTracesPerSecond = defaultLowerBoundTracesPerSecond;
return this;
}
Builder setDefaultUpperBoundTracesPerSecond(double defaultUpperBoundTracesPerSecond) {
this.defaultUpperBoundTracesPerSecond = defaultUpperBoundTracesPerSecond;
return this;
}
Builder addOperationStrategy(OperationSamplingStrategy operationSamplingStrategy) {
this.strategies.add(operationSamplingStrategy);
return this;
}
PerOperationSamplingStrategies build() {
return new PerOperationSamplingStrategies(this);
}
}
}
static class OperationSamplingStrategy {
final String operation;
final ProbabilisticSamplingStrategy probabilisticSamplingStrategy;
private OperationSamplingStrategy(Builder builder) {
this.operation = builder.operation;
this.probabilisticSamplingStrategy = builder.probabilisticSamplingStrategy;
}
static class Builder {
private String operation = "";
private ProbabilisticSamplingStrategy probabilisticSamplingStrategy =
new ProbabilisticSamplingStrategy.Builder().build();
Builder setOperation(String operation) {
this.operation = operation;
return this;
}
Builder setProbabilisticSamplingStrategy(
ProbabilisticSamplingStrategy probabilisticSamplingStrategy) {
this.probabilisticSamplingStrategy = probabilisticSamplingStrategy;
return this;
}
OperationSamplingStrategy build() {
return new OperationSamplingStrategy(this);
}
}
}
final SamplingStrategyType strategyType;
final RateLimitingSamplingStrategy rateLimitingSamplingStrategy;
final ProbabilisticSamplingStrategy probabilisticSamplingStrategy;
final PerOperationSamplingStrategies perOperationSamplingStrategies;
private SamplingStrategyResponse(Builder builder) {
this.strategyType = builder.samplingStrategyType;
this.rateLimitingSamplingStrategy = builder.rateLimitingSamplingStrategy;
this.probabilisticSamplingStrategy = builder.probabilisticSamplingStrategy;
this.perOperationSamplingStrategies = builder.perOperationSamplingStrategies;
}
static class Builder {
private SamplingStrategyType samplingStrategyType = SamplingStrategyType.UNRECOGNIZED;
private RateLimitingSamplingStrategy rateLimitingSamplingStrategy =
new RateLimitingSamplingStrategy.Builder().build();
private ProbabilisticSamplingStrategy probabilisticSamplingStrategy =
new ProbabilisticSamplingStrategy.Builder().build();
private PerOperationSamplingStrategies perOperationSamplingStrategies =
new PerOperationSamplingStrategies.Builder().build();
Builder setSamplingStrategyType(SamplingStrategyType samplingStrategyType) {
this.samplingStrategyType = samplingStrategyType;
return this;
}
Builder setRateLimitingSamplingStrategy(
RateLimitingSamplingStrategy rateLimitingSamplingStrategy) {
this.rateLimitingSamplingStrategy = rateLimitingSamplingStrategy;
return this;
}
Builder setProbabilisticSamplingStrategy(
ProbabilisticSamplingStrategy probabilisticSamplingStrategy) {
this.probabilisticSamplingStrategy = probabilisticSamplingStrategy;
return this;
}
Builder setPerOperationSamplingStrategies(
PerOperationSamplingStrategies perOperationSamplingStrategies) {
this.perOperationSamplingStrategies = perOperationSamplingStrategies;
return this;
}
SamplingStrategyResponse build() {
return new SamplingStrategyResponse(this);
}
}
}

View File

@ -0,0 +1,201 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import io.opentelemetry.exporter.otlp.internal.CodedInputStream;
import java.io.IOException;
import javax.annotation.Nullable;
class SamplingStrategyResponseUnMarshaler extends UnMarshaler {
@Nullable private SamplingStrategyResponse samplingStrategyResponse;
@Nullable
public SamplingStrategyResponse get() {
return samplingStrategyResponse;
}
@Override
public void read(byte[] payload) throws IOException {
SamplingStrategyResponse.Builder responseBuilder = new SamplingStrategyResponse.Builder();
try {
CodedInputStream codedInputStream = CodedInputStream.newInstance(payload);
parseResponse(responseBuilder, codedInputStream);
samplingStrategyResponse = responseBuilder.build();
} catch (IOException ex) {
// use null message
}
}
private static void parseResponse(
SamplingStrategyResponse.Builder responseBuilder, CodedInputStream input) throws IOException {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 8:
parseSamplingStrategyType(responseBuilder, input);
break;
case 18:
input.readRawVarint32(); // skip length
responseBuilder.setProbabilisticSamplingStrategy(parseProbabilistic(input));
break;
case 26:
input.readRawVarint32(); // skip length
responseBuilder.setRateLimitingSamplingStrategy(parseRateLimiting(input));
break;
case 34:
input.readRawVarint32(); // skip length
responseBuilder.setPerOperationSamplingStrategies(parsePerOperationStrategy(input));
break;
default:
input.skipField(tag);
}
}
}
private static void parseSamplingStrategyType(
SamplingStrategyResponse.Builder responseBuilder, CodedInputStream input) throws IOException {
int tagValue = input.readRawVarint32();
switch (tagValue) {
case 0:
responseBuilder.setSamplingStrategyType(
SamplingStrategyResponse.SamplingStrategyType.PROBABILISTIC);
break;
case 1:
responseBuilder.setSamplingStrategyType(
SamplingStrategyResponse.SamplingStrategyType.RATE_LIMITING);
break;
default:
responseBuilder.setSamplingStrategyType(
SamplingStrategyResponse.SamplingStrategyType.UNRECOGNIZED);
break;
}
}
private static SamplingStrategyResponse.ProbabilisticSamplingStrategy parseProbabilistic(
CodedInputStream input) throws IOException {
SamplingStrategyResponse.ProbabilisticSamplingStrategy.Builder builder =
new SamplingStrategyResponse.ProbabilisticSamplingStrategy.Builder();
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 9:
double samplingRate = input.readDouble();
return builder.setSamplingRate(samplingRate).build();
default:
input.skipField(tag);
break;
}
}
return builder.build();
}
private static SamplingStrategyResponse.RateLimitingSamplingStrategy parseRateLimiting(
CodedInputStream input) throws IOException {
SamplingStrategyResponse.RateLimitingSamplingStrategy.Builder builder =
new SamplingStrategyResponse.RateLimitingSamplingStrategy.Builder();
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 8:
int rate = input.readRawVarint32();
return builder.setMaxTracesPerSecond(rate).build();
default:
input.skipField(tag);
break;
}
}
return builder.build();
}
private static SamplingStrategyResponse.PerOperationSamplingStrategies parsePerOperationStrategy(
CodedInputStream input) throws IOException {
SamplingStrategyResponse.PerOperationSamplingStrategies.Builder builder =
new SamplingStrategyResponse.PerOperationSamplingStrategies.Builder();
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 9:
double defaultProbability = input.readDouble();
builder.setDefaultSamplingProbability(defaultProbability);
break;
case 17:
double lowerBoundPerSecond = input.readDouble();
builder.setDefaultLowerBoundTracesPerSecond(lowerBoundPerSecond);
break;
case 26:
input.readRawVarint32(); // skip length
SamplingStrategyResponse.OperationSamplingStrategy strategy =
parseOperationStrategy(input);
if (strategy != null) {
builder.addOperationStrategy(strategy);
}
break;
case 33:
double upperBoundPerSecond = input.readDouble();
builder.setDefaultUpperBoundTracesPerSecond(upperBoundPerSecond);
break;
default:
input.skipField(tag);
break;
}
}
return builder.build();
}
private static SamplingStrategyResponse.OperationSamplingStrategy parseOperationStrategy(
CodedInputStream input) throws IOException {
SamplingStrategyResponse.OperationSamplingStrategy.Builder builder =
new SamplingStrategyResponse.OperationSamplingStrategy.Builder();
boolean done = false;
boolean operationParsed = false;
boolean probabilisticSamplingParsed = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 10:
operationParsed = true;
String operation = input.readStringRequireUtf8();
builder.setOperation(operation);
break;
case 18:
probabilisticSamplingParsed = true;
input.readRawVarint32(); // skip length
builder.setProbabilisticSamplingStrategy(parseProbabilistic(input));
break;
default:
input.skipField(tag);
break;
}
if (operationParsed && probabilisticSamplingParsed) {
break;
}
}
return builder.build();
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import java.io.IOException;
/**
* UnMarshaler from protobuf wire format to SDK data type.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
abstract class UnMarshaler {
abstract void read(byte[] payload) throws IOException;
}

View File

@ -7,8 +7,6 @@ syntax = "proto3";
package jaeger.api_v2;
import "google/api/annotations.proto";
option java_package = "io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2";
enum SamplingStrategyType {
@ -48,10 +46,5 @@ message SamplingStrategyParameters {
}
service SamplingManager {
rpc GetSamplingStrategy(SamplingStrategyParameters) returns (SamplingStrategyResponse) {
option (google.api.http) = {
post: "/api/v2/samplingStrategy"
body: "*"
};
}
rpc GetSamplingStrategy(SamplingStrategyParameters) returns (SamplingStrategyResponse) {}
}

View File

@ -38,7 +38,7 @@ class JaegerRemoteSamplerIntegrationTest {
void remoteSampling_perOperation() {
try (JaegerRemoteSampler remoteSampler =
JaegerRemoteSampler.builder()
.setEndpoint("127.0.0.1:" + jaegerContainer.getMappedPort(COLLECTOR_PORT))
.setEndpoint("http://127.0.0.1:" + jaegerContainer.getMappedPort(COLLECTOR_PORT))
.setServiceName(SERVICE_NAME)
.build()) {
await()
@ -52,7 +52,7 @@ class JaegerRemoteSamplerIntegrationTest {
void remoteSampling_rateLimiting() {
try (JaegerRemoteSampler remoteSampler =
JaegerRemoteSampler.builder()
.setEndpoint("127.0.0.1:" + jaegerContainer.getMappedPort(COLLECTOR_PORT))
.setEndpoint("http://127.0.0.1:" + jaegerContainer.getMappedPort(COLLECTOR_PORT))
.setServiceName(SERVICE_NAME_RATE_LIMITING)
.build()) {
await()

View File

@ -32,7 +32,7 @@ public class JaegerRemoteSamplerProviderTest {
when(mockConfig.getString(JaegerRemoteSamplerProvider.SERVICE_NAME_PROPERTY))
.thenReturn("test_service");
HashMap<String, String> samplerArgs = new HashMap<>();
samplerArgs.put("endpoint", "localhost:9999");
samplerArgs.put("endpoint", "http://localhost:9999");
samplerArgs.put("pollingInterval", "99");
double samplingRate = 0.33;
samplerArgs.put("initialSamplingRate", String.valueOf(samplingRate));
@ -56,10 +56,10 @@ public class JaegerRemoteSamplerProviderTest {
.satisfies(
provider ->
assertThat(provider.createSampler(mockConfig))
.extracting("channel")
.extracting("delegate")
.extracting("target")
.isEqualTo("localhost:9999"));
.extracting("endpoint")
.isEqualTo(
"http://localhost:9999/jaeger.api_v2.SamplingManager/GetSamplingStrategy"));
}
@Test

View File

@ -8,81 +8,105 @@ package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import com.google.common.io.Closer;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.RateLimitingSamplingStrategy;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.SamplingStrategyType;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.SamplingManagerGrpc;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.awaitility.core.ThrowingRunnable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;
import org.slf4j.event.LoggingEvent;
class JaegerRemoteSamplerTest {
private static final String SERVICE_NAME = "my-service";
private static final int RATE = 999;
private static final AtomicInteger numPolls = new AtomicInteger();
private static final ConcurrentLinkedQueue<ArmeriaStatusException> grpcErrors =
new ConcurrentLinkedQueue<>();
private final String serverName = InProcessServerBuilder.generateName();
private final ManagedChannel inProcessChannel =
InProcessChannelBuilder.forName(serverName).directExecutor().build();
private static final ConcurrentLinkedQueue<Sampling.SamplingStrategyResponse> responses =
new ConcurrentLinkedQueue<>();
private final SamplingManagerGrpc.SamplingManagerImplBase service =
mock(
SamplingManagerGrpc.SamplingManagerImplBase.class,
delegatesTo(new MockSamplingManagerService()));
static class MockSamplingManagerService extends SamplingManagerGrpc.SamplingManagerImplBase {
@Override
public void getSamplingStrategy(
Sampling.SamplingStrategyParameters request,
StreamObserver<Sampling.SamplingStrategyResponse> responseObserver) {
numPolls.incrementAndGet();
Sampling.SamplingStrategyResponse response =
Sampling.SamplingStrategyResponse.newBuilder()
.setStrategyType(SamplingStrategyType.RATE_LIMITING)
.setRateLimitingSampling(
RateLimitingSamplingStrategy.newBuilder().setMaxTracesPerSecond(RATE).build())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
private static void addGrpcError(int code, @Nullable String message) {
grpcErrors.add(new ArmeriaStatusException(code, message));
}
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(OkHttpGrpcService.class, Level.TRACE);
@Order(1)
@RegisterExtension
static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension();
@Order(2)
@RegisterExtension
static final ServerExtension server =
new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.service(
JaegerRemoteSamplerBuilder.GRPC_ENDPOINT_PATH,
new AbstractUnaryGrpcService() {
@Override
protected CompletionStage<byte[]> handleMessage(
ServiceRequestContext ctx, byte[] message) {
ArmeriaStatusException grpcError = grpcErrors.poll();
if (grpcError != null) {
throw grpcError;
}
Sampling.SamplingStrategyResponse response = responses.poll();
// use default
if (response == null) {
response =
Sampling.SamplingStrategyResponse.newBuilder()
.setStrategyType(SamplingStrategyType.RATE_LIMITING)
.setRateLimitingSampling(
RateLimitingSamplingStrategy.newBuilder()
.setMaxTracesPerSecond(RATE)
.build())
.build();
}
return CompletableFuture.completedFuture(response.toByteArray());
}
});
sb.http(0);
sb.https(0);
sb.tls(certificate.certificateFile(), certificate.privateKeyFile());
}
};
private final Closer closer = Closer.create();
@BeforeEach
public void before() throws IOException {
numPolls.set(0);
Server server =
InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(service)
.build()
.start();
closer.register(server::shutdownNow);
closer.register(inProcessChannel::shutdownNow);
public void before() {
grpcErrors.clear();
responses.clear();
}
@AfterEach
@ -91,35 +115,44 @@ class JaegerRemoteSamplerTest {
}
@Test
void connectionWorks() throws Exception {
ArgumentCaptor<Sampling.SamplingStrategyParameters> requestCaptor =
ArgumentCaptor.forClass(Sampling.SamplingStrategyParameters.class);
void connectionWorks() {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setChannel(inProcessChannel)
.setEndpoint(server.httpUri().toString())
.setPollingInterval(1, TimeUnit.SECONDS)
.setServiceName(SERVICE_NAME)
.build();
closer.register(sampler);
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
// verify
verify(service).getSamplingStrategy(requestCaptor.capture(), ArgumentMatchers.any());
assertThat(requestCaptor.getValue().getServiceName()).isEqualTo(SERVICE_NAME);
assertThat(sampler.getDescription()).contains("RateLimitingSampler{999.00}");
}
// Default poll interval is 60s, inconceivable to have polled multiple times by now.
assertThat(numPolls).hasValue(1);
@Test
void tlsConnectionWorks() throws IOException {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpsUri().toString())
.setPollingInterval(1, TimeUnit.SECONDS)
.setTrustedCertificates(Files.readAllBytes(certificate.certificateFile().toPath()))
.setServiceName(SERVICE_NAME)
.build();
closer.register(sampler);
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
// verify
assertThat(sampler.getDescription()).contains("RateLimitingSampler{999.00}");
}
@Test
void description() {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setChannel(inProcessChannel)
.setEndpoint(server.httpUri().toString())
.setPollingInterval(1, TimeUnit.SECONDS)
.setServiceName(SERVICE_NAME)
.build();
closer.register(sampler);
@ -127,19 +160,14 @@ class JaegerRemoteSamplerTest {
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
// wait until the sampling strategy is retrieved before exiting test method
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
// Default poll interval is 60s, inconceivable to have polled multiple times by now.
assertThat(numPolls).hasValue(1);
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
}
@Test
void initialSampler() {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint("example.com")
.setEndpoint("http://example.com")
.setServiceName(SERVICE_NAME)
.setInitialSampler(Sampler.alwaysOn())
.build();
@ -151,40 +179,141 @@ class JaegerRemoteSamplerTest {
void pollingInterval() throws Exception {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setChannel(inProcessChannel)
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(1, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
// wait until the sampling strategy is retrieved before exiting test method
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
Thread.sleep(500);
assertThat(numPolls).hasValueGreaterThanOrEqualTo(2);
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
}
@Test
void pollingInterval_duration() throws Exception {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setChannel(inProcessChannel)
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(Duration.ofMillis(1))
.build();
closer.register(sampler);
// wait until the sampling strategy is retrieved before exiting test method
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
}
@Test
void perOperationSampling() {
Sampling.SamplingStrategyResponse response =
Sampling.SamplingStrategyResponse.newBuilder()
.setOperationSampling(
Sampling.PerOperationSamplingStrategies.newBuilder()
.setDefaultSamplingProbability(0.55)
.setDefaultLowerBoundTracesPerSecond(100)
.setDefaultUpperBoundTracesPerSecond(200)
.addPerOperationStrategies(
Sampling.OperationSamplingStrategy.newBuilder()
.setOperation("foo")
.setProbabilisticSampling(
Sampling.ProbabilisticSamplingStrategy.newBuilder()
.setSamplingRate(0.90)
.build())
.build())
.addPerOperationStrategies(
Sampling.OperationSamplingStrategy.newBuilder()
.setOperation("bar")
.setProbabilisticSampling(
Sampling.ProbabilisticSamplingStrategy.newBuilder()
.setSamplingRate(0.7)
.build())
.build())
.build())
.setRateLimitingSampling(
RateLimitingSamplingStrategy.newBuilder().setMaxTracesPerSecond(RATE).build())
.build();
responses.add(response);
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
// Make sure only polls once.
.setPollingInterval(500, TimeUnit.SECONDS)
.build();
closer.register(sampler);
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
.untilAsserted(
() -> {
assertThat(sampler.getDescription())
.startsWith(
"JaegerRemoteSampler{ParentBased{root:PerOperationSampler{default=TraceIdRatioBased{0.550000}, perOperation={foo=TraceIdRatioBased{0.900000}, bar=TraceIdRatioBased{0.700000}}}");
assertThat(sampler.getDescription()).contains("bar");
});
}
Thread.sleep(500);
@Test
void internal_error_server_response() {
addGrpcError(13, "internal error");
assertThat(numPolls).hasValueGreaterThanOrEqualTo(2);
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(50, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
LoggingEvent log = logs.assertContains(" Server responded with gRPC status code 13");
assertThat(log.getLevel()).isEqualTo(Level.WARN);
}
@Test
void unavailable_error_server_response() {
addGrpcError(14, "クマ🐻 resource exhausted");
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(50, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
LoggingEvent log = logs.assertContains("Server is UNAVAILABLE");
assertThat(log.getLevel()).isEqualTo(Level.ERROR);
}
@Test
void unimplemented_error_server_response() {
addGrpcError(12, null);
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(50, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
LoggingEvent log = logs.assertContains("Server responded with UNIMPLEMENTED.");
assertThat(log.getLevel()).isEqualTo(Level.ERROR);
}
@Test
@ -195,9 +324,6 @@ class JaegerRemoteSamplerTest {
assertThatThrownBy(() -> JaegerRemoteSampler.builder().setEndpoint(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("endpoint");
assertThatThrownBy(() -> JaegerRemoteSampler.builder().setChannel(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("channel");
assertThatThrownBy(
() -> JaegerRemoteSampler.builder().setPollingInterval(-1, TimeUnit.MILLISECONDS))
.isInstanceOf(IllegalArgumentException.class)
@ -214,6 +340,12 @@ class JaegerRemoteSamplerTest {
.hasMessage("interval");
}
@Test
void usingOkHttp() {
assertThat(JaegerRemoteSampler.builder().getDelegate())
.isInstanceOf(OkHttpGrpcServiceBuilder.class);
}
static ThrowingRunnable samplerIsType(
JaegerRemoteSampler sampler, Class<? extends Sampler> expected) {
return () -> {

View File

@ -0,0 +1,111 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class OkHttpGrpcServiceBuilderTest {
private static OkHttpGrpcServiceBuilder<
SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
exporterBuilder() {
return new OkHttpGrpcServiceBuilder<>("some", "some", 1, URI.create("htt://localhost:8080"));
}
@Test
@SuppressWarnings("PreferJavaTimeOverload")
void validConfig() {
assertThatCode(() -> exporterBuilder().setTimeout(0, TimeUnit.MILLISECONDS))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setTimeout(Duration.ofMillis(0)))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setTimeout(10, TimeUnit.MILLISECONDS))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setTimeout(Duration.ofMillis(10)))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("http://localhost:4317"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("http://localhost"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("https://localhost"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("http://foo:bar@localhost"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setCompression("gzip")).doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setCompression("none")).doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().addHeader("foo", "bar").addHeader("baz", "qux"))
.doesNotThrowAnyException();
assertThatCode(
() ->
exporterBuilder().setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8)))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().addRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
@SuppressWarnings({"PreferJavaTimeOverload", "NullAway"})
public void invalidConfig() {
assertThatThrownBy(() -> exporterBuilder().setTimeout(-1, TimeUnit.MILLISECONDS))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("timeout must be non-negative");
assertThatThrownBy(() -> exporterBuilder().setTimeout(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("unit");
assertThatThrownBy(() -> exporterBuilder().setTimeout(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("timeout");
assertThatThrownBy(() -> exporterBuilder().addRetryPolicy(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("retryPolicy");
assertThatThrownBy(() -> exporterBuilder().addHeader(null, "val"))
.isInstanceOf(NullPointerException.class)
.hasMessage("key");
assertThatThrownBy(() -> exporterBuilder().addHeader("key", null))
.isInstanceOf(NullPointerException.class)
.hasMessage("value");
assertThatThrownBy(() -> exporterBuilder().setTrustedCertificates(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("trustedCertificatesPem");
assertThatThrownBy(() -> exporterBuilder().setEndpoint(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("endpoint");
assertThatThrownBy(() -> exporterBuilder().setEndpoint("😺://localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must be a URL: 😺://localhost");
assertThatThrownBy(() -> exporterBuilder().setEndpoint("localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: localhost");
assertThatThrownBy(() -> exporterBuilder().setEndpoint("gopher://localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");
assertThatThrownBy(() -> exporterBuilder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> exporterBuilder().setCompression("foo"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Unsupported compression method. Supported compression methods include: gzip, none.");
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class DefaultGrpcServiceBuilderTest {
private static DefaultGrpcServiceBuilder<
SamplingStrategyParametersMarshaler, SamplingStrategyResponseUnMarshaler>
exporterBuilder() {
return new DefaultGrpcServiceBuilder<>(
"some", null, 10, URI.create("htt://localhost:8080"), "some");
}
@Test
@SuppressWarnings("PreferJavaTimeOverload")
void validConfig() {
assertThatCode(() -> exporterBuilder().setTimeout(0, TimeUnit.MILLISECONDS))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setTimeout(Duration.ofMillis(0)))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setTimeout(10, TimeUnit.MILLISECONDS))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setTimeout(Duration.ofMillis(10)))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("http://localhost:4317"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("http://localhost"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("https://localhost"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setEndpoint("http://foo:bar@localhost"))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setCompression("gzip")).doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().setCompression("none")).doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().addHeader("foo", "bar").addHeader("baz", "qux"))
.doesNotThrowAnyException();
assertThatCode(
() ->
exporterBuilder().setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8)))
.doesNotThrowAnyException();
assertThatCode(() -> exporterBuilder().addRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
@SuppressWarnings({"PreferJavaTimeOverload", "NullAway"})
public void invalidConfig() {
assertThatThrownBy(() -> exporterBuilder().setTimeout(-1, TimeUnit.MILLISECONDS))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("timeout must be non-negative");
assertThatThrownBy(() -> exporterBuilder().setTimeout(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("unit");
assertThatThrownBy(() -> exporterBuilder().setTimeout(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("timeout");
assertThatThrownBy(() -> exporterBuilder().addRetryPolicy(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("retryPolicy");
assertThatThrownBy(() -> exporterBuilder().addHeader(null, "val"))
.isInstanceOf(NullPointerException.class)
.hasMessage("key");
assertThatThrownBy(() -> exporterBuilder().addHeader("key", null))
.isInstanceOf(NullPointerException.class)
.hasMessage("value");
assertThatThrownBy(() -> exporterBuilder().setTrustedCertificates(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("trustedCertificatesPem");
assertThatThrownBy(() -> exporterBuilder().setEndpoint(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("endpoint");
assertThatThrownBy(() -> exporterBuilder().setEndpoint("😺://localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must be a URL: 😺://localhost");
assertThatThrownBy(() -> exporterBuilder().setEndpoint("localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: localhost");
assertThatThrownBy(() -> exporterBuilder().setEndpoint("gopher://localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");
assertThatThrownBy(() -> exporterBuilder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> exporterBuilder().setCompression("foo"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Unsupported compression method. Supported compression methods include: gzip, none.");
assertThatThrownBy(() -> exporterBuilder().setChannel(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("channel");
}
}

View File

@ -0,0 +1,360 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.trace.jaeger.sampler;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import com.google.common.io.Closer;
import com.linecorp.armeria.common.grpc.protocol.ArmeriaStatusException;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.RateLimitingSamplingStrategy;
import io.opentelemetry.sdk.extension.trace.jaeger.proto.api_v2.Sampling.SamplingStrategyType;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.awaitility.core.ThrowingRunnable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;
import org.slf4j.event.LoggingEvent;
class JaegerRemoteSamplerGrpcNettyTest {
private static final String SERVICE_NAME = "my-service";
private static final int RATE = 999;
private static final ConcurrentLinkedQueue<ArmeriaStatusException> grpcErrors =
new ConcurrentLinkedQueue<>();
private static final ConcurrentLinkedQueue<Sampling.SamplingStrategyResponse> responses =
new ConcurrentLinkedQueue<>();
private static void addGrpcError(int code, @Nullable String message) {
grpcErrors.add(new ArmeriaStatusException(code, message));
}
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(DefaultGrpcService.class, Level.TRACE);
@Order(1)
@RegisterExtension
static final SelfSignedCertificateExtension certificate = new SelfSignedCertificateExtension();
@Order(2)
@RegisterExtension
static final ServerExtension server =
new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.service(
JaegerRemoteSamplerBuilder.GRPC_ENDPOINT_PATH,
new AbstractUnaryGrpcService() {
@Override
protected CompletionStage<byte[]> handleMessage(
ServiceRequestContext ctx, byte[] message) {
ArmeriaStatusException grpcError = grpcErrors.poll();
if (grpcError != null) {
throw grpcError;
}
Sampling.SamplingStrategyResponse response = responses.poll();
// use default
if (response == null) {
response =
Sampling.SamplingStrategyResponse.newBuilder()
.setStrategyType(SamplingStrategyType.RATE_LIMITING)
.setRateLimitingSampling(
RateLimitingSamplingStrategy.newBuilder()
.setMaxTracesPerSecond(RATE)
.build())
.build();
}
return CompletableFuture.completedFuture(response.toByteArray());
}
});
sb.http(0);
sb.https(0);
sb.tls(certificate.certificateFile(), certificate.privateKeyFile());
}
};
private final Closer closer = Closer.create();
@BeforeEach
public void before() {
grpcErrors.clear();
responses.clear();
}
@AfterEach
void tearDown() throws Exception {
closer.close();
}
@Test
void connectionWorks() {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setPollingInterval(1, TimeUnit.SECONDS)
.setServiceName(SERVICE_NAME)
.build();
closer.register(sampler);
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
// verify
assertThat(sampler.getDescription()).contains("RateLimitingSampler{999.00}");
}
@Test
void tlsConnectionWorks() throws IOException {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpsUri().toString())
.setPollingInterval(1, TimeUnit.SECONDS)
.setTrustedCertificates(Files.readAllBytes(certificate.certificateFile().toPath()))
.setServiceName(SERVICE_NAME)
.build();
closer.register(sampler);
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
// verify
assertThat(sampler.getDescription()).contains("RateLimitingSampler{999.00}");
}
@Test
void description() {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setPollingInterval(1, TimeUnit.SECONDS)
.setServiceName(SERVICE_NAME)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
// wait until the sampling strategy is retrieved before exiting test method
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
}
@Test
void initialSampler() {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint("http://example.com")
.setServiceName(SERVICE_NAME)
.setInitialSampler(Sampler.alwaysOn())
.build();
closer.register(sampler);
assertThat(sampler.getDescription()).startsWith("JaegerRemoteSampler{AlwaysOnSampler}");
}
@Test
void pollingInterval() throws Exception {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(1, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
// wait until the sampling strategy is retrieved before exiting test method
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
}
@Test
void pollingInterval_duration() throws Exception {
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(Duration.ofMillis(1))
.build();
closer.register(sampler);
// wait until the sampling strategy is retrieved before exiting test method
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
}
@Test
void perOperationSampling() {
Sampling.SamplingStrategyResponse response =
Sampling.SamplingStrategyResponse.newBuilder()
.setOperationSampling(
Sampling.PerOperationSamplingStrategies.newBuilder()
.setDefaultSamplingProbability(0.55)
.setDefaultLowerBoundTracesPerSecond(100)
.setDefaultUpperBoundTracesPerSecond(200)
.addPerOperationStrategies(
Sampling.OperationSamplingStrategy.newBuilder()
.setOperation("foo")
.setProbabilisticSampling(
Sampling.ProbabilisticSamplingStrategy.newBuilder()
.setSamplingRate(0.90)
.build())
.build())
.addPerOperationStrategies(
Sampling.OperationSamplingStrategy.newBuilder()
.setOperation("bar")
.setProbabilisticSampling(
Sampling.ProbabilisticSamplingStrategy.newBuilder()
.setSamplingRate(0.7)
.build())
.build())
.build())
.setRateLimitingSampling(
RateLimitingSamplingStrategy.newBuilder().setMaxTracesPerSecond(RATE).build())
.build();
responses.add(response);
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
// Make sure only polls once.
.setPollingInterval(500, TimeUnit.SECONDS)
.build();
closer.register(sampler);
await()
.untilAsserted(
() -> {
assertThat(sampler.getDescription())
.startsWith(
"JaegerRemoteSampler{ParentBased{root:PerOperationSampler{default=TraceIdRatioBased{0.550000}, perOperation={foo=TraceIdRatioBased{0.900000}, bar=TraceIdRatioBased{0.700000}}}");
assertThat(sampler.getDescription()).contains("bar");
});
}
@Test
void internal_error_server_response() {
addGrpcError(13, "internal error");
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(50, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
LoggingEvent log = logs.assertContains(" Server responded with gRPC status code 13");
assertThat(log.getLevel()).isEqualTo(Level.WARN);
}
@Test
void unavailable_error_server_response() {
addGrpcError(14, "クマ🐻 resource exhausted");
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(50, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
LoggingEvent log = logs.assertContains("Server is UNAVAILABLE");
assertThat(log.getLevel()).isEqualTo(Level.ERROR);
}
@Test
void unimplemented_error_server_response() {
addGrpcError(12, null);
JaegerRemoteSampler sampler =
JaegerRemoteSampler.builder()
.setEndpoint(server.httpUri().toString())
.setServiceName(SERVICE_NAME)
.setPollingInterval(50, TimeUnit.MILLISECONDS)
.build();
closer.register(sampler);
assertThat(sampler.getDescription())
.startsWith("JaegerRemoteSampler{ParentBased{root:TraceIdRatioBased{0.001000}");
await().untilAsserted(samplerIsType(sampler, RateLimitingSampler.class));
LoggingEvent log = logs.assertContains("Server responded with UNIMPLEMENTED.");
assertThat(log.getLevel()).isEqualTo(Level.ERROR);
}
@Test
void invalidArguments() {
assertThatThrownBy(() -> JaegerRemoteSampler.builder().setServiceName(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("serviceName");
assertThatThrownBy(() -> JaegerRemoteSampler.builder().setEndpoint(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("endpoint");
assertThatThrownBy(
() -> JaegerRemoteSampler.builder().setPollingInterval(-1, TimeUnit.MILLISECONDS))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("polling interval must be positive");
assertThatThrownBy(() -> JaegerRemoteSampler.builder().setPollingInterval(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("unit");
assertThatThrownBy(
() -> JaegerRemoteSampler.builder().setPollingInterval(Duration.ofMillis(-1)))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("polling interval must be positive");
assertThatThrownBy(() -> JaegerRemoteSampler.builder().setPollingInterval(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("interval");
}
@Test
void usingGrpc() {
assertThat(JaegerRemoteSampler.builder().getDelegate())
.isInstanceOf(DefaultGrpcServiceBuilder.class);
}
static ThrowingRunnable samplerIsType(
JaegerRemoteSampler sampler, Class<? extends Sampler> expected) {
return () -> {
assertThat(sampler.getSampler().getClass().getName())
.isEqualTo("io.opentelemetry.sdk.trace.samplers.ParentBasedSampler");
Field field = sampler.getSampler().getClass().getDeclaredField("root");
field.setAccessible(true);
assertThat(field.get(sampler.getSampler()).getClass()).isEqualTo(expected);
};
}
}