Contents

Node.js Worker Threads 实战:用多线程突破单线程瓶颈,处理CPU密集型任务

为什么 Node.js 需要多线程?

Node.js 的核心优势在于事件驱动的非阻塞 I/O 模型,这让它在处理 HTTP 请求、文件读写、数据库查询等 I/O 密集型任务时表现出色。但这种模型有一个天然的短板:单线程无法充分利用多核 CPU

当你的应用遇到以下场景时,单线程的局限性就会暴露:

  • 大量 JSON 数据的序列化 / 反序列化
  • 图片或视频的编码处理
  • 复杂的数学计算或加密运算
  • 大文件的压缩与解压
  • 数据库查询结果的大量聚合处理

在 Node.js v10.5 引入 worker_threads 之前,我们只能通过 child_processcluster 模块来实现并行处理。但 worker_threads 提供了更轻量、更高效的方案——线程之间可以共享内存(SharedArrayBuffer),启动开销远小于子进程。

Worker Threads 基础用法

先从一个最简单的例子开始。假设我们有一个 CPU 密集型的任务——计算一个大范围的质数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// worker.js
const { parentPort, workerData } = require('worker_threads');

function countPrimes(start, end) {
  let count = 0;
  for (let i = start; i <= end; i++) {
    if (isPrime(i)) count++;
  }
  return count;
}

function isPrime(n) {
  if (n < 2) return false;
  if (n === 2) return true;
  if (n % 2 === 0) return false;
  for (let i = 3; i <= Math.sqrt(n); i += 2) {
    if (n % i === 0) return false;
  }
  return true;
}

const result = countPrimes(workerData.start, workerData.end);
parentPort.postMessage(result);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// main.js
const { Worker } = require('worker_threads');

// 单线程方式
function singleThreadRange(total) {
  let count = 0;
  for (let i = 2; i <= total; i++) {
    if (isPrime(i)) count++;
  }
  return count;
}

// 多线程方式
function multiThreadRange(total, threadCount = 4) {
  return new Promise((resolve, reject) => {
    const chunkSize = Math.ceil(total / threadCount);
    const results = [];
    let completed = 0;

    for (let i = 0; i < threadCount; i++) {
      const start = i * chunkSize + 2;
      const end = Math.min((i + 1) * chunkSize, total);

      const worker = new Worker('./worker.js', {
        workerData: { start, end }
      });

      worker.on('message', (count) => {
        results.push(count);
        completed++;
        if (completed === threadCount) {
          resolve(results.reduce((a, b) => a + b, 0));
        }
      });

      worker.on('error', reject);
    }
  });
}

// 性能对比
async function benchmark() {
  const total = 1_000_000;
  const threadCount = require('os').cpus().length;

  console.time('单线程');
  const r1 = singleThreadRange(total);
  console.timeEnd('单线程');

  console.time(`多线程(${threadCount}个)`);
  const r2 = await multiThreadRange(total, threadCount);
  console.timeEnd(`多线程(${threadCount}个)`);

  console.log(`结果一致: ${r1 === r2}`);
}

benchmark();

在我的 8 核开发机上,计算 100 万范围内的质数,单线程耗时约 180ms,8 线程并行仅需约 35ms,性能提升约 5 倍

生产级封装:带错误处理和资源回收的 Worker 池

直接使用 Worker Thread 裸 API 存在几个问题:每次创建 Worker 都有开销、没有并发控制、错误处理分散。在生产环境中,我们通常需要一个 Worker Pool(线程池)来统一管理:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// worker-pool.js
const { Worker } = require('worker_threads');
const os = require('os');

class WorkerPool {
  constructor(workerFile, maxWorkers = os.cpus().length) {
    this.workerFile = workerFile;
    this.maxWorkers = maxWorkers;
    this.workers = [];
    this.taskQueue = [];
  }

  runTask(data, timeout = 30000) {
    return new Promise((resolve, reject) => {
      const task = { data, resolve, reject, timeout };
      
      // 有空闲 Worker,立即执行
      if (this.workers.length < this.maxWorkers) {
        this._executeTask(task);
      } else {
        // 否则排队等待
        this.taskQueue.push(task);
      }
    });
  }

  _executeTask(task) {
    const worker = new Worker(this.workerFile, {
      workerData: task.data
    });

    const timer = setTimeout(() => {
      worker.terminate();
      task.reject(new Error(`Worker 超时 (${task.timeout}ms)`));
    }, task.timeout);

    worker.on('message', (result) => {
      clearTimeout(timer);
      task.resolve(result);
      this._recycleWorker(worker);
    });

    worker.on('error', (err) => {
      clearTimeout(timer);
      task.reject(err);
      this._recycleWorker(worker);
    });
  }

  _recycleWorker(worker) {
    // 处理完一个任务后,检查队列中是否还有等待的任务
    if (this.taskQueue.length > 0) {
      const nextTask = this.taskQueue.shift();
      this._executeTask(nextTask);
    } else {
      // 无等待任务时关闭 Worker,释放资源
      worker.terminate();
    }
  }
}

module.exports = WorkerPool;

使用线程池后,主进程代码变得非常简洁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// app.js
const WorkerPool = require('./worker-pool');

const pool = new WorkerPool('./worker.js', 4);

async function processImageBatch(images) {
  const promises = images.map((img) =>
    pool.runTask({ imagePath: img }, 10000)
  );

  try {
    const results = await Promise.all(promises);
    console.log(`成功处理 ${results.length} 张图片`);
    return results;
  } catch (err) {
    console.error('部分任务失败:', err.message);
  }
}

实际场景:用 Worker Threads 加速图片处理

来看一个更贴近实际的例子——用 Sharp 库在 Worker 中并行处理一批图片:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// image-processor.js (Worker)
const { parentPort, workerData } = require('worker_threads');
const sharp = require('sharp');
const path = require('path');

async function processImage({ inputPath, outputPath, width, height }) {
  const result = await sharp(inputPath)
    .resize(width, height, { fit: 'cover' })
    .webp({ quality: 80 })
    .toFile(outputPath);

  return {
    original: inputPath,
    output: outputPath,
    size: result.size
  };
}

processImage(workerData)
  .then((info) => parentPort.postMessage({ success: true, info }))
  .catch((err) => parentPort.postMessage({ success: false, error: err.message }));
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// main.js
const { Worker } = require('worker_threads');
const path = require('path');
const fs = require('fs');

async function batchProcessImages(imagePaths, concurrency = 4) {
  const results = [];
  const queue = [...imagePaths];
  const workers = new Set();

  function runNext() {
    if (queue.length === 0) return null;

    const imgPath = queue.shift();
    const outputPath = imgPath
      .replace(/\/input\//, '/output/')
      .replace(/\.\w+$/, '.webp');

    fs.mkdirSync(path.dirname(outputPath), { recursive: true });

    return new Promise((resolve) => {
      const worker = new Worker('./image-processor.js', {
        workerData: {
          inputPath: imgPath,
          outputPath,
          width: 800,
          height: 600
        }
      });

      workers.add(worker);

      worker.on('message', (msg) => {
        results.push(msg);
        workers.delete(worker);
        resolve();
      });

      worker.on('error', (err) => {
        results.push({ success: false, error: err.message });
        workers.delete(worker);
        resolve();
      });
    });
  }

  // 维持指定并发数运行
  const runners = [];
  for (let i = 0; i < concurrency; i++) {
    runners.push(
      (async () => {
        while (queue.length > 0 || workers.size > 0) {
          await runNext();
        }
      })()
    );
  }

  await Promise.all(runners);
  return results;
}

// 使用
const images = fs.readdirSync('./input')
  .filter(f => /\.(jpg|png)$/i.test(f))
  .map(f => path.join('./input', f));

batchProcessImages(images, 4).then((results) => {
  console.log(`处理完成: ${results.filter(r => r.success).length}/${results.length}`);
});

共享内存:零拷贝数据传递

Worker Threads 最强大的特性之一是支持 SharedArrayBuffer,多个线程可以操作同一块内存,避免了序列化 / 反序列化的开销:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// shared-memory.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
  // 主线程:创建共享内存
  const sharedBuffer = new SharedArrayBuffer(1024 * Int32Array.BYTES_PER_ELEMENT);
  const sharedArray = new Int32Array(sharedBuffer);

  // 初始化数据
  for (let i = 0; i < 1024; i++) {
    sharedArray[i] = Math.floor(Math.random() * 100);
  }

  // 启动 4 个 Worker 处理不同的区间
  const workers = [];
  const chunkSize = 256;

  for (let i = 0; i < 4; i++) {
    const worker = new Worker(__filename, {
      workerData: {
        buffer: sharedBuffer,
        offset: i * chunkSize,
        length: chunkSize
      }
    });
    workers.push(worker);
  }

  Promise.all(workers.map(w => new Promise(r => w.on('exit', r)))).then(() => {
    // 读取各 Worker 写回的结果
    const total = sharedArray[0]; // 第一个位置存总和
    console.log(`4 个线程计算的总和: ${total}`);
  });

} else {
  // Worker 线程:直接在共享内存上操作
  const { buffer, offset, length } = workerData;
  const arr = new Int32Array(buffer);

  let sum = 0;
  for (let i = offset; i < offset + length; i++) {
    sum += arr[i];
  }

  // 使用原子操作安全地累加结果
  Atomics.add(arr, 0, sum);
}

注意 Atomics.add 的使用——当多个线程同时写入共享内存时,必须使用原子操作来保证数据一致性。

性能建议与最佳实践

  1. 任务粒度要合适:Worker 启动有固定开销(约 2-5ms),如果单个任务只需不到 1ms,线程切换的开销反而会拖慢速度。建议把大量小任务合并成批次处理。

  2. Worker 数量 ≠ CPU 核心数:对于 I/O 混合型任务,Worker 数量可以大于 CPU 核心数;纯计算任务建议不超过核心数。

  3. 避免频繁创建/销毁 Worker:在生产环境中,使用线程池复用 Worker 实例,避免反复初始化的开销。

  4. 注意内存开销:每个 Worker 都有独立的 V8 实例,默认约消耗 5-10MB 内存。8 核机器上开 8 个 Worker 就是额外 40-80MB。

  5. Worker 中避免使用共享模块:Worker 启动时会重新加载所有 require 的模块。如果多个 Worker 依赖大量公共模块,考虑通过 workerData 传递预处理好的数据,而不是让每个 Worker 重新加载和初始化。

小结

Worker Threads 不是要替代 Node.js 的异步 I/O 模型,而是补齐了它在 CPU 密集型任务上的短板。当你遇到接口响应变慢、事件循环被阻塞的场景时,把计算密集的部分卸载到 Worker 中,往往能以最小的改动获得最大的性能提升。

关键是识别出真正的瓶颈——如果你的慢查询来自数据库 I/O,加 Worker 线程无济于事;但如果是因为大 JSON 的解析、复杂的数据转换拖慢了响应,Worker Threads 就是你的利器。