fix: RabbitMQ monitor to more properly handle all nodes failure (#6646)

Co-authored-by: Frank Elsinga <frank@elsinga.de>
pull/6653/head
Angel98518 1 month ago committed by GitHub
parent 5accda390e
commit 27c0ae8f1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -17,45 +17,83 @@ class RabbitMqMonitorType extends MonitorType {
throw new Error("Invalid RabbitMQ Nodes");
}
for (let baseUrl of baseUrls) {
if (baseUrls.length === 0) {
throw new Error("No RabbitMQ nodes configured");
}
const errors = [];
for (let i = 0; i < baseUrls.length; i++) {
const baseUrl = baseUrls[i];
const nodeIndex = i + 1;
try {
// Without a trailing slash, path in baseUrl will be removed. https://example.com/api -> https://example.com
if ( !baseUrl.endsWith("/") ) {
baseUrl += "/";
}
const options = {
// Do not start with slash, it will strip the trailing slash from baseUrl
url: new URL("api/health/checks/alarms/", baseUrl).href,
method: "get",
timeout: monitor.timeout * 1000,
headers: {
"Accept": "application/json",
"Authorization": "Basic " + Buffer.from(`${monitor.rabbitmqUsername || ""}:${monitor.rabbitmqPassword || ""}`).toString("base64"),
},
signal: axiosAbortSignal((monitor.timeout + 10) * 1000),
// Capture reason for 503 status
validateStatus: (status) => status === 200 || status === 503,
};
log.debug("monitor", `[${monitor.name}] Axios Request: ${JSON.stringify(options)}`);
const res = await axios.request(options);
log.debug("monitor", `[${monitor.name}] Axios Response: status=${res.status} body=${JSON.stringify(res.data)}`);
if (res.status === 200) {
heartbeat.status = UP;
heartbeat.msg = "OK";
break;
} else if (res.status === 503) {
throw new Error(res.data.reason);
} else {
throw new Error(`${res.status} - ${res.statusText}`);
}
await this.checkSingleNode(monitor, baseUrl, `${nodeIndex}/${baseUrls.length}`);
// If checkSingleNode succeeds (doesn't throw), set heartbeat to UP
heartbeat.status = UP;
heartbeat.msg = baseUrls.length === 1 ? "Node is reachable and there are no alerts in the cluster" : `One of the ${baseUrls.length} nodes is reachable and there are no alerts in the cluster`;
return;
} catch (error) {
if (axios.isCancel(error)) {
log.debug("monitor", `[${monitor.name}] Request timed out`);
throw new Error("Request timed out");
} else {
log.debug("monitor", `[${monitor.name}] Axios Error: ${JSON.stringify(error.message)}`);
throw new Error(error.message);
}
log.warn(this.name, `Node ${nodeIndex}: ${error.message}`);
errors.push(`Node ${nodeIndex}: ${error.message}`);
}
}
// If we reach here, all nodes failed
throw new Error(`All ${errors.length} nodes failed because ${errors.join("; ")}`);
}
/**
* Check a single RabbitMQ node
* @param {object} monitor Monitor configuration
* @param {string} baseUrl Base URL of the RabbitMQ node
* @param {string} nodeInfo Node index info for logging (e.g., "1/3")
* @returns {Promise<void>}
* @throws {Error} If the node check fails
*/
async checkSingleNode(monitor, baseUrl, nodeInfo) {
// Without a trailing slash, path in baseUrl will be removed. https://example.com/api -> https://example.com
let normalizedUrl = baseUrl;
if (!normalizedUrl.endsWith("/")) {
normalizedUrl += "/";
}
const options = {
// Do not start with slash, it will strip the trailing slash from baseUrl
url: new URL("api/health/checks/alarms/", normalizedUrl).href,
method: "get",
timeout: monitor.timeout * 1000,
headers: {
"Accept": "application/json",
"Authorization": "Basic " + Buffer.from(`${monitor.rabbitmqUsername || ""}:${monitor.rabbitmqPassword || ""}`).toString("base64"),
},
signal: axiosAbortSignal((monitor.timeout + 10) * 1000),
// Capture reason for 503 status
validateStatus: (status) => status === 200 || status === 503,
};
log.debug("monitor", `[${monitor.name}] Checking node ${nodeInfo}: ${baseUrl}`);
try {
const res = await axios.request(options);
log.debug("monitor", `[${monitor.name}] Axios Response: status=${res.status} body=${JSON.stringify(res.data)}`);
if (res.status === 200) {
log.debug("monitor", `[${monitor.name}] Node ${nodeInfo} is healthy`);
// Success - return without error
} else if (res.status === 503) {
throw new Error(res.data.reason);
} else {
throw new Error(`${res.status} - ${res.statusText}`);
}
} catch (error) {
if (axios.isCancel(error)) {
throw new Error("Request timed out");
} else if (error.response) {
// Re-throw with the original error message if it's already formatted
throw error;
} else {
throw new Error(error.message);
}
}
}

@ -17,6 +17,7 @@ describe("RabbitMQ Single Node", {
rabbitmqNodes: JSON.stringify([ connectionString ]),
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
const heartbeat = {
@ -27,7 +28,7 @@ describe("RabbitMQ Single Node", {
try {
await rabbitMQMonitor.check(monitor, heartbeat, {});
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "OK");
assert.strictEqual(heartbeat.msg, "Node is reachable and there are no alerts in the cluster");
} finally {
rabbitMQContainer.stop();
}
@ -39,6 +40,7 @@ describe("RabbitMQ Single Node", {
rabbitmqNodes: JSON.stringify([ "http://localhost:15672" ]),
rabbitmqUsername: "rabbitmqUser",
rabbitmqPassword: "rabbitmqPass",
timeout: 10,
};
const heartbeat = {
@ -55,4 +57,193 @@ describe("RabbitMQ Single Node", {
);
});
test("checkSingleNode() succeeds when node is healthy", async () => {
const rabbitMQContainer = await new RabbitMQContainer().withStartupTimeout(60000).start();
const rabbitMQMonitor = new RabbitMqMonitorType();
const connectionString = `http://${rabbitMQContainer.getHost()}:${rabbitMQContainer.getMappedPort(15672)}`;
const monitor = {
name: "Test Monitor",
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
try {
// Should not throw - just validates the node is healthy
await rabbitMQMonitor.checkSingleNode(monitor, connectionString, "1/1");
} finally {
rabbitMQContainer.stop();
}
});
test("checkSingleNode() throws error when node is unreachable", async () => {
const rabbitMQMonitor = new RabbitMqMonitorType();
const monitor = {
name: "Test Monitor",
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
// Should reject with any error (connection refused, timeout, etc.)
await assert.rejects(
rabbitMQMonitor.checkSingleNode(monitor, "http://localhost:15672", "1/1"),
Error
);
});
});
describe("RabbitMQ Multi-Node (Mocked)", () => {
test("check() succeeds when first node is healthy", async () => {
const rabbitMQMonitor = new RabbitMqMonitorType();
const monitor = {
rabbitmqNodes: JSON.stringify([ "http://node1:15672", "http://node2:15672" ]),
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
const heartbeat = {
msg: "",
status: PENDING,
};
// Mock checkSingleNode to succeed on first call (just don't throw)
let callCount = 0;
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
callCount++;
// Success - don't throw
};
await rabbitMQMonitor.check(monitor, heartbeat, {});
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "One of the 2 nodes is reachable and there are no alerts in the cluster");
assert.strictEqual(callCount, 1, "Should only check first node");
});
test("check() succeeds when second node is healthy after first fails", async () => {
const rabbitMQMonitor = new RabbitMqMonitorType();
const monitor = {
rabbitmqNodes: JSON.stringify([ "http://node1:15672", "http://node2:15672" ]),
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
const heartbeat = {
msg: "",
status: PENDING,
};
// Mock checkSingleNode to fail first, succeed second
let callCount = 0;
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
callCount++;
if (callCount === 1) {
throw new Error("Node 1 connection failed");
}
// Second call succeeds - don't throw
};
await rabbitMQMonitor.check(monitor, heartbeat, {});
assert.strictEqual(heartbeat.status, UP);
assert.strictEqual(heartbeat.msg, "One of the 2 nodes is reachable and there are no alerts in the cluster");
assert.strictEqual(callCount, 2, "Should check both nodes");
});
test("check() fails with consolidated error when all nodes are down", async () => {
const rabbitMQMonitor = new RabbitMqMonitorType();
const monitor = {
rabbitmqNodes: JSON.stringify([
"http://node1:15672",
"http://node2:15672",
"http://node3:15672"
]),
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
const heartbeat = {
msg: "",
status: PENDING,
};
// Mock checkSingleNode to always fail
let callCount = 0;
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
callCount++;
throw new Error(`Connection failed to node ${callCount}`);
};
await assert.rejects(
rabbitMQMonitor.check(monitor, heartbeat, {}),
(error) => {
assert.match(error.message, /All 3 nodes failed/);
assert.match(error.message, /Node 1:/);
assert.match(error.message, /Node 2:/);
assert.match(error.message, /Node 3:/);
return true;
}
);
assert.strictEqual(callCount, 3, "Should check all three nodes");
});
test("check() fails when no nodes are configured", async () => {
const rabbitMQMonitor = new RabbitMqMonitorType();
const monitor = {
rabbitmqNodes: JSON.stringify([]),
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
const heartbeat = {
msg: "",
status: PENDING,
};
await assert.rejects(
rabbitMQMonitor.check(monitor, heartbeat, {}),
/No RabbitMQ nodes configured/
);
});
test("check() tries all nodes before failing", async () => {
const rabbitMQMonitor = new RabbitMqMonitorType();
const monitor = {
rabbitmqNodes: JSON.stringify([
"http://node1:15672",
"http://node2:15672",
"http://node3:15672",
"http://node4:15672"
]),
rabbitmqUsername: "guest",
rabbitmqPassword: "guest",
timeout: 10,
};
const heartbeat = {
msg: "",
status: PENDING,
};
const checkedNodes = [];
rabbitMQMonitor.checkSingleNode = async (mon, url, nodeInfo) => {
checkedNodes.push(url);
throw new Error(`Failed: ${url}`);
};
await assert.rejects(
rabbitMQMonitor.check(monitor, heartbeat, {}),
/All 4 nodes failed/
);
assert.strictEqual(checkedNodes.length, 4, "Should check all 4 nodes");
assert.strictEqual(checkedNodes[0], "http://node1:15672");
assert.strictEqual(checkedNodes[1], "http://node2:15672");
assert.strictEqual(checkedNodes[2], "http://node3:15672");
assert.strictEqual(checkedNodes[3], "http://node4:15672");
});
});

Loading…
Cancel
Save