Simplify shouldStart() check for SERVER & CONSUMER spans (#3771)

This commit is contained in:
Mateusz Rzeszutek 2021-08-11 06:00:34 +02:00 committed by GitHub
parent 1889c1adfb
commit 92a69c3309
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 100 additions and 72 deletions

View File

@ -103,8 +103,10 @@ public class Instrumenter<REQUEST, RESPONSE> {
SpanKind spanKind = spanKindExtractor.extract(request);
switch (spanKind) {
case SERVER:
suppressed = ServerSpan.exists(parentContext);
break;
case CONSUMER:
suppressed = ServerSpan.exists(parentContext) || ConsumerSpan.exists(parentContext);
suppressed = ConsumerSpan.exists(parentContext);
break;
case CLIENT:
suppressed = ClientSpan.exists(parentContext);

View File

@ -107,8 +107,10 @@ public abstract class BaseTracer {
suppressed = ClientSpan.exists(context);
break;
case SERVER:
suppressed = ServerSpan.exists(context);
break;
case CONSUMER:
suppressed = ServerSpan.exists(context) || ConsumerSpan.exists(context);
suppressed = ConsumerSpan.exists(context);
break;
default:
break;

View File

@ -41,22 +41,27 @@ class BaseTracerTest extends Specification {
result == expected
where:
kind | context | expected
SpanKind.CLIENT | root | true
SpanKind.SERVER | root | true
SpanKind.INTERNAL | root | true
SpanKind.PRODUCER | root | true
SpanKind.CONSUMER | root | true
SpanKind.CLIENT | tracer.withClientSpan(root, existingSpan) | false
SpanKind.SERVER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.INTERNAL | tracer.withClientSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.PRODUCER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.SERVER | tracer.withServerSpan(root, existingSpan) | false
SpanKind.INTERNAL | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withServerSpan(root, existingSpan) | false
SpanKind.PRODUCER | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CLIENT | tracer.withServerSpan(root, existingSpan) | true
kind | context | expected
SpanKind.CLIENT | root | true
SpanKind.SERVER | root | true
SpanKind.INTERNAL | root | true
SpanKind.PRODUCER | root | true
SpanKind.CONSUMER | root | true
SpanKind.CLIENT | tracer.withClientSpan(root, existingSpan) | false
SpanKind.SERVER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.INTERNAL | tracer.withClientSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.PRODUCER | tracer.withClientSpan(root, existingSpan) | true
SpanKind.SERVER | tracer.withServerSpan(root, existingSpan) | false
SpanKind.INTERNAL | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withServerSpan(root, existingSpan) | true
SpanKind.PRODUCER | tracer.withServerSpan(root, existingSpan) | true
SpanKind.CLIENT | tracer.withServerSpan(root, existingSpan) | true
SpanKind.SERVER | tracer.withConsumerSpan(root, existingSpan) | true
SpanKind.INTERNAL | tracer.withConsumerSpan(root, existingSpan) | true
SpanKind.CONSUMER | tracer.withConsumerSpan(root, existingSpan) | false
SpanKind.PRODUCER | tracer.withConsumerSpan(root, existingSpan) | true
SpanKind.CLIENT | tracer.withConsumerSpan(root, existingSpan) | true
}

View File

@ -5,6 +5,7 @@
package io.opentelemetry.instrumentation.spring.integration;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
@ -41,19 +42,23 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
final Context context;
MessageHeaderAccessor messageHeaderAccessor = createMutableHeaderAccessor(message);
// when there's no CONSUMER span created by another instrumentation, start it; there's no other
// messaging instrumentation that can do this, so spring-integration must ensure proper context
// propagation
// the new CONSUMER span will use the span context extracted from the incoming message as the
// parent
if (instrumenter.shouldStart(parentContext, messageWithChannel)) {
// only start a new CONSUMER span when there is no span in the context: this situation happens
// when there's no other messaging instrumentation that can do this - this way
// spring-integration instrumentation ensures proper context propagation: the new CONSUMER span
// will use the span context extracted from the incoming message as the parent
//
// when there already is a span in the context then it usually means one of two things:
// 1. spring-integration is a part of the producer invocation, e.g. invoked from a server method
// that puts something into a messaging queue/system
// 2. another messaging instrumentation has already created a CONSUMER span, in which case this
// instrumentation should not create another one
if (shouldStart(parentContext, messageWithChannel)) {
context = instrumenter.start(parentContext, messageWithChannel);
messageHeaderAccessor.setHeader(
CONTEXT_AND_SCOPE_KEY, ContextAndScope.create(context, context.makeCurrent()));
} else {
// if there was a top-level span detected it means that there's another messaging
// instrumentation that creates CONSUMER/PRODUCER spans; in that case, back off and just
// inject the current context into the message
// in case there already was another span in the context: back off and just inject the current
// context into the message
context = parentContext;
messageHeaderAccessor.setHeader(
CONTEXT_AND_SCOPE_KEY, ContextAndScope.create(null, context.makeCurrent()));
@ -65,6 +70,11 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
return createMessageWithHeaders(message, messageHeaderAccessor);
}
private boolean shouldStart(Context parentContext, MessageWithChannel messageWithChannel) {
return instrumenter.shouldStart(parentContext, messageWithChannel)
&& Span.fromContextOrNull(parentContext) == null;
}
@Override
public void postSend(Message<?> message, MessageChannel messageChannel, boolean sent) {}

View File

@ -58,36 +58,30 @@ abstract class AbstractComplexPropagationTest extends InstrumentationSpecificati
receiveChannel.subscribe(messageHandler)
when:
runWithSpan("parent") {
sendChannel.send(MessageBuilder.withPayload("test")
.setHeader("theAnswer", "42")
.build())
}
sendChannel.send(MessageBuilder.withPayload("test")
.setHeader("theAnswer", "42")
.build())
then:
messageHandler.join()
assertTraces(1) {
trace(0, 4) {
trace(0, 3) {
// there's no span in the context, so spring-integration adds a CONSUMER one
span(0) {
name "parent"
}
// there's no top-level SERVER or CONSUMER span, so spring-integration adds a CONSUMER one
span(1) {
name "application.sendChannel process"
childOf span(0)
kind CONSUMER
}
// message is received in a separate thread without any context, so a CONSUMER span with parent
// extracted from the incoming message is created
span(2) {
span(1) {
name "application.receiveChannel process"
childOf span(1)
childOf span(0)
kind CONSUMER
}
span(3) {
span(2) {
name "handler"
childOf span(2)
childOf span(1)
}
}
}

View File

@ -25,23 +25,18 @@ abstract class AbstractSpringCloudStreamRabbitTest extends InstrumentationSpecif
then:
assertTraces(1) {
trace(0, 4) {
trace(0, 3) {
span(0) {
name "producer"
}
span(1) {
name "testProducer.output process"
name "testConsumer.input process"
childOf span(0)
kind CONSUMER
}
span(2) {
name "testConsumer.input process"
childOf span(1)
kind CONSUMER
}
span(3) {
name "consumer"
childOf span(2)
childOf span(1)
}
}
}

View File

@ -57,30 +57,24 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
channel.subscribe(messageHandler)
when:
runWithSpan("parent") {
channel.send(MessageBuilder.withPayload("test")
.build())
}
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 3) {
trace(0, 2) {
span(0) {
name "parent"
}
span(1) {
name interceptorSpanName
childOf span(0)
kind CONSUMER
}
span(2) {
span(1) {
name "handler"
childOf span(1)
childOf span(0)
}
def interceptorSpan = span(1)
def interceptorSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
}
}
@ -94,6 +88,38 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
"executorChannel" | "executorChannel process"
}
def "should not create a span when there is already a span in the context"() {
given:
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
runWithSpan("parent") {
channel.send(MessageBuilder.withPayload("test")
.build())
}
then:
messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
}
span(1) {
name "handler"
childOf span(0)
}
}
}
cleanup:
channel.unsubscribe(messageHandler)
}
def "should handle multiple message channels in a chain"() {
given:
def channel1 = applicationContext.getBean("linkedChannel1", SubscribableChannel)
@ -103,30 +129,24 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
channel2.subscribe(messageHandler)
when:
runWithSpan("parent") {
channel1.send(MessageBuilder.withPayload("test")
.build())
}
channel1.send(MessageBuilder.withPayload("test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 3) {
trace(0, 2) {
span(0) {
name "parent"
}
span(1) {
name "application.linkedChannel1 process"
childOf span(0)
kind CONSUMER
}
span(2) {
span(1) {
name "handler"
childOf span(1)
childOf span(0)
}
def lastChannelSpan = span(1)
def lastChannelSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, lastChannelSpan)
}
}