JavaScript异步迭代器实战:用for await…of优雅处理数据流

JavaScript异步编程中,我们经常面对分批到达的数据:分页API、文件流、WebSocket消息队列等。传统的回调、Promise链或事件监听往往让代码变得碎片化。ES2018引入的异步迭代器(Async Iterator)和for await...of语法,让我们可以用同步迭代的思维处理异步数据流。本文通过三个真实案例,带你彻底掌握这一特性。

一、异步迭代器基础回顾

异步迭代器是实现了Symbol.asyncIterator方法的对象,其next()方法返回一个Promise,resolve后的结构为{ value, done }。配合for await...of,我们可以像遍历数组一样消费异步数据。

    // 一个简单的异步迭代器:每秒生成一个数字
    const asyncNumberStream = {
        [Symbol.asyncIterator]() {
            let i = 0;
            return {
                next() {
                    if (i > 5) return Promise.resolve({ done: true });
                    return new Promise(resolve => {
                        setTimeout(() => {
                            resolve({ value: i++, done: false });
                        }, 1000);
                    });
                }
            };
        }
    };

    (async () => {
        for await (const num of asyncNumberStream) {
            console.log(num); // 0,1,2,3,4,5 每秒一个
        }
    })();
    

这种模式将“数据产生”与“数据消费”解耦,非常适合处理时序不确定的数据源。

二、案例1:分页API拉取(模拟真实接口)

实际开发中,我们经常需要遍历所有分页数据。使用异步迭代器,可以像拉取数组一样逐页获取,而无需手动管理页码和循环。

    // 模拟分页API
    function fetchPage(pageNum, pageSize = 5) {
        const totalPages = 3;
        return new Promise((resolve) => {
            setTimeout(() => {
                if (pageNum > totalPages) {
                    resolve({ data: [], totalPages });
                } else {
                    const data = Array.from({ length: pageSize }, (_, i) => ({
                        id: (pageNum - 1) * pageSize + i + 1,
                        name: `用户${(pageNum - 1) * pageSize + i + 1}`
                    }));
                    resolve({ data, totalPages });
                }
            }, 500); // 模拟网络延迟
        });
    }

    // 创建异步迭代器,自动翻页
    function createPaginatedIterator(pageSize = 5) {
        let currentPage = 1;
        let hasMore = true;
        return {
            [Symbol.asyncIterator]() {
                return {
                    next() {
                        if (!hasMore) return Promise.resolve({ done: true, value: undefined });
                        return fetchPage(currentPage, pageSize).then(res => {
                            const { data, totalPages } = res;
                            if (currentPage >= totalPages || data.length === 0) {
                                hasMore = false;
                            } else {
                                currentPage++;
                            }
                            // 每次返回一条数据?不,我们返回整个数组?为了演示逐条消费,这里展开数组
                            // 更优雅的方式:返回每条记录,但需要内部维护一个队列
                            // 本案例为了简洁,直接返回当前页数组,但for await会迭代数组?需要做扁平化处理
                            // 更好的设计:迭代器内部维护当前页索引,逐条返回
                            // 我们采用更通用的方式:迭代器返回每条记录
                            // 但为了展示迭代器的灵活性,我们重新实现一个逐条返回的版本
                            return { value: data, done: false };
                        });
                    }
                };
            }
        };
    }

    // 更优雅的逐条返回迭代器
    function createUserIterator(pageSize = 5) {
        let currentPage = 1;
        let currentIndex = 0;
        let currentPageData = [];
        let totalPages = Infinity;
        let finished = false;

        const fetchNextPage = async () => {
            const res = await fetchPage(currentPage, pageSize);
            totalPages = res.totalPages;
            currentPageData = res.data;
            currentIndex = 0;
            currentPage++;
            if (currentPage > totalPages || currentPageData.length === 0) {
                finished = true;
            }
        };

        return {
            [Symbol.asyncIterator]() {
                return {
                    async next() {
                        if (finished && currentIndex >= currentPageData.length) {
                            return { done: true, value: undefined };
                        }
                        // 如果当前页数据已用完,拉取下一页
                        if (currentIndex >= currentPageData.length) {
                            await fetchNextPage();
                            if (finished && currentPageData.length === 0) {
                                return { done: true, value: undefined };
                            }
                        }
                        const value = currentPageData[currentIndex];
                        currentIndex++;
                        return { value, done: false };
                    }
                };
            }
        };
    }

    // 使用示例
    (async () => {
        console.log('开始拉取所有用户...');
        const userIterator = createUserIterator(5);
        let count = 0;
        for await (const user of userIterator) {
            console.log(`第${++count}个用户:`, user.name);
        }
        console.log('所有用户拉取完毕');
    })();
    // 输出:依次打印15个用户,每5个一组间隔500ms
    

这个迭代器封装了分页逻辑,外部只需for await...of,无需关心页码和边界。如果API支持游标分页,只需修改fetchNextPage内部逻辑即可。

三、案例2:WebSocket消息流消费

WebSocket天然适合异步迭代器:每条消息都是一个异步事件。我们可以包装WebSocket为异步迭代器,用for await处理消息。

    function webSocketIterator(url) {
        const ws = new WebSocket(url);
        let resolvePromise = null;
        let messageQueue = [];
        let closed = false;

        ws.onmessage = (event) => {
            if (resolvePromise) {
                // 有正在等待的next(),直接resolve
                const resolve = resolvePromise;
                resolvePromise = null;
                resolve({ value: event.data, done: false });
            } else {
                // 否则存入队列
                messageQueue.push(event.data);
            }
        };

        ws.onclose = () => {
            closed = true;
            if (resolvePromise) {
                resolvePromise({ done: true, value: undefined });
            }
        };

        ws.onerror = () => {
            closed = true;
            if (resolvePromise) {
                resolvePromise({ done: true, value: undefined });
            }
        };

        return {
            [Symbol.asyncIterator]() {
                return {
                    next() {
                        if (messageQueue.length > 0) {
                            const value = messageQueue.shift();
                            return Promise.resolve({ value, done: false });
                        }
                        if (closed) {
                            return Promise.resolve({ done: true, value: undefined });
                        }
                        // 没有消息,返回一个pending的Promise
                        return new Promise((resolve) => {
                            resolvePromise = resolve;
                        });
                    },
                    // 可选:提供return方法用于提前退出
                    return() {
                        ws.close();
                        return Promise.resolve({ done: true, value: undefined });
                    }
                };
            }
        };
    }

    // 使用示例(假设有ws://echo.websocket.org)
    (async () => {
        try {
            const wsIter = webSocketIterator('wss://echo.websocket.org');
            // 发送一条测试消息
            const ws = new WebSocket('wss://echo.websocket.org');
            ws.onopen = () => {
                ws.send('Hello Async Iterator');
                ws.send('第二条消息');
            };
            // 注意:实际使用需要共享同一个WebSocket实例,这里为了演示分开创建
            // 正确做法:webSocketIterator内部暴露send方法,但本案例只展示消费端
            console.log('等待WebSocket消息...');
            for await (const msg of wsIter) {
                console.log('收到消息:', msg);
                // 收到两条后退出循环(演示break)
                break; // 退出时会自动调用return()关闭连接
            }
            console.log('WebSocket流消费结束');
        } catch (err) {
            console.error('WebSocket错误:', err);
        }
    })();
    

这个迭代器将WebSocket的异步事件转换为可迭代的序列,特别适合处理实时数据流,如股票行情、聊天消息等。注意:生产环境需要处理重连和心跳,但核心模式一致。

四、案例3:文件逐行读取(Node.js环境)

虽然浏览器中无法直接读取本地文件,但在Node.js或Deno中,异步迭代器非常适合处理大文件逐行读取,避免内存爆炸。

    // 假设在Node.js环境,使用fs.createReadStream + readline
    // 这里用模拟数据演示
    function createLineReader(text) {
        const lines = text.split('n');
        let index = 0;
        return {
            [Symbol.asyncIterator]() {
                return {
                    next() {
                        if (index  {
            if (resolveNext) {
                const resolve = resolveNext;
                resolveNext = null;
                resolve({ value: line, done: false });
            } else {
                lineQueue.push(line);
            }
        });

        rl.on('close', () => {
            closed = true;
            if (resolveNext) {
                resolveNext({ done: true, value: undefined });
            }
        });

        return {
            [Symbol.asyncIterator]() {
                return {
                    next() {
                        if (lineQueue.length > 0) {
                            return Promise.resolve({ value: lineQueue.shift(), done: false });
                        }
                        if (closed) return Promise.resolve({ done: true, value: undefined });
                        return new Promise(resolve => { resolveNext = resolve; });
                    },
                    return() {
                        rl.close();
                        return Promise.resolve({ done: true, value: undefined });
                    }
                };
            }
        };
    }
    */

    // 演示使用
    (async () => {
        const text = `第一行n第二行n第三行n第四行`;
        const lineIterator = createLineReader(text);
        for await (const line of lineIterator) {
            console.log('读取行:', line);
        }
        console.log('文件读取完毕');
    })();
    

在Node.js中,结合readlinefs,可以高效处理GB级日志文件,且代码非常简洁。异步迭代器让流式数据处理变得直观。

五、高级技巧:组合与转换

异步迭代器也可以像同步迭代器一样进行组合。我们可以封装mapfilter等工具函数:

    function asyncMap(asyncIterable, transformFn) {
        return {
            [Symbol.asyncIterator]() {
                const iterator = asyncIterable[Symbol.asyncIterator]();
                return {
                    async next() {
                        const { value, done } = await iterator.next();
                        if (done) return { done, value };
                        return { value: await transformFn(value), done };
                    }
                };
            }
        };
    }

    // 使用:对用户流进行转换
    (async () => {
        const userIterator = createUserIterator(3);
        const nameIterator = asyncMap(userIterator, async (user) => {
            return `姓名: ${user.name}`;
        });
        for await (const name of nameIterator) {
            console.log(name);
        }
    })();
    

这种函数式组合让异步数据流的处理更加灵活,且保持了惰性求值的特性——只有消费时才会拉取数据。

六、注意事项与常见陷阱

  • 错误处理:异步迭代器的next()可能reject,需要在for await外部用try/catch包裹,或者在迭代器内部统一处理错误。
  • 提前退出:使用breakreturn退出循环时,迭代器的return()方法会被调用,应在此清理资源(如关闭WebSocket、文件句柄)。
  • 性能考量:异步迭代器每次next()都返回Promise,频繁创建Promise可能影响性能。但对于I/O密集型场景,这通常不是瓶颈。
  • 兼容性:现代浏览器和Node.js 10+均支持异步迭代器。如需兼容旧环境,可使用Babel转换。

七、总结

异步迭代器是JavaScript异步编程领域的一把利刃,它让异步数据流的消费变得如同遍历数组一样自然。通过分页API、WebSocket消息、文件逐行读取三个案例,我们看到了它在实际开发中的强大表现。结合for await...of和组合函数,可以构建出可读性强、易于维护的异步数据处理管道。

如果你还在用回调或事件监听处理流式数据,不妨试试异步迭代器——它可能会改变你的编码习惯。


本文为原创技术教程,所有代码均可直接运行(Node.js或现代浏览器环境)。欢迎在实际项目中实践。

JavaScript异步迭代器实战:用for await...of优雅处理数据流
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 javascript JavaScript异步迭代器实战:用for await…of优雅处理数据流 https://www.taomawang.com/web/javascript/1786.html

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务