Fix spring-integration context leak (#4673)
This commit is contained in:
parent
ee32c41d35
commit
e6a9a6fd47
|
@ -37,6 +37,23 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
|
public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
|
||||||
|
|
||||||
|
Map<MessageChannel, ContextAndScope> localMap = LOCAL_CONTEXT_AND_SCOPE.get();
|
||||||
|
if (localMap.get(messageChannel) != null) {
|
||||||
|
// GlobalChannelInterceptorProcessor.afterSingletonsInstantiated() adds the global
|
||||||
|
// interceptors for every bean name / channel pair, which means it's possible that this
|
||||||
|
// interceptor is added twice to the same channel if the channel is registered twice under
|
||||||
|
// different bean names
|
||||||
|
//
|
||||||
|
// there's an option for this class to implement VetoCapableInterceptor and prevent itself
|
||||||
|
// from being registered if it's already registered, but the VetoCapableInterceptor interface
|
||||||
|
// broke backwards compatibility in 5.2.0, and the version prior to 5.2.0 takes a parameter
|
||||||
|
// of type ChannelInterceptorAware which doesn't exist after 5.2.0, and while it's possible to
|
||||||
|
// implement both at the same time (since we compile using 4.1.0), muzzle doesn't like the
|
||||||
|
// missing class type when running testLatestDeps
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
Context parentContext = Context.current();
|
Context parentContext = Context.current();
|
||||||
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);
|
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);
|
||||||
|
|
||||||
|
@ -55,16 +72,12 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
|
||||||
// instrumentation should not create another one
|
// instrumentation should not create another one
|
||||||
if (shouldStart(parentContext, messageWithChannel)) {
|
if (shouldStart(parentContext, messageWithChannel)) {
|
||||||
context = instrumenter.start(parentContext, messageWithChannel);
|
context = instrumenter.start(parentContext, messageWithChannel);
|
||||||
LOCAL_CONTEXT_AND_SCOPE
|
localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
|
||||||
.get()
|
|
||||||
.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
|
|
||||||
} else {
|
} else {
|
||||||
// in case there already was another span in the context: back off and just inject the current
|
// in case there already was another span in the context: back off and just inject the current
|
||||||
// context into the message
|
// context into the message
|
||||||
context = parentContext;
|
context = parentContext;
|
||||||
LOCAL_CONTEXT_AND_SCOPE
|
localMap.put(messageChannel, ContextAndScope.create(null, context.makeCurrent()));
|
||||||
.get()
|
|
||||||
.put(messageChannel, ContextAndScope.create(null, context.makeCurrent()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
propagators
|
propagators
|
||||||
|
@ -113,6 +126,13 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
|
||||||
@Override
|
@Override
|
||||||
public Message<?> beforeHandle(
|
public Message<?> beforeHandle(
|
||||||
Message<?> message, MessageChannel channel, MessageHandler handler) {
|
Message<?> message, MessageChannel channel, MessageHandler handler) {
|
||||||
|
|
||||||
|
Map<MessageChannel, ContextAndScope> localMap = LOCAL_CONTEXT_AND_SCOPE.get();
|
||||||
|
if (localMap.get(channel) != null) {
|
||||||
|
// see comment explaining the same conditional in preSend()
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, channel);
|
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, channel);
|
||||||
Context context =
|
Context context =
|
||||||
propagators
|
propagators
|
||||||
|
@ -120,7 +140,7 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {
|
||||||
.extract(Context.current(), messageWithChannel, MessageHeadersGetter.INSTANCE);
|
.extract(Context.current(), messageWithChannel, MessageHeadersGetter.INSTANCE);
|
||||||
// beforeHandle()/afterMessageHandles() always execute in a different thread than send(), so
|
// beforeHandle()/afterMessageHandles() always execute in a different thread than send(), so
|
||||||
// there's no real risk of overwriting the send() context
|
// there's no real risk of overwriting the send() context
|
||||||
LOCAL_CONTEXT_AND_SCOPE.get().put(channel, ContextAndScope.create(null, context.makeCurrent()));
|
localMap.put(channel, ContextAndScope.create(null, context.makeCurrent()));
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -89,6 +89,41 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
|
||||||
"executorChannel" | "executorChannel process"
|
"executorChannel" | "executorChannel process"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def "should not add interceptor twice"() {
|
||||||
|
given:
|
||||||
|
def channel = applicationContext.getBean("directChannel1", SubscribableChannel)
|
||||||
|
|
||||||
|
def messageHandler = new CapturingMessageHandler()
|
||||||
|
channel.subscribe(messageHandler)
|
||||||
|
|
||||||
|
when:
|
||||||
|
channel.send(MessageBuilder.withPayload("test")
|
||||||
|
.build())
|
||||||
|
|
||||||
|
then:
|
||||||
|
def capturedMessage = messageHandler.join()
|
||||||
|
|
||||||
|
assertTraces(1) {
|
||||||
|
trace(0, 2) {
|
||||||
|
span(0) {
|
||||||
|
// the channel name is overwritten by the last bean registration
|
||||||
|
name "application.directChannel2 process"
|
||||||
|
kind CONSUMER
|
||||||
|
}
|
||||||
|
span(1) {
|
||||||
|
name "handler"
|
||||||
|
childOf span(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
def interceptorSpan = span(0)
|
||||||
|
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cleanup:
|
||||||
|
channel.unsubscribe(messageHandler)
|
||||||
|
}
|
||||||
|
|
||||||
def "should not create a span when there is already a span in the context"() {
|
def "should not create a span when there is already a span in the context"() {
|
||||||
given:
|
given:
|
||||||
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
|
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
|
||||||
|
@ -165,11 +200,24 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
|
||||||
@SpringBootConfiguration
|
@SpringBootConfiguration
|
||||||
@EnableAutoConfiguration
|
@EnableAutoConfiguration
|
||||||
static class MessageChannelsConfig {
|
static class MessageChannelsConfig {
|
||||||
|
|
||||||
|
SubscribableChannel problematicSharedChannel = new DirectChannel()
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
SubscribableChannel directChannel() {
|
SubscribableChannel directChannel() {
|
||||||
new DirectChannel()
|
new DirectChannel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
SubscribableChannel directChannel1() {
|
||||||
|
problematicSharedChannel
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
SubscribableChannel directChannel2() {
|
||||||
|
problematicSharedChannel
|
||||||
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) {
|
SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) {
|
||||||
def channel = new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor())
|
def channel = new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor())
|
||||||
|
|
Loading…
Reference in New Issue