import asyncio import os from typing import Optional from app.exceptions import ToolError from app.tools.base import BaseTool, CLIResult _BASH_DESCRIPTION = """在终端中执行 bash 命令。 * 长时间运行的命令:对于可能无限期运行的命令,应该在后台运行并将输出重定向到文件,例如 command = `python3 app.py > server.log 2>&1 &`。 * 交互式:如果 bash 命令返回退出代码 `-1`,这意味着进程尚未完成。助手必须向终端发送第二次调用,使用空的 `command`(这将检索任何额外的日志),或者它可以向正在运行的进程的 STDIN 发送附加文本(将 `command` 设置为文本),或者它可以发送 command=`ctrl+c` 来中断进程。 * 超时:如果命令执行结果说 "Command timed out. Sending SIGINT to the process",助手应该重试在后台运行该命令。 """ class _BashSession: """bash shell 的会话。""" _started: bool _process: asyncio.subprocess.Process command: str = "/bin/bash" _output_delay: float = 0.2 # 秒 _timeout: float = 120.0 # 秒 _sentinel: str = "<>" def __init__(self): self._started = False self._timed_out = False async def start(self): if self._started: return self._process = await asyncio.create_subprocess_shell( self.command, preexec_fn=os.setsid, shell=True, bufsize=0, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._started = True def stop(self): """终止 bash shell。""" if not self._started: raise ToolError("Session has not started.") if self._process.returncode is not None: return self._process.terminate() async def run(self, command: str): """在 bash shell 中执行命令。""" if not self._started: raise ToolError("Session has not started.") if self._process.returncode is not None: return CLIResult( system="tool must be restarted", error=f"bash has exited with returncode {self._process.returncode}", ) if self._timed_out: raise ToolError( f"timed out: bash has not returned in {self._timeout} seconds and must be restarted", ) # 我们知道这些不是 None,因为我们使用 PIPEs 创建了进程 assert self._process.stdin assert self._process.stdout assert self._process.stderr # 向进程发送命令 self._process.stdin.write( command.encode() + f"; echo '{self._sentinel}'\n".encode() ) await self._process.stdin.drain() # 从进程读取输出,直到找到标记 try: async with asyncio.timeout(self._timeout): while True: await asyncio.sleep(self._output_delay) # 如果我们直接从 stdout/stderr 读取,它将永远等待 EOF。 # 改为直接使用 StreamReader 缓冲区。 output = ( self._process.stdout._buffer.decode() ) # pyright: ignore[reportAttributeAccessIssue] if self._sentinel in output: # 去除标记并中断 output = output[: output.index(self._sentinel)] break except asyncio.TimeoutError: self._timed_out = True raise ToolError( f"timed out: bash has not returned in {self._timeout} seconds and must be restarted", ) from None if output.endswith("\n"): output = output[:-1] error = ( self._process.stderr._buffer.decode() ) # pyright: ignore[reportAttributeAccessIssue] if error.endswith("\n"): error = error[:-1] # 清除缓冲区,以便可以正确读取下一个输出 self._process.stdout._buffer.clear() # pyright: ignore[reportAttributeAccessIssue] self._process.stderr._buffer.clear() # pyright: ignore[reportAttributeAccessIssue] return CLIResult(output=output, error=error) class Bash(BaseTool): """用于执行 bash 命令的工具""" name: str = "bash" description: str = _BASH_DESCRIPTION parameters: dict = { "type": "object", "properties": { "command": { "type": "string", "description": "要执行的 bash 命令。当先前的退出代码为 `-1` 时可以为空以查看其他日志。可以是 `ctrl+c` 来中断当前正在运行的进程。", }, }, "required": ["command"], } _session: Optional[_BashSession] = None async def execute( self, command: str | None = None, restart: bool = False, **kwargs ) -> CLIResult: if restart: if self._session: self._session.stop() self._session = _BashSession() await self._session.start() return CLIResult(system="tool has been restarted.") if self._session is None: self._session = _BashSession() await self._session.start() if command is not None: return await self._session.run(command) raise ToolError("no command provided.") if __name__ == "__main__": bash = Bash() rst = asyncio.run(bash.execute("ls -l")) print(rst)