@ -2,10 +2,22 @@ const { MonitorType } = require("./monitor-type");
const { log , UP } = require ( "../../src/util" ) ;
const mqtt = require ( "mqtt" ) ;
const jsonata = require ( "jsonata" ) ;
const { ConditionVariable } = require ( "../monitor-conditions/variables" ) ;
const { defaultStringOperators , defaultNumberOperators } = require ( "../monitor-conditions/operators" ) ;
const { ConditionExpressionGroup } = require ( "../monitor-conditions/expression" ) ;
const { evaluateExpressionGroup } = require ( "../monitor-conditions/evaluator" ) ;
class MqttMonitorType extends MonitorType {
name = "mqtt" ;
supportsConditions = true ;
conditionVariables = [
new ConditionVariable ( "topic" , defaultStringOperators ) ,
new ConditionVariable ( "message" , defaultStringOperators ) ,
new ConditionVariable ( "json_value" , defaultStringOperators . concat ( defaultNumberOperators ) ) ,
] ;
/ * *
* @ inheritdoc
* /
@ -19,32 +31,98 @@ class MqttMonitorType extends MonitorType {
} ) ;
if ( monitor . mqttCheckType == null || monitor . mqttCheckType === "" ) {
// use old default
monitor . mqttCheckType = "keyword" ;
}
if ( monitor . mqttCheckType === "keyword" ) {
if ( receivedMessage != null && receivedMessage . includes ( monitor . mqttSuccessMessage ) ) {
heartbeat . msg = ` Topic: ${ messageTopic } ; Message: ${ receivedMessage } ` ;
heartbeat . status = UP ;
} else {
throw Error ( ` Message Mismatch - Topic: ${ monitor . mqttTopic } ; Message: ${ receivedMessage } ` ) ;
}
// Check if conditions are defined
const conditions = monitor . conditions ? ConditionExpressionGroup . fromMonitor ( monitor ) : null ;
const hasConditions = conditions && conditions . children && conditions . children . length > 0 ;
if ( hasConditions ) {
await this . checkConditions ( monitor , heartbeat , messageTopic , receivedMessage , conditions ) ;
} else if ( monitor . mqttCheckType === "keyword" ) {
this . checkKeyword ( monitor , heartbeat , messageTopic , receivedMessage ) ;
} else if ( monitor . mqttCheckType === "json-query" ) {
const parsedMessage = JSON . parse ( receivedMessage ) ;
await this . checkJsonQuery ( monitor , heartbeat , receivedMessage ) ;
} else {
throw new Error ( "Unknown MQTT Check Type" ) ;
}
}
let expression = jsonata ( monitor . jsonPath ) ;
/ * *
* Check using keyword matching
* @ param { object } monitor Monitor object
* @ param { object } heartbeat Heartbeat object
* @ param { string } messageTopic Received MQTT topic
* @ param { string } receivedMessage Received MQTT message
* @ returns { void }
* @ throws { Error } If keyword is not found in message
* /
checkKeyword ( monitor , heartbeat , messageTopic , receivedMessage ) {
if ( receivedMessage != null && receivedMessage . includes ( monitor . mqttSuccessMessage ) ) {
heartbeat . msg = ` Topic: ${ messageTopic } ; Message: ${ receivedMessage } ` ;
heartbeat . status = UP ;
} else {
throw new Error ( ` Message Mismatch - Topic: ${ monitor . mqttTopic } ; Message: ${ receivedMessage } ` ) ;
}
}
let result = await expression . evaluate ( parsedMessage ) ;
/ * *
* Check using JSONata query
* @ param { object } monitor Monitor object
* @ param { object } heartbeat Heartbeat object
* @ param { string } receivedMessage Received MQTT message
* @ returns { Promise < void > }
* /
async checkJsonQuery ( monitor , heartbeat , receivedMessage ) {
const parsedMessage = JSON . parse ( receivedMessage ) ;
const expression = jsonata ( monitor . jsonPath ) ;
const result = await expression . evaluate ( parsedMessage ) ;
if ( result ? . toString ( ) === monitor . expectedValue ) {
heartbeat . msg = "Message received, expected value is found" ;
heartbeat . status = UP ;
} else {
throw new Error ( "Message received but value is not equal to expected value, value was: [" + result + "]" ) ;
}
}
if ( result ? . toString ( ) === monitor . expectedValue ) {
heartbeat . msg = "Message received, expected value is found" ;
heartbeat . status = UP ;
} else {
throw new Error ( "Message received but value is not equal to expected value, value was: [" + result + "]" ) ;
/ * *
* Check using conditions system
* @ param { object } monitor Monitor object
* @ param { object } heartbeat Heartbeat object
* @ param { string } messageTopic Received MQTT topic
* @ param { string } receivedMessage Received MQTT message
* @ param { ConditionExpressionGroup } conditions Parsed conditions
* @ returns { Promise < void > }
* /
async checkConditions ( monitor , heartbeat , messageTopic , receivedMessage , conditions ) {
let jsonValue = null ;
// Parse JSON and extract value if jsonPath is defined
if ( monitor . jsonPath ) {
try {
const parsedMessage = JSON . parse ( receivedMessage ) ;
const expression = jsonata ( monitor . jsonPath ) ;
jsonValue = await expression . evaluate ( parsedMessage ) ;
} catch ( e ) {
// JSON parsing failed, jsonValue remains null
}
}
const conditionData = {
topic : messageTopic ,
message : receivedMessage ,
json _value : jsonValue ? . toString ( ) ? ? "" ,
} ;
const conditionsResult = evaluateExpressionGroup ( conditions , conditionData ) ;
if ( conditionsResult ) {
heartbeat . msg = ` Topic: ${ messageTopic } ; Message: ${ receivedMessage } ` ;
heartbeat . status = UP ;
} else {
throw Error ( "Unknown MQTT Check Type" ) ;
throw new Error ( ` Conditions not met - Topic: ${ messageTopic } ; Message: ${ receivedMessage } ` ) ;
}
}