diff --git a/server/monitor-types/rabbitmq.js b/server/monitor-types/rabbitmq.js index 251f1b8ac..194091fd7 100644 --- a/server/monitor-types/rabbitmq.js +++ b/server/monitor-types/rabbitmq.js @@ -17,45 +17,83 @@ class RabbitMqMonitorType extends MonitorType { throw new Error("Invalid RabbitMQ Nodes"); } - for (let baseUrl of baseUrls) { + if (baseUrls.length === 0) { + throw new Error("No RabbitMQ nodes configured"); + } + + const errors = []; + + for (let i = 0; i < baseUrls.length; i++) { + const baseUrl = baseUrls[i]; + const nodeIndex = i + 1; + try { - // Without a trailing slash, path in baseUrl will be removed. https://example.com/api -> https://example.com - if ( !baseUrl.endsWith("/") ) { - baseUrl += "/"; - } - const options = { - // Do not start with slash, it will strip the trailing slash from baseUrl - url: new URL("api/health/checks/alarms/", baseUrl).href, - method: "get", - timeout: monitor.timeout * 1000, - headers: { - "Accept": "application/json", - "Authorization": "Basic " + Buffer.from(`${monitor.rabbitmqUsername || ""}:${monitor.rabbitmqPassword || ""}`).toString("base64"), - }, - signal: axiosAbortSignal((monitor.timeout + 10) * 1000), - // Capture reason for 503 status - validateStatus: (status) => status === 200 || status === 503, - }; - log.debug("monitor", `[${monitor.name}] Axios Request: ${JSON.stringify(options)}`); - const res = await axios.request(options); - log.debug("monitor", `[${monitor.name}] Axios Response: status=${res.status} body=${JSON.stringify(res.data)}`); - if (res.status === 200) { - heartbeat.status = UP; - heartbeat.msg = "OK"; - break; - } else if (res.status === 503) { - throw new Error(res.data.reason); - } else { - throw new Error(`${res.status} - ${res.statusText}`); - } + await this.checkSingleNode(monitor, baseUrl, `${nodeIndex}/${baseUrls.length}`); + // If checkSingleNode succeeds (doesn't throw), set heartbeat to UP + heartbeat.status = UP; + heartbeat.msg = baseUrls.length === 1 ? "Node is reachable and there are no alerts in the cluster" : `One of the ${baseUrls.length} nodes is reachable and there are no alerts in the cluster`; + return; } catch (error) { - if (axios.isCancel(error)) { - log.debug("monitor", `[${monitor.name}] Request timed out`); - throw new Error("Request timed out"); - } else { - log.debug("monitor", `[${monitor.name}] Axios Error: ${JSON.stringify(error.message)}`); - throw new Error(error.message); - } + log.warn(this.name, `Node ${nodeIndex}: ${error.message}`); + errors.push(`Node ${nodeIndex}: ${error.message}`); + } + } + + // If we reach here, all nodes failed + throw new Error(`All ${errors.length} nodes failed because ${errors.join("; ")}`); + } + + /** + * Check a single RabbitMQ node + * @param {object} monitor Monitor configuration + * @param {string} baseUrl Base URL of the RabbitMQ node + * @param {string} nodeInfo Node index info for logging (e.g., "1/3") + * @returns {Promise} + * @throws {Error} If the node check fails + */ + async checkSingleNode(monitor, baseUrl, nodeInfo) { + // Without a trailing slash, path in baseUrl will be removed. https://example.com/api -> https://example.com + let normalizedUrl = baseUrl; + if (!normalizedUrl.endsWith("/")) { + normalizedUrl += "/"; + } + + const options = { + // Do not start with slash, it will strip the trailing slash from baseUrl + url: new URL("api/health/checks/alarms/", normalizedUrl).href, + method: "get", + timeout: monitor.timeout * 1000, + headers: { + "Accept": "application/json", + "Authorization": "Basic " + Buffer.from(`${monitor.rabbitmqUsername || ""}:${monitor.rabbitmqPassword || ""}`).toString("base64"), + }, + signal: axiosAbortSignal((monitor.timeout + 10) * 1000), + // Capture reason for 503 status + validateStatus: (status) => status === 200 || status === 503, + }; + + log.debug("monitor", `[${monitor.name}] Checking node ${nodeInfo}: ${baseUrl}`); + + try { + const res = await axios.request(options); + log.debug("monitor", `[${monitor.name}] Axios Response: status=${res.status} body=${JSON.stringify(res.data)}`); + + if (res.status === 200) { + log.debug("monitor", `[${monitor.name}] Node ${nodeInfo} is healthy`); + // Success - return without error + } else if (res.status === 503) { + throw new Error(res.data.reason); + } else { + throw new Error(`${res.status} - ${res.statusText}`); + } + } catch (error) { + if (axios.isCancel(error)) { + throw new Error("Request timed out"); + } else if (error.response) { + // Re-throw with the original error message if it's already formatted + throw error; + } else { + throw new Error(error.message); } } } diff --git a/test/backend-test/monitors/test-rabbitmq.js b/test/backend-test/monitors/test-rabbitmq.js index 63d358dfd..76181f03b 100644 --- a/test/backend-test/monitors/test-rabbitmq.js +++ b/test/backend-test/monitors/test-rabbitmq.js @@ -17,6 +17,7 @@ describe("RabbitMQ Single Node", { rabbitmqNodes: JSON.stringify([ connectionString ]), rabbitmqUsername: "guest", rabbitmqPassword: "guest", + timeout: 10, }; const heartbeat = { @@ -27,7 +28,7 @@ describe("RabbitMQ Single Node", { try { await rabbitMQMonitor.check(monitor, heartbeat, {}); assert.strictEqual(heartbeat.status, UP); - assert.strictEqual(heartbeat.msg, "OK"); + assert.strictEqual(heartbeat.msg, "Node is reachable and there are no alerts in the cluster"); } finally { rabbitMQContainer.stop(); } @@ -39,6 +40,7 @@ describe("RabbitMQ Single Node", { rabbitmqNodes: JSON.stringify([ "http://localhost:15672" ]), rabbitmqUsername: "rabbitmqUser", rabbitmqPassword: "rabbitmqPass", + timeout: 10, }; const heartbeat = { @@ -55,4 +57,193 @@ describe("RabbitMQ Single Node", { ); }); + test("checkSingleNode() succeeds when node is healthy", async () => { + const rabbitMQContainer = await new RabbitMQContainer().withStartupTimeout(60000).start(); + const rabbitMQMonitor = new RabbitMqMonitorType(); + const connectionString = `http://${rabbitMQContainer.getHost()}:${rabbitMQContainer.getMappedPort(15672)}`; + + const monitor = { + name: "Test Monitor", + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + try { + // Should not throw - just validates the node is healthy + await rabbitMQMonitor.checkSingleNode(monitor, connectionString, "1/1"); + } finally { + rabbitMQContainer.stop(); + } + }); + + test("checkSingleNode() throws error when node is unreachable", async () => { + const rabbitMQMonitor = new RabbitMqMonitorType(); + const monitor = { + name: "Test Monitor", + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + // Should reject with any error (connection refused, timeout, etc.) + await assert.rejects( + rabbitMQMonitor.checkSingleNode(monitor, "http://localhost:15672", "1/1"), + Error + ); + }); +}); + +describe("RabbitMQ Multi-Node (Mocked)", () => { + test("check() succeeds when first node is healthy", async () => { + const rabbitMQMonitor = new RabbitMqMonitorType(); + const monitor = { + rabbitmqNodes: JSON.stringify([ "http://node1:15672", "http://node2:15672" ]), + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + // Mock checkSingleNode to succeed on first call (just don't throw) + let callCount = 0; + rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => { + callCount++; + // Success - don't throw + }; + + await rabbitMQMonitor.check(monitor, heartbeat, {}); + assert.strictEqual(heartbeat.status, UP); + assert.strictEqual(heartbeat.msg, "One of the 2 nodes is reachable and there are no alerts in the cluster"); + assert.strictEqual(callCount, 1, "Should only check first node"); + }); + + test("check() succeeds when second node is healthy after first fails", async () => { + const rabbitMQMonitor = new RabbitMqMonitorType(); + const monitor = { + rabbitmqNodes: JSON.stringify([ "http://node1:15672", "http://node2:15672" ]), + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + // Mock checkSingleNode to fail first, succeed second + let callCount = 0; + rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => { + callCount++; + if (callCount === 1) { + throw new Error("Node 1 connection failed"); + } + // Second call succeeds - don't throw + }; + + await rabbitMQMonitor.check(monitor, heartbeat, {}); + assert.strictEqual(heartbeat.status, UP); + assert.strictEqual(heartbeat.msg, "One of the 2 nodes is reachable and there are no alerts in the cluster"); + assert.strictEqual(callCount, 2, "Should check both nodes"); + }); + + test("check() fails with consolidated error when all nodes are down", async () => { + const rabbitMQMonitor = new RabbitMqMonitorType(); + const monitor = { + rabbitmqNodes: JSON.stringify([ + "http://node1:15672", + "http://node2:15672", + "http://node3:15672" + ]), + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + // Mock checkSingleNode to always fail + let callCount = 0; + rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => { + callCount++; + throw new Error(`Connection failed to node ${callCount}`); + }; + + await assert.rejects( + rabbitMQMonitor.check(monitor, heartbeat, {}), + (error) => { + assert.match(error.message, /All 3 nodes failed/); + assert.match(error.message, /Node 1:/); + assert.match(error.message, /Node 2:/); + assert.match(error.message, /Node 3:/); + return true; + } + ); + assert.strictEqual(callCount, 3, "Should check all three nodes"); + }); + + test("check() fails when no nodes are configured", async () => { + const rabbitMQMonitor = new RabbitMqMonitorType(); + const monitor = { + rabbitmqNodes: JSON.stringify([]), + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + await assert.rejects( + rabbitMQMonitor.check(monitor, heartbeat, {}), + /No RabbitMQ nodes configured/ + ); + }); + + test("check() tries all nodes before failing", async () => { + const rabbitMQMonitor = new RabbitMqMonitorType(); + const monitor = { + rabbitmqNodes: JSON.stringify([ + "http://node1:15672", + "http://node2:15672", + "http://node3:15672", + "http://node4:15672" + ]), + rabbitmqUsername: "guest", + rabbitmqPassword: "guest", + timeout: 10, + }; + + const heartbeat = { + msg: "", + status: PENDING, + }; + + const checkedNodes = []; + rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => { + checkedNodes.push(url); + throw new Error(`Failed: ${url}`); + }; + + await assert.rejects( + rabbitMQMonitor.check(monitor, heartbeat, {}), + /All 4 nodes failed/ + ); + + assert.strictEqual(checkedNodes.length, 4, "Should check all 4 nodes"); + assert.strictEqual(checkedNodes[0], "http://node1:15672"); + assert.strictEqual(checkedNodes[1], "http://node2:15672"); + assert.strictEqual(checkedNodes[2], "http://node3:15672"); + assert.strictEqual(checkedNodes[3], "http://node4:15672"); + }); });