Node.js 异步编程模式
Node.js 异步特性
Node.js 的核心优势在于其异步非阻塞 I/O 模型,这使得它能够高效处理大量并发请求。
回调模式(Callback)
基本回调
javascript
const fs = require('fs');
// 异步读取文件
fs.readFile('file.txt', 'utf8', (err, data) => {
if (err) {
console.error('读取失败:', err);
return;
}
console.log('文件内容:', data);
});回调地狱问题
javascript
// 嵌套回调导致代码难以维护
fs.readFile('file1.txt', 'utf8', (err1, data1) => {
if (err1) {
console.error(err1);
return;
}
fs.readFile('file2.txt', 'utf8', (err2, data2) => {
if (err2) {
console.error(err2);
return;
}
fs.writeFile('output.txt', data1 + data2, (err3) => {
if (err3) {
console.error(err3);
return;
}
console.log('完成');
});
});
});Promise 模式
基本 Promise
javascript
const fs = require('fs').promises;
// Promise 链式调用
fs.readFile('file1.txt', 'utf8')
.then(data1 => {
return fs.readFile('file2.txt', 'utf8');
})
.then(data2 => {
return fs.writeFile('output.txt', data2);
})
.then(() => {
console.log('完成');
})
.catch(err => {
console.error('错误:', err);
});创建 Promise
javascript
function readFilePromise(filePath) {
return new Promise((resolve, reject) => {
fs.readFile(filePath, 'utf8', (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
// 使用
readFilePromise('file.txt')
.then(data => console.log(data))
.catch(err => console.error(err));Promise 工具方法
javascript
// Promise.all - 等待所有 Promise 完成
const promises = [
fs.readFile('file1.txt', 'utf8'),
fs.readFile('file2.txt', 'utf8'),
fs.readFile('file3.txt', 'utf8')
];
Promise.all(promises)
.then(results => {
console.log('所有文件读取完成:', results);
})
.catch(err => {
console.error('读取失败:', err);
});
// Promise.allSettled - 等待所有 Promise 完成(无论成功或失败)
Promise.allSettled(promises)
.then(results => {
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`文件 ${index + 1} 读取成功`);
} else {
console.log(`文件 ${index + 1} 读取失败`);
}
});
});
// Promise.race - 返回最先完成的 Promise
Promise.race(promises)
.then(result => {
console.log('最快的文件:', result);
});
// Promise.any - 返回第一个成功的 Promise(Node.js 15+)
Promise.any(promises)
.then(result => {
console.log('第一个成功的文件:', result);
})
.catch(err => {
console.error('所有 Promise 都失败');
});async/await 模式
基本用法
javascript
async function readFiles() {
try {
const data1 = await fs.readFile('file1.txt', 'utf8');
const data2 = await fs.readFile('file2.txt', 'utf8');
await fs.writeFile('output.txt', data1 + data2);
console.log('完成');
} catch (err) {
console.error('错误:', err);
}
}
readFiles();并行执行
javascript
async function readFilesParallel() {
try {
// 并行读取多个文件
const [data1, data2, data3] = await Promise.all([
fs.readFile('file1.txt', 'utf8'),
fs.readFile('file2.txt', 'utf8'),
fs.readFile('file3.txt', 'utf8')
]);
console.log('所有文件读取完成');
return { data1, data2, data3 };
} catch (err) {
console.error('错误:', err);
}
}循环中的 async/await
javascript
// 串行执行
async function processFiles(files) {
for (const file of files) {
const data = await fs.readFile(file, 'utf8');
console.log(`处理 ${file}:`, data);
}
}
// 并行执行
async function processFilesParallel(files) {
const promises = files.map(file => fs.readFile(file, 'utf8'));
const results = await Promise.all(promises);
return results;
}常见异步模式
1. 重试模式
javascript
async function retry(fn, maxAttempts = 3, delay = 1000) {
for (let i = 0; i < maxAttempts; i++) {
try {
return await fn();
} catch (err) {
if (i === maxAttempts - 1) throw err;
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// 使用
retry(() => fetch('https://api.example.com/data'))
.then(data => console.log(data))
.catch(err => console.error('重试失败:', err));2. 超时模式
javascript
function timeout(promise, ms) {
return Promise.race([
promise,
new Promise((_, reject) =>
setTimeout(() => reject(new Error('操作超时')), ms)
)
]);
}
// 使用
timeout(fs.readFile('file.txt', 'utf8'), 5000)
.then(data => console.log(data))
.catch(err => console.error(err));3. 批量处理
javascript
async function batchProcess(items, batchSize, processor) {
const results = [];
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize);
const batchResults = await Promise.all(
batch.map(item => processor(item))
);
results.push(...batchResults);
}
return results;
}
// 使用
const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const results = await batchProcess(items, 3, async (item) => {
// 处理每个项目
return item * 2;
});4. 队列处理
javascript
class AsyncQueue {
constructor(concurrency = 1) {
this.concurrency = concurrency;
this.running = 0;
this.queue = [];
}
async add(task) {
return new Promise((resolve, reject) => {
this.queue.push({ task, resolve, reject });
this.process();
});
}
async process() {
if (this.running >= this.concurrency || this.queue.length === 0) {
return;
}
this.running++;
const { task, resolve, reject } = this.queue.shift();
try {
const result = await task();
resolve(result);
} catch (err) {
reject(err);
} finally {
this.running--;
this.process();
}
}
}
// 使用
const queue = new AsyncQueue(2); // 并发数为 2
for (let i = 0; i < 10; i++) {
queue.add(async () => {
console.log(`处理任务 ${i}`);
await new Promise(resolve => setTimeout(resolve, 1000));
return `任务 ${i} 完成`;
});
}5. 缓存模式
javascript
function cacheAsync(fn, ttl = 60000) {
const cache = new Map();
return async function(...args) {
const key = JSON.stringify(args);
const cached = cache.get(key);
if (cached && Date.now() - cached.timestamp < ttl) {
return cached.value;
}
const value = await fn(...args);
cache.set(key, { value, timestamp: Date.now() });
return value;
};
}
// 使用
const cachedFetch = cacheAsync(async (url) => {
const response = await fetch(url);
return response.json();
}, 60000); // 缓存 60 秒事件驱动模式
EventEmitter
javascript
const EventEmitter = require('events');
class MyEmitter extends EventEmitter {}
const myEmitter = new MyEmitter();
// 监听事件
myEmitter.on('event', (data) => {
console.log('事件触发:', data);
});
// 触发事件
myEmitter.emit('event', { message: 'Hello' });
// 只监听一次
myEmitter.once('event', () => {
console.log('只触发一次');
});
// 移除监听器
myEmitter.removeListener('event', handler);异步事件处理
javascript
class AsyncEventEmitter extends EventEmitter {
async emitAsync(event, ...args) {
const listeners = this.listeners(event);
const results = await Promise.all(
listeners.map(listener => listener(...args))
);
return results;
}
}
const emitter = new AsyncEventEmitter();
emitter.on('process', async (data) => {
await new Promise(resolve => setTimeout(resolve, 1000));
return `处理完成: ${data}`;
});
emitter.emitAsync('process', '数据').then(results => {
console.log('所有处理完成:', results);
});流式处理
处理大文件
javascript
const fs = require('fs');
const { pipeline } = require('stream/promises');
async function processLargeFile() {
const readStream = fs.createReadStream('large-file.txt');
const writeStream = fs.createWriteStream('output.txt');
await pipeline(
readStream,
// 可以添加转换流
writeStream
);
console.log('处理完成');
}错误处理最佳实践
统一错误处理
javascript
async function handleAsync(fn) {
try {
return [null, await fn()];
} catch (err) {
return [err, null];
}
}
// 使用
const [err, data] = await handleAsync(() => fs.readFile('file.txt', 'utf8'));
if (err) {
console.error('错误:', err);
} else {
console.log('数据:', data);
}错误边界
javascript
async function safeAsync(fn, fallback) {
try {
return await fn();
} catch (err) {
console.error('错误:', err);
return fallback;
}
}
// 使用
const data = await safeAsync(
() => fs.readFile('file.txt', 'utf8'),
'默认内容'
);总结
Node.js 异步编程要点:
- 回调模式:基础的异步处理方式
- Promise:解决回调地狱,提供链式调用
- async/await:让异步代码更易读
- 常见模式:重试、超时、批量处理、队列、缓存
- 事件驱动:使用 EventEmitter 处理事件
- 流式处理:高效处理大文件
- 错误处理:统一的错误处理策略