Rename UploadHook -> CompletionHook (#3780)

This commit is contained in:
Aaron Abbott 2025-09-24 15:29:39 -04:00 committed by GitHub
parent b232b9a298
commit cf00cf57b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 186 additions and 172 deletions

View File

@ -21,7 +21,7 @@ OpenTelemetry Python - GenAI Util
:undoc-members:
:show-inheritance:
.. automodule:: opentelemetry.util.genai.upload_hook
.. automodule:: opentelemetry.util.genai.completion_hook
:members:
:undoc-members:
:show-inheritance:

View File

@ -7,12 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
- Add upload hook to genai utils to implement semconv v1.37.
- Add completion hook to genai utils to implement semconv v1.37.
The hook uses [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support
various pluggable backends.
Includes a hook implementation using
[`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) to support uploading to various
pluggable backends.
([#3780](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3780))
([#3752](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752))
([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3752))
([#3759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3759))
([#3763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3763))
- Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable.
Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)).

View File

@ -30,8 +30,8 @@ dependencies = [
"opentelemetry-api>=1.31.0",
]
[project.entry-points.opentelemetry_genai_upload_hook]
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"
[project.entry-points.opentelemetry_genai_completion_hook]
fsspec_upload = "opentelemetry.util.genai._fsspec_upload:fsspec_completion_upload_hook"
[project.optional-dependencies]
test = ["pytest>=7.0.0"]

View File

@ -16,24 +16,27 @@ from __future__ import annotations
from os import environ
from opentelemetry.util.genai.completion_hook import (
CompletionHook,
_NoOpCompletionHook,
)
from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
)
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook
def fsspec_upload_hook() -> UploadHook:
def fsspec_completion_upload_hook() -> CompletionHook:
# If fsspec is not installed the hook will be a no-op.
try:
# pylint: disable=import-outside-toplevel
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
FsspecUploadHook,
from opentelemetry.util.genai._fsspec_upload.completion_hook import (
FsspecUploadCompletionHook,
)
except ImportError:
return _NoOpUploadHook()
return _NoOpCompletionHook()
base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
if not base_path:
return _NoOpUploadHook()
return _NoOpCompletionHook()
return FsspecUploadHook(base_path=base_path)
return FsspecUploadCompletionHook(base_path=base_path)

View File

@ -34,7 +34,7 @@ from opentelemetry._logs import LogRecord
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes
from opentelemetry.trace import Span
from opentelemetry.util.genai import types
from opentelemetry.util.genai.upload_hook import UploadHook
from opentelemetry.util.genai.completion_hook import CompletionHook
GEN_AI_INPUT_MESSAGES_REF: Final = (
gen_ai_attributes.GEN_AI_INPUT_MESSAGES + "_ref"
@ -75,12 +75,12 @@ def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]
class FsspecUploadHook(UploadHook):
"""An upload hook using ``fsspec`` to upload to external storage
class FsspecUploadCompletionHook(CompletionHook):
"""A completion hook using ``fsspec`` to upload to external storage
This function can be used as the
:func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``.
:func:`~opentelemetry.util.genai.completion_hook.load_completion_hook` implementation by
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``fsspec_upload``.
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
base path for uploads.
@ -128,7 +128,7 @@ class FsspecUploadHook(UploadHook):
fut.add_done_callback(done)
except RuntimeError:
_logger.info(
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
"attempting to upload file after FsspecUploadCompletionHook.shutdown() was already called"
)
self._semaphore.release()
@ -161,7 +161,7 @@ class FsspecUploadHook(UploadHook):
cls=Base64JsonEncoder,
)
def upload(
def on_completion(
self,
*,
inputs: list[types.InputMessage],

View File

@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module defines the generic hooks for GenAI content uploading
"""This module defines the generic hooks for GenAI content completion
The hooks are specified as part of semconv in `Uploading content to external storage
<https://github.com/open-telemetry/semantic-conventions/blob/v1.37.0/docs/gen-ai/gen-ai-spans.md#uploading-content-to-external-storage>`__.
This module defines the `UploadHook` type that custom implementations should implement, and a
`load_upload_hook` function to load it from an entry point.
This module defines the `CompletionHook` type that custom implementations should implement, and a
`load_completion_hook` function to load it from an entry point.
"""
from __future__ import annotations
@ -34,18 +34,18 @@ from opentelemetry.util._importlib_metadata import (
)
from opentelemetry.util.genai import types
from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK,
OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK,
)
_logger = logging.getLogger(__name__)
@runtime_checkable
class UploadHook(Protocol):
"""A hook to upload GenAI content to an external storage.
class CompletionHook(Protocol):
"""A hook to be called on completion of a GenAI operation.
This is the interface for a hook that can be
used to upload GenAI content to an external storage. The hook is a
used to capture GenAI content on completion. The hook is a
callable that takes the inputs, outputs, and system instruction of a
GenAI interaction, as well as the span and log record associated with
it.
@ -66,7 +66,7 @@ class UploadHook(Protocol):
interaction.
"""
def upload(
def on_completion(
self,
*,
inputs: list[types.InputMessage],
@ -77,43 +77,47 @@ class UploadHook(Protocol):
) -> None: ...
class _NoOpUploadHook(UploadHook):
def upload(self, **kwargs: Any) -> None:
class _NoOpCompletionHook(CompletionHook):
def on_completion(self, **kwargs: Any) -> None:
return None
def load_upload_hook() -> UploadHook:
"""Load the upload hook from entry point or return a noop implementation
def load_completion_hook() -> CompletionHook:
"""Load the completion hook from entry point or return a noop implementation
This function loads an upload hook from the entry point group
``opentelemetry_genai_upload_hook`` with name coming from
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK`. If one can't be found, returns a no-op
This function loads a completion hook from the entry point group
``opentelemetry_genai_completion_hook`` with name coming from
:envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK`. If one can't be found, returns a no-op
implementation.
"""
hook_name = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK, None)
hook_name = environ.get(OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK, None)
if not hook_name:
return _NoOpUploadHook()
return _NoOpCompletionHook()
for entry_point in entry_points(group="opentelemetry_genai_upload_hook"): # pyright: ignore[reportUnknownVariableType]
for entry_point in entry_points( # pyright: ignore[reportUnknownVariableType]
group="opentelemetry_genai_completion_hook"
):
name = cast(str, entry_point.name) # pyright: ignore[reportUnknownMemberType]
try:
if hook_name != name:
continue
hook = entry_point.load()() # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType]
if not isinstance(hook, UploadHook):
_logger.debug("%s is not a valid UploadHook. Using noop", name)
if not isinstance(hook, CompletionHook):
_logger.debug(
"%s is not a valid CompletionHook. Using noop", name
)
continue
_logger.debug("Using UploadHook %s", name)
_logger.debug("Using CompletionHook %s", name)
return hook
except Exception: # pylint: disable=broad-except
_logger.exception(
"UploadHook %s configuration failed. Using noop", name
"CompletionHook %s configuration failed. Using noop", name
)
return _NoOpUploadHook()
return _NoOpCompletionHook()
__all__ = ["UploadHook", "load_upload_hook"]
__all__ = ["CompletionHook", "load_completion_hook"]

View File

@ -16,11 +16,11 @@ OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT = (
"OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT"
)
OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK = (
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK"
OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK = (
"OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK"
)
"""
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK
.. envvar:: OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK
"""
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = (

View File

@ -0,0 +1,101 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from dataclasses import dataclass
from typing import Any, Callable
from unittest import TestCase
from unittest.mock import Mock, patch
from opentelemetry.util.genai.completion_hook import (
CompletionHook,
_NoOpCompletionHook,
load_completion_hook,
)
from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK,
)
class FakeCompletionHook(CompletionHook):
    """Minimal hook used by the loader tests as a valid ``CompletionHook``."""

    def on_completion(self, **kwargs: Any):
        # Accepts any keyword arguments and intentionally does nothing.
        return None
class InvalidCompletionHook:
    """Class with no ``on_completion`` method, used as the invalid-hook fixture."""
@dataclass
class FakeEntryPoint:
    """Stand-in for an entry point, exposing only ``name`` and ``load``."""

    # Entry point name; the tests use "my-hook" or "other-hook".
    name: str
    # Zero-argument callable standing in for the entry point's ``load`` method.
    load: Callable[[], type[CompletionHook]]
class TestCompletionHook(TestCase):
    """Tests for ``load_completion_hook`` lookup and its no-op fallbacks."""

    # clear=True makes the test hermetic: without it an ambient
    # OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK value would leak into the
    # test and could make the loader return something other than the no-op.
    @patch.dict("os.environ", {}, clear=True)
    def test_load_completion_hook_noop(self):
        """With no hook configured, the loader returns the no-op hook."""
        self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)

    @patch(
        "opentelemetry.util.genai.completion_hook.entry_points",
    )
    @patch.dict(
        "os.environ", {OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK: "my-hook"}
    )
    def test_load_completion_hook_custom(self, mock_entry_points: Mock):
        """A matching entry point that yields a valid hook is returned."""
        mock_entry_points.return_value = [
            FakeEntryPoint("my-hook", lambda: FakeCompletionHook)
        ]

        self.assertIsInstance(load_completion_hook(), FakeCompletionHook)

    @patch("opentelemetry.util.genai.completion_hook.entry_points")
    @patch.dict(
        "os.environ", {OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK: "my-hook"}
    )
    def test_load_completion_hook_invalid(self, mock_entry_points: Mock):
        """An object that is not a ``CompletionHook`` is rejected and logged."""
        mock_entry_points.return_value = [
            FakeEntryPoint("my-hook", lambda: InvalidCompletionHook)
        ]

        with self.assertLogs(level=logging.DEBUG) as logs:
            self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)
        self.assertEqual(len(logs.output), 1)
        self.assertIn(
            "is not a valid CompletionHook. Using noop", logs.output[0]
        )

    @patch("opentelemetry.util.genai.completion_hook.entry_points")
    @patch.dict(
        "os.environ", {OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK: "my-hook"}
    )
    def test_load_completion_hook_error(self, mock_entry_points: Mock):
        """An exception raised while loading the entry point falls back to no-op."""

        def load():
            raise RuntimeError("error")

        mock_entry_points.return_value = [FakeEntryPoint("my-hook", load)]

        self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)

    @patch("opentelemetry.util.genai.completion_hook.entry_points")
    @patch.dict(
        "os.environ", {OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK: "my-hook"}
    )
    def test_load_completion_hook_not_found(self, mock_entry_points: Mock):
        """When no entry point name matches the configured name, no-op is used."""
        mock_entry_points.return_value = [
            FakeEntryPoint("other-hook", lambda: FakeCompletionHook)
        ]

        self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)

View File

@ -29,12 +29,12 @@ import fsspec
from opentelemetry._logs import LogRecord
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.genai import types
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
FsspecUploadHook,
from opentelemetry.util.genai._fsspec_upload.completion_hook import (
FsspecUploadCompletionHook,
)
from opentelemetry.util.genai.upload_hook import (
_NoOpUploadHook,
load_upload_hook,
from opentelemetry.util.genai.completion_hook import (
_NoOpCompletionHook,
load_completion_hook,
)
# Use MemoryFileSystem for testing
@ -45,14 +45,16 @@ BASE_PATH = "memory://"
@patch.dict(
"os.environ",
{
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec",
"OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK": "fsspec_upload",
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH": BASE_PATH,
},
clear=True,
)
class TestFsspecEntryPoint(TestCase):
def test_fsspec_entry_point(self):
self.assertIsInstance(load_upload_hook(), FsspecUploadHook)
self.assertIsInstance(
load_completion_hook(), FsspecUploadCompletionHook
)
def test_fsspec_entry_point_no_fsspec(self):
"""Tests that a no-op uploader is used when fsspec is not installed"""
@ -62,10 +64,10 @@ class TestFsspecEntryPoint(TestCase):
# Simulate fsspec imports failing
with patch.dict(
sys.modules,
{"opentelemetry.util.genai._fsspec_upload.fsspec_hook": None},
{"opentelemetry.util.genai._fsspec_upload.completion_hook": None},
):
importlib.reload(_fsspec_upload)
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)
MAXSIZE = 5
@ -95,15 +97,15 @@ class ThreadSafeMagicMock(MagicMock):
super()._increment_mock_call(*args, **kwargs)
class TestFsspecUploadHook(TestCase):
class TestFsspecUploadCompletionHook(TestCase):
def setUp(self):
self._fsspec_patcher = patch(
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
"opentelemetry.util.genai._fsspec_upload.completion_hook.fsspec"
)
self.mock_fsspec = self._fsspec_patcher.start()
self.mock_fsspec.open = ThreadSafeMagicMock()
self.hook = FsspecUploadHook(
self.hook = FsspecUploadCompletionHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
)
@ -130,7 +132,7 @@ class TestFsspecUploadHook(TestCase):
self.hook.shutdown()
def test_upload_then_shutdown(self):
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -148,7 +150,7 @@ class TestFsspecUploadHook(TestCase):
with self.block_upload():
# fill the queue
for _ in range(MAXSIZE):
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -161,7 +163,7 @@ class TestFsspecUploadHook(TestCase):
)
with self.assertLogs(level=logging.WARNING) as logs:
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -173,7 +175,7 @@ class TestFsspecUploadHook(TestCase):
def test_shutdown_timeout(self):
with self.block_upload():
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -186,7 +188,7 @@ class TestFsspecUploadHook(TestCase):
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
with self.assertLogs(level=logging.ERROR) as logs:
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -198,21 +200,21 @@ class TestFsspecUploadHook(TestCase):
def test_upload_after_shutdown_logs(self):
self.hook.shutdown()
with self.assertLogs(level=logging.INFO) as logs:
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertEqual(len(logs.output), 3)
self.assertIn(
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
"attempting to upload file after FsspecUploadCompletionHook.shutdown() was already called",
logs.output[0],
)
class FsspecUploaderTest(TestCase):
def test_upload(self):
FsspecUploadHook._do_upload(
FsspecUploadCompletionHook._do_upload(
"memory://my_path",
lambda: [asdict(fake_input) for fake_input in FAKE_INPUTS],
)
@ -224,10 +226,10 @@ class FsspecUploaderTest(TestCase):
)
class TestFsspecUploadHookIntegration(TestBase):
class TestFsspecUploadCompletionHookIntegration(TestBase):
def setUp(self):
super().setUp()
self.hook = FsspecUploadHook(base_path=BASE_PATH)
self.hook = FsspecUploadCompletionHook(base_path=BASE_PATH)
def tearDown(self):
super().tearDown()
@ -242,7 +244,7 @@ class TestFsspecUploadHookIntegration(TestBase):
log_record = LogRecord()
with tracer.start_as_current_span("chat mymodel") as span:
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -282,7 +284,7 @@ class TestFsspecUploadHookIntegration(TestBase):
def test_stamps_empty_log(self):
log_record = LogRecord()
self.hook.upload(
self.hook.on_completion(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
@ -296,7 +298,7 @@ class TestFsspecUploadHookIntegration(TestBase):
def test_upload_bytes(self) -> None:
log_record = LogRecord()
self.hook.upload(
self.hook.on_completion(
inputs=[
types.InputMessage(
role="user",

View File

@ -1,99 +0,0 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from dataclasses import dataclass
from typing import Any, Callable
from unittest import TestCase
from unittest.mock import Mock, patch
from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK,
)
from opentelemetry.util.genai.upload_hook import (
UploadHook,
_NoOpUploadHook,
load_upload_hook,
)
class FakeUploadHook(UploadHook):
def upload(self, **kwargs: Any):
pass
class InvalidUploadHook:
pass
@dataclass
class FakeEntryPoint:
name: str
load: Callable[[], type[UploadHook]]
class TestUploadHook(TestCase):
@patch.dict("os.environ", {})
def test_load_upload_hook_noop(self):
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
@patch(
"opentelemetry.util.genai.upload_hook.entry_points",
)
@patch.dict(
"os.environ", {OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK: "my-hook"}
)
def test_load_upload_hook_custom(self, mock_entry_points: Mock):
mock_entry_points.return_value = [
FakeEntryPoint("my-hook", lambda: FakeUploadHook)
]
self.assertIsInstance(load_upload_hook(), FakeUploadHook)
@patch("opentelemetry.util.genai.upload_hook.entry_points")
@patch.dict(
"os.environ", {OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK: "my-hook"}
)
def test_load_upload_hook_invalid(self, mock_entry_points: Mock):
mock_entry_points.return_value = [
FakeEntryPoint("my-hook", lambda: InvalidUploadHook)
]
with self.assertLogs(level=logging.DEBUG) as logs:
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
self.assertEqual(len(logs.output), 1)
self.assertIn("is not a valid UploadHook. Using noop", logs.output[0])
@patch("opentelemetry.util.genai.upload_hook.entry_points")
@patch.dict(
"os.environ", {OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK: "my-hook"}
)
def test_load_upload_hook_error(self, mock_entry_points: Mock):
def load():
raise RuntimeError("error")
mock_entry_points.return_value = [FakeEntryPoint("my-hook", load)]
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
@patch("opentelemetry.util.genai.upload_hook.entry_points")
@patch.dict(
"os.environ", {OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK: "my-hook"}
)
def test_load_upload_hook_not_found(self, mock_entry_points: Mock):
mock_entry_points.return_value = [
FakeEntryPoint("other-hook", lambda: FakeUploadHook)
]
self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)