Adds a new monitor type that queries NTP servers via UDP, parses the response per RFC 5905, and checks stratum, time offset, and root dispersion against configurable thresholds. Includes knex migration, full backend test suite, and frontend UI in the Specific Monitor Type category. Resolves #5028pull/7214/head
parent
42e8b8fbbb
commit
2f7ae74847
@ -0,0 +1,15 @@
|
||||
exports.up = function (knex) {
|
||||
return knex.schema.alterTable("monitor", function (table) {
|
||||
table.integer("ntp_stratum_threshold").defaultTo(5);
|
||||
table.integer("ntp_time_offset_threshold").defaultTo(1000);
|
||||
table.integer("ntp_root_dispersion_threshold").defaultTo(500);
|
||||
});
|
||||
};
|
||||
|
||||
exports.down = function (knex) {
|
||||
return knex.schema.alterTable("monitor", function (table) {
|
||||
table.dropColumn("ntp_stratum_threshold");
|
||||
table.dropColumn("ntp_time_offset_threshold");
|
||||
table.dropColumn("ntp_root_dispersion_threshold");
|
||||
});
|
||||
};
|
||||
@ -0,0 +1,184 @@
|
||||
const { MonitorType } = require("./monitor-type");
|
||||
const { UP } = require("../../src/util");
|
||||
const dayjs = require("dayjs");
|
||||
const dgram = require("dgram");
|
||||
|
||||
/**
|
||||
* NTP Monitor Type
|
||||
* Monitors NTP servers for availability, time accuracy, and quality metrics
|
||||
*/
|
||||
class NTPMonitorType extends MonitorType {
|
||||
name = "ntp";
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
async check(monitor, heartbeat, _server) {
|
||||
const startTime = dayjs().valueOf();
|
||||
|
||||
if (!monitor.hostname) {
|
||||
throw new Error("Hostname is required");
|
||||
}
|
||||
|
||||
const port = monitor.port || 123;
|
||||
const timeout = (monitor.timeout || 10) * 1000;
|
||||
|
||||
const ntpResult = await this.queryNTP(monitor.hostname, port, timeout);
|
||||
|
||||
heartbeat.ping = dayjs().valueOf() - startTime;
|
||||
|
||||
const { stratum, offset, rootDispersion, refid, roundTripDelay } = ntpResult;
|
||||
|
||||
heartbeat.msg = `Stratum: ${stratum}, RefID: ${refid}, Offset: ${offset.toFixed(3)}ms, Delay: ${roundTripDelay.toFixed(3)}ms, Dispersion: ${rootDispersion.toFixed(3)}ms`;
|
||||
|
||||
if (stratum === 16) {
|
||||
throw new Error("NTP server is unsynchronized (stratum 16)");
|
||||
}
|
||||
|
||||
const stratumThreshold = monitor.ntp_stratum_threshold || 5;
|
||||
if (stratum >= stratumThreshold) {
|
||||
throw new Error(`Stratum ${stratum} meets or exceeds threshold ${stratumThreshold}`);
|
||||
}
|
||||
|
||||
const offsetThreshold = monitor.ntp_time_offset_threshold || 1000;
|
||||
if (Math.abs(offset) >= offsetThreshold) {
|
||||
throw new Error(`Time offset ${offset.toFixed(3)}ms exceeds threshold ${offsetThreshold}ms`);
|
||||
}
|
||||
|
||||
const dispersionThreshold = monitor.ntp_root_dispersion_threshold || 500;
|
||||
if (rootDispersion >= dispersionThreshold) {
|
||||
throw new Error(
|
||||
`Root dispersion ${rootDispersion.toFixed(3)}ms exceeds threshold ${dispersionThreshold}ms`
|
||||
);
|
||||
}
|
||||
|
||||
heartbeat.status = UP;
|
||||
}
|
||||
|
||||
/**
|
||||
* Query an NTP server via UDP
|
||||
* @param {string} hostname NTP server hostname or IP
|
||||
* @param {number} port NTP server port (usually 123)
|
||||
* @param {number} timeout Timeout in milliseconds
|
||||
* @returns {Promise<object>} Parsed NTP response data
|
||||
*/
|
||||
queryNTP(hostname, port, timeout) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const client = dgram.createSocket(hostname.includes(":") ? "udp6" : "udp4");
|
||||
const ntpPacket = this.createNTPPacket();
|
||||
|
||||
const NTP_EPOCH_OFFSET_MS = 2208988800000;
|
||||
const t1 = Date.now() + NTP_EPOCH_OFFSET_MS;
|
||||
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
client.close();
|
||||
reject(new Error("NTP request timed out"));
|
||||
}, timeout);
|
||||
|
||||
client.on("error", (err) => {
|
||||
clearTimeout(timeoutHandle);
|
||||
client.close();
|
||||
reject(new Error(`UDP socket error: ${err.message}`));
|
||||
});
|
||||
|
||||
client.on("message", (msg) => {
|
||||
clearTimeout(timeoutHandle);
|
||||
const t4 = Date.now() + NTP_EPOCH_OFFSET_MS;
|
||||
|
||||
try {
|
||||
const result = this.parseNTPResponse(msg, t1, t4);
|
||||
client.close();
|
||||
resolve(result);
|
||||
} catch (err) {
|
||||
client.close();
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
client.send(ntpPacket, 0, ntpPacket.length, port, hostname, (err) => {
|
||||
if (err) {
|
||||
clearTimeout(timeoutHandle);
|
||||
client.close();
|
||||
reject(new Error(`Failed to send NTP request: ${err.message}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an NTP version 3 client request packet (48 bytes)
|
||||
* Byte 0: LI=0 (no warning), VN=3 (NTPv3), Mode=3 (client) = 0x1B
|
||||
* @returns {Buffer} NTP request packet
|
||||
*/
|
||||
createNTPPacket() {
|
||||
const packet = Buffer.alloc(48);
|
||||
packet[0] = 0x1b;
|
||||
return packet;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse an NTP response packet and calculate offset/delay
|
||||
* @param {Buffer} msg NTP response packet (48+ bytes)
|
||||
* @param {number} t1 Client originate timestamp in ms since NTP epoch (1900)
|
||||
* @param {number} t4 Client receive timestamp in ms since NTP epoch (1900)
|
||||
* @returns {object} Parsed NTP data including stratum, offset, refid, rootDispersion, roundTripDelay
|
||||
* @throws {Error} If the packet is shorter than 48 bytes
|
||||
*/
|
||||
parseNTPResponse(msg, t1, t4) {
|
||||
if (msg.length < 48) {
|
||||
throw new Error(`Invalid NTP response: expected 48+ bytes, got ${msg.length}`);
|
||||
}
|
||||
|
||||
const leapIndicator = (msg[0] >> 6) & 0x03;
|
||||
const stratum = msg[1];
|
||||
|
||||
// Root dispersion: 32-bit unsigned fixed-point at offset 8, unit = seconds
|
||||
const rootDispersionRaw = msg.readUInt32BE(8);
|
||||
const rootDispersion = (rootDispersionRaw / 65536) * 1000;
|
||||
|
||||
// Reference ID: ASCII for stratum 0-1, IPv4 address for stratum 2+
|
||||
let refid;
|
||||
if (stratum <= 1) {
|
||||
refid = msg.toString("ascii", 12, 16).replace(/\0/g, "").trim();
|
||||
} else {
|
||||
refid = `${msg[12]}.${msg[13]}.${msg[14]}.${msg[15]}`;
|
||||
}
|
||||
|
||||
// Server receive timestamp (T2) at offset 32
|
||||
const t2 = this.readNTPTimestamp(msg, 32);
|
||||
// Server transmit timestamp (T3) at offset 40
|
||||
const t3 = this.readNTPTimestamp(msg, 40);
|
||||
|
||||
// RFC 5905 offset and delay calculations
|
||||
// offset = ((T2 - T1) + (T3 - T4)) / 2
|
||||
// delay = (T4 - T1) - (T3 - T2)
|
||||
const offset = (t2 - t1 + (t3 - t4)) / 2;
|
||||
const roundTripDelay = t4 - t1 - (t3 - t2);
|
||||
|
||||
return {
|
||||
leapIndicator,
|
||||
stratum,
|
||||
rootDispersion,
|
||||
refid,
|
||||
offset,
|
||||
roundTripDelay,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a 64-bit NTP timestamp from a buffer and convert to milliseconds since NTP epoch
|
||||
* NTP timestamps are 32 bits of seconds + 32 bits of fractional seconds since 1900-01-01
|
||||
* @param {Buffer} buf Packet buffer
|
||||
* @param {number} offset Byte offset in the buffer
|
||||
* @returns {number} Timestamp in milliseconds since NTP epoch (1900)
|
||||
*/
|
||||
readNTPTimestamp(buf, offset) {
|
||||
const seconds = buf.readUInt32BE(offset);
|
||||
const fraction = buf.readUInt32BE(offset + 4);
|
||||
return seconds * 1000 + (fraction * 1000) / 0x100000000;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
NTPMonitorType,
|
||||
};
|
||||
@ -0,0 +1,263 @@
|
||||
const { describe, test } = require("node:test");
|
||||
const assert = require("node:assert/strict");
|
||||
const { NTPMonitorType } = require("../../server/monitor-types/ntp");
|
||||
const { UP } = require("../../src/util");
|
||||
|
||||
describe("NTPMonitorType", () => {
|
||||
const ntp = new NTPMonitorType();
|
||||
|
||||
test("createNTPPacket() returns a 48-byte buffer with correct header", () => {
|
||||
const packet = ntp.createNTPPacket();
|
||||
assert.strictEqual(packet.length, 48);
|
||||
// LI=0, VN=3, Mode=3 => 0x1B
|
||||
assert.strictEqual(packet[0], 0x1b);
|
||||
// Rest should be zeros
|
||||
for (let i = 1; i < 48; i++) {
|
||||
assert.strictEqual(packet[i], 0, `byte ${i} should be zero`);
|
||||
}
|
||||
});
|
||||
|
||||
test("readNTPTimestamp() correctly converts NTP timestamp to milliseconds", () => {
|
||||
const buf = Buffer.alloc(8);
|
||||
// 1 second since NTP epoch
|
||||
buf.writeUInt32BE(1, 0);
|
||||
buf.writeUInt32BE(0, 4);
|
||||
assert.strictEqual(ntp.readNTPTimestamp(buf, 0), 1000);
|
||||
|
||||
// 0.5 seconds fractional
|
||||
buf.writeUInt32BE(0, 0);
|
||||
buf.writeUInt32BE(0x80000000, 4);
|
||||
assert.strictEqual(ntp.readNTPTimestamp(buf, 0), 500);
|
||||
});
|
||||
|
||||
test("parseNTPResponse() extracts correct fields from a valid packet", () => {
|
||||
// Construct a minimal valid NTP response packet
|
||||
const msg = Buffer.alloc(48);
|
||||
|
||||
// Byte 0: LI=0, VN=3, Mode=4 (server) => 00 011 100 = 0x1C
|
||||
msg[0] = 0x1c;
|
||||
// Stratum 2
|
||||
msg[1] = 2;
|
||||
|
||||
// Root dispersion at offset 8: 0.5 seconds = 0x00008000
|
||||
msg.writeUInt32BE(0x00008000, 8);
|
||||
|
||||
// Reference ID at offset 12: "GPS\0" for stratum 1 test not applicable here
|
||||
// For stratum 2, it's an IPv4 address
|
||||
msg[12] = 192;
|
||||
msg[13] = 168;
|
||||
msg[14] = 1;
|
||||
msg[15] = 1;
|
||||
|
||||
// Server receive timestamp (T2) at offset 32: 3912710400 seconds (approx 2024-01-01)
|
||||
msg.writeUInt32BE(3912710400, 32);
|
||||
msg.writeUInt32BE(0, 36);
|
||||
|
||||
// Server transmit timestamp (T3) at offset 40: same + 1ms
|
||||
msg.writeUInt32BE(3912710400, 40);
|
||||
msg.writeUInt32BE(4294967, 44); // ~1ms in fractional seconds
|
||||
|
||||
const t1 = 3912710400 * 1000; // Client originate in ms since NTP epoch
|
||||
const t4 = 3912710400 * 1000 + 50; // Client receive 50ms later
|
||||
|
||||
const result = ntp.parseNTPResponse(msg, t1, t4);
|
||||
|
||||
assert.strictEqual(result.stratum, 2);
|
||||
assert.strictEqual(result.leapIndicator, 0);
|
||||
assert.strictEqual(result.refid, "192.168.1.1");
|
||||
// Root dispersion: 0x8000 / 65536 * 1000 = 500ms
|
||||
assert.ok(
|
||||
Math.abs(result.rootDispersion - 500) < 0.1,
|
||||
`rootDispersion should be ~500ms, got ${result.rootDispersion}`
|
||||
);
|
||||
assert.strictEqual(typeof result.offset, "number");
|
||||
assert.strictEqual(typeof result.roundTripDelay, "number");
|
||||
});
|
||||
|
||||
test("parseNTPResponse() parses ASCII refid for stratum 1", () => {
|
||||
const msg = Buffer.alloc(48);
|
||||
msg[0] = 0x1c;
|
||||
msg[1] = 1; // Stratum 1
|
||||
msg.write("GPS\0", 12, "ascii");
|
||||
// Set timestamps to avoid NaN
|
||||
msg.writeUInt32BE(3912710400, 32);
|
||||
msg.writeUInt32BE(0, 36);
|
||||
msg.writeUInt32BE(3912710400, 40);
|
||||
msg.writeUInt32BE(0, 44);
|
||||
|
||||
const t1 = 3912710400 * 1000;
|
||||
const t4 = t1 + 10;
|
||||
|
||||
const result = ntp.parseNTPResponse(msg, t1, t4);
|
||||
assert.strictEqual(result.stratum, 1);
|
||||
assert.strictEqual(result.refid, "GPS");
|
||||
});
|
||||
|
||||
test("parseNTPResponse() rejects packets shorter than 48 bytes", () => {
|
||||
const short = Buffer.alloc(20);
|
||||
assert.throws(() => ntp.parseNTPResponse(short, 0, 0), /expected 48\+ bytes/);
|
||||
});
|
||||
|
||||
test("check() throws for stratum 16 (unsynchronized)", async () => {
|
||||
const monitor = {
|
||||
hostname: "localhost",
|
||||
port: 123,
|
||||
timeout: 5,
|
||||
ntp_stratum_threshold: 5,
|
||||
ntp_time_offset_threshold: 1000,
|
||||
ntp_root_dispersion_threshold: 500,
|
||||
};
|
||||
const heartbeat = {};
|
||||
|
||||
// Stub queryNTP to return stratum 16
|
||||
const originalQuery = ntp.queryNTP;
|
||||
ntp.queryNTP = async () => ({
|
||||
stratum: 16,
|
||||
offset: 0,
|
||||
rootDispersion: 10,
|
||||
refid: "INIT",
|
||||
roundTripDelay: 5,
|
||||
leapIndicator: 3,
|
||||
});
|
||||
|
||||
try {
|
||||
await assert.rejects(() => ntp.check(monitor, heartbeat, null), /unsynchronized.*stratum 16/);
|
||||
} finally {
|
||||
ntp.queryNTP = originalQuery;
|
||||
}
|
||||
});
|
||||
|
||||
test("check() throws when stratum exceeds threshold", async () => {
|
||||
const monitor = {
|
||||
hostname: "localhost",
|
||||
port: 123,
|
||||
timeout: 5,
|
||||
ntp_stratum_threshold: 2,
|
||||
ntp_time_offset_threshold: 1000,
|
||||
ntp_root_dispersion_threshold: 500,
|
||||
};
|
||||
const heartbeat = {};
|
||||
|
||||
const originalQuery = ntp.queryNTP;
|
||||
ntp.queryNTP = async () => ({
|
||||
stratum: 3,
|
||||
offset: 0.5,
|
||||
rootDispersion: 10,
|
||||
refid: "GPS",
|
||||
roundTripDelay: 5,
|
||||
leapIndicator: 0,
|
||||
});
|
||||
|
||||
try {
|
||||
await assert.rejects(() => ntp.check(monitor, heartbeat, null), /Stratum 3 meets or exceeds threshold 2/);
|
||||
} finally {
|
||||
ntp.queryNTP = originalQuery;
|
||||
}
|
||||
});
|
||||
|
||||
test("check() throws when offset exceeds threshold", async () => {
|
||||
const monitor = {
|
||||
hostname: "localhost",
|
||||
port: 123,
|
||||
timeout: 5,
|
||||
ntp_stratum_threshold: 5,
|
||||
ntp_time_offset_threshold: 100,
|
||||
ntp_root_dispersion_threshold: 500,
|
||||
};
|
||||
const heartbeat = {};
|
||||
|
||||
const originalQuery = ntp.queryNTP;
|
||||
ntp.queryNTP = async () => ({
|
||||
stratum: 2,
|
||||
offset: -150.5,
|
||||
rootDispersion: 10,
|
||||
refid: "GPS",
|
||||
roundTripDelay: 5,
|
||||
leapIndicator: 0,
|
||||
});
|
||||
|
||||
try {
|
||||
await assert.rejects(() => ntp.check(monitor, heartbeat, null), /Time offset.*exceeds threshold 100ms/);
|
||||
} finally {
|
||||
ntp.queryNTP = originalQuery;
|
||||
}
|
||||
});
|
||||
|
||||
test("check() throws when dispersion exceeds threshold", async () => {
|
||||
const monitor = {
|
||||
hostname: "localhost",
|
||||
port: 123,
|
||||
timeout: 5,
|
||||
ntp_stratum_threshold: 5,
|
||||
ntp_time_offset_threshold: 1000,
|
||||
ntp_root_dispersion_threshold: 50,
|
||||
};
|
||||
const heartbeat = {};
|
||||
|
||||
const originalQuery = ntp.queryNTP;
|
||||
ntp.queryNTP = async () => ({
|
||||
stratum: 2,
|
||||
offset: 1.5,
|
||||
rootDispersion: 100,
|
||||
refid: "GPS",
|
||||
roundTripDelay: 5,
|
||||
leapIndicator: 0,
|
||||
});
|
||||
|
||||
try {
|
||||
await assert.rejects(() => ntp.check(monitor, heartbeat, null), /Root dispersion.*exceeds threshold 50ms/);
|
||||
} finally {
|
||||
ntp.queryNTP = originalQuery;
|
||||
}
|
||||
});
|
||||
|
||||
test("check() sets heartbeat UP when all thresholds pass", async () => {
|
||||
const monitor = {
|
||||
hostname: "localhost",
|
||||
port: 123,
|
||||
timeout: 5,
|
||||
ntp_stratum_threshold: 5,
|
||||
ntp_time_offset_threshold: 1000,
|
||||
ntp_root_dispersion_threshold: 500,
|
||||
};
|
||||
const heartbeat = {};
|
||||
|
||||
const originalQuery = ntp.queryNTP;
|
||||
ntp.queryNTP = async () => ({
|
||||
stratum: 2,
|
||||
offset: 1.5,
|
||||
rootDispersion: 10.2,
|
||||
refid: "GPS",
|
||||
roundTripDelay: 5.3,
|
||||
leapIndicator: 0,
|
||||
});
|
||||
|
||||
try {
|
||||
await ntp.check(monitor, heartbeat, null);
|
||||
assert.strictEqual(heartbeat.status, UP);
|
||||
assert.match(heartbeat.msg, /Stratum: 2/);
|
||||
assert.match(heartbeat.msg, /RefID: GPS/);
|
||||
assert.match(heartbeat.msg, /Offset: 1\.500ms/);
|
||||
assert.match(heartbeat.msg, /Dispersion: 10\.200ms/);
|
||||
assert.strictEqual(typeof heartbeat.ping, "number");
|
||||
} finally {
|
||||
ntp.queryNTP = originalQuery;
|
||||
}
|
||||
});
|
||||
|
||||
test(
|
||||
"queryNTP() can reach a public NTP server",
|
||||
{
|
||||
skip: !!process.env.CI,
|
||||
},
|
||||
async () => {
|
||||
const result = await ntp.queryNTP("time.google.com", 123, 10000);
|
||||
assert.strictEqual(typeof result.stratum, "number");
|
||||
assert.ok(result.stratum >= 1 && result.stratum <= 15, `stratum should be 1-15, got ${result.stratum}`);
|
||||
assert.strictEqual(typeof result.offset, "number");
|
||||
assert.strictEqual(typeof result.roundTripDelay, "number");
|
||||
assert.strictEqual(typeof result.rootDispersion, "number");
|
||||
assert.strictEqual(typeof result.refid, "string");
|
||||
}
|
||||
);
|
||||
});
|
||||
Loading…
Reference in new issue