const express = require('express'); const amqp = require('amqplib'); const app = express(); app.use(express.json()); // Variabili globali let channel = null; let connection = null; // Funzione per connettersi a RabbitMQ async function connectRabbitMQ() { try { console.log('Tentativo di connessione a RabbitMQ...'); connection = await amqp.connect('amqp://rabbitmq'); console.log('Connessione a RabbitMQ stabilita'); // Gestione della chiusura della connessione connection.on('close', () => { console.warn('Connessione a RabbitMQ chiusa, tentativo di riconnessione tra 5 secondi...'); setTimeout(connectRabbitMQ, 5000); }); // Gestione errori connection.on('error', (err) => { console.error('Errore nella connessione a RabbitMQ:', err); if (!connection.closed) connection.close(); }); // Crea canale channel = await connection.createChannel(); console.log('Canale creato'); // NUOVO: Tenta di cancellare tutti i consumatori esistenti try { await channel.cancel(''); console.log('Tentativo di cancellazione consumatori esistenti'); } catch (err) { console.log('Nessun consumatore da cancellare o errore:', err.message); } // Configura la coda con durabilità (nome modificato) await channel.assertQueue('messages_v2', { durable: true, // La coda sopravvive al riavvio del broker autoDelete: false // La coda non viene eliminata quando non ha più consumatori }); console.log('Coda "messages_v2" verificata/creata'); return channel; } catch (error) { console.error('Errore nella connessione a RabbitMQ:', error); setTimeout(connectRabbitMQ, 5000); throw error; } } // Inizializza il server dopo la connessione a RabbitMQ async function startServer() { try { await connectRabbitMQ(); // Endpoint per inviare messaggi alla coda app.post('/api/enqueue', async (req, res) => { try { const msg = JSON.stringify(req.body); console.log('Invio messaggio alla coda:', msg); // Invia il messaggio con persistenza (nome coda modificato) channel.sendToQueue('messages_v2', Buffer.from(msg), { persistent: true // Il messaggio è persistente }); console.log('Messaggio inviato con successo'); res.json({ status: 'success', message: 'Messaggio inviato alla coda', data: req.body }); } catch (error) { console.error('Errore nell\'invio del messaggio:', error); res.status(500).json({ status: 'error', message: 'Errore nell\'invio del messaggio' }); } }); // Endpoint per recuperare messaggi dalla coda app.get('/api/messages', async (req, res) => { try { console.log('Tentativo di recupero messaggio con GET...'); // Usa il metodo get invece di consume (nome coda modificato) const msg = await channel.get('messages_v2', { noAck: false }); if (msg) { console.log('Messaggio trovato:', msg.content.toString()); // Conferma esplicita che il messaggio è stato ricevuto channel.ack(msg); const content = JSON.parse(msg.content.toString()); res.json({ status: 'success', message: content }); } else { console.log('Nessun messaggio trovato nella coda con GET'); res.json({ status: 'empty', message: 'Nessun messaggio nella coda' }); } } catch (error) { console.error('Errore nel recupero del messaggio:', error); res.status(500).json({ status: 'error', message: `Errore nel recupero: ${error.message}` }); } }); // Endpoint di debug per verificare lo stato della coda app.get('/api/queue-info', async (req, res) => { try { // Nome coda modificato const queueInfo = await channel.assertQueue('messages_v2', { passive: true }); console.log('Informazioni coda:', queueInfo); res.json({ status: 'success', queue: 'messages_v2', messageCount: queueInfo.messageCount, consumerCount: queueInfo.consumerCount, properties: queueInfo }); } catch (error) { console.error('Errore nel recupero info coda:', error); res.status(500).json({ status: 'error', message: 'Errore nel recupero info coda' }); } }); // Avvia il server app.listen(3000, () => console.log('Backend in ascolto sulla porta 3000')); } catch (error) { console.error('Errore nell\'avvio del server:', error); process.exit(1); } } // Avvia il server startServer(); // Gestione della chiusura pulita process.on('SIGINT', async () => { try { if (channel) await channel.close(); if (connection) await connection.close(); } catch (error) { console.error('Errore nella chiusura della connessione:', error); } finally { process.exit(0); } });