First POC for integration with ReadySet

Monitor is able to detect if a backend is a ReadySet server, and it enables special monitoring.
The special monitoring hacks replication lag checks: a ReadySet server is "monitored for
replication lag" , but it has a special query and a special handler.
The query for check is "SHOW READYSET STATUS" , and the `Status` line is processed:
* Online: the backend is configured as ONLINE
* Maintenance* : the backend is configured as OFFLINE_SOFT
* anything else, or failed check: SHUNNED

A new monitor table is also added: `readyset_status_log` .
It has a similar structure of `mysql_server_replication_lag_log` , but instead of storing `repl_lag` (replication lag)
the full output of `SHOW READYSET STATUS` is saved as a JSON (that can be queried using `JSON_EXTRACT()`
pull/4851/head
René Cannaò 1 year ago
parent edc84ea14b
commit d641ca89be

@ -1074,6 +1074,7 @@ class MySQL_HostGroups_Manager : public Base_HostGroups_Manager<MyHGC> {
void wait_servers_table_version(unsigned, unsigned);
bool shun_and_killall(char *hostname, int port);
void set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us);
void set_ReadySet_status(char *hostname, int port, enum MySerStatus status);
unsigned long long Get_Memory_Stats();
void add_discovered_servers_to_mysql_servers_and_replication_hostgroups(const vector<tuple<string, int, int>>& new_servers);

@ -22,6 +22,8 @@
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
#define MONITOR_SQLITE_TABLE_READYSET_STATUS_LOG "CREATE TABLE readyset_status_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , status VARCHAR , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG "CREATE TABLE mysql_server_group_replication_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , viable_candidate VARCHAR NOT NULL DEFAULT 'NO' , read_only VARCHAR NOT NULL DEFAULT 'YES' , transactions_behind INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"
//#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_GALERA_LOG "CREATE TABLE mysql_server_galera_log (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start_us INT NOT NULL DEFAULT 0 , success_time_us INT DEFAULT 0 , viable_candidate VARCHAR NOT NULL DEFAULT 'NO' , read_only VARCHAR NOT NULL DEFAULT 'YES' , transactions_behind INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start_us))"

@ -4095,6 +4095,25 @@ void MySQL_HostGroups_Manager::set_server_current_latency_us(char *hostname, int
wrunlock();
}
void MySQL_HostGroups_Manager::set_ReadySet_status(char *hostname, int port, enum MySerStatus status) {
wrlock();
MySrvC *mysrvc=NULL;
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
unsigned int j;
unsigned int l=myhgc->mysrvs->cnt();
if (l) {
for (j=0; j<l; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->port==port && strcmp(mysrvc->address,hostname)==0) {
mysrvc->set_status(status);
}
}
}
}
wrunlock();
}
void MySQL_HostGroups_Manager::p_update_metrics() {
p_update_counter(status.p_counter_array[p_hg_counter::servers_table_version], status.servers_table_version);
// Update *server_connections* related metrics

@ -40,6 +40,8 @@ using json = nlohmann::json;
#endif /* DEBUG */
#define MYSQL_MONITOR_VERSION "2.0.1226" DEB
#define SERVER_VERSION_READYSET "readyset"
#ifdef DEBUG
//#define VALGRIND_ENABLE_ERROR_REPORTING
//#define VALGRIND_DISABLE_ERROR_REPORTING
@ -55,6 +57,29 @@ extern ProxySQL_Cluster* GloProxyCluster;
static MySQL_Monitor *GloMyMon;
struct ServerInfo {
std::string ipAddress;
int port;
// Default constructor (important for some operations)
ServerInfo() : ipAddress(""), port(0) {}
ServerInfo(const std::string& ip, int p) : ipAddress(ip), port(p) {}
// Overload the < operator for std::set to compare ServerInfo objects
bool operator<(const ServerInfo& other) const {
if (ipAddress < other.ipAddress) {
return true;
} else if (ipAddress == other.ipAddress) {
return port < other.port;
} else {
return false;
}
}
};
static std::set<ServerInfo> ReadySet_Servers;
#define SAFE_SQLITE3_STEP(_stmt) do {\
do {\
rc=(*proxy_sqlite3_step)(_stmt);\
@ -702,6 +727,9 @@ void MySQL_Monitor_State_Data::init_async() {
query_ = "SHOW REPLICA STATUS";
}
}
if (strcasestr(mysql->server_version, (const char *)SERVER_VERSION_READYSET) != NULL) {
query_ = "SHOW READYSET STATUS";
}
#endif
task_timeout_ = mysql_thread___monitor_replication_lag_timeout;
task_handler_ = &MySQL_Monitor_State_Data::replication_lag_handler;
@ -1066,6 +1094,7 @@ MySQL_Monitor::MySQL_Monitor() {
insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_read_only_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_replication_lag_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG);
insert_into_tables_defs(tables_defs_monitor,"readyset_status_log", MONITOR_SQLITE_TABLE_READYSET_STATUS_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_group_replication_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_GROUP_REPLICATION_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_galera_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_GALERA_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_aws_aurora_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_AWS_AURORA_LOG);
@ -1080,6 +1109,7 @@ MySQL_Monitor::MySQL_Monitor() {
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON mysql_server_ping_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_read_only_log_time_start ON mysql_server_read_only_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_replication_lag_log_time_start ON mysql_server_replication_lag_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS readyset_status_log_time_start ON readyset_status_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_group_replication_log_time_start ON mysql_server_group_replication_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_galera_log_time_start ON mysql_server_galera_log (time_start_us)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_aws_aurora_log_time_start ON mysql_server_aws_aurora_log (time_start_us)");
@ -2678,6 +2708,9 @@ void * monitor_replication_lag_thread(void *arg) {
mmsd->t1=start_time;
string server_version = "";
string query = "SHOW SLAVE STATUS";
bool crc=false;
if (mmsd->mysql==NULL) { // we don't have a connection, let's create it
bool rc;
@ -2700,6 +2733,8 @@ void * monitor_replication_lag_thread(void *arg) {
mmsd->t1=monotonic_time();
mmsd->interr=0; // reset the value
if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version);
#ifdef TEST_REPLICATIONLAG
{
const string REPLICA { mysql_get_server_version(mmsd->mysql) < 80023 ? "SLAVE" : "REPLICA" };
@ -2716,16 +2751,20 @@ void * monitor_replication_lag_thread(void *arg) {
char *base_query = (char *)"SELECT MAX(ROUND(TIMESTAMPDIFF(MICROSECOND, ts, SYSDATE(6))/1000000)) AS Seconds_Behind_Master FROM %s";
char *replication_query = (char *)malloc(strlen(base_query)+l);
sprintf(replication_query,base_query,percona_heartbeat_table);
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,replication_query);
query = string(replication_query);
free(replication_query);
}
}
if (use_percona_heartbeat == false) {
const char* query {
mysql_get_server_version(mmsd->mysql) < 80023 ? "SHOW SLAVE STATUS" : "SHOW REPLICA STATUS"
};
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql, query);
query = "SHOW SLAVE STATUS";
if (mysql_get_server_version(mmsd->mysql) >= 80023) {
query = "SHOW REPLICA STATUS";
}
}
if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) {
query = "SHOW READYSET STATUS";
}
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,query.c_str());
#endif // TEST_REPLICATIONLAG
while (mmsd->async_exit_status) {
mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
@ -2798,6 +2837,7 @@ __exit_monitor_replication_lag_thread:
int rc;
char *query=NULL;
if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) == NULL) {
query=(char *)"INSERT OR REPLACE INTO mysql_server_replication_lag_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
//rc=(*proxy_sqlite3_prepare_v2)(mondb, query, -1, &statement, 0);
rc = mmsd->mondb->prepare_v2(query, &statement);
@ -2880,7 +2920,65 @@ __exit_monitor_replication_lag_thread:
if (mmsd->mysql_error_msg == NULL) {
replication_lag_success = true;
}
} else { // readyset
query=(char *)"INSERT OR REPLACE INTO readyset_status_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
rc = mmsd->mondb->prepare_v2(query, &statement);
ASSERT_SQLITE_OK(rc, mmsd->mondb);
unordered_map<string,string> status_output = {};
enum MySerStatus status = MYSQL_SERVER_STATUS_SHUNNED; // default status
rc=(*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now=realtime_time();
time_now=time_now-(mmsd->t2 - start_time);
rc=(*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb);
if (mmsd->interr == 0 && mmsd->result) {
int num_fields=0;
int k=0;
MYSQL_FIELD * fields=NULL;
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
if ( fields && (num_fields == 2) ) {
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 0; i < num_rows; i++) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
string Variable_name = string(row[0]);
string Value = (row[1] == NULL ? string("") : string(row[1]) );
status_output[Variable_name] = Value;
if (Variable_name == "Status") {
if (strcasecmp(Value.c_str(), (const char *)"Online") == 0) {
status = MYSQL_SERVER_STATUS_ONLINE; // set to ONLINE
} else if (strncasecmp(Value.c_str(), (const char *)"Maintenance", strlen("Maintenance")) == 0) {
status = MYSQL_SERVER_STATUS_OFFLINE_SOFT; // set to OFFLINE_SOFT
} else {
status = MYSQL_SERVER_STATUS_SHUNNED; // set to SHUNNED
}
}
}
nlohmann::json json_output = status_output; // directly assign the map to the json object
std::string json_string = json_output.dump();
rc=(*proxy_sqlite3_bind_text)(statement, 5, json_string.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
}
mysql_free_result(mmsd->result);
mmsd->result=NULL;
} else {
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
}
rc=(*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
SAFE_SQLITE3_STEP2(statement);
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->set_ReadySet_status(mmsd->hostname, mmsd->port, status);
(*proxy_sqlite3_finalize)(statement);
if (mmsd->mysql_error_msg == NULL) {
replication_lag_success = true;
}
}
}
if (mmsd->interr || mmsd->mysql_error_msg) { // check failed
if (mmsd->mysql) {
@ -4503,13 +4601,17 @@ void * MySQL_Monitor::monitor_replication_lag() {
unsigned int glover;
char *error=NULL;
SQLite3_result *resultset=NULL;
// add support for SSL
char *query= NULL;
string ReadySetServers_query = "";
for (const auto& server : ReadySet_Servers) {
ReadySetServers_query += " OR (hostname = '" + server.ipAddress + "' AND port = " + to_string(server.port) + ")";
}
string queryS = "";
if (mysql_thread___monitor_replication_lag_group_by_host==true) {
query = (char *)"SELECT MIN(hostgroup_id), hostname, port, MIN(max_replication_lag), MAX(use_ssl) FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT IN (2,3) GROUP BY hostname, port";
queryS = "SELECT MIN(hostgroup_id), hostname, port, MIN(max_replication_lag), MAX(use_ssl) FROM mysql_servers WHERE (max_replication_lag > 0 AND status NOT IN (2,3)) " + ReadySetServers_query + " GROUP BY hostname, port";
} else {
query=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag, use_ssl FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT IN (2,3)";
queryS=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag, use_ssl FROM mysql_servers WHERE (max_replication_lag > 0 AND status NOT IN (2,3))" + ReadySetServers_query;
}
char *query= (char *)queryS.c_str();
t1=monotonic_time();
if (!GloMTH) return NULL; // quick exit during shutdown/restart
@ -4543,24 +4645,32 @@ void * MySQL_Monitor::monitor_replication_lag() {
__end_monitor_replication_lag_loop:
if (mysql_thread___monitor_enabled==true) {
sqlite3_stmt *statement=NULL;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement2=NULL;
//sqlite3 *mondb=monitordb->get_db();
int rc;
char *query=NULL;
query=(char *)"DELETE FROM mysql_server_replication_lag_log WHERE time_start_us < ?1";
//rc=(*proxy_sqlite3_prepare_v2)(mondb, query, -1, &statement, 0);
rc = monitordb->prepare_v2(query, &statement);
rc = monitordb->prepare_v2(query, &statement1);
ASSERT_SQLITE_OK(rc, monitordb);
query=(char *)"DELETE FROM readyset_status_log WHERE time_start_us < ?1";
rc = monitordb->prepare_v2(query, &statement2);
ASSERT_SQLITE_OK(rc, monitordb);
if (mysql_thread___monitor_history < mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 )) { // issue #626
if (mysql_thread___monitor_ping_interval < 3600000)
mysql_thread___monitor_history = mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 );
}
unsigned long long time_now=realtime_time();
rc=(*proxy_sqlite3_bind_int64)(statement, 1, time_now-(unsigned long long)mysql_thread___monitor_history*1000); ASSERT_SQLITE_OK(rc, monitordb);
SAFE_SQLITE3_STEP2(statement);
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, monitordb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, monitordb);
(*proxy_sqlite3_finalize)(statement);
rc=(*proxy_sqlite3_bind_int64)(statement1, 1, time_now-(unsigned long long)mysql_thread___monitor_history*1000); ASSERT_SQLITE_OK(rc, monitordb);
SAFE_SQLITE3_STEP2(statement1);
rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, monitordb);
rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, monitordb);
(*proxy_sqlite3_finalize)(statement1);
rc=(*proxy_sqlite3_bind_int64)(statement2, 1, time_now-(unsigned long long)mysql_thread___monitor_history*1000); ASSERT_SQLITE_OK(rc, monitordb);
SAFE_SQLITE3_STEP2(statement2);
rc=(*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, monitordb);
rc=(*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, monitordb);
(*proxy_sqlite3_finalize)(statement2);
}
if (resultset)
@ -7198,6 +7308,9 @@ bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector<MySQL_Mon
assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING);
string server_version = "";
if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version);
if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) {
__sync_fetch_and_add(&ping_check_OK, 1);
My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
@ -7241,6 +7354,16 @@ bool MySQL_Monitor::monitor_ping_process_ready_tasks(const std::vector<MySQL_Mon
rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
(*proxy_sqlite3_finalize)(statement);
if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) != NULL) {
ReadySet_Servers.insert(ServerInfo(mmsd->hostname, mmsd->port));
} else {
if (ReadySet_Servers.size() > 0) { // optimization . The following section is skipped if there are no servers
ServerInfo searchServer(mmsd->hostname, mmsd->port);
if (ReadySet_Servers.count(searchServer) > 0) {
ReadySet_Servers.erase(searchServer);
}
}
}
}
return true;
@ -7863,6 +7986,9 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
assert(task_result != MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING);
string server_version = "";
if (mmsd->mysql && mmsd->mysql->server_version) server_version = string(mmsd->mysql->server_version);
if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) {
__sync_fetch_and_add(&replication_lag_check_OK, 1);
My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
@ -7891,84 +8017,142 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
return false;
}
sqlite3_stmt* statement = NULL;
const char* query = (char*)"INSERT OR REPLACE INTO mysql_server_replication_lag_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
int rc = mmsd->mondb->prepare_v2(query, &statement);
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be feed to 'replication_lag_action'
int repl_lag = -2;
bool override_repl_lag = true;
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
time_now = time_now - (mmsd->t2 - mmsd->t1);
rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb);
if (mmsd->interr == 0 && mmsd->result) {
int num_fields = 0;
int k = 0;
MYSQL_FIELD* fields = NULL;
int j = -1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
if (strcasestr(server_version.c_str(), (const char *)SERVER_VERSION_READYSET) == NULL) {
sqlite3_stmt* statement = NULL;
const char* query = (char*)"INSERT OR REPLACE INTO mysql_server_replication_lag_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
int rc = mmsd->mondb->prepare_v2(query, &statement);
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be feed to 'replication_lag_action'
int repl_lag = -2;
bool override_repl_lag = true;
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
time_now = time_now - (mmsd->t2 - mmsd->t1);
rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb);
if (mmsd->interr == 0 && mmsd->result) {
int num_fields = 0;
int k = 0;
MYSQL_FIELD* fields = NULL;
int j = -1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
#ifdef TEST_REPLICATIONLAG
if (fields && num_fields == 1)
if (fields && num_fields == 1)
#else
if (
fields && (
(num_fields == 1 && mmsd->use_percona_heartbeat == true)
||
(num_fields > 30 && mmsd->use_percona_heartbeat == false)
)
)
if (
fields && (
(num_fields == 1 && mmsd->use_percona_heartbeat == true)
||
(num_fields > 30 && mmsd->use_percona_heartbeat == false)
)
)
#endif
{
for (k = 0; k < num_fields; k++) {
if (fields[k].name) {
if (
strcmp("Seconds_Behind_Master", fields[k].name)==0
|| strcmp("Seconds_Behind_Source", fields[k].name)==0
) {
j = k;
{
for (k = 0; k < num_fields; k++) {
if (fields[k].name) {
if (
strcmp("Seconds_Behind_Master", fields[k].name)==0
|| strcmp("Seconds_Behind_Source", fields[k].name)==0
) {
j = k;
}
}
}
}
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
repl_lag = -1; // this is old behavior
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag = atoi(row[j]);
override_repl_lag = false;
} else {
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
if (j > -1) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
repl_lag = -1; // this is old behavior
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag = atoi(row[j]);
override_repl_lag = false;
} else {
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
}
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
}
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
}
mysql_free_result(mmsd->result);
mmsd->result = NULL;
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag_check' timed out, we set 'repl_lag' to '-3' to avoid server to be 're-enabled'.
repl_lag = -3;
}
mysql_free_result(mmsd->result);
mmsd->result = NULL;
} else {
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag_check' timed out, we set 'repl_lag' to '-3' to avoid server to be 're-enabled'.
repl_lag = -3;
rc = (*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
SAFE_SQLITE3_STEP2(statement);
rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
//MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
(*proxy_sqlite3_finalize)(statement);
mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag });
} else { // readyset
sqlite3_stmt* statement = NULL;
const char* query = (char*)"INSERT OR REPLACE INTO readyset_status_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
int rc = mmsd->mondb->prepare_v2(query, &statement);
ASSERT_SQLITE_OK(rc, mmsd->mondb);
unordered_map<string,string> status_output = {};
enum MySerStatus status = MYSQL_SERVER_STATUS_SHUNNED; // default status
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
time_now = time_now - (mmsd->t2 - mmsd->t1);
rc = (*proxy_sqlite3_bind_int64)(statement, 3, time_now); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int64)(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2 - mmsd->t1)); ASSERT_SQLITE_OK(rc, mmsd->mondb);
if (mmsd->interr == 0 && mmsd->result) {
int num_fields=0;
int k=0;
MYSQL_FIELD * fields=NULL;
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
if ( fields && (num_fields == 2) ) {
int num_rows = mysql_num_rows(mmsd->result);
for (int i = 0; i < num_rows; i++) {
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
string Variable_name = string(row[0]);
string Value = (row[1] == NULL ? string("") : string(row[1]) );
status_output[Variable_name] = Value;
if (Variable_name == "Status") {
if (strcasecmp(Value.c_str(), (const char *)"Online") == 0) {
status = MYSQL_SERVER_STATUS_ONLINE; // set to ONLINE
} else if (strncasecmp(Value.c_str(), (const char *)"Maintenance", strlen("Maintenance")) == 0) {
status = MYSQL_SERVER_STATUS_OFFLINE_SOFT; // set to OFFLINE_SOFT
} else {
status = MYSQL_SERVER_STATUS_SHUNNED; // set to SHUNNED
}
}
}
nlohmann::json json_output = status_output; // directly assign the map to the json object
std::string json_string = json_output.dump();
rc=(*proxy_sqlite3_bind_text)(statement, 5, json_string.c_str(), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
proxy_error("mysql_fetch_fields returns NULL, or mysql_num_fields is incorrect. Server %s:%d . See bug #1994\n", mmsd->hostname, mmsd->port);
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
}
mysql_free_result(mmsd->result);
mmsd->result=NULL;
} else {
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
}
rc=(*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
SAFE_SQLITE3_STEP2(statement);
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->set_ReadySet_status(mmsd->hostname, mmsd->port, status);
(*proxy_sqlite3_finalize)(statement);
}
rc = (*proxy_sqlite3_bind_text)(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
SAFE_SQLITE3_STEP2(statement);
rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
//MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
(*proxy_sqlite3_finalize)(statement);
mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag });
}
//executing replication lag action

Loading…
Cancel
Save