Use a weak reference to sqlalchemy Engine to avoid memory leak (#1771)
* Use a weak reference to sqlalchemy Engine to avoid memory leak

  Closes #1761

  By using a weak reference to the `Engine` object, we can avoid the memory leak, as disposed `Engine` objects get properly deallocated. Whenever `SQLAlchemy` is uninstrumented, we only trigger a removal for those event listeners which are listening for objects that haven't been garbage-collected yet.

* Made a mistake in resolving the weak reference
* Fixed formatting issues
* Updated changelog
* Added unit test to check that engine was garbage collected
* Do not save engine in EngineTracer to avoid memory leak
* Add an empty line to satisfy black formatter
* Fix isort complaints
* Fixed the issue when pool name is not set (i.e. is `None`)
* Fix formatting issue
* Rebased after changes in a recent commit
* Updated PR number in changelog

---------

Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com>
This commit is contained in:
		
							parent
							
								
									a45c9c3792
								
							
						
					
					
						commit
						2e49ba1af8
					
				|  | @ -40,6 +40,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| - `opentelemetry-instrumentation-system-metrics` Add `process.` prefix to `runtime.memory`, `runtime.cpu.time`, and `runtime.gc_count`. Change `runtime.memory` from count to UpDownCounter. ([#1735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1735)) | ||||
| - Add request and response hooks for GRPC instrumentation (client only) | ||||
|   ([#1706](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1706)) | ||||
| - Fix memory leak in SQLAlchemy instrumentation where disposed `Engine` does not get garbage collected | ||||
|   ([#1771](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1771)) | ||||
| - `opentelemetry-instrumentation-pymemcache` Update instrumentation to support pymemcache >4 | ||||
|   ([#1764](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1764)) | ||||
| - `opentelemetry-instrumentation-confluent-kafka` Add support for higher versions of confluent_kafka | ||||
|  |  | |||
|  | @ -13,6 +13,7 @@ | |||
| # limitations under the License. | ||||
| import os | ||||
| import re | ||||
| import weakref | ||||
| 
 | ||||
| from sqlalchemy.event import (  # pylint: disable=no-name-in-module | ||||
|     listen, | ||||
|  | @ -99,11 +100,11 @@ class EngineTracer: | |||
|         commenter_options=None, | ||||
|     ): | ||||
|         self.tracer = tracer | ||||
|         self.engine = engine | ||||
|         self.connections_usage = connections_usage | ||||
|         self.vendor = _normalize_vendor(engine.name) | ||||
|         self.enable_commenter = enable_commenter | ||||
|         self.commenter_options = commenter_options if commenter_options else {} | ||||
|         self._engine_attrs = _get_attributes_from_engine(engine) | ||||
|         self._leading_comment_remover = re.compile(r"^/\*.*?\*/") | ||||
| 
 | ||||
|         self._register_event_listener( | ||||
|  | @ -118,23 +119,11 @@ class EngineTracer: | |||
|         self._register_event_listener(engine, "checkin", self._pool_checkin) | ||||
|         self._register_event_listener(engine, "checkout", self._pool_checkout) | ||||
| 
 | ||||
|     def _get_connection_string(self): | ||||
|         drivername = self.engine.url.drivername or "" | ||||
|         host = self.engine.url.host or "" | ||||
|         port = self.engine.url.port or "" | ||||
|         database = self.engine.url.database or "" | ||||
|         return f"{drivername}://{host}:{port}/{database}" | ||||
| 
 | ||||
|     def _get_pool_name(self): | ||||
|         if self.engine.pool.logging_name is not None: | ||||
|             return self.engine.pool.logging_name | ||||
|         return self._get_connection_string() | ||||
| 
 | ||||
|     def _add_idle_to_connection_usage(self, value): | ||||
|         self.connections_usage.add( | ||||
|             value, | ||||
|             attributes={ | ||||
|                 "pool.name": self._get_pool_name(), | ||||
|                 **self._engine_attrs, | ||||
|                 "state": "idle", | ||||
|             }, | ||||
|         ) | ||||
|  | @ -143,7 +132,7 @@ class EngineTracer: | |||
|         self.connections_usage.add( | ||||
|             value, | ||||
|             attributes={ | ||||
|                 "pool.name": self._get_pool_name(), | ||||
|                 **self._engine_attrs, | ||||
|                 "state": "used", | ||||
|             }, | ||||
|         ) | ||||
|  | @ -169,12 +158,21 @@ class EngineTracer: | |||
|     @classmethod | ||||
|     def _register_event_listener(cls, target, identifier, func, *args, **kw): | ||||
|         listen(target, identifier, func, *args, **kw) | ||||
|         cls._remove_event_listener_params.append((target, identifier, func)) | ||||
|         cls._remove_event_listener_params.append( | ||||
|             (weakref.ref(target), identifier, func) | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def remove_all_event_listeners(cls): | ||||
|         for remove_params in cls._remove_event_listener_params: | ||||
|             remove(*remove_params) | ||||
|         for ( | ||||
|             weak_ref_target, | ||||
|             identifier, | ||||
|             func, | ||||
|         ) in cls._remove_event_listener_params: | ||||
|             # Remove an event listener only if saved weak reference points to an object | ||||
|             # which has not been garbage collected | ||||
|             if weak_ref_target() is not None: | ||||
|                 remove(weak_ref_target(), identifier, func) | ||||
|         cls._remove_event_listener_params.clear() | ||||
| 
 | ||||
|     def _operation_name(self, db_name, statement): | ||||
|  | @ -300,3 +298,22 @@ def _get_attributes_from_cursor(vendor, cursor, attrs): | |||
|             if info.port: | ||||
|                 attrs[SpanAttributes.NET_PEER_PORT] = int(info.port) | ||||
|     return attrs | ||||
| 
 | ||||
| 
 | ||||
| def _get_connection_string(engine): | ||||
|     # Build a "drivername://host:port/database" identifier from engine.url. | ||||
|     # Each URL component that is missing or otherwise falsy is rendered as an | ||||
|     # empty string, so the separators ("://", ":" and "/") are always emitted | ||||
|     # even when the component next to them is absent. | ||||
|     drivername = engine.url.drivername or "" | ||||
|     host = engine.url.host or "" | ||||
|     port = engine.url.port or "" | ||||
|     database = engine.url.database or "" | ||||
|     return f"{drivername}://{host}:{port}/{database}" | ||||
| 
 | ||||
| 
 | ||||
| def _get_attributes_from_engine(engine): | ||||
|     """Set metadata attributes of the database engine""" | ||||
|     # Returns a new dict; the only key populated here is "pool.name". | ||||
|     attrs = {} | ||||
| 
 | ||||
|     # Prefer the pool's logging_name. The nested getattr calls with None | ||||
|     # defaults tolerate an engine without a `pool` attribute or a pool without | ||||
|     # `logging_name`; in those cases, or when the name is None/empty, the | ||||
|     # value falls back to the string built by _get_connection_string(engine). | ||||
|     attrs["pool.name"] = getattr( | ||||
|         getattr(engine, "pool", None), "logging_name", None | ||||
|     ) or _get_connection_string(engine) | ||||
| 
 | ||||
|     return attrs | ||||
|  |  | |||
|  | @ -307,3 +307,26 @@ class TestSqlalchemyInstrumentation(TestBase): | |||
|         cnx.execute("SELECT 1 + 1;").fetchall() | ||||
|         spans = self.memory_exporter.get_finished_spans() | ||||
|         self.assertEqual(len(spans), 0) | ||||
| 
 | ||||
|     def test_no_memory_leakage_if_engine_diposed(self): | ||||
|         # Checks that engines created while SQLAlchemy is instrumented can | ||||
|         # still be garbage collected: five short-lived engines are created, | ||||
|         # and after a forced gc pass the finalizer callback registered on each | ||||
|         # of them must have run exactly once. | ||||
|         SQLAlchemyInstrumentor().instrument() | ||||
|         import gc | ||||
|         import weakref | ||||
| 
 | ||||
|         from sqlalchemy import create_engine | ||||
| 
 | ||||
|         # Mock used purely as a call counter for the finalizers below. | ||||
|         callback = mock.Mock() | ||||
| 
 | ||||
|         def make_shortlived_engine(): | ||||
|             # `engine` is a local name only, so nothing in this test keeps a | ||||
|             # strong reference to it once this helper returns. | ||||
|             engine = create_engine("sqlite:///:memory:") | ||||
|             # Callback will be called if engine is deallocated during garbage | ||||
|             # collection | ||||
|             weakref.finalize(engine, callback) | ||||
|             with engine.connect() as conn: | ||||
|                 conn.execute("SELECT 1 + 1;").fetchall() | ||||
| 
 | ||||
|         for _ in range(0, 5): | ||||
|             make_shortlived_engine() | ||||
| 
 | ||||
|         # Force a collection so engines that are only reachable through | ||||
|         # reference cycles are reclaimed before the count is checked. | ||||
|         gc.collect() | ||||
|         assert callback.call_count == 5 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue