# Build an agent
> This bundle contains all pages in the Build an agent section.
> Source: https://www.union.ai/docs/v2/union/user-guide/build-agent/

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-agent ===

# Build an agent

This section covers how to build, deploy, and run agentic AI applications on Union.ai.

Building an agent on Union.ai breaks down into two **orthogonal** choices:

1. **How you build the agent loop** — plain Python, the built-in `flyte.ai.agents.Agent` harness, or a third-party framework (LangGraph, PydanticAI, OpenAI Agents SDK).
2. **How you deploy and run it** — as a task you invoke on demand, as a scheduled task driven by a `flyte.Trigger`, or as a long-running app (e.g. a webhook or chat UI).

Any agent from axis (1) can be deployed via any pattern in axis (2). The two are independent, so you can start with a pure-Python loop run on demand and later move it behind a schedule or a webhook without rewriting the agent.

## How Union.ai maps to the agentic world

- **`TaskEnvironment`**: The sandboxed execution environment for your agent steps. It configures the container image, hardware resources (CPU, GPU), and secrets (API keys). Think of it as defining "where this code runs."
- **`@env.task`**: Turns any Python function into a remotely-executed step. Each task runs in its own container with the resources you specified. This is the equivalent of a node in LangGraph or n8n.
- **Tasks calling tasks**: A task can `await` other tasks, and each called task gets its own container automatically. No separate workflow decorator is needed: the calling task is your workflow, and this is how you build multi-step agentic pipelines.
- **`@flyte.trace`**: Marks helper functions inside a task for fine-grained observability and caching. Each traced call appears as a span in the Union.ai dashboard, with its inputs and outputs captured and checkpointed. Use this on your LLM calls, tool executions, and routing decisions to get full visibility into every turn of the agent loop.

> [!TIP]
> See the [Union.ai Quickstart](https://www.union.ai/docs/v2/union/user-guide/quickstart/page.md) for a hands-on walkthrough.

## Ways to build an agent

| Approach | When to use it | Guide |
|----------|----------------|-------|
| **Pure Python** | You want full control over the loop and the lightest possible dependency footprint | **Build an agent > Pure Python agents** |
| **The `Agent` harness** | You want a batteries-included tool-use loop with tools, MCP servers, memory, and HITL built in | **Build an agent > Flyte-native agents** |
| **Third-party frameworks** | You already have agents written with LangGraph, PydanticAI, or the OpenAI Agents SDK | [Agent framework integrations](https://www.union.ai/docs/v2/union/user-guide/agent-framework-integrations/_index) |

The `Agent` harness has a few dedicated guides of its own:

- **Build an agent > Flyte-native agents**: customize the loop by overriding `run`.
- **Build an agent > Agent memory**: persist conversation history and artifacts across runs with `MemoryStore`.
- **Build an agent > Agent chat UI**: give any agent a hosted chat interface.

## Deploying an agent

Once you've built an agent, **Build an agent > Deploy an agent as a service** covers running it as a task, on a schedule via `flyte.Trigger`, and behind an `AppEnvironment` webhook.

## Related

- [**Sandboxing**](https://www.union.ai/docs/v2/union/user-guide/sandboxing/_index): safely execute LLM-generated code.
- [**Build an MCP server**](https://www.union.ai/docs/v2/union/user-guide/build-mcp/_index): serve Model Context Protocol servers for AI assistants to interact with Union.ai.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-agent/python-agents ===

# Pure Python agents

The lightest way to build an agent on Union.ai is to write the loop yourself in plain Python. Union.ai is framework-agnostic: use any Python LLM library (OpenAI SDK, Anthropic SDK, LiteLLM, etc.) inside your tasks. The platform provides the production infrastructure layer: sandboxed execution, parallel fan-out, durable checkpointing, and observability for every step of the agent loop.

This approach gives you full control over the loop and the smallest possible dependency footprint. If you'd rather not hand-roll the tool-call loop, see [The Flyte Agent harness](./flyte-agents), which provides a batteries-included loop with tools, MCP servers, memory, and HITL. If you already have agents written in a third-party framework, see [Agent framework integrations](../agent-framework-integrations/_index) for [LangGraph](https://www.union.ai/docs/v2/union/user-guide/agent-framework-integrations/langgraph), [PydanticAI](https://www.union.ai/docs/v2/union/user-guide/agent-framework-integrations/pydantic-ai), and [OpenAI Agents SDK](https://www.union.ai/docs/v2/union/user-guide/agent-framework-integrations/openai-agents-sdk).

Two decorators are all you need:

| Decorator | What it does | Think of it as... |
|-----------|-------------|-------------------|
| **`@env.task`** | Runs a function in its own container on Union.ai with dedicated resources, dependencies, and secrets | A sandboxed agent step with its own execution environment |
| **`@flyte.trace`** | Marks a helper function for observability, where each call appears as a span in the Union.ai dashboard with captured I/O | An observability hook on your LLM calls, tool executions, and routing decisions |

## ReAct pattern: Reason, Act, Observe (no framework needed)

The [ReAct pattern](https://arxiv.org/abs/2210.03629) is one of the most common agent architectures: the LLM reasons about what to do, calls a tool, observes the result, and repeats until done. The example below implements it directly with Flyte:

```
Thought → Action → Observation → repeat until done
```

```python
import json

from openai import AsyncOpenAI
from pydantic import BaseModel

import flyte

env = flyte.TaskEnvironment(
    name="agent_env",
    image=flyte.Image.from_debian_base(python_version=(3, 13)).with_pip_packages("openai"),
    resources=flyte.Resources(cpu=2, memory="2Gi"),
    secrets=[flyte.Secret(key="OPENAI_API_KEY")],
)

TOOLS = {"add": lambda a, b: a + b, "multiply": lambda a, b: a * b}

@flyte.trace  # each call = a span in the dashboard
async def reason(goal: str, history: str) -> dict:
    """LLM picks a tool or returns a final answer."""
    r = await AsyncOpenAI().chat.completions.create(
        model="gpt-4.1-nano",
        response_format={"type": "json_object"},
        messages=[
            {
                "role": "system",
                "content": f"Tools: {list(TOOLS)}. Respond JSON: "
                '{"thought":..,"tool":..,"args":{}} or {"thought":..,"done":true,"answer":..}',
            },
            {"role": "user", "content": f"Goal: {goal}\n\n{history}\nWhat next?"},
        ],
    )
    return json.loads(r.choices[0].message.content)

@flyte.trace
async def act(tool: str, args: dict) -> str:
    """Execute the chosen tool."""
    return str(TOOLS[tool](**args))

class AgentResult(BaseModel):
    answer: str
    steps: int

@env.task  # runs in its own container
async def react_agent(goal: str, max_steps: int = 10) -> AgentResult:
    history = ""
    for step in range(1, max_steps + 1):  # the agent loop
        decision = await reason(goal, history)  # Thought
        if decision.get("done"):
            return AgentResult(answer=str(decision["answer"]), steps=step)
        result = await act(decision["tool"], decision["args"])  # Action
        # Observation
        history += f"Step {step}: {decision['thought']} -> {decision['tool']}({decision['args']}) = {result}\n"
    return AgentResult(answer="Max steps reached", steps=max_steps)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/building-agents/react_agent.py*

```bash
flyte run agent.py react_agent --goal "What is (12 + 8) * 3?"
# => AgentResult(answer='60', steps=3)
```

**What's happening under the hood:**
- `react_agent` runs in a container with `openai` as the only extra pip package, 2 CPUs, and 2 GiB of memory
- Each `reason()` and `act()` call is traced, so you see every LLM call, every tool invocation, and every intermediate result in the Union.ai dashboard
- The agent's inputs and final output are durably persisted, letting you inspect any past run end-to-end
- Swap in your own tools (web search, database queries, API calls) by adding to the `TOOLS` dict
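
One thing to watch when adding tools: the `act` helper above raises `KeyError` if the model names a tool that isn't in `TOOLS`, and `TypeError` if it passes the wrong argument names. A possible way to handle this, sketched here in plain Python without Flyte, is to turn such failures into an observation string the model can read on its next turn. The `safe_act` name and the error-message format are illustrative assumptions, not part of the original example.

```python
from typing import Any, Callable

# Same shape as the TOOLS dict in the example above.
TOOLS: dict[str, Callable[..., Any]] = {
    "add": lambda a, b: a + b,
    "multiply": lambda a, b: a * b,
}


def safe_act(tool: str, args: dict[str, Any]) -> str:
    """Run a tool, returning an error string instead of raising on bad input."""
    fn = TOOLS.get(tool)
    if fn is None:
        return f"error: unknown tool {tool!r}; available: {sorted(TOOLS)}"
    try:
        return str(fn(**args))
    except TypeError as exc:  # wrong or missing argument names
        return f"error: bad arguments for {tool!r}: {exc}"
```

Feeding the error back as an observation lets the loop continue instead of failing the whole task on a single malformed tool call.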

> [!TIP]
> See the [Agentic Refinement docs](https://www.union.ai/docs/v2/union/user-guide/advanced-project/agentic-refinement), [Traces docs](https://www.union.ai/docs/v2/union/user-guide/task-programming/traces), and [more patterns (planner, debate, etc.)](https://github.com/unionai/workshops/tree/main/tutorials/multi-agent-workflows).

## Plan-and-Execute with parallel fan-out

The [Plan-and-Execute pattern](https://blog.langchain.com/plan-and-execute-agents/) splits a complex query into sub-tasks, fans them out in parallel, then synthesizes the results. With Union.ai the fan-out is just `asyncio.gather()`, and each sub-task gets its own container, giving you true parallelism on separate hardware.

```python
# workflow.py
import asyncio
import json

import flyte
from openai import AsyncOpenAI

env = flyte.TaskEnvironment(
    name="research_env",
    image=flyte.Image.from_debian_base(python_version=(3, 13)).with_pip_packages("openai"),
    resources=flyte.Resources(cpu=2, memory="2Gi"),
    secrets=[flyte.Secret(key="OPENAI_API_KEY")],
)

@flyte.trace
async def llm(prompt: str) -> str:
    r = await AsyncOpenAI().chat.completions.create(
        model="gpt-4.1-nano",
        messages=[{"role": "user", "content": prompt}],
    )
    return r.choices[0].message.content

@env.task
async def plan(query: str, n: int = 3) -> list[str]:
    """Split the query into sub-topics."""
    raw = await llm(
        f'Break this into exactly {n} sub-topics. Return ONLY a JSON array of strings.\n\n{query}'
    )
    return json.loads(raw)[:n]

@env.task
async def research(topic: str) -> str:
    """Research one sub-topic (each call = its own container)."""
    return await llm(f"Write a short, factual report on: {topic}")

@env.task
async def synthesize(query: str, reports: list[str]) -> str:
    """Combine the sub-reports into a final answer."""
    sections = "\n\n".join(reports)
    return await llm(f"Synthesize a final answer to '{query}' from:\n\n{sections}")

@env.task
async def research_workflow(query: str, num_topics: int = 3) -> str:
    topics = await plan(query, num_topics)
    reports = list(await asyncio.gather(*[research(t) for t in topics]))  # parallel fan-out
    return await synthesize(query, reports)
```

```bash
flyte run workflow.py research_workflow --query "Impact of storms on travel insurance payouts"
```

**What's happening under the hood:**

```
research_workflow (orchestrator)
  ├── plan          → LLM breaks query into N sub-topics      [container 1]
  ├── research(t1)  → researches one sub-topic                [container 2]  ┐
  ├── research(t2)  → researches one sub-topic                [container 3]  ├ parallel
  ├── research(t3)  → researches one sub-topic                [container 4]  ┘
  └── synthesize    → LLM combines reports into final answer  [container 5]
```

- **Fan-out:** `asyncio.gather()` launches all research tasks in parallel, each in its own container
- **Observability:** `@flyte.trace` on each LLM call means every prompt and response is visible as a span in the Union.ai dashboard
- **Durable checkpointing:** Each task's output is persisted. If `synthesize` fails, re-running skips the completed `plan` and `research` steps (with caching enabled)
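
The fan-out itself is ordinary `asyncio`. The sketch below uses a stand-in coroutine instead of a Flyte task to show two details: `asyncio.gather()` returns results in the order of its inputs, and a semaphore can cap how many calls are in flight at once. The `fake_research` function and the `limit` parameter are assumptions for illustration; nothing here runs on Union.ai.

```python
import asyncio


async def fake_research(topic: str) -> str:
    """Stand-in for the `research` task: pretend to do some work."""
    await asyncio.sleep(0.01)
    return f"report on {topic}"


async def fan_out(topics: list[str], limit: int = 2) -> list[str]:
    """Run one coroutine per topic, with at most `limit` in flight."""
    sem = asyncio.Semaphore(limit)

    async def bounded(topic: str) -> str:
        async with sem:
            return await fake_research(topic)

    # gather() preserves input order regardless of completion order.
    return list(await asyncio.gather(*(bounded(t) for t in topics)))
```

A cap like this can be useful when the planner may return many sub-topics and you want to limit concurrent LLM calls.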

> [!TIP]
> The same fan-out works with any framework inside the `research` task. See [LangGraph](https://www.union.ai/docs/v2/union/user-guide/agent-framework-integrations/langgraph) for a version that runs a LangGraph research agent (with web-search tool calling) inside each parallel container.

## More agentic patterns

Union.ai is framework-agnostic, so these patterns work with any LLM library. Each maps to well-known agent architectures:

| Pattern | What it does | When to use it | Link |
|---------|-------------|----------------|------|
| **ReAct** | Reason → Act → Observe loop with tool calling | Single-agent tasks with tools (API calls, search, code execution) | [multi-agent-workflows/react](https://github.com/unionai/workshops/tree/main/tutorials/multi-agent-workflows) |
| **Plan-and-Execute** | LLM creates a plan, independent steps fan out in parallel, results are synthesized | Complex queries that decompose into parallel sub-tasks | [multi-agent-workflows/planner](https://github.com/unionai/workshops/tree/main/tutorials/multi-agent-workflows) |
| **Evaluator-Optimizer (Reflection)** | Generate → Critique → Refine loop until quality threshold met | Content generation, code generation, any task with clear quality criteria | [Agentic Refinement docs](https://www.union.ai/docs/v2/union/user-guide/advanced-project/agentic-refinement) |
| **Orchestrator-Workers (Manager)** | Supervisor agent delegates to specialist worker agents, reviews quality, requests revisions | Multi-agent systems where sub-tasks require different expertise | [multi-agent-workflows/manager](https://github.com/unionai/workshops/tree/main/tutorials/multi-agent-workflows) |
| **Debate** | Multiple agents solve independently, then debate to consensus | High-stakes decisions where diverse reasoning improves accuracy | [multi-agent-workflows/debate](https://github.com/unionai/workshops/tree/main/tutorials/multi-agent-workflows) |
| **Sequential (Prompt Chaining)** | Static pipeline of LLM calls, no dynamic routing | Predictable multi-step transformations (extract → validate → format) | [multi-agent-workflows/sequential](https://github.com/unionai/workshops/tree/main/tutorials/multi-agent-workflows) |

## How Union.ai's primitives map to the agent stack

If you're coming from LangGraph, CrewAI, OpenAI Agents SDK, or similar frameworks, here's how the concepts you already know translate:

**Your agent loop** is a Python `for`/`while` loop inside an `@env.task`. Each iteration calls `@flyte.trace`-decorated functions for reasoning and tool execution. Union.ai doesn't impose a loop structure; you write it in plain Python, which means any pattern (ReAct, reflection, plan-and-execute) works naturally.

**Tool calling** is just calling Python functions. Define your tools as regular functions, decorate them with `@flyte.trace` for observability, and call them from within the agent loop. Use any tool-calling mechanism your LLM SDK provides (OpenAI function calling, Anthropic tool use, LangChain `bind_tools()`). MCP servers can be accessed from within tasks using the MCP Python SDK.

**Parallel fan-out** (LangGraph's `Send()`, n8n's Split in Batches) is `asyncio.gather()`. Each awaited task gets its own container, giving you true parallelism on separate hardware, not just concurrent coroutines.

**State and checkpointing** (LangGraph's Checkpointers, Threads) is automatic. Every task's inputs and outputs are durably persisted. `@flyte.trace` adds sub-step checkpoints within a task. Re-running with caching enabled skips completed steps, Union.ai's equivalent of replaying from a checkpoint.

**Routing and conditional logic** (LangGraph's `add_conditional_edges`, n8n's If/Switch nodes) is Python `if/else`. No special API needed.
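
As a minimal illustration of routing with plain control flow, the sketch below picks the next step from a parsed model decision. The decision keys mirror the ReAct example earlier on this page; the `route` function and its return labels are hypothetical.

```python
def route(decision: dict) -> str:
    """Pick the next step from a parsed LLM decision."""
    if decision.get("done"):
        return "finish"
    if decision.get("tool") in {"add", "multiply"}:
        return "math_tool"
    if decision.get("tool"):
        return "other_tool"
    return "ask_again"  # neither done nor a tool call: re-prompt the model
```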

**Environment isolation** (different dependencies per step) is `TaskEnvironment`. Your LLM step can use `langchain==0.3`; your data step can use `pandas` + GPU. Each gets its own container image.

**Guardrails and validation** are Python code between steps: `if/else` checks, Pydantic validation, structured output parsing, or libraries like NeMo Guardrails. Raise an exception to fail a step and trigger retries.
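
A guardrail can be as small as a function that checks the parsed model output and raises on anything unexpected. The sketch below uses only the standard library; the `validate_decision` name and the specific checks are assumptions, and Pydantic validation could do the same job with less code.

```python
import json


def validate_decision(raw: str, allowed_tools: set[str]) -> dict:
    """Parse an LLM reply and raise ValueError if it breaks the contract."""
    try:
        decision = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"reply is not valid JSON: {exc}") from exc
    if not isinstance(decision, dict):
        raise ValueError("reply must be a JSON object")
    if decision.get("done"):
        if "answer" not in decision:
            raise ValueError("final reply is missing 'answer'")
        return decision
    if decision.get("tool") not in allowed_tools:
        raise ValueError(f"unknown tool: {decision.get('tool')!r}")
    if not isinstance(decision.get("args"), dict):
        raise ValueError("'args' must be an object")
    return decision
```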

**Observability:** The Union.ai dashboard shows the full execution tree with per-step inputs, outputs, logs, resource usage, and timing. `@flyte.trace` adds spans within a task for fine-grained visibility into individual LLM calls and tool invocations. For LLM-specific metrics (token usage, cost per call), integrate with Langfuse or LangSmith from within your tasks.

## Next steps

- [The Flyte Agent harness](./flyte-agents): skip the boilerplate with a built-in tool-use loop.
- [Deploy an agent as a service](./deploy-agent-as-service): run this agent on a schedule or behind a webhook.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-agent/flyte-agents ===

# Flyte-native agents

`flyte.ai.agents.Agent` is a Flyte-native, batteries-included agent harness. Instead of hand-rolling the tool-call loop (as in [Build an agent with pure Python](./python-agents)), you declare a set of tools and instructions, and the harness drives a robust LLM ↔ tool loop for you.

The harness deeply integrates with Union.ai:

- **Tools** can be plain Python callables, `@flyte.trace` helpers, `@env.task` durable tasks, `LazyEntity` remote-task references, or pre-built `AgentTool` instances.
- **MCP servers** (Slack, GitHub, Linear, filesystem, …) are first-class: pass an `MCPServerSpec` and their tools are loaded into the catalog automatically.
- **Memory** persists across runs via `flyte.io.Dir`. See [Agent memory](./agent-memory).
- **HITL** support pauses the loop and asks a human for approval before sensitive tools execute.

## How it works

`Agent(...)` collapses heterogeneous tool sources into a single tool registry plus an auto-generated system prompt. `agent.run(message)` then drives an LLM ↔ tool-call loop:

1. Send the conversation and tool catalog to the LLM.
2. If the assistant returns tool calls, execute each one (sequentially or concurrently), append the results back into the message history, and loop.
3. Stop when the assistant returns a plain-text reply (no tool calls) or when `max_turns` is reached.

```mermaid
flowchart TB
    inputs["tools (fn / task / MCP),<br/> skills, memory, and instructions"]
    inputs --> agent[["Agent<br/>(tool registry, skills, system prompt)"]]

    subgraph loop["agent.run(message)  ·  agent.run.aio(message) in async code"]
      direction TB
      llm["call_llm"] --loop max_turns times--> branch{"tool_calls?"}
      branch -- yes --> exec["execute tools<br/>(optional HITL approval)"]
      exec --> llm
      branch -- no --> done["final reply"]
    end

    user(["user message"]) --> llm
    agent --> llm
    done --> result(["AgentResult<br/>+ updated memory"])
```

The call returns an `AgentResult` with the final `summary`, an `error` string (empty on success), and the number of `attempts` (turns) taken.
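
To make the three steps concrete, here is a plain-Python sketch of such a loop with a scripted stand-in for the LLM. It is not the harness's implementation: `run_loop`, `scripted_llm`, and the message dictionaries are assumptions chosen for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable

Message = dict[str, Any]
LLM = Callable[[list[Message]], Awaitable[Message]]


async def run_loop(
    llm: LLM, tools: dict[str, Callable[..., Any]], message: str, max_turns: int = 6
) -> tuple[str, int]:
    """Return (final_text, turns_used); final_text is empty if max_turns is hit."""
    history: list[Message] = [{"role": "user", "content": message}]
    for turn in range(1, max_turns + 1):
        reply = await llm(history)  # 1. send the conversation
        history.append(reply)
        calls = reply.get("tool_calls") or []
        if not calls:  # 3. plain-text reply: stop
            return reply["content"], turn
        for call in calls:  # 2. execute each call, append the result, loop
            result = tools[call["name"]](**call["args"])
            history.append({"role": "tool", "name": call["name"], "content": str(result)})
    return "", max_turns


async def scripted_llm(history: list[Message]) -> Message:
    """Fake model: request one multiplication, then answer with its result."""
    if history[-1]["role"] == "tool":
        return {"role": "assistant", "content": f"The answer is {history[-1]['content']}."}
    return {"role": "assistant", "tool_calls": [{"name": "multiply", "args": {"x": 17, "y": 23}}]}
```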

### Sync vs async

`agent.run` is synchronous by default. Inside `async def` code — Flyte tasks, FastAPI handlers, etc. — use the `.aio(...)` companion instead.

| Context | Call |
|---------|------|
| Scripts, notebooks, sync code | `result = agent.run(message)` |
| `async def` tasks / handlers | `result = await agent.run.aio(message)` |

## A minimal agent

Declare a few tools as plain async functions, build an `Agent`, and call `run`. The harness reads each tool's signature and docstring to build the JSON schema and description the LLM sees, so well-documented tools work best.

```python
from flyte.ai.agents import Agent

async def add(x: float, y: float) -> float:
    """Add two numbers and return their sum."""
    return x + y

async def multiply(x: float, y: float) -> float:
    """Multiply two numbers and return their product."""
    return x * y

async def get_weather(city: str) -> dict[str, str | float]:
    """Return a weather snapshot for `city`.

    In a real agent, replace this stub with a call to a weather API (and
    promote it to a ``@env.task`` for durable, retryable execution).
    """
    fake = {
        "new york": {"temperature_f": 68.4, "conditions": "partly cloudy"},
        "san francisco": {"temperature_f": 61.0, "conditions": "foggy"},
        "tokyo": {"temperature_f": 74.2, "conditions": "sunny"},
    }
    return fake.get(city.lower(), {"temperature_f": 70.0, "conditions": "clear"})

agent = Agent(
    name="basic-helper",
    instructions=(
        "You are a friendly assistant. Use the available tools to look up "
        "weather and compute math. Reply with a single sentence summary."
    ),
    model="claude-haiku-4-5",
    tools=[add, multiply, get_weather],
    max_turns=6,
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/agent-harness/basic_agent.py*

Call it synchronously, or with `await agent.run.aio(message)` inside async code:

```python
result = agent.run("What's 17 * 23 plus the temperature in NYC?")
print(result.summary)
```
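
The idea of deriving a tool description from a function's signature and docstring can be sketched with the standard `inspect` module. This is an illustration only: the `describe_tool` helper and the output shape are assumptions, and the harness's real schema format may differ.

```python
import inspect
from typing import Any, Callable

# Rough Python-type to JSON-schema-type mapping for this sketch.
_JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}


def describe_tool(fn: Callable[..., Any]) -> dict[str, Any]:
    """Build a small JSON-schema-like description from a function."""
    sig = inspect.signature(fn)
    properties = {
        name: {"type": _JSON_TYPES.get(param.annotation, "string")}
        for name, param in sig.parameters.items()
    }
    required = [
        name
        for name, param in sig.parameters.items()
        if param.default is inspect.Parameter.empty
    ]
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {"type": "object", "properties": properties, "required": required},
    }


async def add(x: float, y: float = 0.0) -> float:
    """Add two numbers and return their sum."""
    return x + y
```

A tool with no docstring would get an empty description here, which is one reason well-documented tools tend to work better.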

## Tools

The `tools=` argument accepts a sequence (or a `{name: tool}` mapping) of any mix of:

| Tool source | What it is | When to use it |
|-------------|------------|----------------|
| Plain callable | A sync or async Python function | Lightweight, in-process helpers |
| `@flyte.trace` helper | A traced function | In-process helpers you want as spans in the dashboard |
| `@env.task` template | A durable Flyte task | Heavy compute / IO that should run on-cluster, be retryable, and observable |
| `LazyEntity` | A reference to a remote deployed task | Calling already-deployed tasks by name |
| `AgentTool` | A pre-built tool descriptor | Renaming, custom schema, or HITL gating |

Pass a mapping to expose a tool under a different name to the LLM:

```python
agent = Agent(
    name="ticket-shepherd",
    tools={"fetch_data": durable_fetch_with_retries, "summarize": summarize},
)
```

When a tool is an `@env.task`, the harness invokes it with `task.aio(...)`, so each tool call executes durably on the cluster and shows up in the Union.ai dashboard.

### Customizing a tool with `tool(...)`

Use the `tool` decorator/wrapper to rename a tool, override its description, or gate it behind human approval, without writing an `AgentTool` by hand:

```python
from flyte.ai.agents import tool

@tool(requires_approval=True)
@env.task
async def issue_refund(order_id: str, amount_usd: float) -> dict:
    """Issue a refund to the customer."""
    ...
```

When the LLM tries to call a tool marked `requires_approval=True`, the harness invokes the agent's `approval_callback` and waits for a boolean decision before executing. The default callback raises a human-input request via the `flyteplugins-hitl` plugin and blocks until a human approves or denies. If denied, the agent receives a synthetic tool message explaining the rejection so it can recover gracefully.
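
The approval flow can be pictured as a wrapper that consults a callback before running the tool. The sketch below is plain Python, not the harness or the `flyteplugins-hitl` plugin: `gated_call`, the callback signature, and the rejection message are assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable

ApprovalCallback = Callable[[str, dict[str, Any]], Awaitable[bool]]


async def gated_call(
    name: str,
    fn: Callable[..., Awaitable[Any]],
    kwargs: dict[str, Any],
    approve: ApprovalCallback,
) -> str:
    """Run `fn` only if `approve` returns True; otherwise report the denial."""
    if not await approve(name, kwargs):
        # Returned as a tool result so the model can choose another path.
        return f"Tool call {name!r} was denied by a human reviewer."
    return str(await fn(**kwargs))


async def issue_refund(order_id: str, amount_usd: float) -> dict:
    """Stub refund tool used only for this sketch."""
    return {"order_id": order_id, "refunded": amount_usd}


async def approve_small_refunds(name: str, kwargs: dict[str, Any]) -> bool:
    """Stand-in for a human decision: approve refunds up to 50 USD."""
    return kwargs.get("amount_usd", 0) <= 50
```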

Pass `call_handler` to intercept *how* a tool is invoked. The handler is an async callback `(call_llm, tool_fn, **kwargs) -> result` that runs in place of the default execution. Await `tool_fn` to run the default behavior, or reach into `tool_fn.target` (the underlying task / callable) and `call_llm` (the agent's LLM callback) to do something custom — for example, ask the LLM how to size compute, then run the task with overridden resources and retry on OOM:

```python
async def right_size(call_llm, tool_fn, **kwargs):
    resources = await _ask_llm_for_resources(call_llm, tool_fn, kwargs)
    return await tool_fn.target.override(resources=resources).aio(**kwargs)

@tool(call_handler=right_size)
@env.task
async def train(...): ...
```

## MCP integration

The harness can connect to one or more [Model Context Protocol (MCP)](https://modelcontextprotocol.io) servers and surface their tools transparently. On the first `run` call, the harness connects to each server, lists its tools, and registers them in the catalog.

Declare servers with `MCPServerSpec` — either an HTTP(S) `url` (for streamable-http / SSE transports) or a `command` (for stdio servers):

```python
from flyte.ai.agents import Agent, MCPServerSpec

agent = Agent(
    name="release-shepherd",
    instructions="Inspect recent PRs, score release risk, and post a digest.",
    tools=[compute_release_score],          # local durable task tool
    mcp_servers=[
        MCPServerSpec(
            name="github",
            url="https://<host>/mcp/mcp",
            transport="streamable-http",
            tool_prefix="gh_",               # avoid name collisions
            tool_filter=["list_pull_requests", "comment_on_pull_request"],  # allowlist
        ),
        MCPServerSpec(
            name="github-stdio",
            command=["uvx", "mcp-server-github"],
        ),
    ],
)
```

Useful `MCPServerSpec` knobs:

- `tool_prefix` — prepend a prefix to every tool name from this server to avoid collisions.
- `tool_filter` — an allowlist of tool names to expose to the LLM (`None` exposes all).
- `headers` — HTTP headers (e.g. `Authorization`) for authenticated servers.

MCP support requires the `mcp` package: `pip install 'flyte[mcp]'`. To serve your own MCP servers on Union.ai, see [Build an MCP server](../build-mcp/_index).

## Skills

Pass extra context to append to the system prompt via `skills=`. Each entry is either a literal string or a `pathlib.Path` to a local text file:

```python
import pathlib

agent = Agent(
    name="ticket-shepherd",
    instructions="You triage support tickets.",
    tools=[list_open_tickets, summarize],
    skills=[pathlib.Path("TICKETING_HANDBOOK.md")],
)
```

## Observability

Every step of the loop emits a typed `AgentEvent` (`agent_start`, `turn_start`, `tool_start`, `tool_end`, `approval_request`, …). Subscribe by setting the `agent_progress_cb` context variable to forward events to logs, NDJSON streams, websockets, or Union.ai reports. The built-in chat UI uses this hook to stream progress; see [Add a chat UI](./agent-chat-ui).
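
Subscribing through a context variable can be sketched with the standard `contextvars` module. The names below (`progress_cb`, `emit`, `collect_events`) and the event dictionaries are stand-ins for illustration; the harness's actual `agent_progress_cb` variable and `AgentEvent` type are not reproduced here.

```python
import contextvars
from typing import Any, Callable, Optional

Event = dict[str, Any]

# Hypothetical stand-in for the harness's progress-callback context variable.
progress_cb: contextvars.ContextVar[Optional[Callable[[Event], None]]] = contextvars.ContextVar(
    "progress_cb", default=None
)


def emit(event_type: str, **data: Any) -> None:
    """Forward an event to the subscriber, if one is set in this context."""
    callback = progress_cb.get()
    if callback is not None:
        callback({"type": event_type, **data})


def run_with_events(tool_name: str) -> None:
    """Pretend agent run that emits a few of the event types named above."""
    emit("agent_start")
    emit("tool_start", tool=tool_name)
    emit("tool_end", tool=tool_name)


def collect_events(tool_name: str) -> list[Event]:
    """Subscribe a list-appending callback for the duration of one run."""
    events: list[Event] = []
    token = progress_cb.set(events.append)
    try:
        run_with_events(tool_name)
    finally:
        progress_cb.reset(token)  # restore the previous subscriber
    return events
```

Because the callback lives in a context variable rather than a global, concurrent runs in separate contexts can each have their own subscriber.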

## Extending the agent class

The default loop is robust, but sometimes you need custom behavior around it: input guardrails, output post-processing, a different control flow, or extra bookkeeping. The cleanest way to do this is to subclass `Agent` and override its `run` method.

`Agent` is a [dataclass](https://docs.python.org/3/library/dataclasses.html), and `run` is the single public entry point that drives the loop. There are two common strategies:

1. **Wrap the built-in loop** — add logic before and after `super().run(...)`. Best when you mostly want the default behavior plus pre/post steps.
2. **Replace the loop entirely** — implement `run` (and `tool_descriptions`) yourself. Best when you need a fundamentally different control flow but still want to plug into the rest of the ecosystem (e.g. the chat UI).

### `run` is sync-by-default

`Agent.run` is wrapped with `@syncify`, which means callers can use it synchronously (`agent.run(...)`) or await the async companion (`await agent.run.aio(...)`). When you override `run`, decorate your async implementation with `@syncify` to keep the same dual interface, and call the parent loop via `await super().run.aio(...)`.

### Strategy 1: wrap the built-in loop

Subclass `Agent` as a dataclass so you can add your own fields, then override `run` to add an input guardrail and post-process the answer:

```python
from dataclasses import dataclass

from flyte.ai.agents import Agent, MemoryStore
from flyte.ai.agents.protocol import AgentResult
from flyte.syncify import syncify

@dataclass(kw_only=True)
class GuardedAgent(Agent):
    """An Agent that rejects banned input and signs its final answer."""

    banned_terms: tuple[str, ...] = ()
    signature: str = ""

    @syncify
    async def run(self, message: str, memory: list[dict] | MemoryStore | None = None) -> AgentResult:
        # 1. Pre-flight input guardrail — short-circuit without an LLM call.
        lowered = message.lower()
        if any(term in lowered for term in self.banned_terms):
            return AgentResult(error="Request rejected by input guardrail.")

        # 2. Delegate to the built-in tool-use loop.
        result = await super().run.aio(message, memory)

        # 3. Post-process the final answer.
        if result.summary and self.signature:
            result.summary = f"{result.summary.strip()}\n\n— {self.signature}"
        return result
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/extending-the-agent/guarded_agent.py*

Instantiate and call it just like a regular `Agent`:

```python
agent = GuardedAgent(
    name="guarded-helper",
    instructions="You are a careful assistant.",
    tools=[...],
    banned_terms=("ssn", "password"),
    signature="GuardedAgent",
)

result = agent.run("Summarize today's open tickets.")
```

Because `GuardedAgent` still subclasses `Agent`, every other feature — tools, MCP servers, memory, HITL — keeps working unchanged.

### Strategy 2: implement `run` from scratch

If you want a completely custom loop, implement the `AgentProtocol`: a class exposing `run(message, memory) -> AgentResult` and `tool_descriptions() -> list[dict]`. Any object satisfying this protocol can be used anywhere the harness is accepted, including the [chat UI](./agent-chat-ui). `memory` may be a `list[dict]` of prior messages (a chat history) or a `MemoryStore`.

```python
from __future__ import annotations

from flyte.ai.agents import MemoryStore
from flyte.ai.agents.protocol import AgentResult
from flyte.syncify import syncify

class MyCustomAgent:
    """A fully custom agent that implements the AgentProtocol."""

    def __init__(self, tools: dict):
        self._tools = tools

    @syncify
    async def run(self, message: str, memory: list[dict] | MemoryStore | None = None) -> AgentResult:
        # Your own control flow: reasoning, routing, tool calls, retries, etc.
        prior = memory.messages if isinstance(memory, MemoryStore) else (memory or [])
        # `_my_loop` is a placeholder for your own async loop; it is not defined here.
        answer = await self._my_loop(message, prior)
        return AgentResult(summary=answer)

    def tool_descriptions(self) -> list[dict[str, str]]:
        return [
            {"name": name, "signature": f"{name}(...)", "description": fn.__doc__ or ""}
            for name, fn in self._tools.items()
        ]
```

> [!NOTE]
> `AgentResult` carries `summary`, `error`, `attempts`, and (for code-generating agents) `code` and `charts`. Populate the fields relevant to your loop; downstream consumers like the chat UI read `summary` and `error`.
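
Structural typing of this kind can be illustrated with `typing.Protocol`. The `AgentLike` class below is a hypothetical stand-in written for this sketch, not the library's `AgentProtocol`; it only shows how an object qualifies by having the two methods, without inheriting from anything.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class AgentLike(Protocol):
    """Hypothetical structural type with the two methods named above."""

    def run(self, message: str, memory: Any = None) -> Any: ...

    def tool_descriptions(self) -> list[dict]: ...


class EchoAgent:
    """Satisfies AgentLike without subclassing it."""

    def run(self, message: str, memory: Any = None) -> str:
        return message

    def tool_descriptions(self) -> list[dict]:
        return []


class NotAnAgent:
    """Missing `tool_descriptions`, so it does not satisfy AgentLike."""

    def run(self, message: str) -> str:
        return message
```

Note that `isinstance` checks against a runtime-checkable protocol only confirm that the methods exist; they do not verify signatures or return types.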

### Choosing between subclassing and composition

Subclassing is the right tool when you need to change *how the loop runs*. If you only need to change *what happens around a run* — for example, looping the agent until a condition is met, or combining several agents — prefer plain composition: call `agent.run.aio(...)` from inside your own `@env.task`. This keeps the harness untouched and your orchestration logic explicit and observable in the dashboard.
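
A rough sketch of the composition approach, using a stub in place of a real agent so it runs anywhere: the outer loop calls the agent until it stops returning an error. `FakeAgent`, `FakeResult`, and `run_until_ok` are hypothetical; with the real harness the call inside the loop would be `await agent.run.aio(...)` from within your own `@env.task`.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeResult:
    """Minimal stand-in exposing `summary` and `error` fields."""

    summary: str = ""
    error: str = ""


class FakeAgent:
    """Stub whose async `run` succeeds on the third call."""

    def __init__(self) -> None:
        self.calls = 0

    async def run(self, message: str) -> FakeResult:
        self.calls += 1
        if self.calls < 3:
            return FakeResult(error="draft failed review")
        return FakeResult(summary="all checks passed")


async def run_until_ok(agent: FakeAgent, message: str, max_rounds: int = 5) -> FakeResult:
    """Call the agent repeatedly, feeding back the last error, until it succeeds."""
    result = FakeResult(error="not started")
    for _ in range(max_rounds):
        result = await agent.run(message)
        if not result.error:
            break
        message = f"{message}\n\nPrevious attempt failed: {result.error}"
    return result
```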

## Next steps

- [Agent memory](./agent-memory): persist transcript and artifacts across runs.
- [Add a chat UI](./agent-chat-ui): wrap the agent in a hosted chat interface.
- [Deploy an agent as a service](./deploy-agent-as-service): run on a schedule or behind a webhook.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-agent/agent-memory ===

# Agent memory

By default, an [`Agent`](./flyte-agents) is stateless: each `run` starts from a blank conversation. `MemoryStore` gives an agent continuity across runs by persisting both the conversation transcript and arbitrary path-addressed artifacts to a `flyte.io.Dir`. This is what lets a scheduled or webhook-driven agent remember what it did last time.

Use cases:

- An "inbox triage" agent that recalls which threads it has already responded to.
- A research agent that builds up a scratchpad over many days.
- Any sleep/wake pattern where the agent wakes on a schedule and resumes prior context.

## What a `MemoryStore` holds

A `MemoryStore` combines two complementary stores backed by a working directory:

- **`messages`** — the live LLM conversation transcript, managed by the agent. Mutate it only via `append()` / `extend()`.
- **Path-addressed files** — arbitrary named blobs under the working root. Read and write them with `read_text` / `write_text` / `read_json` / `write_json` / `list_paths`.

The on-disk layout under the root looks like:

```
<root>/messages.json                           # transcript
<root>/<your/path>.{txt,json,…}                # path-addressed entries
<root>/meta/<encoded_path>.json                # per-entry metadata (sha, actor, ts)
<root>/audit/log.jsonl                         # audit trail (on by default)
<root>/versions/<encoded_path>/<ts>_<sha>.txt  # opt-in version history
```
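
To make the layout concrete, here is a minimal standard-library sketch that writes one path-addressed entry and its metadata sidecar into a temporary directory. It does not use `MemoryStore`. The `write_entry` helper is hypothetical, and percent-encoding the path for the sidecar name is an assumption, since this page does not say how `<encoded_path>` is derived.

```python
import hashlib
import json
import tempfile
import time
from pathlib import Path
from urllib.parse import quote


def write_entry(root: Path, path: str, text: str, actor: str = "agent") -> dict:
    """Write one path-addressed entry plus a metadata sidecar under `root`."""
    target = root / path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(text, encoding="utf-8")

    meta = {
        "sha": hashlib.sha256(text.encode("utf-8")).hexdigest(),
        "actor": actor,
        "ts": time.time(),
    }
    # Assumption: percent-encoding flattens "notes/todo.txt" into a single file name.
    sidecar = root / "meta" / f"{quote(path, safe='')}.json"
    sidecar.parent.mkdir(parents=True, exist_ok=True)
    sidecar.write_text(json.dumps(meta), encoding="utf-8")
    return meta


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    meta = write_entry(root, "notes/todo.txt", "buy milk")
    # Collect every file relative to the root to see the resulting layout.
    layout = sorted(p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_file())
```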

## Sync vs async

The path-addressed I/O methods (`read_text`, `read_json`, `write_text`, `write_json`, `get_meta`, `current_sha`) and the lifecycle methods (`create`, `get_or_create`, `save`) are sync-by-default with an `.aio(...)` companion. Inside `async def` tasks, use the `.aio` form.

## Keyed stores: the easy path

For durable agent memory, use a **keyed store**. `MemoryStore.get_or_create(key=...)` loads the store for that key if one exists and creates it otherwise. A keyed store saves to a deterministic blob-store namespace under the active Union.ai raw-data root:

```
{storage_root}/agents/memory-store/v0/{org}/{project}/{domain}/{key}
```
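
The library computes this location for you. The sketch below only shows how the template expands, using a hypothetical `keyed_remote_path` helper and made-up bucket, org, project, and domain values.

```python
def keyed_remote_path(storage_root: str, org: str, project: str, domain: str, key: str) -> str:
    """Fill in the documented template for a keyed store's remote location."""
    parts = [storage_root.rstrip("/"), "agents", "memory-store", "v0", org, project, domain, key]
    return "/".join(parts)


# All values here are illustrative placeholders.
path = keyed_remote_path("s3://my-bucket/raw", "acme", "research", "development", "my-assistant")
```

Because every component is fixed by the deployment context and the `key`, two runs that use the same `key` resolve to the same location.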

First, define the agent. Here it's a small research assistant with a single, **stateless** `web_search` tool — its continuity comes from memory, not from the tool:

```python
@env.task
async def web_search(query: str, max_results: int = 3) -> list[dict[str, str]]:
    """Search the web for `query` and return the top matching results.

    A stateless tool — it knows nothing about the agent's memory. But because the
    results it returns are recorded in the conversation transcript, the agent can
    recall or build on them in a later run without searching again.

    This stub returns canned results so the example runs offline. In a real
    agent, replace it with a call to a search API (Tavily, Brave, SerpAPI, …);
    keeping it an `@env.task` makes each search durable, retryable, and
    observable in the dashboard.
    """
    return [
        {
            "title": f"{query.title()} — overview ({i + 1})",
            "url": f"https://example.com/?q={query.replace(' ', '+')}&r={i + 1}",
            "snippet": f"Key point #{i + 1} about {query}.",
        }
        for i in range(max_results)
    ]

agent = Agent(
    name="memory-assistant",
    instructions=(
        "You are a personal research assistant with long-term memory. You "
        "remember what the user is working on and the facts they share, because "
        "your prior conversation transcript is always available. Use web_search "
        "to look things up, and reuse earlier findings from the conversation "
        "instead of searching again when you already have the answer."
    ),
    model="claude-haiku-4-5",
    tools=[web_search],
    max_turns=12,
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/agent-memory/agent_with_memory.py*

Reuse the same `key` across runs to keep continuity. The chat task below picks up where the previous run left off (see the [full example](https://github.com/unionai/unionai-examples/tree/main/v2/user-guide/build-agent/agent-memory/agent_with_memory.py) for the `TaskEnvironment` setup):

```python
@env.task(report=True)
async def chat(message: str, memory_key: str = MEMORY_KEY) -> str:
    """One conversation turn that picks up where the last run left off."""
    # Load (or create) the keyed store; restores the prior transcript.
    memory = await MemoryStore.get_or_create.aio(key=memory_key)
    flyte.logger.info("Restored %d prior messages from memory.", len(memory.messages))

    # Memory is passed in per call (not attached to the agent). The prior
    # transcript is prepended to the conversation and this turn is appended back
    # onto the store, which is also returned on result.memory.
    result = await agent.run.aio(message, memory=memory)

    # Saving is explicit — run never persists on its own. Write the updated
    # transcript back to the deterministic keyed remote path.
    await memory.save.aio()
    return result.summary or result.error
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/agent-memory/agent_with_memory.py*

The agent has no note-taking tools. Continuity comes entirely from the persisted transcript, and it remembers two kinds of things for free: the **facts the user shares** and the **results its tools return**. The first run records both in `messages.json`; a later run with the same `memory_key` reloads and prepends them, so the agent recalls earlier context — and reuses prior `web_search` findings instead of searching again. That is the core value of `MemoryStore` — no extra plumbing required.
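
The restore, append, and save cycle can be sketched with the standard library alone. This is a simulation, not the `MemoryStore` implementation: `load_messages` and `run_turn` are hypothetical helpers, the `role`/`content` message shape is an assumption, and the reply string stands in for a real LLM call.

```python
import json
import tempfile
from pathlib import Path


def load_messages(root: Path) -> list[dict]:
    """Return the saved transcript, or an empty one on the first run."""
    path = root / "messages.json"
    return json.loads(path.read_text(encoding="utf-8")) if path.exists() else []


def run_turn(root: Path, user_message: str) -> str:
    """Simulate one run: restore the transcript, add a turn, then save explicitly."""
    messages = load_messages(root)
    prior_turns = sum(1 for m in messages if m["role"] == "user")
    reply = f"I remember {prior_turns} earlier message(s)."  # stand-in for the LLM
    messages.append({"role": "user", "content": user_message})
    messages.append({"role": "assistant", "content": reply})
    (root / "messages.json").write_text(json.dumps(messages), encoding="utf-8")
    return reply


with tempfile.TemporaryDirectory() as tmp:
    first = run_turn(Path(tmp), "I'm researching solar panels.")
    second = run_turn(Path(tmp), "What am I working on?")
    saved = load_messages(Path(tmp))
```

The second call sees the first turn only because the first call wrote the transcript back, which is why the explicit save matters.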

## Working with a MemoryStore independently

Beyond the transcript, you can persist structured artifacts under arbitrary paths in the same store. This is optional — most agents get all the continuity they need from the transcript above — but it's useful when you want durable, queryable state (a scratchpad, a dedupe ledger, intermediate results).

A Flyte task can commit its own artifact: load the keyed store, read, modify, and write back a path-addressed file, then call `save()`. Every write is recorded in a metadata sidecar (sha256, actor, timestamp) and, by default, appended to an audit log:

```python
from flyte.ai.agents import MemoryStore

MEMORY_KEY = "my-assistant"
NOTES_PATH = "notes/notes.json"

@env.task
async def add_note(note: str) -> str:
    """A tool that commits its own artifact to the keyed store."""
    memory = await MemoryStore.get_or_create.aio(key=MEMORY_KEY)
    notes = await memory.read_json.aio(NOTES_PATH, default=[])
    notes.append(note)
    await memory.write_json.aio(NOTES_PATH, notes, reason="agent note")
    await memory.save.aio()  # commit the artifact to the keyed remote path
    return f"Noted: {note}"
```

> [!NOTE] Coordinating tool writes with the transcript
> Artifacts live on independent paths (e.g. `notes/notes.json`) from the transcript (`messages.json`), so they never collide. But when a tool writes to the same keyed store that the orchestrator also saves, the orchestrator's working copy goes stale mid-run. Reload the store with `get_or_create` after `agent.run`, carry over the updated transcript (`reloaded.messages = result.memory.messages`), and save once — otherwise the orchestrator's final save re-uploads a stale copy and clobbers the tool's artifact.

## Optimistic concurrency

When several tasks or agents share one keyed store (e.g. parallel tool calls, or a sleep/wake pattern), guard against lost updates by passing `expected_sha=`. The write succeeds only if the current content still matches; otherwise it raises `ConcurrencyError`:

```python
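from flyte.ai.agents import ConcurrencyError, MemoryStore

NOTES_PATH = "notes/notes.json"

# `add_note_guarded` is an illustrative helper name; `memory` is an already-loaded store.
async def add_note_guarded(memory: MemoryStore, note: str) -> str:
    """Append a note, refusing to overwrite a concurrent update."""
    notes = await memory.read_json.aio(NOTES_PATH, default=[])
    sha = await memory.current_sha.aio(NOTES_PATH)
    notes.append(note)
    try:
        await memory.write_json.aio(NOTES_PATH, notes, expected_sha=sha, reason="agent note")
    except ConcurrencyError:
        # Another writer updated the file between our read and write.
        return "Memory changed while saving the note; please retry."
    return f"Noted: {note}"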
```
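
The idea behind `expected_sha` is a compare-and-swap on the content hash. The in-memory sketch below illustrates that idea under stated assumptions: `TinyStore` and `ConflictError` are hypothetical stand-ins, not the library's classes, and the real store may compute or compare hashes differently.

```python
import hashlib
import json
from typing import Optional


class ConflictError(Exception):
    """Raised when the stored content changed since it was read (illustrative)."""


class TinyStore:
    """In-memory stand-in for a store with compare-and-swap writes."""

    def __init__(self) -> None:
        self._blobs: dict[str, str] = {}

    def current_sha(self, path: str) -> Optional[str]:
        blob = self._blobs.get(path)
        return None if blob is None else hashlib.sha256(blob.encode("utf-8")).hexdigest()

    def read_json(self, path: str, default=None):
        blob = self._blobs.get(path)
        return default if blob is None else json.loads(blob)

    def write_json(self, path: str, value, expected_sha: Optional[str] = None) -> None:
        # Reject the write if the entry changed after the caller read it.
        if expected_sha is not None and self.current_sha(path) != expected_sha:
            raise ConflictError(path)
        self._blobs[path] = json.dumps(value)


store = TinyStore()
store.write_json("notes/notes.json", ["first"])

# Two writers observe the same version of the file.
sha_a = store.current_sha("notes/notes.json")
sha_b = store.current_sha("notes/notes.json")

# Writer A commits first, so writer B's guarded write is rejected.
store.write_json("notes/notes.json", ["first", "from A"], expected_sha=sha_a)
try:
    store.write_json("notes/notes.json", ["first", "from B"], expected_sha=sha_b)
    outcome = "written"
except ConflictError:
    outcome = "conflict"
```

On a conflict, the usual recovery is to re-read, reapply the change, and try again with the fresh hash.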

## Optional capabilities

`MemoryStore` (and `create` / `get_or_create`) accept a few flags:

| Option | Default | What it does |
|--------|---------|--------------|
| `audit` | `True` | Append every successful write to `audit/log.jsonl`. Inspect with `audit_tail(n)`. |
| `keep_versions` | `False` | Snapshot every write under `versions/` for full history (≈ 2× storage per write). |
| `read_only_prefixes` | `()` | Reject direct writes into the given prefixes (e.g. `("memory/",)`), so the agent must stage proposals elsewhere and a trusted pipeline promotes them. |

The internal `audit/`, `meta/`, and `versions/` prefixes and `messages.json` are reserved — writes to them are rejected, and they're excluded from `list_paths`.
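
The write rules above amount to a prefix check before each write. The following sketch shows one way such a check could look. `check_write_allowed` and `is_allowed` are hypothetical helpers, and raising `PermissionError` is an assumption, since this page does not name the exception the library uses.

```python
RESERVED_PREFIXES = ("audit/", "meta/", "versions/")
RESERVED_FILES = ("messages.json",)


def check_write_allowed(path: str, read_only_prefixes: tuple[str, ...] = ()) -> None:
    """Raise PermissionError if `path` is reserved or under a read-only prefix."""
    if path in RESERVED_FILES or path.startswith(RESERVED_PREFIXES):
        raise PermissionError(f"{path!r} is reserved for internal use")
    if read_only_prefixes and path.startswith(read_only_prefixes):
        raise PermissionError(f"{path!r} is under a read-only prefix")


def is_allowed(path: str, read_only_prefixes: tuple[str, ...] = ()) -> bool:
    """Boolean wrapper around the check, convenient for filtering paths."""
    try:
        check_write_allowed(path, read_only_prefixes)
        return True
    except PermissionError:
        return False


# With "memory/" locked, the agent stages proposals under another prefix.
staged_ok = is_allowed("staging/facts.json", ("memory/",))
direct_ok = is_allowed("memory/facts.json", ("memory/",))
```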

## Passing memory to the agent

Memory is not attached to the agent — it is passed in per call and returned on the result. `agent.run(message, memory=store)` prepends the store's prior transcript, runs the loop, and appends the new turn back onto the store. Persisting is explicit: `run` never writes on its own, so call `memory.save()` (or `await memory.save.aio()`) yourself afterward.

```python
memory = await MemoryStore.get_or_create.aio(key="my-assistant")
result = await agent.run.aio(message, memory=memory)
await memory.save.aio()  # save() always targets the deterministic keyed path
```

You can also pass a plain `list[dict]` of prior messages as `memory` for a stateless, single-shot history (nothing is persisted in that case).

## Lower-level usage

Every `MemoryStore` is **keyed** — there is no unkeyed/ephemeral store. You normally obtain one via `MemoryStore.create(key=...)` or `MemoryStore.get_or_create(key=...)`, but direct construction is supported for advanced use and serialization (`MemoryStore` is a Flyte I/O type, so it can be passed as a task input/output). `save()` takes no arguments: it always uploads the working root to the deterministic keyed `remote_path`. When `root` is omitted, a temporary working directory is created and cleaned up automatically.

## Next steps

- [The Flyte Agent harness](./flyte-agents): how the agent loop uses `memory`.
- [Deploy an agent as a service](./deploy-agent-as-service): schedule a memory-backed agent so it resumes context on each wakeup.

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-agent/agent-chat-ui ===

# Agent chat UI

A useful way to interact with an agent is through a chat interface. Because Union.ai can [host apps](../build-apps/_index) behind a URL, you can serve a chat UI for your agent with no separate infrastructure. There are two approaches:

1. **`AgentChatAppEnvironment`** — the fastest path. Any agent that implements the `AgentProtocol` (including the built-in `Agent`, in tool-use or `code_mode`) gets a hosted chat shell, tool sidebar, and streaming for free.
2. **A custom FastAPI app** — full control over the UI. Wrap the agent in a `FastAPIAppEnvironment` and serve your own HTML/CSS/JS.

Both reuse the same agent object, so you can start with the built-in shell and graduate to a custom UI later.

## Option 1: the built-in chat UI

`flyte.ai.chat.AgentChatAppEnvironment` wraps an agent in a hosted chat app. Since `Agent` implements the `AgentProtocol`, it plugs straight in:

```python
import flyte
from flyte.ai.agents import Agent
from flyte.ai.chat import AgentChatAppEnvironment, CustomTheme

task_env = flyte.TaskEnvironment(
    name="chat-agent-tools",
    image=flyte.Image.from_debian_base().with_pip_packages("litellm", "httpx"),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    secrets=[flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY")],
)

@task_env.task
async def search_docs(query: str, max_results: int = 3) -> list[dict[str, str]]:
    """Search internal documentation (stub) and return matching snippets."""
    corpus = [
        {"title": "Tasks", "body": "Define a task by decorating an async function with @env.task."},
        {"title": "Triggers", "body": "Schedule a task by attaching a flyte.Trigger with a flyte.Cron automation."},
        {"title": "Secrets", "body": "Mount cluster-managed secrets into a task with flyte.Secret(...)."},
    ]
    needle = query.lower()
    matches = [d for d in corpus if needle in d["body"].lower() or needle in d["title"].lower()]
    return matches[:max_results]

agent = Agent(
    name="docs-helper",
    instructions=(
        "You are a friendly internal docs assistant. Use search_docs to find "
        "relevant snippets. Always cite the doc title in your final answer."
    ),
    model="claude-haiku-4-5",
    tools=[search_docs],
    max_turns=8,
)

@task_env.task(report=True)
async def chat_entrypoint(message: str, memory: list[dict]) -> dict:
    """Parent task that owns the agent loop and the nested tool tasks."""
    result = await agent.run.aio(message, memory=memory)
    return {
        "summary": result.summary,
        "error": result.error,
        "attempts": result.attempts,
        "charts": [],
        "code": "",
    }

env = AgentChatAppEnvironment(
    name="docs-agent-chat-ui",
    agent=agent,
    task_entrypoint=chat_entrypoint,
    title="Internal docs assistant",
    subtitle="Backed by a flyte.ai.agents.Agent + durable Flyte task tools.",
    theme=CustomTheme(accent_color="#6F2AEF", accent_hover_color="#8B52F2"),
    prompt_nudges=[
        {"label": "Basics", "prompt": "Can you show me a hello world example?"},
        {"label": "Triggers", "prompt": "How do I schedule a task?"},
    ],
    depends_on=[task_env],
    image=flyte.Image.from_debian_base().with_pip_packages("litellm", "fastapi", "uvicorn"),
    resources=flyte.Resources(cpu=2, memory="2Gi"),
    secrets=flyte.Secret("internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
)
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/agent-chat-ui/agent_chat_ui.py*

The `task_entrypoint` is a parent task that owns the agent loop, so the nested durable tool tasks run correctly under it. The chat shell streams progress by subscribing to the agent's `agent_progress_cb` events.

## Option 2: a custom FastAPI chat app

When you want to control the look and feel, wrap any `AgentProtocol`-compatible agent in a `FastAPIAppEnvironment` and serve your own UI. A natural fit is an `Agent` in **code mode** (`code_mode=True`). On each turn, the LLM writes Python that runs in a [sandbox](../sandboxing/_index) with the tools exposed as functions. The result carries the generated code, a summary, and any charts you choose to surface, all served behind a conversational web interface.

The architecture is small:

```
Browser (Chat UI)
  ├── GET  /            -> embedded HTML/CSS/JS chat interface
  ├── GET  /api/tools   -> JSON list of available tool descriptions
  └── POST /api/chat    -> { message, memory } -> { code, charts, summary, error }
           └── Agent.run(message, memory)
```
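
The request and response shapes in the diagram can be exercised from any client. As a small sketch, the hypothetical helpers below encode a `POST /api/chat` body and decode a reply, filling in any of the four response fields that are absent. The sample reply bytes are made up for illustration.

```python
import json


def build_chat_request(message: str, memory: list[dict]) -> bytes:
    """Encode the JSON body the diagram shows for POST /api/chat."""
    return json.dumps({"message": message, "memory": memory}).encode("utf-8")


def parse_chat_response(raw: bytes) -> dict:
    """Decode a response, filling in any of the four fields that are missing."""
    defaults = {"code": "", "charts": [], "summary": "", "error": ""}
    return {**defaults, **json.loads(raw)}


body = build_chat_request("Plot weekly revenue", memory=[])
# Illustrative reply; a real one comes from the deployed app.
reply = parse_chat_response(b'{"summary": "Revenue rose 4%.", "charts": ["<svg/>"]}')
```

The client owns the conversation history here: it sends prior messages in `memory` on every request.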

The app itself is just a FastAPI server. The endpoints call the agent's `run.aio` and `tool_descriptions` methods:

```python
import pathlib

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

import flyte
from flyte.ai.agents import Agent
from flyte.app.extras import FastAPIAppEnvironment

app = FastAPI(title="Chat Data Analytics Agent")

env = FastAPIAppEnvironment(
    name="chat-analytics-agent",
    app=app,
    image=flyte.Image.from_debian_base().with_pip_packages(
        "fastapi", "uvicorn", "httpx", "pydantic-monty", "litellm",
    ),
    secrets=flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY"),
    scaling=flyte.app.Scaling(replicas=1),
)

agent = Agent(
    name="analytics",
    instructions="You are a data analyst. Use the tools to fetch, aggregate, and chart data.",
    tools=ALL_TOOLS,
    code_mode=True,
    max_turns=15,
)

class ChatRequest(BaseModel):
    message: str
    memory: list[dict] = []

class ChatResponse(BaseModel):
    code: str = ""
    charts: list[str] = []
    summary: str = ""
    error: str = ""

@app.get("/api/tools")
async def get_tools() -> list[dict]:
    """Return JSON descriptions of available tool functions (for the sidebar)."""
    return agent.tool_descriptions()

@app.post("/api/chat")
async def chat(req: ChatRequest) -> ChatResponse:
    """Generate code, run it in the sandbox, and return results."""
    result = await agent.run.aio(req.message, memory=req.memory)
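    # Fall back to empty values so the response model validates if a field is unset.
    return ChatResponse(code=result.code or "", charts=result.charts or [],
                        summary=result.summary or "", error=result.error or "")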

@app.get("/", response_class=HTMLResponse)
async def index() -> HTMLResponse:
    """Serve the embedded chat UI."""
    return HTMLResponse(content=CHAT_HTML)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_handle = flyte.serve(env)
    print(f"Deployed Chat Analytics Agent: {app_handle.url}")
```

`CHAT_HTML` is the embedded front-end: a single HTML string with the chat markup, styles, and a small fetch-based client that POSTs to `/api/chat` and renders the returned charts and summary. `ALL_TOOLS` is the agent's tool registry. Both live in their own modules, so extending the agent only requires adding a tool to the registry; the agent auto-generates its system prompt from each tool's signature and docstring.
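
To illustrate how a description can be derived from a signature and docstring, the sketch below uses the standard `inspect` module. It is not the harness's own implementation: `describe_tool` and `fetch_sales` are hypothetical, and the exact text the agent puts in its system prompt may differ.

```python
import inspect


def describe_tool(fn) -> dict:
    """Build a name/signature/description record from a plain function."""
    return {
        "name": fn.__name__,
        "signature": f"{fn.__name__}{inspect.signature(fn)}",
        "description": inspect.getdoc(fn) or "",
    }


def fetch_sales(region: str, days: int = 7) -> list:
    """Return daily sales rows for a region (hypothetical tool)."""
    return []


description = describe_tool(fetch_sales)
```

Since the record is computed from the function itself, a clear docstring and typed parameters are the main levers for improving what the model sees.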

Run it locally during development, then deploy with one command:

```bash
# Local development
python chat_app.py

# Deploy to Union.ai
flyte deploy chat_app.py env
```

Union.ai assigns a URL, handles TLS, and auto-scales the app.

> [!TIP]
> Drop `code_mode=True` to serve a standard tool-use [`Agent`](./flyte-agents) (or plug in any object implementing the `AgentProtocol`) behind the same UI. The endpoints only depend on `run.aio` and `tool_descriptions`.

## Next steps

- [The Flyte Agent harness](./flyte-agents): the agent powering the chat UI.
- [Sandboxing](../sandboxing/_index): how an `Agent` in code mode safely executes generated code.
- [Deploy an agent as a service](./deploy-agent-as-service): other ways to run an agent (task, schedule, webhook).

=== PAGE: https://www.union.ai/docs/v2/union/user-guide/build-agent/deploy-agent-as-service ===

# Deploy an agent as a service

Once you've built an agent — with [pure Python](./python-agents), the [`Agent` harness](./flyte-agents), or a [third-party framework](../agent-framework-integrations/_index) — *how* you run it is an independent choice. The same agent object can be deployed in several ways:

| Pattern | When to use it | What invokes the agent |
|---------|----------------|------------------------|
| **As a task** | On-demand runs from the CLI, a notebook, or another service | `flyte.run(...)` |
| **As a scheduled task** | Recurring autonomous wakeups (triage, monitoring, reports) | A `flyte.Trigger` (cron or fixed-rate) |
| **Behind a webhook** | React to external events (GitHub, paging tools, CI) | An HTTP `POST` to an `AppEnvironment` |

All three wrap the agent loop in a regular Flyte task, so every run is durable, retryable, and observable in the Union.ai dashboard. The examples below use the `Agent` harness, but the pattern is identical for any agent — just call your agent's entry point inside the task.

## As a task

The simplest deployment: put the agent loop in an `@env.task` and invoke it on demand. This works for any agent.

```python
import flyte
from flyte.ai.agents import Agent

env = flyte.TaskEnvironment(
    name="concierge-agent",
    image=flyte.Image.from_debian_base().with_pip_packages("litellm"),
    secrets=[flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY")],
)

agent = Agent(
    name="customer-concierge",
    instructions="You are a customer-service concierge.",
    tools=[...],
)

@env.task(report=True)
async def concierge(request: str) -> str:
    """Run the agent for a single request."""
    result = await agent.run.aio(request)
    return result.summary or result.error
```

Run it on demand:

```bash
flyte run agent.py concierge --request "Refund order #12345 to the customer."
```

Or from Python with `flyte.run(concierge, request="...")`. To register a stable, deployed version of the task, use `flyte deploy agent.py env`.

## As a scheduled task (via `Trigger`)

To run an agent autonomously on a schedule, attach a `flyte.Trigger` to the task. The "wakeup" is a regular Flyte task — the agent loop runs inside it, so every tool call is durable, observable, and retryable. Pair this with [agent memory](./agent-memory) so the agent resumes prior context on each wakeup.

```python
agent = Agent(
    name="github-triage",
    instructions=(
        "You are a GitHub issue triager. For each wakeup: list open issues for "
        "the configured repo, classify each one, group them by severity, and "
        "post a concise digest to the team channel. Always end by calling post_digest."
    ),
    model="claude-haiku-4-5",
    tools=[list_open_issues, classify_issue, post_digest],
    max_turns=20,
)

@env.task(
    triggers=flyte.Trigger(
        "daily-triage",
        flyte.Cron("0 9 * * *"),  # every day at 09:00
        inputs={"trigger_time": flyte.TriggerTime, "repo": "flyteorg/flyte", "channel": "#flyte-triage"},
    ),
    report=True,
)
async def triage_repo(trigger_time: datetime, repo: str, channel: str) -> str:
    """Scheduled wakeup that runs the triage agent end-to-end."""
    message = f"It is {trigger_time.isoformat()}. Triage the open issues in {repo} and post a digest to {channel}."
    with flyte.group("triage-loop"):
        result = await agent.run.aio(message)
    return result.summary or f"[triage failed] {result.error}"
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/deploy/scheduled_triage_agent.py*

The agent's tools (`list_open_issues`, `classify_issue`, `post_digest`) are durable `@env.task`s; see the [full example](https://github.com/unionai/unionai-examples/tree/main/v2/user-guide/build-agent/deploy/scheduled_triage_agent.py) for their definitions.

Deploying the task registers the trigger; from then on Union.ai wakes the agent on schedule. Use `flyte.Cron(...)` for calendar schedules or `flyte.FixedRate(...)` for fixed intervals. The `flyte.TriggerTime` input is filled with the scheduled fire time. See [Triggers](https://www.union.ai/docs/v2/union/user-guide/task-configuration/triggers) for the full schedule reference.
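
For readers less familiar with cron syntax, this small sketch names the five fields of a standard expression such as `"0 9 * * *"`. The `cron_fields` helper is hypothetical and only splits the string. It does not validate ranges, and whether `flyte.Cron` accepts other formats is not covered here.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def cron_fields(expression: str) -> dict[str, str]:
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


daily = cron_fields("0 9 * * *")       # minute 0 of hour 9, every day
weekdays = cron_fields("0 9 * * 1-5")  # same time, Monday through Friday
```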

## Behind a webhook (`AppEnvironment`)

To kick off an agent run in response to an external event, deploy a small FastAPI app via an `AppEnvironment` that exposes an HTTP endpoint. The endpoint launches the agent task with `flyte.run.aio(...)`, so the long-running agent loop executes durably in the background while the webhook returns immediately with a run URL.

```python
@tool_env.task(report=True)
async def review_pr(repo: str, pr_number: int, event: str) -> str:
    """Durable task that runs the agent for a single webhook event."""
    message = f"GitHub webhook fired for {repo}#{pr_number} (event={event}). Review the PR."
    result = await agent.run.aio(message)
    return result.summary or result.error

def _build_app():
    from fastapi import FastAPI

    api = FastAPI(title="flyte-agent-webhook")

    @api.post("/trigger")
    async def trigger(payload: dict) -> dict[str, str]:
        repo = payload.get("repository")
        pr_number = int(payload.get("pull_request", {}).get("number", 0))
        event = payload.get("action")
        run = await flyte.run.aio(review_pr, repo=repo, pr_number=pr_number, event=event)
        return {"run_url": run.url, "name": run.name}

    return api

webhook_env = flyte.app.AppEnvironment(
    name="flyte-agent-webhook",
    image=flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn", "litellm"),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=True,
    depends_on=[tool_env],
)

@webhook_env.server
async def serve():
    import uvicorn

    config = uvicorn.Config(_build_app(), host="0.0.0.0", port=webhook_env.get_port().port)
    await uvicorn.Server(config).serve()
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-agent/deploy/webhook_agent.py*

The agent and its tools (`fetch_pr`, `post_comment`) are defined in the [full example](https://github.com/unionai/unionai-examples/tree/main/v2/user-guide/build-agent/deploy/webhook_agent.py).

Once deployed, point your external system at the `/trigger` URL:

```bash
curl -X POST -H "Content-Type: application/json" \
    -d '{"repository": "flyteorg/flyte", "pull_request": {"number": 123}, "action": "opened"}' \
    https://<subdomain>.apps.<endpoint>/trigger
```
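
The `/trigger` handler reads three fields from the request body and silently falls back to PR number `0` when the number is absent. If you would rather reject malformed events before launching a run, a validation step along these lines is one option. `parse_trigger_payload` is a hypothetical helper, and the flat `repository` string matches the sample request above rather than GitHub's native payload.

```python
def parse_trigger_payload(payload: dict) -> dict:
    """Extract the three fields the /trigger handler uses, or raise ValueError."""
    repo = payload.get("repository")
    number = (payload.get("pull_request") or {}).get("number")
    event = payload.get("action")
    if not isinstance(repo, str) or not repo:
        raise ValueError("payload is missing 'repository'")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(number, int) or isinstance(number, bool) or number <= 0:
        raise ValueError("payload is missing a positive 'pull_request.number'")
    if not isinstance(event, str) or not event:
        raise ValueError("payload is missing 'action'")
    return {"repo": repo, "pr_number": number, "event": event}


parsed = parse_trigger_payload(
    {"repository": "flyteorg/flyte", "pull_request": {"number": 123}, "action": "opened"}
)
```

In a FastAPI handler, the `ValueError` could be translated into a 4xx response so that no run is launched for an unusable event.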

> [!NOTE]
> When the webhook app submits runs on behalf of incoming requests, it needs valid Union.ai credentials. Use passthrough auth (a `FastAPIPassthroughAuthMiddleware` and `flyte.init_passthrough`) so the run is submitted with the caller's identity. See [FastAPI apps](https://www.union.ai/docs/v2/union/user-guide/native-app-integrations/fastapi-app).

## Chat and other app patterns

- **Chat UI:** To let users converse with the agent in a browser, serve it behind a chat interface. See [Add a chat UI](./agent-chat-ui).
- **FastAPI endpoint:** For API-first agents, expose your agent behind a REST endpoint with `FastAPIAppEnvironment` so other services or agents can call it programmatically.
- **Model serving:** [Serve open-weight LLMs](https://www.union.ai/docs/v2/union/user-guide/native-app-integrations/vllm-app) on GPUs behind an OpenAI-compatible API with `VLLMAppEnvironment` or `SGLangAppEnvironment`.

> [!TIP]
> See [Build Apps](../build-apps/_index) and [Configure Apps](../configure-apps/_index) for more details on hosting services on Union.ai.

