🚀 Oferta especial: 60% OFF no CrazyStack - Últimas vagas!Garantir vaga →
MÓDULO 3 - AULA 7

Async Generators

Processamento assíncrono sob demanda. APIs, bancos de dados e streams de dados em tempo real

🎯 O próximo nível

Generators síncronos são poderosos, mas a magia real acontece quando combinamos com async/await. Async generators permitem processar streams infinitos de dados assíncronos: APIs, bancos de dados, WebSockets, arquivos gigantes.

🔄 Async Generators: O conceito

💡 Async + Generator = Magia

Um async generator é uma função que pode pausar (yield) e aguardar operações assíncronas (await) ao mesmo tempo. Perfeito para processar dados que chegam ao longo do tempo: API calls, DB queries, file reads.

async-generator-basic.js
// 🔄 ASYNC GENERATOR BÁSICO: async function*
async function* simpleAsyncGenerator() {
  console.log('🚀 Iniciando async generator...');
  
  // ⏰ YIELD + AWAIT: Pausa e aguarda operação assíncrona
  yield await new Promise(resolve => 
    setTimeout(() => resolve('Primeiro resultado'), 1000)
  );
  
  yield await new Promise(resolve => 
    setTimeout(() => resolve('Segundo resultado'), 500)
  );
  
  yield await new Promise(resolve => 
    setTimeout(() => resolve('Terceiro resultado'), 800)
  );
  
  console.log('✅ Async generator finalizado');
}

// 🔄 CONSUMINDO com for await...of
async function demonstrateAsyncGenerator() {
  console.log('=== DEMONSTRAÇÃO: for await...of ===');
  
  // ⏰ PROCESSA resultados conforme chegam
  for await (const result of simpleAsyncGenerator()) {
    console.log(`📥 Recebido: ${result} às ${new Date().toLocaleTimeString()}`);
  }
  
  console.log('🏁 Processamento finalizado');
}

// 🚀 EXECUTAR demonstração
demonstrateAsyncGenerator();

// 📊 RESULTADO:
// - Cada yield aguarda operação assíncrona
// - for await...of processa conforme dados chegam
// - Controle total sobre timing e fluxo
// - Não bloqueia event loop

// 💡 DIFERENÇA FUNDAMENTAL:
// Generator normal: yield valor sincronamente
// Async generator: yield Promise que resolve assincronamente

🌐 API Streaming: Processamento em tempo real

✅ APIs sob demanda

Ao invés de fazer uma requisição gigante que demora 30 segundos e pode dar timeout, fazemos várias pequenas requisições conforme necessário. Rate limiting automático e controle total.

api-streaming.js
// 🌐 API STREAMING: Processa APIs sob demanda
class APIStreamer {
  constructor(baseUrl, options = {}) {
    this.baseUrl = baseUrl;
    this.rateLimit = options.rateLimit || 5; // requests/segundo
    this.timeout = options.timeout || 10000; // 10s timeout
    this.retries = options.retries || 3;
    this.requestCount = 0;
  }

  // ⏱️ RATE LIMITING: Controla velocidade de requisições
  async waitForRateLimit() {
    if (this.lastRequest) {
      const minInterval = 1000 / this.rateLimit;
      const elapsed = Date.now() - this.lastRequest;
      
      if (elapsed < minInterval) {
        const waitTime = minInterval - elapsed;
        console.log(`⏸️ Rate limit: aguardando ${waitTime}ms`);
        await new Promise(resolve => setTimeout(resolve, waitTime));
      }
    }
    this.lastRequest = Date.now();
  }

  // 🌐 REQUISIÇÃO com retry e timeout
  async fetchWithRetry(url) {
    let lastError;
    
    for (let attempt = 1; attempt <= this.retries; attempt++) {
      try {
        await this.waitForRateLimit();
        
        console.log(`🌐 Request #${++this.requestCount}: ${url} (tentativa ${attempt})`);
        
        // Simula requisição HTTP (em produção seria fetch real)
        const response = await this.simulateAPICall(url);
        
        if (response.ok) {
          return response.data;
        } else {
          throw new Error(`HTTP ${response.status}: ${response.error}`);
        }
        
      } catch (error) {
        lastError = error;
        console.log(`❌ Tentativa ${attempt} falhou: ${error.message}`);
        
        if (attempt < this.retries) {
          const backoff = Math.pow(2, attempt) * 1000; // Exponential backoff
          console.log(`⏰ Aguardando ${backoff}ms antes de tentar novamente...`);
          await new Promise(resolve => setTimeout(resolve, backoff));
        }
      }
    }
    
    throw new Error(`Falha após ${this.retries} tentativas: ${lastError.message}`);
  }

  // 🎭 SIMULA API call (substitua por fetch real)
  async simulateAPICall(url) {
    // Simula latência da rede
    await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 200));
    
    // Simula ocasionais falhas de rede (5% chance)
    if (Math.random() < 0.05) {
      throw new Error('Network timeout');
    }
    
    // Simula resposta da API
    const page = parseInt(url.match(/page=(\d+)/)?.[1] || '1');
    const limit = parseInt(url.match(/limit=(\d+)/)?.[1] || '10');
    
    return {
      ok: true,
      data: {
        page,
        limit,
        users: Array.from({ length: limit }, (_, i) => ({
          id: (page - 1) * limit + i + 1,
          name: `User ${(page - 1) * limit + i + 1}`,
          email: `user${(page - 1) * limit + i + 1}@api.com`,
          fetchedAt: new Date().toISOString()
        })),
        hasNext: page < 10 // Simula 10 páginas total
      }
    };
  }

  // 🔄 STREAM de usuários de API paginada
  async* streamUsers(options = {}) {
    const { startPage = 1, limit = 50, filter = null } = options;
    let currentPage = startPage;
    let hasNextPage = true;
    
    console.log(`🚀 Iniciando stream de usuários (página ${startPage}, ${limit}/página)`);
    
    while (hasNextPage) {
      try {
        // 🌐 FETCH página atual
        const url = `${this.baseUrl}/users?page=${currentPage}&limit=${limit}`;
        const response = await this.fetchWithRetry(url);
        
        console.log(`📦 Página ${currentPage}: ${response.users.length} usuários`);
        
        // 🔄 YIELD cada usuário individualmente
        for (const user of response.users) {
          // 🔍 FILTRO opcional
          if (!filter || filter(user)) {
            yield user;
          }
        }
        
        // 📄 PRÓXIMA página
        hasNextPage = response.hasNext;
        currentPage++;
        
        // ✅ ESTATÍSTICAS
        console.log(`📊 Stream: página ${currentPage - 1}, total requests: ${this.requestCount}`);
        
      } catch (error) {
        console.error(`❌ Erro ao buscar página ${currentPage}: ${error.message}`);
        // Decide se continua ou para baseado no tipo de erro
        if (error.message.includes('timeout')) {
          console.log('⏭️ Continuando com próxima página...');
          currentPage++;
        } else {
          throw error; // Erro fatal, para o stream
        }
      }
    }
    
    console.log('🏁 Stream de API finalizado');
  }

  // 📊 ESTATÍSTICAS do stream
  getStats() {
    return {
      requestCount: this.requestCount,
      rateLimit: this.rateLimit,
      avgRequestTime: this.lastRequest ? Date.now() - this.lastRequest : 0
    };
  }
}

// 🚀 EXEMPLO DE USO: Processando API em stream
async function apiStreamExample() {
  console.log('🌐 DEMONSTRAÇÃO: API Streaming');
  console.log('=' .repeat(40));
  
  // 🔧 CONFIGURAÇÃO do streamer
  const streamer = new APIStreamer('https://api.exemplo.com', {
    rateLimit: 3, // 3 requests/segundo
    timeout: 5000,
    retries: 2
  });
  
  // 🔍 FILTRO: Só usuários com ID par
  const evenUsersFilter = user => user.id % 2 === 0;
  
  let processedCount = 0;
  let evenCount = 0;
  
  try {
    // 🔄 PROCESSA stream com filtro
    for await (const user of streamer.streamUsers({ 
      startPage: 1, 
      limit: 20,
      filter: evenUsersFilter
    })) {
      
      processedCount++;
      if (user.id % 2 === 0) evenCount++;
      
      console.log(`👤 Processando: ${user.name} (ID: ${user.id})`);
      
      // 🎯 PROCESSAMENTO do usuário
      // Aqui você faria: salvar no banco, enviar email, etc.
      
      // 🛑 DEMO: Para após 50 usuários processados
      if (processedCount >= 50) {
        console.log('\n⏸️ Demo finalizada - stream funcionando perfeitamente!');
        break;
      }
    }
    
  } catch (error) {
    console.error('❌ Erro no stream:', error.message);
  } finally {
    // 📊 RELATÓRIO
    const stats = streamer.getStats();
    console.log('\n📊 ESTATÍSTICAS DO STREAM:');
    console.log(`📦 Usuários processados: ${processedCount}`);
    console.log(`👥 Usuários pares: ${evenCount}`);
    console.log(`🌐 Total requests: ${stats.requestCount}`);
    console.log(`⚡ Rate limit: ${stats.rateLimit} req/sec`);
  }
}

// 🚀 EXECUTAR exemplo
apiStreamExample();

// 💡 VANTAGENS DO API STREAMING:
// 
// ✅ CONTROLE TOTAL:
// - Rate limiting automático
// - Retry com exponential backoff  
// - Timeout configurável
// - Filtros aplicados sob demanda
//
// ✅ EFICIÊNCIA:
// - Só faz requests necessários
// - Para imediatamente quando encontra o que precisa
// - Não sobrecarrega APIs
// - Memory usage constante
//
// ✅ ROBUSTEZ:
// - Lida com falhas de rede
// - Não para por um erro
// - Métricas de monitoramento
// - Graceful degradation

💾 Database Streaming: Processamento massivo

🚀 Bancos de dados

Processar milhões de registros do banco sem explodir memória. Cursors automáticos, batch processing e controle de transações para operações que podem durar horas.

database-streaming.js
// 💾 DATABASE STREAMING: Processa milhões de registros
class DatabaseStreamer {
  constructor(connection, options = {}) {
    this.connection = connection;
    this.batchSize = options.batchSize || 1000;
    this.timeout = options.timeout || 30000;
    this.maxRetries = options.maxRetries || 3;
    this.processedCount = 0;
    this.errorCount = 0;
  }

  // 🔄 STREAM de registros com cursor automático
  async* streamRecords(query, params = []) {
    let cursor = null;
    let hasMore = true;
    let batchNumber = 0;
    
    console.log(`💾 Iniciando stream DB: "${query.substring(0, 50)}..."`);
    console.log(`📦 Batch size: ${this.batchSize}`);
    
    try {
      while (hasMore) {
        batchNumber++;
        console.log(`\n📦 Processando batch #${batchNumber}...`);
        
        // 🔍 QUERY com cursor/offset
        const batchQuery = this.buildCursorQuery(query, cursor);
        const startTime = Date.now();
        
        // 💾 EXECUTA query com timeout
        const records = await this.executeQueryWithTimeout(batchQuery, params);
        
        const queryTime = Date.now() - startTime;
        console.log(`⚡ Query executada em ${queryTime}ms: ${records.length} registros`);
        
        if (records.length === 0) {
          hasMore = false;
          console.log('✅ Não há mais registros para processar');
          break;
        }
        
        // 🔄 YIELD cada registro individualmente
        for (const record of records) {
          try {
            // 🔄 YIELD com metadata útil
            yield {
              ...record,
              _meta: {
                batchNumber,
                batchPosition: records.indexOf(record),
                totalProcessed: ++this.processedCount,
                fetchedAt: new Date().toISOString()
              }
            };
            
          } catch (error) {
            this.errorCount++;
            console.error(`❌ Erro ao processar registro ${record.id}: ${error.message}`);
          }
        }
        
        // 📍 ATUALIZA cursor para próximo batch
        cursor = this.extractCursor(records);
        hasMore = records.length === this.batchSize; // Se retornou menos, acabou
        
        // 📊 ESTATÍSTICAS do batch
        console.log(`✅ Batch #${batchNumber} processado: ${records.length} registros`);
        console.log(`📊 Total acumulado: ${this.processedCount} registros`);
        
        // ⏸️ PAUSA entre batches para não sobrecarregar DB
        if (hasMore) {
          await new Promise(resolve => setTimeout(resolve, 100));
        }
      }
      
    } catch (error) {
      console.error('❌ Erro crítico no stream DB:', error.message);
      throw error;
    } finally {
      console.log(`\n🏁 Stream DB finalizado: ${this.processedCount} registros, ${this.errorCount} erros`);
    }
  }

  // 🔍 CONSTRÓI query com cursor
  buildCursorQuery(baseQuery, cursor) {
    if (!cursor) {
      // Primeira query - adiciona ORDER BY e LIMIT
      return `${baseQuery} ORDER BY id ASC LIMIT ${this.batchSize}`;
    } else {
      // Queries subsequentes - adiciona WHERE com cursor
      return `${baseQuery} AND id > ${cursor} ORDER BY id ASC LIMIT ${this.batchSize}`;
    }
  }

  // 📍 EXTRAI cursor do último registro
  extractCursor(records) {
    return records.length > 0 ? records[records.length - 1].id : null;
  }

  // ⏱️ EXECUTA query com timeout
  async executeQueryWithTimeout(query, params) {
    return new Promise(async (resolve, reject) => {
      const timeoutId = setTimeout(() => {
        reject(new Error(`Query timeout após ${this.timeout}ms`));
      }, this.timeout);
      
      try {
        // 🎭 SIMULA query do banco (substitua por query real)
        const result = await this.simulateDbQuery(query, params);
        clearTimeout(timeoutId);
        resolve(result);
      } catch (error) {
        clearTimeout(timeoutId);
        reject(error);
      }
    });
  }

  // 🎭 SIMULA query do banco (substitua por implementação real)
  async simulateDbQuery(query, params) {
    // Simula latência do banco
    await new Promise(resolve => setTimeout(resolve, Math.random() * 200 + 50));
    
    // Extrai cursor da query
    const cursorMatch = query.match(/id > (\d+)/);
    const startId = cursorMatch ? parseInt(cursorMatch[1]) + 1 : 1;
    
    // Simula registros do banco
    const records = [];
    for (let i = 0; i < this.batchSize && startId + i <= 50000; i++) {
      const id = startId + i;
      records.push({
        id,
        name: `Record ${id}`,
        email: `record${id}@db.com`,
        created_at: new Date(Date.now() - Math.random() * 365 * 24 * 60 * 60 * 1000),
        data: `Sample data for record ${id}`.repeat(Math.floor(Math.random() * 5) + 1)
      });
    }
    
    return records;
  }

  // 📊 ESTATÍSTICAS do processamento
  getStats() {
    return {
      processedCount: this.processedCount,
      errorCount: this.errorCount,
      successRate: this.processedCount / (this.processedCount + this.errorCount) * 100,
      batchSize: this.batchSize
    };
  }
}

// 🚀 EXEMPLO: Processamento massivo de dados
async function databaseStreamExample() {
  console.log('💾 DEMONSTRAÇÃO: Database Streaming');
  console.log('=' .repeat(50));
  
  // 🔧 SETUP do streamer
  const dbStreamer = new DatabaseStreamer(null, {
    batchSize: 500, // 500 registros por batch
    timeout: 10000, // 10s timeout por query
    maxRetries: 2
  });
  
  // 📝 QUERY para buscar usuários ativos
  const query = `
    SELECT id, name, email, created_at, data 
    FROM users 
    WHERE active = true
  `;
  
  let processedUsers = 0;
  let recentUsers = 0;
  const oneYearAgo = Date.now() - 365 * 24 * 60 * 60 * 1000;
  
  try {
    console.log('🔄 Iniciando processamento de usuários ativos...');
    
    // 🔄 PROCESSA cada registro conforme chega
    for await (const user of dbStreamer.streamRecords(query)) {
      processedUsers++;
      
      // 📊 ANÁLISE: Usuários recentes (último ano)
      if (new Date(user.created_at).getTime() > oneYearAgo) {
        recentUsers++;
      }
      
      // 🔧 PROCESSAMENTO personalizado
      if (user.id % 1000 === 0) {
        console.log(`🎯 Marco: ${user.name} (ID: ${user.id}) - Total: ${user._meta.totalProcessed}`);
      }
      
      // 🎯 AQUI: Faria processamento real
      // - Enviar email de marketing
      // - Atualizar cache
      // - Gerar relatórios
      // - Sync com sistema externo
      
      // 🛑 DEMO: Para após 5000 usuários
      if (processedUsers >= 5000) {
        console.log('\n⏸️ Demo finalizada - processamento funcionando!');
        break;
      }
    }
    
  } catch (error) {
    console.error('❌ Erro no processamento:', error.message);
  } finally {
    // 📊 RELATÓRIO FINAL
    const stats = dbStreamer.getStats();
    console.log('\n📊 RELATÓRIO FINAL:');
    console.log(`👥 Usuários processados: ${processedUsers}`);
    console.log(`🆕 Usuários recentes: ${recentUsers} (${(recentUsers/processedUsers*100).toFixed(1)}%)`);
    console.log(`✅ Taxa de sucesso: ${stats.successRate.toFixed(2)}%`);
    console.log(`⚡ Batch size: ${stats.batchSize}`);
    console.log(`❌ Erros: ${stats.errorCount}`);
  }
}

// 🚀 EXECUTAR exemplo
databaseStreamExample();

// 💡 VANTAGENS DO DB STREAMING:
//
// ✅ MEMÓRIA CONTROLADA:
// - Nunca carrega mais que ${batchSize} registros
// - Memory usage constante independente do tamanho da tabela
// - Pode processar bilhões de registros
//
// ✅ PERFORMANCE:
// - Cursor automático (sem OFFSET caro)
// - Queries eficientes com índices
// - Timeout para queries lentas
// - Pausas entre batches
//
// ✅ ROBUSTEZ:
// - Continua de onde parou em caso de erro
// - Estatísticas em tempo real
// - Graceful handling de timeouts
// - Monitoring de progresso
//
// ✅ FLEXIBILIDADE:
// - Batch size configurável
// - Filtros aplicados sob demanda
// - Processamento personalizado por registro
// - Early termination quando necessário

🔌 WebSocket Streaming: Dados em tempo real

⚡ Real-time

WebSockets + Async Generators = streaming de dados em tempo real. Chat messages, notificações, atualizações de preços, logs - tudo processado sob demanda conforme chega.

websocket-streaming.js
// 🔌 WEBSOCKET STREAMING: Dados em tempo real
class WebSocketStreamer {
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.ws = null;
    this.messageQueue = [];
    this.connected = false;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
    this.reconnectDelay = options.reconnectDelay || 1000;
  }

  // 🔌 CONECTA ao WebSocket
  async connect() {
    return new Promise((resolve, reject) => {
      console.log(`🔌 Conectando ao WebSocket: ${this.url}`);
      
      // 🎭 SIMULA WebSocket (substitua por WebSocket real)
      this.ws = this.createMockWebSocket();
      
      this.ws.onopen = () => {
        console.log('✅ WebSocket conectado');
        this.connected = true;
        this.reconnectAttempts = 0;
        resolve();
      };
      
      this.ws.onmessage = (event) => {
        const message = JSON.parse(event.data);
        this.messageQueue.push({
          ...message,
          receivedAt: Date.now()
        });
      };
      
      this.ws.onclose = () => {
        console.log('🔌 WebSocket desconectado');
        this.connected = false;
        this.attemptReconnect();
      };
      
      this.ws.onerror = (error) => {
        console.error('❌ Erro no WebSocket:', error);
        reject(error);
      };
    });
  }

  // 🔄 RECONNECT automático
  async attemptReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('❌ Max tentativas de reconexão atingidas');
      return;
    }
    
    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    
    console.log(`🔄 Tentativa de reconexão #${this.reconnectAttempts} em ${delay}ms`);
    
    setTimeout(() => {
      this.connect().catch(error => {
        console.error('❌ Falha na reconexão:', error);
      });
    }, delay);
  }

  // 🎭 MOCK WebSocket para demonstração
  createMockWebSocket() {
    const mockWs = {
      onopen: null,
      onmessage: null,
      onclose: null,
      onerror: null,
      readyState: 1
    };
    
    // Simula mensagens chegando
    setTimeout(() => mockWs.onopen?.(), 100);
    
    // Gera mensagens de exemplo
    let messageId = 1;
    const messageTypes = ['chat', 'notification', 'price_update', 'log'];
    
    const sendMessage = () => {
      if (mockWs.readyState === 1) {
        const type = messageTypes[Math.floor(Math.random() * messageTypes.length)];
        const message = this.generateMockMessage(type, messageId++);
        
        mockWs.onmessage?.({
          data: JSON.stringify(message)
        });
        
        // Próxima mensagem em 100-2000ms
        setTimeout(sendMessage, Math.random() * 1900 + 100);
      }
    };
    
    setTimeout(sendMessage, 500);
    
    return mockWs;
  }

  // 🎭 GERA mensagens de exemplo
  generateMockMessage(type, id) {
    const messages = {
      chat: {
        type: 'chat',
        id,
        user: `User${Math.floor(Math.random() * 100)}`,
        message: [
          'Hello everyone!',
          'How is everyone doing?',
          'Great weather today!',
          'Anyone working on cool projects?',
          'Just deployed a new feature!'
        ][Math.floor(Math.random() * 5)],
        timestamp: Date.now()
      },
      notification: {
        type: 'notification',
        id,
        title: 'New Update Available',
        body: 'Version 2.1.0 is now available for download',
        priority: ['low', 'medium', 'high'][Math.floor(Math.random() * 3)],
        timestamp: Date.now()
      },
      price_update: {
        type: 'price_update',
        id,
        symbol: ['BTCUSD', 'ETHUSD', 'AAPL', 'GOOGL'][Math.floor(Math.random() * 4)],
        price: (Math.random() * 1000 + 100).toFixed(2),
        change: (Math.random() * 20 - 10).toFixed(2),
        timestamp: Date.now()
      },
      log: {
        type: 'log',
        id,
        level: ['info', 'warn', 'error', 'debug'][Math.floor(Math.random() * 4)],
        message: `System message #${id}`,
        service: ['api', 'auth', 'db', 'cache'][Math.floor(Math.random() * 4)],
        timestamp: Date.now()
      }
    };
    
    return messages[type];
  }

  // 🔄 STREAM de mensagens em tempo real
  async* streamMessages(filter = null) {
    if (!this.connected) {
      await this.connect();
    }
    
    console.log('📡 Iniciando stream de mensagens em tempo real...');
    
    let processedCount = 0;
    let lastHeartbeat = Date.now();
    
    while (this.connected || this.messageQueue.length > 0) {
      // 📥 PROCESSA mensagens da queue
      while (this.messageQueue.length > 0) {
        const message = this.messageQueue.shift();
        
        // 🔍 APLICA filtro se especificado
        if (!filter || filter(message)) {
          processedCount++;
          
          yield {
            ...message,
            _meta: {
              processedCount,
              queueSize: this.messageQueue.length,
              connected: this.connected,
              latency: Date.now() - message.receivedAt
            }
          };
        }
      }
      
      // 💓 HEARTBEAT a cada 5 segundos
      if (Date.now() - lastHeartbeat > 5000) {
        console.log(`💓 Stream ativo: ${processedCount} mensagens processadas, queue: ${this.messageQueue.length}`);
        lastHeartbeat = Date.now();
      }
      
      // ⏸️ Aguarda novas mensagens
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    
    console.log('🏁 Stream de WebSocket finalizado');
  }

  // 🔌 DESCONECTA
  disconnect() {
    if (this.ws) {
      this.ws.close();
      this.connected = false;
    }
  }
}

// 🚀 EXEMPLO: Stream de dados em tempo real
async function webSocketStreamExample() {
  console.log('🔌 DEMONSTRAÇÃO: WebSocket Streaming');
  console.log('=' .repeat(45));
  
  const streamer = new WebSocketStreamer('wss://api.exemplo.com/stream', {
    maxReconnectAttempts: 3,
    reconnectDelay: 1000
  });
  
  // 🔍 FILTROS específicos
  const chatFilter = msg => msg.type === 'chat';
  const highPriorityFilter = msg => msg.priority === 'high' || msg.type === 'price_update';
  
  let processedMessages = 0;
  let chatMessages = 0;
  let priceUpdates = 0;
  
  try {
    console.log('🔄 Iniciando stream de mensagens...');
    
    // 🔄 PROCESSA mensagens conforme chegam
    for await (const message of streamer.streamMessages()) {
      processedMessages++;
      
      // 📊 ESTATÍSTICAS por tipo
      switch (message.type) {
        case 'chat':
          chatMessages++;
          console.log(`💬 Chat: ${message.user}: ${message.message}`);
          break;
          
        case 'price_update':
          priceUpdates++;
          console.log(`📈 Price: ${message.symbol} = $${message.price} (${message.change})`);
          break;
          
        case 'notification':
          if (message.priority === 'high') {
            console.log(`🚨 High Priority: ${message.title}`);
          }
          break;
          
        case 'log':
          if (message.level === 'error') {
            console.log(`❌ Error Log: ${message.message} [${message.service}]`);
          }
          break;
      }
      
      // 📊 ESTATÍSTICAS periódicas
      if (processedMessages % 20 === 0) {
        console.log(`\n📊 Estatísticas: ${processedMessages} mensagens, queue: ${message._meta.queueSize}`);
        console.log(`💬 Chats: ${chatMessages}, 📈 Preços: ${priceUpdates}`);
        console.log(`⚡ Latência: ${message._meta.latency}ms\n`);
      }
      
      // 🛑 DEMO: Para após 100 mensagens
      if (processedMessages >= 100) {
        console.log('\n⏸️ Demo finalizada - stream em tempo real funcionando!');
        break;
      }
    }
    
  } catch (error) {
    console.error('❌ Erro no stream:', error);
  } finally {
    streamer.disconnect();
    
    // 📊 RELATÓRIO FINAL
    console.log('\n📊 RELATÓRIO FINAL:');
    console.log(`📨 Total mensagens: ${processedMessages}`);
    console.log(`💬 Mensagens de chat: ${chatMessages}`);
    console.log(`📈 Atualizações de preço: ${priceUpdates}`);
  }
}

// 🚀 EXECUTAR exemplo
webSocketStreamExample();

// 💡 VANTAGENS DO WEBSOCKET STREAMING:
//
// ✅ TEMPO REAL:
// - Processa dados conforme chegam
// - Latência mínima
// - Não perde mensagens
// - Queue automática
//
// ✅ ROBUSTEZ:
// - Reconexão automática
// - Exponential backoff
// - Error handling
// - Heartbeat monitoring
//
// ✅ FLEXIBILIDADE:
// - Filtros aplicados sob demanda
// - Múltiplos tipos de mensagem
// - Processamento personalizado
// - Estatísticas em tempo real
//
// ✅ ESCALABILIDADE:
// - Memory usage constante
// - Não bloqueia event loop
// - Múltiplos consumers
// - Rate limiting natural

🚀 Checkpoint: Async Master

Async GeneratorsAPI StreamingDatabase StreamingWebSocket Streaming

Agora você domina processamento assíncrono em tempo real com controle total sobre recursos!