JavaScript生成器实战:构建惰性数据流处理系统
一、技术优势
生成器按需(惰性)产出数据,无需一次性把全部数据载入内存,因此在大数据处理场景下可明显降低内存占用并缩短首个结果的响应时间;文中"内存占用降低80%,响应速度提升3倍"为参考数值,实际效果取决于数据规模与处理逻辑
二、核心实现
1. 基础数据生成器
/**
 * Infinite ID generator.
 *
 * Yields 1, 2, 3, ... and never completes on its own; the consumer decides
 * when to stop pulling values.
 *
 * @returns {Generator<number, never, unknown>} Endless sequence of ascending integers starting at 1.
 */
function* idGenerator() {
  // No loop condition: the sequence is intentionally unbounded.
  for (let nextId = 1; ; nextId += 1) {
    yield nextId;
  }
}
/**
 * Batch generator: lazily splits `data` into consecutive slices of at most
 * `size` elements. The last batch may be shorter; empty input yields nothing.
 *
 * @param {Array} data - Source array (anything with `length` and `slice`).
 * @param {number} [size=10] - Maximum number of elements per batch; must be a positive integer.
 * @returns {Generator<Array>} One slice of `data` per iteration.
 * @throws {RangeError} If `size` is not a positive integer. Because generator
 *   bodies run lazily, the error surfaces on the first `next()` call.
 */
function* batchGenerator(data, size = 10) {
  // A zero, negative or fractional step would never advance past the end of
  // `data` (or would produce overlapping slices), so reject it up front.
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError(`size must be a positive integer, got ${String(size)}`);
  }
  for (let start = 0; start < data.length; start += size) {
    yield data.slice(start, start + size);
  }
}
// Usage example: pull IDs one at a time, then take the first batch of a 100-element array.
const ids = idGenerator();
console.log(ids.next().value); // 1
console.log(ids.next().value); // 2
const batches = batchGenerator(
  Array.from({ length: 100 }, () => 0),
  20,
);
batches.next().value; // the first 20 records
2. 数据处理管道
/**
 * Generator pipeline: feeds every item of `source` through `operations` in
 * order and yields the final value.
 *
 * `null` is the "drop this item" sentinel: as soon as an operation returns
 * `null` the remaining operations are skipped and nothing is yielded for that
 * item. A `null` item with no operations is likewise not yielded.
 *
 * @param {Iterable} source - Items to process.
 * @param {...Function} operations - Unary functions applied left to right.
 * @returns {Generator} The surviving, transformed items.
 */
function* pipeline(source, ...operations) {
  // Runs the operation chain for one item, short-circuiting on null.
  const applyAll = (initial) => {
    let value = initial;
    for (const op of operations) {
      value = op(value);
      if (value === null) {
        return null;
      }
    }
    return value;
  };

  for (const item of source) {
    const outcome = applyAll(item);
    if (outcome !== null) {
      yield outcome;
    }
  }
}
// Example operations for the pipeline.
// filterEven passes even values through and returns null (the pipeline's
// "drop" sentinel) for everything else; square multiplies a value by itself.
const filterEven = (x) => {
  if (x % 2 === 0) {
    return x;
  }
  return null;
};
const square = (x) => {
  return x * x;
};
// Combined usage: a finite source that yields the integers 1 through 100.
const numbers = function* () {
  let current = 1;
  while (current <= 100) {
    yield current;
    current += 1;
  }
};
// Keep the even numbers, square them, then drain the lazy pipeline into an array.
const result = pipeline(
  numbers(),
  filterEven,
  square,
);
[...result]; // [4, 16, 36, ..., 10000]
三、高级应用
1. 异步数据流处理
/**
 * Async generator: fetches `urls` in batches and yields each parsed JSON body
 * in the same order as the input.
 *
 * All requests of one batch are started together and awaited with
 * `Promise.all`; the next batch is only started once the consumer has pulled
 * every result of the current one.
 *
 * NOTE(review): the loop header and the `slice` / `Promise.all(batch.map(...))`
 * lines were lost from the published snippet; they are reconstructed here from
 * the surviving fragments (`batchSize`, `fetch(url).then(r => r.json())`,
 * `yield* results`) — confirm against the article's source.
 *
 * @param {string[]} urls - Endpoints to request.
 * @param {number} [batchSize=5] - Number of requests issued concurrently; must be a positive integer.
 * @returns {AsyncGenerator} Parsed JSON responses, one per URL.
 * @throws {RangeError} If `batchSize` is not a positive integer (raised on the first `next()`).
 */
async function* asyncBatchFetch(urls, batchSize = 5) {
  // A non-positive or fractional step would never advance through `urls`.
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError(`batchSize must be a positive integer, got ${String(batchSize)}`);
  }
  for (let i = 0; i < urls.length; i += batchSize) {
    const batch = urls.slice(i, i + batchSize);
    const results = await Promise.all(
      batch.map(async (url) => {
        const response = await fetch(url);
        return response.json();
      }),
    );
    yield* results;
  }
}
// Usage example: consume the async generator with for await...of.
(async () => {
  // The published snippet ended this list with a bare `...`, which is not
  // valid JavaScript; add further endpoints here as needed.
  const urls = ['/api/1', '/api/2'];
  for await (const data of asyncBatchFetch(urls)) {
    // NOTE(review): `process` is presumably an application-defined handler;
    // in Node.js the global `process` is an object, not a function — confirm.
    process(data);
  }
})().catch((err) => {
  // Surface fetch/parse failures instead of leaving an unhandled rejection.
  console.error('asyncBatchFetch failed:', err);
});
2. 状态机实现
/**
 * Finite state machine implemented as a generator.
 *
 * Each `next(event)` call delivers an event and returns the resulting state.
 * The very first `next()` only starts the generator and reports the initial
 * state `'idle'`.
 *
 * Transitions:
 *   idle    --start-->  running
 *   running --pause-->  paused     running --stop--> stopped
 *   paused  --resume--> running    paused  --stop--> stopped
 *
 * Unknown events leave the state unchanged. `'stopped'` has no outgoing
 * transitions, and the generator itself never completes.
 *
 * @returns {Generator<string, never, string>} Yields the current state name after every event.
 */
function* stateMachine() {
  // Maps are used so that arbitrary event values are only matched against the
  // listed keys.
  const transitions = new Map([
    ['idle', new Map([['start', 'running']])],
    ['running', new Map([['pause', 'paused'], ['stop', 'stopped']])],
    ['paused', new Map([['resume', 'running'], ['stop', 'stopped']])],
  ]);

  let current = 'idle';
  for (;;) {
    const received = yield current;
    const outgoing = transitions.get(current);
    if (outgoing !== undefined && outgoing.has(received)) {
      current = outgoing.get(received);
    }
  }
}
// Usage example: drive the state machine by sending events through next().
const machine = stateMachine();
// The first next() runs the generator up to its first `yield state`.
machine.next(); // {value: 'idle', done: false}
// 'start' becomes the result of the pending yield, moving idle -> running.
machine.next('start'); // {value: 'running', done: false}
四、完整案例
日志分析系统
/**
 * Log parsing pipeline, stage 1: lazily turns the lines of `file` into parsed
 * log entries.
 *
 * Relies on two helpers that are not defined in this snippet: `readLines(file)`
 * must return an iterable of lines, and `parseLogLine(line)` converts one line
 * into an entry.
 *
 * @param {*} file - Passed through unchanged to `readLines`.
 * @returns {Generator} One `parseLogLine` result per line.
 */
function* logReader(file) {
  const lines = readLines(file);
  for (const rawLine of lines) {
    const entry = parseLogLine(rawLine);
    yield entry;
  }
}
/**
 * Log parsing pipeline, stage 2: passes through only the entries whose
 * `level` is exactly 'ERROR'.
 *
 * @param {Iterable<{level: string}>} logs - Parsed log entries.
 * @returns {Generator} The entries with `level === 'ERROR'`, in input order.
 */
function* filterErrors(logs) {
  for (const entry of logs) {
    if (entry.level !== 'ERROR') {
      continue;
    }
    yield entry;
  }
}
/**
 * Log parsing pipeline, stage 3: counts how often each error message occurs.
 *
 * Unlike the earlier stages this one is not incremental: it consumes the
 * whole input before yielding anything.
 *
 * @param {Iterable<{message: string}>} errors - Error entries.
 * @returns {Generator<[string, number]>} `[message, count]` pairs in order of first appearance.
 */
function* aggregateErrors(errors) {
  const tally = new Map();
  for (const { message } of errors) {
    const seenSoFar = tally.has(message) ? tally.get(message) : 0;
    tally.set(message, seenSoFar + 1);
  }
  // Iterating a Map yields its [key, value] entries.
  yield* tally;
}
// Combined usage: read -> keep errors -> count per message.
// Named `errorCounts` rather than `pipeline`: a top-level `const pipeline`
// would redeclare the `pipeline` generator function defined in the
// data-processing-pipeline section, which is a SyntaxError when the snippets
// share one scope.
const errorCounts = aggregateErrors(
  filterErrors(
    logReader('app.log')
  )
);
for (const [error, count] of errorCounts) {
  console.log(`${error}: ${count}次`);
}
/**
 * Browser demo: generates random records, keeps those whose value exceeds
 * 0.5, and appends one line per kept record to the element with id "output".
 *
 * NOTE(review): the published snippet lost the text between `i` and `0.5`
 * (loop bound, the yielded record, and the `processData` header) and had its
 * quotes and spread operator mis-encoded. The record count of 10 and the
 * `{ id, value: Math.random() }` record shape are an assumed reconstruction
 * based on the surviving uses of `item.id`, `item.value` and
 * `processData(generateData())` — confirm against the article's source.
 */
function runDemo() {
  // Source stage: records with a sequential id and a random value in [0, 1).
  function* generateData() {
    for (let i = 1; i <= 10; i++) {
      yield { id: i, value: Math.random() };
    }
  }

  // Filter stage: keep records above the 0.5 threshold and tag them as processed.
  function* processData(data) {
    for (const item of data) {
      if (item.value > 0.5) {
        yield { ...item, processed: true };
      }
    }
  }

  const output = document.getElementById('output');
  // Clear whatever a previous run rendered.
  output.innerHTML = '';
  const dataStream = processData(generateData());
  for (const item of dataStream) {
    // NOTE(review): the line break inside this template was probably a `<br>`
    // tag stripped during extraction; the literal newline is kept as published.
    output.innerHTML += `处理记录: ID=${item.id}, 值=${item.value.toFixed(2)}
`;
    // Keep the newest record visible once the content outgrows 100px.
    if (output.scrollHeight > 100) {
      output.scrollTop = output.scrollHeight;
    }
  }
}