Add the actual XRay remote sampler which polls rules and orders / applies them. (#3343)

* Add the actual XRay remote sampler which polls rules and orders / applies them.

* Test cleanup

* Don't spam logs
This commit is contained in:
Anuraag Agrawal 2021-06-27 10:46:21 +09:00 committed by GitHub
parent fb7e8d2983
commit f83a9531be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 544 additions and 1 deletions

View File

@ -1,2 +1,16 @@
Comparing source compatibility of against Comparing source compatibility of against
No changes. +++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSampler (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW METHOD: PUBLIC(+) void close()
+++ NEW METHOD: PUBLIC(+) java.lang.String getDescription()
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder newBuilder(io.opentelemetry.sdk.resources.Resource)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.trace.samplers.SamplingResult shouldSample(io.opentelemetry.context.Context, java.lang.String, java.lang.String, io.opentelemetry.api.trace.SpanKind, io.opentelemetry.api.common.Attributes, java.util.List)
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSampler build()
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setEndpoint(java.lang.String)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setInitialSampler(io.opentelemetry.sdk.trace.samplers.Sampler)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setPollingInterval(java.time.Duration)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.extension.aws.trace.AwsXrayRemoteSamplerBuilder setPollingInterval(long, java.util.concurrent.TimeUnit)

View File

@ -0,0 +1,105 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.aws.trace;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
 * A {@link Sampler} whose decisions are delegated to sampling rules fetched from AWS X-Ray.
 *
 * <p>Rules are polled at a fixed rate on a dedicated single-threaded scheduled executor. Until a
 * rules response has been fetched successfully, the configured initial sampler makes all
 * decisions. Call {@link #close()} to stop polling.
 */
public final class AwsXrayRemoteSampler implements Sampler, Closeable {

  private static final Logger logger = Logger.getLogger(AwsXrayRemoteSampler.class.getName());

  private static final String WORKER_THREAD_NAME =
      AwsXrayRemoteSampler.class.getSimpleName() + "_WorkerThread";

  private final Resource resource;
  private final Sampler initialSampler;
  private final XraySamplerClient client;
  private final ScheduledExecutorService executor;
  private final ScheduledFuture<?> pollFuture;

  // Last response that was turned into a sampler; used to skip rebuilding when nothing changed.
  @Nullable private volatile GetSamplingRulesResponse previousRulesResponse;

  // Currently active delegate; swapped by the polling thread, read by sampling threads.
  private volatile Sampler sampler;

  AwsXrayRemoteSampler(
      Resource resource, String endpoint, Sampler initialSampler, long pollingIntervalNanos) {
    this.resource = resource;
    this.initialSampler = initialSampler;
    this.client = new XraySamplerClient(endpoint);
    this.executor =
        Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(WORKER_THREAD_NAME));

    // The delegate must be in place before the first poll is scheduled.
    this.sampler = initialSampler;

    Runnable poller = this::getAndUpdateSampler;
    this.pollFuture =
        this.executor.scheduleAtFixedRate(poller, 0, pollingIntervalNanos, TimeUnit.NANOSECONDS);
  }

  /**
   * Creates a {@link AwsXrayRemoteSamplerBuilder} for the given {@link Resource}. The {@link
   * Resource} passed here should match the one the {@linkplain
   * io.opentelemetry.sdk.OpenTelemetrySdk OpenTelemetry SDK} is configured with.
   */
  // TODO(anuraaga): Deprecate after
  // https://github.com/open-telemetry/opentelemetry-specification/issues/1588
  public static AwsXrayRemoteSamplerBuilder newBuilder(Resource resource) {
    return new AwsXrayRemoteSamplerBuilder(resource);
  }

  /** Delegates the decision to whichever sampler is currently active. */
  @Override
  public SamplingResult shouldSample(
      Context parentContext,
      String traceId,
      String name,
      SpanKind spanKind,
      Attributes attributes,
      List<LinkData> parentLinks) {
    Sampler delegate = sampler;
    return delegate.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
  }

  /** Returns a description that wraps the description of the currently active sampler. */
  @Override
  public String getDescription() {
    Sampler delegate = sampler;
    return "AwsXrayRemoteSampler{" + delegate.getDescription() + "}";
  }

  /** Stops polling and shuts the worker executor down without waiting for it to terminate. */
  @Override
  public void close() {
    pollFuture.cancel(true);
    // No flushing behavior so no need to wait for the shutdown.
    executor.shutdownNow();
  }

  /**
   * Fetches the sampling rules once and, if they differ from the previously applied response,
   * replaces the active sampler. Any failure is logged at FINE and otherwise ignored so that the
   * periodic task keeps running with the sampler that is already in place.
   */
  private void getAndUpdateSampler() {
    try {
      // No pagination support yet, or possibly ever.
      GetSamplingRulesRequest request = GetSamplingRulesRequest.create(null);
      GetSamplingRulesResponse latest = client.getSamplingRules(request);
      if (latest.equals(previousRulesResponse)) {
        return;
      }
      sampler = new SamplingRulesSampler(resource, initialSampler, latest.getSamplingRules());
      previousRulesResponse = latest;
    } catch (Throwable t) {
      logger.log(Level.FINE, "Failed to update sampler", t);
    }
  }
}

View File

@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.aws.trace;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
 * A builder for {@link AwsXrayRemoteSampler}.
 *
 * <p>Every setter returns this builder so calls can be chained; {@link #build()} creates the
 * sampler from the values set so far, using the documented defaults for anything left unset.
 */
public final class AwsXrayRemoteSamplerBuilder {

  private static final String DEFAULT_ENDPOINT = "http://localhost:2000";
  private static final long DEFAULT_POLLING_INTERVAL_SECS = 300;

  private final Resource resource;

  private String endpoint = DEFAULT_ENDPOINT;
  private Sampler initialSampler = Sampler.parentBased(Sampler.alwaysOn());
  private long pollingIntervalNanos = TimeUnit.SECONDS.toNanos(DEFAULT_POLLING_INTERVAL_SECS);

  AwsXrayRemoteSamplerBuilder(Resource resource) {
    this.resource = resource;
  }

  /**
   * Sets the endpoint for the TCP proxy to connect to. This is the address to the port on the
   * OpenTelemetry Collector configured for proxying X-Ray sampling requests. If unset, defaults to
   * {@value DEFAULT_ENDPOINT}.
   *
   * @param endpoint the endpoint to poll, must not be {@code null}
   * @return this builder
   * @throws NullPointerException if {@code endpoint} is {@code null}
   */
  public AwsXrayRemoteSamplerBuilder setEndpoint(String endpoint) {
    requireNonNull(endpoint, "endpoint");
    this.endpoint = endpoint;
    return this;
  }

  /**
   * Sets the polling interval for configuration updates. If unset, defaults to {@value
   * DEFAULT_POLLING_INTERVAL_SECS}s. Must be positive.
   *
   * @param delay the interval between polls, must not be {@code null}
   * @return this builder
   * @throws NullPointerException if {@code delay} is {@code null}
   * @throws IllegalArgumentException if {@code delay} is zero or negative
   */
  public AwsXrayRemoteSamplerBuilder setPollingInterval(Duration delay) {
    requireNonNull(delay, "delay");
    return setPollingInterval(delay.toNanos(), TimeUnit.NANOSECONDS);
  }

  /**
   * Sets the polling interval for configuration updates. If unset, defaults to {@value
   * DEFAULT_POLLING_INTERVAL_SECS}s. Must be positive.
   *
   * @param delay the interval between polls, expressed in {@code unit}
   * @param unit the unit of {@code delay}, must not be {@code null}
   * @return this builder
   * @throws NullPointerException if {@code unit} is {@code null}
   * @throws IllegalArgumentException if {@code delay} is zero or negative
   */
  public AwsXrayRemoteSamplerBuilder setPollingInterval(long delay, TimeUnit unit) {
    requireNonNull(unit, "unit");
    // The interval is used as the period of a fixed-rate schedule, which rejects a period of
    // zero, so zero has to be refused here rather than surfacing later from build().
    checkArgument(delay > 0, "delay must be positive");
    pollingIntervalNanos = unit.toNanos(delay);
    return this;
  }

  /**
   * Sets the initial sampler that is used before sampling configuration is obtained. If unset,
   * defaults to a parent-based always-on sampler.
   *
   * @param initialSampler the sampler to use initially, must not be {@code null}
   * @return this builder
   * @throws NullPointerException if {@code initialSampler} is {@code null}
   */
  public AwsXrayRemoteSamplerBuilder setInitialSampler(Sampler initialSampler) {
    requireNonNull(initialSampler, "initialSampler");
    this.initialSampler = initialSampler;
    return this;
  }

  /** Returns a {@link AwsXrayRemoteSampler} with the configuration of this builder. */
  public AwsXrayRemoteSampler build() {
    return new AwsXrayRemoteSampler(resource, endpoint, initialSampler, pollingIntervalNanos);
  }
}

View File

@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.aws.trace;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * A {@link Sampler} that applies the first matching rule out of a priority-ordered set of X-Ray
 * sampling rules, delegating to a fallback sampler when no rule matches.
 */
final class SamplingRulesSampler implements Sampler {

  private static final Logger logger = Logger.getLogger(SamplingRulesSampler.class.getName());

  private final Resource resource;
  private final Sampler fallbackSampler;
  private final SamplingRuleApplier[] ruleAppliers;

  SamplingRulesSampler(
      Resource resource,
      Sampler fallbackSampler,
      List<GetSamplingRulesResponse.SamplingRuleRecord> rules) {
    this.resource = resource;
    this.fallbackSampler = fallbackSampler;

    GetSamplingRulesResponse.SamplingRule[] ordered =
        rules.stream()
            .map(GetSamplingRulesResponse.SamplingRuleRecord::getRule)
            .toArray(GetSamplingRulesResponse.SamplingRule[]::new);
    // Lower priority value takes precedence so normal ascending sort. The sort is stable, so
    // rules with equal priority keep the order in which they were received.
    Arrays.sort(
        ordered, Comparator.comparingInt(GetSamplingRulesResponse.SamplingRule::getPriority));

    SamplingRuleApplier[] appliers = new SamplingRuleApplier[ordered.length];
    for (int i = 0; i < ordered.length; i++) {
      appliers[i] = new SamplingRuleApplier(ordered[i]);
    }
    this.ruleAppliers = appliers;
  }

  /**
   * Walks the rules in priority order and lets the first one that matches the span decide; if none
   * matches, the fallback sampler decides.
   */
  @Override
  public SamplingResult shouldSample(
      Context parentContext,
      String traceId,
      String name,
      SpanKind spanKind,
      Attributes attributes,
      List<LinkData> parentLinks) {
    for (int i = 0; i < ruleAppliers.length; i++) {
      SamplingRuleApplier candidate = ruleAppliers[i];
      if (!candidate.matches(name, attributes, resource)) {
        continue;
      }
      return candidate.shouldSample(
          parentContext, traceId, name, spanKind, attributes, parentLinks);
    }

    // In practice, X-Ray always returns a Default rule that matches all requests so it is a bug in
    // our code or X-Ray to reach here, fallback just in case.
    logger.log(
        Level.FINE,
        "No sampling rule matched the request. "
            + "This is a bug in either the OpenTelemetry SDK or X-Ray.");
    return fallbackSampler.shouldSample(
        parentContext, traceId, name, spanKind, attributes, parentLinks);
  }

  /** Returns a description listing the rule appliers in the order they are consulted. */
  @Override
  public String getDescription() {
    String appliers = Arrays.toString(ruleAppliers);
    return "XrayRulesSampler{" + appliers + "}";
  }
}

View File

@ -0,0 +1,190 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.extension.aws.trace;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import com.google.common.io.ByteStreams;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
/**
 * Tests {@link AwsXrayRemoteSampler} against a local stub server that serves the {@code
 * /GetSamplingRules} endpoint with whatever payload the test has currently configured.
 */
class AwsXrayRemoteSamplerTest {

  private static final byte[] RESPONSE_1 = readResource("/test-sampling-rules-response-1.json");
  private static final byte[] RESPONSE_2 = readResource("/test-sampling-rules-response-2.json");

  // Payload the stub server returns; null means "respond with an error".
  private static final AtomicReference<byte[]> response = new AtomicReference<>();

  private static final String TRACE_ID = TraceId.fromLongs(1, 2);

  @RegisterExtension
  public static final ServerExtension server =
      new ServerExtension() {
        @Override
        protected void configure(ServerBuilder sb) {
          sb.service(
              "/GetSamplingRules",
              (ctx, req) -> {
                byte[] body = AwsXrayRemoteSamplerTest.response.get();
                if (body == null) {
                  // Error out until the test configures a response; the sampler will use the
                  // initial sampler in the meantime.
                  return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
                }
                return HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, body);
              });
        }
      };

  private AwsXrayRemoteSampler sampler;

  @BeforeEach
  void setUp() {
    sampler =
        AwsXrayRemoteSampler.newBuilder(Resource.empty())
            .setInitialSampler(Sampler.alwaysOn())
            .setEndpoint(server.httpUri().toString())
            .setPollingInterval(Duration.ofMillis(10))
            .build();
  }

  @AfterEach
  void tearDown() {
    sampler.close();
    response.set(null);
  }

  @Test
  void getAndUpdate() {
    // Initial Sampler allows all.
    assertThat(decisionFor("cat-service")).isEqualTo(SamplingDecision.RECORD_AND_SAMPLE);
    assertThat(decisionFor("dog-service")).isEqualTo(SamplingDecision.RECORD_AND_SAMPLE);

    response.set(RESPONSE_1);

    // cat-service allowed, others dropped
    await()
        .untilAsserted(
            () -> {
              assertThat(decisionFor("cat-service"))
                  .isEqualTo(SamplingDecision.RECORD_AND_SAMPLE);
              assertThat(decisionFor("dog-service")).isEqualTo(SamplingDecision.DROP);
            });

    response.set(RESPONSE_2);

    // cat-service dropped, others allowed
    await()
        .untilAsserted(
            () -> {
              assertThat(decisionFor("cat-service")).isEqualTo(SamplingDecision.DROP);
              assertThat(decisionFor("dog-service"))
                  .isEqualTo(SamplingDecision.RECORD_AND_SAMPLE);
            });
  }

  @Test
  void initialSampler() {
    assertThat(sampler.getDescription()).isEqualTo("AwsXrayRemoteSampler{AlwaysOnSampler}");
  }

  /** Asks the sampler under test for a decision on a root SERVER span with the given name. */
  private SamplingDecision decisionFor(String spanName) {
    return sampler
        .shouldSample(
            Context.root(),
            TRACE_ID,
            spanName,
            SpanKind.SERVER,
            Attributes.empty(),
            Collections.emptyList())
        .getDecision();
  }

  /**
   * Reads a classpath resource fully into memory, closing the underlying stream afterwards.
   *
   * @throws NullPointerException if the resource does not exist
   * @throws UncheckedIOException if reading the resource fails
   */
  private static byte[] readResource(String path) {
    try (java.io.InputStream in =
        requireNonNull(
            AwsXrayRemoteSamplerTest.class.getResourceAsStream(path),
            "Missing test resource: " + path)) {
      return ByteStreams.toByteArray(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

View File

@ -0,0 +1,42 @@
{
"SamplingRuleRecords": [
{
"SamplingRule": {
"RuleName": "Test",
"RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test",
"ResourceARN": "*",
"Priority": 1,
"FixedRate": 1.0,
"ReservoirSize": 1,
"ServiceName": "cat-service",
"ServiceType": "*",
"Host": "*",
"HTTPMethod": "*",
"URLPath": "*",
"Version": 1,
"Attributes": {}
},
"CreatedAt": "2021-06-18T17:28:15+09:00",
"ModifiedAt": "2021-06-18T17:28:15+09:00"
},
{
"SamplingRule": {
"RuleName": "Default",
"RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Default",
"ResourceARN": "*",
"Priority": 10000,
"FixedRate": 0.0,
"ReservoirSize": 1,
"ServiceName": "*",
"ServiceType": "*",
"Host": "*",
"HTTPMethod": "*",
"URLPath": "*",
"Version": 1,
"Attributes": {}
},
"CreatedAt": "1970-01-01T09:00:00+09:00",
"ModifiedAt": "1970-01-01T09:00:00+09:00"
}
]
}

View File

@ -0,0 +1,42 @@
{
"SamplingRuleRecords": [
{
"SamplingRule": {
"RuleName": "Test",
"RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Test",
"ResourceARN": "*",
"Priority": 1,
"FixedRate": 0.0,
"ReservoirSize": 1,
"ServiceName": "cat-service",
"ServiceType": "*",
"Host": "*",
"HTTPMethod": "*",
"URLPath": "*",
"Version": 1,
"Attributes": {}
},
"CreatedAt": "2021-06-18T17:28:15+09:00",
"ModifiedAt": "2021-06-18T17:28:15+09:00"
},
{
"SamplingRule": {
"RuleName": "Default",
"RuleARN": "arn:aws:xray:us-east-1:595986152929:sampling-rule/Default",
"ResourceARN": "*",
"Priority": 10000,
"FixedRate": 1.0,
"ReservoirSize": 1,
"ServiceName": "*",
"ServiceType": "*",
"Host": "*",
"HTTPMethod": "*",
"URLPath": "*",
"Version": 1,
"Attributes": {}
},
"CreatedAt": "1970-01-01T09:00:00+09:00",
"ModifiedAt": "1970-01-01T09:00:00+09:00"
}
]
}