From e864c3fe5265e60cce7ae4bc9018ac94f0d654df Mon Sep 17 00:00:00 2001 From: pi Date: Fri, 10 Apr 2026 23:56:18 +0100 Subject: [PATCH] feat: add process runner for subagents --- src/process-runner.test.ts | 113 +++++++++++++++++++++++++++++++++++++ src/process-runner.ts | 84 +++++++++++++++++++++++++++ src/schema.ts | 2 + 3 files changed, 199 insertions(+) create mode 100644 src/process-runner.test.ts create mode 100644 src/process-runner.ts diff --git a/src/process-runner.test.ts b/src/process-runner.test.ts new file mode 100644 index 0000000..19486dc --- /dev/null +++ b/src/process-runner.test.ts @@ -0,0 +1,113 @@ +import test from "node:test"; +import assert from "node:assert/strict"; +import { EventEmitter } from "node:events"; +import { mkdtemp, readFile, writeFile } from "node:fs/promises"; +import { join } from "node:path"; +import { tmpdir } from "node:os"; +import { createRunArtifacts } from "./artifacts.ts"; +import { monitorRun } from "./monitor.ts"; +import { createProcessSingleRunner } from "./process-runner.ts"; + +class FakeChild extends EventEmitter {} + +test("createProcessSingleRunner launches wrapper without tmux and returns monitored result", async () => { + const cwd = await mkdtemp(join(tmpdir(), "pi-subagents-process-")); + let metaPathSeen = ""; + + const runSingleTask = createProcessSingleRunner({ + createArtifacts: createRunArtifacts, + buildWrapperSpawn(metaPath: string) { + metaPathSeen = metaPath; + return { command: process.execPath, args: ["-e", "process.exit(0)"] }; + }, + spawnChild() { + const child = new FakeChild() as any; + process.nextTick(async () => { + const meta = JSON.parse(await readFile(metaPathSeen, "utf8")); + await writeFile( + meta.resultPath, + JSON.stringify( + { + runId: meta.runId, + mode: meta.mode, + agent: meta.agent, + agentSource: meta.agentSource, + task: meta.task, + requestedModel: meta.requestedModel, + resolvedModel: meta.resolvedModel, + sessionPath: meta.sessionPath, + exitCode: 0, + finalText: "done", + stdoutPath: meta.stdoutPath, + stderrPath: meta.stderrPath, + transcriptPath: meta.transcriptPath, + resultPath: meta.resultPath, + eventsPath: meta.eventsPath, + }, + null, + 2, + ), + "utf8", + ); + child.emit("close", 0); + }); + return child; + }, + monitorRun: (input) => monitorRun({ ...input, pollMs: 1 }), + }); + + const result = await runSingleTask({ + cwd, + meta: { + mode: "single", + agent: "scout", + agentSource: "builtin", + task: "inspect auth", + requestedModel: "openai/gpt-5", + resolvedModel: "openai/gpt-5", + }, + }); + + assert.equal(result.finalText, "done"); + assert.equal(result.exitCode, 0); + assert.match(result.resultPath ?? "", /\.pi\/subagents\/runs\//); +}); + +test("createProcessSingleRunner writes error result.json when wrapper launch fails", async () => { + const cwd = await mkdtemp(join(tmpdir(), "pi-subagents-process-")); + + const runSingleTask = createProcessSingleRunner({ + createArtifacts: createRunArtifacts, + buildWrapperSpawn() { + return { command: process.execPath, args: ["-e", "process.exit(0)"] }; + }, + spawnChild() { + const child = new FakeChild() as any; + process.nextTick(() => { + child.emit("error", new Error("spawn boom")); + }); + return child; + }, + monitorRun: (input) => monitorRun({ ...input, pollMs: 1 }), + }); + + const result = await runSingleTask({ + cwd, + meta: { + mode: "single", + agent: "scout", + agentSource: "builtin", + task: "inspect auth", + requestedModel: "openai/gpt-5", + resolvedModel: "openai/gpt-5", + }, + }); + + assert.equal(result.exitCode, 1); + assert.equal(result.stopReason, "error"); + assert.match(result.errorMessage ?? "", /spawn boom/); + + const saved = JSON.parse(await readFile(result.resultPath!, "utf8")); + assert.equal(saved.exitCode, 1); + assert.match(saved.errorMessage ?? "", /spawn boom/); +}); diff --git a/src/process-runner.ts b/src/process-runner.ts new file mode 100644 index 0000000..f612ca3 --- /dev/null +++ b/src/process-runner.ts @@ -0,0 +1,84 @@ +import { spawn } from "node:child_process"; +import { writeFile } from "node:fs/promises"; +import type { RunSingleTask } from "./runner.ts"; + +function makeLaunchFailureResult(artifacts: any, meta: Record, cwd: string, error: unknown) { + const message = error instanceof Error ? error.stack ?? error.message : String(error); + const startedAt = new Date().toISOString(); + + return { + runId: artifacts.runId, + mode: meta.mode, + taskIndex: meta.taskIndex, + step: meta.step, + agent: meta.agent, + agentSource: meta.agentSource, + task: meta.task, + cwd, + requestedModel: meta.requestedModel, + resolvedModel: meta.resolvedModel, + sessionPath: artifacts.sessionPath, + startedAt, + finishedAt: new Date().toISOString(), + exitCode: 1, + stopReason: "error", + finalText: "", + stdoutPath: artifacts.stdoutPath, + stderrPath: artifacts.stderrPath, + transcriptPath: artifacts.transcriptPath, + resultPath: artifacts.resultPath, + eventsPath: artifacts.eventsPath, + errorMessage: message, + }; +} + +export function createProcessSingleRunner(deps: { + createArtifacts: (cwd: string, meta: Record) => Promise; + buildWrapperSpawn: (metaPath: string) => { command: string; args: string[]; env?: NodeJS.ProcessEnv }; + spawnChild?: typeof spawn; + monitorRun: (input: { eventsPath: string; resultPath: string; onEvent?: (event: any) => void }) => Promise; +}): RunSingleTask { + const spawnChild = deps.spawnChild ?? spawn; + + return async function runSingleTask(input) { + const artifacts = await deps.createArtifacts(input.cwd, input.meta); + const spawnSpec = deps.buildWrapperSpawn(artifacts.metaPath); + + const writeLaunchFailure = async (error: unknown) => { + const result = makeLaunchFailureResult(artifacts, input.meta, input.cwd, error); + await writeFile(artifacts.resultPath, JSON.stringify(result, null, 2), "utf8"); + return result; + }; + + try { + const child = spawnChild(spawnSpec.command, spawnSpec.args, { + cwd: input.cwd, + env: { ...process.env, ...(spawnSpec.env ?? {}) }, + stdio: ["ignore", "ignore", "ignore"], + }); + + child.once("error", (error) => { + void writeLaunchFailure(error); + }); + } catch (error) { + return writeLaunchFailure(error); + } + + const result = await deps.monitorRun({ + eventsPath: artifacts.eventsPath, + resultPath: artifacts.resultPath, + onEvent: input.onEvent, + }); + + return { + ...result, + runId: result.runId ?? artifacts.runId, + sessionPath: result.sessionPath ?? artifacts.sessionPath, + stdoutPath: result.stdoutPath ?? artifacts.stdoutPath, + stderrPath: result.stderrPath ?? artifacts.stderrPath, + transcriptPath: result.transcriptPath ?? artifacts.transcriptPath, + resultPath: artifacts.resultPath, + eventsPath: artifacts.eventsPath, + }; + }; +} diff --git a/src/schema.ts b/src/schema.ts index 0a8c68d..8179969 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -72,8 +72,10 @@ export interface SubagentRunResult { finalText: string; stdoutPath?: string; stderrPath?: string; + transcriptPath?: string; resultPath?: string; eventsPath?: string; + errorMessage?: string; } export interface SubagentToolDetails {