* Improved handling for cases where seconds_behind_master is null or negative

pull/4528/head
Rahim Kanji 2 years ago
parent 40068b122a
commit 648c3ab61d

@ -541,9 +541,10 @@ using address_t = std::string;
using port_t = unsigned int;
using read_only_t = int;
using current_replication_lag = int;
using replace_current_replication_lag = bool;
using read_only_server_t = std::tuple<hostname_t,port_t,read_only_t>;
using replication_lag_server_t = std::tuple<hostgroupid_t,address_t,port_t,current_replication_lag>;
using replication_lag_server_t = std::tuple<hostgroupid_t,address_t,port_t,current_replication_lag,replace_current_replication_lag>;
enum READ_ONLY_SERVER_T {
ROS_HOSTNAME = 0,
@ -557,6 +558,7 @@ enum REPLICATION_LAG_SERVER_T {
RLS_ADDRESS,
RLS_PORT,
RLS_CURRENT_REPLICATION_LAG,
RLS_OVERRIDE_REPLICATION_LAG,
RLS__SIZE
};
@ -1090,7 +1092,7 @@ class MySQL_HostGroups_Manager {
void push_MyConn_to_pool_array(MySQL_Connection **, unsigned int);
void destroy_MyConn_from_pool(MySQL_Connection *, bool _lock=true);
void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int);
void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int, bool);
void replication_lag_action(const std::list<replication_lag_server_t>& mysql_servers);
void read_only_action(char *hostname, int port, int read_only);
void read_only_action_v2(const std::list<read_only_server_t>& mysql_servers);

@ -56,7 +56,7 @@ class SQLite3_Server {
std::vector<table_def_t *> *tables_defs_readonly;
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
std::unordered_map<std::string, int> replicationlag_map;
std::unordered_map<std::string, std::unique_ptr<int>> replicationlag_map;
std::vector<table_def_t*>* tables_defs_replicationlag;
#endif // TEST_REPLICATIONLAG
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
@ -105,7 +105,7 @@ class SQLite3_Server {
#ifdef TEST_REPLICATIONLAG
pthread_mutex_t test_replicationlag_mutex;
void load_replicationlag_table(MySQL_Session* sess);
int replicationlag_test_value(const char* p);
int* replicationlag_test_value(const char* p);
// Reports how many entries replicationlag_map currently holds.
// The container's unsigned size is narrowed to int for the caller.
int replicationlag_map_size() {
	const auto entry_count = replicationlag_map.size();
	return static_cast<int>(entry_count);
}

@ -2685,9 +2685,10 @@ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) {
myhgc->mysrvs->add(mysrvc);
}
void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port, int current_replication_lag) {
void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port,
int current_replication_lag, bool override_repl_lag) {
if (current_replication_lag == -1) {
if (current_replication_lag == -1 && override_repl_lag == true) {
current_replication_lag = myhgc->get_monitor_slave_lag_when_null();
proxy_error("Replication lag on server %s:%d is NULL, using value %d\n", address, port, current_replication_lag);
}
@ -2730,7 +2731,7 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const
if (
(current_replication_lag>=0 && ((unsigned int)current_replication_lag <= mysrvc->max_replication_lag))
||
(current_replication_lag==-2) // see issue 959
(current_replication_lag==-2 && override_repl_lag == true) // see issue 959
) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
proxy_warning("Re-enabling server %s:%d from HG %u with replication lag of %d second\n", address, port, myhgc->hid, current_replication_lag);
@ -2756,18 +2757,19 @@ void MySQL_HostGroups_Manager::replication_lag_action(const std::list<replicatio
const std::string& address = std::get<REPLICATION_LAG_SERVER_T::RLS_ADDRESS>(server);
const unsigned int port = std::get<REPLICATION_LAG_SERVER_T::RLS_PORT>(server);
const int current_replication_lag = std::get<REPLICATION_LAG_SERVER_T::RLS_CURRENT_REPLICATION_LAG>(server);
const bool override_repl_lag = std::get<REPLICATION_LAG_SERVER_T::RLS_OVERRIDE_REPLICATION_LAG>(server);
if (mysql_thread___monitor_replication_lag_group_by_host == false) {
// legacy check. 1 check per server per hostgroup
MyHGC *myhgc = MyHGC_find(hid);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag);
}
else {
// only 1 check per server, no matter the hostgroup
// all hostgroups must be searched
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC*myhgc=(MyHGC*)MyHostGroups->index(i);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag,override_repl_lag);
}
}
}

@ -2756,6 +2756,7 @@ __exit_monitor_replication_lag_thread:
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be fed to 'replication_lag_action'
int repl_lag=-2;
bool override_repl_lag = true;
rc=(*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now=realtime_time();
@ -2792,14 +2793,16 @@ __exit_monitor_replication_lag_thread:
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
if (row) {
repl_lag=-1; // this is old behavior
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag=atoi(row[j]);
override_repl_lag = false;
} else {
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
if (repl_lag>=0) {
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc=(*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc=(*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
@ -2820,7 +2823,7 @@ __exit_monitor_replication_lag_thread:
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->replication_lag_action( std::list<replication_lag_server_t> {
replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag}
replication_lag_server_t {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag }
} );
(*proxy_sqlite3_finalize)(statement);
if (mmsd->mysql_error_msg == NULL) {
@ -7739,8 +7742,7 @@ void MySQL_Monitor::monitor_gr_async_actions_handler(
bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {
std::list<std::tuple<int, std::string, unsigned int, int>> mysql_servers;
std::list<replication_lag_server_t> mysql_servers;
for (auto& mmsd : mmsds) {
@ -7782,6 +7784,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
ASSERT_SQLITE_OK(rc, mmsd->mondb);
// 'replication_lag' to be fed to 'replication_lag_action'
int repl_lag = -2;
bool override_repl_lag = true;
rc = (*proxy_sqlite3_bind_text)(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_bind_int)(statement, 2, mmsd->port); ASSERT_SQLITE_OK(rc, mmsd->mondb);
unsigned long long time_now = realtime_time();
@ -7818,14 +7821,16 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
MYSQL_ROW row = mysql_fetch_row(mmsd->result);
if (row) {
repl_lag = -1; // this is old behavior
override_repl_lag = true;
if (row[j]) { // if Seconds_Behind_Master is not NULL
repl_lag = atoi(row[j]);
override_repl_lag = false;
} else {
MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_SRV_NULL_REPLICATION_LAG);
}
}
}
if (repl_lag >= 0) {
if (/*repl_lag >= 0 ||*/ override_repl_lag == false) {
rc = (*proxy_sqlite3_bind_int64)(statement, 5, repl_lag); ASSERT_SQLITE_OK(rc, mmsd->mondb);
} else {
rc = (*proxy_sqlite3_bind_null)(statement, 5); ASSERT_SQLITE_OK(rc, mmsd->mondb);
@ -7847,7 +7852,7 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
//MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
(*proxy_sqlite3_finalize)(statement);
mysql_servers.push_back( std::tuple<int,std::string,int,int> { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag });
mysql_servers.push_back( replication_lag_server_t { mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag, override_repl_lag });
}
//executing replication lag action

@ -879,11 +879,17 @@ __run_query:
// probably never initialized
GloSQLite3Server->load_replicationlag_table(sess);
}
const int rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
const int* rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
free(query);
char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, rc);
if (rc == nullptr) {
const char* a = (char*)"SELECT null as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a);
} else {
const char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, *rc);
}
pthread_mutex_unlock(&GloSQLite3Server->test_replicationlag_mutex);
}
}
@ -1845,7 +1851,7 @@ bool SQLite3_Server::init() {
insert_into_tables_defs(tables_defs_replicationlag,
(const char*)"REPLICATIONLAG_HOST_STATUS",
(const char*)"CREATE TABLE REPLICATIONLAG_HOST_STATUS ("
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT NOT NULL, PRIMARY KEY (hostname, port)"
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT DEFAULT NULL, PRIMARY KEY (hostname, port)"
")"
);
@ -2016,7 +2022,14 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) {
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = std::string(r->fields[0]) + ":" + std::string(r->fields[1]);
replicationlag_map[s] = atoi(r->fields[2]);
if (r->fields[2] == nullptr) {
replicationlag_map[s] = nullptr;
} else {
int* repl_lag = new int;
*repl_lag = atoi(r->fields[2]);
replicationlag_map[s] = std::unique_ptr<int>(repl_lag);
}
}
}
delete resultset;
@ -2024,7 +2037,7 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) {
GloAdmin->admindb->execute_statement((char*)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE hostgroup_id BETWEEN 5202 AND 5700", &error, &cols, &affected_rows, &resultset);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",0)";
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",null)";
sessdb->execute(s.c_str());
}
delete resultset;
@ -2032,11 +2045,11 @@ void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) {
GloAdmin->mysql_servers_wrunlock();
}
// NOTE(review): this span appears to be a diff hunk rendered without +/- markers,
// so the earlier `int` variant and the later `int*` variant of the function are
// interleaved line by line. The comments below describe each variant separately.
//
// `int` variant: looks up key `p` in replicationlag_map and returns the stored
// value, or 0 when the key is not present.
int SQLite3_Server::replicationlag_test_value(const char* p) {
int rc = 0; // default
std::unordered_map<std::string, int>::iterator it = replicationlag_map.find(std::string(p));
// `int*` variant: looks up key `p` in replicationlag_map and returns the raw
// pointer held by the mapped std::unique_ptr<int>. The result is null when the
// key is not present or when the mapped unique_ptr is itself empty; the map
// keeps ownership of the pointee (only .get() is called).
int* SQLite3_Server::replicationlag_test_value(const char* p) {
int* rc = 0; // default
std::unordered_map<std::string, std::unique_ptr<int>>::iterator it = replicationlag_map.find(std::string(p));
if (it != replicationlag_map.end()) {
// `int` variant: copy the mapped value.
rc = it->second;
// `int*` variant: borrow the pointer without transferring ownership.
rc = it->second.get();
}
return rc;
}

Loading…
Cancel
Save