Apache camel enable sqs propagation (#2102)
* adding AWS SQS tests to Apache Camel instrumentation * code review changes Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com> * SQS context propagation for Apache Camel instrumentation * code review - removed not needed dep Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
This commit is contained in:
parent
5bf9f34caa
commit
1f22fcaab0
|
@ -0,0 +1,10 @@
|
|||
apply from: "$rootDir/gradle/java.gradle"
|
||||
|
||||
dependencies {
|
||||
testImplementation project(':instrumentation:apache-camel-2.20:javaagent')
|
||||
testImplementation group: 'org.apache.camel', name: 'camel-core', version: '2.20.1'
|
||||
testImplementation group: 'org.apache.camel', name: 'camel-aws', version: '2.20.1'
|
||||
|
||||
testImplementation deps.opentelemetryTraceProps
|
||||
testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.18.1'
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanContext;
|
||||
import io.opentelemetry.context.Context;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import org.apache.camel.Endpoint;
|
||||
import org.apache.camel.component.aws.sqs.SqsComponent;
|
||||
import org.apache.camel.component.aws.sqs.SqsConfiguration;
|
||||
import org.apache.camel.component.aws.sqs.SqsEndpoint;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class CamelPropagationUtilTest {
|
||||
|
||||
@Test
|
||||
public void shouldExtractAwsParent() {
|
||||
|
||||
// given
|
||||
Endpoint endpoint = new SqsEndpoint("", new SqsComponent(), new SqsConfiguration());
|
||||
Map<String, Object> exchangeHeaders =
|
||||
Collections.singletonMap(
|
||||
"AWSTraceHeader",
|
||||
"Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1\n");
|
||||
|
||||
// when
|
||||
Context parent = CamelPropagationUtil.extractParent(exchangeHeaders, endpoint);
|
||||
|
||||
// then
|
||||
Span parentSpan = Span.fromContext(parent);
|
||||
SpanContext parentSpanContext = parentSpan.getSpanContext();
|
||||
assertThat(parentSpanContext.getTraceIdAsHexString())
|
||||
.isEqualTo("5759e988bd862e3fe1be46a994272793");
|
||||
assertThat(parentSpanContext.getSpanIdAsHexString()).isEqualTo("53995c3f42cd8ad8");
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ ext {
|
|||
|
||||
dependencies {
|
||||
library group: 'org.apache.camel', name: 'camel-core', version: "$camelversion"
|
||||
implementation deps.opentelemetryTraceProps
|
||||
|
||||
testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-2.0:javaagent')
|
||||
testInstrumentation project(':instrumentation:servlet:servlet-3.0:javaagent')
|
||||
|
|
|
@ -38,6 +38,11 @@ public class ApacheCamelInstrumentationModule extends InstrumentationModule {
|
|||
return singletonList(new CamelContextInstrumentation());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] additionalHelperClassNames() {
|
||||
return new String[] {"io.opentelemetry.extension.trace.propagation.AwsXrayPropagator"};
|
||||
}
|
||||
|
||||
public static class CamelContextInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<ClassLoader> classLoaderOptimization() {
|
||||
|
|
|
@ -9,13 +9,34 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
|
|||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.propagation.TextMapPropagator.Getter;
|
||||
import io.opentelemetry.context.propagation.TextMapPropagator.Setter;
|
||||
import io.opentelemetry.extension.trace.propagation.AwsXrayPropagator;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import org.apache.camel.Endpoint;
|
||||
|
||||
final class CamelPropagationUtil {
|
||||
|
||||
private CamelPropagationUtil() {}
|
||||
|
||||
static Context extractParent(final Map<String, Object> exchangeHeaders) {
|
||||
private static boolean isAwsPropagated(Endpoint endpoint) {
|
||||
return endpoint.getClass().getName().endsWith("SqsEndpoint");
|
||||
}
|
||||
|
||||
static Context extractParent(final Map<String, Object> exchangeHeaders, Endpoint endpoint) {
|
||||
return (isAwsPropagated(endpoint)
|
||||
? extractAwsPropagationParent(exchangeHeaders)
|
||||
: extractHttpPropagationParent(exchangeHeaders));
|
||||
}
|
||||
|
||||
private static Context extractAwsPropagationParent(final Map<String, Object> exchangeHeaders) {
|
||||
return AwsXrayPropagator.getInstance()
|
||||
.extract(
|
||||
Context.current(),
|
||||
Collections.singletonMap("X-Amzn-Trace-Id", exchangeHeaders.get("AWSTraceHeader")),
|
||||
MapGetter.INSTANCE);
|
||||
}
|
||||
|
||||
private static Context extractHttpPropagationParent(final Map<String, Object> exchangeHeaders) {
|
||||
return GlobalOpenTelemetry.getPropagators()
|
||||
.getTextMapPropagator()
|
||||
.extract(Context.current(), exchangeHeaders, MapGetter.INSTANCE);
|
||||
|
|
|
@ -43,7 +43,8 @@ final class CamelRoutePolicy extends RoutePolicySupport {
|
|||
SpanBuilder builder = CamelTracer.TRACER.spanBuilder(name);
|
||||
builder.setSpanKind(spanKind);
|
||||
if (!activeSpan.getSpanContext().isValid()) {
|
||||
Context parentContext = CamelPropagationUtil.extractParent(exchange.getIn().getHeaders());
|
||||
Context parentContext =
|
||||
CamelPropagationUtil.extractParent(exchange.getIn().getHeaders(), route.getEndpoint());
|
||||
if (parentContext != null) {
|
||||
builder.setParent(parentContext);
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
||||
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import org.apache.camel.LoggingLevel
|
||||
import org.apache.camel.builder.RouteBuilder
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import org.apache.camel.LoggingLevel
|
||||
import org.apache.camel.builder.RouteBuilder
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
||||
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import org.apache.camel.LoggingLevel
|
||||
import org.apache.camel.builder.RouteBuilder
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.SERVER
|
||||
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import org.apache.camel.LoggingLevel
|
||||
import org.apache.camel.builder.RouteBuilder
|
|
@ -0,0 +1,552 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
|
||||
import static io.opentelemetry.api.trace.Span.Kind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
|
||||
|
||||
import com.amazonaws.services.sqs.model.SendMessageRequest
|
||||
import io.opentelemetry.instrumentation.test.AgentTestRunner
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
import org.apache.camel.CamelContext
|
||||
import org.apache.camel.ProducerTemplate
|
||||
import org.elasticmq.rest.sqs.SQSRestServerBuilder
|
||||
import org.springframework.boot.SpringApplication
|
||||
import org.springframework.context.ConfigurableApplicationContext
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap
|
||||
import spock.lang.Shared
|
||||
|
||||
class SqsCamelTest extends AgentTestRunner {
|
||||
|
||||
@Shared
|
||||
ConfigurableApplicationContext server
|
||||
|
||||
@Shared
|
||||
def sqs
|
||||
@Shared
|
||||
int sqsPort
|
||||
|
||||
def setupSpec() {
|
||||
|
||||
/**
|
||||
* Temporarily using emq instead of localstack till the latter supports AWS trace propagation
|
||||
*
|
||||
sqs = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
|
||||
.withServices(LocalStackContainer.Service.SQS)
|
||||
sqs.start()
|
||||
sqsPort = sqs.getMappedPort(4566)
|
||||
|
||||
def app = new SpringApplication(SqsConfig)
|
||||
app.addInitializers(new ApplicationContextInitializer<AbstractApplicationContext>() {
|
||||
@Override
|
||||
void initialize(AbstractApplicationContext applicationContext) {
|
||||
applicationContext.getBeanFactory().registerSingleton("localStack", sqs)
|
||||
}
|
||||
})
|
||||
server = app.run()**/
|
||||
|
||||
sqsPort = PortUtils.randomOpenPort()
|
||||
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
|
||||
println getClass().name + " SQS server started at: localhost:$sqsPort/"
|
||||
|
||||
def app = new SpringApplication(SqsConfig)
|
||||
app.setDefaultProperties(ImmutableMap.of("sqs.port", sqsPort))
|
||||
server = app.run()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
if (server != null) {
|
||||
server.close()
|
||||
server = null
|
||||
}
|
||||
if (sqs != null) {
|
||||
sqs.stopAndWait()
|
||||
}
|
||||
}
|
||||
|
||||
def "camel SQS producer - camel SQS consumer"() {
|
||||
setup:
|
||||
def camelContext = server.getBean(CamelContext)
|
||||
ProducerTemplate template = camelContext.createProducerTemplate()
|
||||
|
||||
when:
|
||||
template.sendBody("direct:input", "{\"type\": \"hello\"}")
|
||||
|
||||
then:
|
||||
assertTraces(6) {
|
||||
trace(0, 5) {
|
||||
|
||||
span(0) {
|
||||
name "input"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"apache-camel.uri" "direct://input"
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
name "sqsCamelTest"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient"
|
||||
"messaging.destination" "sqsCamelTest"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "SQS.SendMessage"
|
||||
kind PRODUCER
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "SendMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CONSUMER
|
||||
childOf span(2)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"http.user_agent" String
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(4) {
|
||||
name "sqsCamelTest"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
attributes {
|
||||
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient"
|
||||
"messaging.destination" "sqsCamelTest"
|
||||
"messaging.message_id" String
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(2, 1) {
|
||||
span(0) {
|
||||
name "SQS.DeleteMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "DeleteMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(3, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(4, 1) {
|
||||
it.span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(5, 1) {
|
||||
it.span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "AWS SDK SQS producer - camel SQS consumer"() {
|
||||
setup:
|
||||
def awsClient = server.getBean("sqsClient")
|
||||
|
||||
when:
|
||||
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/queue/sqsCamelTest", "{\"type\": \"hello\"}")
|
||||
awsClient.sendMessage(send)
|
||||
|
||||
then:
|
||||
assertTraces(6) {
|
||||
trace(0, 3) {
|
||||
|
||||
span(0) {
|
||||
name "SQS.SendMessage"
|
||||
kind PRODUCER
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "SendMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CONSUMER
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"http.user_agent" String
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "sqsCamelTest"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient"
|
||||
"messaging.destination" "sqsCamelTest"
|
||||
"messaging.message_id" String
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(2, 1) {
|
||||
span(0) {
|
||||
name "SQS.DeleteMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "DeleteMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(3, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(4, 1) {
|
||||
it.span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(5, 1) {
|
||||
it.span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "camel SQS producer - AWS SDK SQS consumer"() {
|
||||
setup:
|
||||
def awsClient = server.getBean("sqsClient")
|
||||
awsClient.createQueue("sqsCamelSeparateQueueTest")
|
||||
|
||||
def camelContext = server.getBean(CamelContext)
|
||||
ProducerTemplate template = camelContext.createProducerTemplate()
|
||||
|
||||
when:
|
||||
template.sendBody("direct:separate-input", "{\"type\": \"hello\"}")
|
||||
awsClient.receiveMessage("http://localhost:$sqsPort/queue/sqsCamelSeparateQueueTest")
|
||||
|
||||
then:
|
||||
assertTraces(3) {
|
||||
trace(0, 1) {
|
||||
|
||||
span(0) {
|
||||
name "SQS.CreateQueue"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "CreateQueueRequest"
|
||||
"aws.queue.name" "sqsCamelSeparateQueueTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 4) {
|
||||
|
||||
span(0) {
|
||||
name "separate-input"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"apache-camel.uri" "direct://separate-input"
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
name "sqsCamelSeparateQueueTest"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"apache-camel.uri" "aws-sqs://sqsCamelSeparateQueueTest?amazonSQSClient=%23sqsClient"
|
||||
"messaging.destination" "sqsCamelSeparateQueueTest"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "SQS.SendMessage"
|
||||
kind PRODUCER
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "SendMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelSeparateQueueTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CONSUMER
|
||||
childOf span(2)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelSeparateQueueTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"http.user_agent" String
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
|
||||
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
|
||||
*/
|
||||
trace(2, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelSeparateQueueTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider
|
||||
import com.amazonaws.auth.BasicAWSCredentials
|
||||
|
@ -27,7 +27,7 @@ class SqsConfig {
|
|||
|
||||
@Override
|
||||
void configure() throws Exception {
|
||||
from("aws-sqs://sqsCamelTest?amazonSQSClient=#sqsClient&messageAttributeNames=traceparent")
|
||||
from("aws-sqs://sqsCamelTest?amazonSQSClient=#sqsClient")
|
||||
.log(LoggingLevel.INFO, "test", "RECEIVER got body : \${body}")
|
||||
.log(LoggingLevel.INFO, "test", "RECEIVER got headers : \${headers}")
|
||||
}
|
||||
|
@ -48,6 +48,20 @@ class SqsConfig {
|
|||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
RouteBuilder separateQueueProducerRoute() {
|
||||
return new RouteBuilder() {
|
||||
|
||||
@Override
|
||||
void configure() throws Exception {
|
||||
from("direct:separate-input")
|
||||
.log(LoggingLevel.INFO, "test", "SENDING body: \${body}")
|
||||
.log(LoggingLevel.INFO, "test", "SENDING headers: \${headers}")
|
||||
.to("aws-sqs://sqsCamelSeparateQueueTest?amazonSQSClient=#sqsClient")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporarily using emq instead of localstack till the latter supports AWS trace propagation
|
||||
*
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import org.apache.camel.builder.RouteBuilder
|
||||
import org.springframework.boot.SpringBootConfiguration
|
|
@ -3,7 +3,7 @@
|
|||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
package io.opentelemetry.javaagent.instrumentation.apachecamel
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
|
@ -1,257 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package test
|
||||
|
||||
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
|
||||
import static io.opentelemetry.api.trace.Span.Kind.CONSUMER
|
||||
import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
|
||||
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
|
||||
|
||||
import io.opentelemetry.instrumentation.test.utils.PortUtils
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import org.apache.camel.CamelContext
|
||||
import org.apache.camel.ProducerTemplate
|
||||
import org.elasticmq.rest.sqs.SQSRestServerBuilder
|
||||
import org.springframework.boot.SpringApplication
|
||||
import org.springframework.context.ConfigurableApplicationContext
|
||||
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap
|
||||
import spock.lang.Shared
|
||||
|
||||
class SqsCamelTest extends AgentInstrumentationSpecification {
|
||||
|
||||
@Shared
|
||||
ConfigurableApplicationContext server
|
||||
|
||||
@Shared
|
||||
def sqs
|
||||
@Shared
|
||||
int sqsPort
|
||||
|
||||
def setupSpec() {
|
||||
|
||||
/**
|
||||
* Temporarily using emq instead of localstack till the latter supports AWS trace propagation
|
||||
*
|
||||
sqs = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
|
||||
.withServices(LocalStackContainer.Service.SQS)
|
||||
sqs.start()
|
||||
sqsPort = sqs.getMappedPort(4566)
|
||||
|
||||
def app = new SpringApplication(SqsConfig)
|
||||
app.addInitializers(new ApplicationContextInitializer<AbstractApplicationContext>() {
|
||||
@Override
|
||||
void initialize(AbstractApplicationContext applicationContext) {
|
||||
applicationContext.getBeanFactory().registerSingleton("localStack", sqs)
|
||||
}
|
||||
})
|
||||
server = app.run()**/
|
||||
|
||||
sqsPort = PortUtils.randomOpenPort()
|
||||
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
|
||||
println getClass().name + " SQS server started at: localhost:$sqsPort/"
|
||||
|
||||
def app = new SpringApplication(SqsConfig)
|
||||
app.setDefaultProperties(ImmutableMap.of("sqs.port", sqsPort))
|
||||
server = app.run()
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
if (server != null) {
|
||||
server.close()
|
||||
server = null
|
||||
}
|
||||
if (sqs != null) {
|
||||
sqs.stopAndWait()
|
||||
}
|
||||
}
|
||||
|
||||
def "simple sqs producer-consumer services"() {
|
||||
setup:
|
||||
def camelContext = server.getBean(CamelContext)
|
||||
ProducerTemplate template = camelContext.createProducerTemplate()
|
||||
|
||||
when:
|
||||
template.sendBody("direct:input", "{\"type\": \"hello\"}")
|
||||
|
||||
then:
|
||||
assertTraces(6) {
|
||||
trace(0, 5) {
|
||||
|
||||
span(0) {
|
||||
name "input"
|
||||
kind INTERNAL
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"apache-camel.uri" "direct://input"
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
name "sqsCamelTest"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
attributes {
|
||||
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient"
|
||||
"messaging.destination" "sqsCamelTest"
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "SQS.SendMessage"
|
||||
kind PRODUCER
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "SendMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(3) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CONSUMER
|
||||
childOf span(2)
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"http.user_agent" String
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
span(4) {
|
||||
name "sqsCamelTest"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
attributes {
|
||||
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient&messageAttributeNames=traceparent"
|
||||
"messaging.destination" "sqsCamelTest"
|
||||
"messaging.message_id" String
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(2, 1) {
|
||||
span(0) {
|
||||
name "SQS.DeleteMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "DeleteMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(3, 1) {
|
||||
span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(4, 1) {
|
||||
it.span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
trace(5, 1) {
|
||||
it.span(0) {
|
||||
name "SQS.ReceiveMessage"
|
||||
kind CLIENT
|
||||
hasNoParent()
|
||||
attributes {
|
||||
"aws.agent" "java-aws-sdk"
|
||||
"aws.endpoint" "http://localhost:$sqsPort"
|
||||
"aws.operation" "ReceiveMessageRequest"
|
||||
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
|
||||
"aws.service" "AmazonSQS"
|
||||
"http.flavor" "1.1"
|
||||
"http.method" "POST"
|
||||
"http.status_code" 200
|
||||
"http.url" "http://localhost:$sqsPort"
|
||||
"net.peer.name" "localhost"
|
||||
"net.peer.port" sqsPort
|
||||
"net.transport" "IP.TCP"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,6 +14,7 @@
|
|||
</root>
|
||||
|
||||
<logger name="io.opentelemetry.javaagent.instrumentation.apachecamel" level="trace"/>
|
||||
<logger name="io.opentelemetry.javaagent.testing.exporter.OtlpInMemoryMetricExporter" level="ERROR" />
|
||||
<logger name="org.apache.camel" level="debug" />
|
||||
<logger name="test" level="trace"/>
|
||||
<logger name="org.testcontainers" level="debug" />
|
||||
|
|
|
@ -56,6 +56,7 @@ include ':smoke-tests'
|
|||
include ':instrumentation:akka-actor-2.5:javaagent'
|
||||
include ':instrumentation:akka-http-10.0:javaagent'
|
||||
include ':instrumentation:apache-camel-2.20:javaagent'
|
||||
include ':instrumentation:apache-camel-2.20:javaagent-unittests'
|
||||
include ':instrumentation:apache-httpasyncclient-4.1:javaagent'
|
||||
include ':instrumentation:apache-httpclient:apache-httpclient-2.0:javaagent'
|
||||
include ':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent'
|
||||
|
|
Loading…
Reference in New Issue