
How to build a production-ready agent with the Anthropic SDK, step by step


Building a solid agent with the Anthropic SDK no longer requires bespoke glue. This tutorial walks step by step through a production-ready agent in Python: tool use, streaming with backpressure, prompt caching, your own MCP server, and OTel traces. The result is a repository you can deploy as a small container and operate without surprises. See also: the complete guide to MCP, the Model Context Protocol, for the protocol context that frames the MCP piece of this tutorial.

Key takeaways

  • The Anthropic SDK splits into two layers: the Messages API in the anthropic package for tool use, streaming, and caching, and the Claude Agent SDK (claude-agent-sdk) for registering MCP servers and applying policy with allowed_tools.
  • The tool-use loop is always the same: messages.create with tools=[...], read stop_reason, run the tool_use, send a tool_result, and call again until end_turn.
  • Streaming forces you to think about backpressure from day one: a bounded asyncio.Queue between text_stream and the consumer prevents the SDK from blocking and the UI from falling behind.
  • Prompt caching pays off when there’s a large reused context block; with cache_control={"type": "ephemeral"}, cache hits cost 10% of base input, per Anthropic’s official docs.
  • Observability is now standardised: OpenTelemetry’s GenAI conventions define the gen_ai.provider.name="anthropic" attribute and chat / execute_tool operations that any OTLP backend understands.
  • Packaging in a multi-stage Dockerfile based on python:3.12-slim and a non-root user leaves the agent ready for any modern container runtime.

Prerequisites and project structure

You need Python 3.12 or later, an Anthropic account with ANTHROPIC_API_KEY, Docker for the final step, and optionally a local OpenTelemetry collector (Tempo, Jaeger, or another) to inspect traces during development. If you're coming from Claude's Computer Use, where the agent drives the OS UI directly, note the contrast: here we work with structured tool use and MCP, which is far more auditable.

The repository structure separates layers cleanly: agent logic lives under app/, the custom MCP server under mcp_server/, tests under tests/, and operational config under ops/. The header image shows the full tree and acts as a map for the rest of the post.

agente-anthropic-sdk/
├── app/
│   ├── main.py
│   ├── streaming.py
│   ├── caching.py
│   ├── observability.py
│   └── tools/
├── mcp_server/
│   ├── server.py
│   └── tools.py
├── tests/
├── ops/
├── Dockerfile
└── requirements.txt

The dependency list comes down to six packages. anthropic covers the Messages API; claude-agent-sdk adds MCP integration and the allowed_tools policy; mcp is used for the custom server; opentelemetry-sdk plus the OTLP gRPC exporter instrument the whole thing; python-dotenv loads the environment on startup.

anthropic>=0.40,<0.50
claude-agent-sdk>=0.3,<1.0
mcp>=1.6,<2.0
opentelemetry-sdk>=1.27,<2.0
opentelemetry-exporter-otlp-proto-grpc>=1.27,<2.0
python-dotenv>=1.0,<2.0

Pin versions to a compatible range and no more; the ecosystem moves fast, and a strict pin ages badly within weeks. Load environment variables on startup with python-dotenv from a .env file that never gets committed.
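As a minimal sketch of that startup step (assuming the key lives in .env as ANTHROPIC_API_KEY and nothing else needs loading), the top of app/main.py can fail fast when the key is missing:

# app/main.py (startup sketch): load .env before creating the Anthropic client
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the working directory if present; real env vars take precedence

if not os.environ.get("ANTHROPIC_API_KEY"):
    raise RuntimeError("ANTHROPIC_API_KEY is not set; check .env or the environment")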

First turn with tool use

The tool-use loop is the heart of the agent. You define a tool with its input_schema JSON Schema, register it in the call to messages.create, and process any tool_use blocks that show up in the response. Once you finish executing the local function, you return the result as tool_result and call the model again with the accumulated conversation. The Anthropic SDK docs[1] put it this way: the model responds with stop_reason: "tool_use", your code runs the operation, and you send back a tool_result to close the loop.

# app/main.py
import anthropic

client = anthropic.Anthropic()

TOOLS = [
    {
        "name": "get_weather",
        "description": "Returns the current temperature for a city.",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City and country."}
            },
            "required": ["location"],
        },
    }
]


def run_tool(name: str, args: dict) -> str:
    if name == "get_weather":
        # Replace with your real integration (Open-Meteo, AEMET, etc.).
        return f"21C, clear skies in {args['location']}"
    raise ValueError(f"Unknown tool: {name}")


def chat(prompt: str) -> str:
    messages = [{"role": "user", "content": prompt}]
    while True:
        response = client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason != "tool_use":
            return "".join(b.text for b in response.content if b.type == "text")

        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                output = run_tool(block.name, block.input)
                tool_results.append(
                    {"type": "tool_result", "tool_use_id": block.id, "content": output}
                )
        messages.append({"role": "user", "content": tool_results})

Three details worth nailing from the first turn. The loop ends when stop_reason is anything other than tool_use (typically end_turn); a tool_use stop reason means the model still wants to call tools. The message history accumulates rather than getting rewritten: the model needs to see both its own tool_use and the matching tool_result to reason about the next step. And tool errors don't get swallowed: if the real integration raises, return a tool_result with is_error: True and a readable message so the model can react and either retry or ask for help.
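A minimal sketch of that error path (run_tool_safely is a hypothetical helper, not part of the SDK); in chat(), append run_tool_safely(block) instead of building the tool_result dict inline:

# app/main.py (sketch): surface tool failures to the model instead of crashing the loop
def run_tool_safely(block) -> dict:
    """Wrap run_tool so exceptions become is_error tool_results the model can react to."""
    try:
        output = run_tool(block.name, block.input)
        return {"type": "tool_result", "tool_use_id": block.id, "content": output}
    except Exception as exc:
        return {
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": f"{type(exc).__name__}: {exc}",
            "is_error": True,
        }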

Streaming and backpressure

For a chat UI or a websocket channel, streaming instead of blocking until the end is the difference between a fluid experience and one that feels stuck. The SDK exposes client.messages.stream(...) as a context manager (an async one on the AsyncAnthropic client), and the text_stream attribute iterates over text chunks as they arrive.

# app/streaming.py
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()


async def stream_to_consumer(prompt: str, queue: asyncio.Queue) -> None:
    # Caller must pass a bounded queue, e.g. asyncio.Queue(maxsize=50); an unbounded queue defeats backpressure.
    try:
        async with client.messages.stream(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # The timeout protects against consumers that don't read.
                await asyncio.wait_for(queue.put(text), timeout=2.0)
    except asyncio.TimeoutError:
        # Backpressure: drop or return 503 to the end client.
        await queue.put(None)
    except anthropic.APIStatusError as exc:
        await queue.put(f"[error {exc.status_code}]")
        raise

The async with block guarantees the stream closes even if the consumer raises, and the AsyncAnthropic client keeps the event loop free while chunks arrive. The bounded asyncio.Queue(maxsize=N) is the trick that avoids the most common streaming problem: when the end client consumes slowly, the producer fills memory without the SDK noticing. With wait_for and a short timeout, the producer detects pressure earlier and degrades gracefully, returning a 503 to the end client or cutting generation with stream.close().

Three error cases deserve explicit handling. anthropic.APIStatusError with status 529 means service overload and is a candidate for retry with backoff. Client cancellation (websocket closed) should propagate by cancelling the task reading from text_stream, not waiting for it to finish. And intermediate timeouts — a proxy that cuts at 60 seconds — are different from 529 and should be handled by reopening the stream from the last coherent point, not by retrying the full turn.
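A sketch of the 529 case under those rules (with_overload_retry is a hypothetical helper; it assumes stream_to_consumer re-raises APIStatusError as above, so each failed attempt also pushes an [error 529] marker into the queue unless you hoist that logic up to this level):

# app/streaming.py (sketch): retry a streaming turn on 529 overload with jittered backoff
import asyncio
import random

import anthropic


async def with_overload_retry(coro_factory, attempts: int = 3):
    """coro_factory re-creates the coroutine each attempt; a consumed stream cannot be replayed."""
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except anthropic.APIStatusError as exc:
            if exc.status_code != 529 or attempt == attempts - 1:
                raise  # anything other than overload, or the last attempt, propagates
            await asyncio.sleep(2 ** attempt + random.random())  # ~1s, ~2s, ~4s...


# Usage: await with_overload_retry(lambda: stream_to_consumer(prompt, queue))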

Prompt caching: when it pays off

Prompt caching turns stable input tokens into cheap reads. When you mark a block of the prompt with cache_control={"type": "ephemeral"}, Anthropic stores that portion and, on later requests that reuse it, charges 10% of base input — a 90% discount on cache hit, per the Anthropic SDK docs[1] prompt-caching page. The first write carries an overhead (1.25× for a 5-minute TTL, 2× for a 1-hour TTL), so the rule of thumb is clear: cache blocks you’ll reuse at least three times.

# app/caching.py
import anthropic

client = anthropic.Anthropic()

LARGE_SYSTEM = """You are a support assistant for jacar.es. Follow these rules...
[... several thousand tokens of instructions, few-shot examples and policies ...]
"""


def ask(history: list[dict], user_msg: str) -> str:
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        system=[
            {
                "type": "text",
                "text": LARGE_SYSTEM,
                "cache_control": {"type": "ephemeral"},
            }
        ],
        messages=history + [{"role": "user", "content": user_msg}],
    )
    usage = response.usage
    print(
        "input:", usage.input_tokens,
        "cache_read:", getattr(usage, "cache_read_input_tokens", 0),
        "cache_write:", getattr(usage, "cache_creation_input_tokens", 0),
    )
    return response.content[0].text

The signal that you’re caching correctly shows up in response.usage: after the first turn, cache_creation_input_tokens reflects the write; on subsequent turns, cache_read_input_tokens grows and input_tokens shrinks to whatever changed. If you never see cache reads, some token in the block changed between calls — a dynamic timestamp, a list reordered — and the cache key got invalidated. Cache stable blocks, fix the example ordering, and keep volatile content out of the marked block.

Two cases where caching doesn’t pay: short prompts with little reuse (the initial write outweighs accumulated savings) and agents that swap the system prompt every turn for extreme personalisation. In those scenarios, drop cache_control and revisit the architecture: there’s usually room to move personalisation into a stable user block and keep the system constant.
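Putting numbers on that rule of thumb, here is a back-of-envelope helper using only the multipliers quoted above (1.25x or 2x on the first write, 0.1x per hit); the function name is illustrative:

# app/caching.py (sketch): break-even estimate for a cached block, using the multipliers above
def cached_cost_ratio(reuses: int, write_multiplier: float = 1.25, read_multiplier: float = 0.10) -> float:
    """Cost with caching divided by cost without, for `reuses` requests sharing the block."""
    without_cache = reuses * 1.0
    with_cache = write_multiplier + (reuses - 1) * read_multiplier
    return with_cache / without_cache


# cached_cost_ratio(2) ~= 0.68 with the 5-minute TTL; the 1-hour TTL (write_multiplier=2.0)
# only drops below 1.0 from the third reuse (~0.73), which is where the "three times" rule comes from.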

Registering your own MCP server

The Claude Agent SDK exposes MCP server registration as a first-class API, the key piece that connects this tutorial to the complete guide on MCP, the Model Context Protocol. In Python, the integration goes through claude_agent_sdk.query() and ClaudeAgentOptions(mcp_servers=..., allowed_tools=...). The naming convention mcp__<server-name>__<tool-name> standardises how the model sees each tool without collisions.

We start with the server. A create_ticket tool that opens an incident in your product domain, exposed via stdio so the agent can launch it as a subprocess:

# mcp_server/server.py
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("dominio")


@mcp.tool()
def create_ticket(title: str, priority: str = "normal") -> dict:
    """Create a ticket in the internal system. Returns {id, url}."""
    # Replace with your real integration (Linear, Jira, in-house).
    return {"id": "TCK-1042", "url": "https://soporte.example.com/tickets/1042"}


if __name__ == "__main__":
    mcp.run()  # stdio by default

And the client that consumes it from the agent, registering the server in code and pinning allowed_tools to exactly the tool we want exposed to the model:

# app/main.py (MCP variant)
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage


async def run_agent(prompt: str) -> None:
    options = ClaudeAgentOptions(
        mcp_servers={
            "dominio": {
                "command": "python",
                "args": ["-m", "mcp_server.server"],
            }
        },
        allowed_tools=["mcp__dominio__create_ticket"],
    )
    async for message in query(prompt=prompt, options=options):
        if isinstance(message, ResultMessage) and message.subtype == "success":
            print(message.result)


asyncio.run(run_agent("Open a high-priority ticket for the checkout incident."))

Three production-tested rules. Credentials get injected via env, never the prompt: env={"DOMAIN_API_TOKEN": os.environ["DOMAIN_API_TOKEN"]} in the descriptor puts rotation under deployment control. Policy is tuned with allowed_tools by exact name when the list is small, and with mcp__dominio__* when you trust the whole server; the global wildcard mcp__* is almost always too broad. And contract tests on the tool listing — a test_mcp_contract.py that enumerates and snapshots — catch renames and removals before they break the agent in production.
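A sketch of that contract test, using the stdio client from the mcp package to launch the server and snapshot its tool names (assumes pytest-asyncio is installed; the expected set is the single tool from this tutorial):

# tests/test_mcp_contract.py (sketch): snapshot the tool listing exposed by mcp_server
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

EXPECTED_TOOLS = {"create_ticket"}  # extend this snapshot as the server grows


@pytest.mark.asyncio
async def test_tool_listing_matches_snapshot():
    params = StdioServerParameters(command="python", args=["-m", "mcp_server.server"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            listing = await session.list_tools()
            names = {tool.name for tool in listing.tools}
            assert names == EXPECTED_TOOLS, f"MCP tool contract drifted: {names}"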

Observability with OTel GenAI

The OpenTelemetry GenAI semantic conventions[2] now cover the Anthropic case with gen_ai.provider.name="anthropic", the span name chat {model}, and operations chat, execute_tool, and embeddings. That means your traces line up in Grafana or Honeycomb with the rest of the distributed trace effortlessly. The sibling cluster on agent observability with OTel GenAI goes deeper; here it’s enough to wire the exporter and open a span around each call to the model.

# app/observability.py
from contextlib import contextmanager
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agente-anthropic-sdk")


@contextmanager
def chat_span(model: str):
    with tracer.start_as_current_span(f"chat {model}") as span:
        span.set_attribute("gen_ai.provider.name", "anthropic")
        span.set_attribute("gen_ai.system", "Anthropic")  # back-compat with older instrumentations
        span.set_attribute("gen_ai.operation.name", "chat")
        span.set_attribute("gen_ai.request.model", model)
        try:
            yield span
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            raise

We also emit gen_ai.system so that collectors and tools that started on the initial spec keep recognising the agent without reconfiguration.

Wrap every messages.create in a with chat_span("claude-opus-4-7") as span: block and, after the call, attach usage attributes from response.usage: gen_ai.usage.input_tokens, gen_ai.usage.output_tokens, and, if you're caching, gen_ai.usage.input_tokens.cache_read. The OTLPSpanExporter() reads the endpoint from OTEL_EXPORTER_OTLP_ENDPOINT, so pointing at a local collector during dev and at Tempo or Honeycomb in production is config, not code. See also: the official MCP specification[3] if you want to correlate agent traces with the MCP server's using the mcp.method.name and mcp.session.id attributes.
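A sketch of that wiring (traced_create is a hypothetical wrapper, not an SDK helper), attaching the usage attributes right after the call returns:

# app/main.py (sketch): chat_span around messages.create, usage attributes attached afterwards
from app.observability import chat_span


def traced_create(client, **kwargs):
    with chat_span(kwargs["model"]) as span:
        response = client.messages.create(**kwargs)
        usage = response.usage
        span.set_attribute("gen_ai.usage.input_tokens", usage.input_tokens)
        span.set_attribute("gen_ai.usage.output_tokens", usage.output_tokens)
        cache_read = getattr(usage, "cache_read_input_tokens", None)
        if cache_read:  # only present on cache-enabled requests
            span.set_attribute("gen_ai.usage.input_tokens.cache_read", cache_read)
        return response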

Packaging and deployment

A multi-stage Dockerfile based on python:3.12-slim gives you a small, reproducible, reasonably secure image. The idea: install dependencies in an intermediate stage with pip caching, and copy only the result plus the code into the final stage, which runs as a non-root user.

# Dockerfile
FROM python:3.12-slim AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --target /opt/deps -r requirements.txt

FROM python:3.12-slim
RUN useradd --create-home --uid 10001 agente
WORKDIR /app
COPY --from=builder /opt/deps /opt/deps
COPY app/ ./app/
COPY mcp_server/ ./mcp_server/
ENV PYTHONPATH=/opt/deps PYTHONUNBUFFERED=1
USER agente
ENTRYPOINT ["python", "-m", "app.main"]

Three decisions matter more than they look. The agente user with fixed UID 10001 avoids running as root and simplifies PodSecurity policies in Kubernetes. PYTHONPATH=/opt/deps keeps project code separate from site-packages, which helps diagnostics. And PYTHONUNBUFFERED=1 makes logs emit immediately, essential so an orchestrator detects startup failures without artificial delays. If you also run the MCP server inside the container, declare a different ENTRYPOINT for that image or use a sidecar; in larger clusters, the patterns from multi-agent systems with LangGraph, CrewAI, and AutoGen describe how to treat this container as a node in an agent graph.

In CI, run tests with a fixed Python version — Python 3.13’s optional GIL changes concurrency assumptions, and pinning the version until you’ve validated SDK behaviour with nogil is sensible. Contract tests on the MCP server, snapshot tests on the tool listing, and a minimal tool-use loop test with a mocked client cover the most common regressions.
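A sketch of that last test, with a hand-rolled fake client (FakeClient and its canned turns are illustrative, not SDK types); it assumes app is importable as a package:

# tests/test_tool_use_loop.py (sketch): the tool-use loop against a hand-rolled fake client
from types import SimpleNamespace

from app import main


class FakeClient:
    """Serves one tool_use turn, then an end_turn answer, mimicking the Messages API shape."""

    def __init__(self):
        self._turns = [
            SimpleNamespace(
                stop_reason="tool_use",
                content=[SimpleNamespace(type="tool_use", id="tu_1",
                                         name="get_weather", input={"location": "Madrid"})],
            ),
            SimpleNamespace(
                stop_reason="end_turn",
                content=[SimpleNamespace(type="text", text="21C and clear skies in Madrid.")],
            ),
        ]
        self.messages = SimpleNamespace(create=lambda **kwargs: self._turns.pop(0))


def test_loop_runs_tool_then_returns_text(monkeypatch):
    monkeypatch.setattr(main, "client", FakeClient())
    assert "21C" in main.chat("What's the weather in Madrid?")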

Conclusion

A production-ready agent on the Anthropic SDK comes down to six well-isolated pieces: tool-use loop, streaming with backpressure, prompt caching where it pays, your own MCP server, OTel traces, and a small container. Each piece is orthogonal to the others, which makes iteration safe and code review straightforward. The operational consequence is what counts: an agent that’s understood, observed, and deployed like any other service. The reference repository with all the code from this tutorial lives at github.com/jacarsystems/jacar-anthropic-sdk-demo[4] — clone it, set up .env, and start with docker compose up.

Follow us on jacar.es for more on agents in production, MCP, GenAI observability, and real patterns from the Anthropic ecosystem.

  1. Anthropic SDK docs
  2. OpenTelemetry GenAI semantic conventions
  3. official MCP specification
  4. github.com/jacarsystems/jacar-anthropic-sdk-demo

Written by

CEO - Jacar Systems

Passionate about technology, cloud infrastructure and artificial intelligence. Writes about DevOps, AI, platforms and software from Madrid.