0%

Claude Code 源码解析 (7):后台任务管理的完整方案

Claude Code 源码解析 (7):后台任务管理的完整方案

导读: 这是 Claude Code 20 个功能特性源码解析系列的第 7 篇,深入分析任务管理系统 (TaskCreate/List/Output/Stop) 的架构设计。


📋 目录

  1. 问题引入:为什么需要后台任务?
  2. 技术原理:任务管理核心架构
  3. 设计思想:为什么这样设计
  4. 解决方案:完整实现详解
  5. OpenClaw 最佳实践
  6. 总结

问题引入:为什么需要后台任务?

痛点场景

场景 1:长时间任务阻塞

1
2
3
4
5
6
7
8
9
用户:"帮我分析这个 10GB 的日志文件"

AI 同步执行:
- 读取文件... (30 秒)
- 分析内容... (2 分钟)
- 生成报告... (30 秒)

用户等待 3 分钟,无法做其他事情
→ 体验极差

场景 2:无法中断任务

1
2
3
4
5
6
7
8
用户:"运行这个测试"

AI 开始执行测试套件 (1000 个测试)
- 第 100 个测试时,用户发现配置错了
- 想中断,但没有方法
- 只能等待 10 分钟全部完成

→ 浪费时间和资源

场景 3:进度不透明

1
2
3
4
5
6
7
8
9
用户:"处理这些数据"

AI:(静默执行 5 分钟)
AI:"完成了"

用户:"刚才在做什么?进展如何?"
AI:"..."

→ 用户不知道进度,无法预估时间

核心问题

设计 AI 助手的任务管理系统时,面临以下挑战:

  1. 阻塞问题

    • 长时间任务如何不阻塞主线程?
    • 用户如何继续与 AI 交互?
  2. 可中断问题

    • 用户如何取消不需要的任务?
    • 如何清理已分配的资源?
  3. 进度跟踪问题

    • 如何实时报告任务进度?
    • 如何预估剩余时间?
  4. 结果获取问题

    • 任务完成后如何通知用户?
    • 如何获取任务输出和结果?

Claude Code 用 TaskCreate/List/Output/Stop 工具解决了这些问题。


技术原理:任务管理核心架构

整体架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
┌─────────────────────────────────────────────────────────────┐
│ 用户请求 │
│ "帮我分析这个日志文件" │
└─────────────────────┬───────────────────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ TaskCreateTool │
│ - 创建任务记录 │
│ - 启动后台进程 │
│ - 返回任务 ID │
└─────────────┬───────────────────────────────────────────────┘


┌─────────────────────────┐
│ 任务队列 │
│ ┌─────────────────┐ │
│ │ Task-001 │ │
│ │ Task-002 │ │
│ │ Task-003 │ │
│ └─────────────────┘ │
└─────────────────────────┘

┌─────────┴─────────┐
│ │
▼ ▼
┌────────┐ ┌────────┐
│ Worker │ │ Worker │
│ Pool │ │ Pool │
│ │ │ │
│ ┌────┐ │ │ ┌────┐ │
│ │T1 │ │ │ │T2 │ │
│ └────┘ │ │ └────┘ │
└────────┘ └────────┘


┌─────────────────────────┐
│ 任务输出缓冲 │
│ - stdout │
│ - stderr │
│ - progress │
└─────────────────────────┘


┌─────────────────────────────────────────────────────────────┐
│ TaskOutputTool / TaskListTool / TaskStopTool │
│ - 获取输出 │
│ - 列出任务 │
│ - 停止任务 │
└─────────────────────────────────────────────────────────────┘

任务定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
interface Task {
// 基本信息
id: string;
name: string;
description?: string;

// 命令/操作
command: string;
args?: string[];
cwd?: string;
env?: Record<string, string>;

// 状态
status: TaskStatus;
createdAt: Date;
startedAt?: Date;
completedAt?: Date;

// 执行信息
pid?: number;
exitCode?: number;
error?: string;

// 输出
output: TaskOutput[];
truncated: boolean;

// 进度
progress?: {
current: number;
total?: number;
percentage: number;
message?: string;
};

// 超时设置
timeout?: number; // 毫秒
timedOut: boolean;
}

type TaskStatus =
| 'pending' // 等待执行
| 'running' // 正在执行
| 'completed' // 完成
| 'failed' // 失败
| 'cancelled' // 已取消
| 'timed_out'; // 超时

interface TaskOutput {
type: 'stdout' | 'stderr' | 'progress' | 'error';
data: string;
timestamp: Date;
}

任务创建器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class TaskCreator {
private taskStore: TaskStore;
private workerPool: WorkerPool;

constructor(config: TaskCreatorConfig) {
this.taskStore = new TaskStore(config);
this.workerPool = new WorkerPool(config);
}

async create(input: TaskCreateInput): Promise<Task> {
// 1. 创建任务记录
const task: Task = {
id: generateTaskId(),
name: input.name || `task-${Date.now()}`,
description: input.description,
command: input.command,
args: input.args || [],
cwd: input.cwd || process.cwd(),
env: { ...process.env, ...input.env },

status: 'pending',
createdAt: new Date(),
output: [],
truncated: false,
timedOut: false,

timeout: input.timeout,
};

// 2. 保存到存储
await this.taskStore.save(task);

// 3. 启动执行 (不等待)
this.executeInBackground(task);

return task;
}

private async executeInBackground(task: Task): Promise<void> {
// 更新状态
task.status = 'running';
task.startedAt = new Date();
await this.taskStore.update(task);

try {
// 启动子进程
const process = spawn(task.command, task.args, {
cwd: task.cwd,
env: task.env,
shell: true,
});

task.pid = process.pid;

// 处理输出
process.stdout.on('data', (data) => {
this.handleOutput(task, 'stdout', data.toString());
});

process.stderr.on('data', (data) => {
this.handleOutput(task, 'stderr', data.toString());
});

// 处理完成
process.on('exit', (code) => {
task.status = code === 0 ? 'completed' : 'failed';
task.exitCode = code || 0;
task.completedAt = new Date();
this.taskStore.update(task);
});

// 处理错误
process.on('error', (error) => {
task.status = 'failed';
task.error = error.message;
task.completedAt = new Date();
this.taskStore.update(task);
});

// 超时处理
if (task.timeout) {
setTimeout(() => {
if (task.status === 'running') {
process.kill();
task.status = 'timed_out';
task.timedOut = true;
task.completedAt = new Date();
this.taskStore.update(task);
}
}, task.timeout);
}

} catch (error) {
task.status = 'failed';
task.error = error.message;
task.completedAt = new Date();
await this.taskStore.update(task);
}
}

private async handleOutput(
task: Task,
type: 'stdout' | 'stderr',
data: string
): Promise<void> {
const output: TaskOutput = {
type,
data,
timestamp: new Date(),
};

task.output.push(output);

// 限制输出大小
const maxOutput = 1000; // 最大 1000 条
if (task.output.length > maxOutput) {
task.output = task.output.slice(-maxOutput);
task.truncated = true;
}

// 实时保存
await this.taskStore.update(task);

// 通知订阅者
this.notifySubscribers(task.id, output);
}
}

任务存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class TaskStore {
private db: Database;
private cache: Map<string, Task> = new Map();

constructor(config: TaskStoreConfig) {
this.db = new Database(config.dbPath);
this.initSchema();
}

private initSchema(): void {
this.db.exec(`
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT,
description TEXT,
command TEXT,
args TEXT,
cwd TEXT,
env TEXT,
status TEXT,
created_at TEXT,
started_at TEXT,
completed_at TEXT,
pid INTEGER,
exit_code INTEGER,
error TEXT,
output TEXT,
truncated INTEGER,
timeout INTEGER,
timed_out INTEGER
)
`);
}

async save(task: Task): Promise<void> {
await this.db.run(`
INSERT INTO tasks (
id, name, command, status, created_at, output, truncated
) VALUES (?, ?, ?, ?, ?, ?, ?)
`, [
task.id,
task.name,
task.command,
task.status,
task.createdAt.toISOString(),
JSON.stringify(task.output),
task.truncated ? 1 : 0,
]);

this.cache.set(task.id, task);
}

async update(task: Task): Promise<void> {
await this.db.run(`
UPDATE tasks SET
status = ?,
started_at = ?,
completed_at = ?,
pid = ?,
exit_code = ?,
error = ?,
output = ?,
truncated = ?,
timed_out = ?
WHERE id = ?
`, [
task.status,
task.startedAt?.toISOString(),
task.completedAt?.toISOString(),
task.pid,
task.exitCode,
task.error,
JSON.stringify(task.output),
task.truncated ? 1 : 0,
task.timedOut ? 1 : 0,
task.id,
]);

this.cache.set(task.id, task);
}

async get(taskId: string): Promise<Task | null> {
// 先查缓存
const cached = this.cache.get(taskId);
if (cached) {
return cached;
}

// 查数据库
const row = await this.db.get('SELECT * FROM tasks WHERE id = ?', [taskId]);

if (!row) {
return null;
}

const task = this.rowToTask(row);
this.cache.set(taskId, task);
return task;
}

async list(filter?: TaskFilter): Promise<Task[]> {
let query = 'SELECT * FROM tasks';
const params: any[] = [];

if (filter) {
const conditions: string[] = [];

if (filter.status) {
conditions.push('status = ?');
params.push(filter.status);
}

if (filter.createdAfter) {
conditions.push('created_at > ?');
params.push(filter.createdAfter.toISOString());
}

if (conditions.length > 0) {
query += ' WHERE ' + conditions.join(' AND ');
}
}

query += ' ORDER BY created_at DESC';

const rows = await this.db.all(query, params);
return rows.map(row => this.rowToTask(row));
}

async delete(taskId: string): Promise<void> {
await this.db.run('DELETE FROM tasks WHERE id = ?', [taskId]);
this.cache.delete(taskId);
}

private rowToTask(row: any): Task {
return {
id: row.id,
name: row.name,
description: row.description,
command: row.command,
args: JSON.parse(row.args || '[]'),
cwd: row.cwd,
env: JSON.parse(row.env || '{}'),
status: row.status,
createdAt: new Date(row.created_at),
startedAt: row.started_at ? new Date(row.started_at) : undefined,
completedAt: row.completed_at ? new Date(row.completed_at) : undefined,
pid: row.pid,
exitCode: row.exit_code,
error: row.error,
output: JSON.parse(row.output || '[]'),
truncated: !!row.truncated,
timeout: row.timeout,
timedOut: !!row.timed_out,
};
}
}

进度跟踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class TaskProgressTracker {
private subscribers: Map<string, Set<ProgressListener>> = new Map();

// 订阅任务进度
subscribe(taskId: string, listener: ProgressListener): () => void {
if (!this.subscribers.has(taskId)) {
this.subscribers.set(taskId, new Set());
}

this.subscribers.get(taskId)!.add(listener);

// 返回取消订阅函数
return () => {
this.subscribers.get(taskId)?.delete(listener);
};
}

// 通知进度更新
notifyProgress(taskId: string, progress: TaskProgress): void {
const listeners = this.subscribers.get(taskId);

if (listeners) {
for (const listener of listeners) {
try {
listener(progress);
} catch (error) {
console.error(`[ProgressTracker] Listener error:`, error);
}
}
}
}

// 解析进度 (从输出中提取)
parseProgress(output: string): TaskProgress | null {
// 常见进度格式:
// "Processing: 50%"
// "[3/10] Testing..."
// "Step 3 of 10"

const percentageMatch = output.match(/(\d+)%/);
const fractionMatch = output.match(/\[(\d+)\/(\d+)\]/);
const stepMatch = output.match(/Step (\d+) of (\d+)/);

if (percentageMatch) {
return {
percentage: parseInt(percentageMatch[1]),
message: output.trim(),
};
}

if (fractionMatch) {
const current = parseInt(fractionMatch[1]);
const total = parseInt(fractionMatch[2]);
return {
current,
total,
percentage: Math.round((current / total) * 100),
message: output.trim(),
};
}

if (stepMatch) {
const current = parseInt(stepMatch[1]);
const total = parseInt(stepMatch[2]);
return {
current,
total,
percentage: Math.round((current / total) * 100),
message: output.trim(),
};
}

return null;
}
}

interface TaskProgress {
current?: number;
total?: number;
percentage: number;
message?: string;
estimatedRemaining?: number; // 预估剩余时间 (毫秒)
}

type ProgressListener = (progress: TaskProgress) => void;

流式输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class TaskOutputStream {
private buffers: Map<string, string[]> = new Map();
private readonly MAX_BUFFER = 100; // 保留最近 100 行

// 追加输出
append(taskId: string, line: string): void {
if (!this.buffers.has(taskId)) {
this.buffers.set(taskId, []);
}

const buffer = this.buffers.get(taskId)!;
buffer.push(line);

// 限制大小
if (buffer.length > this.MAX_BUFFER) {
buffer.shift();
}
}

// 获取输出 (支持分页)
get(taskId: string, options?: {
since?: Date;
limit?: number;
type?: 'stdout' | 'stderr' | 'all';
}): string[] {
const buffer = this.buffers.get(taskId) || [];

let result = [...buffer];

// 类型过滤
if (options?.type && options.type !== 'all') {
// 需要输出包含类型信息
// 简化实现:返回全部
}

// 数量限制
if (options?.limit) {
result = result.slice(-options.limit);
}

return result;
}

// 流式迭代
async *stream(taskId: string): AsyncIterable<string> {
const listeners = new Set<(line: string) => void>();

const unsubscribe = this.onNewLine(taskId, (line) => {
listeners.forEach(l => l(line));
});

try {
// 先返回已有输出
const existing = this.get(taskId);
for (const line of existing) {
yield line;
}

// 等待新输出
while (true) {
const line = await this.waitForLine(taskId, listeners);
yield line;
}
} finally {
unsubscribe();
}
}

private onNewLine(taskId: string, callback: (line: string) => void): () => void {
// 实现略
return () => {};
}

private async waitForLine(
taskId: string,
listeners: Set<(line: string) => void>
): Promise<string> {
return new Promise((resolve) => {
const handler = (line: string) => {
listeners.delete(handler);
resolve(line);
};
listeners.add(handler);
});
}
}

设计思想:为什么这样设计

思想 1:异步执行,不阻塞主线程

问题: 同步执行长时间任务会阻塞 AI 与用户交互。

解决: 后台异步执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 同步执行 (不好)
async function executeTask(command: string): Promise<string> {
const result = await exec(command); // 阻塞 3 分钟
return result;
}

// 异步执行 (推荐)
async function createTask(command: string): Promise<Task> {
const task = await taskCreator.create({ command });
return task; // 立即返回,不阻塞
}

// 用户获取任务 ID 后,可以继续对话
// "任务已启动 (ID: task-123),你可以继续问我其他问题"

设计智慧:

后台任务让 AI 可以”一心多用”。

思想 2:任务可中断

问题: 用户发现任务错了,无法中断。

解决: 支持取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class TaskController {
async cancel(taskId: string): Promise<void> {
const task = await this.taskStore.get(taskId);

if (!task || task.status !== 'running') {
throw new Error('Task not running');
}

// 发送 SIGTERM
if (task.pid) {
process.kill(task.pid, 'SIGTERM');

// 5 秒后强制终止
setTimeout(() => {
try {
process.kill(task.pid, 'SIGKILL');
} catch {}
}, 5000);
}

// 更新状态
task.status = 'cancelled';
task.completedAt = new Date();
await this.taskStore.update(task);

// 清理资源
await this.cleanup(task);
}
}

思想 3:进度透明

问题: 用户不知道任务进展。

解决: 实时进度报告。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 进度报告格式
interface ProgressReport {
taskId: string;
status: 'running' | 'completed' | 'failed';
progress: {
percentage: number;
current: number;
total: number;
message: string;
};
output: string; // 最近输出
estimatedRemaining: number; // 预估剩余时间
}

// 定期发送进度
setInterval(() => {
const progress = tracker.getProgress(taskId);
notifyUser({
type: 'task_progress',
taskId,
progress,
});
}, 5000); // 每 5 秒更新

用户体验:

1
2
3
4
5
6
7
8
9
10
11
任务进度:task-123
┌─────────────────────────────────────┐
│ 分析日志文件 │
│ ████████░░░░░░░░░░░░░ 40% │
│ 已处理:400MB / 1000MB │
│ 预估剩余:6 分钟 │
│ │
│ 最近输出: │
│ Processing: user_events.log │
│ Found 150,000 records... │
└─────────────────────────────────────┘

思想 4:输出可追溯

问题: 任务完成后找不到输出。

解决: 持久化存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 输出存储策略
class TaskOutputStorage {
// 1. 实时写入数据库
async appendOutput(taskId: string, output: TaskOutput): Promise<void> {
await this.db.run(`
UPDATE tasks SET output = json_insert(
COALESCE(output, json_array()),
'$[#]', ?
) WHERE id = ?
`, [JSON.stringify(output), taskId]);
}

// 2. 同时写入文件 (大任务)
async appendToFile(taskId: string, data: string): Promise<void> {
const logPath = `/tmp/tasks/${taskId}.log`;
await fs.promises.appendFile(logPath, data);
}

// 3. 支持导出
async export(taskId: string, format: 'text' | 'json'): Promise<string> {
const task = await this.get(taskId);

if (format === 'json') {
return JSON.stringify(task.output, null, 2);
}

return task.output.map(o => o.data).join('');
}
}

思想 5:资源自动清理

问题: 任务完成后资源未释放。

解决: 自动清理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class TaskResourceManager {
async cleanup(task: Task): Promise<void> {
// 1. 终止子进程 (如果还在运行)
if (task.pid) {
try {
process.kill(task.pid);
} catch {}
}

// 2. 清理临时文件
const tempFiles = await this.getTempFiles(task.id);
for (const file of tempFiles) {
await fs.promises.unlink(file);
}

// 3. 释放内存
this.outputStream.clear(task.id);

// 4. 通知订阅者
this.notifyCleanup(task.id);
}

// 定期清理旧任务
async cleanupOldTasks(maxAge: number): Promise<void> {
const cutoff = new Date(Date.now() - maxAge);
const oldTasks = await this.taskStore.list({
createdBefore: cutoff,
status: ['completed', 'failed', 'cancelled'],
});

for (const task of oldTasks) {
await this.cleanup(task);
await this.taskStore.delete(task.id);
}
}
}

解决方案:完整实现详解

TaskCreateTool 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
export class TaskCreateTool extends Tool {
name = 'task_create';
description = '创建后台任务';

inputSchema = {
type: 'object',
properties: {
name: {
type: 'string',
description: '任务名称',
},
command: {
type: 'string',
description: '要执行的命令',
},
args: {
type: 'array',
items: { type: 'string' },
description: '命令参数',
},
cwd: {
type: 'string',
description: '工作目录',
},
timeout: {
type: 'number',
description: '超时时间 (秒)',
},
background: {
type: 'boolean',
description: '是否后台执行',
default: true,
},
},
required: ['command'],
};

private taskCreator: TaskCreator;

constructor(config: TaskToolConfig) {
super();
this.taskCreator = new TaskCreator(config);
}

async execute(input: TaskCreateInput, context: ToolContext): Promise<ToolResult> {
try {
// 创建任务
const task = await this.taskCreator.create({
name: input.name,
command: input.command,
args: input.args,
cwd: input.cwd || context.cwd,
env: context.env,
timeout: input.timeout ? input.timeout * 1000 : undefined,
background: input.background !== false,
});

return {
success: true,
taskId: task.id,
status: task.status,
message: `任务已创建:${task.name}`,
info: {
id: task.id,
name: task.name,
command: task.command,
createdAt: task.createdAt,
background: task.status === 'running',
},
};

} catch (error) {
return {
success: false,
error: error.message,
};
}
}
}

TaskListTool 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
export class TaskListTool extends Tool {
name = 'task_list';
description = '列出任务';

inputSchema = {
type: 'object',
properties: {
status: {
type: 'string',
enum: ['pending', 'running', 'completed', 'failed', 'cancelled'],
description: '按状态过滤',
},
limit: {
type: 'number',
description: '最大返回数量',
default: 20,
},
verbose: {
type: 'boolean',
description: '显示详细信息',
default: false,
},
},
};

private taskStore: TaskStore;

async execute(input: TaskListInput, context: ToolContext): Promise<ToolResult> {
const tasks = await this.taskStore.list({
status: input.status,
limit: input.limit,
});

const summary = tasks.map(task => ({
id: task.id,
name: task.name,
status: task.status,
createdAt: task.createdAt,
duration: task.completedAt
? formatDuration(task.completedAt.getTime() - task.startedAt!.getTime())
: undefined,
progress: task.progress ? `${task.progress.percentage}%` : undefined,
}));

return {
success: true,
tasks: summary,
total: tasks.length,
};
}
}

TaskOutputTool 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
export class TaskOutputTool extends Tool {
name = 'task_output';
description = '获取任务输出';

inputSchema = {
type: 'object',
properties: {
taskId: {
type: 'string',
description: '任务 ID',
},
follow: {
type: 'boolean',
description: '是否持续跟踪',
default: false,
},
lines: {
type: 'number',
description: '返回行数',
default: 100,
},
type: {
type: 'string',
enum: ['stdout', 'stderr', 'all'],
default: 'all',
},
},
required: ['taskId'],
};

private taskStore: TaskStore;
private outputStream: TaskOutputStream;

async execute(input: TaskOutputInput, context: ToolContext): Promise<ToolResult> {
const task = await this.taskStore.get(input.taskId);

if (!task) {
return {
success: false,
error: `Task not found: ${input.taskId}`,
};
}

// 获取输出
let output = this.outputStream.get(input.taskId, {
limit: input.lines,
type: input.type,
});

// 如果 follow 且任务还在运行,设置流式
if (input.follow && task.status === 'running') {
return {
success: true,
output: output.join('\n'),
streaming: true,
taskId: task.id,
status: task.status,
};
}

return {
success: true,
output: output.join('\n'),
taskId: task.id,
status: task.status,
exitCode: task.exitCode,
};
}
}

TaskStopTool 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
export class TaskStopTool extends Tool {
name = 'task_stop';
description = '停止任务';

inputSchema = {
type: 'object',
properties: {
taskId: {
type: 'string',
description: '任务 ID',
},
force: {
type: 'boolean',
description: '是否强制停止',
default: false,
},
},
required: ['taskId'],
};

private taskController: TaskController;

async execute(input: TaskStopInput, context: ToolContext): Promise<ToolResult> {
try {
await this.taskController.cancel(input.taskId, {
force: input.force,
});

return {
success: true,
message: `任务已停止:${input.taskId}`,
};

} catch (error) {
return {
success: false,
error: error.message,
};
}
}
}

OpenClaw 最佳实践

实践 1:创建后台任务

1
2
3
4
5
6
7
8
9
10
11
12
# 创建长时间任务
openclaw run task_create \
--name "分析日志" \
--command "python analyze.py --file large.log" \
--timeout 300

# 输出:
任务已创建:分析日志
任务 ID: task-20260403-001
状态:running

# 继续做其他事情,稍后检查

实践 2:查看任务列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 查看所有任务
openclaw run task_list

# 输出:
任务列表 (共 3 个):

┌───────────────────────────────────────────┐
│ ID │ 名称 │ 状态 │ 进度 │
├───────────────────────────────────────────┤
│ task-001 │ 分析日志 │ running │ 45% │
│ task-002 │ 运行测试 │ completed│ 100% │
│ task-003 │ 构建项目 │ failed │ 60% │
└───────────────────────────────────────────┘

# 只查看运行中的任务
openclaw run task_list --status running

实践 3:获取任务输出

1
2
3
4
5
6
7
8
9
10
11
12
# 获取最近 100 行输出
openclaw run task_output --taskId task-001 --lines 100

# 持续跟踪输出
openclaw run task_output --taskId task-001 --follow

# 输出:
Processing: user_events.log
Found 150,000 records...
Analyzing patterns...
Progress: 45%
...

实践 4:停止任务

1
2
3
4
5
# 正常停止
openclaw run task_stop --taskId task-001

# 强制停止
openclaw run task_stop --taskId task-001 --force

实践 5:任务组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# ~/.openclaw/config/task-workflows.yaml

workflows:
# 数据分析流程
- name: analyze-data
description: 完整数据分析流程
tasks:
- name: extract
command: python extract.py
timeout: 300

- name: transform
command: python transform.py
timeout: 600
depends_on: extract

- name: load
command: python load.py
timeout: 300
depends_on: transform

- name: notify
command: echo "Analysis complete!"
depends_on: load

# 使用
openclaw run workflow analyze-data

实践 6:任务监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 查看任务详情
openclaw run task_info --taskId task-001

# 输出:
任务详情:task-001
─────────────────────────────────────
名称:分析日志
命令:python analyze.py --file large.log
状态:running
创建时间:2026-04-03 20:00:00
开始时间:2026-04-03 20:00:05
持续时间:5 分钟 30 秒
进度:45% (450MB / 1000MB)
预估剩余:7 分钟

PID: 12345
内存:256MB
CPU: 15%

最近输出:
Processing: user_events.log
Found 150,000 records...

总结

核心要点

  1. 异步执行 - 后台运行,不阻塞主线程
  2. 可中断 - 随时取消不需要的任务
  3. 进度透明 - 实时报告进展
  4. 输出可追溯 - 持久化存储输出
  5. 资源清理 - 自动释放资源

设计智慧

好的任务管理让用户”启动即忘”。

Claude Code 的任务管理设计告诉我们:

  • 异步执行提升用户体验
  • 可中断性避免资源浪费
  • 进度透明建立信任
  • 持久化保证可追溯性

任务状态流转

1
2
3
pending → running → completed

failed / cancelled / timed_out

下一步

  • 实现任务创建工具
  • 添加任务存储
  • 实现进度跟踪
  • 添加任务监控 UI

系列文章:

  • [1] Bash 命令执行的安全艺术 (已发布)
  • [2] 差异编辑的设计艺术 (已发布)
  • [3] 文件搜索的底层原理 (已发布)
  • [4] 多 Agent 协作的架构设计 (已发布)
  • [5] 技能系统的设计哲学 (已发布)
  • [6] MCP 协议集成的完整指南 (已发布)
  • [7] 后台任务管理的完整方案 (本文)
  • [8] Web 抓取的 SSRF 防护设计 (待发布)

上一篇: Claude Code 源码解析 (6):MCP 协议集成的完整指南


关于作者: John,OpenClaw 平台开发者,专注 AI 助手架构设计与实现。