Parallelized autoresearch agent
Code available here.
This tutorial extends the Autoresearch agent pattern with a code-mode MLE agent that plans batches of training experiments, saves distinct train.py edits, and runs them in parallel via flyte.map. It follows the karpathy/autoresearch loop (minimize validation bits-per-byte on a TinyGPT variant) but orchestrates fan-out batches with durable Flyte tasks and unionai-sandbox execution.
Compared to the single-threaded Claude Code autoresearch tutorial, this agent:
- Edits full train.py source (upstream karpathy style) instead of calling a remote coding CLI
- Uses code_mode=True so the LLM writes Python plans that call batch tools such as run_experiment_batch
- Persists a leaderboard, code-edit history, and batch plans in MemoryStore
- Right-sizes each experiment with an LLM via a @tool call_handler, then retries on Flyte or sandbox OOM by bumping memory
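The plan-then-fan-out flow described above can be sketched without Flyte. The snippet below is only an illustration under stated assumptions: `fake_experiment` is a hypothetical stand-in for a real training run, and `asyncio.gather` stands in for `flyte.map`. It shows how experiment titles might be chunked into batches whose members run concurrently.

```python
import asyncio


def chunk(items: list[str], batch_size: int) -> list[list[str]]:
    """Split items into consecutive batches of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]


async def fake_experiment(title: str) -> dict:
    """Hypothetical stand-in for one training run; returns a dummy result."""
    await asyncio.sleep(0)  # yield control, as a real remote call would
    return {"title": title, "success": True}


async def run_batches(titles: list[str], batch_size: int) -> list[dict]:
    """Run batches sequentially; experiments inside a batch run concurrently."""
    results: list[dict] = []
    for batch in chunk(titles, batch_size):
        # asyncio.gather stands in for flyte.map (an assumption, not the Flyte API).
        results.extend(await asyncio.gather(*(fake_experiment(t) for t in batch)))
    return results


titles = [f"exp-{i}" for i in range(6)]
results = asyncio.run(run_batches(titles, batch_size=3))
```

With six titles and a batch size of three, this runs two batches of three experiments each and preserves the input order in the results.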
Each experiment has different compute needs (wider models, larger batch sizes, longer training loops). A single static flyte.Resources on the task would either waste cluster memory or OOM on the heavy configs. Instead, this example uses the same call_handler pattern as the Flyte SDK self-correcting agent: before every run, a sizing LLM reads the tool name, docstring, and call arguments and returns a JSON resource spec; the handler applies it with tool_fn.target.override(resources=...).aio(**kwargs) and retries with more memory when needed.
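To make the "JSON resource spec" step concrete, here is a minimal sketch of how a sizing reply might be parsed and clamped before it is turned into resources. The function name `parse_resource_spec`, the spec shape (`cpu`, `memory` in `Gi`), the bounds, and the fallback default are all assumptions for illustration; the tutorial's actual `estimate_resources` is not shown in this section.

```python
import json
import re

# Hypothetical bounds and default; the example's real limits are not shown here.
MIN_CPU, MAX_CPU = 1, 8
MIN_MEM_GI, MAX_MEM_GI = 2, 32
DEFAULT_SPEC = {"cpu": 2, "memory": "4Gi"}


def parse_resource_spec(reply: str) -> dict:
    """Extract {"cpu": int, "memory": "<N>Gi"} from an LLM reply, clamped to bounds."""
    match = re.search(r"\{.*\}", reply, flags=re.DOTALL)
    if match is None:
        return dict(DEFAULT_SPEC)
    try:
        raw = json.loads(match.group(0))
        cpu = int(raw["cpu"])
        mem_gi = int(str(raw["memory"]).removesuffix("Gi"))
    except (ValueError, KeyError, TypeError):
        # Malformed JSON, missing keys, or non-numeric values fall back to the default.
        return dict(DEFAULT_SPEC)
    cpu = max(MIN_CPU, min(MAX_CPU, cpu))
    mem_gi = max(MIN_MEM_GI, min(MAX_MEM_GI, mem_gi))
    return {"cpu": cpu, "memory": f"{mem_gi}Gi"}
```

Clamping keeps a confused model from requesting an unschedulable pod, and the fallback keeps a malformed reply from blocking the run.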
Define the task environments
The example uses three environments — bundle preparation, sandbox experiments, and the agent driver — sharing a Debian-based image with PyTorch and sandbox tooling.
agent_env = flyte.TaskEnvironment(
    name="autoresearch-agent",
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    image=image,
    include=_INCLUDE,
    secrets=[flyte.Secret(key="internal-anthropic-api-key", as_env_var="ANTHROPIC_API_KEY")],
    depends_on=[experiment_env, bundle_env],
)
Supporting modules (train.py, prepare.py, tools.py, and ui.py) live alongside the entry point in the example directory.
Right-size experiments with call_handler
The right-sizing logic lives in tools.py. execute_with_right_sizing asks the LLM for a resource estimate, runs the underlying @env.task with override(resources=...), and loops on flyte.errors.OOMError or a sandbox-reported OOM flag until the run succeeds or retries are exhausted:
async def execute_with_right_sizing(
    call_llm: LLMCallable,
    target_task: Any,
    *,
    model: str,
    tool_name: str,
    description: str,
    max_oom_retries: int = MAX_OOM_RETRIES,
    **kwargs: Any,
) -> dict:
    """LLM-size *target_task*, run it, and retry with more memory on OOM."""
    resources = await estimate_resources(call_llm, model, tool_name, description, kwargs)
    attempt = 0
    while True:
        try:
            with flyte.group(f"{tool_name}-attempt-{attempt + 1}"):
                result = await target_task.override(resources=resources).aio(**kwargs)
        except flyte.errors.OOMError:
            if attempt >= max_oom_retries:
                flyte.logger.error("%s Flyte OOM after %d retries; giving up.", tool_name, attempt)
                raise
            resources = bump_memory(resources)
            attempt += 1
            flyte.logger.warning(
                "%s Flyte OOM; retrying with memory=%s",
                tool_name,
                resources.memory,
            )
            continue
        if isinstance(result, dict):
            result["resources"] = f"cpu={resources.cpu}, mem={resources.memory}"
            result["oom_retries"] = attempt
        if isinstance(result, dict) and result.get("oom"):
            if attempt >= max_oom_retries:
                return result
            resources = bump_memory(resources)
            attempt += 1
            flyte.logger.warning(
                "%s sandbox OOM; retrying with memory=%s",
                tool_name,
                resources.memory,
            )
            continue
        return result


def right_sizing_handler(*, max_oom_retries: int = MAX_OOM_RETRIES):
    """Build a ``@tool`` ``call_handler`` that right-sizes and self-heals on OOM."""

    async def handle(call_llm: LLMCallable, tool_fn: ToolFn, **kwargs: Any) -> Any:
        return await execute_with_right_sizing(
            call_llm,
            tool_fn.target,
            model=tool_fn.model,
            tool_name=tool_fn.name,
            description=tool_fn.description,
            max_oom_retries=max_oom_retries,
            **kwargs,
        )

    return handle


right_size = right_sizing_handler(max_oom_retries=MAX_OOM_RETRIES)
right_size is the pre-built handler passed to @tool(call_handler=...). The agent does not need a back-reference to the Agent instance — the harness passes call_llm and tool_fn.model into the handler on each invocation.
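The retry loop above calls bump_memory, which is not shown in this section. The following is a hedged sketch of what such a helper could look like, assuming memory is expressed as a whole number of `Gi` and doubles on each retry up to a ceiling. The `Resources` dataclass is a hypothetical stand-in for flyte.Resources, and `MAX_MEMORY_GI` is an invented cap.

```python
import re
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Resources:
    """Hypothetical stand-in for flyte.Resources with only the fields used here."""

    cpu: int
    memory: str


MAX_MEMORY_GI = 64  # assumed ceiling, not taken from the tutorial


def bump_memory(resources: Resources, factor: int = 2) -> Resources:
    """Return a copy with memory multiplied by factor, capped at MAX_MEMORY_GI."""
    match = re.fullmatch(r"(\d+)Gi", resources.memory)
    if match is None:
        raise ValueError(f"expected memory like '4Gi', got {resources.memory!r}")
    bumped = min(int(match.group(1)) * factor, MAX_MEMORY_GI)
    # replace() builds a new instance, so the caller's original spec is untouched.
    return replace(resources, memory=f"{bumped}Gi")
```

Returning a new object rather than mutating in place keeps each attempt's resource spec easy to log and compare.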
The experiment task stacks @tool(call_handler=tools.right_size) on @experiment_env.task. The task body only loads edited code and runs sandbox training; sizing and OOM recovery happen in the handler:
@tool(call_handler=tools.right_size)
@experiment_env.task
async def run_experiment(
    title: str,
    time_budget_sec: int = 45,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
) -> dict:
    """Train using agent-edited ``train.py`` with LLM right-sizing and OOM self-healing."""
    train_py = await tools.load_train_code(memory_key, title)
    config_overrides = await tools.load_config_overrides(memory_key, title)
    duplicate = await tools.check_duplicate_config(memory_key, title, train_py, config_overrides)
    if duplicate:
        result = {
            "success": False,
            "title": title,
            "error": (
                f"Duplicate config of '{duplicate['duplicate_of']}' "
                f"(signature {duplicate['config_signature']}); change train.py or overrides."
            ),
            "duplicate_of": duplicate["duplicate_of"],
        }
        await tools.record_experiment_result(
            memory_key,
            result,
            actor="parallelized-autoresearch",
        )
        return result
    bundle = await build_bundle()
    cache_dir = await materialize_cache(bundle)
    result = await tools.run_train_in_sandbox(
        cache_dir,
        train_py,
        title=title,
        time_budget_sec=time_budget_sec,
        config_overrides=config_overrides or None,
    )
    if result.get("success"):
        await tools.record_promising_run(memory_key, title, result)
        await tools.register_config_signature(
            memory_key,
            title,
            train_py,
            config_overrides,
            actor="parallelized-autoresearch",
        )
    await tools.record_experiment_result(
        memory_key,
        result,
        actor="parallelized-autoresearch",
    )
    return result


# ``flyte.map`` invokes ``run_experiment.aio`` directly (not through the agent
# registry), so bind the LLM callback and model here for ``call_handler`` right-sizing.
run_experiment = dataclasses.replace(
    run_experiment,
    call_llm=tools.call_llm,
    model=MODEL,
)
Batch fan-out calls flyte.map.aio(run_experiment, ...) from run_experiment_batch. That path invokes run_experiment.aio() directly — not through the agent registry — so the example binds call_llm and model on the tool after construction (see the dataclasses.replace block above). With Flyte SDK ≥ 2.5.5, AgentTool.aio routes through call_handler, so every mapped experiment gets LLM right-sizing even when the agent only exposes run_experiment_batch in code mode.
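The binding step relies on standard `dataclasses.replace` semantics, which can be shown in isolation. The sketch below uses a hypothetical `ToyTool` dataclass rather than the Flyte SDK's tool class; `train`, `fake_llm`, and the `"toy-model"` name are likewise invented. It only illustrates why a directly invoked tool needs its LLM callback bound ahead of time.

```python
import asyncio
import dataclasses
from typing import Any, Awaitable, Callable, Optional


@dataclasses.dataclass(frozen=True)
class ToyTool:
    """Hypothetical stand-in for a tool wrapper; not the Flyte SDK class."""

    name: str
    target: Callable[..., Awaitable[Any]]
    call_llm: Optional[Callable[[str], Awaitable[str]]] = None
    model: Optional[str] = None

    async def aio(self, **kwargs: Any) -> Any:
        # A direct call can only consult the LLM if it was bound beforehand.
        if self.call_llm is None:
            raise RuntimeError(f"{self.name}: call_llm is not bound")
        hint = await self.call_llm(f"size {self.name} for {self.model}")
        return {"hint": hint, "result": await self.target(**kwargs)}


async def train(title: str) -> str:
    return f"trained {title}"


async def fake_llm(prompt: str) -> str:
    return "cpu=2"


unbound = ToyTool(name="run_experiment", target=train)
# dataclasses.replace returns a new instance; the original stays unchanged.
bound = dataclasses.replace(unbound, call_llm=fake_llm, model="toy-model")
out = asyncio.run(bound.aio(title="wide-mlp"))
```

Calling `unbound.aio(...)` would raise in this toy version, which mirrors the reason the example binds call_llm and model before handing the tool to the mapped fan-out.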
The fan-out agent task
The driver task parallelized_autoresearch restores prior memory (default key parallelized-autoresearch), streams Activity / Leaderboard / Code edits / Memory report tabs, and runs the code-mode agent loop. The agent tool registry is trimmed to the batch workflow — run_experiment is internal to run_experiment_batch, not a sandbox function the LLM calls directly.
@agent_env.task(report=True)
async def parallelized_autoresearch(
    n_experiments: int = 6,
    num_shards: int = DEFAULT_NUM_SHARDS,
    memory_key: str = tools.MEMORY_KEY_FANOUT,
    batch_size: int = 3,
    max_turns: int = DEFAULT_MAX_TURNS,
) -> AutoresearchOutput:
    """Drive the fan-out code-edit MLE agent with sandbox batch execution."""
    bundle = await build_bundle(num_shards=num_shards)
    profile = await profile_bundle(bundle)
    memory = await MemoryStore.get_or_create.aio(key=memory_key)
    persisted = await memory.read_json.aio("memory/leaderboard.json", default=[])
    promising = await memory.read_json.aio("memory/promising_code.json", default=[])
    history = await tools.load_research_history(memory_key)
    flyte.logger.info(
        "Fan-out agent restored %d messages, %d experiments, %d promising edits, best val_bpb=%s.",
        len(memory.messages),
        len(persisted),
        len(promising),
        history.get("best_val_bpb"),
    )
    events: list[dict[str, Any]] = []

    async def on_event(ev) -> None:
        events.append({"type": ev.type, "data": ev.data})
        if ev.type in ("tool_start", "tool_end", "tool_error", "turn_start", "agent_end"):
            tab = flyte.report.get_tab("Activity")
            tab.replace(ui.render_activity_log(events))
            await flyte.report.flush.aio()
        if ev.type == "tool_end" and ev.data.get("tool") in (
            "edit_train_code_batch",
            "<sandbox>",
        ):
            edits = await tools.load_saved_code_edits(memory_key)
            if edits:
                flyte.report.get_tab("Code edits").replace(ui.render_code_edits_panel(edits))
                await flyte.report.flush.aio()

    directive_text = ui.directive_code_edit_fanout(
        n_experiments,
        profile,
        memory_key,
        batch_size=batch_size,
        history=history,
    )
    token = agent_progress_cb.set(on_event)
    run_agent = build_fanout_agent(max_turns=max_turns)
    try:
        result = await run_agent.run.aio(directive_text, memory=memory)
    finally:
        agent_progress_cb.reset(token)
    leaderboard, best = ui.parse_leaderboard(
        memory.messages,
        promising_fallback=promising,
    )
    leaderboard_dicts = [dataclasses.asdict(e) for e in leaderboard]
    code_edits = await tools.load_saved_code_edits(memory_key)
    tab_lb = flyte.report.get_tab("Leaderboard")
    tab_lb.replace(ui.render_leaderboard(leaderboard, best))
    flyte.report.get_tab("Code edits").replace(
        ui.render_code_edits_panel(code_edits, best_title=best.title if best else None)
    )
    await memory.write_json.aio(
        "memory/leaderboard.json",
        leaderboard_dicts,
        actor="parallelized-autoresearch",
        reason=f"leaderboard after {len(leaderboard)} experiments",
    )
    await memory.save.aio()
    audit = await memory.audit_tail(20)
    hypotheses = await memory.read_json.aio("memory/hypotheses.json", default=[])
    promising = await memory.read_json.aio("memory/promising_code.json", default=[])
    tab_mem = flyte.report.get_tab("Memory")
    tab_mem.replace(
        ui.render_memory_panel(
            memory_key,
            len(memory.messages),
            leaderboard_dicts,
            audit,
            hypotheses,
            persisted_promising=promising,
            code_edits=code_edits,
        )
    )
    summary_body = result.summary or result.error or ""
    if result.error and leaderboard:
        best_line = f" Best val_bpb so far: {best.val_bpb} ({best.title})." if best and best.val_bpb else ""
        summary_body = f"{result.error}{best_line}"
    await flyte.report.replace.aio(
        ui.render_summary(
            directive_text,
            leaderboard,
            best,
            summary_body,
            code_edits=code_edits,
        )
    )
    await flyte.report.flush.aio()
    return AutoresearchOutput(
        directive=directive_text,
        dataset_profile=profile,
        best=best,
        leaderboard=leaderboard,
        summary=summary_body,
        memory_key=memory_key,
        total_experiments=len(leaderboard),
    )
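The driver installs its progress callback with a set/reset pair wrapped in try/finally. That shape matches the standard-library `contextvars` idiom, sketched below under the assumption that agent_progress_cb behaves like a `ContextVar` (this section does not show its definition). The names `progress_cb` and `emit` are hypothetical.

```python
import asyncio
import contextvars

# Hypothetical callback variable, analogous in spirit to agent_progress_cb.
progress_cb = contextvars.ContextVar("progress_cb", default=None)


async def emit(event: str) -> None:
    """Deliver an event to whichever callback is installed in this context."""
    cb = progress_cb.get()
    if cb is not None:
        await cb(event)


async def main() -> list[str]:
    seen: list[str] = []

    async def on_event(event: str) -> None:
        seen.append(event)

    token = progress_cb.set(on_event)
    try:
        await emit("tool_start")
        await emit("tool_end")
    finally:
        progress_cb.reset(token)  # restore the previous value even on error
    await emit("ignored")  # no callback is installed any more
    return seen


seen = asyncio.run(main())
```

Resetting in `finally` ensures a failed agent run does not leave a stale callback attached for later code in the same context.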
Run the agent
Create secrets
Register an Anthropic API key for agent LLM calls and for per-experiment resource sizing inside call_handler:
flyte create secret internal-anthropic-api-key <YOUR_ANTHROPIC_API_KEY>
Run remotely
From the example directory:
cd v2/tutorials/parallelized_autoresearch
uv run --script parallelized_autoresearch.py --n-experiments 6 --batch-size 3 --num-shards 1
Use --memory-key to resume a prior research session (default: parallelized-autoresearch). Pass a unique key, for example parallelized-autoresearch-20260622-215057, to start with empty memory. Code mode needs more turns than JSON tool mode, so increase --max-turns for larger sweeps.
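A unique key in the style of the example above can be generated with a timestamp suffix. This is a small sketch, assuming a `YYYYMMDD-HHMMSS` suffix in UTC; the helper name `fresh_memory_key` is hypothetical, and the tutorial does not state which time zone its example key uses.

```python
from datetime import datetime, timezone
from typing import Optional


def fresh_memory_key(
    prefix: str = "parallelized-autoresearch",
    now: Optional[datetime] = None,
) -> str:
    """Return prefix plus a timestamp suffix such as 20260622-215057."""
    moment = now or datetime.now(timezone.utc)  # UTC is an assumption
    return f"{prefix}-{moment.strftime('%Y%m%d-%H%M%S')}"


example = fresh_memory_key(now=datetime(2026, 6, 22, 21, 50, 57, tzinfo=timezone.utc))
```

The resulting string can be passed as the value of --memory-key to start a session with empty memory.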
Or invoke the agent task directly with flyte run (snake_case task inputs):
flyte run parallelized_autoresearch.py parallelized_autoresearch \
    --n_experiments 6 --batch_size 3 --num_shards 1 --max_turns 12 \
    --memory_key parallelized-autoresearch
The first run downloads climbmix data shards and trains a BPE tokenizer. Subsequent runs reuse cached bundle tasks. This example requires Flyte SDK ≥ 2.5.5 for call_handler support in code mode and on AgentTool.aio (used by flyte.map fan-out).
See also the single-task Autoresearch agent tutorial for the Claude Code + pull-request workflow.