support topic as kwarg (#949)
This commit is contained in:
		
							parent
							
								
									d86f1645da
								
							
						
					
					
						commit
						8fc95cab64
					
				|  | @ -41,6 +41,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
|   ([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903)) | ||||
| - `opentelemetry-instrumentation-falcon` Safer patching mechanism | ||||
|   ([#895](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/895)) | ||||
| - `opentelemetry-instrumentation-kafka-python` Fix topic extraction | ||||
|   ([#949](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/949)) | ||||
| 
 | ||||
| ## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29 | ||||
| 
 | ||||
|  |  | |||
|  | @ -25,11 +25,11 @@ class KafkaPropertiesExtractor: | |||
|         return kwargs.get(key, default_value) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def extract_send_topic(args): | ||||
|     def extract_send_topic(args, kwargs): | ||||
|         """extract topic from `send` method arguments in KafkaProducer class""" | ||||
|         if len(args) > 0: | ||||
|             return args[0] | ||||
|         return "unknown" | ||||
|         return KafkaPropertiesExtractor._extract_argument( | ||||
|             "topic", 0, "unknown", args, kwargs | ||||
|         ) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def extract_send_value(args, kwargs): | ||||
|  | @ -56,7 +56,7 @@ class KafkaPropertiesExtractor: | |||
|     def extract_send_partition(instance, args, kwargs): | ||||
|         """extract partition `send` method arguments, using the `_partition` method in KafkaProducer class""" | ||||
|         try: | ||||
|             topic = KafkaPropertiesExtractor.extract_send_topic(args) | ||||
|             topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) | ||||
|             key = KafkaPropertiesExtractor.extract_send_key(args, kwargs) | ||||
|             value = KafkaPropertiesExtractor.extract_send_value(args, kwargs) | ||||
|             partition = KafkaPropertiesExtractor._extract_argument( | ||||
|  | @ -145,7 +145,7 @@ def _wrap_send(tracer: Tracer, produce_hook: ProduceHookT) -> Callable: | |||
|             headers = [] | ||||
|             kwargs["headers"] = headers | ||||
| 
 | ||||
|         topic = KafkaPropertiesExtractor.extract_send_topic(args) | ||||
|         topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs) | ||||
|         bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers( | ||||
|             instance | ||||
|         ) | ||||
|  |  | |||
|  | @ -28,14 +28,57 @@ class TestUtils(TestCase): | |||
|     @mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span") | ||||
|     @mock.patch("opentelemetry.trace.set_span_in_context") | ||||
|     @mock.patch("opentelemetry.propagate.inject") | ||||
|     def test_wrap_send( | ||||
|     def test_wrap_send_with_topic_as_arg( | ||||
|         self, | ||||
|         inject: mock.MagicMock, | ||||
|         set_span_in_context: mock.MagicMock, | ||||
|         enrich_span: mock.MagicMock, | ||||
|         extract_send_partition: mock.MagicMock, | ||||
|         extract_bootstrap_servers: mock.MagicMock, | ||||
|     ): | ||||
|     ) -> None: | ||||
|         self.wrap_send_helper( | ||||
|             inject, | ||||
|             set_span_in_context, | ||||
|             enrich_span, | ||||
|             extract_send_partition, | ||||
|             extract_bootstrap_servers, | ||||
|         ) | ||||
| 
 | ||||
|     @mock.patch( | ||||
|         "opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers" | ||||
|     ) | ||||
|     @mock.patch( | ||||
|         "opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_send_partition" | ||||
|     ) | ||||
|     @mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span") | ||||
|     @mock.patch("opentelemetry.trace.set_span_in_context") | ||||
|     @mock.patch("opentelemetry.propagate.inject") | ||||
|     def test_wrap_send_with_topic_as_kwarg( | ||||
|         self, | ||||
|         inject: mock.MagicMock, | ||||
|         set_span_in_context: mock.MagicMock, | ||||
|         enrich_span: mock.MagicMock, | ||||
|         extract_send_partition: mock.MagicMock, | ||||
|         extract_bootstrap_servers: mock.MagicMock, | ||||
|     ) -> None: | ||||
|         self.args = [] | ||||
|         self.kwargs["topic"] = self.topic_name | ||||
|         self.wrap_send_helper( | ||||
|             inject, | ||||
|             set_span_in_context, | ||||
|             enrich_span, | ||||
|             extract_send_partition, | ||||
|             extract_bootstrap_servers, | ||||
|         ) | ||||
| 
 | ||||
|     def wrap_send_helper( | ||||
|         self, | ||||
|         inject: mock.MagicMock, | ||||
|         set_span_in_context: mock.MagicMock, | ||||
|         enrich_span: mock.MagicMock, | ||||
|         extract_send_partition: mock.MagicMock, | ||||
|         extract_bootstrap_servers: mock.MagicMock, | ||||
|     ) -> None: | ||||
|         tracer = mock.MagicMock() | ||||
|         produce_hook = mock.MagicMock() | ||||
|         original_send_callback = mock.MagicMock() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue