# SPDX-License-Identifier: Apache-2.0
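"""Sequential (one request at a time) serving benchmark.

Requests are issued strictly one after another, and the server's prefix cache is
reset between requests via its /reset_prefix_cache endpoint, so each request is
measured cold and without queueing effects.

Illustrative invocation (the script filename, model name, and flag values are
placeholders; an OpenAI-compatible vLLM server is assumed to already be running):

    python sequential_benchmark.py --model <model> --num-requests 50 --input-len 512
"""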

import argparse
import asyncio
import logging
import random
import time
from dataclasses import dataclass
from typing import Optional

import aiohttp
import numpy as np
from tqdm import tqdm

from backend_request_func import RequestFuncInput, RequestFuncOutput
from benchmark_dataset import RandomDataset, SampleRequest

try:
    from vllm.transformers_utils.tokenizer import get_tokenizer
except ImportError:
    from backend_request_func import get_tokenizer

logger = logging.getLogger(__name__)


@dataclass
class BenchmarkMetrics:
    completed: int
    total_input: int
    total_output: int
    mean_ttft_ms: float
    median_ttft_ms: float
    std_ttft_ms: float
    percentiles_ttft_ms: list[tuple[float, float]]
    mean_itl_ms: float
    median_itl_ms: float
    std_itl_ms: float
    percentiles_itl_ms: list[tuple[float, float]]
    mean_e2el_ms: float
    median_e2el_ms: float
    std_e2el_ms: float
    percentiles_e2el_ms: list[tuple[float, float]]
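
# Note: each ``percentiles_*_ms`` field above holds (percentile, value) pairs in
# milliseconds, e.g. [(50, 12.3), (99, 48.7)] for p50/p99 (numbers illustrative).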


async def reset_cache(reset_url: str):
    """Sends a POST request to reset the prefix cache."""
    logger.debug("Resetting prefix cache at %s", reset_url)
    try:
        async with (
            aiohttp.ClientSession() as session,
            session.post(reset_url) as response,
        ):
            # Raise an exception for bad status codes (4xx or 5xx)
            response.raise_for_status()
            logger.debug("Prefix cache reset successful: %s", response.status)
    except aiohttp.ClientConnectorError as e:
        logger.error("Failed to connect to cache reset endpoint %s: %s", reset_url, e)
    except aiohttp.ClientResponseError as e:
        logger.error(
            "Cache reset request failed with status %s: %s", e.status, e.message
        )
    except Exception as e:
        logger.error("An unexpected error occurred during cache reset: %s", e)
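
# For reference, the reset endpoint targeted by this script (see main_async) can
# also be exercised by hand, e.g.:
#
#   curl -X POST http://127.0.0.1:8000/reset_prefix_cache
#
# (host and port here are the CLI defaults below; adjust them to your deployment).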


async def sequential_benchmark(
    backend: str,
    api_url: str,
    model_id: str,
    tokenizer,
    input_requests: list[SampleRequest],
    request_func,
    selected_percentiles: list[float],
    cache_reset_url: Optional[str] = None,
):
    """
    Benchmark that processes requests sequentially, waiting for each to complete
    before starting the next one. Resets the prefix cache between requests when
    a reset URL is configured.
    """
    outputs = []

    pbar = tqdm(total=len(input_requests))

    benchmark_start_time = time.perf_counter()

    # Process requests sequentially
    for request in input_requests:
        prompt, prompt_len, output_len = (
            request.prompt,
            request.prompt_len,
            request.expected_output_len,
        )

        logger.info("Sending request with len %s", request.prompt_len)
        logger.debug('Request str: "%s"', request.prompt[:50])
        request_start_time = time.perf_counter()

        request_func_input = RequestFuncInput(
            model=model_id,
            prompt=prompt,
            api_url=api_url,
            prompt_len=prompt_len,
            output_len=output_len,
        )

        output = await request_func(request_func_input=request_func_input)

        request_end_time = time.perf_counter()
        # Add timing information
        if output.success and not hasattr(output, "latency"):
            output.latency = request_end_time - request_start_time
        logger.info("Finished request with latency %.4f s", output.latency)

        outputs.append(output)
        pbar.update(1)

        # Reset the prefix cache between requests (as documented above) so later
        # requests do not reuse prefill work from earlier ones.
        if cache_reset_url:
            await reset_cache(cache_reset_url)

    pbar.close()

    benchmark_duration = time.perf_counter() - benchmark_start_time

    # Calculate metrics
    metrics = calculate_metrics(
        input_requests=input_requests,
        outputs=outputs,
        dur_s=benchmark_duration,
        tokenizer=tokenizer,
        selected_percentiles=selected_percentiles,
    )

    print_results(metrics, benchmark_duration)

    result = {
        "duration": benchmark_duration,
        "completed": metrics.completed,
        "total_input_tokens": metrics.total_input,
        "total_output_tokens": metrics.total_output,
        "input_lens": [request.prompt_len for request in input_requests],
        "output_lens": [
            output.output_tokens if output.success else 0 for output in outputs
        ],
        "ttfts": [output.ttft for output in outputs if output.success],
        "itls": [output.itl for output in outputs if output.success],
        "generated_texts": [
            output.generated_text for output in outputs if output.success
        ],
        "errors": [output.error for output in outputs if not output.success],
    }

    # Add summary statistics
    for stat_name in ["ttft", "itl", "e2el"]:
        for metric_name in ["mean", "median", "std"]:
            result[f"{metric_name}_{stat_name}_ms"] = getattr(
                metrics, f"{metric_name}_{stat_name}_ms"
            )
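
        # Percentile keys follow the pattern p<percentile>_<stat>_ms, e.g.
        # "p50_ttft_ms" or "p99_e2el_ms" (which keys appear depends on
        # selected_percentiles).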
        for p, value in getattr(metrics, f"percentiles_{stat_name}_ms"):
            p_word = str(int(p)) if int(p) == p else str(p)
            result[f"p{p_word}_{stat_name}_ms"] = value

    return result


def calculate_metrics(
    input_requests: list[SampleRequest],
    outputs: list[RequestFuncOutput],
    dur_s: float,
    tokenizer,
    selected_percentiles: list[float],
) -> BenchmarkMetrics:
    """Calculate benchmark metrics from results."""
    total_input = 0
    completed = 0
    total_output = 0
    ttfts = []
    itls = []
    e2els = []

    for i, output in enumerate(outputs):
        if output.success:
            output_len = output.output_tokens

            if not output_len:
                # Use tokenizer to count output tokens if not provided
                output_len = len(
                    tokenizer(output.generated_text, add_special_tokens=False).input_ids
                )

            total_output += output_len
            total_input += input_requests[i].prompt_len

            if hasattr(output, "ttft") and output.ttft is not None:
                ttfts.append(output.ttft)

            if hasattr(output, "itl") and output.itl:
                # Ensure itl is a list of floats
                if isinstance(output.itl, list):
                    itls.extend(output.itl)
                else:
                    logger.warning(
                        "Expected list for ITL but got %s. Appending as is.",
                        type(output.itl),
                    )
                    itls.append(output.itl)

            if hasattr(output, "latency") and output.latency is not None:
                e2els.append(output.latency)

            completed += 1

    return BenchmarkMetrics(
        completed=completed,
        total_input=total_input,
        total_output=total_output,
        mean_ttft_ms=np.mean(ttfts or [0]) * 1000,
        median_ttft_ms=np.median(ttfts or [0]) * 1000,
        std_ttft_ms=np.std(ttfts or [0]) * 1000,
        percentiles_ttft_ms=[
            (p, np.percentile(ttfts or [0], p) * 1000) for p in selected_percentiles
        ],
        mean_itl_ms=np.mean(itls or [0]) * 1000,
        median_itl_ms=np.median(itls or [0]) * 1000,
        std_itl_ms=np.std(itls or [0]) * 1000,
        percentiles_itl_ms=[
            (p, np.percentile(itls or [0], p) * 1000) for p in selected_percentiles
        ],
        mean_e2el_ms=np.mean(e2els or [0]) * 1000,
        median_e2el_ms=np.median(e2els or [0]) * 1000,
        std_e2el_ms=np.std(e2els or [0]) * 1000,
        percentiles_e2el_ms=[
            (p, np.percentile(e2els or [0], p) * 1000) for p in selected_percentiles
        ],
    )
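

# The report printed below is a fixed-width (60-column) text table, roughly of
# the form (all numbers purely illustrative):
#
#   =============== Sequential Benchmark Result ===============
#   Successful requests:                     50
#   Benchmark duration (s):                  137.52
#   Total input tokens:                      25600
#   Total generated tokens:                  6400
#   --------------------Time to First Token--------------------
#   Mean TTFT (ms):                          41.73
#   Median TTFT (ms):                        39.80
#   P99 TTFT (ms):                           95.12
#   ...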
def print_results(metrics: BenchmarkMetrics, benchmark_duration: float):
    """Print benchmark results in a formatted way."""
    print("{s:{c}^{n}}".format(s=" Sequential Benchmark Result ", n=60, c="="))
    print("{:<40} {:<10}".format("Successful requests:", metrics.completed))
    print("{:<40} {:<10.2f}".format("Benchmark duration (s):", benchmark_duration))
    print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input))
    print("{:<40} {:<10}".format("Total generated tokens:", metrics.total_output))

    def print_metric_stats(metric_name, header):
        print("{s:{c}^{n}}".format(s=header, n=60, c="-"))
        print(
            "{:<40} {:<10.2f}".format(
                f"Mean {metric_name} (ms):",
                getattr(metrics, f"mean_{metric_name.lower()}_ms"),
            )
        )
        print(
            "{:<40} {:<10.2f}".format(
                f"Median {metric_name} (ms):",
                getattr(metrics, f"median_{metric_name.lower()}_ms"),
            )
        )

        for p, value in getattr(metrics, f"percentiles_{metric_name.lower()}_ms"):
            p_word = str(int(p)) if int(p) == p else str(p)
            print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):", value))

    print_metric_stats("TTFT", "Time to First Token")
    print_metric_stats("ITL", "Inter-token Latency")
    print_metric_stats("E2EL", "End-to-end Latency")
    print("=" * 60)


async def main_async(args):
    # Import the backend-specific request functions
    from backend_request_func import ASYNC_REQUEST_FUNCS

    backend = args.backend
    model_id = args.model
    tokenizer_id = args.tokenizer if args.tokenizer is not None else args.model

    # Set up API URL
    if args.base_url is not None:
        api_url = f"{args.base_url}{args.endpoint}"
    else:
        api_url = f"http://{args.host}:{args.port}{args.endpoint}"
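
    # With the default flags this resolves to "http://127.0.0.1:8000/v1/completions";
    # --base-url, --host, --port, and --endpoint change it.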

    # Set up Cache Reset URL
    cache_reset_url = f"http://{args.host}:{args.port}/reset_prefix_cache"
    logger.info("Prefix cache reset configured at: %s", cache_reset_url)

    # Get tokenizer
    tokenizer = get_tokenizer(tokenizer_id, trust_remote_code=args.trust_remote_code)

    # Get request function
    if backend in ASYNC_REQUEST_FUNCS:
        request_func = ASYNC_REQUEST_FUNCS[backend]
    else:
        raise ValueError(f"Unknown backend: {backend}")

    input_requests = RandomDataset().sample(
        tokenizer=tokenizer,
        num_requests=args.num_requests,
        prefix_len=0,
        input_len=args.input_len,
        output_len=args.output_len,
        range_ratio=0.0,
    )

    # Run benchmark
    result = await sequential_benchmark(
        backend=backend,
        api_url=api_url,
        model_id=model_id,
        tokenizer=tokenizer,
        input_requests=input_requests,
        request_func=request_func,
        selected_percentiles=[50, 90, 95, 99],
        cache_reset_url=cache_reset_url,
    )

    return result


def main(args):
    print(args)
    random.seed(args.seed)
    np.random.seed(args.seed)

    asyncio.run(main_async(args))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Sequential benchmark for LLM serving")
    parser.add_argument(
        "--backend", type=str, default="vllm", help="Backend to use for requests"
    )
    parser.add_argument(
        "--base-url",
        type=str,
        default=None,
        help="Server base URL (overrides --host and --port)",
    )
    parser.add_argument("--host", type=str, default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8000)
    parser.add_argument(
        "--endpoint", type=str, default="/v1/completions", help="API endpoint"
    )
    parser.add_argument("--model", type=str, required=True, help="Name of the model")
    parser.add_argument(
        "--tokenizer", type=str, help="Name of the tokenizer (defaults to model name)"
    )
    parser.add_argument(
        "--num-requests", type=int, default=100, help="Number of requests to process"
    )
    parser.add_argument(
        "--input-len", type=int, default=128, help="Input len for generated prompts"
    )
    parser.add_argument(
        "--output-len", type=int, default=None, help="Override output len for requests"
    )
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument(
        "--trust-remote-code",
        action="store_true",
        help="Trust remote code from HuggingFace",
    )

    args = parser.parse_args()
    main(args)