Merge pull request #97 from dapr/cyb3rward0g/update-local-executor

Executors: Sandbox support + per-project bootstrap + full refactor
This commit is contained in:
Roberto Rodriguez 2025-04-23 23:13:52 -04:00 committed by GitHub
commit c31e985d81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 1267 additions and 159 deletions

View File

@ -0,0 +1,501 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "39c2dcc0",
"metadata": {},
"source": [
"# Executor: LocalCodeExecutor Basic Examples\n",
"\n",
"This notebook shows how to execute Python and shell snippets in **isolated, cached virtual environments** with `LocalCodeExecutor`."
]
},
{
"cell_type": "markdown",
"id": "c4ff4b2b",
"metadata": {},
"source": [
"## Install Required Libraries\n",
"Before starting, ensure the required libraries are installed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5b41a66a",
"metadata": {},
"outputs": [],
"source": [
"!pip install dapr-agents"
]
},
{
"cell_type": "markdown",
"id": "a9c01be3",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "508fd446",
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"\n",
"from dapr_agents.executors.local import LocalCodeExecutor\n",
"from dapr_agents.types.executor import CodeSnippet, ExecutionRequest\n",
"from rich.console import Console\n",
"from rich.ansi import AnsiDecoder\n",
"import shutil"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "27594072",
"metadata": {},
"outputs": [],
"source": [
"logging.basicConfig(level=logging.INFO)\n",
"\n",
"executor = LocalCodeExecutor()\n",
"console = Console()\n",
"decoder = AnsiDecoder()"
]
},
{
"cell_type": "markdown",
"id": "4d663475",
"metadata": {},
"source": [
"## Running a basic Python Code Snippet"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "ba45ddc8",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:dapr_agents.executors.local:Sandbox backend enabled: seatbelt\n",
"INFO:dapr_agents.executors.local:Created a new virtual environment\n",
"INFO:dapr_agents.executors.local:Installing print, rich\n",
"INFO:dapr_agents.executors.local:Snippet 1 finished in 2.442s\n"
]
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"><span style=\"color: #008000; text-decoration-color: #008000; font-weight: bold\">Hello executor!</span>\n",
"</pre>\n"
],
"text/plain": [
"\u001b[1;32mHello executor!\u001b[0m\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"code = \"\"\"\n",
"from rich import print\n",
"print(\"[bold green]Hello executor![/bold green]\")\n",
"\"\"\"\n",
"\n",
"request = ExecutionRequest(snippets=[\n",
" CodeSnippet(language='python', code=code, timeout=10)\n",
"])\n",
"\n",
"results = await executor.execute(request)\n",
"results[0] # raw result\n",
"\n",
"# prettyprint with Rich\n",
"console.print(*decoder.decode(results[0].output))"
]
},
{
"cell_type": "markdown",
"id": "d28c7531",
"metadata": {},
"source": [
"## Run a Shell Snippet"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "4ea89b85",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:dapr_agents.executors.local:Sandbox backend enabled: seatbelt\n",
"INFO:dapr_agents.executors.local:Snippet 1 finished in 0.019s\n"
]
},
{
"data": {
"text/plain": [
"[ExecutionResult(status='success', output='4\\n', exit_code=0)]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"shell_request = ExecutionRequest(snippets=[\n",
" CodeSnippet(language='sh', code='echo $((2+2))', timeout=5)\n",
"])\n",
"\n",
"await executor.execute(shell_request)"
]
},
{
"cell_type": "markdown",
"id": "da281b6e",
"metadata": {},
"source": [
"## Reuse the cached virtual environment"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "3e9e7e9b",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:dapr_agents.executors.local:Sandbox backend enabled: seatbelt\n",
"INFO:dapr_agents.executors.local:Reusing cached virtual environment.\n",
"INFO:dapr_agents.executors.local:Installing print, rich\n",
"INFO:dapr_agents.executors.local:Snippet 1 finished in 0.297s\n"
]
},
{
"data": {
"text/plain": [
"[ExecutionResult(status='success', output='\\x1b[1;32mHello executor!\\x1b[0m\\n', exit_code=0)]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Rerunning the same Python request will reuse the cached venv, so it is faster\n",
"await executor.execute(request)"
]
},
{
"cell_type": "markdown",
"id": "14dc3e4c",
"metadata": {},
"source": [
"## Inject Helper Functions"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "82f9a168",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:dapr_agents.executors.local:Sandbox backend enabled: seatbelt\n",
"INFO:dapr_agents.executors.local:Created a new virtual environment\n",
"INFO:dapr_agents.executors.local:Snippet 1 finished in 1.408s\n"
]
},
{
"data": {
"text/plain": [
"[ExecutionResult(status='success', output='42\\n', exit_code=0)]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def fancy_sum(a: int, b: int) -> int:\n",
" return a + b\n",
"\n",
"executor.user_functions.append(fancy_sum)\n",
"\n",
"helper_request = ExecutionRequest(snippets=[\n",
" CodeSnippet(language='python', code='print(fancy_sum(40, 2))', timeout=5)\n",
"])\n",
"\n",
"await executor.execute(helper_request)"
]
},
{
"cell_type": "markdown",
"id": "25f9718c",
"metadata": {},
"source": [
"## Clean Up"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "b09059f1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Cache directory removed ✅\n"
]
}
],
"source": [
"shutil.rmtree(executor.cache_dir, ignore_errors=True)\n",
"print(\"Cache directory removed ✅\")"
]
},
{
"cell_type": "markdown",
"id": "2c93cdef",
"metadata": {},
"source": [
"## Package-manager detection & automatic bootstrap"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "8691f3e3",
"metadata": {},
"outputs": [],
"source": [
"from dapr_agents.executors.utils import package_manager as pm\n",
"import pathlib, tempfile"
]
},
{
"cell_type": "markdown",
"id": "e9e08d81",
"metadata": {},
"source": [
"### Create a throw-away project"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "4c7dd9c3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"tmp project: /var/folders/9z/8xhqw8x1611fcbhzl339yrs40000gn/T/tmpmssk0m2b\n"
]
}
],
"source": [
"tmp_proj = pathlib.Path(tempfile.mkdtemp())\n",
"(tmp_proj / \"requirements.txt\").write_text(\"rich==13.7.0\\n\")\n",
"print(\"tmp project:\", tmp_proj)"
]
},
{
"cell_type": "markdown",
"id": "03558a95",
"metadata": {},
"source": [
"### Show what the helper detects"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "3b5acbfb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"detect_package_managers -> [<PackageManagerType.PIP: 'pip'>]\n",
"get_install_command -> pip install -r requirements.txt\n"
]
}
],
"source": [
"print(\"detect_package_managers ->\",\n",
" [m.name for m in pm.detect_package_managers(tmp_proj)])\n",
"print(\"get_install_command ->\",\n",
" pm.get_install_command(tmp_proj))"
]
},
{
"cell_type": "markdown",
"id": "42f1ae7c",
"metadata": {},
"source": [
"### Point the executor at that directory"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "81e53cf4",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from contextlib import contextmanager, ExitStack\n",
"\n",
"@contextmanager\n",
"def chdir(path):\n",
" \"\"\"\n",
" Temporarily change the process CWD to *path*.\n",
"\n",
" Works on every CPython ≥ 3.6 (and PyPy) and restores the old directory\n",
" even if an exception is raised inside the block.\n",
" \"\"\"\n",
" old_cwd = os.getcwd()\n",
" os.chdir(path)\n",
" try:\n",
" yield\n",
" finally:\n",
" os.chdir(old_cwd)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "fb2f5052",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:dapr_agents.executors.local:bootstrapping python project with 'pip install -r requirements.txt'\n",
"INFO:dapr_agents.executors.local:Sandbox backend enabled: seatbelt\n",
"INFO:dapr_agents.executors.local:Created a new virtual environment\n",
"INFO:dapr_agents.executors.local:Snippet 1 finished in 1.433s\n"
]
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">bootstrap OK\n",
"\n",
"</pre>\n"
],
"text/plain": [
"bootstrap OK\n",
"\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"with ExitStack() as stack:\n",
" # keep a directory handle open (optional but handy if you'll delete tmp_proj later)\n",
" stack.enter_context(os.scandir(tmp_proj))\n",
"\n",
" # <-- our portable replacement for contextlib.chdir()\n",
" stack.enter_context(chdir(tmp_proj))\n",
"\n",
" # run a trivial snippet; executor will bootstrap because it now “sees”\n",
" # requirements.txt in the current working directory\n",
" out = await executor.execute(\n",
" ExecutionRequest(snippets=[\n",
" CodeSnippet(language=\"python\", code=\"print('bootstrap OK')\", timeout=5)\n",
" ])\n",
" )\n",
" console.print(out[0].output)"
]
},
{
"cell_type": "markdown",
"id": "45de2386",
"metadata": {},
"source": [
"### Clean Up the throw-away project "
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "0c7aa010",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Cache directory removed ✅\n",
"temporary project removed ✅\n"
]
}
],
"source": [
"shutil.rmtree(executor.cache_dir, ignore_errors=True)\n",
"print(\"Cache directory removed ✅\")\n",
"shutil.rmtree(tmp_proj, ignore_errors=True)\n",
"print(\"temporary project removed ✅\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "36ea4010",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.1"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -1,227 +1,331 @@
from dapr_agents.executors import CodeExecutorBase
from dapr_agents.types.executor import ExecutionRequest, ExecutionResult
from typing import List, Union, Any, Callable
from pydantic import Field
from pathlib import Path
"""Local executor that runs Python or shell snippets in cached virtual-envs."""
import asyncio
import venv
import logging
import ast
import hashlib
import inspect
import logging
import time
import ast
import venv
from pathlib import Path
from typing import Any, Callable, List, Sequence, Union
from pydantic import Field, PrivateAttr
from dapr_agents.executors import CodeExecutorBase
from dapr_agents.executors.sandbox import detect_backend, wrap_command, SandboxType
from dapr_agents.executors.utils.package_manager import get_install_command, get_project_type
from dapr_agents.types.executor import ExecutionRequest, ExecutionResult
logger = logging.getLogger(__name__)
class LocalCodeExecutor(CodeExecutorBase):
"""Executes code locally in an optimized virtual environment with caching,
user-defined functions, and enhanced security.
Supports Python and shell execution with real-time logging,
efficient dependency management, and reduced file I/O.
class LocalCodeExecutor(CodeExecutorBase):
"""
Run snippets locally with **optional OS-level sandboxing** and
per-snippet virtual-env caching.
"""
cache_dir: Path = Field(default_factory=lambda: Path.cwd() / ".dapr_agents_cached_envs", description="Directory for cached virtual environments and execution artifacts.")
user_functions: List[Callable] = Field(default_factory=list, description="List of user-defined functions available during execution.")
cleanup_threshold: int = Field(default=604800, description="Time (in seconds) before cached virtual environments are considered for cleanup.")
cache_dir: Path = Field(
default_factory=lambda: Path.cwd() / ".dapr_agents_cached_envs",
description="Directory that stores cached virtual environments.",
)
user_functions: List[Callable] = Field(
default_factory=list,
description="Functions whose source is prepended to every Python snippet.",
)
sandbox: SandboxType = Field(
default="auto",
description="'seatbelt' | 'firejail' | 'none' | 'auto' (best available)",
)
writable_paths: List[Path] = Field(
default_factory=list,
description="Extra paths the sandboxed process may write to.",
)
cleanup_threshold: int = Field(
default=604_800, # one week
description="Seconds before a cached venv is considered stale.",
)
_env_lock = asyncio.Lock()
_env_lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock)
_bootstrapped_root: Path | None = PrivateAttr(default=None)
def model_post_init(self, __context: Any) -> None:
"""Ensures the cache directory is created after model initialization."""
def model_post_init(self, __context: Any) -> None: # noqa: D401
"""Create ``cache_dir`` after pydantic instantiation."""
super().model_post_init(__context)
self.cache_dir.mkdir(parents=True, exist_ok=True)
logger.info("Cache directory set.")
logger.debug(f"{self.cache_dir}")
logger.debug("venv cache directory: %s", self.cache_dir)
async def execute(self, request: Union[ExecutionRequest, dict]) -> List[ExecutionResult]:
"""Executes Python or shell code securely in a persistent virtual environment with caching and real-time logging.
async def execute(
self, request: Union[ExecutionRequest, dict]
) -> List[ExecutionResult]:
"""
Run the snippets in *request* and return their results.
Args:
request (Union[ExecutionRequest, dict]): The execution request containing code snippets.
request: ``ExecutionRequest`` instance or a raw mapping that can
be unpacked into one.
Returns:
List[ExecutionResult]: A list of execution results for each snippet.
A list with one ``ExecutionResult`` for every snippet in the
original request.
"""
if isinstance(request, dict):
request = ExecutionRequest(**request)
await self._bootstrap_project()
self.validate_snippets(request.snippets)
results = []
for snippet in request.snippets:
start_time = time.time()
# Resolve sandbox once
eff_backend: SandboxType = (
detect_backend() if self.sandbox == "auto" else self.sandbox
)
if eff_backend != "none":
logger.info(
"Sandbox backend enabled: %s%s",
eff_backend,
f" (writable: {', '.join(map(str, self.writable_paths))})"
if self.writable_paths
else "",
)
else:
logger.info("Sandbox disabled - running commands directly.")
# Main loop
results: list[ExecutionResult] = []
for snip_idx, snippet in enumerate(request.snippets, start=1):
start = time.perf_counter()
# Assemble the *raw* command
if snippet.language == "python":
required_packages = self._extract_imports(snippet.code)
logger.info(f"Packages Required: {required_packages}")
venv_path = await self._get_or_create_cached_env(required_packages)
# Load user-defined functions dynamically in memory
function_code = "\n".join(inspect.getsource(f) for f in self.user_functions) if self.user_functions else ""
exec_script = f"{function_code}\n{snippet.code}" if function_code else snippet.code
python_executable = venv_path / "bin" / "python3"
command = [str(python_executable), "-c", exec_script]
env = await self._prepare_python_env(snippet.code)
python_bin = env / "bin" / "python3"
prelude = "\n".join(inspect.getsource(fn) for fn in self.user_functions)
script = f"{prelude}\n{snippet.code}" if prelude else snippet.code
raw_cmd: Sequence[str] = [str(python_bin), "-c", script]
else:
command = ["sh", "-c", snippet.code]
raw_cmd = ["sh", "-c", snippet.code]
logger.info("Executing command")
logger.debug(f"{' '.join(command)}")
# Wrap for sandbox
final_cmd = wrap_command(raw_cmd, eff_backend, self.writable_paths)
logger.debug(
"Snippet %s - launch command: %s",
snip_idx,
" ".join(final_cmd),
)
try:
# Start subprocess execution with explicit timeout
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=True
)
# Run it
snip_timeout = getattr(snippet, "timeout", request.timeout)
results.append(await self._run_subprocess(final_cmd, snip_timeout))
# Wait for completion with timeout enforcement
stdout_output, stderr_output = await asyncio.wait_for(process.communicate(), timeout=request.timeout)
status = "success" if process.returncode == 0 else "error"
execution_time = time.time() - start_time
logger.info(f"Execution completed in {execution_time:.2f} seconds.")
if stderr_output:
logger.error(f"STDERR: {stderr_output.decode()}")
results.append(ExecutionResult(
status=status,
output=stdout_output.decode(),
exit_code=process.returncode
))
except asyncio.TimeoutError:
process.terminate() # Ensure subprocess is killed if it times out
results.append(ExecutionResult(status="error", output="Execution timed out", exit_code=1))
except Exception as e:
results.append(ExecutionResult(status="error", output=str(e), exit_code=1))
logger.info(
"Snippet %s finished in %.3fs",
snip_idx,
time.perf_counter() - start,
)
return results
def _extract_imports(self, code: str) -> List[str]:
"""Parses a Python script and extracts top-level module imports.
async def _bootstrap_project(self) -> None:
"""Install top-level dependencies once per executor instance."""
cwd = Path.cwd().resolve()
if self._bootstrapped_root == cwd:
return
install_cmd = get_install_command(str(cwd))
if install_cmd:
logger.info(
"bootstrapping %s project with '%s'",
get_project_type(str(cwd)).value,
install_cmd
)
proc = await asyncio.create_subprocess_shell(
install_cmd,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
_, err = await proc.communicate()
if proc.returncode:
logger.warning(
"bootstrap failed (%d): %s",
proc.returncode,
err.decode().strip()
)
self._bootstrapped_root = cwd
async def _prepare_python_env(self, code: str) -> Path:
"""
Ensure a virtual-env exists that satisfies *code* imports.
Args:
code (str): The Python code snippet to analyze.
code: User-supplied Python source.
Returns:
List[str]: A list of imported module names found in the code.
Path to the virtual-env directory.
"""
imports = self._extract_imports(code)
env = await self._get_or_create_cached_env(imports)
missing = await self._get_missing_packages(imports, env)
if missing:
await self._install_missing_packages(missing, env)
return env
@staticmethod
def _extract_imports(code: str) -> List[str]:
"""
Return all top-level imported module names in *code*.
Args:
code: Python source to scan.
Returns:
Unique list of first-segment module names.
Raises:
SyntaxError: If the code has invalid syntax and cannot be parsed.
SyntaxError: If *code* cannot be parsed.
"""
try:
parsed_code = ast.parse(code)
except SyntaxError as e:
logger.error(f"Syntax error while parsing code: {e}")
tree = ast.parse(code)
except SyntaxError:
logger.error("cannot parse user code, assuming no imports")
return []
modules = set()
for node in ast.walk(parsed_code):
if isinstance(node, ast.Import):
for alias in node.names:
modules.add(alias.name.split('.')[0]) # Get the top-level package
elif isinstance(node, ast.ImportFrom) and node.module:
modules.add(node.module.split('.')[0])
names = {
alias.name.partition('.')[0]
for node in ast.walk(tree)
for alias in getattr(node, "names", [])
if isinstance(node, (ast.Import, ast.ImportFrom))
}
if any(isinstance(node, ast.ImportFrom) and node.module
for node in ast.walk(tree)):
names |= {
node.module.partition('.')[0]
for node in ast.walk(tree)
if isinstance(node, ast.ImportFrom) and node.module
}
return sorted(names)
return list(modules)
async def _get_missing_packages(self, packages: List[str], env_path: Path) -> List[str]:
"""Determines which packages are missing inside a given virtual environment.
Args:
packages (List[str]): A list of package names to check.
env_path (Path): Path to the virtual environment.
Returns:
List[str]: A list of packages that are missing from the virtual environment.
async def _get_missing_packages(
self, packages: List[str], env_path: Path
) -> List[str]:
"""
python_bin = env_path / "bin" / "python3"
async def check_package(pkg):
process = await asyncio.create_subprocess_exec(
str(python_bin), "-c", f"import {pkg}",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await process.wait()
return pkg if process.returncode != 0 else None # Return package name if missing
tasks = [check_package(pkg) for pkg in packages]
results = await asyncio.gather(*tasks)
return [pkg for pkg in results if pkg] # Filter out installed packages
async def _get_or_create_cached_env(self, dependencies: List[str]) -> Path:
"""Creates or retrieves a cached virtual environment based on dependencies.
This function checks if a suitable cached virtual environment exists.
If it does not, it creates a new one and installs missing dependencies.
Identify which *packages* are not importable from *env_path*.
Args:
dependencies (List[str]): List of required package names.
packages: Candidate import names.
env_path: Path to the virtual-env.
Returns:
Path: Path to the virtual environment directory.
Subset of *packages* that need installation.
"""
python = env_path / "bin" / "python3"
async def probe(pkg: str) -> str | None:
proc = await asyncio.create_subprocess_exec(
str(python),
"-c",
f"import importlib.util, sys; sys.exit(importlib.util.find_spec('{pkg}') is None)",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
)
await proc.wait()
return pkg if proc.returncode else None
missing = await asyncio.gather(*(probe(p) for p in packages))
return [m for m in missing if m]
async def _get_or_create_cached_env(self, deps: List[str]) -> Path:
"""
Return a cached venv path keyed by the sorted list *deps*.
Args:
deps: Import names required by user code.
Returns:
Path to the virtual-env directory.
Raises:
RuntimeError: If virtual environment creation or package installation fails.
RuntimeError: If venv creation fails.
"""
async with self._env_lock:
env_hash = hashlib.md5(",".join(sorted(dependencies)).encode()).hexdigest()
env_path = self.cache_dir / f"env_{env_hash}"
digest = hashlib.sha1(",".join(sorted(deps)).encode()).hexdigest()
env_path = self.cache_dir / f"env_{digest}"
async with self._env_lock:
if env_path.exists():
logger.info("Reusing cached virtual environment.")
else:
logger.info("Setting up a new virtual environment.")
try:
venv.create(str(env_path), with_pip=True)
except Exception as e:
logger.error(f"Failed to create virtual environment: {e}")
raise RuntimeError(f"Virtual environment creation failed: {e}")
venv.create(env_path, with_pip=True)
logger.info("Created a new virtual environment")
logger.debug("venv %s created", env_path)
except Exception as exc: # noqa: BLE001
raise RuntimeError("virtual-env creation failed") from exc
return env_path
# Identify missing packages
missing_packages = await self._get_missing_packages(dependencies, env_path)
if missing_packages:
await self._install_missing_packages(missing_packages, env_path)
return env_path
async def _install_missing_packages(self, packages: List[str], env_dir: Path):
"""Installs missing Python packages inside the virtual environment.
async def _install_missing_packages(
self, packages: List[str], env_dir: Path
) -> None:
"""
``pip install`` *packages* inside *env_dir*.
Args:
packages (List[str]): A list of package names to install.
env_dir (Path): Path to the virtual environment where packages should be installed.
packages: Package names to install.
env_dir: Target virtual-env directory.
Raises:
RuntimeError: If the package installation process fails.
RuntimeError: If installation returns non-zero exit code.
"""
if not packages:
return
python = env_dir / "bin" / "python3"
cmd = [str(python), "-m", "pip", "install", *packages]
logger.info("Installing %s", ", ".join(packages))
python_bin = env_dir / "bin" / "python3"
command = [str(python_bin), "-m", "pip", "install", *packages]
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.DEVNULL, # Suppresses stdout since it's not used
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=True
)
_, stderr = await process.communicate() # Capture only stderr
_, err = await proc.communicate()
if proc.returncode != 0:
msg = err.decode().strip()
logger.error("pip install failed: %s", msg)
raise RuntimeError(msg)
logger.debug("Installed %d package(s)", len(packages))
if process.returncode != 0:
error_msg = stderr.decode().strip()
logger.error(f"Package installation failed: {error_msg}")
raise RuntimeError(f"Package installation failed: {error_msg}")
async def _run_subprocess(self, cmd: Sequence[str], timeout: int) -> ExecutionResult:
"""
Run *cmd* with *timeout* seconds.
logger.info(f"Installed dependencies: {', '.join(packages)}")
Args:
cmd: Command list to execute.
timeout: Maximum runtime in seconds.
Returns:
``ExecutionResult`` with captured output.
"""
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
out, err = await asyncio.wait_for(proc.communicate(), timeout)
status = "success" if proc.returncode == 0 else "error"
if err:
logger.debug("stderr: %s", err.decode().strip())
return ExecutionResult(
status=status,
output=out.decode(),
exit_code=proc.returncode
)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return ExecutionResult(status="error", output="execution timed out", exit_code=1)
except Exception as exc:
return ExecutionResult(status="error", output=str(exc), exit_code=1)
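
A minimal usage sketch (not part of this diff) of the new `sandbox` and `writable_paths` fields, forcing a backend instead of relying on `auto` detection; the `firejail` choice and the scratch path are illustrative assumptions:

from pathlib import Path

from dapr_agents.executors.local import LocalCodeExecutor
from dapr_agents.types.executor import CodeSnippet, ExecutionRequest

# Force a specific sandbox backend (assumes firejail is available on the host).
executor = LocalCodeExecutor(
    sandbox="firejail",
    writable_paths=[Path("/tmp/scratch")],  # extra directory the sandboxed process may write to
)

request = ExecutionRequest(snippets=[
    CodeSnippet(language="python", code="print('sandboxed')", timeout=10)
])
# results = await executor.execute(request)  # run inside an async context, e.g. a notebook cell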

View File

@ -0,0 +1,199 @@
"""Light-weight cross-platform sandbox helpers."""
import platform
import shutil
from pathlib import Path
from typing import List, Literal, Sequence
SandboxType = Literal["none", "seatbelt", "firejail", "auto"]
_READ_ONLY_SEATBELT_POLICY = r"""
(version 1)
; ---------------- default = deny everything -----------------
(deny default)
; ---------------- read-only FS access -----------------------
(allow file-read*)
; ---------------- minimal process mgmt ----------------------
(allow process-exec)
(allow process-fork)
(allow signal (target self))
; ---------------- write-only to /dev/null -------------------
(allow file-write-data
(require-all
(path "/dev/null")
(vnode-type CHARACTER-DEVICE)))
; ---------------- harmless sysctls --------------------------
(allow sysctl-read
(sysctl-name "hw.activecpu")
(sysctl-name "hw.busfrequency_compat")
(sysctl-name "hw.byteorder")
(sysctl-name "hw.cacheconfig")
(sysctl-name "hw.cachelinesize_compat")
(sysctl-name "hw.cpufamily")
(sysctl-name "hw.cpufrequency_compat")
(sysctl-name "hw.cputype")
(sysctl-name "hw.l1dcachesize_compat")
(sysctl-name "hw.l1icachesize_compat")
(sysctl-name "hw.l2cachesize_compat")
(sysctl-name "hw.l3cachesize_compat")
(sysctl-name "hw.logicalcpu_max")
(sysctl-name "hw.machine")
(sysctl-name "hw.ncpu")
(sysctl-name "hw.nperflevels")
(sysctl-name "hw.memsize")
(sysctl-name "hw.pagesize")
(sysctl-name "hw.packages")
(sysctl-name "hw.physicalcpu_max")
(sysctl-name "kern.hostname")
(sysctl-name "kern.osrelease")
(sysctl-name "kern.ostype")
(sysctl-name "kern.osversion")
(sysctl-name "kern.version")
(sysctl-name-prefix "hw.perflevel")
)
"""
def detect_backend() -> SandboxType: # noqa: D401
"""Return the best-effort sandbox backend for the current host."""
system = platform.system()
if system == "Darwin" and shutil.which("sandbox-exec"):
return "seatbelt"
if system == "Linux" and shutil.which("firejail"):
return "firejail"
return "none"
def _seatbelt_cmd(cmd: Sequence[str], writable_paths: List[Path]) -> List[str]:
"""
Construct a **macOS seatbelt** command line.
The resulting list can be passed directly to `asyncio.create_subprocess_exec`.
It launches the target *cmd* under **sandbox-exec** with an
*initially-read-only* profile; every directory in *writable_paths* is added
as an explicit write-allowed sub-path.
Args:
cmd:
The *raw* command (program + args) that should run inside the sandbox.
writable_paths:
Absolute paths that the child process must be able to modify
(e.g. a temporary working directory).
Each entry becomes a param `-D WR<i>=<path>` and a corresponding
``file-write*`` rule in the generated profile.
Returns:
list[str]
A fully-assembled ``sandbox-exec`` invocation:
``['sandbox-exec', '-p', <profile>, *params, '--', *cmd]``, where ``*params`` holds the ``-D WR<i>=<path>`` substitutions.
"""
policy = _READ_ONLY_SEATBELT_POLICY
params: list[str] = []
if writable_paths:
# Build parameter substitutions and the matching `(allow file-write*)` stanza.
write_terms: list[str] = []
for idx, path in enumerate(writable_paths):
param = f"WR{idx}"
params.extend(["-D", f"{param}={path}"])
write_terms.append(f'(subpath (param "{param}"))')
policy += f"\n(allow file-write*\n {' '.join(write_terms)}\n)"
return [
"sandbox-exec",
"-p",
policy,
*params,
"--",
*cmd,
]
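# Illustrative example (not part of the module): with a single writable path,
#   _seatbelt_cmd(["python3", "script.py"], [Path("/tmp/out")])
# returns roughly:
#   ["sandbox-exec", "-p", <base policy + '(allow file-write* (subpath (param "WR0")))'>,
#    "-D", "WR0=/tmp/out", "--", "python3", "script.py"]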
def _firejail_cmd(cmd: Sequence[str], writable_paths: List[Path]) -> List[str]:
"""
Build a **Firejail** command line (Linux only).
The wrapper enables seccomp, disables sound and networking, and whitelists
the provided *writable_paths* so the child process can persist data there.
Args:
cmd:
The command (program + args) to execute.
writable_paths:
Directories that must remain writable inside the Firejail sandbox.
Returns:
list[str]
A Firejail-prefixed command suitable for
``asyncio.create_subprocess_exec``.
Raises:
ValueError
If *writable_paths* contains non-absolute paths.
"""
for p in writable_paths:
if not p.is_absolute():
raise ValueError(f"Firejail whitelist paths must be absolute: {p}")
rw_flags = sum([["--whitelist", str(p)] for p in writable_paths], [])
return [
"firejail",
"--quiet", # suppress banner
"--seccomp", # enable seccomp filter
"--nosound",
"--net=none",
*rw_flags,
"--",
*cmd,
]
def wrap_command(
cmd: Sequence[str],
backend: SandboxType,
writable_paths: List[Path] | None = None,
) -> List[str]:
"""
Produce a sandbox-wrapped command according to *backend*.
This is the single public helper used by the executors: it hides the
platform-specific details of **seatbelt** and **Firejail** while providing
a graceful fallback to no sandbox.
Args:
cmd:
The raw command (program + args) to execute.
backend:
One of ``'seatbelt'``, ``'firejail'``, ``'none'`` or ``'auto'``.
When ``'auto'`` is supplied the caller should already have resolved the
platform with :func:`detect_backend`; the value is treated as ``'none'``.
writable_paths:
Extra directories that must remain writable inside the sandbox.
Ignored when *backend* is ``'none'`` / ``'auto'``.
Returns:
list[str]
The command list ready for ``asyncio.create_subprocess_exec``.
If sandboxing is disabled, this is simply ``list(cmd)``.
Raises:
ValueError
If an unrecognised *backend* value is given.
"""
if backend in ("none", "auto"):
return list(cmd)
writable_paths = writable_paths or []
if backend == "seatbelt":
return _seatbelt_cmd(cmd, writable_paths)
if backend == "firejail":
return _firejail_cmd(cmd, writable_paths)
raise ValueError(f"Unknown sandbox backend: {backend!r}")
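
A short sketch (not part of this diff) of how these helpers compose; the wrapped command and the writable path are illustrative assumptions:

from pathlib import Path

from dapr_agents.executors.sandbox import detect_backend, wrap_command

backend = detect_backend()  # "seatbelt" on macOS, "firejail" on Linux, otherwise "none"
cmd = wrap_command(
    ["python3", "-c", "print('hi')"],
    backend,
    writable_paths=[Path("/tmp/out")],  # must be absolute for the firejail backend
)
# cmd is ready for asyncio.create_subprocess_exec(*cmd); with backend "none" it is simply the raw command.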

View File

@ -0,0 +1,303 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import List, Optional, Set
from functools import lru_cache
from enum import Enum
class PackageManagerType(str, Enum):
"""Types of package managers that can be detected."""
PIP = "pip"
POETRY = "poetry"
PIPENV = "pipenv"
CONDA = "conda"
NPM = "npm"
YARN = "yarn"
PNPM = "pnpm"
BUN = "bun"
CARGO = "cargo"
GO = "go"
MAVEN = "maven"
GRADLE = "gradle"
COMPOSER = "composer"
UNKNOWN = "unknown"
class ProjectType(str, Enum):
"""Types of projects that can be detected."""
PYTHON = "python"
NODE = "node"
RUST = "rust"
GO = "go"
JAVA = "java"
PHP = "php"
UNKNOWN = "unknown"
class PackageManager:
"""Information about a package manager and its commands."""
def __init__(
self,
name: PackageManagerType,
project_type: ProjectType,
install_cmd: str,
add_cmd: Optional[str] = None,
remove_cmd: Optional[str] = None,
update_cmd: Optional[str] = None,
markers: Optional[List[str]] = None,
) -> None:
"""
Initialize a package manager.
Args:
name: Package manager identifier.
project_type: Type of project this manager serves.
install_cmd: Command to install project dependencies.
add_cmd: Command to add a single package.
remove_cmd: Command to remove a package.
update_cmd: Command to update packages.
markers: Filenames indicating this manager in a project.
"""
self.name = name
self.project_type = project_type
self.install_cmd = install_cmd
self.add_cmd = add_cmd or f"{name.value} install"
self.remove_cmd = remove_cmd or f"{name.value} remove"
self.update_cmd = update_cmd or f"{name.value} update"
self.markers = markers or []
def __str__(self) -> str:
return self.name.value
# Known package managers
PACKAGE_MANAGERS: dict[PackageManagerType, PackageManager] = {
# Python package managers
PackageManagerType.PIP: PackageManager(
name=PackageManagerType.PIP,
project_type=ProjectType.PYTHON,
install_cmd="pip install -r requirements.txt",
add_cmd="pip install",
remove_cmd="pip uninstall",
update_cmd="pip install --upgrade",
markers=["requirements.txt", "setup.py", "setup.cfg"]
),
PackageManagerType.POETRY: PackageManager(
name=PackageManagerType.POETRY,
project_type=ProjectType.PYTHON,
install_cmd="poetry install",
add_cmd="poetry add",
remove_cmd="poetry remove",
update_cmd="poetry update",
markers=["pyproject.toml", "poetry.lock"]
),
PackageManagerType.PIPENV: PackageManager(
name=PackageManagerType.PIPENV,
project_type=ProjectType.PYTHON,
install_cmd="pipenv install",
add_cmd="pipenv install",
remove_cmd="pipenv uninstall",
update_cmd="pipenv update",
markers=["Pipfile", "Pipfile.lock"]
),
PackageManagerType.CONDA: PackageManager(
name=PackageManagerType.CONDA,
project_type=ProjectType.PYTHON,
install_cmd="conda env update -f environment.yml",
add_cmd="conda install",
remove_cmd="conda remove",
update_cmd="conda update",
markers=["environment.yml", "environment.yaml"]
),
# JavaScript package managers
PackageManagerType.NPM: PackageManager(
name=PackageManagerType.NPM,
project_type=ProjectType.NODE,
install_cmd="npm install",
add_cmd="npm install",
remove_cmd="npm uninstall",
update_cmd="npm update",
markers=["package.json", "package-lock.json"]
),
PackageManagerType.YARN: PackageManager(
name=PackageManagerType.YARN,
project_type=ProjectType.NODE,
install_cmd="yarn install",
add_cmd="yarn add",
remove_cmd="yarn remove",
update_cmd="yarn upgrade",
markers=["package.json", "yarn.lock"]
),
PackageManagerType.PNPM: PackageManager(
name=PackageManagerType.PNPM,
project_type=ProjectType.NODE,
install_cmd="pnpm install",
add_cmd="pnpm add",
remove_cmd="pnpm remove",
update_cmd="pnpm update",
markers=["package.json", "pnpm-lock.yaml"]
),
PackageManagerType.BUN: PackageManager(
name=PackageManagerType.BUN,
project_type=ProjectType.NODE,
install_cmd="bun install",
add_cmd="bun add",
remove_cmd="bun remove",
update_cmd="bun update",
markers=["package.json", "bun.lockb"]
),
}
@lru_cache(maxsize=None)
def is_installed(name: str) -> bool:
"""
Check if a given command exists on PATH.
Args:
name: Command name to check.
Returns:
True if the command is available, False otherwise.
"""
return shutil.which(name) is not None
@lru_cache(maxsize=None)
def detect_package_managers(directory: str) -> List[PackageManager]:
"""
Detect all installed package managers by looking for marker files.
Args:
directory: Path to the project root.
Returns:
A list of PackageManager instances found in the directory.
"""
dir_path = Path(directory)
if not dir_path.is_dir():
return []
found: List[PackageManager] = []
for pm in PACKAGE_MANAGERS.values():
for marker in pm.markers:
if (dir_path / marker).exists() and is_installed(pm.name.value):
found.append(pm)
break
return found
@lru_cache(maxsize=None)
def get_primary_package_manager(directory: str) -> Optional[PackageManager]:
"""
Determine the primary package manager using lockfile heuristics.
Args:
directory: Path to the project root.
Returns:
The chosen PackageManager or None if none detected.
"""
managers = detect_package_managers(directory)
if not managers:
return None
if len(managers) == 1:
return managers[0]
dir_path = Path(directory)
# Prefer lockfiles over others
lock_priority = [
(PackageManagerType.POETRY, "poetry.lock"),
(PackageManagerType.PIPENV, "Pipfile.lock"),
(PackageManagerType.PNPM, "pnpm-lock.yaml"),
(PackageManagerType.YARN, "yarn.lock"),
(PackageManagerType.BUN, "bun.lockb"),
(PackageManagerType.NPM, "package-lock.json"),
]
for pm_type, lock in lock_priority:
if (dir_path / lock).exists() and PACKAGE_MANAGERS.get(pm_type) in managers:
return PACKAGE_MANAGERS[pm_type]
return managers[0]
def get_install_command(directory: str) -> Optional[str]:
"""
Get the shell command to install project dependencies.
Args:
directory: Path to the project root.
Returns:
A shell command string or None if no manager detected.
"""
pm = get_primary_package_manager(directory)
return pm.install_cmd if pm else None
def get_add_command(directory: str, package: str, dev: bool = False) -> Optional[str]:
"""
Get the shell command to add a package to the project.
Args:
directory: Path to the project root.
package: Package name to add.
dev: Whether to add as a development dependency.
Returns:
A shell command string or None if no manager detected.
"""
pm = get_primary_package_manager(directory)
if not pm:
return None
base = pm.add_cmd
if dev and pm.name in {
PackageManagerType.PIP,
PackageManagerType.POETRY,
PackageManagerType.NPM,
PackageManagerType.YARN,
PackageManagerType.PNPM,
PackageManagerType.BUN,
PackageManagerType.COMPOSER,
}:
flag = "--dev" if pm.name in {PackageManagerType.PIP, PackageManagerType.POETRY} else "--save-dev"
return f"{base} {package} {flag}"
return f"{base} {package}"
def get_project_type(directory: str) -> ProjectType:
"""
Infer project type from the primary package manager or file extensions.
Args:
directory: Path to the project root.
Returns:
The detected ProjectType.
"""
pm = get_primary_package_manager(directory)
if pm:
return pm.project_type
# Fallback by extension scanning
exts: Set[str] = set()
for path in Path(directory).rglob('*'):
if path.is_file():
exts.add(path.suffix.lower())
if len(exts) > 50:
break
if '.py' in exts:
return ProjectType.PYTHON
if {'.js', '.ts'} & exts:
return ProjectType.NODE
if '.rs' in exts:
return ProjectType.RUST
if '.go' in exts:
return ProjectType.GO
if '.java' in exts:
return ProjectType.JAVA
if '.php' in exts:
return ProjectType.PHP
return ProjectType.UNKNOWN
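
A quick sketch (not part of this diff) of the detection helpers; the project path is a placeholder, and the printed values depend on which marker files and lockfiles actually exist:

from dapr_agents.executors.utils import package_manager as pm

project = "/path/to/project"  # hypothetical project root

print([m.name for m in pm.detect_package_managers(project)])  # e.g. [<PackageManagerType.POETRY: 'poetry'>]
print(pm.get_primary_package_manager(project))                # e.g. poetry (lockfiles break ties)
print(pm.get_install_command(project))                        # e.g. poetry install
print(pm.get_add_command(project, "rich", dev=True))          # e.g. poetry add rich --dev
print(pm.get_project_type(project))                           # e.g. ProjectType.PYTHON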

View File

@ -13,6 +13,7 @@ class CodeSnippet(BaseModel):
language: str = Field(..., description="The programming language of the code snippet (e.g., 'python', 'javascript').")
code: str = Field(..., description="The actual source code to be executed.")
timeout: int = Field(5, description="Per-snippet timeout (seconds). Executor falls back to the request-level timeout if omitted.")
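# Illustrative sketch (not part of this file): each snippet may carry its own timeout;
# when omitted, the executor falls back to the request-level timeout. Values are placeholders.
#
#   request = ExecutionRequest(snippets=[
#       CodeSnippet(language="python", code="import time; time.sleep(2)", timeout=30),
#       CodeSnippet(language="sh", code="echo fast"),  # request-level/default timeout applies
#   ])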
class ExecutionRequest(BaseModel):
"""Represents a request to execute a code snippet."""