JavaScript生成器实战:构建异步数据流处理管道 | 高级异步编程技术

免费资源下载

深入探索Generator函数的强大能力,实现优雅的异步数据流控制

一、生成器的核心概念与优势

JavaScript生成器(Generator)是ES6引入的一种特殊函数,它能够在执行过程中暂停和恢复,这种特性使得生成器成为处理异步操作、数据流和状态机的理想工具。与传统异步处理方式相比,生成器提供了更直观、更可控的编程模型。

生成器的核心特性包括:

  • 可暂停执行:使用yield关键字暂停函数执行
  • 状态保持:暂停时保持局部变量状态
  • 双向通信:通过yield返回值,通过next()传递值
  • 迭代器协议:天然支持迭代器模式

二、生成器基础语法与模式

2.1 基本生成器函数

function* basicGenerator() {
    console.log('开始执行');
    const result1 = yield '第一步';
    console.log('收到值:', result1);
    const result2 = yield '第二步';
    console.log('收到值:', result2);
    return '完成';
}

// 使用示例
const gen = basicGenerator();

console.log(gen.next());      // { value: '第一步', done: false }
console.log(gen.next('数据A')); // { value: '第二步', done: false }
console.log(gen.next('数据B')); // { value: '完成', done: true }

2.2 生成器作为可迭代对象

function* numberSequence(start, end) {
    for (let i = start; i <= end; i++) {
        yield i;
    }
}

// 使用for...of循环
for (const num of numberSequence(1, 5)) {
    console.log(num); // 1, 2, 3, 4, 5
}

// 展开为数组
const numbers = [...numberSequence(10, 15)];
console.log(numbers); // [10, 11, 12, 13, 14, 15]

三、实战案例:构建异步数据流管道

我们将构建一个完整的异步数据处理系统,支持数据转换、过滤、批处理和错误处理。

3.1 基础数据流处理器

function* createDataPipeline(transformers) {
    let data = yield; // 等待初始数据
    
    for (const transformer of transformers) {
        try {
            // 应用转换器
            data = yield transformer(data);
        } catch (error) {
            console.error('转换失败:', error);
            yield { error, data }; // 传递错误信息
        }
    }
    
    return { success: true, finalData: data };
}

// 定义转换函数
const transformers = [
    data => ({ ...data, processed: true }),
    data => ({ ...data, timestamp: Date.now() }),
    data => {
        if (data.value > 100) throw new Error('值过大');
        return { ...data, normalized: data.value / 100 };
    }
];

// 使用管道
const pipeline = createDataPipeline(transformers);
pipeline.next(); // 启动生成器

const result1 = pipeline.next({ value: 50, id: 1 });
console.log(result1.value); // 第一次转换结果

const result2 = pipeline.next(result1.value);
console.log(result2.value); // 第二次转换结果

const finalResult = pipeline.next(result2.value);
console.log(finalResult.value); // 最终结果或错误

3.2 高级异步数据流处理器

async function* asyncDataStream(source, options = {}) {
    const {
        batchSize = 10,
        concurrency = 3,
        retryAttempts = 2
    } = options;
    
    let buffer = [];
    let activePromises = 0;
    const pendingItems = [];
    
    // 数据源可以是数组、异步迭代器或函数
    const dataSource = typeof source === 'function' 
        ? source 
        : async function*() { for (const item of source) yield item; };
    
    for await (const item of dataSource()) {
        buffer.push(item);
        
        // 达到批处理大小时处理一批数据
        if (buffer.length >= batchSize) {
            const batch = buffer;
            buffer = [];
            pendingItems.push(processBatch(batch));
        }
        
        // 控制并发数
        if (pendingItems.length >= concurrency) {
            yield await pendingItems.shift();
        }
    }
    
    // 处理剩余数据
    if (buffer.length > 0) {
        pendingItems.push(processBatch(buffer));
    }
    
    // 等待所有处理完成
    while (pendingItems.length > 0) {
        yield await pendingItems.shift();
    }
    
    async function processBatch(batch) {
        let attempts = 0;
        
        while (attempts  ({
                        ...item,
                        processed: true,
                        processedAt: new Date().toISOString()
                    }))
                );
            } catch (error) {
                attempts++;
                if (attempts > retryAttempts) {
                    console.error(`处理失败,已重试${retryAttempts}次:`, error);
                    return batch.map(item => ({
                        ...item,
                        error: error.message,
                        failed: true
                    }));
                }
                await new Promise(resolve => 
                    setTimeout(resolve, 1000 * attempts)
                );
            }
        }
    }
}

3.3 使用数据流处理器

// 模拟数据源
async function* mockDataSource(count) {
    for (let i = 1; i  setTimeout(resolve, 100));
        yield { id: i, value: Math.random() * 200, type: `item_${i}` };
    }
}

// 创建数据流处理器
const dataStream = asyncDataStream(() => mockDataSource(50), {
    batchSize: 5,
    concurrency: 2,
    retryAttempts: 1
});

// 处理数据流
(async () => {
    const results = [];
    let processedCount = 0;
    let errorCount = 0;
    
    for await (const batch of dataStream) {
        console.log(`处理批次: ${batch.length} 个项目`);
        
        batch.forEach(item => {
            if (item.error) {
                errorCount++;
                console.warn(`处理失败: ${item.id} - ${item.error}`);
            } else {
                processedCount++;
                results.push(item);
            }
        });
        
        // 实时统计
        console.log(`已处理: ${processedCount}, 失败: ${errorCount}`);
    }
    
    console.log('处理完成:', {
        总数: processedCount + errorCount,
        成功: processedCount,
        失败: errorCount
    });
})();

四、协程模式:实现复杂的异步控制流

4.1 任务调度器

class TaskScheduler {
    constructor(maxConcurrent = 3) {
        this.maxConcurrent = maxConcurrent;
        this.running = 0;
        this.queue = [];
        this.results = new Map();
    }
    
    addTask(taskFn, id) {
        return new Promise((resolve, reject) => {
            this.queue.push({ taskFn, id, resolve, reject });
            this.run();
        });
    }
    
    async run() {
        while (this.running  0) {
            const { taskFn, id, resolve, reject } = this.queue.shift();
            this.running++;
            
            try {
                const result = await taskFn();
                this.results.set(id, result);
                resolve(result);
            } catch (error) {
                reject(error);
            } finally {
                this.running--;
                this.run();
            }
        }
    }
    
    *getResults() {
        for (const [id, result] of this.results) {
            yield { id, result };
        }
    }
}

// 使用任务调度器
const scheduler = new TaskScheduler(2);

// 添加任务
const tasks = [
    () => fetchData('user'),
    () => fetchData('products'),
    () => processImage('image1.jpg'),
    () => sendAnalytics('pageview'),
    () => generateReport('2024')
];

tasks.forEach((task, index) => {
    scheduler.addTask(task, `task_${index}`)
        .then(result => console.log(`任务${index}完成:`, result))
        .catch(error => console.error(`任务${index}失败:`, error));
});

// 获取结果
for (const result of scheduler.getResults()) {
    console.log('结果:', result);
}

4.2 状态机实现

function* stateMachine(initialState) {
    let state = initialState;
    
    while (true) {
        const action = yield state;
        
        switch (state.status) {
            case 'idle':
                if (action.type === 'START') {
                    state = { status: 'processing', data: action.data };
                }
                break;
                
            case 'processing':
                if (action.type === 'PAUSE') {
                    state = { status: 'paused', data: state.data };
                } else if (action.type === 'COMPLETE') {
                    state = { status: 'completed', result: action.result };
                } else if (action.type === 'ERROR') {
                    state = { status: 'error', error: action.error };
                }
                break;
                
            case 'paused':
                if (action.type === 'RESUME') {
                    state = { status: 'processing', data: state.data };
                } else if (action.type === 'CANCEL') {
                    state = { status: 'cancelled' };
                }
                break;
        }
    }
}

// 使用状态机
const machine = stateMachine({ status: 'idle' });
machine.next(); // 启动

console.log(machine.next({ type: 'START', data: '任务数据' }).value);
// { status: 'processing', data: '任务数据' }

console.log(machine.next({ type: 'PAUSE' }).value);
// { status: 'paused', data: '任务数据' }

console.log(machine.next({ type: 'RESUME' }).value);
// { status: 'processing', data: '任务数据' }

console.log(machine.next({ type: 'COMPLETE', result: '成功' }).value);
// { status: 'completed', result: '成功' }

五、生成器与异步迭代器的结合

5.1 实现异步生成器

async function* paginatedAPI(endpoint, options = {}) {
    const { pageSize = 20, maxPages = 10 } = options;
    let page = 1;
    let hasMore = true;
    
    while (hasMore && page  0) {
                for (const item of data.items) {
                    yield item;
                }
                
                hasMore = data.hasMore || data.items.length === pageSize;
                page++;
            } else {
                hasMore = false;
            }
            
            // 添加延迟避免速率限制
            if (hasMore) {
                await new Promise(resolve => setTimeout(resolve, 1000));
            }
            
        } catch (error) {
            console.error(`第${page}页获取失败:`, error);
            
            // 重试逻辑
            const shouldRetry = await new Promise(resolve => {
                setTimeout(() => resolve(confirm('请求失败,是否重试?')), 0);
            });
            
            if (!shouldRetry) {
                break;
            }
        }
    }
}

// 使用异步生成器
(async () => {
    const userStream = paginatedAPI('https://api.example.com/users', {
        pageSize: 10,
        maxPages: 5
    });
    
    for await (const user of userStream) {
        console.log('用户:', user);
        // 处理每个用户
    }
})();

5.2 数据转换管道

function composeAsyncGenerators(...generators) {
    return async function* composedGenerator(input) {
        let currentValue = input;
        
        for (const generator of generators) {
            const gen = generator(currentValue);
            
            if (gen[Symbol.asyncIterator]) {
                for await (const value of gen) {
                    currentValue = value;
                    yield value;
                }
            } else {
                for (const value of gen) {
                    currentValue = value;
                    yield value;
                }
            }
        }
        
        return currentValue;
    };
}

// 定义处理阶段
const stages = [
    async function* filterInvalid(data) {
        for await (const item of data) {
            if (item && item.id && item.value != null) {
                yield item;
            }
        }
    },
    
    async function* transformData(data) {
        for await (const item of data) {
            yield {
                ...item,
                processed: true,
                score: item.value * 10,
                category: item.value > 0.5 ? 'high' : 'low'
            };
        }
    },
    
    async function* batchData(data) {
        let batch = [];
        for await (const item of data) {
            batch.push(item);
            if (batch.length >= 5) {
                yield batch;
                batch = [];
            }
        }
        if (batch.length > 0) {
            yield batch;
        }
    }
];

// 组合管道
const dataPipeline = composeAsyncGenerators(...stages);

// 使用管道
(async () => {
    // 模拟数据源
    async function* mockData() {
        for (let i = 0; i < 23; i++) {
            yield { id: i, value: Math.random() };
        }
    }
    
    for await (const batch of dataPipeline(mockData())) {
        console.log('处理后的批次:', batch);
        // 进一步处理或存储
    }
})();

六、性能优化与最佳实践

6.1 内存优化技巧

function* memoryEfficientProcessor(dataSource, processor) {
    // 使用WeakMap存储中间结果,避免内存泄漏
    const cache = new WeakMap();
    
    for (const item of dataSource) {
        if (cache.has(item)) {
            yield cache.get(item);
        } else {
            const result = processor(item);
            cache.set(item, result);
            yield result;
            
            // 定期清理缓存
            if (cache.size > 1000) {
                // 实现LRU缓存清理策略
                const keys = Array.from(cache.keys());
                for (let i = 0; i  {
        const maxKeep = 100;
        if (loaded.size > maxKeep) {
            const keys = Array.from(loaded.keys());
            for (let i = 0; i < loaded.size - maxKeep; i++) {
                loaded.delete(keys[i]);
            }
        }
    };
    
    return cleanup;
}

6.2 错误处理策略

function* resilientGenerator(generatorFn, options = {}) {
    const {
        maxRetries = 3,
        retryDelay = 1000,
        fallbackValue = null
    } = options;
    
    const gen = generatorFn();
    
    while (true) {
        let attempt = 0;
        let result;
        
        while (attempt  maxRetries) {
                    console.error(`生成器失败,已重试${maxRetries}次:`, error);
                    
                    if (fallbackValue !== undefined) {
                        yield fallbackValue;
                    }
                    
                    // 尝试恢复或结束
                    try {
                        gen.return();
                    } catch {}
                    
                    return;
                }
                
                console.warn(`重试 ${attempt}/${maxRetries}:`, error);
                
                // 指数退避延迟
                await new Promise(resolve => 
                    setTimeout(resolve, retryDelay * Math.pow(2, attempt - 1))
                );
            }
        }
        
        if (result.done) {
            return result.value;
        }
        
        yield result.value;
    }
}

七、实际应用场景

7.1 实时数据监控系统

function* createDataMonitor(sensors, alertThresholds) {
    const readings = new Map();
    const alerts = new Set();
    
    while (true) {
        const timestamp = Date.now();
        const sensorReadings = [];
        
        // 收集所有传感器数据
        for (const sensor of sensors) {
            const reading = yield sensor.read();
            readings.set(sensor.id, { ...reading, timestamp });
            sensorReadings.push(reading);
            
            // 检查警报条件
            if (reading.value > alertThresholds[sensor.type]) {
                const alert = {
                    sensor: sensor.id,
                    value: reading.value,
                    threshold: alertThresholds[sensor.type],
                    timestamp
                };
                
                if (!alerts.has(JSON.stringify(alert))) {
                    alerts.add(JSON.stringify(alert));
                    yield { type: 'ALERT', data: alert };
                }
            }
        }
        
        // 生成聚合报告
        const report = {
            timestamp,
            average: sensorReadings.reduce((a, b) => a + b.value, 0) / sensorReadings.length,
            max: Math.max(...sensorReadings.map(r => r.value)),
            min: Math.min(...sensorReadings.map(r => r.value)),
            activeAlerts: alerts.size
        };
        
        yield { type: 'REPORT', data: report };
        
        // 等待下一个采样周期
        yield new Promise(resolve => setTimeout(resolve, 5000));
    }
}

7.2 游戏状态管理

function* gameStateManager(initialState) {
    let state = { ...initialState, history: [] };
    
    while (true) {
        const action = yield state;
        
        // 保存历史状态(限制长度)
        state.history.push({ ...state });
        if (state.history.length > 100) {
            state.history.shift();
        }
        
        // 处理游戏动作
        switch (action.type) {
            case 'MOVE':
                state = {
                    ...state,
                    position: action.position,
                    moves: state.moves + 1
                };
                break;
                
            case 'COLLECT':
                state = {
                    ...state,
                    inventory: [...state.inventory, action.item],
                    score: state.score + action.points
                };
                break;
                
            case 'SAVE':
                yield { type: 'SAVE_COMPLETE', data: state };
                break;
                
            case 'LOAD':
                if (action.savePoint  0) {
                    state = state.history.pop();
                }
                break;
        }
        
        // 检查游戏结束条件
        if (state.score >= 1000) {
            yield { type: 'GAME_OVER', data: { ...state, win: true } };
            return;
        }
    }
}

八、总结与扩展

JavaScript生成器提供了强大的控制流管理能力,特别适合以下场景:

  1. 异步数据流处理:处理大量数据或实时数据流
  2. 状态机实现:管理复杂的状态转换逻辑
  3. 协程编程:实现协作式多任务处理
  4. 懒加载:按需生成和处理数据
  5. 迭代器模式:创建自定义迭代逻辑

关键优势:

  • 更清晰的异步代码结构
  • 更好的内存控制
  • 灵活的状态管理
  • 可组合的数据处理管道

进一步学习方向:

  • 探索async/await与生成器的结合使用
  • 研究RxJS等响应式编程库的内部实现
  • 学习TypeScript中的高级类型与生成器
  • 了解Web Workers中的生成器应用
  • 探索生成器在Node.js流处理中的应用

生成器是JavaScript中一个强大但常被低估的特性。通过合理使用生成器,可以构建出更优雅、更高效、更易维护的异步代码结构。

// 页面交互功能
document.addEventListener(‘DOMContentLoaded’, function() {
// 为代码块添加交互功能
const codeBlocks = document.querySelectorAll(‘pre code’);

codeBlocks.forEach((block, index) => {
const container = document.createElement(‘div’);
container.style.cssText = ‘margin: 10px 0;’;

const buttonContainer = document.createElement(‘div’);
buttonContainer.style.cssText = ‘display: flex; gap: 10px; margin-bottom: 5px;’;

// 复制按钮
const copyBtn = document.createElement(‘button’);
copyBtn.textContent = ‘复制代码’;
copyBtn.style.cssText = `
background: #4CAF50;
color: white;
border: none;
padding: 5px 15px;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
`;

// 运行按钮(针对可运行示例)
if (block.textContent.includes(‘console.log’) ||
block.textContent.includes(‘yield’) ||
block.textContent.includes(‘async’)) {

const runBtn = document.createElement(‘button’);
runBtn.textContent = ‘运行示例’;
runBtn.style.cssText = `
background: #2196F3;
color: white;
border: none;
padding: 5px 15px;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
`;

const outputDiv = document.createElement(‘div’);
outputDiv.className = ‘code-output’;
outputDiv.style.cssText = `
margin-top: 10px;
padding: 10px;
background: #f5f5f5;
border-radius: 4px;
font-family: monospace;
max-height: 200px;
overflow-y: auto;
display: none;
`;

runBtn.addEventListener(‘click’, async () => {
try {
outputDiv.style.display = ‘block’;
outputDiv.innerHTML = ‘执行中…‘;

// 捕获console.log输出
const originalLog = console.log;
const logs = [];
console.log = (…args) => {
logs.push(args.map(arg =>
typeof arg === ‘object’ ? JSON.stringify(arg, null, 2) : String(arg)
).join(‘ ‘));
};

// 执行代码
const code = block.textContent;
const wrappedCode = `
(async () => {
try {
${code}
} catch(err) {
console.error(‘执行错误:’, err);
}
})()
`;

await eval(wrappedCode);

// 恢复console.log
console.log = originalLog;

// 显示输出
if (logs.length > 0) {
outputDiv.innerHTML = logs.map(log =>
`

${log}

`
).join(”);
} else {
outputDiv.innerHTML = ‘

代码已执行,无控制台输出

‘;
}

} catch (error) {
outputDiv.innerHTML = `

执行错误: ${error.message}

`;
}
});

buttonContainer.appendChild(runBtn);
block.parentNode.insertBefore(container, block);
container.appendChild(buttonContainer);
container.appendChild(block);
container.appendChild(outputDiv);
} else {
block.parentNode.insertBefore(container, block);
container.appendChild(buttonContainer);
container.appendChild(block);
}

// 复制功能
copyBtn.addEventListener(‘click’, async () => {
try {
await navigator.clipboard.writeText(block.textContent);
copyBtn.textContent = ‘已复制!’;
copyBtn.style.background = ‘#45a049’;

setTimeout(() => {
copyBtn.textContent = ‘复制代码’;
copyBtn.style.background = ‘#4CAF50’;
}, 2000);
} catch (err) {
console.error(‘复制失败:’, err);
copyBtn.textContent = ‘复制失败’;
copyBtn.style.background = ‘#f44336’;
}
});

buttonContainer.appendChild(copyBtn);
});

// 添加章节导航
const sections = document.querySelectorAll(‘section’);
const nav = document.createElement(‘nav’);
nav.style.cssText = `
position: fixed;
right: 20px;
top: 100px;
background: white;
padding: 15px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
max-height: 70vh;
overflow-y: auto;
display: none;
`;

const navTitle = document.createElement(‘h3’);
navTitle.textContent = ‘章节导航’;
navTitle.style.cssText = ‘margin: 0 0 10px 0; font-size: 14px;’;
nav.appendChild(navTitle);

sections.forEach((section, index) => {
const h2 = section.querySelector(‘h2’);
if (h2) {
const link = document.createElement(‘a’);
link.textContent = h2.textContent;
link.href = `#section-${index}`;
link.style.cssText = `
display: block;
padding: 5px 0;
color: #2196F3;
text-decoration: none;
font-size: 13px;
border-bottom: 1px solid #eee;
`;

section.id = `section-${index}`;
nav.appendChild(link);
}
});

document.body.appendChild(nav);

// 显示/隐藏导航按钮
const navToggle = document.createElement(‘button’);
navToggle.textContent = ‘📖 导航’;
navToggle.style.cssText = `
position: fixed;
right: 20px;
top: 70px;
background: #2196F3;
color: white;
border: none;
padding: 8px 15px;
border-radius: 20px;
cursor: pointer;
z-index: 1000;
font-size: 14px;
`;

navToggle.addEventListener(‘click’, () => {
nav.style.display = nav.style.display === ‘block’ ? ‘none’ : ‘block’;
});

document.body.appendChild(navToggle);

// 点击导航链接后隐藏导航
nav.querySelectorAll(‘a’).forEach(link => {
link.addEventListener(‘click’, () => {
nav.style.display = ‘none’;
});
});
});

JavaScript生成器实战:构建异步数据流处理管道 | 高级异步编程技术
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 javascript JavaScript生成器实战:构建异步数据流处理管道 | 高级异步编程技术 https://www.taomawang.com/web/javascript/1516.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务