# Source listing: opentelemetry-python-contrib/util/opentelemetry-util-genai/tests/test_fsspec_upload.py
# (275 lines, 8.4 KiB, Python)

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=import-outside-toplevel,no-name-in-module
import importlib
import logging
import sys
import threading
from dataclasses import asdict
from typing import Any
from unittest import TestCase
from unittest.mock import MagicMock, patch
import fsspec
from opentelemetry._logs import LogRecord
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.genai import types
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
FsspecUploadHook,
)
from opentelemetry.util.genai.upload_hook import (
_NoOpUploadHook,
load_upload_hook,
)
# Every test below targets fsspec's in-memory filesystem (MemoryFileSystem)
# instead of a real storage backend; see
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.memory.MemoryFileSystem
BASE_PATH: str = "memory://"
@patch.dict(
    "os.environ",
    {
        "OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK": "fsspec",
        "OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH": BASE_PATH,
    },
    clear=True,
)
class TestFsspecEntryPoint(TestCase):
    """Tests resolving the "fsspec" upload hook through ``load_upload_hook``.

    The environment is replaced (``clear=True``) for every test so only the
    two upload-hook variables above are visible.
    """

    def test_fsspec_entry_point(self):
        self.assertIsInstance(load_upload_hook(), FsspecUploadHook)

    def test_fsspec_entry_point_no_fsspec(self):
        """Tests that a no-op uploader is used when fsspec is not installed"""
        from opentelemetry.util.genai import _fsspec_upload

        # The reload below runs while the fsspec hook module is unimportable,
        # and the reloaded module object outlives the sys.modules patch.
        # Reload it once more after the test, when sys.modules is restored,
        # so later tests that resolve the fsspec hook see the real one.
        self.addCleanup(importlib.reload, _fsspec_upload)

        # Simulate fsspec imports failing
        with patch.dict(
            sys.modules,
            {"opentelemetry.util.genai._fsspec_upload.fsspec_hook": None},
        ):
            importlib.reload(_fsspec_upload)
            self.assertIsInstance(load_upload_hook(), _NoOpUploadHook)
# Passed as ``max_size`` when TestFsspecUploadHook builds its hook.
MAXSIZE = 5

# A single-turn exchange used as upload payload throughout the tests: one
# user question, the assistant's reply, and a system prompt. A single
# upload() call with all three results in three uploaded files.
FAKE_INPUTS = [
    types.InputMessage(
        parts=[types.Text(content="What is the capital of France?")],
        role="user",
    )
]
FAKE_OUTPUTS = [
    types.OutputMessage(
        finish_reason="stop",
        parts=[types.Text(content="Paris")],
        role="assistant",
    )
]
FAKE_SYSTEM_INSTRUCTION = [
    types.Text(content="You are a helpful assistant."),
]
class TestFsspecUploadHook(TestCase):
    """Unit tests for FsspecUploadHook with the hook module's ``fsspec`` mocked."""

    def setUp(self):
        self._fsspec_patcher = patch(
            "opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
        )
        self.mock_fsspec = self._fsspec_patcher.start()
        # Registered as a cleanup so the patch is undone even if building the
        # hook below raises (tearDown() is skipped when setUp() fails).
        # Cleanups run after tearDown(), so the hook is still shut down
        # before the patch is stopped.
        self.addCleanup(self._fsspec_patcher.stop)
        self.hook = FsspecUploadHook(
            base_path=BASE_PATH,
            max_size=MAXSIZE,
        )

    def tearDown(self) -> None:
        self.hook.shutdown()

    def test_shutdown_no_items(self):
        # shutdown() with nothing queued must simply return
        self.hook.shutdown()

    def test_upload_then_shutdown(self):
        self.hook.upload(
            inputs=FAKE_INPUTS,
            outputs=FAKE_OUTPUTS,
            system_instruction=FAKE_SYSTEM_INSTRUCTION,
        )
        # all items should be consumed
        self.hook.shutdown()

        self.assertEqual(
            self.mock_fsspec.open.call_count,
            3,
            "should have uploaded 3 files",
        )

    def test_upload_blocked(self):
        unblock_upload = threading.Event()

        def blocked_upload(*args: Any, **kwargs: Any):
            # Hold each upload until the test releases it.
            unblock_upload.wait()
            return MagicMock()

        self.mock_fsspec.open.side_effect = blocked_upload

        # Release the blocked uploads even if an assertion below fails;
        # otherwise tearDown's shutdown() could block waiting on them.
        try:
            # fill the queue
            for _ in range(MAXSIZE):
                self.hook.upload(
                    inputs=FAKE_INPUTS,
                    outputs=FAKE_OUTPUTS,
                    system_instruction=FAKE_SYSTEM_INSTRUCTION,
                )

            self.assertLessEqual(
                self.mock_fsspec.open.call_count,
                MAXSIZE,
                f"uploader should only be called {MAXSIZE=} times",
            )

            with self.assertLogs(level=logging.WARNING) as logs:
                self.hook.upload(
                    inputs=FAKE_INPUTS,
                    outputs=FAKE_OUTPUTS,
                    system_instruction=FAKE_SYSTEM_INSTRUCTION,
                )

            self.assertIn(
                "fsspec upload queue is full, dropping upload", logs.output[0]
            )
        finally:
            unblock_upload.set()

    def test_failed_upload_logs(self):
        def failing_upload(*args: Any, **kwargs: Any) -> None:
            raise RuntimeError("failed to upload")

        self.mock_fsspec.open.side_effect = failing_upload

        with self.assertLogs(level=logging.ERROR) as logs:
            self.hook.upload(
                inputs=FAKE_INPUTS,
                outputs=FAKE_OUTPUTS,
                system_instruction=FAKE_SYSTEM_INSTRUCTION,
            )
            # wait for the queued (failing) uploads so their errors are logged
            # inside the assertLogs context
            self.hook.shutdown()

        self.assertIn("fsspec uploader failed", logs.output[0])

    def test_upload_after_shutdown_logs(self):
        self.hook.shutdown()
        with self.assertLogs(level=logging.INFO) as logs:
            self.hook.upload(
                inputs=FAKE_INPUTS,
                outputs=FAKE_OUTPUTS,
                system_instruction=FAKE_SYSTEM_INSTRUCTION,
            )
        self.assertEqual(len(logs.output), 1)
        self.assertIn(
            "attempting to upload file after FsspecUploadHook.shutdown() was already called",
            logs.output[0],
        )
class FsspecUploaderTest(TestCase):
    """Calls ``FsspecUploadHook._do_upload`` directly, without a hook instance."""

    def test_upload(self):
        target = "memory://my_path"
        expected = '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]'

        def serialize_inputs():
            # zero-argument callable producing the payload that gets written
            return [asdict(message) for message in FAKE_INPUTS]

        FsspecUploadHook._do_upload(target, serialize_inputs)

        # the payload should land as compact JSON at the target path
        with fsspec.open(target, "r") as uploaded:
            contents = uploaded.read()
        self.assertEqual(contents, expected)
class TestFsspecUploadHookIntegration(TestBase):
    """Runs FsspecUploadHook against the real in-memory fsspec filesystem and
    checks the ref attributes it stamps onto spans and log records."""

    def assert_fsspec_equal(self, path: str, value: str) -> None:
        """Assert that the file at ``path`` contains exactly ``value``."""
        with fsspec.open(path, "r") as file:
            self.assertEqual(file.read(), value)

    def test_upload_completions(self):
        hook = FsspecUploadHook(
            base_path=BASE_PATH,
        )
        tracer = self.tracer_provider.get_tracer(__name__)
        log_record = LogRecord()

        try:
            with tracer.start_as_current_span("chat mymodel") as span:
                hook.upload(
                    inputs=FAKE_INPUTS,
                    outputs=FAKE_OUTPUTS,
                    system_instruction=FAKE_SYSTEM_INSTRUCTION,
                    span=span,
                    log_record=log_record,
                )
        finally:
            # shutdown() consumes all pending uploads, so the files can be
            # read back below; run it even if upload() raises so the hook is
            # always shut down.
            hook.shutdown()

        finished_spans = self.get_finished_spans()
        self.assertEqual(len(finished_spans), 1)
        finished_span = finished_spans[0]

        # both the span attributes and the log record attributes have refs
        for attributes in [
            finished_span.attributes,
            log_record.attributes,
        ]:
            for ref_key in [
                "gen_ai.input.messages_ref",
                "gen_ai.output.messages_ref",
                "gen_ai.system_instructions_ref",
            ]:
                self.assertIn(ref_key, attributes)

        self.assert_fsspec_equal(
            finished_span.attributes["gen_ai.input.messages_ref"],
            '[{"role":"user","parts":[{"content":"What is the capital of France?","type":"text"}]}]',
        )
        self.assert_fsspec_equal(
            finished_span.attributes["gen_ai.output.messages_ref"],
            '[{"role":"assistant","parts":[{"content":"Paris","type":"text"}],"finish_reason":"stop"}]',
        )
        self.assert_fsspec_equal(
            finished_span.attributes["gen_ai.system_instructions_ref"],
            '[{"content":"You are a helpful assistant.","type":"text"}]',
        )

    @staticmethod
    def upload_with_log(log_record: LogRecord):
        """Upload the fake conversation with ``log_record`` attached, then
        shut the hook down (also when ``upload()`` raises)."""
        hook = FsspecUploadHook(
            base_path=BASE_PATH,
        )
        try:
            hook.upload(
                inputs=FAKE_INPUTS,
                outputs=FAKE_OUTPUTS,
                system_instruction=FAKE_SYSTEM_INSTRUCTION,
                log_record=log_record,
            )
        finally:
            hook.shutdown()

    def test_stamps_empty_log(self):
        log_record = LogRecord()
        self.upload_with_log(log_record)

        # a LogRecord created with no arguments still gets every ref key
        # stamped onto its attributes
        self.assertIn("gen_ai.input.messages_ref", log_record.attributes)
        self.assertIn("gen_ai.output.messages_ref", log_record.attributes)
        self.assertIn("gen_ai.system_instructions_ref", log_record.attributes)