Mini Agent 源码解析——4 Agent 的工作流程

Mini Agent 源码解析——4 Agent 的工作流程

导言

这一篇的主题是 Mini Agent 的工作流程。一个完整的执行周期中,Agent 依次经历:接收用户消息并加入对话历史、调用 LLM 推理、判断是否需要调用工具、执行工具并把结果注回上下文、继续下一轮推理,直到任务完成或达到步数上限。这些环节串联起来,就是 Mini Agent 从接收到返回的完整链路。

代码案例

通过 examples/04_full_agent.py,可以完整看到一个具备全部能力的 Agent 是如何初始化的,以及它处理多步骤任务时的典型执行模式。

1. Agent 的完整初始化

一个 Agent 运行时需要准备好几样东西:LLMClientsystem_prompttools 列表,以及 workspace_dir。这些并不是散落在各处的配置,而是由 Agent 类统一接收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
llm_client = LLMClient(
api_key=config.llm.api_key,
api_base=config.llm.api_base,
model=config.llm.model,
)

tools = [
ReadTool(workspace_dir=workspace_dir),
WriteTool(workspace_dir=workspace_dir),
EditTool(workspace_dir=workspace_dir),
BashTool(),
]

agent = Agent(
llm_client=llm_client,
system_prompt=system_prompt,
tools=tools,
max_steps=config.agent.max_steps,
workspace_dir=workspace_dir,
)

这里值得注意的不是某个单独配置项,而是整体结构的清晰度:LLMClient 负责模型通信,tools 列表负责把工具能力注入 Agent,workspace_dir 负责限定文件操作的范围,剩下的一切由 Agent.run() 内部自动调度。这种把关注点分离的做法让整个初始化过程非常容易理解。

2. 工具列表的组装顺序

示例中还有一个细节值得单独看:工具的注册顺序是基础工具先加入,Session Note 工具后加入,最后尝试加载 MCP 工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
tools = [
ReadTool(workspace_dir=workspace_dir),
WriteTool(workspace_dir=workspace_dir),
EditTool(workspace_dir=workspace_dir),
BashTool(),
]
print("✓ Loaded 4 basic tools")

memory_file = Path(workspace_dir) / ".agent_memory.json"
tools.extend(
[
SessionNoteTool(memory_file=str(memory_file)),
RecallNoteTool(memory_file=str(memory_file)),
]
)
print("✓ Loaded 2 Session Note tools")

try:
mcp_tools = await load_mcp_tools_async(config_path="mini_agent/config/mcp.json")
if mcp_tools:
tools.extend(mcp_tools)
print(f"✓ Loaded {len(mcp_tools)} MCP tools")
except Exception as e:
print(f"⚠️ MCP tools not loaded: {e}")

这个顺序并不是强制的,但它的逻辑很合理:先把核心能力注册进去,再补充记忆层,最后尝试扩展协议层。万一 MCP 加载失败,前面注册的基础工具不受影响,Agent 仍然可以正常运行。

3. 多步骤任务的执行

给 Agent 一个需要多工具配合的任务,最能体现工作流程的价值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
task = """
Please help me with the following tasks:

1. Create a Python script called 'calculator.py' that:
- Has functions for add, subtract, multiply, divide
- Has a main() function that demonstrates usage
- Includes proper docstrings and type hints

2. Create a README.md file that:
- Describes the calculator script
- Shows how to run it
- Lists the available functions

3. Test the calculator by running it with bash

4. Remember this project info:
- Project: Simple Calculator
- Language: Python
- Purpose: Demonstration of agent capabilities
"""

agent.add_user_message(task)
result = await agent.run()

注意这里调用的是 agent.run() 而不是手动写循环。这个方法内部会持续调度 LLM 和工具,直到任务完成或达到 max_steps 上限。对调用方来说,不需要关心「下一步该干什么」的问题,Agent 内部已经把这个决策逻辑封装好了。

4. 多轮对话的交互模式

除了单次任务,示例中还演示了一种更接近实际使用的交互方式:多轮对话。同一个 Agent 实例上可以多次调用 add_user_message() 并分别 run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
conversations = [
"Create a file called 'data.txt' with the numbers 1 to 5, one per line.",
"Now read the file and tell me what's in it.",
"Count how many lines are in the file using bash.",
]

for i, message in enumerate(conversations, 1):
print(f"\n{'=' * 60}")
print(f"Turn {i}:")
print(f"{'=' * 60}")
print(f"User: {message}\n")

agent.add_user_message(message)

try:
result = await agent.run()
print(f"Agent: {result}\n")
except Exception as e:
print(f"Error: {e}")
break

这说明 Agent 实例本身是有状态的:每一次 add_user_message() 都会把新消息追加到内部对话历史,run() 则在这段历史的基础上继续推理。这个模式非常适合需要「先做一件事、再基于结果做另一件事」的场景。

源码解析

mini_agent/agent.py 是整个工作流程的核心实现所在。Agent 类的 run() 方法封装了完整的执行循环,但这个方法并不是孤立运转的,它依赖几个配合机制:消息历史管理、Token 配额控制、工具分发与结果回填、以及取消信号检查。下面逐层拆解。

1. 初始化:把工具注册为字典

Agent.__init__ 并不只是做属性赋值,它实际上完成了整个运行时的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def __init__(
self,
llm_client: LLMClient,
system_prompt: str,
tools: list[Tool],
max_steps: int = 50,
workspace_dir: str = "./workspace",
token_limit: int = 80000,
):
self.llm = llm_client
self.tools = {tool.name: tool for tool in tools}
self.max_steps = max_steps
self.token_limit = token_limit
self.workspace_dir = Path(workspace_dir)

self.workspace_dir.mkdir(parents=True, exist_ok=True)

# 绑定键盘事件以停止运行
self.cancel_event: Optional[asyncio.Event] = None

# Inject workspace information into system prompt if not already present
if "Current Workspace" not in system_prompt:
workspace_info = f"\n\n## Current Workspace\nYou are currently working in: `{self.workspace_dir.absolute()}`\nAll relative paths will be resolved relative to this directory."
system_prompt = system_prompt + workspace_info

self.system_prompt = system_prompt

self.messages: list[Message] = [Message(role="system", content=system_prompt)]

self.logger = AgentLogger()

self.api_total_tokens: int = 0
# Flag to skip token check right after summary (avoid consecutive triggers)
self._skip_next_token_check: bool = False

这里有一个值得注意的转换:传入的是一个 list[Tool],但实际存储的是 {tool.name: tool} 字典。这个设计让工具查找从 O(n) 降到了 O(1),在执行循环中每次根据函数名查找工具时可以更快定位。

另外,初始化阶段会把 system prompt 包装成一条 role="system" 的消息,加入 self.messages 列表,作为对话历史的起点。后续所有用户消息和 Agent 响应都追加到这个列表里,形成完整的上下文链。

2. 主循环:run() 的执行骨架

run() 是整个 Agent 的入口,内部是一个标准的 while 循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
async def run(self, cancel_event: Optional[asyncio.Event] = None) -> str:
if cancel_event is not None:
self.cancel_event = cancel_event

self.logger.start_new_run()
step = 0

while step < self.max_steps:
if self._check_cancelled():
self._cleanup_incomplete_messages()
return "Task cancelled by user."

await self._summarize_messages()

tool_list = list(self.tools.values())
response = await self.llm.generate(messages=self.messages, tools=tool_list)

self.messages.append(Message(
role="assistant",
content=response.content,
thinking=response.thinking,
tool_calls=response.tool_calls,
))

if not response.tool_calls:
return response.content

for tool_call in response.tool_calls:
result = await self.tools[tool_call.function.name].execute(**tool_call.function.arguments)
self.messages.append(Message(
role="tool",
content=result.content if result.success else f"Error: {result.error}",
tool_call_id=tool_call.id,
name=tool_call.function.name,
))

step += 1

return f"Task couldn't be completed after {self.max_steps} steps."

从结构上看,循环内只做了四件事:检查取消状态、调用 LLM、记录响应到历史、分发工具调用。循环何时终止的条件有三个:

  • LLM 不再发起工具调用,说明任务已完成,直接返回
  • 达到 max_steps 上限,防止无限循环
  • 收到取消信号,安全退出并清理未完成的消息

3. 工具分发:如何根据函数名找到对应工具

工具调用的入口在 run() 的这段逻辑里:

1
2
3
4
5
6
7
8
9
for tool_call in response.tool_calls:
function_name = tool_call.function.name
arguments = tool_call.function.arguments

if function_name not in self.tools:
result = ToolResult(success=False, error=f"Unknown tool: {function_name}")
else:
tool = self.tools[function_name]
result = await tool.execute(**arguments)

关键在于 response.tool_calls 的结构。它来自 LLM 的响应,包含了模型决定调用的工具名称和入参。框架负责把这个信息映射到具体工具对象上,再通过 execute() 方法真正执行。

执行完成后,工具结果以 role="tool" 的消息形式追加到 self.messages 里,这样下一轮 LLM 调用时就能看到这次工具的返回值,从而决定下一步动作。

4. Token 管理:超过上限时如何压缩历史

随着工具调用次数增加,self.messages 会不断膨胀。_summarize_messages() 负责在每轮循环开始前检查是否超出 token_limit,超限时触发一次摘要压缩:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def _summarize_messages(self):
# 如果刚刚压缩完,就不压缩了,等下一次 LLM Response
if self._skip_next_token_check:
self._skip_next_token_check = False
return

estimated_tokens = self._estimate_tokens()
should_summarize = estimated_tokens > self.token_limit or self.api_total_tokens > self.token_limit

if not should_summarize:
return

user_indices = [i for i, msg in enumerate(self.messages) if msg.role == "user" and i > 0]

# 只对话了一轮,不压缩
if len(user_indices) < 1:
print(f"{Colors.BRIGHT_YELLOW}⚠️ Insufficient messages, cannot summarize{Colors.RESET}")
return

# Summarization logic:
# - 保留 system prompt
# - 保留所有 user message(用户意图)
# - 把相邻 user 之间的执行过程合并成一段 summary
# - 如果最后一轮还在执行中(有 agent/tool 消息但没有下一个 user),也一起摘要

new_messages = [self.messages[0]]
summary_count = 0
for i, user_idx in enumerate(user_indices):
# 添加用户消息
new_messages.append(self.messages[user_idx])

# 确定压缩区间
# 如果不是最后一个 user,则压缩两个 user 之间的 message
# 如果是最后一个 user,则压缩后面所有 llm message
if i < len(user_indices) - 1:
next_user_idx = user_indices[i + 1]
else:
next_user_idx = len(self.messages)

execution_messages = self.messages[user_idx + 1 : next_user_idx]

# If there are execution messages in this round, summarize them
if execution_messages:
summary_text = await self._create_summary(execution_messages, i + 1)
if summary_text:
summary_message = Message(
role="user",
content=f"[Assistant Execution Summary]\n\n{summary_text}",
)
new_messages.append(summary_message)
summary_count += 1

self.messages = new_messages

# 打个标记,下一轮不要直接压缩了
self._skip_next_token_check = True

摘要策略很有意思:不是简单地截断前面部分的历史,而是把「用户意图」保留下来,把「执行过程」压缩成摘要。这样模型在后续推理时仍然知道用户最初想要什么,但不需要重复阅读中间所有步骤的细节。

5. 取消机制:如何在安全点中断执行

cancel_event 允许外部通过设置一个 asyncio.Event 来中断 Agent:

1
2
3
4
def _check_cancelled(self) -> bool:
if self.cancel_event is not None and self.cancel_event.is_set():
return True
return False

检查点分布在循环开头、工具执行前后等多个位置。收到取消信号时,代码不会直接抛出异常,而是先调用 _cleanup_incomplete_messages() 把当前步骤产生的部分消息清理掉,再返回取消提示。这样做的好处是消息历史不会被污染,下一次重新 run() 时不会带上上一次未完成的残片。

具体来看 _cleanup_incomplete_messages() 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
def _cleanup_incomplete_messages(self):
last_assistant_idx = -1
for i in range(len(self.messages) - 1, -1, -1):
if self.messages[i].role == "assistant":
last_assistant_idx = i
break

if last_assistant_idx == -1:
return

removed_count = len(self.messages) - last_assistant_idx
if removed_count > 0:
self.messages = self.messages[:last_assistant_idx]

这个函数的设计意图非常清晰:只保留到上一个「已完成」的 assistant 消息,把本轮正在执行中的所有消息都删掉

为什么要这样?考虑一个具体的取消场景:模型已经返回了一个 tool_calls 请求,Agent 开始逐个执行工具调用。在执行到第三个工具时,用户按下了Esc 键触发了取消。这时候 self.messages 里已经有:

1
2
3
4
5
6
system
user
assistant (带 tool_calls)
tool (工具 1 的结果)
tool (工具 2 的结果)
... 工具 3 正在执行中,取消信号来了 ...

如果直接就这样停下,下次 run() 时模型会看到不完整的执行链——它还记得自己发起了三个工具调用,但历史里只有两个工具的返回值,第三个工具的结果不知所踪。这会让模型陷入困惑,不知道该继续还是重试。

_cleanup_incomplete_messages() 的做法是:找到最后一个 role="assistant" 的消息,把这条消息之后的所有内容都删掉。在上面的例子里,删除线部分就是会被清理掉的。这样留下来的消息历史是干净的——模型知道的是「我发起过工具调用,但还没全部执行完就被中断了」,而不是「我有三个调用但只有两个结果」。

清理完成后,run() 返回取消提示,用户可以重新发起请求。这一次 run() 的上下文里不会带上任何残片,Agent 可以从头开始或接着之前的进度继续工作。

6. 工作流程的整体脉络

把上面的细节串起来,可以更清晰地看到整个执行链路:

  1. run() 开始,初始化日志和循环计数器
  2. 进入 while 循环,检查取消信号和 Token 上限
  3. 调用 llm.generate(),把当前完整的消息历史发送给模型
  4. 模型返回,可能是纯文本响应,也可能是工具调用请求
  5. 如果有工具调用,遍历每个调用、查找工具、执行、结果写回消息历史
  6. 本轮结束,计数器加一,回到循环开头继续
  7. 如果模型返回纯文本,说明任务完成,直接退出循环并返回
  8. 如果达到 max_steps,退出循环并提示上限

整个流程最核心的设计点在于:消息历史是循环的载体,LLM 是决策中心,工具是执行单元,取消和摘要机制是安全网。每个组件各司其职,组合在一起就成了一个可以稳定运转的 Agent 系统。

总结

这一篇围绕 Mini Agent 的工作流程展开,从代码示例到源码实现完整梳理了一遍。

在示例层面,examples/04_full_agent.py 展示了 Agent 的组装方式:LLMClient、system_prompt、tools 列表、workspace_dir,四个要素缺一不可。工具按层次注册——基础工具、记忆工具、MCP 工具——而不是一股脑全部塞进去,这样的设计让工具体系的扩展思路变得很清晰。

在源码层面,mini_agent/agent.py 是整个系统的核心。run() 方法用 while 循环驱动整个执行周期,每一轮都要依次经历:取消检查、Token 超限判断、LLM 推理、工具分发、结果写回消息历史。循环何时退出由三个条件共同决定:模型不再发起工具调用、达到步数上限、收到外部取消信号。

消息历史是整个运行时的骨架。所有内容——system prompt、用户指令、模型响应、工具返回值——都积累在 self.messages 里,每轮 LLM 调用都是把这份完整历史整体发送出去。Token 配额管理和摘要压缩机制保证了历史不会无限膨胀,取消机制和 _cleanup_incomplete_messages() 则保证了中断后消息历史的干净恢复。


Mini Agent 源码解析——4 Agent 的工作流程
https://onlyar.site/2026/04/15/MiniMax-Agent-Guide-4/
作者
Only(AR)
发布于
2026年4月15日
许可协议