Gen AI uploader: add shutdown timeout and fix flaky tests (#3770)

This commit is contained in:
Aaron Abbott 2025-09-19 15:17:31 -04:00 committed by GitHub
parent 9fab62bcea
commit 13fa314cc6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 76 additions and 36 deletions

View File

@ -21,8 +21,10 @@ import posixpath
import threading
from base64 import b64encode
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import ExitStack
from dataclasses import asdict, dataclass
from functools import partial
from time import time
from typing import Any, Callable, Final, Literal, TextIO, cast
from uuid import uuid4
@ -103,12 +105,12 @@ class FsspecUploadHook(UploadHook):
def _submit_all(self, upload_data: UploadData) -> None:
def done(future: Future[None]) -> None:
self._semaphore.release()
try:
future.result()
except Exception: # pylint: disable=broad-except
_logger.exception("fsspec uploader failed")
finally:
self._semaphore.release()
for path, json_encodeable in upload_data.items():
# could not acquire, drop data
@ -128,7 +130,7 @@ class FsspecUploadHook(UploadHook):
_logger.info(
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
)
break
self._semaphore.release()
def _calculate_ref_path(self) -> CompletionRefs:
# TODO: experimental with using the trace_id and span_id, or fetching
@ -209,9 +211,21 @@ class FsspecUploadHook(UploadHook):
**references,
}
def shutdown(self) -> None:
# TODO: support timeout
self._executor.shutdown()
def shutdown(self, *, timeout_sec: float = 10.0) -> None:
    """Flush pending uploads, then shut down the worker pool.

    Drains the upload queue by re-acquiring semaphore permits up to
    ``self._max_size`` (presumably the semaphore's total permit count,
    i.e. holding all of them means no upload is queued or in flight —
    TODO confirm against __init__). Gives up once ``timeout_sec``
    seconds have elapsed, so shutdown never blocks indefinitely.

    Args:
        timeout_sec: Maximum number of seconds to wait for in-flight
            uploads to finish before shutting down anyway.
    """
    # Absolute wall-clock deadline shared by all acquire attempts below.
    deadline = time() + timeout_sec
    # Wait for all tasks to finish to flush the queue
    with ExitStack() as stack:
        for _ in range(self._max_size):
            remaining = deadline - time()
            # Once past the deadline, remaining is <= 0 and acquire()
            # times out without blocking, so the loop exits promptly.
            if not self._semaphore.acquire(timeout=remaining):  # pylint: disable=consider-using-with
                # Couldn't finish flushing all uploads before timeout
                break
            # Re-release every acquired permit when the ExitStack
            # unwinds, restoring the semaphore's count after shutdown.
            stack.callback(self._semaphore.release)
        # Queue is flushed and blocked, start shutdown
        self._executor.shutdown(wait=False)
class Base64JsonEncoder(json.JSONEncoder):

View File

@ -18,6 +18,7 @@ import importlib
import logging
import sys
import threading
from contextlib import contextmanager
from dataclasses import asdict
from typing import Any
from unittest import TestCase
@ -84,12 +85,24 @@ FAKE_OUTPUTS = [
FAKE_SYSTEM_INSTRUCTION = [types.Text(content="You are a helpful assistant.")]
class ThreadSafeMagicMock(MagicMock):
    """A ``MagicMock`` whose call bookkeeping is serialized by a lock.

    Plain ``MagicMock`` updates ``call_count`` / ``call_args`` with no
    synchronization, so calls arriving from multiple worker threads can
    race and record flaky counts. Guarding ``_increment_mock_call``
    with a lock makes the recorded call data reliable under concurrency.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Write directly into the instance __dict__ (bypassing the mock's
        # __setattr__) so the attribute is a real Lock rather than being
        # captured by the mock machinery as a child mock.
        object.__setattr__(self, "_lock", threading.Lock())
        super().__init__(*args, **kwargs)

    def _increment_mock_call(self, /, *args, **kwargs):
        # Read via __dict__ for the same reason: avoid mock attribute lookup.
        guard = self.__dict__["_lock"]
        with guard:
            super()._increment_mock_call(*args, **kwargs)
class TestFsspecUploadHook(TestCase):
def setUp(self):
self._fsspec_patcher = patch(
"opentelemetry.util.genai._fsspec_upload.fsspec_hook.fsspec"
)
self.mock_fsspec = self._fsspec_patcher.start()
self.mock_fsspec.open = ThreadSafeMagicMock()
self.hook = FsspecUploadHook(
base_path=BASE_PATH,
max_size=MAXSIZE,
@ -99,6 +112,20 @@ class TestFsspecUploadHook(TestCase):
self.hook.shutdown()
self._fsspec_patcher.stop()
@contextmanager
def block_upload(self):
    """Make every fsspec upload hang until this context exits.

    While the context is active, ``mock_fsspec.open`` parks its calling
    thread on an event. The event is always set on exit — even if the
    test body raises — so no uploader thread is left blocked forever.
    """
    gate = threading.Event()

    def stalled_open(*_args: Any):
        # Park the uploader thread until the context is torn down.
        gate.wait()
        return MagicMock()

    self.mock_fsspec.open.side_effect = stalled_open
    try:
        yield
    finally:
        # Unblock any pending uploads regardless of how the body exited.
        gate.set()
def test_shutdown_no_items(self):
self.hook.shutdown()
@ -118,46 +145,45 @@ class TestFsspecUploadHook(TestCase):
)
def test_upload_blocked(self):
unblock_upload = threading.Event()
with self.block_upload():
# fill the queue
for _ in range(MAXSIZE):
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
def blocked_upload(*args: Any):
unblock_upload.wait()
return MagicMock()
self.assertLessEqual(
self.mock_fsspec.open.call_count,
MAXSIZE,
f"uploader should only be called {MAXSIZE=} times",
)
self.mock_fsspec.open.side_effect = blocked_upload
with self.assertLogs(level=logging.WARNING) as logs:
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
# fill the queue
for _ in range(MAXSIZE):
self.assertIn(
"fsspec upload queue is full, dropping upload", logs.output[0]
)
def test_shutdown_timeout(self):
with self.block_upload():
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertLessEqual(
self.mock_fsspec.open.call_count,
MAXSIZE,
f"uploader should only be called {MAXSIZE=} times",
)
with self.assertLogs(level=logging.WARNING) as logs:
self.hook.upload(
inputs=FAKE_INPUTS,
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertIn(
"fsspec upload queue is full, dropping upload", logs.output[0]
)
unblock_upload.set()
# shutdown should timeout and return even though there are still items in the queue
self.hook.shutdown(timeout_sec=0.01)
def test_failed_upload_logs(self):
def failing_upload(*args: Any) -> None:
raise RuntimeError("failed to upload")
self.mock_fsspec.open = MagicMock(wraps=failing_upload)
self.mock_fsspec.open.side_effect = RuntimeError("failed to upload")
with self.assertLogs(level=logging.ERROR) as logs:
self.hook.upload(
@ -177,7 +203,7 @@ class TestFsspecUploadHook(TestCase):
outputs=FAKE_OUTPUTS,
system_instruction=FAKE_SYSTEM_INSTRUCTION,
)
self.assertEqual(len(logs.output), 1)
self.assertEqual(len(logs.output), 3)
self.assertIn(
"attempting to upload file after FsspecUploadHook.shutdown() was already called",
logs.output[0],