Mini Agent 源码解析——4
Agent 的工作流程
导言
这一篇的主题是 Mini Agent 的工作流程。一个完整的执行周期中,Agent
依次经历:接收用户消息并加入对话历史、调用 LLM
推理、判断是否需要调用工具、执行工具并把结果注回上下文、继续下一轮推理,直到任务完成或达到步数上限。这些环节串联起来,就是
Mini Agent 从接收到返回的完整链路。
代码案例
通过
examples/04_full_agent.py,可以完整看到一个具备全部能力的
Agent 是如何初始化的,以及它处理多步骤任务时的典型执行模式。
1. Agent 的完整初始化
一个 Agent
运行时需要准备好几样东西:LLMClient、system_prompt、tools
列表,以及
workspace_dir。这些并不是散落在各处的配置,而是由
Agent 类统一接收:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 llm_client = LLMClient( api_key=config.llm.api_key, api_base=config.llm.api_base, model=config.llm.model, ) tools = [ ReadTool(workspace_dir=workspace_dir), WriteTool(workspace_dir=workspace_dir), EditTool(workspace_dir=workspace_dir), BashTool(), ] agent = Agent( llm_client=llm_client, system_prompt=system_prompt, tools=tools, max_steps=config.agent.max_steps, workspace_dir=workspace_dir, )
这里值得注意的不是某个单独配置项,而是整体结构的清晰度:LLMClient
负责模型通信,tools 列表负责把工具能力注入
Agent,workspace_dir 负责限定文件操作的范围,剩下的一切由
Agent.run()
内部自动调度。这种把关注点分离的做法让整个初始化过程非常容易理解。
2. 工具列表的组装顺序
示例中还有一个细节值得单独看:工具的注册顺序是基础工具先加入,Session
Note 工具后加入,最后尝试加载 MCP 工具:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 tools = [ ReadTool(workspace_dir=workspace_dir), WriteTool(workspace_dir=workspace_dir), EditTool(workspace_dir=workspace_dir), BashTool(), ]print ("✓ Loaded 4 basic tools" ) memory_file = Path(workspace_dir) / ".agent_memory.json" tools.extend( [ SessionNoteTool(memory_file=str (memory_file)), RecallNoteTool(memory_file=str (memory_file)), ] )print ("✓ Loaded 2 Session Note tools" )try : mcp_tools = await load_mcp_tools_async(config_path="mini_agent/config/mcp.json" ) if mcp_tools: tools.extend(mcp_tools) print (f"✓ Loaded {len (mcp_tools)} MCP tools" )except Exception as e: print (f"⚠️ MCP tools not loaded: {e} " )
这个顺序并不是强制的,但它的逻辑很合理:先把核心能力注册进去,再补充记忆层,最后尝试扩展协议层。万一
MCP 加载失败,前面注册的基础工具不受影响,Agent 仍然可以正常运行。
3. 多步骤任务的执行
给 Agent 一个需要多工具配合的任务,最能体现工作流程的价值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 task = """ Please help me with the following tasks: 1. Create a Python script called 'calculator.py' that: - Has functions for add, subtract, multiply, divide - Has a main() function that demonstrates usage - Includes proper docstrings and type hints 2. Create a README.md file that: - Describes the calculator script - Shows how to run it - Lists the available functions 3. Test the calculator by running it with bash 4. Remember this project info: - Project: Simple Calculator - Language: Python - Purpose: Demonstration of agent capabilities """ agent.add_user_message(task) result = await agent.run()
注意这里调用的是 agent.run()
而不是手动写循环。这个方法内部会持续调度 LLM 和工具,直到任务完成或达到
max_steps
上限。对调用方来说,不需要关心「下一步该干什么」的问题,Agent
内部已经把这个决策逻辑封装好了。
4. 多轮对话的交互模式
除了单次任务,示例中还演示了一种更接近实际使用的交互方式:多轮对话。同一个
Agent 实例上可以多次调用 add_user_message()
并分别 run():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 conversations = [ "Create a file called 'data.txt' with the numbers 1 to 5, one per line." , "Now read the file and tell me what's in it." , "Count how many lines are in the file using bash." , ]for i, message in enumerate (conversations, 1 ): print (f"\n{'=' * 60 } " ) print (f"Turn {i} :" ) print (f"{'=' * 60 } " ) print (f"User: {message} \n" ) agent.add_user_message(message) try : result = await agent.run() print (f"Agent: {result} \n" ) except Exception as e: print (f"Error: {e} " ) break
这说明 Agent 实例本身是有状态的:每一次
add_user_message()
都会把新消息追加到内部对话历史,run()
则在这段历史的基础上继续推理。这个模式非常适合需要「先做一件事、再基于结果做另一件事」的场景。
源码解析
mini_agent/agent.py
是整个工作流程的核心实现所在。Agent 类的 run()
方法封装了完整的执行循环,但这个方法并不是孤立运转的,它依赖几个配合机制:消息历史管理、Token
配额控制、工具分发与结果回填、以及取消信号检查。下面逐层拆解。
1. 初始化:把工具注册为字典
Agent.__init__
并不只是做属性赋值,它实际上完成了整个运行时的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 def __init__ ( self, llm_client: LLMClient, system_prompt: str , tools: list [Tool], max_steps: int = 50 , workspace_dir: str = "./workspace" , token_limit: int = 80000 , ): self .llm = llm_client self .tools = {tool.name: tool for tool in tools} self .max_steps = max_steps self .token_limit = token_limit self .workspace_dir = Path(workspace_dir) self .workspace_dir.mkdir(parents=True , exist_ok=True ) self .cancel_event: Optional [asyncio.Event] = None if "Current Workspace" not in system_prompt: workspace_info = f"\n\n## Current Workspace\nYou are currently working in: `{self.workspace_dir.absolute()} `\nAll relative paths will be resolved relative to this directory." system_prompt = system_prompt + workspace_info self .system_prompt = system_prompt self .messages: list [Message] = [Message(role="system" , content=system_prompt)] self .logger = AgentLogger() self .api_total_tokens: int = 0 self ._skip_next_token_check: bool = False
这里有一个值得注意的转换:传入的是一个
list[Tool],但实际存储的是 {tool.name: tool}
字典。这个设计让工具查找从 O(n) 降到了
O(1),在执行循环中每次根据函数名查找工具时可以更快定位。
另外,初始化阶段会把 system prompt 包装成一条
role="system" 的消息,加入 self.messages
列表,作为对话历史的起点。后续所有用户消息和 Agent
响应都追加到这个列表里,形成完整的上下文链。
2. 主循环:run() 的执行骨架
run() 是整个 Agent 的入口,内部是一个标准的 while
循环:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 async def run (self, cancel_event: Optional [asyncio.Event] = None ) -> str : if cancel_event is not None : self .cancel_event = cancel_event self .logger.start_new_run() step = 0 while step < self .max_steps: if self ._check_cancelled(): self ._cleanup_incomplete_messages() return "Task cancelled by user." await self ._summarize_messages() tool_list = list (self .tools.values()) response = await self .llm.generate(messages=self .messages, tools=tool_list) self .messages.append(Message( role="assistant" , content=response.content, thinking=response.thinking, tool_calls=response.tool_calls, )) if not response.tool_calls: return response.content for tool_call in response.tool_calls: result = await self .tools[tool_call.function.name].execute(**tool_call.function.arguments) self .messages.append(Message( role="tool" , content=result.content if result.success else f"Error: {result.error} " , tool_call_id=tool_call.id , name=tool_call.function.name, )) step += 1 return f"Task couldn't be completed after {self.max_steps} steps."
从结构上看,循环内只做了四件事:检查取消状态、调用
LLM、记录响应到历史、分发工具调用。循环何时终止的条件有三个:
LLM 不再发起工具调用 ,说明任务已完成,直接返回
达到 max_steps 上限 ,防止无限循环
收到取消信号 ,安全退出并清理未完成的消息
3.
工具分发:如何根据函数名找到对应工具
工具调用的入口在 run() 的这段逻辑里:
1 2 3 4 5 6 7 8 9 for tool_call in response.tool_calls: function_name = tool_call.function.name arguments = tool_call.function.arguments if function_name not in self .tools: result = ToolResult(success=False , error=f"Unknown tool: {function_name} " ) else : tool = self .tools[function_name] result = await tool.execute(**arguments)
关键在于 response.tool_calls 的结构。它来自 LLM
的响应,包含了模型决定调用的工具名称和入参。框架负责把这个信息映射到具体工具对象上,再通过
execute() 方法真正执行。
执行完成后,工具结果以 role="tool" 的消息形式追加到
self.messages 里,这样下一轮 LLM
调用时就能看到这次工具的返回值,从而决定下一步动作。
4. Token
管理:超过上限时如何压缩历史
随着工具调用次数增加,self.messages
会不断膨胀。_summarize_messages()
负责在每轮循环开始前检查是否超出
token_limit,超限时触发一次摘要压缩:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 async def _summarize_messages (self ): if self ._skip_next_token_check: self ._skip_next_token_check = False return estimated_tokens = self ._estimate_tokens() should_summarize = estimated_tokens > self .token_limit or self .api_total_tokens > self .token_limit if not should_summarize: return user_indices = [i for i, msg in enumerate (self .messages) if msg.role == "user" and i > 0 ] if len (user_indices) < 1 : print (f"{Colors.BRIGHT_YELLOW} ⚠️ Insufficient messages, cannot summarize{Colors.RESET} " ) return new_messages = [self .messages[0 ]] summary_count = 0 for i, user_idx in enumerate (user_indices): new_messages.append(self .messages[user_idx]) if i < len (user_indices) - 1 : next_user_idx = user_indices[i + 1 ] else : next_user_idx = len (self .messages) execution_messages = self .messages[user_idx + 1 : next_user_idx] if execution_messages: summary_text = await self ._create_summary(execution_messages, i + 1 ) if summary_text: summary_message = Message( role="user" , content=f"[Assistant Execution Summary]\n\n{summary_text} " , ) new_messages.append(summary_message) summary_count += 1 self .messages = new_messages self ._skip_next_token_check = True
摘要策略很有意思:不是简单地截断前面部分的历史,而是把「用户意图」保留下来,把「执行过程」压缩成摘要。这样模型在后续推理时仍然知道用户最初想要什么,但不需要重复阅读中间所有步骤的细节。
5.
取消机制:如何在安全点中断执行
cancel_event 允许外部通过设置一个
asyncio.Event 来中断 Agent:
1 2 3 4 def _check_cancelled (self ) -> bool : if self .cancel_event is not None and self .cancel_event.is_set(): return True return False
检查点分布在循环开头、工具执行前后等多个位置。收到取消信号时,代码不会直接抛出异常,而是先调用
_cleanup_incomplete_messages()
把当前步骤产生的部分消息清理掉,再返回取消提示。这样做的好处是消息历史不会被污染,下一次重新
run() 时不会带上上一次未完成的残片。
具体来看 _cleanup_incomplete_messages() 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 def _cleanup_incomplete_messages (self ): last_assistant_idx = -1 for i in range (len (self .messages) - 1 , -1 , -1 ): if self .messages[i].role == "assistant" : last_assistant_idx = i break if last_assistant_idx == -1 : return removed_count = len (self .messages) - last_assistant_idx if removed_count > 0 : self .messages = self .messages[:last_assistant_idx]
这个函数的设计意图非常清晰:只保留到上一个「已完成」的
assistant 消息,把本轮正在执行中的所有消息都删掉 。
为什么要这样?考虑一个具体的取消场景:模型已经返回了一个
tool_calls 请求,Agent
开始逐个执行工具调用。在执行到第三个工具时,用户按下了Esc
键触发了取消。这时候 self.messages 里已经有:
1 2 3 4 5 6 systemuser assistant (带 tool_calls) tool (工具 1 的结果) tool (工具 2 的结果) ... 工具 3 正在执行中,取消信号来了 ...
如果直接就这样停下,下次 run()
时模型会看到不完整的执行链——它还记得自己发起了三个工具调用,但历史里只有两个工具的返回值,第三个工具的结果不知所踪。这会让模型陷入困惑,不知道该继续还是重试。
_cleanup_incomplete_messages() 的做法是:找到最后一个
role="assistant"
的消息,把这条消息之后的所有内容都删掉。在上面的例子里,删除线部分就是会被清理掉的。这样留下来的消息历史是干净的——模型知道的是「我发起过工具调用,但还没全部执行完就被中断了」,而不是「我有三个调用但只有两个结果」。
清理完成后,run()
返回取消提示,用户可以重新发起请求。这一次 run()
的上下文里不会带上任何残片,Agent
可以从头开始或接着之前的进度继续工作。
6. 工作流程的整体脉络
把上面的细节串起来,可以更清晰地看到整个执行链路:
run()
开始 ,初始化日志和循环计数器
进入 while 循环 ,检查取消信号和 Token 上限
调用
llm.generate(),把当前完整的消息历史发送给模型
模型返回 ,可能是纯文本响应,也可能是工具调用请求
如果有工具调用 ,遍历每个调用、查找工具、执行、结果写回消息历史
本轮结束 ,计数器加一,回到循环开头继续
如果模型返回纯文本 ,说明任务完成,直接退出循环并返回
如果达到
max_steps ,退出循环并提示上限
整个流程最核心的设计点在于:消息历史是循环的载体,LLM
是决策中心,工具是执行单元,取消和摘要机制是安全网 。每个组件各司其职,组合在一起就成了一个可以稳定运转的
Agent 系统。
总结
这一篇围绕 Mini Agent
的工作流程展开,从代码示例到源码实现完整梳理了一遍。
在示例层面,examples/04_full_agent.py 展示了 Agent
的组装方式:LLMClient、system_prompt、tools
列表、workspace_dir,四个要素缺一不可。工具按层次注册——基础工具、记忆工具、MCP
工具——而不是一股脑全部塞进去,这样的设计让工具体系的扩展思路变得很清晰。
在源码层面,mini_agent/agent.py
是整个系统的核心。run() 方法用 while
循环驱动整个执行周期,每一轮都要依次经历:取消检查、Token 超限判断、LLM
推理、工具分发、结果写回消息历史。循环何时退出由三个条件共同决定:模型不再发起工具调用、达到步数上限、收到外部取消信号。
消息历史是整个运行时的骨架。所有内容——system
prompt、用户指令、模型响应、工具返回值——都积累在
self.messages 里,每轮 LLM
调用都是把这份完整历史整体发送出去。Token
配额管理和摘要压缩机制保证了历史不会无限膨胀,取消机制和
_cleanup_incomplete_messages()
则保证了中断后消息历史的干净恢复。