为什么 Node.js 需要多线程?
Node.js 的核心优势在于事件驱动的非阻塞 I/O 模型,这让它在处理 HTTP 请求、文件读写、数据库查询等 I/O 密集型任务时表现出色。但这种模型有一个天然的短板:单线程无法充分利用多核 CPU。
当你的应用遇到以下场景时,单线程的局限性就会暴露:
- 大量 JSON 数据的序列化 / 反序列化
- 图片或视频的编码处理
- 复杂的数学计算或加密运算
- 大文件的压缩与解压
- 数据库查询结果的大量聚合处理
在 Node.js v10.5 引入 worker_threads 之前,我们只能通过 child_process 或 cluster 模块来实现并行处理。但 worker_threads 提供了更轻量、更高效的方案——线程之间可以共享内存(SharedArrayBuffer),启动开销远小于子进程。
Worker Threads 基础用法
先从一个最简单的例子开始。假设我们有一个 CPU 密集型的任务——计算一个大范围的质数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// worker.js
const { parentPort, workerData } = require('worker_threads');
function countPrimes(start, end) {
let count = 0;
for (let i = start; i <= end; i++) {
if (isPrime(i)) count++;
}
return count;
}
function isPrime(n) {
if (n < 2) return false;
if (n === 2) return true;
if (n % 2 === 0) return false;
for (let i = 3; i <= Math.sqrt(n); i += 2) {
if (n % i === 0) return false;
}
return true;
}
const result = countPrimes(workerData.start, workerData.end);
parentPort.postMessage(result);
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
// main.js
const { Worker } = require('worker_threads');
// 单线程方式
function singleThreadRange(total) {
let count = 0;
for (let i = 2; i <= total; i++) {
if (isPrime(i)) count++;
}
return count;
}
// 多线程方式
function multiThreadRange(total, threadCount = 4) {
return new Promise((resolve, reject) => {
const chunkSize = Math.ceil(total / threadCount);
const results = [];
let completed = 0;
for (let i = 0; i < threadCount; i++) {
const start = i * chunkSize + 2;
const end = Math.min((i + 1) * chunkSize, total);
const worker = new Worker('./worker.js', {
workerData: { start, end }
});
worker.on('message', (count) => {
results.push(count);
completed++;
if (completed === threadCount) {
resolve(results.reduce((a, b) => a + b, 0));
}
});
worker.on('error', reject);
}
});
}
// 性能对比
async function benchmark() {
const total = 1_000_000;
const threadCount = require('os').cpus().length;
console.time('单线程');
const r1 = singleThreadRange(total);
console.timeEnd('单线程');
console.time(`多线程(${threadCount}个)`);
const r2 = await multiThreadRange(total, threadCount);
console.timeEnd(`多线程(${threadCount}个)`);
console.log(`结果一致: ${r1 === r2}`);
}
benchmark();
|
在我的 8 核开发机上,计算 100 万范围内的质数,单线程耗时约 180ms,8 线程并行仅需约 35ms,性能提升约 5 倍。
生产级封装:带错误处理和资源回收的 Worker 池
直接使用 Worker Thread 裸 API 存在几个问题:每次创建 Worker 都有开销、没有并发控制、错误处理分散。在生产环境中,我们通常需要一个 Worker Pool(线程池)来统一管理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
// worker-pool.js
const { Worker } = require('worker_threads');
const os = require('os');
class WorkerPool {
constructor(workerFile, maxWorkers = os.cpus().length) {
this.workerFile = workerFile;
this.maxWorkers = maxWorkers;
this.workers = [];
this.taskQueue = [];
}
runTask(data, timeout = 30000) {
return new Promise((resolve, reject) => {
const task = { data, resolve, reject, timeout };
// 有空闲 Worker,立即执行
if (this.workers.length < this.maxWorkers) {
this._executeTask(task);
} else {
// 否则排队等待
this.taskQueue.push(task);
}
});
}
_executeTask(task) {
const worker = new Worker(this.workerFile, {
workerData: task.data
});
const timer = setTimeout(() => {
worker.terminate();
task.reject(new Error(`Worker 超时 (${task.timeout}ms)`));
}, task.timeout);
worker.on('message', (result) => {
clearTimeout(timer);
task.resolve(result);
this._recycleWorker(worker);
});
worker.on('error', (err) => {
clearTimeout(timer);
task.reject(err);
this._recycleWorker(worker);
});
}
_recycleWorker(worker) {
// 处理完一个任务后,检查队列中是否还有等待的任务
if (this.taskQueue.length > 0) {
const nextTask = this.taskQueue.shift();
this._executeTask(nextTask);
} else {
// 无等待任务时关闭 Worker,释放资源
worker.terminate();
}
}
}
module.exports = WorkerPool;
|
使用线程池后,主进程代码变得非常简洁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// app.js
const WorkerPool = require('./worker-pool');
const pool = new WorkerPool('./worker.js', 4);
async function processImageBatch(images) {
const promises = images.map((img) =>
pool.runTask({ imagePath: img }, 10000)
);
try {
const results = await Promise.all(promises);
console.log(`成功处理 ${results.length} 张图片`);
return results;
} catch (err) {
console.error('部分任务失败:', err.message);
}
}
|
实际场景:用 Worker Threads 加速图片处理
来看一个更贴近实际的例子——用 Sharp 库在 Worker 中并行处理一批图片:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// image-processor.js (Worker)
const { parentPort, workerData } = require('worker_threads');
const sharp = require('sharp');
const path = require('path');
async function processImage({ inputPath, outputPath, width, height }) {
const result = await sharp(inputPath)
.resize(width, height, { fit: 'cover' })
.webp({ quality: 80 })
.toFile(outputPath);
return {
original: inputPath,
output: outputPath,
size: result.size
};
}
processImage(workerData)
.then((info) => parentPort.postMessage({ success: true, info }))
.catch((err) => parentPort.postMessage({ success: false, error: err.message }));
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
// main.js
const { Worker } = require('worker_threads');
const path = require('path');
const fs = require('fs');
async function batchProcessImages(imagePaths, concurrency = 4) {
const results = [];
const queue = [...imagePaths];
const workers = new Set();
function runNext() {
if (queue.length === 0) return null;
const imgPath = queue.shift();
const outputPath = imgPath
.replace(/\/input\//, '/output/')
.replace(/\.\w+$/, '.webp');
fs.mkdirSync(path.dirname(outputPath), { recursive: true });
return new Promise((resolve) => {
const worker = new Worker('./image-processor.js', {
workerData: {
inputPath: imgPath,
outputPath,
width: 800,
height: 600
}
});
workers.add(worker);
worker.on('message', (msg) => {
results.push(msg);
workers.delete(worker);
resolve();
});
worker.on('error', (err) => {
results.push({ success: false, error: err.message });
workers.delete(worker);
resolve();
});
});
}
// 维持指定并发数运行
const runners = [];
for (let i = 0; i < concurrency; i++) {
runners.push(
(async () => {
while (queue.length > 0 || workers.size > 0) {
await runNext();
}
})()
);
}
await Promise.all(runners);
return results;
}
// 使用
const images = fs.readdirSync('./input')
.filter(f => /\.(jpg|png)$/i.test(f))
.map(f => path.join('./input', f));
batchProcessImages(images, 4).then((results) => {
console.log(`处理完成: ${results.filter(r => r.success).length}/${results.length}`);
});
|
共享内存:零拷贝数据传递
Worker Threads 最强大的特性之一是支持 SharedArrayBuffer,多个线程可以操作同一块内存,避免了序列化 / 反序列化的开销:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
// shared-memory.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
// 主线程:创建共享内存
const sharedBuffer = new SharedArrayBuffer(1024 * Int32Array.BYTES_PER_ELEMENT);
const sharedArray = new Int32Array(sharedBuffer);
// 初始化数据
for (let i = 0; i < 1024; i++) {
sharedArray[i] = Math.floor(Math.random() * 100);
}
// 启动 4 个 Worker 处理不同的区间
const workers = [];
const chunkSize = 256;
for (let i = 0; i < 4; i++) {
const worker = new Worker(__filename, {
workerData: {
buffer: sharedBuffer,
offset: i * chunkSize,
length: chunkSize
}
});
workers.push(worker);
}
Promise.all(workers.map(w => new Promise(r => w.on('exit', r)))).then(() => {
// 读取各 Worker 写回的结果
const total = sharedArray[0]; // 第一个位置存总和
console.log(`4 个线程计算的总和: ${total}`);
});
} else {
// Worker 线程:直接在共享内存上操作
const { buffer, offset, length } = workerData;
const arr = new Int32Array(buffer);
let sum = 0;
for (let i = offset; i < offset + length; i++) {
sum += arr[i];
}
// 使用原子操作安全地累加结果
Atomics.add(arr, 0, sum);
}
|
注意 Atomics.add 的使用——当多个线程同时写入共享内存时,必须使用原子操作来保证数据一致性。
性能建议与最佳实践
-
任务粒度要合适:Worker 启动有固定开销(约 2-5ms),如果单个任务只需不到 1ms,线程切换的开销反而会拖慢速度。建议把大量小任务合并成批次处理。
-
Worker 数量 ≠ CPU 核心数:对于 I/O 混合型任务,Worker 数量可以大于 CPU 核心数;纯计算任务建议不超过核心数。
-
避免频繁创建/销毁 Worker:在生产环境中,使用线程池复用 Worker 实例,避免反复初始化的开销。
-
注意内存开销:每个 Worker 都有独立的 V8 实例,默认约消耗 5-10MB 内存。8 核机器上开 8 个 Worker 就是额外 40-80MB。
-
Worker 中避免使用共享模块:Worker 启动时会重新加载所有 require 的模块。如果多个 Worker 依赖大量公共模块,考虑通过 workerData 传递预处理好的数据,而不是让每个 Worker 重新加载和初始化。
小结
Worker Threads 不是要替代 Node.js 的异步 I/O 模型,而是补齐了它在 CPU 密集型任务上的短板。当你遇到接口响应变慢、事件循环被阻塞的场景时,把计算密集的部分卸载到 Worker 中,往往能以最小的改动获得最大的性能提升。
关键是识别出真正的瓶颈——如果你的慢查询来自数据库 I/O,加 Worker 线程无济于事;但如果是因为大 JSON 的解析、复杂的数据转换拖慢了响应,Worker Threads 就是你的利器。