k8-mini-app/backend/index.js

145 lines
4.9 KiB
JavaScript

const express = require('express');
const amqp = require('amqplib');
const app = express();
app.use(express.json());
// Variabili globali
let channel = null;
let connection = null;
// Funzione per connettersi a RabbitMQ
async function connectRabbitMQ() {
try {
console.log('Tentativo di connessione a RabbitMQ...');
connection = await amqp.connect('amqp://rabbitmq');
console.log('Connessione a RabbitMQ stabilita');
// Gestione della chiusura della connessione
connection.on('close', () => {
console.warn('Connessione a RabbitMQ chiusa, tentativo di riconnessione tra 5 secondi...');
setTimeout(connectRabbitMQ, 5000);
});
// Gestione errori
connection.on('error', (err) => {
console.error('Errore nella connessione a RabbitMQ:', err);
if (!connection.closed) connection.close();
});
// Crea canale
channel = await connection.createChannel();
console.log('Canale creato');
// NUOVO: Tenta di cancellare tutti i consumatori esistenti
try {
await channel.cancel('');
console.log('Tentativo di cancellazione consumatori esistenti');
} catch (err) {
console.log('Nessun consumatore da cancellare o errore:', err.message);
}
// Configura la coda con durabilità (nome modificato)
await channel.assertQueue('messages_v2', {
durable: true, // La coda sopravvive al riavvio del broker
autoDelete: false // La coda non viene eliminata quando non ha più consumatori
});
console.log('Coda "messages_v2" verificata/creata');
return channel;
} catch (error) {
console.error('Errore nella connessione a RabbitMQ:', error);
setTimeout(connectRabbitMQ, 5000);
throw error;
}
}
// Inizializza il server dopo la connessione a RabbitMQ
async function startServer() {
try {
await connectRabbitMQ();
// Endpoint per inviare messaggi alla coda
app.post('/api/enqueue', async (req, res) => {
try {
const msg = JSON.stringify(req.body);
console.log('Invio messaggio alla coda:', msg);
// Invia il messaggio con persistenza (nome coda modificato)
channel.sendToQueue('messages_v2', Buffer.from(msg), {
persistent: true // Il messaggio è persistente
});
console.log('Messaggio inviato con successo');
res.json({ status: 'success', message: 'Messaggio inviato alla coda', data: req.body });
} catch (error) {
console.error('Errore nell\'invio del messaggio:', error);
res.status(500).json({ status: 'error', message: 'Errore nell\'invio del messaggio' });
}
});
// Endpoint per recuperare messaggi dalla coda
app.get('/api/messages', async (req, res) => {
try {
console.log('Tentativo di recupero messaggio con GET...');
// Usa il metodo get invece di consume (nome coda modificato)
const msg = await channel.get('messages_v2', { noAck: false });
if (msg) {
console.log('Messaggio trovato:', msg.content.toString());
// Conferma esplicita che il messaggio è stato ricevuto
channel.ack(msg);
const content = JSON.parse(msg.content.toString());
res.json({ status: 'success', message: content });
} else {
console.log('Nessun messaggio trovato nella coda con GET');
res.json({ status: 'empty', message: 'Nessun messaggio nella coda' });
}
} catch (error) {
console.error('Errore nel recupero del messaggio:', error);
res.status(500).json({ status: 'error', message: `Errore nel recupero: ${error.message}` });
}
});
// Endpoint di debug per verificare lo stato della coda
app.get('/api/queue-info', async (req, res) => {
try {
// Nome coda modificato
const queueInfo = await channel.assertQueue('messages_v2', { passive: true });
console.log('Informazioni coda:', queueInfo);
res.json({
status: 'success',
queue: 'messages_v2',
messageCount: queueInfo.messageCount,
consumerCount: queueInfo.consumerCount,
properties: queueInfo
});
} catch (error) {
console.error('Errore nel recupero info coda:', error);
res.status(500).json({ status: 'error', message: 'Errore nel recupero info coda' });
}
});
// Avvia il server
app.listen(3000, () => console.log('Backend in ascolto sulla porta 3000'));
} catch (error) {
console.error('Errore nell\'avvio del server:', error);
process.exit(1);
}
}
// Avvia il server
startServer();
// Gestione della chiusura pulita
process.on('SIGINT', async () => {
try {
if (channel) await channel.close();
if (connection) await connection.close();
} catch (error) {
console.error('Errore nella chiusura della connessione:', error);
} finally {
process.exit(0);
}
});