Refactoring replication lag action method

* Introduced support for handling of bulk servers
* Commented mysql_servers_wrlock as this method does not use admin tables.
pull/4133/head
Rahim Kanji 3 years ago
parent b6a59f37e7
commit 55deb8ddac

@ -650,8 +650,8 @@ class MySQL_HostGroups_Manager {
void push_MyConn_to_pool_array(MySQL_Connection **, unsigned int);
void destroy_MyConn_from_pool(MySQL_Connection *, bool _lock=true);
void replication_lag_action_inner(MyHGC *, char*, unsigned int, int);
void replication_lag_action(int, char*, unsigned int, int);
void replication_lag_action_inner(MyHGC *, const char*, unsigned int, int);
void replication_lag_action(const std::list<std::tuple<int, std::string, unsigned int, int>>& mysql_servers);
void read_only_action(char *hostname, int port, int read_only);
unsigned int get_servers_table_version();
void wait_servers_table_version(unsigned, unsigned);

@ -3367,7 +3367,7 @@ void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) {
myhgc->mysrvs->add(mysrvc);
}
void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, char *address, unsigned int port, int current_replication_lag) {
void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, const char *address, unsigned int port, int current_replication_lag) {
int j;
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
@ -3419,23 +3419,45 @@ void MySQL_HostGroups_Manager::replication_lag_action_inner(MyHGC *myhgc, char *
}
}
void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, unsigned int port, int current_replication_lag) {
GloAdmin->mysql_servers_wrlock();
void MySQL_HostGroups_Manager::replication_lag_action(const std::list<std::tuple<int, std::string, unsigned int, int>>& mysql_servers) {
//this method does not use admin table, so this lock is not needed.
//GloAdmin->mysql_servers_wrlock();
int hid = -1;
std::string address;
unsigned int port = 0;
int current_replication_lag = -1;
unsigned long long curtime1 = monotonic_time();
wrlock();
if (mysql_thread___monitor_replication_lag_group_by_host == false) {
// legacy check. 1 check per server per hostgroup
MyHGC *myhgc = MyHGC_find(_hid);
replication_lag_action_inner(myhgc,address,port,current_replication_lag);
} else {
// only 1 check per server, no matter the hostgroup
// all hostgroups must be searched
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
replication_lag_action_inner(myhgc,address,port,current_replication_lag);
for (const auto& server : mysql_servers) {
std::tie(hid, address, port, current_replication_lag) = server;
if (mysql_thread___monitor_replication_lag_group_by_host == false) {
// legacy check. 1 check per server per hostgroup
MyHGC *myhgc = MyHGC_find(hid);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
}
else {
// only 1 check per server, no matter the hostgroup
// all hostgroups must be searched
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC*myhgc=(MyHGC*)MyHostGroups->index(i);
replication_lag_action_inner(myhgc,address.c_str(),port,current_replication_lag);
}
}
}
wrunlock();
GloAdmin->mysql_servers_wrunlock();
//GloAdmin->mysql_servers_wrunlock();
unsigned long long curtime2 = monotonic_time();
curtime1 = curtime1 / 1000;
curtime2 = curtime2 / 1000;
proxy_info("MySQL_HostGroups_Manager::replication_lag_action() locked for %llums (server count:%ld)\n", curtime2 - curtime1, mysql_servers.size());
}
/**

@ -2746,7 +2746,7 @@ __exit_monitor_replication_lag_thread:
SAFE_SQLITE3_STEP2(statement);
rc=(*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc=(*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
MyHGM->replication_lag_action({ {mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag} });
(*proxy_sqlite3_finalize)(statement);
if (mmsd->mysql_error_msg == NULL) {
replication_lag_success = true;
@ -7508,6 +7508,8 @@ void MySQL_Monitor::monitor_gr_async_actions_handler(
bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vector<MySQL_Monitor_State_Data*>& mmsds) {
std::list<std::tuple<int, std::string, unsigned int, int>> mysql_servers;
for (auto& mmsd : mmsds) {
@ -7609,10 +7611,14 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
SAFE_SQLITE3_STEP2(statement);
rc = (*proxy_sqlite3_clear_bindings)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
rc = (*proxy_sqlite3_reset)(statement); ASSERT_SQLITE_OK(rc, mmsd->mondb);
MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
//MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag);
(*proxy_sqlite3_finalize)(statement);
mysql_servers.push_back({ mmsd->hostgroup_id, mmsd->hostname, mmsd->port, repl_lag });
}
//executing replication lag action
MyHGM->replication_lag_action(mysql_servers);
return true;
}

Loading…
Cancel
Save