Search
Search docs, blog posts, and ecosystem packages with citations.
Enter a query to see grounded citations.
We can't find the internet
Attempting to reconnect
Search docs, blog posts, and ecosystem packages with citations.
Build a memory-backed agent that turns one goal into a task list and works through it step by step.
Complete Build your first workflow before starting. This notebook is about stateful task execution, so the first pass stays local and deterministic.
Mix.install([
{:jido, "~> 2.1"}
])
Logger.configure(level: :warning)
# Livebook imports can execute generated docs as doctests.
# Disable compiler docs until the current Jido Hex release drops the invalid signal_types/0 example.
Code.put_compiler_option(:docs, false)
This tutorial runs entirely locally. The main path focuses on task state, memory, and resume behavior. A later section shows where AI planning can plug into the same pattern.
case Jido.start() do
{:ok, _} -> :ok
{:error, {:already_started, _}} -> :ok
end
runtime = Jido.default_instance()
Jido.Memory.Agent stores task data under the reserved :__memory__ key in agent state. This helper gives the notebook one place to read and write the task list.
defmodule MyApp.TaskMemory do
alias Jido.Memory.Agent, as: MemoryAgent
def ensure(agent) do
agent
|> MemoryAgent.ensure()
|> MemoryAgent.ensure_space(:tasks, [])
end
def tasks(agent) do
agent = ensure(agent)
case MemoryAgent.space(agent, :tasks) do
%{data: tasks} when is_list(tasks) -> tasks
_ -> []
end
end
def put_tasks(agent, tasks) do
agent = ensure(agent)
agent
|> MemoryAgent.update_space(:tasks, fn space -> %{space | data: tasks} end)
end
def summary(tasks) do
%{
total: length(tasks),
pending: Enum.count(tasks, &(&1.status == :pending)),
done: Enum.count(tasks, &(&1.status == :done))
}
end
end
The agent only needs two domain actions:
goal.plan creates the initial task list task.run_next executes exactly one pending task defmodule MyApp.PlanGoalAction do
use Jido.Action,
name: "plan_goal",
schema: [
goal: [type: :string, required: true]
]
def run(%{goal: goal}, context) do
tasks = build_tasks(goal)
agent = MyApp.TaskAgent.new(state: context.state)
agent = MyApp.TaskMemory.put_tasks(agent, tasks)
{:ok,
%{
goal: goal,
status: :planned,
current_task_id: nil,
last_result: nil,
planned_count: length(tasks),
summary: MyApp.TaskMemory.summary(tasks),
__memory__: agent.state.__memory__
}}
end
defp build_tasks(goal) do
[
%{
id: "scope",
title: "Clarify scope",
description: "Define the deliverable and audience for #{goal}",
status: :pending
},
%{
id: "draft",
title: "Create first draft",
description: "Produce the first working output for #{goal}",
status: :pending
},
%{
id: "review",
title: "Review and finalize",
description: "Check gaps, polish the output, and finalize #{goal}",
status: :pending
}
]
end
end
defmodule MyApp.RunNextTaskAction do
use Jido.Action,
name: "run_next_task",
schema: []
def run(_params, context) do
agent = MyApp.TaskAgent.new(state: context.state)
tasks = MyApp.TaskMemory.tasks(agent)
case Enum.find(tasks, &(&1.status == :pending)) do
nil ->
{:ok,
%{
status: :complete,
current_task_id: nil,
last_result: "All tasks are complete.",
summary: MyApp.TaskMemory.summary(tasks)
}}
task ->
result = execute_task(task, context.state.goal)
updated_tasks =
Enum.map(tasks, fn current ->
if current.id == task.id do
Map.merge(current, %{status: :done, result: result})
else
current
end
end)
agent = MyApp.TaskMemory.put_tasks(agent, updated_tasks)
summary = MyApp.TaskMemory.summary(updated_tasks)
status = if summary.pending == 0, do: :complete, else: :working
{:ok,
%{
status: status,
current_task_id: task.id,
last_result: result,
summary: summary,
__memory__: agent.state.__memory__
}}
end
end
defp execute_task(task, goal) do
case task.id do
"scope" ->
"Scope defined for #{goal}. Audience, deliverable, and success criteria are now explicit."
"draft" ->
"First draft prepared for #{goal}. The core structure exists and is ready for review."
"review" ->
"Review completed for #{goal}. Final polish and delivery notes are recorded."
end
end
end
The task list itself lives in Memory. The visible schema only tracks high-level progress.
defmodule MyApp.TaskAgent do
use Jido.Agent,
name: "task_agent",
description: "Plans a goal into tasks and executes them one step at a time",
schema: [
goal: [type: :string, default: nil],
status: [type: :atom, default: :idle],
current_task_id: [type: :string, default: nil],
last_result: [type: :string, default: nil],
planned_count: [type: :integer, default: 0],
summary: [type: :map, default: %{total: 0, pending: 0, done: 0}]
]
def signal_routes(_ctx) do
[
{"goal.plan", MyApp.PlanGoalAction},
{"task.run_next", MyApp.RunNextTaskAction}
]
end
end
defmodule MyApp.TaskHelpers do
def send_signal(pid, type, data \\ %{}) do
signal = Jido.Signal.new!(type, data, source: "/livebook")
Jido.AgentServer.call(pid, signal)
end
def snapshot(pid) do
{:ok, server_state} = Jido.AgentServer.state(pid)
%{
state: server_state.agent.state,
tasks: MyApp.TaskMemory.tasks(server_state.agent)
}
end
def run_until_complete(pid, max_steps \\ 10) do
Enum.reduce_while(1..max_steps, snapshot(pid), fn _, _acc ->
current = snapshot(pid)
if current.state.status == :complete do
{:halt, current}
else
{:ok, _agent} = send_signal(pid, "task.run_next")
{:cont, snapshot(pid)}
end
end)
end
end
Start the agent and create the task list.
task_agent_id = "task-agent-#{System.unique_integer([:positive])}"
{:ok, pid} =
Jido.start_agent(
runtime,
MyApp.TaskAgent,
id: task_agent_id
)
{:ok, _agent} =
MyApp.TaskHelpers.send_signal(
pid,
"goal.plan",
%{goal: "Write a README for an Elixir HTTP client library called Fetch"}
)
plan_snapshot = MyApp.TaskHelpers.snapshot(pid)
IO.inspect(plan_snapshot.state.summary, label: "Summary")
Enum.each(plan_snapshot.tasks, fn task ->
IO.inspect({task.id, task.title, task.status}, label: "Task")
end)
You should see three pending tasks in memory.
Advance exactly one task so the state transition is easy to inspect.
{:ok, _agent} = MyApp.TaskHelpers.send_signal(pid, "task.run_next")
one_step_snapshot = MyApp.TaskHelpers.snapshot(pid)
IO.inspect(one_step_snapshot.state.status, label: "Agent status")
IO.inspect(one_step_snapshot.state.last_result, label: "Last result")
Enum.each(one_step_snapshot.tasks, fn task ->
IO.inspect({task.id, task.status, Map.get(task, :result)}, label: "Task state")
end)
At this point, one task is :done and the rest remain :pending. That is the key resume property: you can stop here and continue later without rebuilding the plan.
final_snapshot = MyApp.TaskHelpers.run_until_complete(pid)
IO.inspect(final_snapshot.state.summary, label: "Final summary")
Enum.each(final_snapshot.tasks, fn task ->
IO.inspect({task.id, task.status, task.result}, label: "Completed task")
end)
The agent reuses the task list already stored in Memory, so each task.run_next call only needs the current agent state.
The runnable path above keeps planning deterministic so the task loop is easy to understand. In a production AI agent, the most common upgrade is to replace build_tasks/1 with model-backed decomposition and keep the rest of the state machine intact.
For example, you can generate the initial task list with a planning action:
{:ok, result} =
Jido.Exec.run(Jido.AI.Actions.Planning.Decompose, %{
goal: "Write a README for an Elixir HTTP client library called Fetch",
max_depth: 2,
model: "openai:gpt-4o-mini"
})
Or expose task actions as tools inside a Jido.AI.Agent so the model decides when to advance:
defmodule MyApp.AITaskAgent do
use Jido.AI.Agent,
name: "ai_task_agent",
tools: [MyApp.PlanGoalAction, MyApp.RunNextTaskAction]
end
The important part is the separation of concerns: