JavaScript 异步迭代与流式数据处理:从 Fetch 到 Async Generator

2025年,JavaScript的异步迭代(Async Iteration)和流式数据处理已经成为前端处理大文件、实时数据、AI对话等场景的核心技术。本文通过四个实战案例,带你掌握 for await...ofAsyncGeneratorReadableStream 等关键API。


1. 为什么需要流式数据处理?

传统方式一次性加载完整数据,对于大文件或实时流会导致内存爆炸和长时间等待。流式处理允许我们边接收边处理,降低内存占用,提升用户体验。典型场景:

  • 大文件上传/下载(视频、日志)
  • AI对话流式输出(类似ChatGPT打字机效果)
  • 实时数据推送(股票行情、监控指标)
  • 数据库游标逐行返回

2. 核心基础:异步迭代器与 Async Generator

ES2018 引入了异步迭代器(Symbol.asyncIterator)和 for await...of 循环。异步生成器函数(async function*)可以轻松创建异步迭代器。

// 异步生成器:每隔1秒产生一个数字
async function* numberStream(maxNum) {
    for (let i = 0; i  setTimeout(resolve, 1000));
        yield i;
    }
}

// 消费异步生成器
(async () => {
    for await (const num of numberStream(5)) {
        console.log('收到:', num);
    }
    console.log('流结束');
})();

与普通生成器的区别:

  • 使用 async function* 定义
  • 内部可以使用 await
  • 返回的迭代器是异步的,使用 for await...of 遍历

3. 实战案例一:Fetch API 流式读取响应体

Fetch API 的 response.body 是一个 ReadableStream,我们可以通过 getReader() 逐块读取数据,实现流式处理。

async function streamFetch(url) {
    const response = await fetch(url);
    
    if (!response.ok) throw new Error('请求失败');
    
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    
    let result = '';
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        // 将二进制块解码为文本
        const chunk = decoder.decode(value, { stream: true });
        result += chunk;
        
        // 实时处理每个块(例如更新UI)
        console.log('收到块:', chunk);
    }
    
    return result;
}

// 使用示例:流式读取大文本文件
streamFetch('/large-file.txt').then(fullText => {
    console.log('完整内容长度:', fullText.length);
});

关键点:

  • response.body.getReader() 获取流读取器
  • reader.read() 返回 { done, value }done 表示流是否结束
  • TextDecoderstream: true 选项处理跨块字符

4. 实战案例二:模拟AI对话流式输出

使用 Async Generator 模拟ChatGPT风格的流式输出,每次产生一个单词。

// 模拟AI流式响应
async function* aiResponseStream(prompt) {
    const words = [
        '你好', '!', '我是', 'AI', '助手', '。',
        '今天', '天气', '真不错', ',', '适合', '学习', 'JavaScript', '。'
    ];
    
    for (const word of words) {
        await new Promise(resolve => setTimeout(resolve, 200)); // 模拟思考延迟
        yield word;
    }
}

// 流式输出到页面
async function displayAIResponse(prompt) {
    const outputElement = document.getElementById('output');
    outputElement.textContent = '';
    
    for await (const word of aiResponseStream(prompt)) {
        outputElement.textContent += word;
        // 模拟打字机效果
        await new Promise(resolve => requestAnimationFrame(resolve));
    }
}

// 调用
displayAIResponse('你好');

5. 实战案例三:使用 Async Generator 创建可取消的数据流

结合 AbortController 实现流式数据的取消操作,避免资源浪费。

function createCancellableStream(signal) {
    return new ReadableStream({
        async start(controller) {
            for (let i = 0; i  setTimeout(resolve, 200));
            }
            controller.close();
        }
    });
}

async function consumeStream() {
    const abortController = new AbortController();
    const signal = abortController.signal;
    
    const stream = createCancellableStream(signal);
    const reader = stream.getReader();
    
    // 5秒后取消流
    setTimeout(() => {
        abortController.abort();
        console.log('流已取消');
    }, 5000);
    
    try {
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            console.log('处理:', value);
        }
    } catch (err) {
        if (err.name === 'AbortError') {
            console.log('流被取消,清理资源');
        } else {
            throw err;
        }
    }
}

consumeStream();

6. 实战案例四:大文件分块上传与进度显示

使用流式读取文件,分块上传到服务器,并实时显示进度。

async function uploadFileInChunks(file, url, chunkSize = 1024 * 1024) {
    const totalChunks = Math.ceil(file.size / chunkSize);
    let uploadedChunks = 0;
    
    // 使用 File 的 stream() 方法获取 ReadableStream
    const fileStream = file.stream();
    const reader = fileStream.getReader();
    
    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        // 每个 value 是一个 Uint8Array 块
        const chunk = value;
        
        // 上传当前块
        const formData = new FormData();
        formData.append('chunk', new Blob([chunk]));
        formData.append('chunkIndex', uploadedChunks);
        formData.append('totalChunks', totalChunks);
        
        await fetch(url, {
            method: 'POST',
            body: formData
        });
        
        uploadedChunks++;
        const progress = Math.round((uploadedChunks / totalChunks) * 100);
        console.log(`上传进度: ${progress}%`);
        
        // 更新UI进度条
        updateProgressBar(progress);
    }
    
    console.log('上传完成');
}

// 使用示例
const fileInput = document.getElementById('fileInput');
fileInput.addEventListener('change', (e) => {
    const file = e.target.files[0];
    if (file) {
        uploadFileInChunks(file, '/api/upload');
    }
});

7. 进阶:组合多个 Async Generator

我们可以将多个异步流组合起来,实现更复杂的数据处理管道。

// 转换流:将数字翻倍
async function* doubleStream(source) {
    for await (const value of source) {
        yield value * 2;
    }
}

// 过滤流:只保留偶数
async function* evenStream(source) {
    for await (const value of source) {
        if (value % 2 === 0) {
            yield value;
        }
    }
}

// 使用组合
async function* numberGenerator() {
    for (let i = 1; i  setTimeout(resolve, 200));
        yield i;
    }
}

(async () => {
    const pipeline = evenStream(doubleStream(numberGenerator()));
    
    for await (const val of pipeline) {
        console.log('管道输出:', val);
    }
    // 输出: 4, 8, 12, 16, 20
})();

8. 性能对比:流式 vs 非流式

指标 传统全量加载 流式处理
内存占用 全部数据加载到内存 仅保留当前块
首字节时间 等待完整响应 几乎即时
用户体验 等待后展示 边加载边展示
适用场景 小数据量 大数据、实时数据

9. 最佳实践总结

  • 始终处理取消:使用 AbortController 或检查 signal.aborted
  • 控制生产速度:如果消费者比生产者慢,考虑背压(backpressure)机制
  • 使用合适的块大小:Fetch API 默认块大小约 64KB,可根据场景调整
  • 错误处理:在 for await...of 中使用 try/catch
  • 与 Web Workers 结合:复杂流处理可放到 Worker 中执行
// 带错误处理的流式读取
async function safeStreamRead(url) {
    try {
        const response = await fetch(url);
        const reader = response.body.getReader();
        
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            // 处理数据
        }
    } catch (err) {
        console.error('流读取失败:', err);
    }
}

10. 总结

通过本文的案例,你掌握了JavaScript流式数据处理的核心技术:

  • Async Generator 与 for await...of
  • Fetch API 流式读取响应体
  • 可取消的数据流
  • 大文件分块上传
  • 组合多个异步流

流式处理是构建高性能、低延迟Web应用的关键。结合Async Generator和ReadableStream,你可以轻松应对各种实时数据处理场景。


本文原创,基于ES2023标准。所有代码均在Chrome 120+、Node.js 22+中测试通过。

JavaScript 异步迭代与流式数据处理:从 Fetch 到 Async Generator
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

淘吗网 javascript JavaScript 异步迭代与流式数据处理:从 Fetch 到 Async Generator https://www.taomawang.com/web/javascript/1750.html

下一篇:

已经没有下一篇了!

常见问题

相关文章

猜你喜欢
发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务