fix: Kafka initialization occasionally failed due to concurrent injection of OpenTelemetryMetricsReporter (to #12538) (#12583)

This commit is contained in:
Lumian Zhang 2024-11-07 23:59:07 +08:00 committed by GitHub
parent c8bd230c9e
commit 59e3318d9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 36 additions and 8 deletions

View File

@ -42,10 +42,24 @@ public class KafkaMetricsConsumerInstrumentation implements TypeInstrumentation
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
// this advice block can be shared across multiple threads. Directly modifying
// `config` could lead to unexpected item loss due to race conditions, where
// some entries might be lost as different threads attempt to modify it
// concurrently.
//
// To prevent such issues, a copy of the `config` should be created here before
// any modifications are made. This ensures that each thread operates on its
// own independent copy of the configuration, thereby eliminating the risk of
// configurations corruption.
//
// More detailed information:
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538
// ensure config is a mutable map and avoid concurrency conflicts
config = new HashMap<>(config);
enhanceConfig(config);
}
}

View File

@ -42,10 +42,24 @@ public class KafkaMetricsProducerInstrumentation implements TypeInstrumentation
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
// ensure config is a mutable map
if (config.getClass() != HashMap.class) {
config = new HashMap<>(config);
}
// In versions of spring-kafka prior to 2.5.0.RC1, when the `ProducerPerThread`
// of DefaultKafkaProducerFactory is set to true, the `config` object entering
// this advice block can be shared across multiple threads. Directly modifying
// `config` could lead to unexpected item loss due to race conditions, where
// some entries might be lost as different threads attempt to modify it
// concurrently.
//
// To prevent such issues, a copy of the `config` should be created here before
// any modifications are made. This ensures that each thread operates on its
// own independent copy of the configuration, thereby eliminating the risk of
// configurations corruption.
//
// More detailed information:
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/12538
// ensure config is a mutable map and avoid concurrency conflicts
config = new HashMap<>(config);
enhanceConfig(config);
}
}