Building Your First LLM Application: A Production-Grade Path (Not a Toy)

TL;DR: Don’t start with a chat demo. Start with one valuable, verifiable job; wrap a deterministic orchestrator around an LLM; add typed tools, grounding, budgets/safety, observability, and a tiny evaluation harness. Ship something you can trust, measure, and improve.


0) Define the job → success criteria → guardrails

Pick one narrow, high-value workflow.

Example job: “Given a user question, search internal docs and return a cited answer under 200 words.”

  • Success: the exact answer appears in a cited source, the response is ≤ 200 words, and it cites ≥ 1 doc link.

  • Non-goals: chit-chat, multi-turn memory, reasoning about PDFs.

  • Guardrails: max 6 steps, ≤ 8s P95 latency, ≤ ₹X per 100 tasks, no PII in logs.

This forces you to design for outcomes, not vibes.
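
If it helps, pin these numbers down in code on day one. A minimal sketch, assuming a small frozen config module (the field names and defaults below are illustrative, not part of any framework):

# config.py (sketch; field names and defaults are illustrative)
from dataclasses import dataclass

@dataclass(frozen=True)
class Guardrails:
    max_steps: int = 6            # hard cap on orchestrator iterations
    max_tokens: int = 4000        # token budget per task
    p95_latency_s: float = 8.0    # latency target
    max_answer_words: int = 200   # success criterion: answer length
    min_citations: int = 1        # success criterion: at least one doc link

Passing one object like this around beats scattering magic numbers across the orchestrator.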


1) Minimal architecture (one page)

  • Client/UI → sends goal + context.

  • Orchestrator (deterministic) → state machine/graph that caps steps/time.

  • LLM Adapter → provider-agnostic wrapper.

  • Tools/Skills (typed) → search, retrieval, calculators, formatters.

  • Grounding Layer → retriever + re-ranker + cache.

  • Budgets & Safety → token/time/₹ caps, PII redaction, allow/deny tool lists.

  • Observability → traces, metrics, structured logs.

  • Eval Harness → 30–50 cases + leaderboard.
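
A rough sketch of the seams between these components, assuming the orchestrator only ever depends on two small interfaces (the Protocol names here are illustrative, not a prescribed API):

# interfaces.py (sketch; Protocol names are illustrative)
from typing import Any, Protocol

class ChatModel(Protocol):
    async def chat(self, messages: list[dict[str, str]],
                   tools: list[dict[str, Any]] | None = None, **params) -> dict: ...

class Tool(Protocol):
    name: str
    async def __call__(self, args: dict[str, Any]) -> dict: ...

Everything below plugs into these seams, so swapping a vendor or a search backend touches one file.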


2) Project scaffold

llm_app/
  app.py              # FastAPI/CLI entry
  orchestrator.py     # loop/graph + guards
  llm.py              # provider-agnostic client
  tools.py            # typed tool contracts
  grounding.py        # retriever/reranker/cache
  safety.py           # PII redaction, policy checks
  observability.py    # traces/metrics/logs
  eval/
    cases.jsonl       # test set
    judge.py          # LLM-as-judge (optional)

3) Provider-agnostic LLM adapter

Keep the rest of your code oblivious to vendors.

# llm.py
import os
from typing import Any, Dict

import httpx

class LLMClient:
    def __init__(self,
                 base_url: str = os.getenv("LLM_BASE_URL"),
                 api_key: str = os.getenv("LLM_API_KEY")):
        self.base_url, self.api_key = base_url, api_key

    async def chat(self, messages: list[Dict[str, str]],
                   tools: list[Dict[str, Any]] | None = None, **params):
        headers = {"Authorization": f"Bearer {self.api_key}",
                   "Content-Type": "application/json"}
        payload = {"messages": messages, "tools": tools or [], **params}
        async with httpx.AsyncClient(timeout=15) as client:
            r = await client.post(f"{self.base_url}/chat", headers=headers, json=payload)
            r.raise_for_status()
            return r.json()  # expect {"output": "...", "tool_call": {...}?, "usage": {...}}
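
Calling it is a single awaited coroutine. This usage sketch assumes LLM_BASE_URL and LLM_API_KEY are set and that your endpoint returns the output/tool_call/usage shape expected above; adapt the response parsing to your provider:

# quick smoke test (assumes env vars are set and the response shape above)
import asyncio
from llm import LLMClient

async def main():
    llm = LLMClient()
    resp = await llm.chat(messages=[{"role": "user", "content": "ping"}],
                          temperature=0.0, max_tokens=32)
    print(resp.get("output"))

asyncio.run(main())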

4) Typed tools with validation

Your tools are the system’s muscles. Type them.

# tools.py
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(min_length=3, max_length=256)
    top_k: int = Field(default=5, ge=1, le=10)

async def search_docs(args: SearchInput) -> dict:
    # plug in your own vector/keyword search;
    # return {"hits": [{"id": ..., "text": ..., "url": ...}, ...]}
    ...

TOOL_SPEC = [{
    "name": "search_docs",
    "description": "Search internal knowledge base",
    "input_schema": SearchInput.model_json_schema(),
}]
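
To see the contract do its job, run model-proposed arguments through the schema before anything executes; invalid arguments fail fast instead of reaching your search backend. The example arguments below are made up and deliberately out of range:

# validate model-proposed arguments before execution
from pydantic import ValidationError
from tools import SearchInput

raw_args = {"query": "key rotation policy", "top_k": 50}  # top_k violates le=10
try:
    inputs = SearchInput(**raw_args)
except ValidationError as err:
    print(err)  # reject the call or ask the model to retry with valid arguments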

5) Grounding micro-stack (retriever → reranker → cache)

Start simple: hybrid BM25 + vectors, then rerank.

# grounding.py
from functools import lru_cache

_RESULT_CACHE: dict[tuple, list[dict]] = {}  # swap for an external cache with TTL later

@lru_cache(maxsize=2048)
def cache_key(q: str, k: int) -> tuple:
    return (q.strip().lower(), k)

async def retrieve(query: str, top_k: int = 5) -> list[dict]:
    key = cache_key(query, top_k)
    if key in _RESULT_CACHE:
        return _RESULT_CACHE[key]
    # 1) lexical + vector search (search_index is your own hybrid BM25 + vector backend)
    hits = await search_index(query=query, top_k=top_k * 3)
    # 2) rerank (cross-encoder or fast heuristic)
    reranked = await rerank(query, hits)
    _RESULT_CACHE[key] = reranked[:top_k]
    return _RESULT_CACHE[key]

Tip: Log retrieval diagnostics (recall@k on eval set) from day one.
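
A rough sketch of that diagnostic, assuming each eval case also carries a gold_doc_ids list (an extra field I'm adding for illustration, not part of the cases.jsonl format shown later):

# eval/retrieval_recall.py (sketch; assumes cases carry a "gold_doc_ids" field)
import asyncio, json
from grounding import retrieve

async def recall_at_k(path: str = "eval/cases.jsonl", k: int = 5) -> float:
    hits, total = 0, 0
    for line in open(path):
        case = json.loads(line)
        gold = set(case.get("gold_doc_ids", []))
        if not gold:
            continue
        retrieved = {h["id"] for h in await retrieve(case["question"], top_k=k)}
        hits += bool(gold & retrieved)  # at least one gold doc retrieved
        total += 1
    return hits / max(total, 1)

if __name__ == "__main__":
    print(f"recall@5: {asyncio.run(recall_at_k()):.2%}")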


6) Deterministic orchestrator (bounded loop)

Wrap the LLM in a predictable skeleton. No unbounded “think loops”.

# orchestrator.py
import json
from typing import Any

from tools import TOOL_SPEC, SearchInput, search_docs
from safety import redact_pii, check_policy
from grounding import retrieve
from observability import trace_span, counter

MAX_STEPS = 6
TOOL_REGISTRY = {"search_docs": (SearchInput, search_docs)}

@trace_span("run_task")
async def run_task(goal: str, llm, budgets: dict[str, Any]):
    history = [{"role": "system",
                "content": "You answer only with grounded, cited text ≤ 200 words."}]
    history += [{"role": "user", "content": redact_pii(goal)}]
    cost, steps = 0, 0
    while steps < MAX_STEPS and cost < budgets["max_tokens"]:
        resp = await llm.chat(messages=history, tools=TOOL_SPEC,
                              temperature=0.2, max_tokens=400)
        cost += resp.get("usage", {}).get("total_tokens", 0)
        if tool := resp.get("tool_call"):
            name, args = tool["name"], tool["arguments"]
            if name not in TOOL_REGISTRY:
                break
            schema, fn = TOOL_REGISTRY[name]
            inputs = schema(**args)  # validate against the typed contract
            if name == "search_docs":
                hits = await retrieve(inputs.query, inputs.top_k)
                tool_obs = {"tool": name, "result": hits}
            else:
                tool_obs = await fn(inputs)
            history += [{"role": "tool", "name": name,
                         "content": json.dumps(tool_obs)}]
        else:
            answer = resp["output"]
            if not check_policy(answer):  # safety gate
                history += [{"role": "system",
                             "content": "Revise to meet policy: no PII, cite sources."}]
                steps += 1
                continue
            counter("task.success").inc()
            return answer
        steps += 1
    counter("task.timeouts").inc()
    return "Unable to produce a grounded answer within limits. Escalating."

7) Safety, budgets, and failure-mode design

  • Budgets: cap MAX_STEPS, tokens, wall-clock time, and requests.

  • PII: redact before sending to the model; never log raw inputs; hash user IDs.

  • Allow/Deny tools: deny state-mutating tools unless preconditions pass.

  • Fallbacks: graceful “can’t answer” with escalation payload (question + retrieved hits + logs).
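
The safety.py module imported by the orchestrator can start as pattern-based redaction plus a blunt policy check. A minimal sketch; the regexes and rules below are illustrative, not exhaustive:

# safety.py (minimal sketch; patterns and rules are illustrative, not exhaustive)
import re

_EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
_PHONE = re.compile(r"\+?\d[\d\s-]{8,}\d")

def redact_pii(text: str) -> str:
    text = _EMAIL.sub("[EMAIL]", text)
    return _PHONE.sub("[PHONE]", text)

def check_policy(answer: str) -> bool:
    # enforce the ≤ 200-word cap, require a citation link, block raw PII
    if len(answer.split()) > 200:
        return False
    if "http" not in answer:
        return False
    return not (_EMAIL.search(answer) or _PHONE.search(answer))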


8) Observability (from day one)

Instrument everything.

  • Traces: one span per LLM/tool call with input/output sizes and durations.

  • Metrics: task_success, time_ms_p50/p95, tokens, tool_calls, timeouts, escalations.

  • Logs: structured (JSON) per step with correlation IDs.

# observability.py
import functools
import json
import time
from collections import Counter

_metrics = Counter()

def counter(name):
    class C:
        def inc(self, v=1):
            _metrics[name] += v
    return C()

def trace_span(name):
    def deco(fn):
        @functools.wraps(fn)
        async def wrap(*a, **k):
            t = time.time()
            try:
                return await fn(*a, **k)
            finally:
                print(json.dumps({"span": name, "ms": int((time.time() - t) * 1000)}))
        return wrap
    return deco
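
The per-step structured logs with correlation IDs mentioned above can be one JSON line per event. A sketch, assuming you swap print for your log shipper:

# structured per-step logging (sketch; replace print with your log pipeline)
import json, time, uuid

def new_correlation_id() -> str:
    return uuid.uuid4().hex

def log_step(correlation_id: str, step: int, event: str, **fields):
    print(json.dumps({
        "ts": time.time(),
        "correlation_id": correlation_id,
        "step": step,
        "event": event,
        **fields,
    }))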

9) Tiny evaluation harness (make it real)

Create 30–50 real tasks with gold answers + evidence links.

eval/cases.jsonl

{"id":"q1","question":"How do we rotate API keys?","gold_contains":["rotate every 90 days"],"must_cite":true}

Runner:

# eval/judge.py
import asyncio
import json

from orchestrator import run_task
from llm import LLMClient

async def evaluate():
    llm = LLMClient()
    ok, total = 0, 0
    for line in open("eval/cases.jsonl"):
        c = json.loads(line)
        total += 1
        out = await run_task(c["question"], llm, budgets={"max_tokens": 4000})
        pass_contains = all(x.lower() in out.lower() for x in c["gold_contains"])
        pass_citation = ("http" in out) if c.get("must_cite") else True
        ok += (pass_contains and pass_citation)
    print(f"TSR: {ok}/{total} = {ok/total:.2%}")

if __name__ == "__main__":
    asyncio.run(evaluate())

Track TSR (task success rate) and latency before you add features.


10) Rollout checklist

  • Success criteria + 30–50 eval cases.

  • Deterministic orchestrator + budgets.

  • Typed tools with validation + idempotency (see the sketch after this checklist).

  • Retrieval quality checked on eval set.

  • Safety gates (redaction, policy).

  • Traces/metrics/logs + alerting.

  • P95 latency and cost within budget.

  • Escalation path defined.
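
The idempotency item above is easy to gloss over. A minimal sketch of an idempotency-key guard for state-mutating tools; the key derivation and in-memory store are assumptions you would replace:

# idempotency guard for state-mutating tools (sketch; key derivation is an assumption)
import hashlib, json

_SEEN: dict[str, dict] = {}  # replace with a persistent store in production

def idempotency_key(tool_name: str, args: dict) -> str:
    payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

async def run_once(tool_name: str, args: dict, fn) -> dict:
    key = idempotency_key(tool_name, args)
    if key in _SEEN:              # duplicate call: return the cached result
        return _SEEN[key]
    result = await fn(args)
    _SEEN[key] = result
    return result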


11) Sensible next steps

  • Memory: add short-term scratchpad summarization; add long-term memory only if it improves TSR or latency.

  • Caching: response + retrieval caches with TTL and invalidation hooks.

  • Reranking: upgrade to learned rerankers when drift appears.

  • Multi-agent: introduce a router/specialists only when specialization yields measurable gains.

  • Guarded generation: structured outputs (JSON schemas) to reduce post-processing errors.
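
For that last item, a minimal sketch is to declare the answer shape as a schema and validate (or regenerate) before returning; the field names below are illustrative:

# guarded generation (sketch; the answer schema is illustrative)
from pydantic import BaseModel, Field, ValidationError

class CitedAnswer(BaseModel):
    answer: str = Field(max_length=1500)        # roughly the 200-word cap
    citations: list[str] = Field(min_length=1)  # at least one doc link

def parse_answer(raw_json: str) -> CitedAnswer | None:
    try:
        return CitedAnswer.model_validate_json(raw_json)
    except ValidationError:
        return None  # ask the model to regenerate against the schema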


Closing

“Building an LLM app” isn’t about calling a model; it’s about owning the loop—planning, tool use, grounding, safety, and measurement. Ship the smallest version that’s reliable under constraints, then iterate with evidence.