发布日期:2024年1月 | 作者:JavaScript高级技术专家
一、迭代协议与可迭代对象
JavaScript中的迭代协议是ES6引入的重要特性,它定义了标准的方式来遍历数据结构。迭代协议包含两个核心概念:可迭代协议和迭代器协议。
可迭代协议要求对象实现`@@iterator`方法(通过`Symbol.iterator`访问),该方法返回一个迭代器对象。迭代器协议则要求对象实现`next()`方法,该方法返回包含`value`和`done`属性的对象(`done`为`true`表示遍历已结束)。
// Built-in iterables: arrays, strings and Maps all implement
// Symbol.iterator, so they can be consumed with for...of or spread.
const array = [1, 2, 3];
const string = 'hello';
const map = new Map([['key1', 'value1']]);

// for...of obtains the iterator and calls next() until done is true.
for (const element of array) {
  console.log(element); // 1, 2, 3
}
二、迭代器实现原理
1. 自定义迭代器实现
/**
 * Iterable numeric range from `start` to `end` (inclusive) in increments of
 * `step`. A positive step counts up, a negative step counts down. Every
 * for...of gets a fresh iterator, so a Range can be traversed repeatedly.
 */
class Range {
  /**
   * @param {number} start - first value produced
   * @param {number} end - inclusive bound
   * @param {number} [step=1] - increment; must not be 0
   * @throws {RangeError} if step is 0 (the range would never terminate)
   */
  constructor(start, end, step = 1) {
    if (step === 0) {
      throw new RangeError('Range step must not be 0');
    }
    this.start = start;
    this.end = end;
    this.step = step;
  }

  [Symbol.iterator]() {
    let current = this.start;
    const { end, step } = this;
    // Ascending ranges run while current <= end; descending ranges run
    // while current >= end.
    const inRange = () => (step > 0 ? current <= end : current >= end);
    return {
      next() {
        if (inRange()) {
          const value = current;
          current += step;
          return { value, done: false };
        }
        return { value: undefined, done: true };
      },
      // Optional: for...of calls return() on early exit (break/return/throw)
      // so the iterator can release resources.
      return() {
        console.log('迭代提前终止');
        return { done: true };
      },
    };
  }
}

// Usage example
const range = new Range(1, 5);
for (const num of range) {
  console.log(num); // 1, 2, 3, 4, 5
}
2. 无限序列迭代器
/**
 * Infinite Fibonacci sequence (1, 1, 2, 3, 5, ...). Its iterator never
 * reports done, so consumers must stop on their own (e.g. with break).
 */
class FibonacciSequence {
  [Symbol.iterator]() {
    let a = 0;
    let b = 1;
    const next = () => {
      const value = b;
      const sum = a + b;
      a = b;
      b = sum;
      return { value, done: false };
    };
    return { next };
  }
}

// Usage: the sequence is infinite, so the loop has to break explicitly.
const fibonacci = new FibonacciSequence();
let count = 0;
for (const num of fibonacci) {
  const alreadyPrinted = count;
  count += 1;
  if (alreadyPrinted >= 10) break;
  console.log(num); // 1, 1, 2, 3, 5, 8, 13, 21, 34, 55
}
三、生成器函数详解
1. 生成器基础语法
/**
 * Yields 1, 2, 3 and then returns 4. The return value only surfaces as
 * { value: 4, done: true } from next(); for...of and spread ignore it.
 */
function* numberGenerator() {
  for (const n of [1, 2, 3]) {
    yield n;
  }
  return 4;
}

const generator = numberGenerator();
// Each next() resumes the generator up to its next yield (or its return).
for (let call = 0; call < 4; call++) {
  console.log(generator.next());
}
// { value: 1, done: false }, { value: 2, done: false },
// { value: 3, done: false }, { value: 4, done: true }
2. 生成器作为可迭代对象
/**
 * Lazily yields the first `limit` Fibonacci numbers (1, 1, 2, 3, ...).
 * With the default limit of Infinity it never finishes on its own.
 */
function* fibonacciGenerator(limit = Infinity) {
  let previous = 0;
  let latest = 1;
  for (let produced = 0; produced < limit; produced++) {
    yield latest;
    const following = previous + latest;
    previous = latest;
    latest = following;
  }
}

// Generators are iterable, so for...of works ...
for (const num of fibonacciGenerator(10)) {
  console.log(num);
}
// ... and so does the spread operator.
const firstTen = [...fibonacciGenerator(10)];
console.log(firstTen);
3. 双向通信与错误处理
/**
 * Two-way generator dialogue. Each `yield` hands a question to the caller.
 * The value passed to the following next(answer) becomes the result of
 * that yield expression.
 */
function* twoWayCommunication() {
  const name = yield '请输入您的名字:';
  const age = yield `你好 ${name},请输入您的年龄:`;
  const city = yield `${name},您${age}岁,请问您来自哪个城市?`;
  return `用户信息:${name},${age}岁,来自${city}`;
}

/**
 * Drives a question/answer generator to completion and logs the result.
 *
 * @param {() => Generator} genFunc - generator function to run
 * @param {(question: *) => (string|null)} [ask] - answers one question;
 *   defaults to the global prompt(). Returning null cancels the dialogue.
 * @returns {*} the generator's final value, or '用户取消输入' when cancelled
 */
function runGenerator(genFunc, ask = (question) => prompt(question)) {
  const generator = genFunc();
  let result = generator.next();
  while (!result.done) {
    const userInput = ask(result.value);
    if (userInput === null) {
      // Cancel: return() finishes the generator (running its finally blocks)
      // and its record must replace the pending question; otherwise the
      // question itself would be reported as the result.
      result = generator.return('用户取消输入');
      break;
    }
    result = generator.next(userInput);
  }
  console.log(result.value);
  return result.value;
}
// runGenerator(twoWayCommunication);
四、高级迭代模式
1. 迭代器组合器
/** Lazily yields mapper(item) for every item of `iterable`. */
function* map(iterable, mapper) {
  for (const item of iterable) {
    yield mapper(item);
  }
}

/** Lazily yields only the items for which predicate(item) is truthy. */
function* filter(iterable, predicate) {
  for (const item of iterable) {
    if (predicate(item)) {
      yield item;
    }
  }
}

/**
 * Lazily yields at most `count` items, then stops.
 *
 * The source is never asked for more than `count` items, so this is safe on
 * infinite or side-effecting sources. Leaving the for...of early closes the
 * source via its return() method.
 */
function* take(iterable, count) {
  if (count <= 0) return; // nothing requested: do not touch the source at all
  let taken = 0;
  for (const item of iterable) {
    yield item;
    taken += 1;
    // Stop right after the last wanted item instead of pulling one more.
    if (taken >= count) return;
  }
}

// Composing the combinators: double, keep > 5, take the first three.
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const result = take(
  filter(
    map(numbers, x => x * 2),
    x => x > 5
  ),
  3
);
console.log([...result]); // [6, 8, 10]
2. 递归生成器
/**
 * Depth-first, pre-order walk over a tree of { value, children? } nodes.
 * Yields each node's value before its children's. Falsy nodes are skipped,
 * and yield* delegates to the recursive walk of every child.
 */
function* traverseTree(node) {
  if (node) {
    yield node.value;
    const kids = node.children || [];
    for (const kid of kids) {
      yield* traverseTree(kid);
    }
  }
}

// Sample tree
const tree = {
  value: 'root',
  children: [
    {
      value: 'child1',
      children: [{ value: 'grandchild1' }, { value: 'grandchild2' }],
    },
    {
      value: 'child2',
      children: [{ value: 'grandchild3' }],
    },
  ],
};

for (const value of traverseTree(tree)) {
  console.log(value); // root, child1, grandchild1, grandchild2, child2, grandchild3
}
3. 迭代器委托
/**
 * Concatenates any number of iterables into one lazy sequence by
 * delegating to each of them in turn with yield*.
 */
function* combinedIterator(...sources) {
  for (let i = 0; i < sources.length; i++) {
    yield* sources[i];
  }
}

const combined = combinedIterator([1, 2, 3], 'hello', new Set([4, 5, 6]));
console.log([...combined]); // [1, 2, 3, 'h', 'e', 'l', 'l', 'o', 4, 5, 6]
五、异步迭代与生成器
1. 异步迭代器协议
/**
 * Async-iterable view over `dataSource` that hands out `chunkSize`-sized
 * slices, simulating one network fetch per chunk.
 *
 * Each `for await` gets its own cursor, starting at `this.offset`. The stream
 * can therefore be consumed repeatedly, or by several consumers, without one
 * traversal exhausting another.
 */
class AsyncDataStream {
  /**
   * @param {*} dataSource - presumably an array (anything with `length` and
   *   `slice`) — confirm with callers
   * @param {number} [chunkSize=10] - positive integer slice length
   * @throws {RangeError} if chunkSize is not a positive integer; a zero or
   *   negative step would never reach the end of the data
   */
  constructor(dataSource, chunkSize = 10) {
    if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
      throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
    }
    this.dataSource = dataSource;
    this.chunkSize = chunkSize;
    // Starting offset for new traversals and the default for fetchChunk().
    this.offset = 0;
  }

  [Symbol.asyncIterator]() {
    let cursor = this.offset;
    return {
      next: async () => {
        if (cursor >= this.dataSource.length) {
          return { value: undefined, done: true };
        }
        // Reserve the chunk before awaiting, so overlapping next() calls
        // each get a distinct slice.
        const start = cursor;
        cursor += this.chunkSize;
        const chunk = await this.fetchChunk(start);
        return { value: chunk, done: false };
      },
      [Symbol.asyncIterator]() {
        return this;
      },
    };
  }

  /**
   * Simulated network request: waits 100 ms, then returns the slice of
   * `chunkSize` items starting at `offset`.
   * @param {number} [offset=this.offset]
   */
  async fetchChunk(offset = this.offset) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    return this.dataSource.slice(offset, offset + this.chunkSize);
  }
}
2. 异步生成器函数
/**
 * Async generator yielding 1..count and waiting `delayMs` before each value.
 * @param {number} [count=5] - last number produced
 * @param {number} [delayMs=1000] - delay before each value, in milliseconds
 */
async function* asyncNumberGenerator(count = 5, delayMs = 1000) {
  for (let i = 1; i <= count; i++) {
    await new Promise((resolve) => setTimeout(resolve, delayMs));
    yield i;
  }
}

/** Consumes the generator with for await...of. */
async function consumeAsyncGenerator() {
  for await (const num of asyncNumberGenerator()) {
    console.log(num); // one number per second: 1, 2, 3, 4, 5
  }
}
// consumeAsyncGenerator();
3. 实时数据流处理
/**
 * Turns events from an addEventListener-style source into an async iterable.
 *
 * Events that arrive while no consumer is waiting are buffered. A waiting
 * consumer is resolved directly. The listener is removed in `finally` when
 * iteration ends. Note: a return() requested while the generator is waiting
 * for an event only takes effect after the next event arrives.
 */
async function* createEventStream(eventSource, eventType) {
  const backlog = [];
  let wakeConsumer;

  const onEvent = (event) => {
    if (!wakeConsumer) {
      backlog.push(event);
      return;
    }
    wakeConsumer(event);
    wakeConsumer = null;
  };

  eventSource.addEventListener(eventType, onEvent);
  try {
    for (;;) {
      const nextEvent = backlog.length > 0
        ? backlog.shift()
        : await new Promise((resolve) => {
          wakeConsumer = resolve;
        });
      yield nextEvent;
    }
  } finally {
    eventSource.removeEventListener(eventType, onEvent);
  }
}

// Usage: log the coordinates of every click on the page.
async function monitorClicks() {
  for await (const { clientX, clientY } of createEventStream(document, 'click')) {
    console.log('点击位置:', clientX, clientY);
  }
}
六、实战项目:数据流处理引擎
项目需求
构建一个灵活的数据流处理引擎,支持:
- 多种数据源接入(数组、API、文件流等)
- 可组合的数据转换操作
- 背压控制和流量管理
- 错误处理和重试机制
- 实时监控和性能统计
核心实现
/**
 * Lazy, chainable data-processing pipeline.
 *
 * Builder methods (map/filter/batch/throttle/onError) only record work and
 * return `this`. Nothing runs until execute() or reduce() is consumed. The
 * source and all map/filter/batch/custom stages are synchronous generators,
 * which lets custom stages use plain for...of. execute() exposes the result
 * as an async iterable, so throttling can wait on timers.
 */
class DataStreamProcessor {
  constructor() {
    // Recorded stages, applied in insertion order.
    this.operations = [];
    // Error constructor -> handler(error, stream) returning a replacement iterable.
    this.errorHandlers = new Map();
  }

  /**
   * Creates a processor that reads from `iterable`.
   * @param {Iterable} iterable - synchronous iterable source
   * @returns {DataStreamProcessor}
   */
  static from(iterable) {
    const processor = new DataStreamProcessor();
    processor.source = iterable;
    return processor;
  }

  /** Adds a stage yielding transform(item) for every item. */
  map(transform) {
    this.operations.push({ type: 'map', transform });
    return this;
  }

  /** Adds a stage keeping only items for which predicate(item) is truthy. */
  filter(predicate) {
    this.operations.push({ type: 'filter', predicate });
    return this;
  }

  /** Adds a stage grouping items into arrays of `size` (the last may be shorter). */
  batch(size) {
    this.operations.push({ type: 'batch', size });
    return this;
  }

  /**
   * Limits the output to one item per `ms` milliseconds. A synchronous
   * generator cannot wait without blocking the event loop. execute()
   * therefore paces the pipeline's final output, and when throttle() is
   * called several times the largest interval wins.
   */
  throttle(ms) {
    this.operations.push({ type: 'throttle', ms });
    return this;
  }

  /**
   * Registers a handler for errors whose constructor is exactly `errorType`.
   * The handler is called as handler(error, stream). It must return an
   * iterable whose items replace the rest of the failing stage's output.
   */
  onError(errorType, handler) {
    this.errorHandlers.set(errorType, handler);
    return this;
  }

  /**
   * Runs the pipeline.
   * @returns {AsyncGenerator} the processed items
   * @throws {Error} '数据源未设置' on first iteration if no source was set
   */
  async *execute() {
    if (!this.source) {
      throw new Error('数据源未设置');
    }
    let stream = this.createSourceStream();
    let throttleMs = 0;
    for (const operation of this.operations) {
      if (operation.type === 'throttle') {
        // Pacing needs timers, which the synchronous chain cannot await;
        // it is applied to the output below instead.
        throttleMs = Math.max(throttleMs, operation.ms);
        continue;
      }
      stream = this.applyOperation(stream, operation);
    }
    if (throttleMs > 0) {
      yield* this.applyThrottle(stream, throttleMs);
    } else {
      yield* stream;
    }
  }

  /**
   * Runs the pipeline and folds every emitted item into a single value, like
   * Array.prototype.reduce. Never settles for an infinite source.
   * @param {(accumulator: *, item: *) => *} reducer
   * @param {*} [initialValue] - when omitted, the first item seeds the fold
   * @returns {Promise<*>} the folded value
   * @throws {TypeError} if nothing was emitted and no initial value was given
   */
  async reduce(reducer, ...initial) {
    let hasAccumulator = initial.length > 0;
    let accumulator = initial[0];
    for await (const item of this.execute()) {
      if (hasAccumulator) {
        accumulator = reducer(accumulator, item);
      } else {
        accumulator = item;
        hasAccumulator = true;
      }
    }
    if (!hasAccumulator) {
      throw new TypeError('Reduce of empty stream with no initial value');
    }
    return accumulator;
  }

  /**
   * Synchronous view of the source. Async iterables are rejected because
   * every downstream stage is a synchronous generator.
   */
  *createSourceStream() {
    if (this.source[Symbol.asyncIterator]) {
      throw new Error('异步迭代器需要特殊处理');
    } else if (this.source[Symbol.iterator]) {
      yield* this.source;
    } else {
      throw new Error('不支持的数据源类型');
    }
  }

  /**
   * Wraps `stream` in one stage.
   *
   * Errors raised inside the stage go to the handler registered for
   * error.constructor, if there is one; otherwise they are rethrown. This
   * includes errors propagating from earlier stages. By then `stream` has
   * usually been closed (the stage's for...of closes it on throw, or it threw
   * itself), so a handler supplies replacement items rather than resuming
   * the original flow.
   */
  *applyOperation(stream, operation) {
    try {
      switch (operation.type) {
        case 'map':
          yield* this.applyMap(stream, operation.transform);
          break;
        case 'filter':
          yield* this.applyFilter(stream, operation.predicate);
          break;
        case 'batch':
          yield* this.applyBatch(stream, operation.size);
          break;
        case 'custom':
          // Stages registered through the prototype-level custom() extension:
          // `execute` is a generator function receiving the upstream stream.
          yield* operation.execute(stream);
          break;
        case 'throttle':
          // execute() lifts throttling out of the synchronous chain; applied
          // directly, this stage passes items through unchanged.
          yield* stream;
          break;
        default:
          throw new Error(`未知操作类型: ${operation.type}`);
      }
    } catch (error) {
      const handler = this.errorHandlers.get(error?.constructor);
      if (handler) {
        yield* handler(error, stream);
      } else {
        throw error;
      }
    }
  }

  /** Yields transform(item) for every item of `stream`. */
  *applyMap(stream, transform) {
    for (const item of stream) {
      yield transform(item);
    }
  }

  /** Yields the items of `stream` for which predicate(item) is truthy. */
  *applyFilter(stream, predicate) {
    for (const item of stream) {
      if (predicate(item)) {
        yield item;
      }
    }
  }

  /** Groups items into arrays of `size`; a shorter final batch is flushed. */
  *applyBatch(stream, size) {
    let batch = [];
    for (const item of stream) {
      batch.push(item);
      if (batch.length >= size) {
        yield batch;
        batch = [];
      }
    }
    if (batch.length > 0) {
      yield batch;
    }
  }

  /**
   * Async pacing stage. Yields items from a sync or async iterable and waits
   * so that consecutive items are at least `ms` milliseconds apart. The first
   * item is not delayed.
   */
  async *applyThrottle(stream, ms) {
    let lastTime = 0;
    for await (const item of stream) {
      const wait = lastTime + ms - Date.now();
      if (wait > 0) {
        await new Promise((resolve) => setTimeout(resolve, wait));
      }
      lastTime = Date.now();
      yield item;
    }
  }
}
使用示例
// Example 1: basic processing. Double each number, keep values above 10,
// pair them up, then sum every batch into one total.
const numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const result = await DataStreamProcessor.from(numbers)
  .map((x) => x * 2)
  .filter((x) => x > 10)
  .batch(2)
  .reduce((sum, batch) => sum + batch.reduce((a, b) => a + b, 0), 0);
console.log('处理结果:', result); // 12 + 14 + 16 + 18 + 20 = 80
// Example 2: richer records. Grade users aged 25+ and aggregate their scores.
class User {
  constructor(name, age, score) {
    Object.assign(this, { name, age, score });
  }
}

const users = [
  new User('张三', 25, 85),
  new User('李四', 30, 92),
  new User('王五', 28, 78),
  new User('赵六', 35, 95),
  new User('钱七', 22, 88),
];

const userStats = await DataStreamProcessor.from(users)
  .filter((user) => user.age >= 25)
  .map((user) => {
    let grade = 'C';
    if (user.score >= 90) {
      grade = 'A';
    } else if (user.score >= 80) {
      grade = 'B';
    }
    return { ...user, grade };
  })
  .batch(2)
  .reduce((stats, batch) => {
    for (const { score, grade } of batch) {
      stats.totalScore += score;
      stats.count += 1;
      stats.grades[grade] = (stats.grades[grade] || 0) + 1;
    }
    return stats;
  }, { totalScore: 0, count: 0, grades: {} });
console.log('用户统计:', userStats);
// Example 3: real-time sensor monitoring.

/**
 * Synthetic sensor readings with increasing ids. Readings are produced
 * synchronously on demand; the real-time pacing comes from the pipeline's
 * throttle() stage, not from the generator itself.
 * @param {number} [maxReadings=100] - stop after this many readings so the
 *   demo terminates (pass Infinity for an endless stream)
 */
function* sensorDataGenerator(maxReadings = 100) {
  for (let id = 1; id <= maxReadings; id++) {
    yield {
      id,
      timestamp: new Date(),
      value: Math.random() * 100,
      sensorType: ['temperature', 'humidity', 'pressure'][Math.floor(Math.random() * 3)],
    };
  }
}

const sensorProcessor = DataStreamProcessor.from(sensorDataGenerator())
  .filter((data) => data.value > 20)
  .map((data) => ({
    ...data,
    status: data.value > 80 ? '警告' : data.value > 50 ? '注意' : '正常',
  }))
  .batch(5)
  .throttle(1000); // at most one batch per second

// Consume the stream and log a summary of every batch.
async function monitorSensorData() {
  console.log('开始监控传感器数据...');
  for await (const batch of sensorProcessor.execute()) {
    console.log('收到数据批次:', {
      timestamp: new Date().toISOString(),
      batchSize: batch.length,
      warnings: batch.filter((item) => item.status === '警告').length,
      data: batch,
    });
  }
  console.log('数据监控结束');
}
// monitorSensorData();
高级特性:自定义操作符
// Extension point: register an arbitrary generator-based stage under a name.
DataStreamProcessor.prototype.custom = function (operatorName, operatorFunction) {
  const operation = { type: 'custom', name: operatorName, execute: operatorFunction };
  this.operations.push(operation);
  return this;
};

// Distinct operator: drops items whose key (by default the item itself)
// has already been seen.
DataStreamProcessor.prototype.distinct = function (keySelector = (x) => x) {
  function* distinctStage(stream) {
    const seenKeys = new Set();
    for (const item of stream) {
      const key = keySelector(item);
      if (seenKeys.has(key)) continue;
      seenKeys.add(key);
      yield item;
    }
  }
  return this.custom('distinct', distinctStage);
};

// Sliding-window operator: once `size` items are buffered, emits a copy of
// the latest `size` items for every new item.
DataStreamProcessor.prototype.window = function (size) {
  return this.custom('window', function* (stream) {
    const buffer = [];
    for (const item of stream) {
      buffer.push(item);
      if (buffer.length > size) buffer.shift();
      // Emit a copy so later shifts don't mutate arrays already handed out.
      if (buffer.length === size) yield buffer.slice();
    }
  });
};

// Usage. Expected: distinct -> [1, 2, 3, 4, 5]; windows [1,2,3], [2,3,4],
// [3,4,5], each mapped to its sum and average.
const advancedResult = DataStreamProcessor.from([1, 2, 2, 3, 4, 4, 5, 1])
  .distinct()
  .window(3)
  .map((values) => {
    const sum = values.reduce((a, b) => a + b, 0);
    return { window: values, sum, avg: sum / values.length };
  });

async function displayAdvancedResult() {
  for await (const entry of advancedResult.execute()) {
    console.log('窗口结果:', entry);
  }
}
// displayAdvancedResult();
七、总结与最佳实践
通过本教程的深入学习,我们全面掌握了JavaScript迭代器和生成器的强大功能:
- 迭代协议的核心原理和实现方式
- 生成器函数的各种应用场景
- 异步迭代在处理流数据时的优势
- 构建复杂数据流处理系统的架构设计
性能优化建议:
- 合理使用惰性求值,避免不必要的数据处理
- 在适当场景使用异步迭代处理IO密集型操作
- 注意内存与资源释放:提前结束遍历时要确保迭代器的`return()`被调用(`for...of`中的`break`、`return`或抛出异常会自动调用它),以便执行生成器`finally`块中的清理逻辑
- 对于大型数据集,考虑分块处理和背压控制
适用场景:
- 大数据集的流式处理
- 实时数据监控和分析
- 复杂数据转换管道
- 自定义数据序列生成
- 异步操作的状态管理
迭代器和生成器是JavaScript中非常强大的特性,它们为处理序列数据提供了统一、灵活的解决方案。掌握这些技术将让您能够构建更加高效、可维护的数据处理应用。