Convert remaining aws sdk 1.11 tests from groovy to java (#12777)

This commit is contained in:
Jay DeLuca 2024-11-25 12:24:48 -05:00 committed by GitHub
parent 55c137c9c3
commit 4f94664041
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 645 additions and 1066 deletions

View File

@ -1,201 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.BucketNotificationConfiguration
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.QueueConfiguration
import com.amazonaws.services.s3.model.S3Event
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest
import com.amazonaws.services.s3.model.TopicConfiguration
import com.amazonaws.services.sns.AmazonSNSAsyncClient
import com.amazonaws.services.sns.model.CreateTopicResult
import com.amazonaws.services.sns.model.SetTopicAttributesRequest
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest
import com.amazonaws.services.sqs.model.PurgeQueueRequest
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import org.slf4j.LoggerFactory
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.utility.DockerImageName
import java.time.Duration
class AwsConnector {
private LocalStackContainer localstack
private AmazonSQSAsyncClient sqsClient
private AmazonS3Client s3Client
private AmazonSNSAsyncClient snsClient
static localstack() {
AwsConnector awsConnector = new AwsConnector()
awsConnector.localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.0.2"))
.withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS, LocalStackContainer.Service.S3)
.withEnv("DEBUG", "1")
.withEnv("SQS_PROVIDER", "elasticmq")
.withStartupTimeout(Duration.ofMinutes(2))
awsConnector.localstack.start()
awsConnector.localstack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test")))
AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsConnector.localstack .getAccessKey(), awsConnector.localstack.getSecretKey()))
awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder()
.withEndpointConfiguration(getEndpointConfiguration(awsConnector.localstack, LocalStackContainer.Service.SQS))
.withCredentials(credentialsProvider)
.build()
awsConnector.s3Client = AmazonS3Client.builder()
.withEndpointConfiguration(getEndpointConfiguration(awsConnector.localstack, LocalStackContainer.Service.S3))
.withCredentials(credentialsProvider)
.build()
awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder()
.withEndpointConfiguration(getEndpointConfiguration(awsConnector.localstack, LocalStackContainer.Service.SNS))
.withCredentials(credentialsProvider)
.build()
return awsConnector
}
static AwsClientBuilder.EndpointConfiguration getEndpointConfiguration(LocalStackContainer localstack, LocalStackContainer.Service service) {
return new AwsClientBuilder.EndpointConfiguration(localstack.getEndpointOverride(service).toString(), localstack.getRegion())
}
static liveAws() {
AwsConnector awsConnector = new AwsConnector()
awsConnector.sqsClient = AmazonSQSAsyncClient.asyncBuilder()
.withRegion(Regions.US_EAST_1)
.build()
awsConnector.s3Client = AmazonS3Client.builder()
.withRegion(Regions.US_EAST_1)
.build()
awsConnector.snsClient = AmazonSNSAsyncClient.asyncBuilder()
.withRegion(Regions.US_EAST_1)
.build()
return awsConnector
}
def createQueue(String queueName) {
println "Create queue ${queueName}"
return sqsClient.createQueue(queueName).getQueueUrl()
}
def getQueueArn(String queueUrl) {
println "Get ARN for queue ${queueUrl}"
return sqsClient.getQueueAttributes(
new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("QueueArn")).getAttributes()
.get("QueueArn")
}
def setTopicPublishingPolicy(String topicArn) {
println "Set policy for topic ${topicArn}"
snsClient.setTopicAttributes(new SetTopicAttributesRequest(topicArn, "Policy", String.format(SNS_POLICY, topicArn)))
}
private static final String SNS_POLICY = "{" +
" \"Statement\": [" +
" {" +
" \"Effect\": \"Allow\"," +
" \"Principal\": \"*\"," +
" \"Action\": \"sns:Publish\"," +
" \"Resource\": \"%s\"" +
" }]" +
"}"
def setQueuePublishingPolicy(String queueUrl, String queueArn) {
println "Set policy for queue ${queueArn}"
sqsClient.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", String.format(SQS_POLICY, queueArn)))
}
private static final String SQS_POLICY = "{" +
" \"Statement\": [" +
" {" +
" \"Effect\": \"Allow\"," +
" \"Principal\": \"*\"," +
" \"Action\": \"sqs:SendMessage\"," +
" \"Resource\": \"%s\"" +
" }]" +
"}"
def createBucket(String bucketName) {
println "Create bucket ${bucketName}"
s3Client.createBucket(bucketName)
}
def deleteBucket(String bucketName) {
println "Delete bucket ${bucketName}"
ObjectListing objectListing = s3Client.listObjects(bucketName)
Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator()
while (objIter.hasNext()) {
s3Client.deleteObject(bucketName, objIter.next().getKey())
}
s3Client.deleteBucket(bucketName)
}
def enableS3ToSqsNotifications(String bucketName, String sqsQueueArn) {
println "Enable notification for bucket ${bucketName} to queue ${sqsQueueArn}"
BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration()
notificationConfiguration.addConfiguration("sqsQueueConfig",
new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut)))
s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest(
bucketName, notificationConfiguration))
}
def enableS3ToSnsNotifications(String bucketName, String snsTopicArn) {
println "Enable notification for bucket ${bucketName} to topic ${snsTopicArn}"
BucketNotificationConfiguration notificationConfiguration = new BucketNotificationConfiguration()
notificationConfiguration.addConfiguration("snsTopicConfig",
new TopicConfiguration(snsTopicArn, EnumSet.of(S3Event.ObjectCreatedByPut)))
s3Client.setBucketNotificationConfiguration(new SetBucketNotificationConfigurationRequest(
bucketName, notificationConfiguration))
}
def createTopicAndSubscribeQueue(String topicName, String queueArn) {
println "Create topic ${topicName} and subscribe to queue ${queueArn}"
CreateTopicResult ctr = snsClient.createTopic(topicName)
snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn)
return ctr.getTopicArn()
}
def receiveMessage(String queueUrl) {
println "Receive message from queue ${queueUrl}"
sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20))
}
def purgeQueue(String queueUrl) {
println "Purge queue ${queueUrl}"
sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl))
}
def putSampleData(String bucketName) {
println "Put sample data to bucket ${bucketName}"
s3Client.putObject(bucketName, "otelTestKey", "otelTestData")
}
def publishSampleNotification(String topicArn) {
snsClient.publish(topicArn, "Hello There")
}
def disconnect() {
if (localstack != null) {
localstack.stop()
}
}
}

View File

@ -1,650 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes
import io.opentelemetry.semconv.ServerAttributes
import io.opentelemetry.semconv.HttpAttributes
import io.opentelemetry.semconv.NetworkAttributes
import io.opentelemetry.semconv.UrlAttributes
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
class S3TracingTest extends AgentInstrumentationSpecification {
@Shared
AwsConnector awsConnector = AwsConnector.localstack()
def cleanupSpec() {
awsConnector.disconnect()
}
def "S3 upload triggers SQS message"() {
setup:
String queueName = "s3ToSqsTestQueue"
String bucketName = "otel-s3-to-sqs-test-bucket"
String queueUrl = awsConnector.createQueue(queueName)
awsConnector.createBucket(bucketName)
String queueArn = awsConnector.getQueueArn(queueUrl)
awsConnector.setQueuePublishingPolicy(queueUrl, queueArn)
awsConnector.enableS3ToSqsNotifications(bucketName, queueArn)
when:
// test message, auto created by AWS
awsConnector.receiveMessage(queueUrl)
awsConnector.putSampleData(bucketName)
// traced message
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
// cleanup
awsConnector.deleteBucket(bucketName)
awsConnector.purgeQueue(queueUrl)
then:
assertTraces(10) {
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.name" queueName
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "CreateQueue"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(1, 1) {
span(0) {
name "S3.CreateBucket"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.bucket.name" bucketName
"rpc.method" "CreateBucket"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"$HttpAttributes.HTTP_REQUEST_METHOD" "PUT"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(2, 1) {
span(0) {
name "SQS.GetQueueAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "GetQueueAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(3, 1) {
span(0) {
name "SQS.SetQueueAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "SetQueueAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(4, 1) {
span(0) {
name "S3.SetBucketNotificationConfiguration"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "SetBucketNotificationConfiguration"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "PUT"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(5, 3) {
span(0) {
name "S3.PutObject"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "PutObject"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "PUT"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
span(1) {
name "s3ToSqsTestQueue process"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
trace(6, 1) {
span(0) {
name "S3.ListObjects"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "ListObjects"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "GET"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(7, 1) {
span(0) {
name "S3.DeleteObject"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "DeleteObject"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(8, 1) {
span(0) {
name "S3.DeleteBucket"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "DeleteBucket"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(9, 1) {
span(0) {
name "SQS.PurgeQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "PurgeQueue"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
}
}
def "S3 upload triggers SNS topic notification, then creates SQS message"() {
setup:
String queueName = "s3ToSnsToSqsTestQueue"
String bucketName = "otel-s3-sns-sqs-test-bucket"
String topicName = "s3ToSnsToSqsTestTopic"
String queueUrl = awsConnector.createQueue(queueName)
String queueArn = awsConnector.getQueueArn(queueUrl)
awsConnector.createBucket(bucketName)
String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn)
awsConnector.setQueuePublishingPolicy(queueUrl, queueArn)
awsConnector.setTopicPublishingPolicy(topicArn)
awsConnector.enableS3ToSnsNotifications(bucketName, topicArn)
when:
// test message, auto created by AWS
awsConnector.receiveMessage(queueUrl)
awsConnector.putSampleData(bucketName)
// traced message
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
// cleanup
awsConnector.deleteBucket(bucketName)
awsConnector.purgeQueue(queueUrl)
then:
assertTraces(14) {
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.name" queueName
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "CreateQueue"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(1, 1) {
span(0) {
name "SQS.GetQueueAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "GetQueueAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(2, 1) {
span(0) {
name "S3.CreateBucket"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "CreateBucket"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "PUT"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(3, 1) {
span(0) {
name "SNS.CreateTopic"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "CreateTopic"
"rpc.system" "aws-api"
"rpc.service" "AmazonSNS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(4, 1) {
span(0) {
name "SNS.Subscribe"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "Subscribe"
"rpc.system" "aws-api"
"rpc.service" "AmazonSNS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
}
trace(5, 1) {
span(0) {
name "SQS.SetQueueAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "SetQueueAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(6, 1) {
span(0) {
name "SNS.SetTopicAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "SetTopicAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSNS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
}
trace(7, 1) {
span(0) {
name "S3.SetBucketNotificationConfiguration"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "SetBucketNotificationConfiguration"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "PUT"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(8, 1) {
span(0) {
name "S3.PutObject"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "PutObject"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "PUT"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(9, 2) {
span(0) {
name "s3ToSnsToSqsTestQueue process"
kind CONSUMER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(1) {
name "process child"
childOf span(0)
attributes {
}
}
}
trace(10, 1) {
span(0) {
name "S3.ListObjects"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "ListObjects"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "GET"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(11, 1) {
span(0) {
name "S3.DeleteObject"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "DeleteObject"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(12, 1) {
span(0) {
name "S3.DeleteBucket"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"rpc.method" "DeleteBucket"
"rpc.system" "aws-api"
"rpc.service" "Amazon S3"
"aws.bucket.name" bucketName
"$HttpAttributes.HTTP_REQUEST_METHOD" "DELETE"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 204
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(13, 1) {
span(0) {
name "SQS.PurgeQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "PurgeQueue"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://") }
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
}
}
}

View File

@ -1,215 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes
import io.opentelemetry.semconv.ServerAttributes
import io.opentelemetry.semconv.HttpAttributes
import io.opentelemetry.semconv.NetworkAttributes
import io.opentelemetry.semconv.UrlAttributes
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
class SnsTracingTest extends AgentInstrumentationSpecification {
@Shared
AwsConnector awsConnector = AwsConnector.localstack()
def cleanupSpec() {
awsConnector.disconnect()
}
def "SNS notification triggers SQS message consumed with AWS SDK"() {
setup:
String queueName = "snsToSqsTestQueue"
String topicName = "snsToSqsTestTopic"
String queueUrl = awsConnector.createQueue(queueName)
String queueArn = awsConnector.getQueueArn(queueUrl)
awsConnector.setQueuePublishingPolicy(queueUrl, queueArn)
String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn)
when:
awsConnector.publishSampleNotification(topicArn)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
then:
assertTraces(6) {
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.name" queueName
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "CreateQueue"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(1, 1) {
span(0) {
name "SQS.GetQueueAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "GetQueueAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(2, 1) {
span(0) {
name "SQS.SetQueueAttributes"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "SetQueueAttributes"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(3, 1) {
span(0) {
name "SNS.CreateTopic"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "CreateTopic"
"rpc.system" "aws-api"
"rpc.service" "AmazonSNS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
}
}
}
trace(4, 1) {
span(0) {
name "SNS.Subscribe"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "Subscribe"
"rpc.system" "aws-api"
"rpc.service" "AmazonSNS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
}
trace(5, 3) {
span(0) {
name "SNS.Publish"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.method" "Publish"
"rpc.system" "aws-api"
"rpc.service" "AmazonSNS"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
span(1) {
name "snsToSqsTestQueue process"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" String
"aws.queue.url" queueUrl
"$AwsIncubatingAttributes.AWS_REQUEST_ID" String
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"rpc.method" "ReceiveMessage"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" String
"$ServerAttributes.SERVER_ADDRESS" String
"$ServerAttributes.SERVER_PORT" { it == null || Number }
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
"$NetworkAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
}
}
}

View File

@ -0,0 +1,190 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.BucketNotificationConfiguration;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.QueueConfiguration;
import com.amazonaws.services.s3.model.S3Event;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.SetBucketNotificationConfigurationRequest;
import com.amazonaws.services.s3.model.TopicConfiguration;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.SetTopicAttributesRequest;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
class AwsConnector {
private final LocalStackContainer localStack;
private final AmazonSQSAsync sqsClient;
private final AmazonS3 s3Client;
private final AmazonSNSAsync snsClient;
AwsConnector() {
localStack =
new LocalStackContainer(DockerImageName.parse("localstack/localstack:2.0.2"))
.withServices(
LocalStackContainer.Service.SQS,
LocalStackContainer.Service.SNS,
LocalStackContainer.Service.S3)
.withEnv("DEBUG", "1")
.withEnv("SQS_PROVIDER", "elasticmq")
.withStartupTimeout(Duration.ofMinutes(2));
localStack.start();
localStack.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("test")));
AWSCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(localStack.getAccessKey(), localStack.getSecretKey()));
sqsClient =
AmazonSQSAsyncClient.asyncBuilder()
.withEndpointConfiguration(
getEndpointConfiguration(localStack, LocalStackContainer.Service.SQS))
.withCredentials(credentialsProvider)
.build();
s3Client =
AmazonS3Client.builder()
.withEndpointConfiguration(
getEndpointConfiguration(localStack, LocalStackContainer.Service.S3))
.withCredentials(credentialsProvider)
.build();
snsClient =
AmazonSNSAsyncClient.asyncBuilder()
.withEndpointConfiguration(
getEndpointConfiguration(localStack, LocalStackContainer.Service.SNS))
.withCredentials(credentialsProvider)
.build();
}
static AwsClientBuilder.EndpointConfiguration getEndpointConfiguration(
LocalStackContainer localStack, LocalStackContainer.Service service) {
return new AwsClientBuilder.EndpointConfiguration(
localStack.getEndpointOverride(service).toString(), localStack.getRegion());
}
String createQueue(String queueName) {
return sqsClient.createQueue(queueName).getQueueUrl();
}
String getQueueArn(String queueUrl) {
return sqsClient
.getQueueAttributes(new GetQueueAttributesRequest(queueUrl).withAttributeNames("QueueArn"))
.getAttributes()
.get("QueueArn");
}
void setTopicPublishingPolicy(String topicArn) {
String snsPolicy =
"{"
+ " \"Statement\": ["
+ " {"
+ " \"Effect\": \"Allow\","
+ " \"Principal\": \"*\","
+ " \"Action\": \"sns:Publish\","
+ " \"Resource\": \"%s\""
+ " }]"
+ "}";
snsClient.setTopicAttributes(
new SetTopicAttributesRequest(topicArn, "Policy", String.format(snsPolicy, topicArn)));
}
void setQueuePublishingPolicy(String queueUrl, String queueArn) {
String sqsPolicy =
"{"
+ " \"Statement\": ["
+ " {"
+ " \"Effect\": \"Allow\","
+ " \"Principal\": \"*\","
+ " \"Action\": \"sqs:SendMessage\","
+ " \"Resource\": \"%s\""
+ " }]"
+ "}";
sqsClient.setQueueAttributes(
queueUrl, Collections.singletonMap("Policy", String.format(sqsPolicy, queueArn)));
}
void createBucket(String bucketName) {
s3Client.createBucket(bucketName);
}
void deleteBucket(String bucketName) {
ObjectListing objectListing = s3Client.listObjects(bucketName);
for (S3ObjectSummary element : objectListing.getObjectSummaries()) {
s3Client.deleteObject(bucketName, element.getKey());
}
s3Client.deleteBucket(bucketName);
}
void enableS3ToSqsNotifications(String bucketName, String sqsQueueArn) {
BucketNotificationConfiguration notificationConfiguration =
new BucketNotificationConfiguration();
notificationConfiguration.addConfiguration(
"sqsQueueConfig",
new QueueConfiguration(sqsQueueArn, EnumSet.of(S3Event.ObjectCreatedByPut)));
s3Client.setBucketNotificationConfiguration(
new SetBucketNotificationConfigurationRequest(bucketName, notificationConfiguration));
}
void enableS3ToSnsNotifications(String bucketName, String snsTopicArn) {
BucketNotificationConfiguration notificationConfiguration =
new BucketNotificationConfiguration();
notificationConfiguration.addConfiguration(
"snsTopicConfig",
new TopicConfiguration(snsTopicArn, EnumSet.of(S3Event.ObjectCreatedByPut)));
s3Client.setBucketNotificationConfiguration(
new SetBucketNotificationConfigurationRequest(bucketName, notificationConfiguration));
}
String createTopicAndSubscribeQueue(String topicName, String queueArn) {
CreateTopicResult ctr = snsClient.createTopic(topicName);
snsClient.subscribe(ctr.getTopicArn(), "sqs", queueArn);
return ctr.getTopicArn();
}
ReceiveMessageResult receiveMessage(String queueUrl) {
return sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20));
}
void purgeQueue(String queueUrl) {
sqsClient.purgeQueue(new PurgeQueueRequest(queueUrl));
}
void putSampleData(String bucketName) {
s3Client.putObject(bucketName, "otelTestKey", "otelTestData");
}
void publishSampleNotification(String topicArn) {
snsClient.publish(topicArn, "Hello There");
}
void disconnect() {
if (localStack != null) {
localStack.stop();
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.UrlAttributes.URL_FULL;
import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
class AwsSpanAssertions {
private AwsSpanAssertions() {}
static SpanDataAssert sqs(
SpanDataAssert span, String queueName, String queueUrl, String rpcMethod) {
return span.hasName("SQS." + rpcMethod)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)),
equalTo(stringKey("aws.queue.name"), queueName),
equalTo(stringKey("aws.queue.url"), queueUrl),
satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)),
equalTo(RPC_METHOD, rpcMethod),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "AmazonSQS"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, val -> val.startsWith("http://")),
satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)),
equalTo(NETWORK_PROTOCOL_VERSION, "1.1"),
satisfies(
SERVER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))));
}
static SpanDataAssert s3(
SpanDataAssert span,
String bucketName,
String rpcMethod,
String requestMethod,
int responseStatusCode) {
return span.hasName("S3." + rpcMethod)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)),
equalTo(stringKey("aws.bucket.name"), bucketName),
equalTo(RPC_METHOD, rpcMethod),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Amazon S3"),
equalTo(HTTP_REQUEST_METHOD, requestMethod),
equalTo(HTTP_RESPONSE_STATUS_CODE, responseStatusCode),
satisfies(URL_FULL, val -> val.startsWith("http://")),
satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)),
equalTo(NETWORK_PROTOCOL_VERSION, "1.1"),
satisfies(
SERVER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))));
}
static SpanDataAssert sns(SpanDataAssert span, String topicArn, String rpcMethod) {
return span.hasName("SNS." + rpcMethod)
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(MESSAGING_DESTINATION_NAME, topicArn),
satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)),
satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)),
equalTo(RPC_METHOD, rpcMethod),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "AmazonSNS"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, val -> val.startsWith("http://")),
satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)),
equalTo(NETWORK_PROTOCOL_VERSION, "1.1"),
satisfies(
SERVER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))));
}
}

View File

@ -0,0 +1,231 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.s3;
import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sns;
import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sqs;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.UrlAttributes.URL_FULL;
import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@SuppressWarnings("deprecation") // MESSAGING_OPERATION is deprecated
class S3TracingTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static final AwsConnector awsConnector = new AwsConnector();
@AfterAll
static void cleanUp() {
awsConnector.disconnect();
}
@Test
void testS3UploadTriggersSqsMessage() {
String queueName = "s3ToSqsTestQueue";
String bucketName = "otel-s3-to-sqs-test-bucket";
String queueUrl = awsConnector.createQueue(queueName);
awsConnector.createBucket(bucketName);
String queueArn = awsConnector.getQueueArn(queueUrl);
awsConnector.setQueuePublishingPolicy(queueUrl, queueArn);
awsConnector.enableS3ToSqsNotifications(bucketName, queueArn);
// test message, auto created by AWS
awsConnector.receiveMessage(queueUrl);
awsConnector.putSampleData(bucketName);
// traced message
ReceiveMessageResult receiveMessageResult = awsConnector.receiveMessage(queueUrl);
receiveMessageResult
.getMessages()
.forEach(message -> testing.runWithSpan("process child", () -> {}));
// cleanup
awsConnector.deleteBucket(bucketName);
awsConnector.purgeQueue(queueUrl);
testing.waitAndAssertTraces(
trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, queueName, null, "CreateQueue")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "CreateBucket", "PUT", 200)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sqs(span, null, queueUrl, "GetQueueAttributes")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sqs(span, null, queueUrl, "SetQueueAttributes")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "SetBucketNotificationConfiguration", "PUT", 200)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "PutObject", "PUT", 200),
span ->
span.hasName("s3ToSqsTestQueue process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)),
equalTo(stringKey("aws.queue.url"), queueUrl),
satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)),
equalTo(RPC_METHOD, "ReceiveMessage"),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "AmazonSQS"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, val -> val.startsWith("http://")),
satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)),
equalTo(NETWORK_PROTOCOL_VERSION, "1.1"),
satisfies(
SERVER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "s3ToSqsTestQueue"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))),
span ->
span.hasName("process child")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty())),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "ListObjects", "GET", 200)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "DeleteObject", "DELETE", 204)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "DeleteBucket", "DELETE", 204)),
trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, null, queueUrl, "PurgeQueue")));
}
@Test
void testS3UploadTriggersSnsTopicNotificationThenCreatesSqsMessage() {
String queueName = "s3ToSnsToSqsTestQueue";
String bucketName = "otel-s3-to-sns-to-sqs-test-bucket";
String topicName = "s3ToSnsTestTopic";
String queueUrl = awsConnector.createQueue(queueName);
String queueArn = awsConnector.getQueueArn(queueUrl);
awsConnector.createBucket(bucketName);
String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn);
awsConnector.setQueuePublishingPolicy(queueUrl, queueArn);
awsConnector.setTopicPublishingPolicy(topicArn);
awsConnector.enableS3ToSnsNotifications(bucketName, topicArn);
// test message, auto created by AWS
awsConnector.receiveMessage(queueUrl);
awsConnector.putSampleData(bucketName);
// traced message
ReceiveMessageResult receiveMessageResult = awsConnector.receiveMessage(queueUrl);
receiveMessageResult
.getMessages()
.forEach(message -> testing.runWithSpan("process child", () -> {}));
// cleanup
awsConnector.deleteBucket(bucketName);
awsConnector.purgeQueue(queueUrl);
testing.waitAndAssertTraces(
trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, queueName, null, "CreateQueue")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sqs(span, null, queueUrl, "GetQueueAttributes")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "CreateBucket", "PUT", 200)),
trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, null, "CreateTopic")),
trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, topicArn, "Subscribe")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sqs(span, null, queueUrl, "SetQueueAttributes")),
trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, topicArn, "SetTopicAttributes")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "SetBucketNotificationConfiguration", "PUT", 200)),
trace ->
trace.hasSpansSatisfyingExactly(span -> s3(span, bucketName, "PutObject", "PUT", 200)),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("s3ToSnsToSqsTestQueue process")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)),
equalTo(stringKey("aws.queue.url"), queueUrl),
satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)),
equalTo(RPC_METHOD, "ReceiveMessage"),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "AmazonSQS"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, val -> val.startsWith("http://")),
satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)),
equalTo(NETWORK_PROTOCOL_VERSION, "1.1"),
satisfies(
SERVER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "s3ToSnsToSqsTestQueue"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))),
span ->
span.hasName("process child")
.hasParent(trace.getSpan(0))
.hasAttributes(Attributes.empty())),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "ListObjects", "GET", 200)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "DeleteObject", "DELETE", 204)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> s3(span, bucketName, "DeleteBucket", "DELETE", 204)),
trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, null, queueUrl, "PurgeQueue")));
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sns;
import static io.opentelemetry.javaagent.instrumentation.awssdk.v1_11.AwsSpanAssertions.sqs;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.UrlAttributes.URL_FULL;
import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class SnsTracingTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static final AwsConnector awsConnector = new AwsConnector();
@AfterAll
static void cleanUp() {
awsConnector.disconnect();
}
@Test
@SuppressWarnings("deprecation") // MESSAGING_OPERATION is deprecated
void testSnsNotificationTriggersSqsMessageConsumedWithAwsSdk() {
String queueName = "snsToSqsTestQueue";
String topicName = "snsToSqsTestTopic";
String queueUrl = awsConnector.createQueue(queueName);
String queueArn = awsConnector.getQueueArn(queueUrl);
awsConnector.setQueuePublishingPolicy(queueUrl, queueArn);
String topicArn = awsConnector.createTopicAndSubscribeQueue(topicName, queueArn);
awsConnector.publishSampleNotification(topicArn);
ReceiveMessageResult receiveMessageResult = awsConnector.receiveMessage(queueUrl);
receiveMessageResult
.getMessages()
.forEach(message -> testing.runWithSpan("process child", () -> {}));
testing.waitAndAssertTraces(
trace -> trace.hasSpansSatisfyingExactly(span -> sqs(span, queueName, null, "CreateQueue")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sqs(span, null, queueUrl, "GetQueueAttributes")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sqs(span, null, queueUrl, "SetQueueAttributes")),
trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, null, "CreateTopic")),
trace -> trace.hasSpansSatisfyingExactly(span -> sns(span, topicArn, "Subscribe")),
trace ->
trace.hasSpansSatisfyingExactly(
span -> sns(span, topicArn, "Publish"),
span ->
span.hasName("snsToSqsTestQueue process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), v -> v.isInstanceOf(String.class)),
equalTo(stringKey("aws.queue.url"), queueUrl),
satisfies(AWS_REQUEST_ID, v -> v.isInstanceOf(String.class)),
equalTo(RPC_METHOD, "ReceiveMessage"),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "AmazonSQS"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, val -> val.startsWith("http://")),
satisfies(SERVER_ADDRESS, v -> v.isInstanceOf(String.class)),
equalTo(NETWORK_PROTOCOL_VERSION, "1.1"),
satisfies(
SERVER_PORT,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "snsToSqsTestQueue"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))),
span ->
span.hasName("process child")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty())));
}
}