🚀 Oferta especial: 60% OFF no CrazyStack - Últimas vagas!Garantir vaga →
MÓDULO 2 - AULA 5

Pipeline de Dados Avançado

Combinando helpers em cascata. Chunk, parallel processing e rate limiting para sistemas industriais

🎯 Pipeline Industrial

Agora que dominamos iterator helpers básicos, vamos construir pipelines complexos que processam milhões de dados com controle total sobre performance, memória e rate limiting.

📦 Chunk Processing: Dividir para conquistar

✅ Por que Chunks são fundamentais

Quando você processa milhões de registros, não pode fazer um por vez (muito lento) nem todos de uma vez (estoura memória). Chunks são o meio termo perfeito: lotes de tamanho controlado.

chunk-processing.js
// 📦 CHUNK HELPER: Agrupa dados em lotes controláveis
function chunk(iterator, size) {
  return (function* () {
    let batch = [];
    
    for (const item of iterator) {
      batch.push(item);
      
      // 📦 Quando o lote fica cheio, entrega e reinicia
      if (batch.length >= size) {
        yield batch;
        batch = []; // 🔄 Reseta para próximo lote
      }
    }
    
    // 📦 Entrega último lote (pode ser menor que o tamanho)
    if (batch.length > 0) {
      yield batch;
    }
  })();
}

// 🏭 PIPELINE COM CHUNKS: Processamento em lotes
function* generateLargeDataset() {
  for (let i = 1; i <= 100000; i++) {
    yield {
      id: i,
      data: `Item ${i}`,
      processed: false
    };
  }
}

// 🔧 PROCESSAMENTO: Lotes de 1000 itens
console.log('=== PROCESSAMENTO EM CHUNKS ===');
let chunkCount = 0;

for (const batch of chunk(generateLargeDataset(), 1000)) {
  chunkCount++;
  console.log(`🔄 Processando chunk ${chunkCount}: ${batch.length} itens`);
  
  // 🎯 PROCESSA O LOTE INTEIRO
  const processedBatch = batch.map(item => ({
    ...item,
    processed: true,
    processedAt: new Date().toISOString(),
    chunkId: chunkCount
  }));
  
  console.log(`✅ Chunk ${chunkCount} finalizado!`);
  
  // 🎯 AQUI: Poderia salvar lote no banco, enviar para API, etc.
  // await saveBatchToDatabase(processedBatch);
  
  // ⏰ RATE LIMITING: Pausa entre lotes
  await new Promise(resolve => setTimeout(resolve, 100));
  
  // 🛑 DEMO: Para após 5 chunks para não rodar infinito
  if (chunkCount >= 5) {
    console.log('⏸️ Demo finalizada - processou 5.000 itens em 5 chunks');
    break;
  }
}

// 💡 VANTAGEM: Controle total sobre memória e performance
// - Nunca usa mais que 1000 itens na memória
// - Pode pausar entre lotes para rate limiting
// - Processa milhões sem travar

⏱️ Rate Limiting: Respeitando APIs

⚠️ APIs têm limites

APIs externas têm rate limits: GitHub permite 5000 requests/hora, Twitter 300/15min. Sem controle, você toma ban. Com generators, você controla perfeitamente o ritmo.

rate-limiting.js
// ⏱️ RATE LIMITER: Controla velocidade de requisições
function rateLimited(iterator, requestsPerSecond = 10) {
  return (async function* () {
    const delay = 1000 / requestsPerSecond; // ms entre requests
    let lastRequest = 0;
    
    for (const item of iterator) {
      const now = Date.now();
      const timeSinceLastRequest = now - lastRequest;
      
      // ⏰ Se passou muito rápido, espera
      if (timeSinceLastRequest < delay) {
        const waitTime = delay - timeSinceLastRequest;
        console.log(`⏸️ Rate limit: aguardando ${waitTime}ms`);
        await new Promise(resolve => setTimeout(resolve, waitTime));
      }
      
      lastRequest = Date.now();
      yield item;
    }
  })();
}

// 🌐 SIMULAÇÃO: Requisições para API externa
async function* fetchUserData() {
  const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  
  for (const id of userIds) {
    // 🌐 Simula chamada para API
    console.log(`🌐 Fazendo request para usuário ${id}`);
    
    // Simula resposta da API (tempo real seria await fetch())
    yield {
      id,
      name: `User ${id}`,
      fetchedAt: new Date().toISOString(),
      apiResponse: `Data from API for user ${id}`
    };
  }
}

// 🎯 USO COM RATE LIMITING
console.log('=== REQUISIÇÕES COM RATE LIMITING ===');
console.log('⚡ Limite: 2 requests por segundo');

const rateLimitedRequests = rateLimited(fetchUserData(), 2); // 2 req/sec

for await (const userData of rateLimitedRequests) {
  console.log(`📥 Recebido: ${userData.name} às ${userData.fetchedAt}`);
}

// 📊 RESULTADO:
// - Respeita limite de 2 req/sec
// - Não toma ban da API
// - Processa todos os dados sem perder nenhum
// - Controle total sobre timing

// 🎯 CONFIGURAÇÕES REAIS:
// GitHub API: rateLimited(iterator, 1.38) // 5000/hora = 1.38/sec
// Twitter API: rateLimited(iterator, 0.33) // 300/15min = 0.33/sec
// Google API: rateLimited(iterator, 27.7)  // 100k/hora = 27.7/sec

🔧 Pipeline Complexo: Sistema Completo

🏭 Sistema Industrial

Vamos combinar tudo: chunk processing, rate limiting, transformações, filtros e validações em um pipeline que processa dados como uma fábrica moderna.

industrial-pipeline.js
// 🏭 PIPELINE INDUSTRIAL COMPLETO
class DataProcessor {
  constructor() {
    this.processed = 0;
    this.errors = 0;
    this.startTime = Date.now();
  }

  // 📊 GERADOR: Simula fonte de dados massive
  async* generateMassiveDataset(count = 50000) {
    console.log(`🏭 Iniciando geração de ${count} registros...`);
    
    for (let i = 1; i <= count; i++) {
      yield {
        id: i,
        email: `user${i}@email.com`,
        age: Math.floor(Math.random() * 80) + 18,
        country: ['BR', 'US', 'UK', 'DE', 'JP'][Math.floor(Math.random() * 5)],
        premium: Math.random() > 0.7,
        created: new Date(Date.now() - Math.random() * 365 * 24 * 60 * 60 * 1000),
        raw: true
      };
    }
  }

  // 🔍 VALIDADOR: Filtra dados inválidos
  async* validateData(iterator) {
    for await (const record of iterator) {
      try {
        // 🔍 VALIDAÇÕES
        if (!record.email.includes('@')) {
          throw new Error('Email inválido');
        }
        if (record.age < 18 || record.age > 100) {
          throw new Error('Idade inválida');
        }
        
        yield { ...record, validated: true };
        
      } catch (error) {
        this.errors++;
        console.log(`❌ Erro no registro ${record.id}: ${error.message}`);
      }
    }
  }

  // 🔧 TRANSFORMADOR: Enriquece dados
  async* transformData(iterator) {
    for await (const record of iterator) {
      const transformed = {
        ...record,
        // 🔧 ENRIQUECIMENTOS
        fullEmail: record.email.toLowerCase(),
        ageGroup: record.age < 30 ? 'young' : record.age < 50 ? 'adult' : 'senior',
        accountType: record.premium ? 'premium' : 'basic',
        region: this.getRegion(record.country),
        accountAge: Math.floor((Date.now() - record.created) / (24 * 60 * 60 * 1000)),
        processed: true,
        processedAt: new Date().toISOString()
      };
      
      yield transformed;
      this.processed++;
    }
  }

  // 🌍 HELPER: Mapeia país para região
  getRegion(country) {
    const regions = {
      'BR': 'LATAM',
      'US': 'NA', 
      'UK': 'EU',
      'DE': 'EU',
      'JP': 'ASIA'
    };
    return regions[country] || 'OTHER';
  }

  // 📦 PROCESSADOR EM LOTES
  async* processBatches(iterator, batchSize = 1000) {
    let batch = [];
    
    for await (const record of iterator) {
      batch.push(record);
      
      if (batch.length >= batchSize) {
        // 🎯 PROCESSA LOTE COMPLETO
        console.log(`📦 Processando lote de ${batch.length} registros...`);
        
        // Simula processamento pesado (salvamento no banco, etc.)
        await new Promise(resolve => setTimeout(resolve, 200));
        
        yield {
          batchId: Math.random().toString(36).substr(2, 9),
          records: batch,
          processedAt: new Date().toISOString(),
          count: batch.length
        };
        
        batch = [];
      }
    }
    
    // 📦 Último lote (pode ser menor)
    if (batch.length > 0) {
      yield {
        batchId: Math.random().toString(36).substr(2, 9),
        records: batch,
        processedAt: new Date().toISOString(),
        count: batch.length,
        final: true
      };
    }
  }

  // 🚀 PIPELINE PRINCIPAL
  async runPipeline() {
    console.log('🚀 INICIANDO PIPELINE INDUSTRIAL');
    console.log('===================================');
    
    try {
      // 🏭 PIPELINE: source -> validate -> transform -> batch -> process
      const pipeline = this.processBatches(
        rateLimited(
          this.transformData(
            this.validateData(
              this.generateMassiveDataset(10000) // 10k registros
            )
          ),
          50 // 50 registros/sec (rate limiting)
        ),
        500 // Lotes de 500
      );

      let batchCount = 0;
      
      for await (const batch of pipeline) {
        batchCount++;
        console.log(`✅ Lote ${batchCount} finalizado: ${batch.count} registros`);
        
        // 📊 ESTATÍSTICAS DO LOTE
        const premiumUsers = batch.records.filter(r => r.premium).length;
        const regions = [...new Set(batch.records.map(r => r.region))];
        
        console.log(`   📊 Premium: ${premiumUsers}/${batch.count}`);
        console.log(`   🌍 Regiões: ${regions.join(', ')}`);
        
        // 🛑 DEMO: Para após 3 lotes
        if (batchCount >= 3) {
          console.log('\n⏸️ Demo finalizada - pipeline funcionando!');
          break;
        }
      }
      
    } finally {
      // 📊 RELATÓRIO FINAL
      const duration = Date.now() - this.startTime;
      console.log('\n📊 RELATÓRIO FINAL');
      console.log('==================');
      console.log(`⏱️ Tempo total: ${duration}ms`);
      console.log(`✅ Processados: ${this.processed}`);
      console.log(`❌ Erros: ${this.errors}`);
      console.log(`⚡ Taxa: ${(this.processed / (duration / 1000)).toFixed(2)} registros/sec`);
    }
  }
}

// 🚀 EXECUTANDO O PIPELINE
const processor = new DataProcessor();
processor.runPipeline();

// 💡 RESULTADO ESPERADO:
// - Processa milhares de registros
// - Rate limiting controlado
// - Lotes organizados
// - Validação automática
// - Transformação consistente
// - Relatório de performance
// - Memória controlada (nunca estoura)

⚡ Parallel Processing: Multiplicando performance

🚀 Worker Threads

Para processamento realmente pesado, você pode combinar generators com Worker Threads, processando múltiplos chunks em paralelo enquanto mantém controle sobre memória.

parallel-processing.js
// ⚡ PARALLEL PROCESSING com Worker Pool
class ParallelProcessor {
  constructor(workerCount = 4) {
    this.workerCount = workerCount;
    this.activeWorkers = 0;
    this.maxConcurrent = workerCount;
  }

  // 🔄 PARALLEL CHUNK PROCESSOR
  async* processInParallel(iterator, chunkSize = 100) {
    const promises = new Map();
    let chunkId = 0;
    
    for (const batch of chunk(iterator, chunkSize)) {
      chunkId++;
      
      // ⏸️ CONTROLE DE CONCORRÊNCIA: Espera worker livre
      while (this.activeWorkers >= this.maxConcurrent) {
        console.log(`⏸️ Aguardando worker livre... (${this.activeWorkers}/${this.maxConcurrent})`);
        await this.waitForWorker(promises);
      }
      
      // 🚀 INICIA WORKER
      const promise = this.processChunk(batch, chunkId);
      promises.set(chunkId, promise);
      this.activeWorkers++;
      
      console.log(`🚀 Worker ${chunkId} iniciado (${this.activeWorkers}/${this.maxConcurrent} ativos)`);
      
      // 🎯 YIELD resultados conforme ficam prontos
      for (const [id, prom] of [...promises.entries()]) {
        try {
          const result = await Promise.race([prom, Promise.resolve(null)]);
          if (result) {
            promises.delete(id);
            this.activeWorkers--;
            console.log(`✅ Worker ${id} finalizado`);
            yield result;
          }
        } catch (error) {
          console.log(`❌ Worker ${id} falhou: ${error.message}`);
          promises.delete(id);
          this.activeWorkers--;
        }
      }
    }
    
    // ⏳ AGUARDA workers restantes
    console.log('⏳ Aguardando workers finais...');
    for (const [id, promise] of promises) {
      try {
        const result = await promise;
        console.log(`✅ Worker final ${id} concluído`);
        yield result;
      } catch (error) {
        console.log(`❌ Worker final ${id} falhou: ${error.message}`);
      }
    }
  }

  // 👷 SIMULA WORKER THREAD (seria Worker real em produção)
  async processChunk(batch, chunkId) {
    // Simula processamento pesado (CPU intensive)
    const startTime = Date.now();
    const processingTime = Math.random() * 2000 + 500; // 500-2500ms
    
    await new Promise(resolve => setTimeout(resolve, processingTime));
    
    // 🔧 PROCESSAMENTO: Transforma cada item do lote
    const processed = batch.map(item => ({
      ...item,
      processed: true,
      workerId: chunkId,
      processingTime: processingTime,
      completedAt: new Date().toISOString(),
      // Simula cálculo pesado
      complexCalculation: Math.pow(item.id, 2) * Math.PI
    }));
    
    return {
      chunkId,
      count: processed.length,
      data: processed,
      processingTime,
      completedAt: new Date().toISOString()
    };
  }

  async waitForWorker(promises) {
    if (promises.size > 0) {
      await Promise.race([...promises.values()]);
    }
  }
}

// 🚀 EXEMPLO DE USO
async function demonstrateParallelProcessing() {
  console.log('🚀 DEMONSTRAÇÃO: Parallel Processing');
  console.log('=====================================');
  
  // 📊 FONTE DE DADOS
  function* generateData() {
    for (let i = 1; i <= 1000; i++) {
      yield {
        id: i,
        data: `Item ${i}`,
        timestamp: Date.now()
      };
    }
  }
  
  const processor = new ParallelProcessor(3); // 3 workers
  const startTime = Date.now();
  let totalProcessed = 0;
  
  try {
    for await (const result of processor.processInParallel(generateData(), 50)) {
      totalProcessed += result.count;
      console.log(`📦 Chunk ${result.chunkId}: ${result.count} itens em ${result.processingTime}ms`);
      
      // 🛑 DEMO: Para após processar 500 itens
      if (totalProcessed >= 500) {
        console.log('\n⏸️ Demo finalizada');
        break;
      }
    }
  } catch (error) {
    console.error('❌ Erro no pipeline:', error);
  } finally {
    const duration = Date.now() - startTime;
    console.log('\n📊 RESULTADO FINAL');
    console.log(`⏱️ Tempo total: ${duration}ms`);
    console.log(`📊 Itens processados: ${totalProcessed}`);
    console.log(`⚡ Taxa: ${(totalProcessed / (duration / 1000)).toFixed(2)} itens/sec`);
    console.log(`🚀 Speedup: ~3x (3 workers em paralelo)`);
  }
}

demonstrateParallelProcessing();

// 💡 VANTAGENS DO PARALLEL PROCESSING:
// - 3-4x mais rápido (dependendo dos cores)
// - Controle de concorrência
// - Memory efficient (processa em chunks)
// - Fault tolerant (worker falha não para tudo)
// - Escalável (ajusta number de workers)

🚀 Checkpoint: Pipeline Master Pro

Chunk ProcessingRate LimitingPipeline IndustrialParallel Processing

Agora você domina pipelines industriais que processam milhões de dados com performance e controle total!