Separate out core instrumentation for AWS SDK to allow manual setup o… (#421)

* Separate out core instrumentation for AWS SDK to allow manual setup of instrumentation.

* Instrumentation core test
This commit is contained in:
Anuraag Agrawal 2020-05-28 02:43:56 +09:00 committed by GitHub
parent 6c619ab114
commit d4a14f6b98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 781 additions and 78 deletions

View File

@ -1,6 +1,10 @@
import java.time.Duration
apply plugin: 'java'
if (project.path.startsWith(":instrumentation-core")) {
apply plugin: 'java-library'
} else {
apply plugin: 'java'
}
apply plugin: 'groovy'
apply from: "$rootDir/gradle/spotless.gradle"

View File

@ -0,0 +1,17 @@
# Instrumentation Core
These modules for the core logic for library instrumentation. [instrumentation](../instrumentation)
should add core logic here which can be set up manually by a user, and agent-specific code
for automatically setting up the instrumentation in that folder.
Note, we are currently working on separating instrumentatin projects so that their core parts can
be accessed by users not using the agent. Due to the current Gradle setup, we have these two top-level
folders, instrumentation and instrumentation-core, but eventually we want to move to flattening them
into something like
```
instrumentation/
aws-sdk-2.2/
aws-sdk-2.2/
aws-sdk-2.2-auto/
```

View File

@ -0,0 +1,46 @@
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest
}
dependencies {
// TODO(anuraaga): We currently include common instrumentation logic like decorators in the
// bootstrap, but we need to move it out so manual instrumentation does not depend on code from
// the agent, like Agent.
api project(':auto-bootstrap')
api deps.opentelemetryApi
api group: 'software.amazon.awssdk', name: 'aws-core', version: '2.2.0'
testCompile project(':testing')
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testCompile project(':instrumentation:apache-httpclient:apache-httpclient-4.0')
// Also include netty instrumentation because it is used by aws async client
testCompile project(':instrumentation:netty:netty-4.1')
testCompile group: 'software.amazon.awssdk', name: 'apache-client', version: '2.2.0'
testCompile group: 'software.amazon.awssdk', name: 's3', version: '2.2.0'
testCompile group: 'software.amazon.awssdk', name: 'rds', version: '2.2.0'
testCompile group: 'software.amazon.awssdk', name: 'ec2', version: '2.2.0'
testCompile group: 'software.amazon.awssdk', name: 'sqs', version: '2.2.0'
testCompile group: 'software.amazon.awssdk', name: 'dynamodb', version: '2.2.0'
testCompile group: 'software.amazon.awssdk', name: 'kinesis', version: '2.2.0'
latestDepTestCompile project(':instrumentation:apache-httpclient:apache-httpclient-4.0')
latestDepTestCompile project(':instrumentation:netty:netty-4.1')
latestDepTestCompile group: 'software.amazon.awssdk', name: 'apache-client', version: '+'
latestDepTestCompile group: 'software.amazon.awssdk', name: 's3', version: '+'
latestDepTestCompile group: 'software.amazon.awssdk', name: 'rds', version: '+'
latestDepTestCompile group: 'software.amazon.awssdk', name: 'ec2', version: '+'
latestDepTestCompile group: 'software.amazon.awssdk', name: 'sqs', version: '+'
latestDepTestCompile group: 'software.amazon.awssdk', name: 'dynamodb', version: '+'
latestDepTestCompile group: 'software.amazon.awssdk', name: 'kinesis', version: '+'
}

View File

@ -0,0 +1,77 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.instrumentation.awssdk.v2_2.TracingExecutionInterceptor.SPAN_ATTRIBUTE;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Span.Kind;
import io.opentelemetry.trace.Tracer;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
/**
* Entrypoint to OpenTelemetry instrumentation of the AWS SDK. Register the {@link
* ExecutionInterceptor} returned by {@link #newInterceptor()} with an SDK client to have all
* requests traced.
*
* <pre>{@code
* DynamoDbClient dynamoDb = DynamoDbClient.builder()
* .overrideConfiguration(ClientOverrideConfiguration.builder()
* .addExecutionInterceptor(AwsSdk.newInterceptor())
* .build())
* .build();
* }</pre>
*/
public class AwsSdk {
private static final Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.aws-sdk-2.2");
/** Returns the {@link Tracer} used to instrument the AWS SDK. */
public static Tracer tracer() {
return TRACER;
}
/**
* Returns an {@link ExecutionInterceptor} that can be used with an {@link
* software.amazon.awssdk.http.SdkHttpClient} to trace SDK requests. Spans are created with the
* kind {@link Kind#CLIENT}. If you also instrument the HTTP calls made by the SDK, e.g., by
* adding Apache HTTP client or Netty instrumentation, you may want to use {@link
* #newInterceptor(Kind)} with {@link Kind#INTERNAL} instead.
*/
public static ExecutionInterceptor newInterceptor() {
return newInterceptor(Kind.CLIENT);
}
/**
* Returns an {@link ExecutionInterceptor} that can be used with an {@link
* software.amazon.awssdk.http.SdkHttpClient} to trace SDK requests. Spans are created with the
* provided {@link Kind}.
*/
public static ExecutionInterceptor newInterceptor(Kind kind) {
return new TracingExecutionInterceptor(kind);
}
/**
* Returns the {@link Span} stored in the {@link ExecutionAttributes}, or {@code null} if there is
* no span set.
*/
public static Span getSpanFromAttributes(ExecutionAttributes attributes) {
return attributes.getAttribute(SPAN_ATTRIBUTE);
}
}

View File

@ -13,12 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.awssdk.v2_2;
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.auto.bootstrap.instrumentation.decorator.HttpClientDecorator;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import java.net.URI;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.SdkRequest;
@ -28,15 +26,12 @@ import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, SdkHttpResponse> {
public static final AwsSdkClientDecorator DECORATE = new AwsSdkClientDecorator();
public static final Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.aws-sdk-2.2");
final class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, SdkHttpResponse> {
static final AwsSdkClientDecorator DECORATE = new AwsSdkClientDecorator();
static final String COMPONENT_NAME = "java-aws-sdk";
public Span onSdkRequest(final Span span, final SdkRequest request) {
Span onSdkRequest(final Span span, final SdkRequest request) {
// S3
request
.getValueForField("Bucket", String.class)
@ -59,14 +54,14 @@ public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, S
return span;
}
public String spanName(final ExecutionAttributes attributes) {
String spanName(final ExecutionAttributes attributes) {
final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
final String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
return awsServiceName + "." + awsOperation;
}
public Span onAttributes(final Span span, final ExecutionAttributes attributes) {
Span onAttributes(final Span span, final ExecutionAttributes attributes) {
final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
final String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
@ -79,7 +74,7 @@ public class AwsSdkClientDecorator extends HttpClientDecorator<SdkHttpRequest, S
}
// Not overriding the super. Should call both with each type of response.
public Span onResponse(final Span span, final SdkResponse response) {
Span onSdkResponse(final Span span, final SdkResponse response) {
if (response instanceof AwsResponse) {
span.setAttribute("aws.requestId", ((AwsResponse) response).responseMetadata().requestId());
}

View File

@ -0,0 +1,94 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkClientDecorator.DECORATE;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Span.Kind;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
/** AWS request execution interceptor */
final class TracingExecutionInterceptor implements ExecutionInterceptor {
static final ExecutionAttribute<Span> SPAN_ATTRIBUTE =
new ExecutionAttribute<>("io.opentelemetry.auto.Span");
private final Kind kind;
TracingExecutionInterceptor(Kind kind) {
this.kind = kind;
}
@Override
public void beforeExecution(
final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) {
final Span span =
AwsSdk.tracer()
.spanBuilder(DECORATE.spanName(executionAttributes))
.setSpanKind(kind)
.startSpan();
DECORATE.afterStart(span);
executionAttributes.putAttribute(SPAN_ATTRIBUTE, span);
}
@Override
public void afterMarshalling(
final Context.AfterMarshalling context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
if (span != null) {
DECORATE.onRequest(span, context.httpRequest());
DECORATE.onSdkRequest(span, context.request());
DECORATE.onAttributes(span, executionAttributes);
}
}
@Override
public void afterExecution(
final Context.AfterExecution context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
if (span != null) {
try {
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
// Call onResponse on both types of responses:
DECORATE.onSdkResponse(span, context.response());
DECORATE.onResponse(span, context.httpResponse());
DECORATE.beforeFinish(span);
} finally {
span.end();
}
}
}
@Override
public void onExecutionFailure(
final Context.FailedExecution context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
if (span != null) {
try {
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
DECORATE.onError(span, context.exception());
DECORATE.beforeFinish(span);
} finally {
span.end();
}
}
}
}

View File

@ -0,0 +1,315 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.opentelemetry.auto.bootstrap.instrumentation.decorator.HttpClientDecorator
import io.opentelemetry.auto.instrumentation.api.MoreTags
import io.opentelemetry.auto.instrumentation.api.Tags
import io.opentelemetry.auto.test.InstrumentationTestRunner
import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdk
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.core.ResponseInputStream
import software.amazon.awssdk.core.async.AsyncResponseTransformer
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.exception.SdkClientException
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest
import software.amazon.awssdk.services.ec2.Ec2AsyncClient
import software.amazon.awssdk.services.ec2.Ec2Client
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest
import software.amazon.awssdk.services.rds.RdsAsyncClient
import software.amazon.awssdk.services.rds.RdsClient
import software.amazon.awssdk.services.rds.model.DeleteOptionGroupRequest
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.CreateBucketRequest
import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import spock.lang.AutoCleanup
import spock.lang.Shared
import java.time.Duration
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicReference
import static io.opentelemetry.auto.test.server.http.TestHttpServer.httpServer
import static io.opentelemetry.trace.Span.Kind.CLIENT
class Aws2ClientCoreTest extends InstrumentationTestRunner {
private static final StaticCredentialsProvider CREDENTIALS_PROVIDER = StaticCredentialsProvider
.create(AwsBasicCredentials.create("my-access-key", "my-secret-key"))
@Shared
def responseBody = new AtomicReference<String>()
@AutoCleanup
@Shared
def server = httpServer {
handlers {
all {
response.status(200).send(responseBody.get())
}
}
}
def "send #operation request with builder {#builder.class.getName()} mocked response"() {
setup:
def client = builder
.endpointOverride(server.address)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.addExecutionInterceptor(AwsSdk.newInterceptor())
.build())
.build()
responseBody.set(body)
def response = call.call(client)
if (response instanceof Future) {
response = response.get()
}
expect:
response != null
response.class.simpleName.startsWith(operation) || response instanceof ResponseInputStream
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "$service.$operation"
spanKind CLIENT
errored false
parent()
tags {
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_PORT" server.address.port
"$Tags.HTTP_URL" { it.startsWith("${server.address}${path}") }
"$Tags.HTTP_METHOD" "$method"
"$Tags.HTTP_STATUS" 200
"aws.service" "$service"
"aws.operation" "${operation}"
"aws.agent" "java-aws-sdk"
"aws.requestId" "$requestId"
if (service == "S3") {
"aws.bucket.name" "somebucket"
} else if (service == "Sqs" && operation == "CreateQueue") {
"aws.queue.name" "somequeue"
} else if (service == "Sqs" && operation == "SendMessage") {
"aws.queue.url" "someurl"
} else if (service == "DynamoDb") {
"aws.table.name" "sometable"
} else if (service == "Kinesis") {
"aws.stream.name" "somestream"
}
}
}
}
}
server.lastRequest.headers.get("traceparent") == null
where:
service | operation | method | path | requestId | builder | call | body
"S3" | "CreateBucket" | "PUT" | "/somebucket" | "UNKNOWN" | S3Client.builder() | { c -> c.createBucket(CreateBucketRequest.builder().bucket("somebucket").build()) } | ""
"S3" | "GetObject" | "GET" | "/somebucket/somekey" | "UNKNOWN" | S3Client.builder() | { c -> c.getObject(GetObjectRequest.builder().bucket("somebucket").key("somekey").build()) } | ""
"DynamoDb" | "CreateTable" | "POST" | "/" | "UNKNOWN" | DynamoDbClient.builder() | { c -> c.createTable(CreateTableRequest.builder().tableName("sometable").build()) } | ""
"Kinesis" | "DeleteStream" | "POST" | "/" | "UNKNOWN" | KinesisClient.builder() | { c -> c.deleteStream(DeleteStreamRequest.builder().streamName("somestream").build()) } | ""
"Sqs" | "CreateQueue" | "POST" | "/" | "7a62c49f-347e-4fc4-9331-6e8e7a96aa73" | SqsClient.builder() | { c -> c.createQueue(CreateQueueRequest.builder().queueName("somequeue").build()) } | """
<CreateQueueResponse>
<CreateQueueResult><QueueUrl>https://queue.amazonaws.com/123456789012/MyQueue</QueueUrl></CreateQueueResult>
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"Sqs" | "SendMessage" | "POST" | "/" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl("someurl").messageBody("").build()) } | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>
<MD5OfMessageAttributes>3ae8f24a165a8cedc005670c81a27295</MD5OfMessageAttributes>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
</SendMessageResult>
<ResponseMetadata><RequestId>27daac76-34dd-47df-bd01-1f6e873584a0</RequestId></ResponseMetadata>
</SendMessageResponse>
"""
"Ec2" | "AllocateAddress" | "POST" | "/" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2Client.builder() | { c -> c.allocateAddress() } | """
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<publicIp>192.0.2.1</publicIp>
<domain>standard</domain>
</AllocateAddressResponse>
"""
"Rds" | "DeleteOptionGroup" | "POST" | "/" | "0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99" | RdsClient.builder() | { c -> c.deleteOptionGroup(DeleteOptionGroupRequest.builder().build()) } | """
<DeleteOptionGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<ResponseMetadata><RequestId>0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99</RequestId></ResponseMetadata>
</DeleteOptionGroupResponse>
"""
}
def "send #operation async request with builder {#builder.class.getName()} mocked response"() {
setup:
def client = builder
.endpointOverride(server.address)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.addExecutionInterceptor(AwsSdk.newInterceptor())
.build())
.build()
responseBody.set(body)
def response = call.call(client)
if (response instanceof Future) {
response = response.get()
}
expect:
response != null
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "$service.$operation"
spanKind CLIENT
errored false
parent()
tags {
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_PORT" server.address.port
"$Tags.HTTP_URL" { it.startsWith("${server.address}${path}") }
"$Tags.HTTP_METHOD" "$method"
"$Tags.HTTP_STATUS" 200
"aws.service" "$service"
"aws.operation" "${operation}"
"aws.agent" "java-aws-sdk"
"aws.requestId" "$requestId"
if (service == "S3") {
"aws.bucket.name" "somebucket"
} else if (service == "Sqs" && operation == "CreateQueue") {
"aws.queue.name" "somequeue"
} else if (service == "Sqs" && operation == "SendMessage") {
"aws.queue.url" "someurl"
} else if (service == "DynamoDb") {
"aws.table.name" "sometable"
} else if (service == "Kinesis") {
"aws.stream.name" "somestream"
}
}
}
}
}
server.lastRequest.headers.get("traceparent") == null
where:
service | operation | method | path | requestId | builder | call | body
"S3" | "CreateBucket" | "PUT" | "/somebucket" | "UNKNOWN" | S3AsyncClient.builder() | { c -> c.createBucket(CreateBucketRequest.builder().bucket("somebucket").build()) } | ""
"S3" | "GetObject" | "GET" | "/somebucket/somekey" | "UNKNOWN" | S3AsyncClient.builder() | { c -> c.getObject(GetObjectRequest.builder().bucket("somebucket").key("somekey").build(), AsyncResponseTransformer.toBytes()) } | "1234567890"
"DynamoDb" | "CreateTable" | "POST" | "/" | "UNKNOWN" | DynamoDbAsyncClient.builder() | { c -> c.createTable(CreateTableRequest.builder().tableName("sometable").build()) } | ""
// Kinesis seems to expect an http2 response which is incompatible with our test server.
// "Kinesis" | "DeleteStream" | "POST" | "/" | "UNKNOWN" | KinesisAsyncClient.builder() | { c -> c.deleteStream(DeleteStreamRequest.builder().streamName("somestream").build()) } | ""
"Sqs" | "CreateQueue" | "POST" | "/" | "7a62c49f-347e-4fc4-9331-6e8e7a96aa73" | SqsAsyncClient.builder() | { c -> c.createQueue(CreateQueueRequest.builder().queueName("somequeue").build()) } | """
<CreateQueueResponse>
<CreateQueueResult><QueueUrl>https://queue.amazonaws.com/123456789012/MyQueue</QueueUrl></CreateQueueResult>
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"Sqs" | "SendMessage" | "POST" | "/" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsAsyncClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl("someurl").messageBody("").build()) } | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>
<MD5OfMessageAttributes>3ae8f24a165a8cedc005670c81a27295</MD5OfMessageAttributes>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
</SendMessageResult>
<ResponseMetadata><RequestId>27daac76-34dd-47df-bd01-1f6e873584a0</RequestId></ResponseMetadata>
</SendMessageResponse>
"""
"Ec2" | "AllocateAddress" | "POST" | "/" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2AsyncClient.builder() | { c -> c.allocateAddress() } | """
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<publicIp>192.0.2.1</publicIp>
<domain>standard</domain>
</AllocateAddressResponse>
"""
"Rds" | "DeleteOptionGroup" | "POST" | "/" | "0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99" | RdsAsyncClient.builder() | { c -> c.deleteOptionGroup(DeleteOptionGroupRequest.builder().build()) } | """
<DeleteOptionGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<ResponseMetadata><RequestId>0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99</RequestId></ResponseMetadata>
</DeleteOptionGroupResponse>
"""
}
// TODO(anuraaga): Without AOP instrumentation of the HTTP client, we cannot model retries as
// spans because of https://github.com/aws/aws-sdk-java-v2/issues/1741. We should at least tweak
// the instrumentation to add Events for retries instead.
def "timeout and retry errors not captured"() {
setup:
def server = httpServer {
handlers {
all {
Thread.sleep(500)
response.status(200).send()
}
}
}
def client = S3Client.builder()
.endpointOverride(server.address)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.overrideConfiguration(ClientOverrideConfiguration.builder()
.addExecutionInterceptor(AwsSdk.newInterceptor())
.build())
.httpClientBuilder(ApacheHttpClient.builder().socketTimeout(Duration.ofMillis(50)))
.build()
when:
client.getObject(GetObjectRequest.builder().bucket("somebucket").key("somekey").build())
then:
thrown SdkClientException
assertTraces(1) {
trace(0, 1) {
span(0) {
operationName "S3.GetObject"
spanKind CLIENT
errored true
parent()
tags {
"$MoreTags.NET_PEER_NAME" "localhost"
"$MoreTags.NET_PEER_PORT" server.address.port
"$Tags.HTTP_URL" "$server.address/somebucket/somekey"
"$Tags.HTTP_METHOD" "GET"
"aws.service" "S3"
"aws.operation" "GetObject"
"aws.agent" "java-aws-sdk"
"aws.bucket.name" "somebucket"
errorTags SdkClientException, "Unable to execute HTTP request: Read timed out"
}
}
}
}
cleanup:
server.close()
}
String expectedOperationName(String method) {
return method != null ? "HTTP $method" : HttpClientDecorator.DEFAULT_SPAN_NAME
}
}

View File

@ -10,7 +10,6 @@ muzzle {
group = "software.amazon.awssdk"
module = "aws-core"
versions = "[2.2.0,)"
assertInverse = true
}
}
@ -19,6 +18,8 @@ testSets {
}
dependencies {
compile project(':instrumentation-core:aws-sdk:aws-sdk-2.2-core')
compileOnly group: 'software.amazon.awssdk', name: 'aws-core', version: '2.2.0'
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.

View File

@ -27,9 +27,10 @@ public abstract class AbstractAwsClientInstrumentation extends Instrumenter.Defa
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AwsSdkClientDecorator",
"io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdk",
"io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkClientDecorator",
packageName + ".TracingExecutionInterceptor",
packageName + ".TracingExecutionInterceptor$ScopeHolder"
packageName + ".TracingExecutionInterceptor$ScopeHolder",
};
}
}

View File

@ -15,21 +15,46 @@
*/
package io.opentelemetry.auto.instrumentation.awssdk.v2_2;
import static io.opentelemetry.auto.instrumentation.awssdk.v2_2.AwsSdkClientDecorator.DECORATE;
import static io.opentelemetry.auto.instrumentation.awssdk.v2_2.AwsSdkClientDecorator.TRACER;
import static io.opentelemetry.trace.TracingContextUtils.currentContextWith;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdk;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Span.Kind;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.builder.SdkClientBuilder;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.Context.AfterExecution;
import software.amazon.awssdk.core.interceptor.Context.AfterMarshalling;
import software.amazon.awssdk.core.interceptor.Context.AfterTransmission;
import software.amazon.awssdk.core.interceptor.Context.AfterUnmarshalling;
import software.amazon.awssdk.core.interceptor.Context.BeforeExecution;
import software.amazon.awssdk.core.interceptor.Context.BeforeMarshalling;
import software.amazon.awssdk.core.interceptor.Context.BeforeTransmission;
import software.amazon.awssdk.core.interceptor.Context.BeforeUnmarshalling;
import software.amazon.awssdk.core.interceptor.Context.FailedExecution;
import software.amazon.awssdk.core.interceptor.Context.ModifyHttpRequest;
import software.amazon.awssdk.core.interceptor.Context.ModifyHttpResponse;
import software.amazon.awssdk.core.interceptor.Context.ModifyRequest;
import software.amazon.awssdk.core.interceptor.Context.ModifyResponse;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
/** AWS request execution interceptor */
/**
* {@link ExecutionInterceptor} that delegates to {@link AwsSdk}, augmenting {@link
* #beforeTransmission(BeforeTransmission, ExecutionAttributes)} to make sure the span is set to the
* current context to allow downstream instrumentation like Netty to pick it up.
*/
public class TracingExecutionInterceptor implements ExecutionInterceptor {
public static class ScopeHolder {
@ -40,65 +65,15 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {
// need to inject helper for it.
private static final Consumer<ClientOverrideConfiguration.Builder>
OVERRIDE_CONFIGURATION_CONSUMER =
builder -> builder.addExecutionInterceptor(new TracingExecutionInterceptor());
builder ->
builder.addExecutionInterceptor(
// Agent will trace HTTP calls too so use INTERNAL kind.
new TracingExecutionInterceptor(AwsSdk.newInterceptor(Kind.INTERNAL)));
private static final ExecutionAttribute<Span> SPAN_ATTRIBUTE =
new ExecutionAttribute<>("io.opentelemetry.auto.Span");
private final ExecutionInterceptor delegate;
@Override
public void beforeExecution(
final Context.BeforeExecution context, final ExecutionAttributes executionAttributes) {
final Span span = TRACER.spanBuilder(DECORATE.spanName(executionAttributes)).startSpan();
try (final Scope scope = currentContextWith(span)) {
DECORATE.afterStart(span);
executionAttributes.putAttribute(SPAN_ATTRIBUTE, span);
}
}
@Override
public void afterMarshalling(
final Context.AfterMarshalling context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
DECORATE.onRequest(span, context.httpRequest());
DECORATE.onSdkRequest(span, context.request());
DECORATE.onAttributes(span, executionAttributes);
}
@Override
public void beforeTransmission(
final Context.BeforeTransmission context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
// This scope will be closed by AwsHttpClientInstrumentation since ExecutionInterceptor API
// doesn't provide a way to run code in the same thread after transmission has been scheduled.
ScopeHolder.CURRENT.set(currentContextWith(span));
}
@Override
public void afterExecution(
final Context.AfterExecution context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
if (span != null) {
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
// Call onResponse on both types of responses:
DECORATE.onResponse(span, context.response());
DECORATE.onResponse(span, context.httpResponse());
DECORATE.beforeFinish(span);
span.end();
}
}
@Override
public void onExecutionFailure(
final Context.FailedExecution context, final ExecutionAttributes executionAttributes) {
final Span span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
if (span != null) {
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
DECORATE.onError(span, context.exception());
DECORATE.beforeFinish(span);
span.end();
}
private TracingExecutionInterceptor(ExecutionInterceptor delegate) {
this.delegate = delegate;
}
/**
@ -112,4 +87,113 @@ public class TracingExecutionInterceptor implements ExecutionInterceptor {
public static void muzzleCheck() {
// Noop
}
@Override
public void beforeExecution(BeforeExecution context, ExecutionAttributes executionAttributes) {
delegate.beforeExecution(context, executionAttributes);
}
@Override
public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes executionAttributes) {
return delegate.modifyRequest(context, executionAttributes);
}
@Override
public void beforeMarshalling(
BeforeMarshalling context, ExecutionAttributes executionAttributes) {
delegate.beforeMarshalling(context, executionAttributes);
}
@Override
public void afterMarshalling(AfterMarshalling context, ExecutionAttributes executionAttributes) {
delegate.afterMarshalling(context, executionAttributes);
}
@Override
public SdkHttpRequest modifyHttpRequest(
ModifyHttpRequest context, ExecutionAttributes executionAttributes) {
return delegate.modifyHttpRequest(context, executionAttributes);
}
@Override
public Optional<RequestBody> modifyHttpContent(
ModifyHttpRequest context, ExecutionAttributes executionAttributes) {
return delegate.modifyHttpContent(context, executionAttributes);
}
@Override
public Optional<AsyncRequestBody> modifyAsyncHttpContent(
ModifyHttpRequest context, ExecutionAttributes executionAttributes) {
return delegate.modifyAsyncHttpContent(context, executionAttributes);
}
@Override
public void beforeTransmission(
BeforeTransmission context, ExecutionAttributes executionAttributes) {
delegate.beforeTransmission(context, executionAttributes);
final Span span = AwsSdk.getSpanFromAttributes(executionAttributes);
if (span != null) {
// This scope will be closed by AwsHttpClientInstrumentation since ExecutionInterceptor API
// doesn't provide a way to run code in the same thread after transmission has been scheduled.
ScopeHolder.CURRENT.set(currentContextWith(span));
}
}
@Override
public void afterTransmission(
AfterTransmission context, ExecutionAttributes executionAttributes) {
delegate.afterTransmission(context, executionAttributes);
}
@Override
public SdkHttpResponse modifyHttpResponse(
ModifyHttpResponse context, ExecutionAttributes executionAttributes) {
return delegate.modifyHttpResponse(context, executionAttributes);
}
@Override
public Optional<Publisher<ByteBuffer>> modifyAsyncHttpResponseContent(
ModifyHttpResponse context, ExecutionAttributes executionAttributes) {
return delegate.modifyAsyncHttpResponseContent(context, executionAttributes);
}
@Override
public Optional<InputStream> modifyHttpResponseContent(
ModifyHttpResponse context, ExecutionAttributes executionAttributes) {
return delegate.modifyHttpResponseContent(context, executionAttributes);
}
@Override
public void beforeUnmarshalling(
BeforeUnmarshalling context, ExecutionAttributes executionAttributes) {
delegate.beforeUnmarshalling(context, executionAttributes);
}
@Override
public void afterUnmarshalling(
AfterUnmarshalling context, ExecutionAttributes executionAttributes) {
delegate.afterUnmarshalling(context, executionAttributes);
}
@Override
public SdkResponse modifyResponse(
ModifyResponse context, ExecutionAttributes executionAttributes) {
return delegate.modifyResponse(context, executionAttributes);
}
@Override
public void afterExecution(AfterExecution context, ExecutionAttributes executionAttributes) {
delegate.afterExecution(context, executionAttributes);
}
@Override
public Throwable modifyException(
FailedExecution context, ExecutionAttributes executionAttributes) {
return delegate.modifyException(context, executionAttributes);
}
@Override
public void onExecutionFailure(FailedExecution context, ExecutionAttributes executionAttributes) {
delegate.onExecutionFailure(context, executionAttributes);
}
}

View File

@ -146,6 +146,8 @@ include ':instrumentation:trace-annotation'
include ':instrumentation:twilio-6.6'
include ':instrumentation:vertx-testing'
include ':instrumentation-core:aws-sdk:aws-sdk-2.2-core'
// exporter support
include ':exporter-support'

View File

@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.auto.test
import com.google.common.base.Predicate
import com.google.common.base.Predicates
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.SimpleType
import io.opentelemetry.auto.test.asserts.InMemoryExporterAssert
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.trace.data.SpanData
import org.junit.Before
import spock.lang.Specification
/**
* A spock test runner which automatically initializes an in-memory exporter that can be used to
* verify traces.
*/
abstract class InstrumentationTestRunner extends Specification {
protected static final InMemoryExporter TEST_WRITER
static {
TEST_WRITER = new InMemoryExporter()
OpenTelemetrySdk.getTracerProvider().addSpanProcessor(TEST_WRITER)
}
@Before
void beforeTest() {
TEST_WRITER.clear()
}
protected void assertTraces(
final int size,
@ClosureParams(
value = SimpleType,
options = "io.opentelemetry.auto.test.asserts.ListWriterAssert")
@DelegatesTo(value = InMemoryExporterAssert, strategy = Closure.DELEGATE_FIRST)
final Closure spec) {
InMemoryExporterAssert.assertTraces(
TEST_WRITER, size, Predicates.<List<SpanData>>alwaysFalse(), spec)
}
protected void assertTracesWithFilter(
final int size,
final Predicate<List<SpanData>> excludes,
@ClosureParams(
value = SimpleType,
options = "io.opentelemetry.auto.test.asserts.ListWriterAssert")
@DelegatesTo(value = InMemoryExporterAssert, strategy = Closure.DELEGATE_FIRST)
final Closure spec) {
InMemoryExporterAssert.assertTraces(TEST_WRITER, size, excludes, spec)
}
}

View File

@ -7,6 +7,7 @@ excludedClassesCoverage += [
'io.opentelemetry.auto.test.base.*',
'io.opentelemetry.auto.test.log.*',
'io.opentelemetry.auto.test.AgentTestRunner',
'io.opentelemetry.auto.test.InstrumentationTestRunner',
'io.opentelemetry.auto.test.InMemoryExporter.*',
'io.opentelemetry.auto.test.utils.*',
// Avoid applying jacoco instrumentation to classes instrumented by tested agent