migrated grpc keyword to the newer monitoringtype (#4821)
Co-authored-by: Louis Lam <louislam@users.noreply.github.com>pull/5179/head^2
parent
46b07953ad
commit
b230ab0a06
@ -0,0 +1,89 @@
|
||||
const { MonitorType } = require("./monitor-type");
|
||||
const { UP, log } = require("../../src/util");
|
||||
const dayjs = require("dayjs");
|
||||
const grpc = require("@grpc/grpc-js");
|
||||
const protojs = require("protobufjs");
|
||||
|
||||
class GrpcKeywordMonitorType extends MonitorType {
|
||||
name = "grpc-keyword";
|
||||
|
||||
/**
|
||||
* @inheritdoc
|
||||
*/
|
||||
async check(monitor, heartbeat, _server) {
|
||||
const startTime = dayjs().valueOf();
|
||||
const service = this.constructGrpcService(monitor.grpcUrl, monitor.grpcProtobuf, monitor.grpcServiceName, monitor.grpcEnableTls);
|
||||
let response = await this.grpcQuery(service, monitor.grpcMethod, monitor.grpcBody);
|
||||
heartbeat.ping = dayjs().valueOf() - startTime;
|
||||
log.debug(this.name, "gRPC response:", response);
|
||||
let keywordFound = response.toString().includes(monitor.keyword);
|
||||
if (keywordFound !== !monitor.isInvertKeyword()) {
|
||||
log.debug(this.name, `GRPC response [${response}] + ", but keyword [${monitor.keyword}] is ${keywordFound ? "present" : "not"} in [" + ${response} + "]"`);
|
||||
|
||||
let truncatedResponse = (response.length > 50) ? response.toString().substring(0, 47) + "..." : response;
|
||||
|
||||
throw new Error(`keyword [${monitor.keyword}] is ${keywordFound ? "present" : "not"} in [" + ${truncatedResponse} + "]`);
|
||||
}
|
||||
heartbeat.status = UP;
|
||||
heartbeat.msg = `${response}, keyword [${monitor.keyword}] ${keywordFound ? "is" : "not"} found`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create gRPC client
|
||||
* @param {string} url grpc Url
|
||||
* @param {string} protobufData grpc ProtobufData
|
||||
* @param {string} serviceName grpc ServiceName
|
||||
* @param {string} enableTls grpc EnableTls
|
||||
* @returns {grpc.Service} grpc Service
|
||||
*/
|
||||
constructGrpcService(url, protobufData, serviceName, enableTls) {
|
||||
const protocObject = protojs.parse(protobufData);
|
||||
const protoServiceObject = protocObject.root.lookupService(serviceName);
|
||||
const Client = grpc.makeGenericClientConstructor({});
|
||||
const credentials = enableTls ? grpc.credentials.createSsl() : grpc.credentials.createInsecure();
|
||||
const client = new Client(url, credentials);
|
||||
return protoServiceObject.create((method, requestData, cb) => {
|
||||
const fullServiceName = method.fullName;
|
||||
const serviceFQDN = fullServiceName.split(".");
|
||||
const serviceMethod = serviceFQDN.pop();
|
||||
const serviceMethodClientImpl = `/${serviceFQDN.slice(1).join(".")}/${serviceMethod}`;
|
||||
log.debug(this.name, `gRPC method ${serviceMethodClientImpl}`);
|
||||
client.makeUnaryRequest(
|
||||
serviceMethodClientImpl,
|
||||
arg => arg,
|
||||
arg => arg,
|
||||
requestData,
|
||||
cb);
|
||||
}, false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create gRPC client stib
|
||||
* @param {grpc.Service} service grpc Url
|
||||
* @param {string} method grpc Method
|
||||
* @param {string} body grpc Body
|
||||
* @returns {Promise<string>} Result of gRPC query
|
||||
*/
|
||||
async grpcQuery(service, method, body) {
|
||||
return new Promise((resolve, reject) => {
|
||||
try {
|
||||
service[method](JSON.parse(body), (err, response) => {
|
||||
if (err) {
|
||||
if (err.code !== 1) {
|
||||
reject(err);
|
||||
}
|
||||
log.debug(this.name, `ignoring ${err.code} ${err.details}, as code=1 is considered OK`);
|
||||
resolve(`${err.code} is considered OK because ${err.details}`);
|
||||
}
|
||||
resolve(JSON.stringify(response));
|
||||
});
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
GrpcKeywordMonitorType,
|
||||
};
|
||||
@ -0,0 +1,306 @@
|
||||
const { describe, test } = require("node:test");
|
||||
const assert = require("node:assert");
|
||||
const grpc = require("@grpc/grpc-js");
|
||||
const protoLoader = require("@grpc/proto-loader");
|
||||
const { GrpcKeywordMonitorType } = require("../../server/monitor-types/grpc");
|
||||
const { UP, PENDING } = require("../../src/util");
|
||||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
const os = require("os");
|
||||
|
||||
const testProto = `
|
||||
syntax = "proto3";
|
||||
package test;
|
||||
|
||||
service TestService {
|
||||
rpc Echo (EchoRequest) returns (EchoResponse);
|
||||
}
|
||||
|
||||
message EchoRequest {
|
||||
string message = 1;
|
||||
}
|
||||
|
||||
message EchoResponse {
|
||||
string message = 1;
|
||||
}
|
||||
`;
|
||||
|
||||
/**
|
||||
* Create a gRPC server for testing
|
||||
* @param {number} port Port to listen on
|
||||
* @param {object} methodHandlers Object with method handlers
|
||||
* @returns {Promise<grpc.Server>} gRPC server instance
|
||||
*/
|
||||
async function createTestGrpcServer(port, methodHandlers) {
|
||||
// Write proto to temp file
|
||||
const tmpDir = os.tmpdir();
|
||||
const protoPath = path.join(tmpDir, `test-${port}.proto`);
|
||||
fs.writeFileSync(protoPath, testProto);
|
||||
|
||||
// Load proto file
|
||||
const packageDefinition = protoLoader.loadSync(protoPath, {
|
||||
keepCase: true,
|
||||
longs: String,
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true
|
||||
});
|
||||
const protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
|
||||
const testPackage = protoDescriptor.test;
|
||||
|
||||
const server = new grpc.Server();
|
||||
|
||||
// Add service implementation
|
||||
server.addService(testPackage.TestService.service, {
|
||||
Echo: (call, callback) => {
|
||||
if (methodHandlers.Echo) {
|
||||
methodHandlers.Echo(call, callback);
|
||||
} else {
|
||||
callback(null, { message: call.request.message });
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
server.bindAsync(
|
||||
`0.0.0.0:${port}`,
|
||||
grpc.ServerCredentials.createInsecure(),
|
||||
(err) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
} else {
|
||||
server.start();
|
||||
// Clean up temp file
|
||||
fs.unlinkSync(protoPath);
|
||||
resolve(server);
|
||||
}
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
describe("GrpcKeywordMonitorType", {
|
||||
skip: !!process.env.CI && (process.platform !== "linux" || process.arch !== "x64"),
|
||||
}, () => {
|
||||
test("gRPC keyword found in response", async () => {
|
||||
const port = 50051;
|
||||
const server = await createTestGrpcServer(port, {
|
||||
Echo: (call, callback) => {
|
||||
callback(null, { message: "Hello World with SUCCESS keyword" });
|
||||
}
|
||||
});
|
||||
|
||||
const grpcMonitor = new GrpcKeywordMonitorType();
|
||||
const monitor = {
|
||||
grpcUrl: `localhost:${port}`,
|
||||
grpcProtobuf: testProto,
|
||||
grpcServiceName: "test.TestService",
|
||||
grpcMethod: "echo",
|
||||
grpcBody: JSON.stringify({ message: "test" }),
|
||||
keyword: "SUCCESS",
|
||||
invertKeyword: false,
|
||||
grpcEnableTls: false,
|
||||
isInvertKeyword: () => false,
|
||||
};
|
||||
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
status: PENDING,
|
||||
};
|
||||
|
||||
try {
|
||||
await grpcMonitor.check(monitor, heartbeat, {});
|
||||
assert.strictEqual(heartbeat.status, UP);
|
||||
assert.ok(heartbeat.msg.includes("SUCCESS"));
|
||||
assert.ok(heartbeat.msg.includes("is"));
|
||||
} finally {
|
||||
server.forceShutdown();
|
||||
}
|
||||
});
|
||||
|
||||
test("gRPC keyword not found in response", async () => {
|
||||
const port = 50052;
|
||||
const server = await createTestGrpcServer(port, {
|
||||
Echo: (call, callback) => {
|
||||
callback(null, { message: "Hello World without the expected keyword" });
|
||||
}
|
||||
});
|
||||
|
||||
const grpcMonitor = new GrpcKeywordMonitorType();
|
||||
const monitor = {
|
||||
grpcUrl: `localhost:${port}`,
|
||||
grpcProtobuf: testProto,
|
||||
grpcServiceName: "test.TestService",
|
||||
grpcMethod: "echo",
|
||||
grpcBody: JSON.stringify({ message: "test" }),
|
||||
keyword: "MISSING",
|
||||
invertKeyword: false,
|
||||
grpcEnableTls: false,
|
||||
isInvertKeyword: () => false,
|
||||
};
|
||||
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
status: PENDING,
|
||||
};
|
||||
|
||||
try {
|
||||
await assert.rejects(
|
||||
grpcMonitor.check(monitor, heartbeat, {}),
|
||||
(err) => {
|
||||
assert.ok(err.message.includes("MISSING"));
|
||||
assert.ok(err.message.includes("not"));
|
||||
return true;
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
server.forceShutdown();
|
||||
}
|
||||
});
|
||||
|
||||
test("gRPC inverted keyword - keyword present (should fail)", async () => {
|
||||
const port = 50053;
|
||||
const server = await createTestGrpcServer(port, {
|
||||
Echo: (call, callback) => {
|
||||
callback(null, { message: "Response with ERROR keyword" });
|
||||
}
|
||||
});
|
||||
|
||||
const grpcMonitor = new GrpcKeywordMonitorType();
|
||||
const monitor = {
|
||||
grpcUrl: `localhost:${port}`,
|
||||
grpcProtobuf: testProto,
|
||||
grpcServiceName: "test.TestService",
|
||||
grpcMethod: "echo",
|
||||
grpcBody: JSON.stringify({ message: "test" }),
|
||||
keyword: "ERROR",
|
||||
invertKeyword: true,
|
||||
grpcEnableTls: false,
|
||||
isInvertKeyword: () => true,
|
||||
};
|
||||
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
status: PENDING,
|
||||
};
|
||||
|
||||
try {
|
||||
await assert.rejects(
|
||||
grpcMonitor.check(monitor, heartbeat, {}),
|
||||
(err) => {
|
||||
assert.ok(err.message.includes("ERROR"));
|
||||
assert.ok(err.message.includes("present"));
|
||||
return true;
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
server.forceShutdown();
|
||||
}
|
||||
});
|
||||
|
||||
test("gRPC inverted keyword - keyword not present (should pass)", async () => {
|
||||
const port = 50054;
|
||||
const server = await createTestGrpcServer(port, {
|
||||
Echo: (call, callback) => {
|
||||
callback(null, { message: "Response without error keyword" });
|
||||
}
|
||||
});
|
||||
|
||||
const grpcMonitor = new GrpcKeywordMonitorType();
|
||||
const monitor = {
|
||||
grpcUrl: `localhost:${port}`,
|
||||
grpcProtobuf: testProto,
|
||||
grpcServiceName: "test.TestService",
|
||||
grpcMethod: "echo",
|
||||
grpcBody: JSON.stringify({ message: "test" }),
|
||||
keyword: "ERROR",
|
||||
invertKeyword: true,
|
||||
grpcEnableTls: false,
|
||||
isInvertKeyword: () => true,
|
||||
};
|
||||
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
status: PENDING,
|
||||
};
|
||||
|
||||
try {
|
||||
await grpcMonitor.check(monitor, heartbeat, {});
|
||||
assert.strictEqual(heartbeat.status, UP);
|
||||
assert.ok(heartbeat.msg.includes("ERROR"));
|
||||
assert.ok(heartbeat.msg.includes("not"));
|
||||
} finally {
|
||||
server.forceShutdown();
|
||||
}
|
||||
});
|
||||
|
||||
test("gRPC connection failure", async () => {
|
||||
const grpcMonitor = new GrpcKeywordMonitorType();
|
||||
const monitor = {
|
||||
grpcUrl: "localhost:50099",
|
||||
grpcProtobuf: testProto,
|
||||
grpcServiceName: "test.TestService",
|
||||
grpcMethod: "echo",
|
||||
grpcBody: JSON.stringify({ message: "test" }),
|
||||
keyword: "SUCCESS",
|
||||
invertKeyword: false,
|
||||
grpcEnableTls: false,
|
||||
isInvertKeyword: () => false,
|
||||
};
|
||||
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
status: PENDING,
|
||||
};
|
||||
|
||||
await assert.rejects(
|
||||
grpcMonitor.check(monitor, heartbeat, {}),
|
||||
(err) => {
|
||||
// Should fail with connection error
|
||||
return true;
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
test("gRPC response truncation for long messages", async () => {
|
||||
const port = 50055;
|
||||
const longMessage = "A".repeat(100) + " with SUCCESS keyword";
|
||||
|
||||
const server = await createTestGrpcServer(port, {
|
||||
Echo: (call, callback) => {
|
||||
callback(null, { message: longMessage });
|
||||
}
|
||||
});
|
||||
|
||||
const grpcMonitor = new GrpcKeywordMonitorType();
|
||||
const monitor = {
|
||||
grpcUrl: `localhost:${port}`,
|
||||
grpcProtobuf: testProto,
|
||||
grpcServiceName: "test.TestService",
|
||||
grpcMethod: "echo",
|
||||
grpcBody: JSON.stringify({ message: "test" }),
|
||||
keyword: "MISSING",
|
||||
invertKeyword: false,
|
||||
grpcEnableTls: false,
|
||||
isInvertKeyword: () => false,
|
||||
};
|
||||
|
||||
const heartbeat = {
|
||||
msg: "",
|
||||
status: PENDING,
|
||||
};
|
||||
|
||||
try {
|
||||
await assert.rejects(
|
||||
grpcMonitor.check(monitor, heartbeat, {}),
|
||||
(err) => {
|
||||
// Should truncate message to 50 characters with "..."
|
||||
assert.ok(err.message.includes("..."));
|
||||
return true;
|
||||
}
|
||||
);
|
||||
} finally {
|
||||
server.forceShutdown();
|
||||
}
|
||||
});
|
||||
});
|
||||
@ -0,0 +1,7 @@
|
||||
syntax = "proto3";
|
||||
package echo;
|
||||
service EchoService {
|
||||
rpc Echo (EchoRequest) returns (EchoResponse);
|
||||
}
|
||||
message EchoRequest { string message = 1; }
|
||||
message EchoResponse { string message = 1; }
|
||||
@ -0,0 +1,22 @@
|
||||
const grpc = require("@grpc/grpc-js");
|
||||
const protoLoader = require("@grpc/proto-loader");
|
||||
const packageDef = protoLoader.loadSync("echo.proto", {});
|
||||
const grpcObject = grpc.loadPackageDefinition(packageDef);
|
||||
const { echo } = grpcObject;
|
||||
|
||||
/**
|
||||
* Echo service implementation
|
||||
* @param {object} call Call object
|
||||
* @param {Function} callback Callback function
|
||||
* @returns {void}
|
||||
*/
|
||||
function Echo(call, callback) {
|
||||
callback(null, { message: call.request.message });
|
||||
}
|
||||
|
||||
const server = new grpc.Server();
|
||||
server.addService(echo.EchoService.service, { Echo });
|
||||
server.bindAsync("0.0.0.0:50051", grpc.ServerCredentials.createInsecure(), () => {
|
||||
console.log("gRPC server running on :50051");
|
||||
server.start();
|
||||
});
|
||||
Loading…
Reference in new issue