🚀 Oferta especial: 60% OFF no CrazyStack - Últimas vagas!Garantir vaga →
Node.js

A Função Mais Poderosa do JavaScript que Muda Tudo

Descubra Generator Functions e Iterator Helpers - as propostas mais importantes para processamento sob demanda que transformam como você programa em JavaScript e Node.js.

CrazyStack
15 min de leitura
Node.jsJavaScriptStreamsIterator Helpers

Por que isso é importante

Node.js e JavaScript foram projetados para executar aplicações de alto poder de processamento usando quase nada de recursos. O conceito-chave é processamento sob demanda: ao invés de acumular dados em memória, você os processa conforme necessário. Pense num cenário onde você deve mover 2GB de dados de uma tabela para outra - você não vai carregar os 2GB em memória, mas ir de 100 em 100 itens, sempre na onda do "dividir para conquistar".

O que são Generator Functions?

As Generator Functions são funções especiais marcadas com um asterisco (*) que podem retornar múltiplos valores ao longo do tempo. Uma Generator Function pode retornar múltiplos resultados - por isso o nome "função geradora". Para gerar esses dados você usa a palavra-chave yield, que notifica quem está consumindo essa função geradora que um valor foi retornado.

Pela especificação, quem estiver consumindo essa função vai ficar chamando o método .next() dela, para saber se mais algum valor foi retornado ou deve parar a execução. Ou simplesmente você pode usar o iterator com o for...of do JavaScript e imprimir resultado por resultado.

generator-basico.js
// 📚 CONCEITO: Generator Function básica
// Pense numa máquina de venda que entrega um produto por vez
function* simpleGenerator() {
  yield 1;    // 🎯 "Entrega" o primeiro valor e PAUSA
  yield 2;    // 🎯 "Entrega" o segundo valor e PAUSA  
  yield 3;    // 🎯 "Entrega" o terceiro valor e PAUSA
  return 'fim'; // 🏁 Sinaliza que terminou
}

// 🔧 PASSO 1: Criando o iterador (como inserir moeda na máquina)
const iterator = simpleGenerator();

// 🔧 PASSO 2: Consumindo valores um por vez (como apertar o botão)
console.log(iterator.next()); // { value: 1, done: false } ← Primeiro produto
console.log(iterator.next()); // { value: 2, done: false } ← Segundo produto
console.log(iterator.next()); // { value: 3, done: false } ← Terceiro produto
console.log(iterator.next()); // { value: 'fim', done: true } ← Máquina vazia

// 💡 ANALOGIA: Como uma esteira de produção que entrega um item por vez
// ao invés de entregar todos de uma vez numa caixa gigante

A pergunta fundamental: Por que não arrays?

"Tá, mas para que você retornaria mais de um valor de uma função em JavaScript? Por que não só retornar um array com todos os itens dentro?" Essa é a maior dúvida entre desenvolvedores, então vamos entender o problema real.

A diferença está no conceito de processamento sob demanda. Quando você retorna um array, está carregando TODOS os dados na memória de uma só vez. Com generators, você processa item por item, criando uma verdadeira "linha de produção" onde cada etapa é executada conforme necessário.

A analogia da linha de produção

Pense em como funciona o processamento de um arquivo CSV: você lê a primeira linha do arquivo, transforma em JSON e usa o yield para falar: "Olha, essa linha está pronta! Quem estiver escutando, usa aí que eu vou para o próximo".

É como se fosse uma linha de produção: "Eu consegui um resultado aqui, eu mando para frente, já pego o próximo, mando para frente e já era". Esse "manda para frente" é o que fazemos com o uso da palavra-chave yield.

Como funciona o método .next()

Ao chamar uma função geradora, ela retorna um iterador com o método .next(), que serve para consumir os valores um por um. Cada chamada retorna um objeto com duas propriedades: value e done. Essa abordagem dá controle total sobre quando avançar para o próximo valor.

next-method-control.js
// 📚 CONCEITO: Controle manual com .next()
// Pense numa linha de montagem onde VOCÊ decide quando avançar
function* dataProcessor() {
  yield 'Processando arquivo 1...';  // 🔄 Primeira etapa
  yield 'Processando arquivo 2...';  // 🔄 Segunda etapa  
  yield 'Processando arquivo 3...';  // 🔄 Terceira etapa
  return 'Todos os arquivos processados!'; // ✅ Resultado final
}

// 🎮 CONTROLE TOTAL: Você decide quando "apertar play"
const processor = dataProcessor();

// 🔄 EXECUÇÃO PASSO-A-PASSO
let result = processor.next(); // Pega o primeiro status
while (!result.done) {         // Enquanto não terminou...
  console.log('Status:', result.value); // Mostra progresso atual
  
  // ⏰ PAUSA CONTROLADA: Simula processamento assíncrono
  await new Promise(resolve => setTimeout(resolve, 1000));
  
  result = processor.next(); // 👆 SÓ avança quando você quiser!
}

console.log('Resultado final:', result.value);

// 💡 VANTAGEM: Ideal para operações que precisam de pausa entre etapas
// (rate limiting, processamento batch, progresso visual)

Iterando com for-of

Além de usar .next() manualmente, o JavaScript permite consumir iteradores usando a sintaxe for...of, que torna o código mais limpo e legível. Essa forma de iteração cuida automaticamente do done.

for-of-iteration.js
// 📚 CONCEITO: for...of - A forma "automática" de consumir
// Pense numa esteira rolante que você só observa passando
function* fibonacci(limit) {
  let a = 0, b = 1; // 🌱 Sementes da sequência
  
  while (a <= limit) {
    yield a;          // 📤 Entrega o número atual
    [a, b] = [b, a + b]; // 🔄 Calcula próximos (destructuring)
  }
}

// 🎯 FORMA AUTOMÁTICA: for...of cuida do .next() pra você
console.log('Sequência de Fibonacci até 100:');
for (const num of fibonacci(100)) {
  console.log(num); // 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89
  // 💡 JavaScript automaticamente chama .next() até done: true
}

// 🔧 CONVERSÃO RÁPIDA: Spread operator pega todos de uma vez
const fibArray = [...fibonacci(20)];
console.log('Array:', fibArray); // [0, 1, 1, 2, 3, 5, 8, 13]

// ⚡ VANTAGEM: Código mais limpo, sem gerenciar .next() manualmente
// ⚠️ CUIDADO: for...of não dá controle sobre pausas entre iterações

Por que não usar arrays?

Arrays carregam todos os dados de uma vez, o que pode ser inviável em cenários com múltiplos gigabytes de informação. Com generators, você processa item por item, evitando estouros de memória e melhorando a performance do sistema.

memory-comparison.js
// ❌ PROBLEMA: Array gigante na memória 
// Imagine encher uma piscina inteira antes de usar uma gota
function getAllUsers() {
  const users = []; // 🏊 "Piscina" que vai crescer absurdamente
  
  // Simula buscar 1 milhão de usuários DE UMA VEZ
  for (let i = 0; i < 1000000; i++) {
    users.push({ 
      id: i, 
      name: 'User ' + i, 
      email: 'user' + i + '@email.com' 
    });
  }
  return users; // 🔥 BOOM! 200MB+ na memória
}

// ✅ SOLUÇÃO: Generator = torneira que pinga sob demanda
// Imagine uma torneira: só sai água quando você abre
function* getAllUsersStream() {
  // Processa usuários em lotes pequenos de 100
  for (let batch = 0; batch < 10000; batch++) {
    for (let i = 0; i < 100; i++) {
      const userId = batch * 100 + i;
      yield { // 💧 Uma "gota" por vez
        id: userId, 
        name: 'User ' + userId, 
        email: 'user' + userId + '@email.com' 
      };
    }
    // 🎯 AQUI: Poderia pausar, buscar do banco, fazer rate limiting
  }
}

// 🎯 USO INTELIGENTE: Só processa o que precisa
for (const user of getAllUsersStream()) {
  if (user.id > 5000) break; // 🛑 Para quando não precisar mais
  console.log(user.name);
  // 💡 Só usou 5000 usuários, economizou 195MB de RAM!
}

// 📊 COMPARAÇÃO:
// Array: 1M users = ~200MB RAM + tempo de criação
// Generator: 5K users = ~1MB RAM + parada instantânea

Processamento Sob Demanda na Prática

Imagine mover 2GB de dados entre bancos. Ao invés de usar um array com todos os dados, você pode usar generators para pegar 100 itens, processar e mover. Isso reduz o uso de memória e torna o sistema mais eficiente.

data-migration.js
// Simulação de migração de dados entre bancos
function* migrateData(source, destination, batchSize = 100) {
  let offset = 0;
  let hasMore = true;
  
  while (hasMore) {
    // Busca lote do banco origem
    const batch = source.getBatch(offset, batchSize);
    
    if (batch.length === 0) {
      hasMore = false;
      break;
    }
    
    // Processa cada item do lote
    for (const item of batch) {
      // Transforma dados se necessário
      const transformedItem = {
        ...item,
        migrated_at: new Date(),
        version: '2.0'
      };
      
      yield transformedItem;
    }
    
    offset += batchSize;
    console.log('Processados ' + offset + ' registros...');
  }
}

// Uso: move dados em lotes controlados
async function executeMigration() {
  const sourceDB = new Database('source');
  const destDB = new Database('destination');
  
  for (const item of migrateData(sourceDB, destDB, 50)) {
    await destDB.insert(item);
    
    // Controle de taxa para não sobrecarregar
    if (item.id % 1000 === 0) {
      await new Promise(resolve => setTimeout(resolve, 100));
    }
  }
  
  console.log('Migração concluída!');
}

Generators com CSV

Uma aplicação prática: leitura de um arquivo CSV linha a linha. Com yield, é possível transformar cada linha em JSON e emitir esse dado para o consumidor. Isso simula uma linha de produção, onde cada item é tratado de forma isolada.

csv-processor.js
import fs from 'fs';
import readline from 'readline';

// 📚 CONCEITO: Generator que lê arquivo CSV linha por linha
// Pense numa biblioteca onde você lê um livro página por página,
// ao invés de carregar o livro inteiro na memória
function* csvToJsonGenerator(filePath) {
  // 🔧 PASSO 1: Abre o "livro" (arquivo) para leitura
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity // 🛡️ Trata quebras de linha corretamente
  });
  
  let headers = null;   // 📋 Vai guardar os nomes das colunas
  let lineNumber = 0;   // 📊 Contador de linhas
  
  // 🔄 PROCESSA linha por linha (não carrega tudo!)
  for await (const line of rl) {
    lineNumber++;
    
    if (lineNumber === 1) {
      // 📋 PRIMEIRA LINHA: Extrai os cabeçalhos
      headers = line.split(',').map(h => h.trim());
      continue; // Pula para próxima linha
    }
    
    // 🔄 CONVERSÃO: CSV → JSON objeto por objeto
    const values = line.split(',').map(v => v.trim());
    const jsonObject = {};
    
    // 🔗 MAPEIA: cabeçalho → valor
    headers.forEach((header, index) => {
      jsonObject[header] = values[index] || '';
    });
    
    yield jsonObject; // 📤 Entrega um objeto JSON por vez
  }
}

// 🎯 USO PRÁTICO: Processamento inteligente
async function processLargeCSV() {
  let processedCount = 0;
  
  // 📖 Lê o CSV "página por página"
  for (const record of csvToJsonGenerator('./users.csv')) {
    
    // 🎯 FILTRO: Só processa usuários ativos
    if (record.status === 'active') {
      await sendEmail(record.email, 'Newsletter');
      processedCount++;
    }
    
    // 🛑 LIMITE: Para depois de 1000 emails (anti-spam)
    if (processedCount >= 1000) break;
  }
  
  console.log('Enviados ' + processedCount + ' emails');
}

// 💡 VANTAGENS desta abordagem:
// ✅ Memória constante (não importa se CSV tem 1MB ou 10GB)
// ✅ Processamento sob demanda (para quando quiser)
// ✅ Controle de rate limiting (evita spam)
// ✅ Não trava a aplicação

Iterator Helpers: A Evolução

A nova proposta Iterator Helpers traz os métodos comuns como map, filter, reduce e take para iteradores. Isso aproxima a experiência de trabalhar com arrays, mas aplicável a dados gerados sob demanda.

iterator-helpers.js
// 📚 CONCEITO: Iterator Helpers = .map(), .filter() para generators
// Pense numa linha de produção com estações de trabalho
function* infiniteNumbers() {
  let i = 0;
  while (true) {
    yield i++; // 🏭 Fábrica de números infinitos
  }
}

// 🔮 FUTURO: Iterator Helpers nativos (proposal em andamento)
// Será possível fazer isso quando estiver pronto:

// const result = infiniteNumbers()
//   .map(x => x * 2)           // 🔧 Estação 1: Multiplica por 2
//   .filter(x => x % 3 === 0)  // 🔍 Estação 2: Filtra múltiplos de 3
//   .take(5);                  // ✂️ Estação 3: Pega apenas 5

// 🛠️ POR ENQUANTO: Implementação manual dos helpers
function map(iterator, fn) {
  return (function* () {    // 🔄 Retorna novo generator
    for (const value of iterator) {
      yield fn(value);      // 🔧 Aplica transformação
    }
  })();
}

function filter(iterator, predicate) {
  return (function* () {    // 🔄 Retorna novo generator
    for (const value of iterator) {
      if (predicate(value)) { // 🔍 Testa condição
        yield value;        // ✅ Passa apenas se válido
      }
    }
  })();
}

function take(iterator, count) {
  return (function* () {    // 🔄 Retorna novo generator
    let taken = 0;
    for (const value of iterator) {
      if (taken >= count) break; // ✂️ Para no limite
      yield value;
      taken++;
    }
  })();
}

// 🏭 LINHA DE PRODUÇÃO: Combina as estações
const pipeline = take(         // 3️⃣ Pega só 5 resultados
  filter(                      // 2️⃣ Filtra múltiplos de 3
    map(infiniteNumbers(), x => x * 2), // 1️⃣ Multiplica por 2
    x => x % 3 === 0
  ),
  5
);

console.log([...pipeline]); // [0, 6, 12, 18, 24]

// 💡 VANTAGEM: Processamento lazy - só calcula quando precisa!
// Mesmo sendo "infinito", só processa 5 números

A evolução do JavaScript moderno

Node.js está se modernizando há muito tempo. Hoje em dia você não necessariamente precisa usar aquelas classes de Stream diretamente. Você pode passar um async generator para ele e ele já vai saber trabalhar. O código fica muito mais simples e elegante.

A primeira proposta que está disponível na maioria dos lugares que rodam JavaScript são os Iterator Helpers. Se fosse para resumir: é pensar que funções como map, filter e reduce, que temos em arrays, agora podem se aplicar para funções de processamento sob demanda.

Node.js já implementou as funções de filter, map e outras que vão estar disponíveis no JavaScript no futuro. Você pode trabalhar com map, filter, take e vários outros métodos, como se estivesse trabalhando com dados em memória, mas agora processando dados sob demanda.

Async Iterator Helpers

Versão assincrona dos helpers permite processar dados que dependem de awaits, como streams de dados ou requisições assíncronas. Você pode filtrar, transformar e consumir dados que chegam em momentos diferentes.

async-iterator-helpers.js
// Async Generator que simula requisições
function* fetchUsersAsync() {
  const userIds = [1, 2, 3, 4, 5];
  
  for (const id of userIds) {
    // Simula delay de requisição
    yield new Promise(async (resolve) => {
      await new Promise(r => setTimeout(r, 500));
      resolve({
        id,
        name: 'User ' + id,
        email: 'user' + id + '@example.com',
        active: id % 2 === 0
      });
    });
  }
}

// Helpers assíncronos customizados
function mapAsync(asyncIterator, fn) {
  return (async function* () {
    for await (const value of asyncIterator) {
      yield fn(value);
    }
  })();
}

function filterAsync(asyncIterator, predicate) {
  return (async function* () {
    for await (const value of asyncIterator) {
      if (predicate(value)) {
        yield value;
      }
    }
  })();
}

// Uso prático: processa usuários ativos com transformação
async function processActiveUsers() {
  const pipeline = filterAsync(
    mapAsync(fetchUsersAsync(), user => ({ ...user, processed: true })),
    user => user.active
  );
  
  for await (const user of pipeline) {
    console.log('Processando usuário ativo: ' + user.name);
    // Aqui você faria operações como envio de emails, etc.
  }
}

Exemplo Prático com Iterator Helpers

Suponha um iterador que gera nomes. Podemos aplicar map para transformar os dados, filter para selecionar e take para limitar resultados. Tudo isso sob demanda, sem criar listas intermediárias.

practical-iterator-example.js
// Generator que simula base de dados de usuários
function* generateUsers() {
  const names = ['Ana', 'Bruno', 'Carlos', 'Diana', 'Eduardo', 'Fernanda'];
  const domains = ['gmail.com', 'yahoo.com', 'hotmail.com'];
  
  for (let i = 0; i < 1000; i++) {
    const name = names[i % names.length];
    const domain = domains[i % domains.length];
    
    yield {
      id: i + 1,
      name: name + (i + 1),
      email: name.toLowerCase() + (i + 1) + '@' + domain,
      age: 18 + (i % 60),
      premium: Math.random() > 0.7
    };
  }
}

// Pipeline de transformação com helpers
const processedUsers = take(
  filter(
    map(
      generateUsers(),
      user => ({
        ...user,
        fullEmail: user.email.toUpperCase(),
        category: user.age < 30 ? 'jovem' : user.age < 50 ? 'adulto' : 'senior'
      })
    ),
    user => user.premium && user.age >= 25
  ),
  10
);

// Consumo eficiente - apenas 10 usuários processados
console.log('Usuários premium processados:');
for (const user of processedUsers) {
  console.log(user.name + ' (' + user.category + ') - ' + user.fullEmail);
}

// Exemplo de pipeline para relatório
function generateReport() {
  const stats = { jovem: 0, adulto: 0, senior: 0 };
  
  const reportPipeline = take(
    filter(
      map(generateUsers(), user => ({ ...user, category: user.age < 30 ? 'jovem' : user.age < 50 ? 'adulto' : 'senior' })),
      user => user.premium
    ),
    100 // Amostra de 100 usuários premium
  );
  
  for (const user of reportPipeline) {
    stats[user.category]++;
  }
  
  return stats;
}

console.log('Relatório de usuários premium:', generateReport());

Lendo CSV de Forma Incorreta

Uma prática ruim é ler um arquivo grande e empilhar os dados em listas com on('data'). Embora funcione, isso coloca tudo em memória e pode quebrar com arquivos muito grandes, além de prejudicar o desempenho.

csv-anti-patterns.js
import fs from 'fs';

// ❌ ANTI-PADRÃO: Acumula tudo em memória
function readCSVBadWay(filePath) {
  const data = [];
  const stream = fs.createReadStream(filePath);
  
  stream.on('data', (chunk) => {
    // 🔥 PROBLEMA: Acumula chunks em array
    data.push(chunk.toString());
  });
  
  stream.on('end', () => {
    // 🔥 PROBLEMA: Só processa quando tudo está em memória
    const fullContent = data.join('');
    const lines = fullContent.split('\n');
    
    // Se o arquivo for 2GB, vai consumir 2GB de RAM!
    lines.forEach(line => {
      console.log('Processando:', line);
    });
  });
}

// ❌ OUTRO ANTI-PADRÃO: Usando readFileSync
function readCSVSyncBadWay(filePath) {
  // 🔥 PROBLEMA: Bloqueia thread e carrega tudo na memória
  const content = fs.readFileSync(filePath, 'utf-8');
  const lines = content.split('\n');
  
  // Para arquivos grandes, isso pode travar a aplicação
  return lines.map(line => {
    const columns = line.split(',');
    return {
      id: columns[0],
      name: columns[1],
      email: columns[2]
    };
  });
}

// 🚨 Problemas dessas abordagens:
// 1. Consumo excessivo de memória
// 2. Travamento da aplicação
// 3. Falha para arquivos grandes (>1GB)
// 4. Não aproveitamento do processamento sob demanda
// 5. Dificuldade para implementar pause/resume

// Exemplo de como isso quebra:
try {
  readCSVBadWay('./arquivo-2gb.csv'); // 💥 Out of memory
} catch (error) {
  console.error('Falhou:', error.message);
}

Streams e Transform

Streams resolvem isso usando uma abordagem de pipeline. A leitura via Readable, transformação via Transform e gravação com Writable garantem que cada pedaço é tratado e descartado antes do próximo chegar.

streams-transform.js
import fs from 'fs';
import { Transform, pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

// 📚 CONCEITO: Transform Stream = estação de trabalho numa linha de produção
// Recebe dados "crus", processa e envia "processados" para próxima estação
class CSVTransform extends Transform {
  constructor() {
    super({ objectMode: true }); // 🎯 Trabalha com objetos, não bytes
    this.buffer = '';      // 📦 "Caixa" para guardar dados incompletos
    this.lineNumber = 0;   // 📊 Contador de linhas processadas
    this.headers = null;   // 📋 Nomes das colunas do CSV
  }
  
  // 🔄 FUNÇÃO PRINCIPAL: Processa cada "pedaço" que chega
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString(); // 📝 Adiciona novo texto ao buffer
    const lines = this.buffer.split('\n'); // ✂️ Corta em linhas
    
    // 🔒 IMPORTANTE: Guarda a última linha (pode estar incompleta)
    this.buffer = lines.pop();
    
    // 🔄 Processa cada linha completa
    for (const line of lines) {
      if (line.trim()) { // ⚡ Ignora linhas vazias
        this.lineNumber++;
        
        if (this.lineNumber === 1) {
          // 📋 PRIMEIRA LINHA: Extrai cabeçalhos
          this.headers = line.split(',').map(h => h.trim());
        } else {
          // 🔄 CONVERSÃO: linha CSV → objeto JSON
          const values = line.split(',').map(v => v.trim());
          const record = {};
          
          // 🔗 MAPEIA: header[0] → value[0], header[1] → value[1], etc.
          this.headers.forEach((header, index) => {
            record[header] = values[index] || '';
          });
          
          // 📤 ENVIA para próxima estação da linha de produção
          this.push(record);
        }
      }
    }
    
    callback(); // ✅ Sinaliza: "terminei de processar este chunk"
  }
  
  // 🏁 FINALIZAÇÃO: Processa última linha que sobrou no buffer
  _flush(callback) {
    if (this.buffer.trim()) {
      const values = this.buffer.split(',').map(v => v.trim());
      const record = {};
      
      this.headers.forEach((header, index) => {
        record[header] = values[index] || '';
      });
      
      this.push(record); // 📤 Envia último registro
    }
    callback(); // ✅ Sinaliza: "terminei completamente"
  }
}

// ✅ Writable Stream para processar registros
class RecordProcessor extends Transform {
  constructor() {
    super({ objectMode: true });
    this.processedCount = 0;
  }
  
  _transform(record, encoding, callback) {
    // Processa apenas usuários ativos
    if (record.status === 'active') {
      this.processedCount++;
      
      // Adiciona timestamp de processamento
      const processedRecord = {
        ...record,
        processed_at: new Date().toISOString(),
        batch_id: Math.floor(this.processedCount / 100) + 1
      };
      
      this.push(JSON.stringify(processedRecord) + '\n');
      
      // Log a cada 1000 registros
      if (this.processedCount % 1000 === 0) {
        console.log('Processados ' + this.processedCount + ' registros ativos');
      }
    }
    
    callback();
  }
}

// ✅ Uso com pipeline eficiente
async function processLargeCSV() {
  try {
    await pipelineAsync(
      fs.createReadStream('./users.csv'),     // Readable
      new CSVTransform(),                      // Transform CSV→JSON
      new RecordProcessor(),                   // Transform Filter+Process
      fs.createWriteStream('./processed.jsonl') // Writable
    );
    
    console.log('Processamento concluído!');
  } catch (error) {
    console.error('Erro no pipeline:', error);
  }
}

// 🚀 EXECUÇÃO: Inicia a linha de produção completa
processLargeCSV();

// 💡 RESULTADO: 
// ✅ Arquivo de 2GB processado com apenas ~50MB de RAM
// ✅ Processamento pausável e retomável
// ✅ Progress feedback em tempo real
// ✅ Error handling individual por linha
// ✅ Pipeline escalável para qualquer tamanho de arquivo

JSON por linha com Stream

Você pode transformar CSV em JSON-L (um JSON por linha) usando stream. Cada linha é convertida via biblioteca csvtojson, filtrada e salva isoladamente. Isso evita acúmulos em memória mesmo com arquivos grandes.

csv-to-jsonl.js
import fs from 'fs';
import { Transform, pipeline } from 'stream';
import { promisify } from 'util';
import csv from 'csvtojson'; // npm install csvtojson

const pipelineAsync = promisify(pipeline);

// Transform que filtra e formata registros para JSON-L
class JSONLTransform extends Transform {
  constructor(filterFn = () => true) {
    super({ objectMode: true });
    this.filterFn = filterFn;
    this.recordCount = 0;
    this.filteredCount = 0;
  }
  
  _transform(record, encoding, callback) {
    this.recordCount++;
    
    // Aplica filtro customizado
    if (this.filterFn(record)) {
      this.filteredCount++;
      
      // Adiciona metadados úteis
      const enrichedRecord = {
        ...record,
        _metadata: {
          processed_at: new Date().toISOString(),
          record_number: this.recordCount,
          filtered_number: this.filteredCount
        }
      };
      
      // Emite como JSON-L (uma linha JSON por registro)
      this.push(JSON.stringify(enrichedRecord) + '\n');
    }
    
    // Progress feedback
    if (this.recordCount % 10000 === 0) {
      console.log('Processados: ' + this.recordCount + ', Filtrados: ' + this.filteredCount);
    }
    
    callback();
  }
  
  _flush(callback) {
    console.log('Processamento concluído: ' + this.filteredCount + '/' + this.recordCount + ' registros mantidos');
    callback();
  }
}

// Função principal para converter CSV em JSON-L
async function csvToJsonL(inputFile, outputFile, filterFunction) {
  try {
    await pipelineAsync(
      fs.createReadStream(inputFile),
      
      // Converte CSV para objetos JSON
      csv({
        trim: true,
        checkType: true,
        ignoreEmpty: true
      }),
      
      // Filtra e transforma para JSON-L
      new JSONLTransform(filterFunction),
      
      // Salva resultado
      fs.createWriteStream(outputFile)
    );
    
    console.log('Conversão concluída: ' + inputFile + '' + outputFile);
    
  } catch (error) {
    console.error('Erro na conversão:', error);
  }
}

// Exemplo de uso com filtros diferentes

// 1. Filtrar apenas usuários ativos
csvToJsonL(
  './users.csv',
  './active-users.jsonl',
  (user) => user.status === 'active'
);

// 2. Filtrar usuários premium com idade > 25
csvToJsonL(
  './users.csv',
  './premium-users.jsonl',
  (user) => user.premium === 'true' && parseInt(user.age) > 25
);

// 3. Converter tudo sem filtro
csvToJsonL(
  './products.csv',
  './products.jsonl',
  () => true // Mantém todos os registros
);

// Exemplo de leitura do JSON-L gerado
function readJSONL(filePath, callback) {
  const readable = fs.createReadStream(filePath, { encoding: 'utf-8' });
  let buffer = '';
  
  readable.on('data', (chunk) => {
    buffer += chunk;
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Guarda linha incompleta
    
    lines.forEach(line => {
      if (line.trim()) {
        try {
          const record = JSON.parse(line);
          callback(record);
        } catch (err) {
          console.error('Erro ao parsear linha:', line);
        }
      }
    });
  });
  
  readable.on('end', () => {
    if (buffer.trim()) {
      try {
        const record = JSON.parse(buffer);
        callback(record);
      } catch (err) {
        console.error('Erro ao parsear última linha:', buffer);
      }
    }
  });
}

// Teste de leitura
readJSONL('./active-users.jsonl', (user) => {
  console.log('Usuário ativo: ' + user.name + ' - ' + user.email);
});

Diferenças entre Node.js e JavaScript puro

É importante entender a diferença entre o que está disponível no Node.js versus JavaScript puro. Quando você está trabalhando com async generators no JavaScript no momento atual, eles ainda não têm aquelas funções de filter e map nativamente.

A alternativa que o Node.js encontrou foi primeiro transformar numa Readable Stream e aí sim você tem esses métodos. Quando os Iterator Helpers forem aprovados 100% em JavaScript, e os Async Iterator Helpers entrarem também, não vamos precisar fazer esse Readable.from().

Isso acontece porque o módulo Streams é parte do Node.js, e a sintaxe JavaScript que interpreta o código é parte do interpretador V8. Node.js se adiantou e adicionou os helpers em cima de streams, não em cima de async iterators.

O futuro da especificação

Por essas propostas estarem em estágio de desenvolvimento, pode ser que nas próximas versões do Node.js mude o funcionamento interno. Honestamente, é improvável que mude a API pública. Pode adicionar mais operadores, refinar aqui e ali, mas alterações que quebrem projetos existentes são difíceis de acontecer.

Por isso, use e abuse desses helpers e principalmente de funções de processamento sob demanda, como streams em Node.js e JavaScript em geral. Esse tipo de conhecimento vale para desenvolvedores de todos os níveis.

Evite Processamento em Lote

Guardar tudo em memória para só depois processar é um anti-padrão. Prefira pipelines de stream que processam e liberam os dados imediatamente. Isso é essencial em ambientes com limitações de memória ou tempo real.

avoid-batch-processing.js
// ❌ ANTI-PADRÃO: Processamento em lote (batch)
function processBatchData(filePath) {
  const allRecords = [];
  
  return new Promise((resolve, reject) => {
    fs.createReadStream(filePath)
      .pipe(csv())
      .on('data', (record) => {
        // 🔥 PROBLEMA: Acumula TUDO em memória
        allRecords.push(record);
      })
      .on('end', () => {
        // 🔥 PROBLEMA: Só processa quando termina de ler
        console.log('Iniciando processamento de ' + allRecords.length + ' registros...');
        
        const processedRecords = allRecords
          .filter(record => record.status === 'active')
          .map(record => ({ ...record, processed: true }))
          .slice(0, 1000); // Limita por memória
        
        resolve(processedRecords);
      })
      .on('error', reject);
  });
}

// ✅ BOA PRÁTICA: Processamento em stream
function processStreamData(inputPath, outputPath) {
  let processedCount = 0;
  let activeCount = 0;
  
  const processTransform = new Transform({
    objectMode: true,
    transform(record, encoding, callback) {
      processedCount++;
      
      // Processa imediatamente, sem acumular
      if (record.status === 'active') {
        activeCount++;
        
        // Transformação assíncrona se necessário
        const enrichedRecord = {
          ...record,
          processed: true,
          processed_at: new Date().toISOString(),
          batch_id: Math.floor(activeCount / 100)
        };
        
        // Libera imediatamente para próximo stream
        this.push(JSON.stringify(enrichedRecord) + '\n');
        
        // Feedback em tempo real
        if (processedCount % 1000 === 0) {
          console.log('Processados: ' + processedCount + ', Ativos: ' + activeCount);
        }
      }
      
      callback();
    },
    
    flush(callback) {
      console.log('Stream concluído: ' + activeCount + '/' + processedCount + ' ativos');
      callback();
    }
  });
  
  return pipeline(
    fs.createReadStream(inputPath),
    csv(),
    processTransform,
    fs.createWriteStream(outputPath),
    (err) => {
      if (err) {
        console.error('Pipeline falhou:', err);
      } else {
        console.log('Pipeline concluído com sucesso!');
      }
    }
  );
}

// ⚡ Comparação de performance e memória
const startTime = Date.now();
const startMemory = process.memoryUsage();

// Método Stream (recomendado)
processStreamData('./large-file.csv', './output-stream.jsonl')
  .then(() => {
    const endTime = Date.now();
    const endMemory = process.memoryUsage();
    
    console.log('\n=== RESULTADOS STREAM ===');
    console.log('Tempo: ' + (endTime - startTime) + 'ms');
    console.log('Memória usada: ' + ((endMemory.heapUsed - startMemory.heapUsed) / 1024 / 1024) + 'MB');
    console.log('Memória pico: ' + (endMemory.heapUsed / 1024 / 1024) + 'MB');
  });

// Evite o método batch para arquivos grandes!
// processBatchData('./large-file.csv') // 💥 Pode estourar memória

Readable → Transform → Writable

Esse trio é a base do processamento em stream. Um Readable emite os dados, o Transform processa cada pedaço, e o Writable grava os dados transformados. Cada elemento é responsável por uma fase do fluxo.

stream-pipeline.js
import { Readable, Transform, Writable, pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

// 1️⃣ READABLE: Fonte de dados
class DataSource extends Readable {
  constructor(options = {}) {
    super({ objectMode: true, ...options });
    this.currentId = 1;
    this.maxRecords = options.maxRecords || 1000;
  }
  
  _read() {
    if (this.currentId <= this.maxRecords) {
      // Simula geração de dados sob demanda
      const record = {
        id: this.currentId,
        name: 'User ' + this.currentId,
        email: 'user' + this.currentId + '@example.com',
        createdAt: new Date().toISOString(),
        type: this.currentId % 3 === 0 ? 'premium' : 'regular'
      };
      
      this.push(record);
      this.currentId++;
      
      // Simula delay entre registros
      if (this.currentId % 100 === 0) {
        setTimeout(() => {}, 10);
      }
    } else {
      // Sinaliza fim do stream
      this.push(null);
    }
  }
}

// 2️⃣ TRANSFORM: Processamento de dados
class DataProcessor extends Transform {
  constructor() {
    super({ objectMode: true });
    this.processedCount = 0;
    this.premiumCount = 0;
  }
  
  _transform(record, encoding, callback) {
    this.processedCount++;
    
    // Aplica lógica de transformação
    if (record.type === 'premium') {
      this.premiumCount++;
      
      // Enriquece dados premium
      const enrichedRecord = {
        ...record,
        plan: 'premium',
        features: ['feature1', 'feature2', 'feature3'],
        processed_at: new Date().toISOString(),
        priority: 'high'
      };
      
      this.push(enrichedRecord);
    } else {
      // Transformação básica para regular
      const basicRecord = {
        ...record,
        plan: 'basic',
        features: ['feature1'],
        processed_at: new Date().toISOString(),
        priority: 'normal'
      };
      
      this.push(basicRecord);
    }
    
    // Log de progresso
    if (this.processedCount % 250 === 0) {
      console.log('Transformados: ' + this.processedCount + ' (' + this.premiumCount + ' premium)');
    }
    
    callback();
  }
}

// 3️⃣ WRITABLE: Destino dos dados
class DataSink extends Writable {
  constructor() {
    super({ objectMode: true });
    this.savedCount = 0;
    this.premiumSaved = 0;
  }
  
  _write(record, encoding, callback) {
    this.savedCount++;
    
    // Simula salvamento (banco, arquivo, API, etc.)
    if (record.plan === 'premium') {
      this.premiumSaved++;
      // Salvamento especial para premium
      this.savePremiumUser(record);
    } else {
      // Salvamento padrão
      this.saveRegularUser(record);
    }
    
    // Feedback a cada 200 salvamentos
    if (this.savedCount % 200 === 0) {
      console.log('Salvos: ' + this.savedCount + ' (' + this.premiumSaved + ' premium)');
    }
    
    callback();
  }
  
  savePremiumUser(record) {
    // Simula operação de salvamento premium
    console.log('⭐ Premium: ' + record.name + ' (' + record.features.length + ' features)');
  }
  
  saveRegularUser(record) {
    // Simula operação de salvamento regular
    console.log('👤 Regular: ' + record.name);
  }
  
  _final(callback) {
    console.log('
✅ Pipeline concluído: ' + this.savedCount + ' usuarios salvos');
    console.log('   - Premium: ' + this.premiumSaved);
    console.log('   - Regular: ' + (this.savedCount - this.premiumSaved));
    callback();
  }
}

// 🚀 Execução do pipeline completo
async function runDataPipeline() {
  console.log('Iniciando pipeline Readable → Transform → Writable...');
  
  try {
    await pipelineAsync(
      new DataSource({ maxRecords: 1000 }),  // Fonte
      new DataProcessor(),                   // Processamento
      new DataSink()                         // Destino
    );
    
    console.log('\n🎉 Pipeline executado com sucesso!');
    
  } catch (error) {
    console.error('🚨 Erro no pipeline:', error);
  }
}

// Inicia o processamento
runDataPipeline();

// 📊 Benefícios desta arquitetura:
// 1. Memória constante independente do tamanho dos dados
// 2. Processamento paralelo (backpressure automático)
// 3. Falha rápida com error handling
// 4. Componentes reutilizáveis e testáveis
// 5. Escalabilidade horizontal

Evite memória compartilhada

Evite estratégias que acumulam os dados esperando o término da leitura. Use callbacks ou streams para lidar com cada parte do caminho. Isso mantém o sistema performático e resiliente.

avoid-shared-memory.js
// ❌ PROBLEMA: Memória compartilhada (shared state)
class BadDataProcessor {
  constructor() {
    this.allData = []; // 🔥 Acumula tudo na memória
    this.results = []; // 🔥 Mais acúmulo
    this.errors = [];  // 🔥 Ainda mais memória
  }
  
  async processFile(filePath) {
    // Lê tudo de uma vez
    const stream = fs.createReadStream(filePath);
    
    for await (const chunk of stream) {
      this.allData.push(chunk); // 🔥 Cresce indefinidamente
    }
    
    // Processa tudo de uma vez
    for (const item of this.allData) {
      try {
        const result = await this.processItem(item);
        this.results.push(result); // 🔥 Duplica uso de memória
      } catch (error) {
        this.errors.push(error); // 🔥 Acumula erros também
      }
    }
    
    return this.results; // ⚠️ Retorna array gigante
  }
}

// ✅ SOLUÇÃO: Stream sem estado compartilhado
class GoodDataProcessor {
  constructor(onResult, onError) {
    this.onResult = onResult;   // Callback para sucesso
    this.onError = onError;     // Callback para erro
    this.processedCount = 0;
  }
  
  async processFile(filePath) {
    const stream = fs.createReadStream(filePath);
    
    for await (const chunk of stream) {
      try {
        // Processa imediatamente sem guardar
        const result = await this.processItem(chunk);
        
        // Envia resultado e descarta
        this.onResult(result);
        this.processedCount++;
        
        // Log de progresso sem guardar histórico
        if (this.processedCount % 1000 === 0) {
          console.log('Processados: ' + this.processedCount);
        }
        
      } catch (error) {
        // Trata erro e descarta
        this.onError(error);
      }
    }
    
    return this.processedCount; // Retorna apenas contador
  }
  
  async processItem(chunk) {
    // Processa sem guardar estado
    return {
      id: Date.now(),
      data: chunk.toString().toUpperCase(),
      timestamp: new Date().toISOString()
    };
  }
}

// 🎯 Uso correto com callbacks
function processLargeFile() {
  const results = []; // Apenas para este exemplo
  let errorCount = 0;
  
  const processor = new GoodDataProcessor(
    (result) => {
      // Processa resultado imediatamente
      console.log('Resultado: ' + result.id);
      
      // Se precisar guardar, faça em lotes pequenos
      results.push(result);
      if (results.length >= 100) {
        // Salva lote e limpa memória
        saveBatch(results.splice(0, 100));
      }
    },
    (error) => {
      errorCount++;
      console.error('Erro #' + errorCount + ':', error.message);
      // Não acumula o erro, apenas conta
    }
  );
  
  return processor.processFile('./large-file.txt');
}

// 📊 Comparação de uso de memória
function memoryUsageExample() {
  const startMemory = process.memoryUsage().heapUsed;
  
  console.log('=== TESTE DE MEMÓRIA ===');
  console.log('Início: ' + (startMemory / 1024 / 1024).toFixed(2) + 'MB');
  
  // Simula processamento stream
  processLargeFile().then((count) => {
    const endMemory = process.memoryUsage().heapUsed;
    const difference = endMemory - startMemory;
    
    console.log('Fim: ' + (endMemory / 1024 / 1024).toFixed(2) + 'MB');
    console.log('Diferença: ' + (difference / 1024 / 1024).toFixed(2) + 'MB');
    console.log('Processados: ' + count + ' itens');
  });
}

// Helper para salvar lotes
function saveBatch(batch) {
  // Simula salvamento em banco/arquivo
  console.log('Salvando lote de ' + batch.length + ' itens...');
  // batch é descartado após esta função
}

memoryUsageExample();

Usando o for await...of

Em funções assíncronas, o for await...of permite consumir iteradores que retornam Promises. Ideal para pipelines onde a origem dos dados ou transformações são assíncronas.

for-await-of.js
// Async Generator que simula busca de dados em API
async function* fetchUsersFromAPI(userIds) {
  for (const userId of userIds) {
    try {
      // Simula requisição HTTP
      const response = await fetch('https://api.example.com/users/' + userId + '');
      const user = await response.json();
      
      yield {
        ...user,
        fetched_at: new Date().toISOString(),
        source: 'api'
      };
      
      // Rate limiting: pausa entre requisições
      await new Promise(resolve => setTimeout(resolve, 100));
      
    } catch (error) {
     console.error(`Erro ao buscar usuário ${userId}: ${error.message}`);
      
      // Yield de erro para manter o fluxo
      yield {
        id: userId,
        error: error.message,
        fetched_at: new Date().toISOString(),
        source: 'error'
      };
    }
  }
}

// Async Generator para processamento de dados
async function* processUsersAsync(userStream) {
  for await (const user of userStream) {
    if (user.error) {
      // Pula usuários com erro
      continue;
    }
    
    // Simula processamento assíncrono
    const enrichedUser = await enrichUserData(user);
    
    // Aplica validações
    if (enrichedUser.email && enrichedUser.active) {
      yield enrichedUser;
    }
  }
}

// Função de enriquecimento assíncrona
async function enrichUserData(user) {
  // Simula busca de dados adicionais
  const profile = await fetchUserProfile(user.id);
  const permissions = await fetchUserPermissions(user.id);
  
  return {
    ...user,
    profile,
    permissions,
    enriched_at: new Date().toISOString()
  };
}

// Simulação de funções de API
async function fetchUserProfile(userId) {
  await new Promise(resolve => setTimeout(resolve, 50));
  return {
    avatar: 'https://api.example.com/avatars/' + userId + '.jpg',
    bio: 'Biografia do usuário ' + userId + '',
    preferences: { theme: 'dark', language: 'pt-BR' }
  };
}

async function fetchUserPermissions(userId) {
  await new Promise(resolve => setTimeout(resolve, 30));
  return {
    canRead: true,
    canWrite: userId % 2 === 0,
    canDelete: userId % 3 === 0,
    role: userId % 2 === 0 ? 'admin' : 'user'
  };
}

// 🎯 Uso prático do for await...of
async function processUsersPipeline() {
  const userIds = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  
  console.log('Iniciando processamento assíncrono de usuários...');
  
  let processedCount = 0;
  let adminCount = 0;
  
  // Pipeline completo com for await...of
  const userStream = fetchUsersFromAPI(userIds);
  const processedStream = processUsersAsync(userStream);
  
  for await (const user of processedStream) {
    processedCount++;
    
    if (user.permissions.role === 'admin') {
      adminCount++;
      console.log('👑 Admin processado: ' + user.name + '');
    } else {
      console.log('👤 Usuário processado: ' + user.name + '');
    }
    
    // Simula salvamento assíncrono
    await saveUserToDatabase(user);
  }
  
  console.log('\\n✅ Processamento concluído:');
  console.log('   - Total processados: ' + processedCount + '');
  console.log('   - Admins: ' + adminCount + '');
  console.log('   - Usuários regulares: ' + processedCount - adminCount + '');
}

// Simula salvamento em banco
async function saveUserToDatabase(user) {
  // Simula operação de banco de dados
  await new Promise(resolve => setTimeout(resolve, 20));
  console.log('💾 Usuário ' + user.id + ' salvo no banco');
}

// 🚀 Exemplo com error handling robusto
async function robustUserProcessing() {
  try {
    await processUsersPipeline();
  } catch (error) {
    console.error('❌ Erro no pipeline:', error);
    
    // Implementa retry logic se necessário
    console.log('🔄 Tentando novamente em 5 segundos...');
    setTimeout(() => robustUserProcessing(), 5000);
  }
}

// Executa o processamento
robustUserProcessing();

// 📈 Benefícios do for await...of:
// 1. Processa streams assíncronos naturalmente
// 2. Backpressure automático
// 3. Error handling integrado
// 4. Código limpo e legível
// 5. Compatível com qualquer async iterator

Combinações poderosas

Generator Functions + Iterator Helpers + Async Iterators + Streams = pipeline robusto e enxuto. Essa composição te dá controle total de processamento mesmo em dados massivos.

powerful-combinations.js
import fs from 'fs';
import { Transform, pipeline } from 'stream';
import { promisify } from 'util';

const pipelineAsync = promisify(pipeline);

// 🧩 COMBINAÇÃO 1: Generator + Iterator Helpers
function* generateData(count) {
  for (let i = 1; i <= count; i++) {
    yield {
      id: i,
      value: Math.random() * 100,
      category: i % 3 === 0 ? 'premium' : 'regular',
      timestamp: new Date().toISOString()
    };
  }
}

// Iterator Helpers customizados
function take(iterator, count) {
  return (function* () {
    let taken = 0;
    for (const value of iterator) {
      if (taken >= count) break;
      yield value;
      taken++;
    }
  })();
}

function filter(iterator, predicate) {
  return (function* () {
    for (const value of iterator) {
      if (predicate(value)) yield value;
    }
  })();
}

function map(iterator, transform) {
  return (function* () {
    for (const value of iterator) {
      yield transform(value);
    }
  })();
}

// 🧩 COMBINAÇÃO 2: Async Generator + Stream
async function* asyncDataProcessor(inputStream) {
  let buffer = '';
  
  for await (const chunk of inputStream) {
    buffer += chunk.toString();
    const lines = buffer.split('\n');
    buffer = lines.pop(); // Guarda linha incompleta
    
    for (const line of lines) {
      if (line.trim()) {
        try {
          // Processa linha assincronamente
          const data = JSON.parse(line);
          const enriched = await enrichData(data);
          yield enriched;
        } catch (error) {
          console.error('Erro ao processar linha:', line);
        }
      }
    }
  }
  
  // Processa última linha
  if (buffer.trim()) {
    try {
      const data = JSON.parse(buffer);
      const enriched = await enrichData(data);
      yield enriched;
    } catch (error) {
      console.error('Erro na última linha:', buffer);
    }
  }
}

async function enrichData(data) {
  // Simula enriquecimento assíncrono
  await new Promise(resolve => setTimeout(resolve, 10));
  
  return {
    ...data,
    enriched: true,
    processed_at: new Date().toISOString(),
    score: Math.random() * 10
  };
}

// 🧩 COMBINAÇÃO 3: Transform Stream + Generators
class GeneratorTransform extends Transform {
  constructor(generatorFn) {
    super({ objectMode: true });
    this.generatorFn = generatorFn;
  }
  
  _transform(chunk, encoding, callback) {
    try {
      // Usa generator para processar chunk
      const generator = this.generatorFn(chunk);
      
      for (const result of generator) {
        this.push(result);
      }
      
      callback();
    } catch (error) {
      callback(error);
    }
  }
}

// Generator para transformação
function* processChunk(data) {
  if (Array.isArray(data)) {
    for (const item of data) {
      if (item.value > 50) {
        yield { ...item, category: 'high-value' };
      }
    }
  } else {
    if (data.value > 50) {
      yield { ...data, category: 'high-value' };
    }
  }
}

// 🚀 PIPELINE COMPLETO: Combinando todas as técnicas
async function masterPipeline() {
  console.log('🎯 Iniciando pipeline completo...');
  
  // 1. Geração de dados com generators + helpers
  const dataGenerator = map(
    filter(
      take(generateData(10000), 5000),
      item => item.value > 30
    ),
    item => ({ ...item, filtered: true })
  );
  
  // 2. Converte generator em stream
  const dataArray = [...dataGenerator];
  console.log('📊 Dados filtrados: ' + dataArray.length + ' itens');
  
  // 3. Processa com async generator + stream
  await pipelineAsync(
    // Input stream simulado
    new Readable({
      objectMode: true,
      read() {
        const item = dataArray.shift();
        this.push(item || null);
      }
    }),
    
    // Transform usando generator
    new GeneratorTransform(processChunk),
    
    // Output stream
    new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        console.log('✅ Processado: ID ' + chunk.id + ', Valor: ${chunk.value.toFixed(2)}, Categoria: ${chunk.category}');
        callback();
      }
    })
  );
  
  console.log('🎉 Pipeline completo finalizado!');
}

// 📈 Pipeline de análise em tempo real
async function realTimeAnalytics() {
  let totalProcessed = 0;
  let highValueCount = 0;
  let premiumCount = 0;
  
  const analyticsGenerator = map(
    filter(
      generateData(1000),
      item => item.value > 20
    ),
    item => {
      totalProcessed++;
      if (item.value > 70) highValueCount++;
      if (item.category === 'premium') premiumCount++;
      
      return {
        ...item,
        analytics: {
          total: totalProcessed,
          highValue: highValueCount,
          premium: premiumCount,
          percentage: ((highValueCount / totalProcessed) * 100).toFixed(2)
        }
      };
    }
  );
  
  // Consome com for...of
  for (const item of analyticsGenerator) {
    if (item.id % 100 === 0) {
      console.log('📊 Analytics: ' + item.analytics.percentage + '% high-value items');
    }
  }
  
  console.log('📈 Análise final: ' + highValueCount + '/${totalProcessed} high-value (${((highValueCount/totalProcessed)*100).toFixed(2)}%)');
}

// Executa exemplos
master pipeline();
realTimeAnalytics();

// 💡 Esta combinação oferece:
// ✅ Controle de memória (generators)
// ✅ Processamento assíncrono (async generators)
// ✅ Pipelines eficientes (streams)
// ✅ Transformação declarativa (iterator helpers)
// ✅ Escalabilidade horizontal
// ✅ Error handling robusto

Otimize código com modernos recursos

Com os novos recursos do Node.js, evitar anti-padrões e promover pipelines desacoplados ficou mais fácil. Quanto mais linha de produção você simular, melhor será o desempenho da sua aplicação em escala.

modern-optimized-processing.js
// 🚀 Sistema de processamento moderno otimizado
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';
import { performance } from 'perf_hooks';
import cluster from 'cluster';
import os from 'os';

// 🧩 1. Generator com lazy evaluation
function* createOptimizedDataStream(source, batchSize = 1000) {
  let offset = 0;
  
  while (true) {
    const batch = source.getBatch(offset, batchSize);
    
    if (batch.length === 0) break;
    
    // Yield com metadata para monitoramento
    for (const item of batch) {
      yield {
        ...item,
        _meta: {
          batch: Math.floor(offset / batchSize),
          position: offset,
          timestamp: performance.now()
        }
      };
      offset++;
    }
    
    // Permite garbage collection entre batches
    if (offset % 10000 === 0) {
      setImmediate(() => {});
    }
  }
}

// 🧩 2. Iterator Helpers modernos com performance
const iteratorHelpers = {
  map: function(iterator, fn) {
    return (function* () {
      for (const value of iterator) {
        yield fn(value);
      }
    })();
  },
  
  filter: function(iterator, predicate) {
    return (function* () {
      for (const value of iterator) {
        if (predicate(value)) yield value;
      }
    })();
  },
  
  take: function(iterator, count) {
    return (function* () {
      let taken = 0;
      for (const value of iterator) {
        if (taken >= count) break;
        yield value;
        taken++;
      }
    })();
  },
  
  // Novo: chunking para processamento paralelo
  chunk: function* (iterator, size) {
    let chunk = [];
    
    for (const value of iterator) {
      chunk.push(value);
      
      if (chunk.length === size) {
        yield [...chunk]; // Clone para evitar mutação
        chunk = [];
      }
    }
    
    if (chunk.length > 0) {
      yield chunk;
    }
  },
  
  // Novo: parallel mapping com workers
  async parallelMap(iterator, asyncFn, concurrency = os.cpus().length) {
    const chunks = [...this.chunk(iterator, 100)];
    const results = [];
    
    for (let i = 0; i < chunks.length; i += concurrency) {
      const batch = chunks.slice(i, i + concurrency);
      
      const promises = batch.map(chunk => 
        this.processChunkInWorker(chunk, asyncFn)
      );
      
      const batchResults = await Promise.all(promises);
      results.push(...batchResults.flat());
    }
    
    return results;
  },
  
  async processChunkInWorker(chunk, fn) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: { chunk, fnString: fn.toString() }
      });
      
      worker.on('message', resolve);
      worker.on('error', reject);
    });
  }
};

// 🧩 3. Cluster-aware stream processing
class OptimizedProcessor {
  constructor(options = {}) {
    this.batchSize = options.batchSize || 1000;
    this.concurrency = options.concurrency || os.cpus().length;
    this.metrics = {
      processed: 0,
      errors: 0,
      startTime: performance.now()
    };
  }
  
  async processLargeDataset(dataSource) {
    if (cluster.isMaster) {
      return this.masterProcess(dataSource);
    } else {
      return this.workerProcess();
    }
  }
  
  async masterProcess(dataSource) {
    console.log(📈 Iniciando processamento distribuído...);
    
    // Cria workers
    const workers = [];
    for (let i = 0; i < this.concurrency; i++) {
      workers.push(cluster.fork());
    }
    
    // Distribui trabalho
    const dataStream = createOptimizedDataStream(dataSource, this.batchSize);
    const chunks = [...iteratorHelpers.chunk(dataStream, 100)];
    
    let workerIndex = 0;
    const promises = [];
    
    for (const chunk of chunks) {
      const worker = workers[workerIndex % workers.length];
      
      promises.push(new Promise((resolve) => {
        worker.send({ chunk });
        worker.on('message', (result) => {
          this.metrics.processed += result.processed;
          this.metrics.errors += result.errors;
          resolve(result);
        });
      }));
      
      workerIndex++;
    }
    
    const results = await Promise.all(promises);
    
    // Finaliza workers
    workers.forEach(worker => worker.kill());
    
    this.logMetrics();
    return results;
  }
  
  workerProcess() {
    process.on('message', async (data) => {
      try {
        const result = await this.processChunk(data.chunk);
        process.send(result);
      } catch (error) {
        process.send({ error: error.message, processed: 0, errors: 1 });
      }
    });
  }
  
  async processChunk(chunk) {
    let processed = 0;
    let errors = 0;
    
    for (const item of chunk) {
      try {
        await this.processItem(item);
        processed++;
      } catch (error) {
        errors++;
        console.error(Erro ao processar item ${item.id}:, error.message);
      }
    }
    
    return { processed, errors };
  }
  
  async processItem(item) {
    // Simula processamento complexo
    await new Promise(resolve => setTimeout(resolve, Math.random() * 10));
    
    // Aplica transformações otimizadas
    return {
      ...item,
      processed: true,
      worker_id: process.pid,
      processing_time: performance.now() - item._meta.timestamp
    };
  }
  
  logMetrics() {
    const duration = performance.now() - this.metrics.startTime;
    const throughput = this.metrics.processed / (duration / 1000);
    
    console.log(
      📈 Métricas finais:\n +
      Total processados: ${this.metrics.processed}\n +
      Erros: ${this.metrics.errors}\n +
      Duração: ${(duration / 1000).toFixed(2)}s\n +
      Throughput: ${throughput.toFixed(2)} items/s
    );
  }
}

// Worker thread code
if (!isMainThread) {
  const { chunk, fnString } = workerData;
  const fn = eval(${fnString});
  
  (async () => {
    try {
      const results = [];
      for (const item of chunk) {
        const result = await fn(item);
        results.push(result);
      }
      parentPort.postMessage(results);
    } catch (error) {
      parentPort.postMessage({ error: error.message });
    }
  })();
}

// 🚀 Uso da linha de produção otimizada
async function runOptimizedProduction() {
  // Simula fonte de dados
  const dataSource = {
    getBatch: (offset, limit) => {
      const data = [];
      for (let i = offset; i < offset + limit && i < 100000; i++) {
        data.push({
          id: i,
          data: {large data object ${i}},
          complexity: Math.random()
        });
      }
      return data;
    }
  };
  
  const processor = new OptimizedProcessor({
    batchSize: 1000,
    concurrency: 4
  });
  
  await processor.processLargeDataset(dataSource);
}

// 🏁 Principais otimizações implementadas:
// ✅ Lazy evaluation com generators
// ✅ Processamento paralelo com workers
// ✅ Cluster-aware distribution
// ✅ Backpressure automático
// ✅ Memory management eficiente
// ✅ Error handling robusto
// ✅ Métricas de performance
// ✅ Garbage collection otimizada

// Executa se for processo principal
if (cluster.isMaster && isMainThread) {
  runOptimizedProduction();
}

Próximos passos e aplicação prática

Essas técnicas transformam completamente como você vai programar em JavaScript. Use essas funcionalidades para processar terabytes de dados sem travar suas aplicações, respeitando o ciclo de vida do ambiente em que esteja trabalhando.

Sempre pense em processamento sob demanda: divisão de dados em chunks pequenos, pipelines eficientes e liberação de memória a cada etapa. Essa é a essência do JavaScript moderno para aplicações de alta performance.

Checklist para Processamento Sob Demanda

Entendeu como usar Generator Functions com yield
Aprendeu o uso do método .next e for-of
Explorou as vantagens dos Iterator Helpers
Implantou streams com Readable, Transform e Writable
Evitou sobrecarga de memória com pipelines
Aplicou async iterators para fluxos assíncronos
Compreendeu a diferença entre Node.js e JavaScript puro
Está preparado para usar essas técnicas em produção

Domine React e Node com o CrazyStack

Aprenda técnicas avançadas de React com nosso curso completo