Combinando helpers em cascata. Chunk, parallel processing e rate limiting para sistemas industriais
Agora que dominamos iterator helpers básicos, vamos construir pipelines complexos que processam milhões de dados com controle total sobre performance, memória e rate limiting.
Quando você processa milhões de registros, não pode fazer um por vez (muito lento) nem todos de uma vez (estoura memória). Chunks são o meio termo perfeito: lotes de tamanho controlado.
// 📦 CHUNK HELPER: Agrupa dados em lotes controláveis
function chunk(iterator, size) {
return (function* () {
let batch = [];
for (const item of iterator) {
batch.push(item);
// 📦 Quando o lote fica cheio, entrega e reinicia
if (batch.length >= size) {
yield batch;
batch = []; // 🔄 Reseta para próximo lote
}
}
// 📦 Entrega último lote (pode ser menor que o tamanho)
if (batch.length > 0) {
yield batch;
}
})();
}
// 🏭 PIPELINE COM CHUNKS: Processamento em lotes
function* generateLargeDataset() {
for (let i = 1; i <= 100000; i++) {
yield {
id: i,
data: `Item ${i}`,
processed: false
};
}
}
// 🔧 PROCESSAMENTO: Lotes de 1000 itens
console.log('=== PROCESSAMENTO EM CHUNKS ===');
let chunkCount = 0;
for (const batch of chunk(generateLargeDataset(), 1000)) {
chunkCount++;
console.log(`🔄 Processando chunk ${chunkCount}: ${batch.length} itens`);
// 🎯 PROCESSA O LOTE INTEIRO
const processedBatch = batch.map(item => ({
...item,
processed: true,
processedAt: new Date().toISOString(),
chunkId: chunkCount
}));
console.log(`✅ Chunk ${chunkCount} finalizado!`);
// 🎯 AQUI: Poderia salvar lote no banco, enviar para API, etc.
// await saveBatchToDatabase(processedBatch);
// ⏰ RATE LIMITING: Pausa entre lotes
await new Promise(resolve => setTimeout(resolve, 100));
// 🛑 DEMO: Para após 5 chunks para não rodar infinito
if (chunkCount >= 5) {
console.log('⏸️ Demo finalizada - processou 5.000 itens em 5 chunks');
break;
}
}
// 💡 VANTAGEM: Controle total sobre memória e performance
// - Nunca usa mais que 1000 itens na memória
// - Pode pausar entre lotes para rate limiting
// - Processa milhões sem travar
APIs externas têm rate limits: GitHub permite 5000 requests/hora, Twitter 300/15min. Sem controle, você toma ban. Com generators, você controla perfeitamente o ritmo.
// ⏱️ RATE LIMITER: Controla velocidade de requisições
function rateLimited(iterator, requestsPerSecond = 10) {
return (async function* () {
const delay = 1000 / requestsPerSecond; // ms entre requests
let lastRequest = 0;
for (const item of iterator) {
const now = Date.now();
const timeSinceLastRequest = now - lastRequest;
// ⏰ Se passou muito rápido, espera
if (timeSinceLastRequest < delay) {
const waitTime = delay - timeSinceLastRequest;
console.log(`⏸️ Rate limit: aguardando ${waitTime}ms`);
await new Promise(resolve => setTimeout(resolve, waitTime));
}
lastRequest = Date.now();
yield item;
}
})();
}
// 🌐 SIMULAÇÃO: Requisições para API externa
async function* fetchUserData() {
const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
for (const id of userIds) {
// 🌐 Simula chamada para API
console.log(`🌐 Fazendo request para usuário ${id}`);
// Simula resposta da API (tempo real seria await fetch())
yield {
id,
name: `User ${id}`,
fetchedAt: new Date().toISOString(),
apiResponse: `Data from API for user ${id}`
};
}
}
// 🎯 USO COM RATE LIMITING
console.log('=== REQUISIÇÕES COM RATE LIMITING ===');
console.log('⚡ Limite: 2 requests por segundo');
const rateLimitedRequests = rateLimited(fetchUserData(), 2); // 2 req/sec
for await (const userData of rateLimitedRequests) {
console.log(`📥 Recebido: ${userData.name} às ${userData.fetchedAt}`);
}
// 📊 RESULTADO:
// - Respeita limite de 2 req/sec
// - Não toma ban da API
// - Processa todos os dados sem perder nenhum
// - Controle total sobre timing
// 🎯 CONFIGURAÇÕES REAIS:
// GitHub API: rateLimited(iterator, 1.38) // 5000/hora = 1.38/sec
// Twitter API: rateLimited(iterator, 0.33) // 300/15min = 0.33/sec
// Google API: rateLimited(iterator, 27.7) // 100k/hora = 27.7/sec
Vamos combinar tudo: chunk processing, rate limiting, transformações, filtros e validações em um pipeline que processa dados como uma fábrica moderna.
// 🏭 PIPELINE INDUSTRIAL COMPLETO
class DataProcessor {
constructor() {
this.processed = 0;
this.errors = 0;
this.startTime = Date.now();
}
// 📊 GERADOR: Simula fonte de dados massive
async* generateMassiveDataset(count = 50000) {
console.log(`🏭 Iniciando geração de ${count} registros...`);
for (let i = 1; i <= count; i++) {
yield {
id: i,
email: `user${i}@email.com`,
age: Math.floor(Math.random() * 80) + 18,
country: ['BR', 'US', 'UK', 'DE', 'JP'][Math.floor(Math.random() * 5)],
premium: Math.random() > 0.7,
created: new Date(Date.now() - Math.random() * 365 * 24 * 60 * 60 * 1000),
raw: true
};
}
}
// 🔍 VALIDADOR: Filtra dados inválidos
async* validateData(iterator) {
for await (const record of iterator) {
try {
// 🔍 VALIDAÇÕES
if (!record.email.includes('@')) {
throw new Error('Email inválido');
}
if (record.age < 18 || record.age > 100) {
throw new Error('Idade inválida');
}
yield { ...record, validated: true };
} catch (error) {
this.errors++;
console.log(`❌ Erro no registro ${record.id}: ${error.message}`);
}
}
}
// 🔧 TRANSFORMADOR: Enriquece dados
async* transformData(iterator) {
for await (const record of iterator) {
const transformed = {
...record,
// 🔧 ENRIQUECIMENTOS
fullEmail: record.email.toLowerCase(),
ageGroup: record.age < 30 ? 'young' : record.age < 50 ? 'adult' : 'senior',
accountType: record.premium ? 'premium' : 'basic',
region: this.getRegion(record.country),
accountAge: Math.floor((Date.now() - record.created) / (24 * 60 * 60 * 1000)),
processed: true,
processedAt: new Date().toISOString()
};
yield transformed;
this.processed++;
}
}
// 🌍 HELPER: Mapeia país para região
getRegion(country) {
const regions = {
'BR': 'LATAM',
'US': 'NA',
'UK': 'EU',
'DE': 'EU',
'JP': 'ASIA'
};
return regions[country] || 'OTHER';
}
// 📦 PROCESSADOR EM LOTES
async* processBatches(iterator, batchSize = 1000) {
let batch = [];
for await (const record of iterator) {
batch.push(record);
if (batch.length >= batchSize) {
// 🎯 PROCESSA LOTE COMPLETO
console.log(`📦 Processando lote de ${batch.length} registros...`);
// Simula processamento pesado (salvamento no banco, etc.)
await new Promise(resolve => setTimeout(resolve, 200));
yield {
batchId: Math.random().toString(36).substr(2, 9),
records: batch,
processedAt: new Date().toISOString(),
count: batch.length
};
batch = [];
}
}
// 📦 Último lote (pode ser menor)
if (batch.length > 0) {
yield {
batchId: Math.random().toString(36).substr(2, 9),
records: batch,
processedAt: new Date().toISOString(),
count: batch.length,
final: true
};
}
}
// 🚀 PIPELINE PRINCIPAL
async runPipeline() {
console.log('🚀 INICIANDO PIPELINE INDUSTRIAL');
console.log('===================================');
try {
// 🏭 PIPELINE: source -> validate -> transform -> batch -> process
const pipeline = this.processBatches(
rateLimited(
this.transformData(
this.validateData(
this.generateMassiveDataset(10000) // 10k registros
)
),
50 // 50 registros/sec (rate limiting)
),
500 // Lotes de 500
);
let batchCount = 0;
for await (const batch of pipeline) {
batchCount++;
console.log(`✅ Lote ${batchCount} finalizado: ${batch.count} registros`);
// 📊 ESTATÍSTICAS DO LOTE
const premiumUsers = batch.records.filter(r => r.premium).length;
const regions = [...new Set(batch.records.map(r => r.region))];
console.log(` 📊 Premium: ${premiumUsers}/${batch.count}`);
console.log(` 🌍 Regiões: ${regions.join(', ')}`);
// 🛑 DEMO: Para após 3 lotes
if (batchCount >= 3) {
console.log('\n⏸️ Demo finalizada - pipeline funcionando!');
break;
}
}
} finally {
// 📊 RELATÓRIO FINAL
const duration = Date.now() - this.startTime;
console.log('\n📊 RELATÓRIO FINAL');
console.log('==================');
console.log(`⏱️ Tempo total: ${duration}ms`);
console.log(`✅ Processados: ${this.processed}`);
console.log(`❌ Erros: ${this.errors}`);
console.log(`⚡ Taxa: ${(this.processed / (duration / 1000)).toFixed(2)} registros/sec`);
}
}
}
// 🚀 EXECUTANDO O PIPELINE
const processor = new DataProcessor();
processor.runPipeline();
// 💡 RESULTADO ESPERADO:
// - Processa milhares de registros
// - Rate limiting controlado
// - Lotes organizados
// - Validação automática
// - Transformação consistente
// - Relatório de performance
// - Memória controlada (nunca estoura)
Para processamento realmente pesado, você pode combinar generators com Worker Threads, processando múltiplos chunks em paralelo enquanto mantém controle sobre memória.
// ⚡ PARALLEL PROCESSING com Worker Pool
class ParallelProcessor {
constructor(workerCount = 4) {
this.workerCount = workerCount;
this.activeWorkers = 0;
this.maxConcurrent = workerCount;
}
// 🔄 PARALLEL CHUNK PROCESSOR
async* processInParallel(iterator, chunkSize = 100) {
const promises = new Map();
let chunkId = 0;
for (const batch of chunk(iterator, chunkSize)) {
chunkId++;
// ⏸️ CONTROLE DE CONCORRÊNCIA: Espera worker livre
while (this.activeWorkers >= this.maxConcurrent) {
console.log(`⏸️ Aguardando worker livre... (${this.activeWorkers}/${this.maxConcurrent})`);
await this.waitForWorker(promises);
}
// 🚀 INICIA WORKER
const promise = this.processChunk(batch, chunkId);
promises.set(chunkId, promise);
this.activeWorkers++;
console.log(`🚀 Worker ${chunkId} iniciado (${this.activeWorkers}/${this.maxConcurrent} ativos)`);
// 🎯 YIELD resultados conforme ficam prontos
for (const [id, prom] of [...promises.entries()]) {
try {
const result = await Promise.race([prom, Promise.resolve(null)]);
if (result) {
promises.delete(id);
this.activeWorkers--;
console.log(`✅ Worker ${id} finalizado`);
yield result;
}
} catch (error) {
console.log(`❌ Worker ${id} falhou: ${error.message}`);
promises.delete(id);
this.activeWorkers--;
}
}
}
// ⏳ AGUARDA workers restantes
console.log('⏳ Aguardando workers finais...');
for (const [id, promise] of promises) {
try {
const result = await promise;
console.log(`✅ Worker final ${id} concluído`);
yield result;
} catch (error) {
console.log(`❌ Worker final ${id} falhou: ${error.message}`);
}
}
}
// 👷 SIMULA WORKER THREAD (seria Worker real em produção)
async processChunk(batch, chunkId) {
// Simula processamento pesado (CPU intensive)
const startTime = Date.now();
const processingTime = Math.random() * 2000 + 500; // 500-2500ms
await new Promise(resolve => setTimeout(resolve, processingTime));
// 🔧 PROCESSAMENTO: Transforma cada item do lote
const processed = batch.map(item => ({
...item,
processed: true,
workerId: chunkId,
processingTime: processingTime,
completedAt: new Date().toISOString(),
// Simula cálculo pesado
complexCalculation: Math.pow(item.id, 2) * Math.PI
}));
return {
chunkId,
count: processed.length,
data: processed,
processingTime,
completedAt: new Date().toISOString()
};
}
async waitForWorker(promises) {
if (promises.size > 0) {
await Promise.race([...promises.values()]);
}
}
}
// 🚀 EXEMPLO DE USO
async function demonstrateParallelProcessing() {
console.log('🚀 DEMONSTRAÇÃO: Parallel Processing');
console.log('=====================================');
// 📊 FONTE DE DADOS
function* generateData() {
for (let i = 1; i <= 1000; i++) {
yield {
id: i,
data: `Item ${i}`,
timestamp: Date.now()
};
}
}
const processor = new ParallelProcessor(3); // 3 workers
const startTime = Date.now();
let totalProcessed = 0;
try {
for await (const result of processor.processInParallel(generateData(), 50)) {
totalProcessed += result.count;
console.log(`📦 Chunk ${result.chunkId}: ${result.count} itens em ${result.processingTime}ms`);
// 🛑 DEMO: Para após processar 500 itens
if (totalProcessed >= 500) {
console.log('\n⏸️ Demo finalizada');
break;
}
}
} catch (error) {
console.error('❌ Erro no pipeline:', error);
} finally {
const duration = Date.now() - startTime;
console.log('\n📊 RESULTADO FINAL');
console.log(`⏱️ Tempo total: ${duration}ms`);
console.log(`📊 Itens processados: ${totalProcessed}`);
console.log(`⚡ Taxa: ${(totalProcessed / (duration / 1000)).toFixed(2)} itens/sec`);
console.log(`🚀 Speedup: ~3x (3 workers em paralelo)`);
}
}
demonstrateParallelProcessing();
// 💡 VANTAGENS DO PARALLEL PROCESSING:
// - 3-4x mais rápido (dependendo dos cores)
// - Controle de concorrência
// - Memory efficient (processa em chunks)
// - Fault tolerant (worker falha não para tudo)
// - Escalável (ajusta number de workers)
Agora você domina pipelines industriais que processam milhões de dados com performance e controle total!