深入探索Generator函数的强大能力,实现优雅的异步数据流控制
一、生成器的核心概念与优势
JavaScript生成器(Generator)是ES6引入的一种特殊函数,它能够在执行过程中暂停和恢复,这种特性使得生成器成为处理异步操作、数据流和状态机的理想工具。与传统异步处理方式相比,生成器提供了更直观、更可控的编程模型。
生成器的核心特性包括:
- 可暂停执行:使用yield关键字暂停函数执行
- 状态保持:暂停时保持局部变量状态
- 双向通信:通过yield返回值,通过next()传递值
- 迭代器协议:天然支持迭代器模式
二、生成器基础语法与模式
2.1 基本生成器函数
function* basicGenerator() {
console.log('开始执行');
const result1 = yield '第一步';
console.log('收到值:', result1);
const result2 = yield '第二步';
console.log('收到值:', result2);
return '完成';
}
// 使用示例
const gen = basicGenerator();
console.log(gen.next()); // { value: '第一步', done: false }
console.log(gen.next('数据A')); // { value: '第二步', done: false }
console.log(gen.next('数据B')); // { value: '完成', done: true }
2.2 生成器作为可迭代对象
function* numberSequence(start, end) {
for (let i = start; i <= end; i++) {
yield i;
}
}
// 使用for...of循环
for (const num of numberSequence(1, 5)) {
console.log(num); // 1, 2, 3, 4, 5
}
// 展开为数组
const numbers = [...numberSequence(10, 15)];
console.log(numbers); // [10, 11, 12, 13, 14, 15]
三、实战案例:构建异步数据流管道
我们将构建一个完整的异步数据处理系统,支持数据转换、过滤、批处理和错误处理。
3.1 基础数据流处理器
function* createDataPipeline(transformers) {
let data = yield; // 等待初始数据
for (const transformer of transformers) {
try {
// 应用转换器
data = yield transformer(data);
} catch (error) {
console.error('转换失败:', error);
yield { error, data }; // 传递错误信息
}
}
return { success: true, finalData: data };
}
// 定义转换函数
const transformers = [
data => ({ ...data, processed: true }),
data => ({ ...data, timestamp: Date.now() }),
data => {
if (data.value > 100) throw new Error('值过大');
return { ...data, normalized: data.value / 100 };
}
];
// 使用管道
const pipeline = createDataPipeline(transformers);
pipeline.next(); // 启动生成器
const result1 = pipeline.next({ value: 50, id: 1 });
console.log(result1.value); // 第一次转换结果
const result2 = pipeline.next(result1.value);
console.log(result2.value); // 第二次转换结果
const finalResult = pipeline.next(result2.value);
console.log(finalResult.value); // 最终结果或错误
3.2 高级异步数据流处理器
async function* asyncDataStream(source, options = {}) {
const {
batchSize = 10,
concurrency = 3,
retryAttempts = 2
} = options;
let buffer = [];
let activePromises = 0;
const pendingItems = [];
// 数据源可以是数组、异步迭代器或函数
const dataSource = typeof source === 'function'
? source
: async function*() { for (const item of source) yield item; };
for await (const item of dataSource()) {
buffer.push(item);
// 达到批处理大小时处理一批数据
if (buffer.length >= batchSize) {
const batch = buffer;
buffer = [];
pendingItems.push(processBatch(batch));
}
// 控制并发数
if (pendingItems.length >= concurrency) {
yield await pendingItems.shift();
}
}
// 处理剩余数据
if (buffer.length > 0) {
pendingItems.push(processBatch(buffer));
}
// 等待所有处理完成
while (pendingItems.length > 0) {
yield await pendingItems.shift();
}
async function processBatch(batch) {
let attempts = 0;
while (attempts ({
...item,
processed: true,
processedAt: new Date().toISOString()
}))
);
} catch (error) {
attempts++;
if (attempts > retryAttempts) {
console.error(`处理失败,已重试${retryAttempts}次:`, error);
return batch.map(item => ({
...item,
error: error.message,
failed: true
}));
}
await new Promise(resolve =>
setTimeout(resolve, 1000 * attempts)
);
}
}
}
}
3.3 使用数据流处理器
// 模拟数据源
async function* mockDataSource(count) {
for (let i = 1; i setTimeout(resolve, 100));
yield { id: i, value: Math.random() * 200, type: `item_${i}` };
}
}
// 创建数据流处理器
const dataStream = asyncDataStream(() => mockDataSource(50), {
batchSize: 5,
concurrency: 2,
retryAttempts: 1
});
// 处理数据流
(async () => {
const results = [];
let processedCount = 0;
let errorCount = 0;
for await (const batch of dataStream) {
console.log(`处理批次: ${batch.length} 个项目`);
batch.forEach(item => {
if (item.error) {
errorCount++;
console.warn(`处理失败: ${item.id} - ${item.error}`);
} else {
processedCount++;
results.push(item);
}
});
// 实时统计
console.log(`已处理: ${processedCount}, 失败: ${errorCount}`);
}
console.log('处理完成:', {
总数: processedCount + errorCount,
成功: processedCount,
失败: errorCount
});
})();
四、协程模式:实现复杂的异步控制流
4.1 任务调度器
class TaskScheduler {
constructor(maxConcurrent = 3) {
this.maxConcurrent = maxConcurrent;
this.running = 0;
this.queue = [];
this.results = new Map();
}
addTask(taskFn, id) {
return new Promise((resolve, reject) => {
this.queue.push({ taskFn, id, resolve, reject });
this.run();
});
}
async run() {
while (this.running 0) {
const { taskFn, id, resolve, reject } = this.queue.shift();
this.running++;
try {
const result = await taskFn();
this.results.set(id, result);
resolve(result);
} catch (error) {
reject(error);
} finally {
this.running--;
this.run();
}
}
}
*getResults() {
for (const [id, result] of this.results) {
yield { id, result };
}
}
}
// 使用任务调度器
const scheduler = new TaskScheduler(2);
// 添加任务
const tasks = [
() => fetchData('user'),
() => fetchData('products'),
() => processImage('image1.jpg'),
() => sendAnalytics('pageview'),
() => generateReport('2024')
];
tasks.forEach((task, index) => {
scheduler.addTask(task, `task_${index}`)
.then(result => console.log(`任务${index}完成:`, result))
.catch(error => console.error(`任务${index}失败:`, error));
});
// 获取结果
for (const result of scheduler.getResults()) {
console.log('结果:', result);
}
4.2 状态机实现
function* stateMachine(initialState) {
let state = initialState;
while (true) {
const action = yield state;
switch (state.status) {
case 'idle':
if (action.type === 'START') {
state = { status: 'processing', data: action.data };
}
break;
case 'processing':
if (action.type === 'PAUSE') {
state = { status: 'paused', data: state.data };
} else if (action.type === 'COMPLETE') {
state = { status: 'completed', result: action.result };
} else if (action.type === 'ERROR') {
state = { status: 'error', error: action.error };
}
break;
case 'paused':
if (action.type === 'RESUME') {
state = { status: 'processing', data: state.data };
} else if (action.type === 'CANCEL') {
state = { status: 'cancelled' };
}
break;
}
}
}
// 使用状态机
const machine = stateMachine({ status: 'idle' });
machine.next(); // 启动
console.log(machine.next({ type: 'START', data: '任务数据' }).value);
// { status: 'processing', data: '任务数据' }
console.log(machine.next({ type: 'PAUSE' }).value);
// { status: 'paused', data: '任务数据' }
console.log(machine.next({ type: 'RESUME' }).value);
// { status: 'processing', data: '任务数据' }
console.log(machine.next({ type: 'COMPLETE', result: '成功' }).value);
// { status: 'completed', result: '成功' }
五、生成器与异步迭代器的结合
5.1 实现异步生成器
async function* paginatedAPI(endpoint, options = {}) {
const { pageSize = 20, maxPages = 10 } = options;
let page = 1;
let hasMore = true;
while (hasMore && page 0) {
for (const item of data.items) {
yield item;
}
hasMore = data.hasMore || data.items.length === pageSize;
page++;
} else {
hasMore = false;
}
// 添加延迟避免速率限制
if (hasMore) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
} catch (error) {
console.error(`第${page}页获取失败:`, error);
// 重试逻辑
const shouldRetry = await new Promise(resolve => {
setTimeout(() => resolve(confirm('请求失败,是否重试?')), 0);
});
if (!shouldRetry) {
break;
}
}
}
}
// 使用异步生成器
(async () => {
const userStream = paginatedAPI('https://api.example.com/users', {
pageSize: 10,
maxPages: 5
});
for await (const user of userStream) {
console.log('用户:', user);
// 处理每个用户
}
})();
5.2 数据转换管道
function composeAsyncGenerators(...generators) {
return async function* composedGenerator(input) {
let currentValue = input;
for (const generator of generators) {
const gen = generator(currentValue);
if (gen[Symbol.asyncIterator]) {
for await (const value of gen) {
currentValue = value;
yield value;
}
} else {
for (const value of gen) {
currentValue = value;
yield value;
}
}
}
return currentValue;
};
}
// 定义处理阶段
const stages = [
async function* filterInvalid(data) {
for await (const item of data) {
if (item && item.id && item.value != null) {
yield item;
}
}
},
async function* transformData(data) {
for await (const item of data) {
yield {
...item,
processed: true,
score: item.value * 10,
category: item.value > 0.5 ? 'high' : 'low'
};
}
},
async function* batchData(data) {
let batch = [];
for await (const item of data) {
batch.push(item);
if (batch.length >= 5) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
}
];
// 组合管道
const dataPipeline = composeAsyncGenerators(...stages);
// 使用管道
(async () => {
// 模拟数据源
async function* mockData() {
for (let i = 0; i < 23; i++) {
yield { id: i, value: Math.random() };
}
}
for await (const batch of dataPipeline(mockData())) {
console.log('处理后的批次:', batch);
// 进一步处理或存储
}
})();
六、性能优化与最佳实践
6.1 内存优化技巧
function* memoryEfficientProcessor(dataSource, processor) {
// 使用WeakMap存储中间结果,避免内存泄漏
const cache = new WeakMap();
for (const item of dataSource) {
if (cache.has(item)) {
yield cache.get(item);
} else {
const result = processor(item);
cache.set(item, result);
yield result;
// 定期清理缓存
if (cache.size > 1000) {
// 实现LRU缓存清理策略
const keys = Array.from(cache.keys());
for (let i = 0; i {
const maxKeep = 100;
if (loaded.size > maxKeep) {
const keys = Array.from(loaded.keys());
for (let i = 0; i < loaded.size - maxKeep; i++) {
loaded.delete(keys[i]);
}
}
};
return cleanup;
}
6.2 错误处理策略
function* resilientGenerator(generatorFn, options = {}) {
const {
maxRetries = 3,
retryDelay = 1000,
fallbackValue = null
} = options;
const gen = generatorFn();
while (true) {
let attempt = 0;
let result;
while (attempt maxRetries) {
console.error(`生成器失败,已重试${maxRetries}次:`, error);
if (fallbackValue !== undefined) {
yield fallbackValue;
}
// 尝试恢复或结束
try {
gen.return();
} catch {}
return;
}
console.warn(`重试 ${attempt}/${maxRetries}:`, error);
// 指数退避延迟
await new Promise(resolve =>
setTimeout(resolve, retryDelay * Math.pow(2, attempt - 1))
);
}
}
if (result.done) {
return result.value;
}
yield result.value;
}
}
七、实际应用场景
7.1 实时数据监控系统
function* createDataMonitor(sensors, alertThresholds) {
const readings = new Map();
const alerts = new Set();
while (true) {
const timestamp = Date.now();
const sensorReadings = [];
// 收集所有传感器数据
for (const sensor of sensors) {
const reading = yield sensor.read();
readings.set(sensor.id, { ...reading, timestamp });
sensorReadings.push(reading);
// 检查警报条件
if (reading.value > alertThresholds[sensor.type]) {
const alert = {
sensor: sensor.id,
value: reading.value,
threshold: alertThresholds[sensor.type],
timestamp
};
if (!alerts.has(JSON.stringify(alert))) {
alerts.add(JSON.stringify(alert));
yield { type: 'ALERT', data: alert };
}
}
}
// 生成聚合报告
const report = {
timestamp,
average: sensorReadings.reduce((a, b) => a + b.value, 0) / sensorReadings.length,
max: Math.max(...sensorReadings.map(r => r.value)),
min: Math.min(...sensorReadings.map(r => r.value)),
activeAlerts: alerts.size
};
yield { type: 'REPORT', data: report };
// 等待下一个采样周期
yield new Promise(resolve => setTimeout(resolve, 5000));
}
}
7.2 游戏状态管理
function* gameStateManager(initialState) {
let state = { ...initialState, history: [] };
while (true) {
const action = yield state;
// 保存历史状态(限制长度)
state.history.push({ ...state });
if (state.history.length > 100) {
state.history.shift();
}
// 处理游戏动作
switch (action.type) {
case 'MOVE':
state = {
...state,
position: action.position,
moves: state.moves + 1
};
break;
case 'COLLECT':
state = {
...state,
inventory: [...state.inventory, action.item],
score: state.score + action.points
};
break;
case 'SAVE':
yield { type: 'SAVE_COMPLETE', data: state };
break;
case 'LOAD':
if (action.savePoint 0) {
state = state.history.pop();
}
break;
}
// 检查游戏结束条件
if (state.score >= 1000) {
yield { type: 'GAME_OVER', data: { ...state, win: true } };
return;
}
}
}
八、总结与扩展
JavaScript生成器提供了强大的控制流管理能力,特别适合以下场景:
- 异步数据流处理:处理大量数据或实时数据流
- 状态机实现:管理复杂的状态转换逻辑
- 协程编程:实现协作式多任务处理
- 懒加载:按需生成和处理数据
- 迭代器模式:创建自定义迭代逻辑
关键优势:
- 更清晰的异步代码结构
- 更好的内存控制
- 灵活的状态管理
- 可组合的数据处理管道
进一步学习方向:
- 探索async/await与生成器的结合使用
- 研究RxJS等响应式编程库的内部实现
- 学习TypeScript中的高级类型与生成器
- 了解Web Workers中的生成器应用
- 探索生成器在Node.js流处理中的应用
生成器是JavaScript中一个强大但常被低估的特性。通过合理使用生成器,可以构建出更优雅、更高效、更易维护的异步代码结构。
// 页面交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 为代码块添加交互功能
const codeBlocks = document.querySelectorAll(‘pre code’);
codeBlocks.forEach((block, index) => {
const container = document.createElement(‘div’);
container.style.cssText = ‘margin: 10px 0;’;
const buttonContainer = document.createElement(‘div’);
buttonContainer.style.cssText = ‘display: flex; gap: 10px; margin-bottom: 5px;’;
// 复制按钮
const copyBtn = document.createElement(‘button’);
copyBtn.textContent = ‘复制代码’;
copyBtn.style.cssText = `
background: #4CAF50;
color: white;
border: none;
padding: 5px 15px;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
`;
// 运行按钮(针对可运行示例)
if (block.textContent.includes(‘console.log’) ||
block.textContent.includes(‘yield’) ||
block.textContent.includes(‘async’)) {
const runBtn = document.createElement(‘button’);
runBtn.textContent = ‘运行示例’;
runBtn.style.cssText = `
background: #2196F3;
color: white;
border: none;
padding: 5px 15px;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
`;
const outputDiv = document.createElement(‘div’);
outputDiv.className = ‘code-output’;
outputDiv.style.cssText = `
margin-top: 10px;
padding: 10px;
background: #f5f5f5;
border-radius: 4px;
font-family: monospace;
max-height: 200px;
overflow-y: auto;
display: none;
`;
runBtn.addEventListener(‘click’, async () => {
try {
outputDiv.style.display = ‘block’;
outputDiv.innerHTML = ‘执行中…‘;
// 捕获console.log输出
const originalLog = console.log;
const logs = [];
console.log = (…args) => {
logs.push(args.map(arg =>
typeof arg === ‘object’ ? JSON.stringify(arg, null, 2) : String(arg)
).join(‘ ‘));
};
// 执行代码
const code = block.textContent;
const wrappedCode = `
(async () => {
try {
${code}
} catch(err) {
console.error(‘执行错误:’, err);
}
})()
`;
await eval(wrappedCode);
// 恢复console.log
console.log = originalLog;
// 显示输出
if (logs.length > 0) {
outputDiv.innerHTML = logs.map(log =>
`
`
).join(”);
} else {
outputDiv.innerHTML = ‘
‘;
}
} catch (error) {
outputDiv.innerHTML = `
`;
}
});
buttonContainer.appendChild(runBtn);
block.parentNode.insertBefore(container, block);
container.appendChild(buttonContainer);
container.appendChild(block);
container.appendChild(outputDiv);
} else {
block.parentNode.insertBefore(container, block);
container.appendChild(buttonContainer);
container.appendChild(block);
}
// 复制功能
copyBtn.addEventListener(‘click’, async () => {
try {
await navigator.clipboard.writeText(block.textContent);
copyBtn.textContent = ‘已复制!’;
copyBtn.style.background = ‘#45a049’;
setTimeout(() => {
copyBtn.textContent = ‘复制代码’;
copyBtn.style.background = ‘#4CAF50’;
}, 2000);
} catch (err) {
console.error(‘复制失败:’, err);
copyBtn.textContent = ‘复制失败’;
copyBtn.style.background = ‘#f44336’;
}
});
buttonContainer.appendChild(copyBtn);
});
// 添加章节导航
const sections = document.querySelectorAll(‘section’);
const nav = document.createElement(‘nav’);
nav.style.cssText = `
position: fixed;
right: 20px;
top: 100px;
background: white;
padding: 15px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
max-height: 70vh;
overflow-y: auto;
display: none;
`;
const navTitle = document.createElement(‘h3’);
navTitle.textContent = ‘章节导航’;
navTitle.style.cssText = ‘margin: 0 0 10px 0; font-size: 14px;’;
nav.appendChild(navTitle);
sections.forEach((section, index) => {
const h2 = section.querySelector(‘h2’);
if (h2) {
const link = document.createElement(‘a’);
link.textContent = h2.textContent;
link.href = `#section-${index}`;
link.style.cssText = `
display: block;
padding: 5px 0;
color: #2196F3;
text-decoration: none;
font-size: 13px;
border-bottom: 1px solid #eee;
`;
section.id = `section-${index}`;
nav.appendChild(link);
}
});
document.body.appendChild(nav);
// 显示/隐藏导航按钮
const navToggle = document.createElement(‘button’);
navToggle.textContent = ‘📖 导航’;
navToggle.style.cssText = `
position: fixed;
right: 20px;
top: 70px;
background: #2196F3;
color: white;
border: none;
padding: 8px 15px;
border-radius: 20px;
cursor: pointer;
z-index: 1000;
font-size: 14px;
`;
navToggle.addEventListener(‘click’, () => {
nav.style.display = nav.style.display === ‘block’ ? ‘none’ : ‘block’;
});
document.body.appendChild(navToggle);
// 点击导航链接后隐藏导航
nav.querySelectorAll(‘a’).forEach(link => {
link.addEventListener(‘click’, () => {
nav.style.display = ‘none’;
});
});
});

