Add `fsspec` gen ai upload hook (#3759)

* Add fsspec gen ai upload hook

* split up into sub-package to make imports cleaner

* Get rid of FsspecUploader separate class

* comments, clean up doc strings
This commit is contained in:
Aaron Abbott 2025-09-17 13:44:44 -04:00 committed by GitHub
parent 7bac6bedef
commit 7819be1850
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 483 additions and 5 deletions

View File

@ -9,6 +9,9 @@ pymemcache~=1.3
# Required by conf
django>=2.2
# Require by opentelemetry-util-genai
fsspec>=2025.9.0
# Required by instrumentation and exporter packages
aio_pika~=7.2.0
aiohttp~=3.0

View File

@ -130,6 +130,7 @@ intersphinx_mapping = {
None,
),
"redis": ("https://redis.readthedocs.io/en/latest/", None),
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
}
# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky

View File

@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
:members:
:undoc-members:
:show-inheritance:
.. automodule:: opentelemetry.util.genai._fsspec_upload
:members:
:show-inheritance:

View File

@ -1060,7 +1060,7 @@ deps =
{[testenv]test_deps}
{toxinidir}/opentelemetry-instrumentation
{toxinidir}/util/opentelemetry-util-http
{toxinidir}/util/opentelemetry-util-genai
{toxinidir}/util/opentelemetry-util-genai[fsspec]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]

View File

@ -30,10 +30,12 @@ dependencies = [
"opentelemetry-api>=1.31.0",
]
[project.entry-points.opentelemetry_genai_upload_hook]
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
[project.optional-dependencies]
test = [
"pytest>=7.0.0",
]
test = ["pytest>=7.0.0"]
fsspec = ["fsspec>=2025.9.0"]
[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"

View File

@ -0,0 +1,39 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from os import environ
from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
)
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
def fsspec_upload_hook() -> UploadHook:
# If fsspec is not installed the hook will be a no-op.
try:
# pylint: disable=import-outside-toplevel
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
FsspecUploadHook,
)
except ImportError:
return _NoOpUploadHook()
base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
if not base_path:
return _NoOpUploadHook()
return FsspecUploadHook(base_path=base_path)

View File

@ -0,0 +1,184 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import json
import logging
import posixpath
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import asdict, dataclass
from functools import partial
from typing import Any, Callable, Literal, TextIO, cast
from uuid import uuid4
import fsspec
from opentelemetry._logs import LogRecord
from opentelemetry.trace import Span
from opentelemetry.util.genai import types
from opentelemetry.util.genai.upload_hook import UploadHook
_logger = logging.getLogger(__name__)
@dataclass
class Completion:
inputs: list[types.InputMessage]
outputs: list[types.OutputMessage]
system_instruction: list[types.MessagePart]
@dataclass
class CompletionRefs:
inputs_ref: str
outputs_ref: str
system_instruction_ref: str
JsonEncodeable = list[dict[str, Any]]
# mapping of upload path to function computing upload data dict
UploadData = dict[str, Callable[[], JsonEncodeable]]
def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
"""typed wrapper around `fsspec.open`"""
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
class FsspecUploadHook(UploadHook):
"""An upload hook using ``fsspec`` to upload to external storage
This function can be used as the
:func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``.
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
base path for uploads.
Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]``
as a requirement to achieve this.
"""
def __init__(
self,
*,
base_path: str,
max_size: int = 20,
) -> None:
self._base_path = base_path
self._max_size = max_size
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
# limits the number of queued tasks. If the queue is full, data will be dropped.
self._executor = ThreadPoolExecutor(max_workers=max_size)
self._semaphore = threading.BoundedSemaphore(max_size)
def _submit_all(self, upload_data: UploadData) -> None:
def done(future: Future[None]) -> None:
self._semaphore.release()
try:
future.result()
except Exception: # pylint: disable=broad-except
_logger.exception("fsspec uploader failed")
for path, json_encodeable in upload_data.items():
# could not acquire, drop data
if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with
_logger.warning(
"fsspec upload queue is full, dropping upload %s",
path,
)
continue
try:
fut = self._executor.submit(
self._do_upload, path, json_encodeable
)
fut.add_done_callback(done)
except RuntimeError:
_logger.info(
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
)
break
def _calculate_ref_path(self) -> CompletionRefs:
# TODO: experimental with using the trace_id and span_id, or fetching
# gen_ai.response.id from the active span.
uuid_str = str(uuid4())
return CompletionRefs(
inputs_ref=posixpath.join(
self._base_path, f"{uuid_str}_inputs.json"
),
outputs_ref=posixpath.join(
self._base_path, f"{uuid_str}_outputs.json"
),
system_instruction_ref=posixpath.join(
self._base_path, f"{uuid_str}_system_instruction.json"
),
)
@staticmethod
def _do_upload(
path: str, json_encodeable: Callable[[], JsonEncodeable]
) -> None:
with fsspec_open(path, "w") as file:
json.dump(json_encodeable(), file, separators=(",", ":"))
def upload(
self,
*,
inputs: list[types.InputMessage],
outputs: list[types.OutputMessage],
system_instruction: list[types.MessagePart],
span: Span | None = None,
log_record: LogRecord | None = None,
**kwargs: Any,
) -> None:
completion = Completion(
inputs=inputs,
outputs=outputs,
system_instruction=system_instruction,
)
# generate the paths to upload to
ref_names = self._calculate_ref_path()
def to_dict(
dataclass_list: list[types.InputMessage]
| list[types.OutputMessage]
| list[types.MessagePart],
) -> JsonEncodeable:
return [asdict(dc) for dc in dataclass_list]
self._submit_all(
{
# Use partial to defer as much as possible to the background threads
ref_names.inputs_ref: partial(to_dict, completion.inputs),
ref_names.outputs_ref: partial(to_dict, completion.outputs),
ref_names.system_instruction_ref: partial(
to_dict, completion.system_instruction
),
},
)
# TODO: stamp the refs on telemetry
def shutdown(self) -> None:
# TODO: support timeout
self._executor.shutdown()

View File

@ -22,3 +22,24 @@ OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK = (
"""
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK
"""
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = (
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH"
)
"""
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH
An :func:`fsspec.open` compatible URI/path for uploading prompts and responses. Can be a local
path like ``/path/to/prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more
information, see
* `Instantiate a file-system
<https://filesystem-spec.readthedocs.io/en/latest/usage.html#instantiate-a-file-system>`_ for supported values and how to
install support for additional backend implementations.
* `Configuration
<https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_ for
configuring a backend with environment variables.
* `URL Chaining
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
use cases.
"""

View File

@ -1,2 +1,3 @@
pytest==7.4.4
-e opentelemetry-instrumentation
fsspec==2025.9.0
-e opentelemetry-instrumentation

View File

@ -0,0 +1,223 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=import-outside-toplevel,no-name-in-module
import importlib
import logging
import sys
import threading
from dataclasses import asdict
from typing import Any
from unittest import TestCase
from unittest.mock import MagicMock, patch
import fsspec
from fsspec.implementations.memory import MemoryFileSystem
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.genai import types
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
FsspecUploadHook,
)
from opentelemetry.util.genai.upload_hook import (
_NoOpUploadHook,
load_upload_hook,
)
# Use MemoryFileSystem for testing
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.memory.MemoryFileSystem
BASE_PATH = "memory://"
@patch.dict(
"os.environ",
{
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec",
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH": BASE_PATH,
},
clear=True,
)
class TestFsspecEntryPoint(TestCase):
def test_fsspec_entry_point(self):
self.assertIsInstance(load_upload_hook(), FsspecUploadHook)
def test_fsspec_entry_point_no_fsspec(self):
"""Tests that the a no-op uploader is used when fsspec is not installed"""
from opentelemetry.util.genai import _fsspec_upload
# Simulate fsspec imports failing
with patch.dict(
sys.modules,
{"opentelemetry.util.genai._fsspec_upload.fsspec_hook": None},
):
importlib.reload(_fsspec_upload)
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
MAXSIZE = 5
FAKE_INPUTS = [
types.InputMessage(
role="user",
parts=[types.Text(content="What is the capital of France?")],
),
]
FAKE_OUTPUTS = [
types.OutputMessage(
role="assistant",
parts=[types.Text(content="Paris")],
finish_reason="stop",
),
]
FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
class TestFsspecUploadHook(TestCase):
def setUp(self):
self._fsspec_patcher = patch(
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
)
self.mock_fsspec = self._fsspec_patcher.start()
self.hook = FsspecUploadHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
)
def tearDown(self) -> None:
self.hook.shutdown()
self._fsspec_patcher.stop()
def test_shutdown_no_items(self):
self.hook.shutdown()
def test_upload_then_shutdown(self):
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
# all items should be consumed
self.hook.shutdown()
self.assertEqual(
self.mock_fsspec.open.call_count,
3,
"should have uploaded 3 files",
)
def test_upload_blocked(self):
unblock_upload = threading.Event()
def blocked_upload(*args: Any):
unblock_upload.wait()
return MagicMock()
self.mock_fsspec.open.side_effect = blocked_upload
# fill the queue
for _ in range(MAXSIZE):
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertLessEqual(
self.mock_fsspec.open.call_count,
MAXSIZE,
f"uploader should only be called {MAXSIZE=} times",
)
with self.assertLogs(level=logging.WARNING) as logs:
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertIn(
"fsspec upload queue is full, dropping upload", logs.output[0]
)
unblock_upload.set()
def test_failed_upload_logs(self):
def failing_upload(*args: Any) -> None:
raise RuntimeError("failed to upload")
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
with self.assertLogs(level=logging.ERROR) as logs:
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.hook.shutdown()
self.assertIn("fsspec uploader failed", logs.output[0])
def test_upload_after_shutdown_logs(self):
self.hook.shutdown()
with self.assertLogs(level=logging.INFO) as logs:
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertEqual(len(logs.output), 1)
self.assertIn(
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
logs.output[0],
)
class FsspecUploaderTest(TestCase):
def test_upload(self):
FsspecUploadHook._do_upload(
"memory://my_path",
lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS],
)
with fsspec.open("memory://my_path", "r") as file:
self.assertEqual(
file.read(),
'[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
)
class TestFsspecUploadHookIntegration(TestBase):
def setUp(self):
MemoryFileSystem.store.clear()
def assert_fsspec_equal(self, path: str, value: str) -> None:
with fsspec.open(path, "r") as file:
self.assertEqual(file.read(), value)
def test_upload_completions(self):
hook = FsspecUploadHook(
base_path=BASE_PATH,
)
hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
hook.shutdown()
fs = fsspec.open(BASE_PATH).fs
self.assertEqual(len(fs.ls(BASE_PATH)), 3)
# TODO: test stamped telemetry