Skip to content

Node.js 异步编程模式

Node.js 异步特性

Node.js 的核心优势在于其异步非阻塞 I/O 模型,这使得它能够高效处理大量并发请求。

回调模式(Callback)

基本回调

javascript
const fs = require('fs');

// 异步读取文件
fs.readFile('file.txt', 'utf8', (err, data) => {
  if (err) {
    console.error('读取失败:', err);
    return;
  }
  console.log('文件内容:', data);
});

回调地狱问题

javascript
// 嵌套回调导致代码难以维护
fs.readFile('file1.txt', 'utf8', (err1, data1) => {
  if (err1) {
    console.error(err1);
    return;
  }
  
  fs.readFile('file2.txt', 'utf8', (err2, data2) => {
    if (err2) {
      console.error(err2);
      return;
    }
    
    fs.writeFile('output.txt', data1 + data2, (err3) => {
      if (err3) {
        console.error(err3);
        return;
      }
      console.log('完成');
    });
  });
});

Promise 模式

基本 Promise

javascript
const fs = require('fs').promises;

// Promise 链式调用
fs.readFile('file1.txt', 'utf8')
  .then(data1 => {
    return fs.readFile('file2.txt', 'utf8');
  })
  .then(data2 => {
    return fs.writeFile('output.txt', data2);
  })
  .then(() => {
    console.log('完成');
  })
  .catch(err => {
    console.error('错误:', err);
  });

创建 Promise

javascript
function readFilePromise(filePath) {
  return new Promise((resolve, reject) => {
    fs.readFile(filePath, 'utf8', (err, data) => {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    });
  });
}

// 使用
readFilePromise('file.txt')
  .then(data => console.log(data))
  .catch(err => console.error(err));

Promise 工具方法

javascript
// Promise.all - 等待所有 Promise 完成
const promises = [
  fs.readFile('file1.txt', 'utf8'),
  fs.readFile('file2.txt', 'utf8'),
  fs.readFile('file3.txt', 'utf8')
];

Promise.all(promises)
  .then(results => {
    console.log('所有文件读取完成:', results);
  })
  .catch(err => {
    console.error('读取失败:', err);
  });

// Promise.allSettled - 等待所有 Promise 完成(无论成功或失败)
Promise.allSettled(promises)
  .then(results => {
    results.forEach((result, index) => {
      if (result.status === 'fulfilled') {
        console.log(`文件 ${index + 1} 读取成功`);
      } else {
        console.log(`文件 ${index + 1} 读取失败`);
      }
    });
  });

// Promise.race - 返回最先完成的 Promise
Promise.race(promises)
  .then(result => {
    console.log('最快的文件:', result);
  });

// Promise.any - 返回第一个成功的 Promise(Node.js 15+)
Promise.any(promises)
  .then(result => {
    console.log('第一个成功的文件:', result);
  })
  .catch(err => {
    console.error('所有 Promise 都失败');
  });

async/await 模式

基本用法

javascript
async function readFiles() {
  try {
    const data1 = await fs.readFile('file1.txt', 'utf8');
    const data2 = await fs.readFile('file2.txt', 'utf8');
    await fs.writeFile('output.txt', data1 + data2);
    console.log('完成');
  } catch (err) {
    console.error('错误:', err);
  }
}

readFiles();

并行执行

javascript
async function readFilesParallel() {
  try {
    // 并行读取多个文件
    const [data1, data2, data3] = await Promise.all([
      fs.readFile('file1.txt', 'utf8'),
      fs.readFile('file2.txt', 'utf8'),
      fs.readFile('file3.txt', 'utf8')
    ]);
    
    console.log('所有文件读取完成');
    return { data1, data2, data3 };
  } catch (err) {
    console.error('错误:', err);
  }
}

循环中的 async/await

javascript
// 串行执行
async function processFiles(files) {
  for (const file of files) {
    const data = await fs.readFile(file, 'utf8');
    console.log(`处理 ${file}:`, data);
  }
}

// 并行执行
async function processFilesParallel(files) {
  const promises = files.map(file => fs.readFile(file, 'utf8'));
  const results = await Promise.all(promises);
  return results;
}

常见异步模式

1. 重试模式

javascript
async function retry(fn, maxAttempts = 3, delay = 1000) {
  for (let i = 0; i < maxAttempts; i++) {
    try {
      return await fn();
    } catch (err) {
      if (i === maxAttempts - 1) throw err;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

// 使用
retry(() => fetch('https://api.example.com/data'))
  .then(data => console.log(data))
  .catch(err => console.error('重试失败:', err));

2. 超时模式

javascript
function timeout(promise, ms) {
  return Promise.race([
    promise,
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('操作超时')), ms)
    )
  ]);
}

// 使用
timeout(fs.readFile('file.txt', 'utf8'), 5000)
  .then(data => console.log(data))
  .catch(err => console.error(err));

3. 批量处理

javascript
async function batchProcess(items, batchSize, processor) {
  const results = [];
  
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    const batchResults = await Promise.all(
      batch.map(item => processor(item))
    );
    results.push(...batchResults);
  }
  
  return results;
}

// 使用
const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const results = await batchProcess(items, 3, async (item) => {
  // 处理每个项目
  return item * 2;
});

4. 队列处理

javascript
class AsyncQueue {
  constructor(concurrency = 1) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  
  async add(task) {
    return new Promise((resolve, reject) => {
      this.queue.push({ task, resolve, reject });
      this.process();
    });
  }
  
  async process() {
    if (this.running >= this.concurrency || this.queue.length === 0) {
      return;
    }
    
    this.running++;
    const { task, resolve, reject } = this.queue.shift();
    
    try {
      const result = await task();
      resolve(result);
    } catch (err) {
      reject(err);
    } finally {
      this.running--;
      this.process();
    }
  }
}

// 使用
const queue = new AsyncQueue(2); // 并发数为 2

for (let i = 0; i < 10; i++) {
  queue.add(async () => {
    console.log(`处理任务 ${i}`);
    await new Promise(resolve => setTimeout(resolve, 1000));
    return `任务 ${i} 完成`;
  });
}

5. 缓存模式

javascript
function cacheAsync(fn, ttl = 60000) {
  const cache = new Map();
  
  return async function(...args) {
    const key = JSON.stringify(args);
    const cached = cache.get(key);
    
    if (cached && Date.now() - cached.timestamp < ttl) {
      return cached.value;
    }
    
    const value = await fn(...args);
    cache.set(key, { value, timestamp: Date.now() });
    return value;
  };
}

// 使用
const cachedFetch = cacheAsync(async (url) => {
  const response = await fetch(url);
  return response.json();
}, 60000); // 缓存 60 秒

事件驱动模式

EventEmitter

javascript
const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();

// 监听事件
myEmitter.on('event', (data) => {
  console.log('事件触发:', data);
});

// 触发事件
myEmitter.emit('event', { message: 'Hello' });

// 只监听一次
myEmitter.once('event', () => {
  console.log('只触发一次');
});

// 移除监听器
myEmitter.removeListener('event', handler);

异步事件处理

javascript
class AsyncEventEmitter extends EventEmitter {
  async emitAsync(event, ...args) {
    const listeners = this.listeners(event);
    const results = await Promise.all(
      listeners.map(listener => listener(...args))
    );
    return results;
  }
}

const emitter = new AsyncEventEmitter();

emitter.on('process', async (data) => {
  await new Promise(resolve => setTimeout(resolve, 1000));
  return `处理完成: ${data}`;
});

emitter.emitAsync('process', '数据').then(results => {
  console.log('所有处理完成:', results);
});

流式处理

处理大文件

javascript
const fs = require('fs');
const { pipeline } = require('stream/promises');

async function processLargeFile() {
  const readStream = fs.createReadStream('large-file.txt');
  const writeStream = fs.createWriteStream('output.txt');
  
  await pipeline(
    readStream,
    // 可以添加转换流
    writeStream
  );
  
  console.log('处理完成');
}

错误处理最佳实践

统一错误处理

javascript
async function handleAsync(fn) {
  try {
    return [null, await fn()];
  } catch (err) {
    return [err, null];
  }
}

// 使用
const [err, data] = await handleAsync(() => fs.readFile('file.txt', 'utf8'));
if (err) {
  console.error('错误:', err);
} else {
  console.log('数据:', data);
}

错误边界

javascript
async function safeAsync(fn, fallback) {
  try {
    return await fn();
  } catch (err) {
    console.error('错误:', err);
    return fallback;
  }
}

// 使用
const data = await safeAsync(
  () => fs.readFile('file.txt', 'utf8'),
  '默认内容'
);

总结

Node.js 异步编程要点:

  • 回调模式:基础的异步处理方式
  • Promise:解决回调地狱,提供链式调用
  • async/await:让异步代码更易读
  • 常见模式:重试、超时、批量处理、队列、缓存
  • 事件驱动:使用 EventEmitter 处理事件
  • 流式处理:高效处理大文件
  • 错误处理:统一的错误处理策略