发布日期:2024年1月 | 作者:JavaScript高级工程师
一、Generator函数:可暂停执行的JavaScript函数
Generator是ES6引入的特殊函数类型,它可以在执行过程中暂停和恢复,为处理复杂的数据流和异步操作提供了全新的编程范式。
Generator核心特性:
- 可暂停执行: 使用yield关键字暂停函数执行
- 状态保持: 暂停时保持函数内部状态
- 双向通信: 通过yield返回值,通过next()传递参数
- 迭代器接口: 自动实现迭代器协议
基础Generator示例:
// 简单的数字生成器
function* numberGenerator() {
let num = 0;
while (true) {
const reset = yield num++;
if (reset) {
num = 0;
}
}
}
// 使用示例
const gen = numberGenerator();
console.log(gen.next().value); // 0
console.log(gen.next().value); // 1
console.log(gen.next().value); // 2
console.log(gen.next(true).value); // 0 (重置)
console.log(gen.next().value); // 1
复杂数据处理Generator:
// 数据处理管道
function* dataProcessor(data) {
// 第一步:数据清洗
const cleaned = yield* cleanData(data);
// 第二步:数据转换
const transformed = yield* transformData(cleaned);
// 第三步:数据聚合
const aggregated = yield* aggregateData(transformed);
return aggregated;
}
function* cleanData(data) {
for (const item of data) {
if (item && typeof item === 'object') {
// 移除空值和无效字段
const cleaned = Object.fromEntries(
Object.entries(item).filter(([_, value]) =>
value != null && value !== ''
)
);
yield cleaned;
}
}
}
function* transformData(data) {
for (const item of data) {
// 数据格式标准化
const transformed = {
id: item.id || generateId(),
timestamp: new Date(item.timestamp || Date.now()),
...item
};
yield transformed;
}
}
function* aggregateData(data) {
const result = {};
for (const item of data) {
const key = item.category || 'default';
if (!result[key]) {
result[key] = [];
}
result[key].push(item);
yield { ...result }; // 返回当前聚合状态
}
return result;
}
二、迭代器协议与可迭代对象深度解析
自定义迭代器实现:
// 范围迭代器
class RangeIterator {
constructor(start, end, step = 1) {
this.start = start;
this.end = end;
this.step = step;
this.current = start;
}
[Symbol.iterator]() {
return this;
}
next() {
if (this.current <= this.end) {
const value = this.current;
this.current += this.step;
return { value, done: false };
}
return { value: undefined, done: true };
}
}
// 使用示例
const range = new RangeIterator(1, 5);
for (const num of range) {
console.log(num); // 1, 2, 3, 4, 5
}
Generator实现迭代器:
// 树形结构迭代器
function* treeTraversal(node, order = 'inorder') {
if (!node) return;
if (order === 'preorder') yield node.value;
if (node.left) yield* treeTraversal(node.left, order);
if (order === 'inorder') yield node.value;
if (node.right) yield* treeTraversal(node.right, order);
if (order === 'postorder') yield node.value;
}
// 二叉树节点
const tree = {
value: 'A',
left: {
value: 'B',
left: { value: 'D' },
right: { value: 'E' }
},
right: {
value: 'C',
left: { value: 'F' },
right: { value: 'G' }
}
};
// 中序遍历
console.log([...treeTraversal(tree, 'inorder')]);
// ['D', 'B', 'E', 'A', 'F', 'C', 'G']
三、异步Generator与for-await-of循环
异步数据流处理:
// 异步数据分页处理器
async function* paginatedDataFetcher(apiUrl, pageSize = 10) {
let page = 1;
let hasMore = true;
while (hasMore) {
try {
const response = await fetch(`${apiUrl}?page=${page}&size=${pageSize}`);
const data = await response.json();
if (data.items && data.items.length > 0) {
// 逐条产出数据项
for (const item of data.items) {
yield item;
}
// 检查是否还有更多数据
hasMore = data.hasMore || data.items.length === pageSize;
page++;
} else {
hasMore = false;
}
} catch (error) {
console.error(`获取第${page}页数据失败:`, error);
// 可以在这里实现重试逻辑
yield { error: `第${page}页数据获取失败` };
break;
}
}
}
// 使用for-await-of处理异步数据流
async function processAllData(apiUrl) {
const results = [];
const errors = [];
try {
for await (const item of paginatedDataFetcher(apiUrl)) {
if (item.error) {
errors.push(item);
} else {
results.push(item);
// 实时处理数据
await processItem(item);
}
}
} catch (error) {
console.error('数据处理过程中发生错误:', error);
}
return { results, errors };
}
async function processItem(item) {
// 模拟异步处理
await new Promise(resolve => setTimeout(resolve, 10));
console.log('处理项目:', item.id);
}
实时数据流监控:
// WebSocket数据流处理器
async function* webSocketDataStream(url, protocols = []) {
const ws = new WebSocket(url, protocols);
try {
// 等待连接建立
await new Promise((resolve, reject) => {
ws.onopen = resolve;
ws.onerror = reject;
});
console.log('WebSocket连接已建立');
while (ws.readyState === WebSocket.OPEN) {
// 等待下一条消息
const message = await new Promise((resolve, reject) => {
ws.onmessage = event => resolve(event.data);
ws.onerror = reject;
ws.onclose = () => resolve(null);
});
if (message === null) {
break; // 连接关闭
}
try {
const data = JSON.parse(message);
yield data;
} catch (parseError) {
yield { error: '消息解析失败', raw: message };
}
}
} finally {
if (ws.readyState === WebSocket.OPEN) {
ws.close();
}
}
}
// 使用示例
async function monitorRealTimeData() {
const stream = webSocketDataStream('wss://api.example.com/realtime');
for await (const data of stream) {
if (data.error) {
console.warn('数据错误:', data.error);
} else {
updateDashboard(data);
}
}
}
四、构建高效数据流处理系统
数据管道架构:
class DataStreamProcessor {
constructor() {
this.transformers = [];
this.filters = [];
}
// 添加数据转换器
pipe(transformer) {
if (typeof transformer === 'function') {
this.transformers.push(transformer);
}
return this; // 支持链式调用
}
// 添加数据过滤器
filter(filterFn) {
this.filters.push(filterFn);
return this;
}
// 处理数据流
async *process(stream) {
for await (let data of stream) {
try {
// 应用过滤器
if (this.filters.some(filter => !filter(data))) {
continue;
}
// 应用转换器
for (const transformer of this.transformers) {
data = await transformer(data);
}
yield data;
} catch (error) {
console.error('数据处理错误:', error);
yield { error: error.message, original: data };
}
}
}
}
// 使用示例
const processor = new DataStreamProcessor()
.filter(data => data.active) // 只处理活跃数据
.pipe(data => ({ ...data, processedAt: new Date() })) // 添加时间戳
.pipe(async data => {
// 异步数据增强
const enriched = await enrichData(data);
return enriched;
});
// 模拟数据流
async function* mockDataStream() {
const data = [
{ id: 1, active: true, value: 100 },
{ id: 2, active: false, value: 200 },
{ id: 3, active: true, value: 300 }
];
for (const item of data) {
await new Promise(resolve => setTimeout(resolve, 100));
yield item;
}
}
// 处理数据
async function handleData() {
for await (const result of processor.process(mockDataStream())) {
console.log('处理结果:', result);
}
}
背压控制实现:
// 带背压控制的数据流
async function* controlledStream(source, maxConcurrent = 3) {
const queue = [];
let activeCount = 0;
// 数据源监听
const sourceListener = async () => {
for await (const item of source) {
queue.push(item);
// 通知有新的数据可用
if (activeCount {
while (queue.length > 0 && activeCount 0) {
processQueue();
}
}
}
};
// 启动源监听
sourceListener();
// 启动初始处理
processQueue();
}
async function processItemWithBackpressure(item) {
// 模拟耗时处理
await new Promise(resolve =>
setTimeout(resolve, Math.random() * 1000)
);
return { ...item, processed: true };
}
五、真实场景:构建日志处理系统
日志收集与处理管道:
// 完整的日志处理系统
class LogProcessingSystem {
constructor() {
this.pipelines = new Map();
}
// 创建处理管道
createPipeline(name, config = {}) {
const pipeline = {
transformers: [],
filters: [],
batchSize: config.batchSize || 100,
flushInterval: config.flushInterval || 5000
};
this.pipelines.set(name, pipeline);
return this;
}
// 添加日志到管道
async pushLog(pipelineName, logEntry) {
const pipeline = this.pipelines.get(pipelineName);
if (!pipeline) {
throw new Error(`管道 ${pipelineName} 不存在`);
}
// 应用过滤器和转换器
let processedLog = logEntry;
for (const filter of pipeline.filters) {
if (!filter(processedLog)) return; // 被过滤掉
}
for (const transformer of pipeline.transformers) {
processedLog = await transformer(processedLog);
}
return processedLog;
}
// 生成日志流
async *generateLogStream(pipelineName, options = {}) {
const pipeline = this.pipelines.get(pipelineName);
if (!pipeline) return;
const batch = [];
let lastFlush = Date.now();
// 模拟日志源
async function* logSource() {
while (true) {
yield generateMockLog();
await new Promise(resolve =>
setTimeout(resolve, Math.random() * 1000)
);
}
}
for await (const log of logSource()) {
const processedLog = await this.pushLog(pipelineName, log);
if (processedLog) {
batch.push(processedLog);
// 批量处理条件
const shouldFlush =
batch.length >= pipeline.batchSize ||
Date.now() - lastFlush >= pipeline.flushInterval;
if (shouldFlush && batch.length > 0) {
yield [...batch];
batch.length = 0;
lastFlush = Date.now();
}
}
if (options.limit && batch.length >= options.limit) {
break;
}
}
// 处理剩余日志
if (batch.length > 0) {
yield batch;
}
}
}
// 使用示例
const logSystem = new LogProcessingSystem();
// 配置错误日志管道
logSystem.createPipeline('error-logs', { batchSize: 50 })
.filter(log => log.level === 'ERROR')
.transform(log => ({
...log,
timestamp: new Date(log.timestamp),
environment: process.env.NODE_ENV
}));
// 处理错误日志流
async function monitorErrorLogs() {
const stream = logSystem.generateLogStream('error-logs', { limit: 1000 });
for await (const batch of stream) {
console.log(`处理 ${batch.length} 条错误日志`);
// 发送到监控系统
await sendToMonitoringService(batch);
// 存储到数据库
await storeLogsInDatabase(batch);
}
}
// 模拟生成日志
function generateMockLog() {
const levels = ['INFO', 'WARN', 'ERROR'];
return {
level: levels[Math.floor(Math.random() * levels.length)],
message: `Log message ${Date.now()}`,
timestamp: new Date().toISOString(),
source: 'web-server'
};
}
六、性能优化与最佳实践
内存管理策略:
// 内存友好的大数据处理
async function* memoryEfficientProcessor(largeDataset, chunkSize = 1000) {
let position = 0;
while (position setImmediate(resolve));
}
}
错误处理与重试机制:
// 带重试的异步Generator
async function* resilientDataFetcher(fetchFunction, maxRetries = 3) {
let retryCount = 0;
while (retryCount maxRetries) {
throw new Error(`在 ${maxRetries} 次重试后仍然失败: ${error.message}`);
}
console.warn(`第 ${retryCount} 次重试...`);
// 指数退避
const delay = Math.pow(2, retryCount) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
总结
Generator函数和异步迭代器为JavaScript带来了革命性的数据处理能力:
- 惰性求值: 只在需要时计算数据,节省资源
- 内存效率: 处理大数据集时避免内存溢出
- 可组合性: 通过管道模式构建复杂的数据处理流程
- 异步友好: 原生支持异步操作和实时数据流
- 错误恢复: 提供细粒度的错误处理和重试机制
在实际项目中,Generator特别适用于:
- 大数据集的分块处理
- 实时数据流监控
- 复杂算法的逐步执行
- 异步操作的协调和管理
- 自定义迭代逻辑的实现
掌握这些高级特性将显著提升你在处理复杂数据场景下的编程能力,为构建高性能、可维护的JavaScript应用奠定坚实基础。