掌握Generator函数的精髓,实现优雅的异步控制流和数据流处理
引言:重新认识JavaScript生成器
生成器(Generator)是ES6引入的强大特性,它允许函数在执行过程中暂停和恢复。本文将深入探讨如何利用生成器构建一个可暂停、可控制的异步数据流处理器,解决复杂异步场景下的控制难题。
一、生成器基础:超越简单的迭代器
生成器函数使用function*声明,通过yield关键字暂停执行:
function* dataProducer() {
console.log('开始生产数据');
yield '第一批数据';
console.log('继续生产');
yield '第二批数据';
return '生产完成';
}
const producer = dataProducer();
console.log(producer.next()); // { value: '第一批数据', done: false }
console.log(producer.next()); // { value: '第二批数据', done: false }
console.log(producer.next()); // { value: '生产完成', done: true }
生成器的真正威力在于双向通信:
function* twoWayCommunicator() {
const name = yield '请输入您的名字:';
const age = yield `你好 ${name},请输入年龄:`;
return `${name},${age}岁,欢迎使用系统!`;
}
const communicator = twoWayCommunicator();
console.log(communicator.next().value); // "请输入您的名字:"
console.log(communicator.next('张三').value); // "你好 张三,请输入年龄:"
console.log(communicator.next(25).value); // "张三,25岁,欢迎使用系统!"
二、设计异步数据流处理器架构
我们将构建一个支持以下特性的数据流处理器:
- 可暂停和恢复的数据处理
- 错误处理与恢复机制
- 数据转换管道
- 并发控制
- 进度追踪
三、核心实现:可暂停数据流处理器
首先创建基础的数据流处理器类:
class PausableDataStream {
constructor(dataSource) {
this.dataSource = dataSource;
this.processor = null;
this.isPaused = false;
this.queue = [];
this.resumeCallbacks = [];
}
// 创建处理生成器
*processGenerator() {
let index = 0;
while (true) {
if (this.isPaused) {
// 暂停时等待恢复信号
yield new Promise(resolve => {
this.resumeCallbacks.push(resolve);
});
}
const data = this.dataSource[index];
if (!data) break;
try {
// 处理数据并返回结果
const result = yield this.processData(data, index);
console.log(`处理第${index + 1}条数据:`, result);
index++;
} catch (error) {
console.error(`处理数据时出错:`, error);
// 错误恢复:跳过当前数据
index++;
}
}
return '所有数据处理完成';
}
processData(data, index) {
// 模拟数据处理
return new Promise(resolve => {
setTimeout(() => {
resolve({
id: index,
processed: data.toUpperCase(),
timestamp: Date.now()
});
}, Math.random() * 1000);
});
}
// 开始处理
start() {
this.processor = this.processGenerator();
this.nextStep();
}
// 执行下一步
async nextStep(value) {
if (!this.processor) return;
const { value: nextValue, done } = this.processor.next(value);
if (done) {
console.log('处理完成:', nextValue);
return;
}
if (nextValue instanceof Promise) {
const result = await nextValue;
this.nextStep(result);
} else {
this.nextStep(nextValue);
}
}
// 暂停处理
pause() {
this.isPaused = true;
console.log('处理已暂停');
}
// 恢复处理
resume() {
this.isPaused = false;
console.log('处理已恢复');
// 触发所有等待的恢复回调
this.resumeCallbacks.forEach(callback => callback());
this.resumeCallbacks = [];
}
// 添加数据到队列
enqueue(data) {
this.queue.push(data);
if (this.isPaused) {
this.resume();
}
}
}
四、实战案例:实时日志分析系统
构建一个实时处理服务器日志的系统:
class LogAnalyzer {
constructor() {
this.logStream = null;
this.analytics = {
errors: 0,
warnings: 0,
info: 0,
patterns: new Map()
};
}
// 日志处理生成器
*createLogProcessor() {
let batch = [];
const BATCH_SIZE = 10;
while (true) {
const logEntry = yield;
if (!logEntry) break;
batch.push(logEntry);
if (batch.length >= BATCH_SIZE) {
yield this.processBatch(batch);
batch = [];
}
// 实时分析
this.analyzeLog(logEntry);
}
// 处理剩余日志
if (batch.length > 0) {
yield this.processBatch(batch);
}
return this.analytics;
}
async processBatch(batch) {
console.log(`处理批次,大小: ${batch.length}`);
// 模拟异步处理
return new Promise(resolve => {
setTimeout(() => {
const results = batch.map(log => ({
...log,
analyzed: true,
processedAt: new Date().toISOString()
}));
resolve(results);
}, 500);
});
}
analyzeLog(log) {
// 分类统计
if (log.level === 'ERROR') this.analytics.errors++;
if (log.level === 'WARN') this.analytics.warnings++;
if (log.level === 'INFO') this.analytics.info++;
// 模式检测
if (log.message.includes('timeout')) {
const count = this.analytics.patterns.get('timeout') || 0;
this.analytics.patterns.set('timeout', count + 1);
}
}
// 启动日志分析
async startAnalysis(logSource) {
const processor = this.createLogProcessor();
let result = processor.next(); // 启动生成器
for (const log of logSource) {
// 发送日志到生成器
result = processor.next(log);
// 如果生成器返回Promise(批次处理)
if (result.value instanceof Promise) {
const batchResult = await result.value;
console.log('批次处理结果:', batchResult.length);
}
// 每处理50条日志显示一次统计
if ((this.analytics.errors + this.analytics.warnings + this.analytics.info) % 50 === 0) {
this.displayAnalytics();
}
}
// 完成处理
const finalResult = processor.next().value;
console.log('最终分析结果:', finalResult);
}
displayAnalytics() {
console.log('=== 实时统计 ===');
console.log(`错误: ${this.analytics.errors}`);
console.log(`警告: ${this.analytics.warnings}`);
console.log(`信息: ${this.analytics.info}`);
console.log('检测到的模式:', Object.fromEntries(this.analytics.patterns));
}
}
// 使用示例
const mockLogs = Array.from({ length: 100 }, (_, i) => ({
id: i,
timestamp: new Date(Date.now() - Math.random() * 3600000).toISOString(),
level: ['ERROR', 'WARN', 'INFO'][Math.floor(Math.random() * 3)],
message: `日志条目 ${i} - ${Math.random() > 0.7 ? '请求超时发生' : '正常操作'}`
}));
const analyzer = new LogAnalyzer();
analyzer.startAnalysis(mockLogs);
五、高级特性:生成器组合与管道
创建可组合的数据处理管道:
function* filter(predicate, source) {
for (const item of source) {
if (predicate(item)) {
yield item;
}
}
}
function* map(transform, source) {
for (const item of source) {
yield transform(item);
}
}
function* take(count, source) {
let taken = 0;
for (const item of source) {
if (taken >= count) return;
yield item;
taken++;
}
}
// 管道组合函数
function pipeline(...generators) {
return function*(source) {
let current = source;
for (const gen of generators) {
current = gen(current);
}
yield* current;
};
}
// 使用示例
const data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// 创建处理管道:过滤偶数 → 乘以2 → 取前3个
const processPipeline = pipeline(
source => filter(x => x % 2 === 0, source),
source => map(x => x * 2, source),
source => take(3, source)
);
console.log('管道处理结果:');
for (const result of processPipeline(data)) {
console.log(result); // 输出: 4, 8, 12
}
六、错误处理与恢复策略
实现健壮的错误处理机制:
function* resilientProcessor(source) {
const retryQueue = [];
const MAX_RETRIES = 3;
for (const item of source) {
let retries = 0;
let success = false;
while (!success && retries
setTimeout(resolve, 1000 * retries)
);
}
}
}
}
// 处理重试队列
if (retryQueue.length > 0) {
console.log('开始处理重试队列,大小:', retryQueue.length);
yield* resilientProcessor(retryQueue);
}
}
async function processWithRetry(item) {
// 模拟可能失败的操作
if (Math.random() < 0.3) {
throw new Error('随机失败');
}
return `已处理: ${item}`;
}
七、性能优化技巧
- 懒加载数据:只在需要时生成/获取数据
- 内存管理:及时释放已处理的数据引用
- 并发控制:限制同时处理的任务数量
- 缓存策略:缓存频繁访问的中间结果
function* concurrentProcessor(tasks, concurrency = 3) {
const executing = new Set();
let index = 0;
while (index 0) {
// 启动新任务直到达到并发限制
while (executing.size < concurrency && index {
executing.delete(promise);
return { success: true, result };
}).catch(error => {
executing.delete(promise);
return { success: false, error };
});
executing.add(promise);
index++;
}
// 等待任意任务完成
if (executing.size > 0) {
const completed = yield Promise.race(executing);
if (completed.success) {
console.log('任务完成:', completed.result);
} else {
console.error('任务失败:', completed.error);
}
}
}
}
八、实际应用场景
- 大文件分块处理:逐块读取和处理大型文件
- 实时数据流:WebSocket消息的实时处理
- 分页数据加载:无限滚动列表的数据管理
- 任务调度系统:可控的任务执行流程
- 游戏状态管理:游戏逻辑的暂停和恢复
结语
JavaScript生成器提供了一种全新的异步编程范式,通过可暂停和恢复的执行流,我们能够构建更加可控、可维护的数据处理系统。本文展示的数据流处理器模式,结合了生成器的核心特性,为处理复杂异步场景提供了优雅的解决方案。
掌握生成器的关键要点:理解yield的双向通信、合理管理生成器状态、设计可组合的处理管道。这些技能将帮助你在现代JavaScript开发中处理更复杂的异步数据流场景。
// 页面交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 代码块交互增强
const codeBlocks = document.querySelectorAll(‘pre code’);
codeBlocks.forEach((block, index) => {
// 添加行号
const lines = block.textContent.split(‘n’);
const numbered = lines.map((line, i) =>
`
${i + 1}
${escapeHtml(line)}
`
).join(”);
block.innerHTML = numbered;
// 点击复制功能
block.addEventListener(‘click’, async function() {
const text = lines.join(‘n’);
try {
await navigator.clipboard.writeText(text);
const original = block.innerHTML;
block.innerHTML = ‘✓ 代码已复制到剪贴板‘;
setTimeout(() => block.innerHTML = original, 1500);
} catch (err) {
console.log(‘复制失败:’, err);
}
});
block.title = “点击复制代码”;
block.style.cursor = “pointer”;
});
function escapeHtml(text) {
const div = document.createElement(‘div’);
div.textContent = text;
return div.innerHTML;
}
// 语法高亮
const keywords = [‘function*’, ‘yield’, ‘async’, ‘await’, ‘class’, ‘const’, ‘let’, ‘new’, ‘return’, ‘try’, ‘catch’];
codeBlocks.forEach(block => {
let html = block.innerHTML;
keywords.forEach(keyword => {
const regex = new RegExp(`\b${keyword}\b`, ‘g’);
html = html.replace(regex, `${keyword}`);
});
// 字符串高亮
html = html.replace(/(‘.*?’|”.*?”)/g, ‘$1‘);
// 注释高亮
html = html.replace(///.*$/gm, ‘$&‘);
block.innerHTML = html;
});
});

