TL;DR: Don’t start with a chat demo. Start with one valuable, verifiable job; wrap a deterministic orchestrator around an LLM; add typed tools, grounding, budgets/safety, observability, and a tiny evaluation harness. Ship something you can trust, measure, and improve.
0) Define the job → success criteria → guardrails
Pick one narrow, high-value workflow.
Example job: “Given a user question, search internal docs and return a cited answer under 200 words.”
Success: exact answer present in source, ≤ 200 words, cites ≥ 1 doc link.
Non-goals: chit-chat, multi-turn memory, reasoning about PDFs.
Guardrails: max 6 steps, ≤ 8s P95 latency, ≤ ₹X per 100 tasks, no PII in logs.
This forces you to design for outcomes, not vibes.
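One lightweight way to keep those criteria honest is to encode them as versioned config next to the code. A minimal sketch, assuming a dataclass-based spec (the JobSpec name and its fields are illustrative, not part of the scaffold below):

# job_spec.py (illustrative; keep success criteria and budgets under version control)
from dataclasses import dataclass

@dataclass(frozen=True)
class JobSpec:
    name: str = "cited_doc_answer"
    max_words: int = 200          # answer length cap
    min_citations: int = 1        # must cite at least one doc link
    max_steps: int = 6            # orchestrator step budget
    p95_latency_s: float = 8.0    # latency guardrail
    cost_cap_per_100: float = 0.0 # fill in your own currency/amount
    pii_in_logs: bool = False     # never log raw PII

Anything the orchestrator or eval harness checks should read from this spec rather than hard-coded literals, so changing a guardrail is a one-line diff.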
1) Minimal architecture (one page)
Client/UI → sends goal + context.
Orchestrator (deterministic) → state machine/graph that caps steps/time.
LLM Adapter → provider-agnostic wrapper.
Tools/Skills (typed) → search, retrieval, calculators, formatters.
Grounding Layer → retriever + re-ranker + cache.
Budgets & Safety → token/time/₹ caps, PII redaction, allow/deny tool lists.
Observability → traces, metrics, structured logs.
Eval Harness → 30–50 cases + leaderboard.
2) Project scaffold
llm_app/
  app.py             # FastAPI/CLI entry
  orchestrator.py    # loop/graph + guards
  llm.py             # provider-agnostic client
  tools.py           # typed tool contracts
  grounding.py       # retriever/reranker/cache
  safety.py          # PII redaction, policy checks
  observability.py   # traces/metrics/logs
  eval/
    cases.jsonl      # test set
    judge.py         # LLM-as-judge (optional)
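The scaffold's app.py is never shown later in the post; a minimal FastAPI sketch of what it could look like, assuming the run_task and LLMClient signatures defined in the sections below (the /ask route and AskRequest model are illustrative):

# app.py (illustrative entry point; wire-up only, no business logic)
from fastapi import FastAPI
from pydantic import BaseModel
from llm import LLMClient
from orchestrator import run_task

app = FastAPI()
llm = LLMClient()

class AskRequest(BaseModel):
    question: str

@app.post("/ask")
async def ask(req: AskRequest):
    # All budgets/guardrails live in the orchestrator, not the route handler.
    answer = await run_task(req.question, llm, budgets={"max_tokens": 4000})
    return {"answer": answer}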
3) Provider-agnostic LLM adapter
Keep the rest of your code oblivious to vendors.
# llm.py
import os, httpx
from typing import Dict, Any

class LLMClient:
    def __init__(self,
                 base_url: str = os.getenv("LLM_BASE_URL"),
                 api_key: str = os.getenv("LLM_API_KEY")):
        self.base_url, self.api_key = base_url, api_key

    async def chat(self, messages: list[Dict[str, str]],
                   tools: list[Dict[str, Any]] | None = None, **params):
        headers = {"Authorization": f"Bearer {self.api_key}",
                   "Content-Type": "application/json"}
        payload = {"messages": messages, "tools": tools or [], **params}
        async with httpx.AsyncClient(timeout=15) as client:
            r = await client.post(f"{self.base_url}/chat", headers=headers, json=payload)
            r.raise_for_status()
            return r.json()  # Expect { "output": "...", "tool_call": {...}? }
4) Typed tools with validation
Your tools are the system’s muscles. Type them.
# tools.py
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(min_length=3, max_length=256)
    top_k: int = Field(default=5, ge=1, le=10)

async def search_docs(args: SearchInput) -> dict:
    # plug your own vector/keyword search;
    # return {"hits": [{"id": ..., "text": ..., "url": ...}, ...]}
    ...

TOOL_SPEC = [{
    "name": "search_docs",
    "description": "Search internal knowledge base",
    "input_schema": SearchInput.model_json_schema(),
}]
5) Grounding micro-stack (retriever → reranker → cache)
Start simple: hybrid BM25 + vectors, then rerank.
# grounding.py
from functools import lru_cache

@lru_cache(maxsize=2048)
def cache_key(q: str, k: int) -> tuple:
    return (q.strip().lower(), k)

async def retrieve(query: str, top_k: int = 5) -> list[dict]:
    # 1) lexical + vector search
    hits = await search_index(query=query, top_k=top_k * 3)
    # 2) rerank (cross-encoder or fast heuristic)
    reranked = await rerank(query, hits)
    return reranked[:top_k]
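search_index and rerank above are left for you to supply. One simple way to implement the hybrid step is reciprocal rank fusion over lexical and vector results; a sketch, assuming you already have bm25_search and vector_search helpers that return ranked hit dicts (both names are placeholders):

# Hypothetical hybrid search via reciprocal rank fusion (RRF).
async def search_index(query: str, top_k: int) -> list[dict]:
    lexical = await bm25_search(query, top_k)     # assumed helper
    semantic = await vector_search(query, top_k)  # assumed helper
    scores: dict[str, float] = {}
    docs: dict[str, dict] = {}
    for ranking in (lexical, semantic):
        for rank, hit in enumerate(ranking):
            docs[hit["id"]] = hit
            # 60 is a commonly used RRF constant; tune on your eval set.
            scores[hit["id"]] = scores.get(hit["id"], 0.0) + 1.0 / (60 + rank)
    fused = sorted(docs.values(), key=lambda h: scores[h["id"]], reverse=True)
    return fused[:top_k]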
Tip: Log retrieval diagnostics (recall@k on eval set) from day one.
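A minimal recall@k check you can run against the eval set, assuming each case is extended with a gold_doc_ids field (the cases.jsonl format shown later does not include it by default):

# Hypothetical retrieval diagnostic: fraction of cases whose gold doc appears in the top-k hits.
import json, asyncio
from grounding import retrieve

async def recall_at_k(path: str = "eval/cases.jsonl", k: int = 5) -> float:
    hit_count, total = 0, 0
    for line in open(path):
        case = json.loads(line)
        gold_ids = set(case.get("gold_doc_ids", []))
        if not gold_ids:
            continue  # skip cases without retrieval labels
        total += 1
        hits = await retrieve(case["question"], top_k=k)
        if gold_ids & {h["id"] for h in hits}:
            hit_count += 1
    return hit_count / max(total, 1)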
6) Deterministic orchestrator (bounded loop)
Wrap the LLM in a predictable skeleton. No unbounded “think loops”.
# orchestrator.py
from typing import Any
from tools import TOOL_SPEC, SearchInput, search_docs
from safety import redact_pii, check_policy
from grounding import retrieve
from observability import trace_span, counter

MAX_STEPS = 6
TOOL_REGISTRY = {"search_docs": (SearchInput, search_docs)}

@trace_span("run_task")
async def run_task(goal: str, llm, budgets: dict[str, Any]):
    history = [{"role": "system",
                "content": "You answer only with grounded, cited text ≤ 200 words."}]
    history += [{"role": "user", "content": redact_pii(goal)}]
    cost = 0
    steps = 0
    while steps < MAX_STEPS and cost < budgets["max_tokens"]:
        resp = await llm.chat(messages=history, tools=TOOL_SPEC,
                              temperature=0.2, max_tokens=400)
        cost += resp.get("usage", {}).get("total_tokens", 0)
        if tool := resp.get("tool_call"):
            name, args = tool["name"], tool["arguments"]
            if name not in TOOL_REGISTRY:
                break
            schema, fn = TOOL_REGISTRY[name]
            inputs = schema(**args)  # validate
            if name == "search_docs":
                hits = await retrieve(inputs.query, inputs.top_k)
                tool_obs = {"tool": name, "result": hits}
            else:
                tool_obs = await fn(inputs)
            history += [{"role": "tool", "name": name, "content": tool_obs}]
        else:
            answer = resp["output"]
            if not check_policy(answer):  # safety gate
                history += [{"role": "system",
                             "content": "Revise to meet policy: no PII, cite sources."}]
                steps += 1
                continue
            counter("task.success").inc()
            return answer
        steps += 1
    counter("task.timeouts").inc()
    return "Unable to produce a grounded answer within limits. Escalating."
7) Safety, budgets, and failure-mode design
Budgets: cap MAX_STEPS, tokens, wall-clock time, and requests.
PII: redact before send; never log raw inputs; hash user IDs.
Allow/Deny tools: deny state-mutating tools unless preconditions pass.
Fallbacks: graceful “can’t answer” with escalation payload (question + retrieved hits + logs).
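The orchestrator imports redact_pii and check_policy, but safety.py is never shown; a deliberately simple sketch of what it could contain (the regexes and policy rules are placeholders, not a complete PII solution; real deployments should use a vetted PII/DLP library):

# safety.py (illustrative only)
import re

_EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
_PHONE = re.compile(r"\+?\d[\d\s-]{8,}\d")

def redact_pii(text: str) -> str:
    # Replace obvious PII patterns before anything is sent or logged.
    text = _EMAIL.sub("[EMAIL]", text)
    text = _PHONE.sub("[PHONE]", text)
    return text

def check_policy(answer: str) -> bool:
    # Example gates: the answer contains no raw PII and cites at least one source link.
    if _EMAIL.search(answer) or _PHONE.search(answer):
        return False
    return "http" in answer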
8) Observability (from day one)
Instrument everything.
Traces: one span per LLM/tool call with input/output sizes and durations.
Metrics: task_success, time_ms_p50/p95, tokens, tool_calls, timeouts, escalations.
Logs: structured (JSON) per step with correlation IDs.
# observability.py
import time, json, functools
from collections import Counter

_metrics = Counter()

def counter(name):
    class C:
        def inc(self, v=1):
            _metrics[name] += v
    return C()

def trace_span(name):
    def deco(fn):
        @functools.wraps(fn)
        async def wrap(*a, **k):
            t = time.time()
            try:
                return await fn(*a, **k)
            finally:
                print(json.dumps({"span": name, "ms": int((time.time() - t) * 1000)}))
        return wrap
    return deco
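The sketch above emits span timings but not correlation IDs; one way to thread an ID through async code is contextvars. This is an assumed addition, not part of the original observability.py:

# Hypothetical structured-log helper with a per-task correlation ID.
import json, uuid, contextvars

_correlation_id = contextvars.ContextVar("correlation_id", default=None)

def new_correlation_id() -> str:
    # Call once at the start of each run_task invocation.
    cid = uuid.uuid4().hex
    _correlation_id.set(cid)
    return cid

def log_step(event: str, **fields):
    # Every log line from one task shares the same correlation_id.
    print(json.dumps({"event": event, "correlation_id": _correlation_id.get(), **fields}))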
9) Tiny evaluation harness (make it real)
Create 30–50 real tasks with gold answers + evidence links.
eval/cases.jsonl
{"id":"q1","question":"How do we rotate API keys?","gold_contains":["rotate every 90 days"],"must_cite":true}
Runner:
# eval/judge.py
import json, asyncio
from orchestrator import run_task
from llm import LLMClient

async def evaluate():
    llm = LLMClient()
    ok = 0
    total = 0
    for line in open("eval/cases.jsonl"):
        c = json.loads(line)
        total += 1
        out = await run_task(c["question"], llm, budgets={"max_tokens": 4000})
        pass_contains = all(x.lower() in out.lower() for x in c["gold_contains"])
        pass_citation = ("http" in out) if c.get("must_cite") else True
        ok += (pass_contains and pass_citation)
    print(f"TSR: {ok}/{total} = {ok/total:.2%}")

if __name__ == "__main__":
    asyncio.run(evaluate())
Track TSR (task success rate) and latency before you add features.
10) Rollout checklist
Success criteria + 30–50 eval cases.
Deterministic orchestrator + budgets.
Typed tools with validation + idempotency.
Retrieval quality checked on eval set.
Safety gates (redaction, policy).
Traces/metrics/logs + alerting.
P95 latency and cost within budget.
Escalation path defined.
11) Sensible next steps
Memory: add short-term scratchpad summarization; add long-term only if it improves TSR or latency.
Caching: response + retrieval caches with TTL and invalidation hooks.
Reranking: upgrade to learned rerankers when drift appears.
Multi-agent: introduce a router/specialists only when specialization yields measurable gains.
Guarded generation: structured outputs (JSON schemas) to reduce post-processing errors.
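For that last item, a small example of guarded generation with a Pydantic schema, assuming your provider or adapter can return JSON you validate before use (the CitedAnswer model is illustrative):

# Hypothetical structured output: validate the model's JSON before returning it.
from pydantic import BaseModel, Field, ValidationError

class CitedAnswer(BaseModel):
    answer: str = Field(max_length=1500)        # rough proxy for the 200-word cap
    citations: list[str] = Field(min_length=1)  # at least one doc link

def parse_answer(raw_json: str) -> CitedAnswer | None:
    try:
        return CitedAnswer.model_validate_json(raw_json)
    except ValidationError:
        return None  # the orchestrator can ask the model to revise, as in the policy gate above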
Closing
“Building an LLM app” isn’t about calling a model; it’s about owning the loop—planning, tool use, grounding, safety, and measurement. Ship the smallest version that’s reliable under constraints, then iterate with evidence.