发布日期:2024年1月 | 作者:JavaScript异步编程专家
一、生成器基础概念
生成器(Generator)是ES6引入的特殊函数,它可以在执行过程中暂停和恢复,为JavaScript带来了协程-like的编程能力。生成器函数使用function*
语法声明,通过yield
关键字控制执行流程。
基本语法与特性:
// 定义生成器函数
function* basicGenerator() {
console.log('开始执行');
yield '第一个值';
console.log('恢复执行');
yield '第二个值';
console.log('结束执行');
return '最终结果';
}
// 使用生成器
const generator = basicGenerator();
console.log(generator.next()); // { value: '第一个值', done: false }
console.log(generator.next()); // { value: '第二个值', done: false }
console.log(generator.next()); // { value: '最终结果', done: true }
生成器核心方法:
- next():恢复生成器执行,返回{value, done}对象
- return():终止生成器执行
- throw():向生成器抛出错误
二、生成器高级特性
1. 双向通信
生成器支持通过next()
方法进行双向数据传递:
function* twoWayCommunication() {
const name = yield '请输入您的名字:';
const age = yield `你好 ${name},请输入您的年龄:`;
return `${name},您 ${age} 岁了`;
}
const communicator = twoWayCommunication();
console.log(communicator.next().value); // "请输入您的名字:"
console.log(communicator.next('张三').value); // "你好 张三,请输入您的年龄:"
console.log(communicator.next(25).value); // "张三,您 25 岁了"
2. 生成器委托
使用yield*
实现生成器委托,组合多个生成器:
function* numberGenerator() {
yield 1;
yield 2;
yield 3;
}
function* colorGenerator() {
yield 'red';
yield 'green';
yield 'blue';
}
function* combinedGenerator() {
yield* numberGenerator();
yield* colorGenerator();
yield '混合完成';
}
const combined = combinedGenerator();
for (let value of combined) {
console.log(value); // 1, 2, 3, 'red', 'green', 'blue', '混合完成'
}
3. 无限序列生成
function* fibonacci() {
let [prev, curr] = [0, 1];
while (true) {
yield curr;
[prev, curr] = [curr, prev + curr];
}
}
const fib = fibonacci();
console.log(fib.next().value); // 1
console.log(fib.next().value); // 1
console.log(fib.next().value); // 2
console.log(fib.next().value); // 3
console.log(fib.next().value); // 5
三、异步迭代器详解
ES2018引入的异步迭代器允许我们处理异步数据流,使用for await...of
循环遍历异步数据源。
1. 异步生成器
async function* asyncNumberGenerator() {
for (let i = 1; i setTimeout(resolve, 1000));
yield i;
}
}
// 使用 for await...of
(async function() {
for await (const num of asyncNumberGenerator()) {
console.log(num); // 每秒输出: 1, 2, 3
}
})();
2. 自定义异步可迭代对象
class AsyncDataStream {
constructor(data, delay = 1000) {
this.data = data;
this.delay = delay;
}
async *[Symbol.asyncIterator]() {
for (const item of this.data) {
await new Promise(resolve => setTimeout(resolve, this.delay));
yield item;
}
}
}
// 使用示例
const stream = new AsyncDataStream(['A', 'B', 'C'], 500);
(async () => {
for await (const chunk of stream) {
console.log(`接收到数据: ${chunk}`);
}
})();
3. 错误处理机制
async function* asyncGeneratorWithError() {
try {
yield '第一步';
await new Promise(resolve => setTimeout(resolve, 1000));
yield '第二步';
throw new Error('模拟错误');
} catch (error) {
yield `捕获错误: ${error.message}`;
}
}
(async () => {
const generator = asyncGeneratorWithError();
try {
for await (const value of generator) {
console.log(value);
}
} catch (error) {
console.error('外部捕获:', error);
}
})();
四、实战案例:构建流式数据处理管道
下面我们构建一个完整的流式数据处理系统,模拟真实世界的数据处理场景。
1. 数据源生成器
// 模拟数据源 - 传感器数据流
async function* sensorDataStream(sensorId, count = 10) {
for (let i = 0; i setTimeout(resolve, 200));
const data = {
sensorId,
timestamp: new Date().toISOString(),
value: Math.random() * 100,
sequence: i
};
yield data;
}
}
2. 数据处理中间件
// 数据过滤器
async function* filterData(source, predicate) {
for await (const data of source) {
if (predicate(data)) {
yield data;
}
}
}
// 数据转换器
async function* transformData(source, transformer) {
for await (const data of source) {
yield transformer(data);
}
}
// 数据聚合器
async function* aggregateData(source, windowSize = 3) {
let buffer = [];
for await (const data of source) {
buffer.push(data);
if (buffer.length >= windowSize) {
const average = buffer.reduce((sum, item) => sum + item.value, 0) / buffer.length;
yield {
type: 'aggregate',
timestamp: new Date().toISOString(),
averageValue: average,
dataPoints: buffer.length
};
buffer = [];
}
}
// 处理剩余数据
if (buffer.length > 0) {
const average = buffer.reduce((sum, item) => sum + item.value, 0) / buffer.length;
yield {
type: 'aggregate',
timestamp: new Date().toISOString(),
averageValue: average,
dataPoints: buffer.length
};
}
}
3. 完整数据处理管道
class DataProcessingPipeline {
constructor() {
this.stages = [];
}
addStage(stage) {
this.stages.push(stage);
return this; // 支持链式调用
}
async *process(source) {
let currentSource = source;
for (const stage of this.stages) {
currentSource = stage(currentSource);
}
for await (const result of currentSource) {
yield result;
}
}
}
// 构建完整管道
const pipeline = new DataProcessingPipeline()
.addStage(source => filterData(source, data => data.value > 30))
.addStage(source => transformData(source, data => ({
...data,
normalizedValue: (data.value / 100).toFixed(2),
status: data.value > 70 ? 'HIGH' : data.value > 40 ? 'MEDIUM' : 'LOW'
})))
.addStage(source => aggregateData(source, 2));
// 运行数据处理
(async () => {
console.log('开始处理传感器数据...');
const sensorSource = sensorDataStream('sensor-001', 8);
for await (const result of pipeline.process(sensorSource)) {
console.log('处理结果:', result);
}
console.log('数据处理完成');
})();
五、真实场景应用
1. 分页数据获取
async function* paginatedDataFetcher(url, pageSize = 10) {
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await fetch(`${url}?page=${page}&size=${pageSize}`);
const data = await response.json();
if (data.items && data.items.length > 0) {
for (const item of data.items) {
yield item;
}
hasMore = data.hasMore;
page++;
} else {
hasMore = false;
}
}
}
// 使用示例
(async () => {
const userFetcher = paginatedDataFetcher('/api/users');
for await (const user of userFetcher) {
console.log('用户:', user.name);
// 可以随时break停止获取
if (user.id === 100) break;
}
})();
2. WebSocket消息流处理
class WebSocketStream {
constructor(url) {
this.url = url;
this.socket = null;
this.resolvers = [];
}
async *[Symbol.asyncIterator]() {
this.socket = new WebSocket(this.url);
this.socket.onmessage = (event) => {
if (this.resolvers.length > 0) {
const resolve = this.resolvers.shift();
resolve({ value: JSON.parse(event.data), done: false });
}
};
this.socket.onclose = () => {
this.resolvers.forEach(resolve =>
resolve({ value: undefined, done: true }));
};
while (true) {
const promise = new Promise((resolve) => {
this.resolvers.push(resolve);
});
const result = await promise;
if (result.done) break;
yield result.value;
}
}
close() {
if (this.socket) {
this.socket.close();
}
}
}
// 使用示例
(async () => {
const wsStream = new WebSocketStream('ws://localhost:8080/stream');
try {
for await (const message of wsStream) {
console.log('收到消息:', message);
// 处理实时消息流
}
} finally {
wsStream.close();
}
})();
六、总结
生成器和异步迭代器为JavaScript异步编程带来了革命性的变化:
- 生成器提供了可控的执行流程,支持暂停和恢复
- 异步迭代器使得处理异步数据流变得简单直观
- for await…of语法让异步循环更加清晰
- 组合使用可以构建复杂的数据处理管道
这些特性在现代JavaScript应用中广泛使用,特别是在处理流式数据、实现复杂异步逻辑和构建响应式系统时表现出色。掌握生成器和异步迭代器将极大提升你处理复杂异步场景的能力。
进一步学习建议:
- 深入学习RxJS等响应式编程库
- 探索Node.js流(Stream)与异步迭代器的结合
- 了解生成器在状态机和协程中的应用
- 实践在React/Vue等框架中的异步数据获取模式