### Cell 1 - Initialize Ray endpoints and verify dashboard

Installs requests, derives the Ray head host from RAY_ADDRESS, builds the Dashboard/Serve/MLflow URLs, reads a Hugging Face token (if one is mounted), and prints the endpoints plus the Jobs API version as a quick health check.

In [None]:
!pip -q install requests==2.* --disable-pip-version-check

import os, textwrap, base64, time, json, requests
from string import Template

# Derive the Ray head hostname from RAY_ADDRESS. Accepts "ray://host:port",
# "http://host:port" or a bare "host:port"; an empty host or "auto" falls back
# to the in-cluster head service name.
raw_addr = os.getenv("RAY_ADDRESS", "ray://ai-starter-kit-kuberay-head-svc:10001")
_hostport = raw_addr.split("://", 1)[-1]                  # drop any scheme
HEAD_HOST = _hostport.split("/", 1)[0].split(":", 1)[0]   # drop path and port
if HEAD_HOST in ("", "auto"):
    HEAD_HOST = "ai-starter-kit-kuberay-head-svc"

DASH_URL    = f"http://{HEAD_HOST}:8265"
SERVE_PORT  = int(os.getenv("SERVE_PORT", "8000"))
SERVE_ROUTE = "/v1"

# Optional Hugging Face token mounted as a secret file; empty string when the
# file is missing or unreadable.
HF_TOKEN_PATH = "/etc/secrets/huggingface/token"
HF_TOKEN = ""
try:
    with open(HF_TOKEN_PATH, encoding="utf-8") as _fh:
        HF_TOKEN = _fh.read().strip()
except (OSError, UnicodeDecodeError):
    HF_TOKEN = ""

print("Head host:", HEAD_HOST)
print("Jobs API :", f"{DASH_URL}/api/jobs/")
print("Serve URL:", f"http://{HEAD_HOST}:{SERVE_PORT}{SERVE_ROUTE}")
print("MLflow   :", os.getenv("MLFLOW_TRACKING_URI", "http://ai-starter-kit-mlflow:5000"))

# Health check: fail with a clear HTTP error instead of a JSON decode error.
_version_resp = requests.get(f"{DASH_URL}/api/version", timeout=10)
_version_resp.raise_for_status()
print("Jobs API version:", _version_resp.json())


### Cell 2 - Deploy a minimal Ray Serve smoke test and verify readiness

Submits a tiny FastAPI app to Ray Serve (one /healthz endpoint under /smoke) as a Ray Job, installing FastAPI on the fly. It polls the Jobs API for the job status and probes :8000/smoke/healthz once per second for up to 60 seconds, printing "Smoke OK" when the service responds with HTTP 200 (i.e., the smoke test passes).

In [None]:
import os, base64, textwrap, time, requests

# Ray head service; same RAY_HEAD_SVC override as the deploy/client cells.
HEAD     = os.environ.get("RAY_HEAD_SVC", "ai-starter-kit-kuberay-head-svc")
DASH_URL = f"http://{HEAD}:8265"

print("Jobs API:", requests.get(f"{DASH_URL}/api/version", timeout=10).json())

# Minimal Serve app: a single FastAPI route served at /smoke/healthz.
serve_py = textwrap.dedent("""
    from fastapi import FastAPI
    from ray import serve
    serve.start(detached=True, http_options={"host":"0.0.0.0","port":8000})
    app = FastAPI()

    @serve.deployment(name="smoke", num_replicas=1)
    @serve.ingress(app)
    class Smoke:
        @app.get("/healthz")
        async def health(self): return {"ok": True}

    serve.run(Smoke.bind(), route_prefix="/smoke")
    print("READY: smoke", flush=True)
""").strip()

# base64 keeps the multi-line source intact inside the shell entrypoint string.
b64 = base64.b64encode(serve_py.encode()).decode()
entry = f'python -c "import base64; exec(base64.b64decode(\'{b64}\'))"'
submit_resp = requests.post(
    f"{DASH_URL}/api/jobs/",
    json={"entrypoint": entry, "runtime_env": {"pip": ["fastapi>=0.110"]}},
    timeout=60,
)
# A rejected submission should fail with the HTTP error, not a KeyError on "job_id".
submit_resp.raise_for_status()
submit = submit_resp.json()
job_id = submit["job_id"]
print("Job:", job_id)

svc = f"http://{HEAD}:8000/smoke/healthz"
# Job states after which the app will not come up. SUCCEEDED is presumably fine:
# the script ends after serve.run() while the detached Serve instance keeps
# running — confirm against your Ray version.
failed_states = {"FAILED", "STOPPED"}
for i in range(60):
    # A transient dashboard hiccup should not abort the whole poll loop.
    try:
        s = requests.get(f"{DASH_URL}/api/jobs/{job_id}", timeout=10).json()["status"]
    except (requests.RequestException, ValueError, KeyError) as e:
        s = f"UNKNOWN ({e})"
    try:
        r = requests.get(svc, timeout=2)
        print(f"tick {i:02d}: job={s}, health={r.status_code}")
        if r.status_code == 200:
            print("Smoke OK")
            break
    except requests.RequestException as e:
        print(f"tick {i:02d}: job={s}, health=ERR {e}")
    if s in failed_states:
        print(f"Smoke FAILED: job ended with status {s}; check its logs in the Ray dashboard")
        break
    time.sleep(1)
else:
    print("Smoke FAILED: /smoke/healthz did not return 200 within 60 attempts")

### Cell 3 - Deploy model on Ray Serve with llama-cpp

Packages and submits a Ray Job that spins up a Ray Serve app exposing /v1/healthz and /v1/chat/completions. The app downloads the preferred GGUF file from Hugging Face, initializes llama-cpp-python, logs each request to MLflow (when MLFLOW_TRACKING_URI is set), and the cell prints the deployed health/chat URLs.

In [None]:
import os, base64, textwrap, requests

# Ray head service and the Serve endpoint the deployed app will listen on.
HEAD        = os.environ.get("RAY_HEAD_SVC", "ai-starter-kit-kuberay-head-svc")
DASH_URL    = f"http://{HEAD}:8265"
SERVE_PORT  = 8000
SERVE_ROUTE = "/v1"

# Hugging Face token: prefer the env var, otherwise fall back to the mounted
# secret file so private/gated repos can still be downloaded by the job.
# NOTE(review): runtime_env env_vars travel with the job submission and may be
# visible via the Jobs API / dashboard — confirm that exposure is acceptable.
HF_TOKEN_PATH = "/etc/secrets/huggingface/token"
hf_token = os.environ.get("HUGGINGFACE_HUB_TOKEN", "")
if not hf_token:
    try:
        with open(HF_TOKEN_PATH, encoding="utf-8") as fh:
            hf_token = fh.read().strip()
    except (OSError, UnicodeDecodeError):
        hf_token = ""

# Ray runtime environment for the Serve job: pinned pip packages plus the env
# vars the app reads (model choice, context/token limits, threading, caches, MLflow).
runtime_env = {
    "pip": [
        "fastapi==0.110.0",
        "uvicorn==0.23.2",
        "huggingface_hub==0.25.2",
        "llama-cpp-python==0.3.16",
        "hf_transfer==0.1.6",
        "mlflow==2.14.3",
    ],
    "env_vars": {
        "HF_HUB_ENABLE_HF_TRANSFER": "1",
        "HUGGINGFACE_HUB_TOKEN": hf_token,
        "SERVE_PORT": str(SERVE_PORT),

        # Repo to list and the quantization substrings to try, in order.
        "MODEL_REPO": "Qwen/Qwen2.5-1.5B-Instruct-GGUF",
        "GGUF_PREF_ORDER": "q4_k_m,q4_0,q3_k_m,q2_k",

        "LLM_CONTEXT": os.environ.get("LLM_CONTEXT", "1024"),
        "LLM_MAX_TOKENS": os.environ.get("LLM_MAX_TOKENS", "256"),
        "SERVER_MAX_NEW_TOKENS": os.environ.get("SERVER_MAX_NEW_TOKENS", "512"),

        "LLM_THREADS": os.environ.get("LLM_THREADS", "6"),
        "OMP_NUM_THREADS": os.environ.get("OMP_NUM_THREADS", "6"),
        # Passed to n_gpu_layers; "0" keeps every layer on the CPU.
        "GPU_LAYERS": "0",

        # Build hints for pip; presumably only relevant if llama-cpp-python
        # has to be compiled from source.
        "PIP_PREFER_BINARY": "1",
        "CMAKE_ARGS": "-DGGML_OPENMP=OFF -DLLAMA_NATIVE=OFF",

        "HF_HOME": "/tmp/hf-cache",
        "TRANSFORMERS_CACHE": "/tmp/hf-cache",

        "MLFLOW_TRACKING_URI": os.environ.get("MLFLOW_TRACKING_URI", ""),
        "MLFLOW_EXPERIMENT_NAME": os.environ.get("MLFLOW_EXPERIMENT_NAME", "ray-llama-cpp"),
    },
}

# Source of the Ray Serve app executed inside the submitted Ray Job.
# This is an f-string: SERVE_PORT / SERVE_ROUTE placeholders are filled in from
# the notebook, and every literal brace of the generated code is doubled.
serve_py = textwrap.dedent(f"""
import os, time, multiprocessing, uuid
from typing import List, Dict, Any
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from huggingface_hub import HfApi, hf_hub_download
from ray import serve
from llama_cpp import Llama

USE_MLFLOW = False
try:
    import mlflow
    if os.getenv("MLFLOW_TRACKING_URI"):
        mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
        mlflow.set_experiment(os.getenv("MLFLOW_EXPERIMENT_NAME","ray-llama-cpp"))
        USE_MLFLOW = True
except Exception as _e:
    USE_MLFLOW = False

SERVE_PORT  = int(os.getenv("SERVE_PORT", "{SERVE_PORT}"))
SERVE_ROUTE = "{SERVE_ROUTE}"
MODEL_REPO  = os.getenv("MODEL_REPO", "Qwen/Qwen2.5-1.5B-Instruct-GGUF")
GGUF_PREFS  = [s.strip() for s in os.getenv("GGUF_PREF_ORDER","q4_k_m,q4_0,q3_k_m,q2_k").split(",") if s.strip()]
CTX_LEN     = int(os.getenv("LLM_CONTEXT", "2048"))
MAX_TOKENS  = int(os.getenv("LLM_MAX_TOKENS", "256"))
HF_TOKEN    = os.getenv("HUGGINGFACE_HUB_TOKEN") or None

serve.start(detached=True, http_options={{"host":"0.0.0.0", "port":SERVE_PORT}})
app = FastAPI()

def pick_one_file(repo_id: str, prefs):
    api = HfApi()
    files = api.list_repo_files(repo_id=repo_id, repo_type="model", token=HF_TOKEN)
    ggufs = [f for f in files if f.lower().endswith(".gguf")]
    if not ggufs:
        raise RuntimeError(f"No .gguf files visible in {{repo_id}}")
    for pref in prefs:
        for f in ggufs:
            if pref.lower() in f.lower():
                return f
    return ggufs[0]

def pick_chat_format(repo: str, fname: str) -> str:
    return "qwen"

@serve.deployment(name="qwen", num_replicas=1, ray_actor_options={{"num_cpus": 6}})
@serve.ingress(app)
class OpenAICompatLlama:
    def __init__(self, repo_id: str = MODEL_REPO):
        target = pick_one_file(repo_id, GGUF_PREFS)
        print(f"[env] model repo: {{repo_id}} file: {{target}}", flush=True)
        local_dir = "/tmp/hf-gguf"; os.makedirs(local_dir, exist_ok=True)

        gguf_path = hf_hub_download(
            repo_id=repo_id, filename=target, token=HF_TOKEN,
            local_dir=local_dir, local_dir_use_symlinks=False,
            force_download=False, resume_download=True
        )
        print(f"[download] done: {{gguf_path}}", flush=True)

        n_threads = int(os.getenv("LLM_THREADS", max(2, (multiprocessing.cpu_count() or 4)//2)))
        print(f"[load] llama-cpp-python | ctx={{CTX_LEN}} threads={{n_threads}} gpu_layers={{int(os.getenv('GPU_LAYERS','0'))}}", flush=True)

        self.model_file = os.path.basename(gguf_path)
        self.model_repo = repo_id
        chat_format = pick_chat_format(self.model_repo, self.model_file)
        print(f"[load] chat_format={{chat_format}}", flush=True)

        self.llm = Llama(
            model_path=gguf_path,
            n_ctx=CTX_LEN,
            n_threads=n_threads,
            n_batch=256,
            n_gpu_layers=int(os.getenv("GPU_LAYERS","0")),
            chat_format=chat_format,
            verbose=False
        )
        print("[ready] model loaded", flush=True)

    @app.get("/healthz")
    async def health(self):
        return {{"status":"ok"}}

    @app.post("/chat/completions")
    async def chat_completions(self, request: Request):
        t0 = time.time()
        body = await request.json()

        messages    = body.get("messages", [])
        # An explicit JSON null falls back to the default instead of crashing float().
        raw_temp    = body.get("temperature")
        temperature = 0.2 if raw_temp is None else float(raw_temp)
        req_max     = body.get("max_tokens", None)
        # OpenAI clients may send "stop" as a single string or as a list of strings.
        raw_stop    = body.get("stop") or []
        if isinstance(raw_stop, str):
            raw_stop = [raw_stop]
        stop_words  = list(raw_stop) + ["<|im_end|>", "</s>"]

        SERVER_MAX  = int(os.getenv("SERVER_MAX_NEW_TOKENS", "512"))
        max_tokens  = int(req_max if isinstance(req_max, int) else MAX_TOKENS)
        max_tokens  = max(32, min(max_tokens, CTX_LEN - 128, SERVER_MAX))

        rid = "chatcmpl-" + uuid.uuid4().hex[:24]
        created = int(time.time())
        model_name = f"{{self.model_repo}}/{{self.model_file}}"

        finish_reason = "stop"
        try:
            result = self.llm.create_chat_completion(
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                top_k=50,
                top_p=0.9,
                repeat_penalty=1.1,
                stop=stop_words,
            )
            choice = result["choices"][0]
            out_text = (choice["message"]["content"] or "").strip()
            # Report the backend's finish_reason (e.g. "length" when truncated).
            finish_reason = choice.get("finish_reason") or "stop"
            usage_raw = result.get("usage") or {{}}
            p_tokens = int(usage_raw.get("prompt_tokens") or 0)
            c_tokens = int(usage_raw.get("completion_tokens") or 0)
            err = None
        except Exception as e:
            out_text = ""
            p_tokens = c_tokens = 0
            err = str(e)

        if USE_MLFLOW:
            try:
                dur_ms = int((time.time()-t0) * 1000)
                with mlflow.start_run(run_name="chat"):
                    mlflow.set_tags({{
                        "model_repo": self.model_repo,
                        "model_file": self.model_file,
                        "framework": "llama-cpp-python",
                    }})
                    mlflow.log_params({{
                        "temperature": temperature,
                        "max_tokens": max_tokens,
                        "ctx": CTX_LEN,
                    }})
                    if not (p_tokens and c_tokens):
                        # Word-count fallback; message content may be None.
                        p_tokens = p_tokens or max(1, len(" ".join(str(m.get("content") or "") for m in messages).split()))
                        c_tokens = c_tokens or max(0, len(out_text.split()))
                    mlflow.log_metrics({{
                        "duration_ms": dur_ms,
                        "prompt_tokens_approx": p_tokens,
                        "completion_tokens_approx": c_tokens,
                        "total_tokens_approx": p_tokens + c_tokens,
                    }})
            except Exception as mlf_err:
                # Logging is best-effort, but make failures visible in the job logs.
                print(f"[mlflow] logging skipped: {{mlf_err}}", flush=True)

        if err:
            return JSONResponse(status_code=500, content={{"error": err, "type":"generation_error"}})

        usage = {{
            "prompt_tokens": p_tokens,
            "completion_tokens": c_tokens,
            "total_tokens": p_tokens + c_tokens,
        }}
        return {{
            "id": rid,
            "object": "chat.completion",
            "created": created,
            "model": model_name,
            "choices": [
                {{
                    "index": 0,
                    "message": {{"role":"assistant","content": out_text}},
                    "finish_reason": finish_reason
                }}
            ],
            "usage": usage
        }}

serve.run(OpenAICompatLlama.bind(), route_prefix=SERVE_ROUTE)
print("READY", flush=True)
""").strip()

# Ship the app source base64-encoded so it survives shell quoting inside the
# Jobs API entrypoint; the job decodes and exec()s it.
payload = base64.b64encode(serve_py.encode()).decode()
entrypoint = 'python -c "import base64,sys;exec(base64.b64decode(\'{}\').decode())"'.format(payload)

resp = requests.post(
    f"{DASH_URL}/api/jobs/",
    json={
        "entrypoint": entrypoint,
        "runtime_env": runtime_env,
        "metadata": {"job_name": "serve-qwen2_5-llama_cpp-openai"},
    },
    timeout=45
)
# Surface submission errors (bad runtime_env, dashboard down) instead of printing "Job: None".
resp.raise_for_status()
job = resp.json()

print("Job:", job.get("job_id"))
print("Health:", f"http://{HEAD}:{SERVE_PORT}{SERVE_ROUTE}/healthz")
print("Chat:  ", f"http://{HEAD}:{SERVE_PORT}{SERVE_ROUTE}/chat/completions")

### Cell 4 - Basic client + latency test

Calls /v1/healthz and then sends an OpenAI-style chat request to /v1/chat/completions with a short prompt. Prints latency and token usage, returning the assistant text.

In [None]:
import os, time, requests, json

# Address of the OpenAI-compatible Ray Serve app (override host via RAY_HEAD_SVC).
HEAD       = os.getenv("RAY_HEAD_SVC", "ai-starter-kit-kuberay-head-svc")
SERVE_PORT = 8000
BASE_URL   = f"http://{HEAD}:{SERVE_PORT}/v1"

def health():
    """Query the Serve app's /healthz route and print its status code and JSON body."""
    resp = requests.get(BASE_URL + "/healthz", timeout=10)
    code = resp.status_code
    payload = resp.json()
    print("Health:", code, payload)

def chat(prompt, temperature=0.4, max_tokens=220, stop=None, *, system=None):
    """Send one OpenAI-style chat request to the Serve app and print latency/usage.

    Args:
        prompt: User message text.
        temperature: Sampling temperature forwarded to the server.
        max_tokens: Requested completion budget (the server may clamp it).
        stop: Optional list of extra stop strings; omitted from the body when falsy.
        system: Optional system prompt; defaults to the concise CPU-host prompt.

    Returns:
        The assistant message content.

    Raises:
        requests.HTTPError: if the server responds with a non-2xx status.
    """
    if system is None:
        system = "You are Qwen2.5 Instruct running on a tiny CPU host. Be concise, complete sentences."
    body = {
        "model": "qwen2.5-1.5b-instruct-gguf",
        "temperature": float(temperature),
        "max_tokens": int(max_tokens),
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
    }
    if stop:
        body["stop"] = stop

    t0 = time.perf_counter()
    r = requests.post(f"{BASE_URL}/chat/completions", json=body, timeout=300)
    dt = time.perf_counter() - t0
    if not r.ok:
        # Show the server's error payload before raising; raise_for_status alone hides it.
        print(f"Request failed ({r.status_code}): {r.text[:500]}")
    r.raise_for_status()
    data = r.json()  # parse once, reuse for content and usage
    out = data["choices"][0]["message"]["content"]
    usage = data.get("usage", {})
    print(f"\nLatency: {dt:.2f}s  | usage: {usage}")
    print("\n---\n", out)
    return out

# Smoke-check the endpoint, then perform one short chat round-trip.
health()
smoke_prompt = "Say 'test ok' then give me one short fun fact about llamas."
_ = chat(smoke_prompt, stop=["<|im_end|>"])

### Cell 5 - Multi-agent (Autogen) pipeline

Installs AutoGen, configures OpenAIWrapper to call the Ray Serve /v1 endpoint, warms up the model, then runs a simple three-agent workflow (Researcher -> Writer -> Critic) to produce and refine a short report.

In [None]:
!pip -q install pyautogen~=0.2.35 "flaml[automl]" --disable-pip-version-check

import os, sys

# Extra site-packages directories that may hold pre-installed packages.
# The pythonX.Y component comes from the running interpreter so packages built
# for a different Python version are never put on sys.path.
_pyver = f"python{sys.version_info.major}.{sys.version_info.minor}"
for p in [
    f"/tmp/models-cache/lib/{_pyver}/site-packages",
    os.path.expanduser(f"~/.local/lib/{_pyver}/site-packages"),
]:
    # Only existing, not-yet-listed directories; prepend so they take precedence.
    if os.path.isdir(p) and p not in sys.path:
        sys.path.insert(0, p)

import os, autogen
from autogen import AssistantAgent, UserProxyAgent

# Local OpenAI-compatible endpoint served by Ray Serve.
HEAD        = os.getenv("RAY_HEAD_SVC", "ai-starter-kit-kuberay-head-svc")
SERVE_PORT  = 8000
BASE_URL    = f"http://{HEAD}:{SERVE_PORT}/v1"

# One zero-cost model entry; the api_key is a placeholder for the local server.
config_list = [
    dict(
        model="qwen2.5-1.5b-instruct-gguf",
        base_url=BASE_URL,
        api_key="local",
        price=[0.0, 0.0],
    )
]

# Warm up the model with a tiny request; a failure is reported, not raised.
llm = autogen.OpenAIWrapper(config_list=config_list)
warmup_messages = [{"role": "user", "content": "Say 'test ok'."}]
try:
    reply = llm.create(messages=warmup_messages, temperature=0.2, max_tokens=16)
    print("Warmup:", reply.choices[0].message.content)
except Exception as e:
    print("Warmup failed:", e)

# Admin proxy: never prompts for human input and cannot execute code.
user_proxy = UserProxyAgent(
    name="UserProxy",
    system_message="You are the human admin. Initiate the task.",
    code_execution_config=False,
    human_input_mode="NEVER",
)

# Step 1: short, sourced bullet-point notes.
researcher = AssistantAgent(
    name="Researcher",
    system_message=(
        "You are a researcher. Gather concise, verified facts on the topic. "
        "Return several bullet points with inline source domains (e.g., nature.com, ibm.com). "
        "Keep under 100 words total. No made-up sources. "
        "Do not include any special end token."
    ),
    llm_config={"config_list": config_list, "temperature": 0.35, "max_tokens": 140, "timeout": 300},
)

# Step 2: turn the notes into a short report.
writer = AssistantAgent(
    name="Writer",
    system_message=(
        "You are a writer. Using the Researcher’s notes, produce a clear report under 160 words. "
        "Avoid speculation. Keep it structured and readable. "
        "Do not include any special end token."
    ),
    llm_config={"config_list": config_list, "temperature": 0.55, "max_tokens": 220, "timeout": 180},
)

# Step 3: review and tighten; ends its reply with an explicit <|END|> marker line.
critic = AssistantAgent(
    name="Critic",
    system_message=(
        # The trailing space keeps the two concatenated sentences separated.
        "You are a critic. Review the Writer’s report for accuracy, clarity, and flow. "
        "Present the tightened final text and keep it under 140 words. On a new last line output exactly: <|END|>"
    ),
    llm_config={"config_list": config_list, "temperature": 0.45, "max_tokens": 160, "timeout": 300},
)

def _reply_text(response):
    """Return plain text from a generate_reply() result.

    A str is returned as-is; a dict yields its "content"; anything else
    (e.g. None) or a dict whose content is missing/None yields "[no output]".
    """
    if isinstance(response, str):
        return response
    if isinstance(response, dict):
        content = response.get("content")
        return "[no output]" if content is None else content
    return "[no output]"


def run_sequential(task):
    """Run Researcher -> Writer -> Critic once, printing each stage.

    Each agent sees only a single user message built from the previous stage's
    output (no shared chat history).

    Args:
        task: The research/writing request handed to the Researcher.

    Returns:
        The Critic's final text.
    """
    research_response = researcher.generate_reply(messages=[{"content": task, "role": "user"}])
    research_notes = _reply_text(research_response)
    print("\nResearch Notes:\n", research_notes)

    writer_prompt = f"Using these research notes, write the report:\n{research_notes}"
    writer_response = writer.generate_reply(messages=[{"content": writer_prompt, "role": "user"}])
    report = _reply_text(writer_response)
    print("\nDraft Report:\n", report)

    critic_prompt = f"Review this report:\n{report}"
    critic_response = critic.generate_reply(messages=[{"content": critic_prompt, "role": "user"}])
    final_text = _reply_text(critic_response)
    print("\nFinal Review:\n", final_text)
    return final_text

# The requested length matches the Writer (<160 words) and Critic (<140 words)
# limits so the small model is not given conflicting targets.
task = "Research the latest advancements in quantum computing as of 2025. Gather key facts, then write a short report (under 160 words). Have the Critic review and finalize."
final_output = run_sequential(task)

### Cell 6 - MLflow: connect to the tracking server and list recent chat runs

Installs MLflow, sets the tracking URI and experiment, then queries and prints the latest runs with key params/metrics (temperature, max_tokens, duration) to verify Serve logging.

In [None]:
!pip -q install mlflow==2.14.3 --disable-pip-version-check

import os, mlflow
from datetime import datetime

# Point the client at the tracking server that the Serve app logs to.
tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "http://ai-starter-kit-mlflow:5000")
mlflow.set_tracking_uri(tracking_uri)
print(f"MLflow Tracking URI: {tracking_uri}")

# NOTE: set_experiment creates the experiment if it does not exist yet.
exp_name = os.getenv("MLFLOW_EXPERIMENT_NAME", "ray-llama-cpp")
exp = mlflow.set_experiment(exp_name)
print(f"Experiment: {exp.name} (ID: {exp.experiment_id})")
print("-" * 60)

client = mlflow.tracking.MlflowClient()
runs = client.search_runs(
    [exp.experiment_id],
    order_by=["attributes.start_time DESC"],
    max_results=10,
)

if not runs:
    print("No runs found. Run cells 4 or 5 first to generate inference requests.")
else:
    print(f"\nFound {len(runs)} recent runs:")
    print("-" * 60)

    for i, run in enumerate(runs, 1):
        started_ms = run.info.start_time
        start_time = (
            datetime.fromtimestamp(started_ms / 1000).strftime('%Y-%m-%d %H:%M:%S')
            if started_ms is not None else "N/A"
        )
        duration = run.data.metrics.get('duration_ms', 'N/A')
        temp = run.data.params.get('temperature', 'N/A')
        max_tokens = run.data.params.get('max_tokens', 'N/A')
        total_tokens = run.data.metrics.get('total_tokens_approx', 'N/A')

        print(f"\nRun {i}:")
        print(f"  ID:          {run.info.run_id[:12]}...")
        print(f"  Time:        {start_time}")
        print(f"  Status:      {run.info.status}")
        print(f"  Temperature: {temp}")
        print(f"  Max Tokens:  {max_tokens}")
        print(f"  Duration:    {duration} ms")
        print(f"  Total Tokens: {total_tokens}")

    print("\n" + "=" * 60)
    print("SUMMARY:")
    successful = sum(1 for r in runs if r.info.status == 'FINISHED')
    # Include every logged duration (a 0 ms value is still a measurement).
    durations = [r.data.metrics['duration_ms'] for r in runs if 'duration_ms' in r.data.metrics]

    print(f"  Total Runs: {len(runs)}")
    print(f"  Successful: {successful}")
    # Counts every run not in FINISHED state (e.g. FAILED or still RUNNING).
    print(f"  Failed: {len(runs) - successful}")
    if durations:
        print(f"  Avg Duration: {sum(durations) / len(durations):.1f} ms")
    else:
        print("  Avg Duration: N/A")

print("\n" + "=" * 60)
print("MLflow verification complete")