Merge pull request #3533 from sysown/v2.x-gr_replication_lag_action

Changes current handling for replication lag in favor of setting lagging servers to SHUNNED state
v2.3.0
Javier Jaramago Fernández 5 years ago committed by GitHub
commit 4f94fd3d0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -417,6 +417,8 @@ class MySQL_HostGroups_Manager {
void p_update_connection_pool_update_counter(std::string& endpoint_id, std::map<std::string, std::string> labels, std::map<std::string, prometheus::Counter*>& m_map, unsigned long long value, p_hg_dyn_counter::metric idx);
void p_update_connection_pool_update_gauge(std::string& endpoint_id, std::map<std::string, std::string> labels, std::map<std::string, prometheus::Gauge*>& m_map, unsigned long long value, p_hg_dyn_gauge::metric idx);
void group_replication_lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable);
public:
std::mutex galera_set_writer_mutex;
pthread_rwlock_t gtid_rwlock;
@ -558,7 +560,30 @@ class MySQL_HostGroups_Manager {
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
void converge_group_replication_config(int _writer_hostgroup);
/**
* @brief Set the supplied server as SHUNNED, this function shall be called
* to 'SHUNNED' those servers which replication lag is bigger than:
* - `mysql_thread___monitor_groupreplication_max_transactions_behind_count`
*
* @details The function automatically handles the appropriate operation to
* perform on the supplied server, based on the supplied 'enable' flag and
* in 'monitor_groupreplication_max_transaction_behind_for_read_only'
* variable. In case the value of the variable is:
*
* * '0' or '2': It's required to search the writer hostgroup for
* finding the supplied server.
* * '1' or '2': It's required to search the reader hostgroup for
* finding the supplied server.
*
* @param _hid The writer hostgroup.
* @param address The server address.
* @param port The server port.
* @param lag_counts The computed lag for the sever.
* @param read_only Boolean specifying the read_only flag value of the server.
* @param enable Boolean specifying if the server needs to be disabled / enabled,
* 'true' for enabling the server if it's 'SHUNNED', 'false' for disabling it.
*/
void group_replication_lag_action(int _hid, char *address, unsigned int port, int lag_counts, bool read_only, bool enable);
void update_galera_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error, bool soft=false);
void update_galera_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_galera_set_writer(char *_hostname, int _port, int _writer_hostgroup);

@ -410,6 +410,7 @@ class MySQL_Threads_Handler
int monitor_groupreplication_healthcheck_timeout;
int monitor_groupreplication_healthcheck_max_timeout_count;
int monitor_groupreplication_max_transactions_behind_count;
int monitor_groupreplication_max_transactions_behind_for_read_only;
int monitor_galera_healthcheck_interval;
int monitor_galera_healthcheck_timeout;
int monitor_galera_healthcheck_max_timeout_count;

@ -13,6 +13,10 @@ class SQLite3_Session {
~SQLite3_Session();
};
#ifdef TEST_GROUPREP
using group_rep_status = std::tuple<bool, bool, uint32_t>;
#endif
class SQLite3_Server {
private:
volatile int main_shutdown;
@ -43,6 +47,7 @@ class SQLite3_Server {
std::vector<table_def_t *> *tables_defs_galera;
#endif // TEST_GALERA
#ifdef TEST_GROUPREP
std::unordered_map<std::string, group_rep_status> grouprep_map;
std::vector<table_def_t *> *tables_defs_grouprep;
#endif // TEST_GROUPREP
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP)
@ -69,9 +74,11 @@ class SQLite3_Server {
void init_galera_ifaces_string(std::string& s);
#endif // TEST_GALERA
#ifdef TEST_GROUPREP
unsigned int max_num_grouprep_servers;
pthread_mutex_t grouprep_mutex;
void populate_grouprep_table(MySQL_Session *sess, int txs_behind = 0);
void init_grouprep_ifaces_string(std::string& s);
group_rep_status grouprep_test_value(const std::string& srv_addr);
#endif // TEST_GROUPREP
SQLite3_Server();
~SQLite3_Server();

@ -842,6 +842,7 @@ __thread int mysql_thread___monitor_groupreplication_healthcheck_interval;
__thread int mysql_thread___monitor_groupreplication_healthcheck_timeout;
__thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
__thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count;
__thread int mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only;
__thread int mysql_thread___monitor_galera_healthcheck_interval;
__thread int mysql_thread___monitor_galera_healthcheck_timeout;
__thread int mysql_thread___monitor_galera_healthcheck_max_timeout_count;
@ -996,6 +997,7 @@ extern __thread int mysql_thread___monitor_replication_lag_count;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_interval;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_timeout;
extern __thread int mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
extern __thread int mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only;
extern __thread int mysql_thread___monitor_groupreplication_max_transactions_behind_count;
extern __thread int mysql_thread___monitor_galera_healthcheck_interval;
extern __thread int mysql_thread___monitor_galera_healthcheck_timeout;

@ -16,6 +16,7 @@
#include <prometheus/gauge.h>
#include "prometheus_helpers.h"
#include "proxysql_utils.h"
#define char_malloc (char *)malloc
#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); }
@ -3374,6 +3375,114 @@ __exit_replication_lag_action:
GloAdmin->mysql_servers_wrunlock();
}
/**
* @brief Finds the supplied server in the provided 'MyHGC' and sets the status
* either to 'MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG' if 'enable' is
* 'false' or 'MYSQL_SERVER_STATUS_ONLINE' if 'true'. If either of the
* 'myhgc' or 'address' params are 'NULL' the function performs no action,
* and returns immediately.
*
* @param myhgc The MySQL Hostgroup Container in which to perform the server
* search.
* @param address The server address.
* @param port The server port.
* @param lag_count The lag count, computed by 'get_lag_behind_count'.
* @param enable Boolean specifying if the server should be enabled or not.
*/
void MySQL_HostGroups_Manager::group_replication_lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) {
if (myhgc == NULL || address == NULL) return;
for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) {
if (enable == true) {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG || mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
proxy_info("Re-enabling server %u:%s:%d from replication lag\n", myhgc->hid, address, port);
}
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) {
proxy_warning("Shunning 'soft' server %u:%s:%d with replication lag, count number: %d\n", myhgc->hid, address, port, lag_count);
mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED;
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
if (lag_count >= ( mysql_thread___monitor_groupreplication_max_transactions_behind_count * 2 )) {
proxy_warning("Shunning 'hard' server %u:%s:%d with replication lag, count number: %d\n", myhgc->hid, address, port, lag_count);
mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG;
}
}
}
}
}
}
}
void MySQL_HostGroups_Manager::group_replication_lag_action(
int _hid, char *address, unsigned int port, int lag_counts, bool read_only, bool enable
) {
GloAdmin->mysql_servers_wrlock();
wrlock();
int reader_hostgroup = 0;
bool writer_is_also_reader = false;
// Get the reader_hostgroup for the supplied writter hostgroup
std::string t_reader_hostgroup_query {
"SELECT reader_hostgroup,writer_is_also_reader FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"
};
std::string reader_hostgroup_query {};
string_format(t_reader_hostgroup_query, reader_hostgroup_query, _hid);
int cols=0;
char *error=NULL;
int affected_rows=0;
SQLite3_result* rhid_res=NULL;
SQLite3_row* rhid_row=nullptr;
mydb->execute_statement(
reader_hostgroup_query.c_str(), &error , &cols , &affected_rows , &rhid_res
);
// If the server isn't present in the supplied hostgroup, there is nothing to do.
if (rhid_res->rows.empty() || rhid_res->rows[0]->get_size() == 0) {
goto __exit_replication_lag_action;
}
rhid_row = rhid_res->rows[0];
reader_hostgroup = atoi(rhid_row->fields[0]);
writer_is_also_reader = atoi(rhid_row->fields[1]);
{
MyHGC* myhgc = nullptr;
if (
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 0 ||
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 2 ||
enable
) {
if (read_only == false) {
myhgc = MyHGM->MyHGC_find(_hid);
group_replication_lag_action_set_server_status(myhgc, address, port, lag_counts, enable);
}
}
if (
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 1 ||
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only == 2 ||
enable
) {
myhgc = MyHGM->MyHGC_find(reader_hostgroup);
group_replication_lag_action_set_server_status(myhgc, address, port, lag_counts, enable);
}
}
__exit_replication_lag_action:
wrunlock();
GloAdmin->mysql_servers_wrunlock();
}
void MySQL_HostGroups_Manager::drop_all_idle_connections() {
// NOTE: the caller should hold wrlock
int i, j;
@ -4529,19 +4638,31 @@ void MySQL_HostGroups_Manager::update_group_replication_set_offline(char *_hostn
GloAdmin->mysql_servers_wrlock();
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers");
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
// NOTE: Only updated the servers that have belong to the same cluster.
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id IN ("
" SELECT %d UNION ALL"
" SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL"
" SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"
")";
query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
// NOTE: Only delete the servers that have belong to the same cluster.
q=(char*)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN ("
" SELECT %d UNION ALL"
" SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL"
" SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"
")";
sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
// q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
// sprintf(query,q,_hostname,_port,_writer_hostgroup);
q=(char *)"UPDATE mysql_servers_incoming SET status=(CASE "
" (SELECT status FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND"
" hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)) WHEN 2 THEN 2 ELSE 0 END)"
" WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
sprintf(query,q,_hostname,_port,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
converge_group_replication_config(_writer_hostgroup);
@ -4605,19 +4726,30 @@ void MySQL_HostGroups_Manager::update_group_replication_set_read_only(char *_hos
GloAdmin->mysql_servers_wrlock();
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers");
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
// NOTE: Only updated the servers that have belong to the same cluster.
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d) WHERE hostname='%s' AND port=%d AND hostgroup_id IN ("
" SELECT %d UNION ALL"
" SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL"
" SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"
")";
query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
// NOTE: Only delete the servers that have belong to the same cluster.
q=(char*)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN ("
" SELECT %d UNION ALL"
" SELECT backup_writer_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d UNION ALL"
" SELECT offline_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d"
")";
sprintf(query,q,_hostname,_port,_writer_hostgroup,_writer_hostgroup,_writer_hostgroup);
mydb->execute(query);
//free(query);
q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
// NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. Otherwise we set the server as 'ONLINE'.
q=(char *)"UPDATE mysql_servers_incoming SET status=(CASE "
" (SELECT status FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND"
" hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)) WHEN 2 THEN 2 ELSE 0 END)"
" WHERE hostname='%s' AND port=%d AND hostgroup_id=(SELECT reader_hostgroup FROM mysql_group_replication_hostgroups WHERE writer_hostgroup=%d)";
sprintf(query,q,_hostname,_port,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
mydb->execute(query);
//free(query);
converge_group_replication_config(_writer_hostgroup);
@ -4680,7 +4812,12 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna
bool found_writer=false;
bool found_reader=false;
int read_HG=-1;
int offline_HG=-1;
int backup_writer_HG=-1;
bool need_converge=false;
int status=0;
bool offline_soft_found=false;
if (resultset) {
// let's get info about this cluster
pthread_mutex_lock(&Group_Replication_Info_mutex);
@ -4691,6 +4828,8 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna
info=it2->second;
writer_is_also_reader=info->writer_is_also_reader;
read_HG=info->reader_hostgroup;
offline_HG=info->offline_hostgroup;
backup_writer_HG=info->backup_writer_hostgroup;
need_converge=info->need_converge;
info->need_converge=false;
}
@ -4700,9 +4839,11 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int hostgroup=atoi(r->fields[0]);
offline_soft_found = atoi(r->fields[1]) == 2 ? true : false;
if (hostgroup==_writer_hostgroup) {
int status = atoi(r->fields[1]);
if (status == 0) {
status = atoi(r->fields[1]);
if (status == 0 || status == 2) {
found_writer=true;
}
}
@ -4713,6 +4854,13 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna
}
}
}
// NOTE: In case of a writer not being found but a 'OFFLINE_SOFT' status
// is found in a hostgroup, 'OFFLINE_SOFT' status should be preserved.
if (found_writer == false) {
if (offline_soft_found) {
status = 2;
}
}
if (need_converge==false) {
if (found_writer) { // maybe no-op
if (
@ -4735,19 +4883,19 @@ void MySQL_HostGroups_Manager::update_group_replication_set_writer(char *_hostna
GloAdmin->mysql_servers_wrlock();
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("INSERT INTO mysql_servers_incoming SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers");
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d";
// NOTE: Only updated the servers that have belong to the same cluster.
q=(char *)"UPDATE OR IGNORE mysql_servers_incoming SET hostgroup_id=%d WHERE hostname='%s' AND port=%d AND hostgroup_id IN (%d, %d, %d)";
query=(char *)malloc(strlen(q)+strlen(_hostname)+256);
sprintf(query,q,_writer_hostgroup,_hostname,_port,_writer_hostgroup);
sprintf(query,q,_writer_hostgroup,_hostname,_port,backup_writer_HG,read_HG,offline_HG);
mydb->execute(query);
//free(query);
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id<>%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
// NOTE: Only delete the servers that have belong to the same cluster.
q=(char *)"DELETE FROM mysql_servers_incoming WHERE hostname='%s' AND port=%d AND hostgroup_id IN (%d, %d, %d)";
sprintf(query,q,_hostname,_port,backup_writer_HG,read_HG,offline_HG);
mydb->execute(query);
//free(query);
q=(char *)"UPDATE mysql_servers_incoming SET status=0 WHERE hostname='%s' AND port=%d AND hostgroup_id=%d";
//query=(char *)malloc(strlen(q)+strlen(_hostname)+64);
sprintf(query,q,_hostname,_port,_writer_hostgroup);
q=(char *)"UPDATE mysql_servers_incoming SET status=%d WHERE hostname='%s' AND port=%d AND hostgroup_id=%d";
// NOTE: In case of the server being 'OFFLINE_SOFT' we preserve this status. Otherwise
// we set the server as 'ONLINE'.
sprintf(query, q, (status == 2 ? 2 : 0 ), _hostname, _port, _writer_hostgroup);
mydb->execute(query);
//free(query);
if (writer_is_also_reader && read_HG>=0) {

@ -1420,7 +1420,11 @@ void * monitor_group_replication_thread(void *arg) {
//mmsd->async_exit_status=mysql_ping_start(&mmsd->interr,mmsd->mysql);
mmsd->interr=0; // reset the value
#ifdef TEST_GROUPREP
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS");
{
std::string s { "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" };
s += " " + std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port);
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,s.c_str());
}
#else
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SELECT viable_candidate,read_only,transactions_behind FROM sys.gr_member_routing_candidate_status");
#endif
@ -1561,10 +1565,12 @@ __exit_monitor_group_replication_thread:
mmsd->hostname, mmsd->port, num_timeouts, mmsd->max_transactions_behind_count);
}
}
int lag_counts = 0;
if (read_only) {
lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind);
}
// NOTE: Previously 'lag_counts' was only updated for 'read_only'
// because 'writers' were never selected for being set 'OFFLINE' due to
// replication lag. Since the change of this behavior to 'SHUNNING'
// with replication lag, no matter it's 'read_only' value, 'lag_counts'
// is computed everytime.
int lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind);
pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
// NOTE: we update MyHGM outside the mutex group_replication_mutex
@ -1588,16 +1594,26 @@ __exit_monitor_group_replication_thread:
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO");
} else {
if (read_only==true) {
if (lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) {
MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"slave is lagging");
} else {
MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES");
}
MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES");
} else {
// the node is a writer
// TODO: for now we don't care about the number of writers
MyHGM->update_group_replication_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup);
}
// NOTE: Replication lag action should takes place **after** the
// servers have been placed in the correct hostgroups, otherwise
// during the reconfiguration of the servers due to 'update_group_replication_set_writer'
// there would be a small window in which the 'SHUNNED' server
// will be treat as 'ONLINE' letting some new connections to
// take places, before it becomes 'SHUNNED' again.
bool enable = true;
if (lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) {
enable = false;
}
MyHGM->group_replication_lag_action(
mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, lag_counts, read_only, enable
);
}
}

@ -457,6 +457,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_groupreplication_healthcheck_timeout",
(char *)"monitor_groupreplication_healthcheck_max_timeout_count",
(char *)"monitor_groupreplication_max_transactions_behind_count",
(char *)"monitor_groupreplication_max_transactions_behind_for_read_only",
(char *)"monitor_galera_healthcheck_interval",
(char *)"monitor_galera_healthcheck_timeout",
(char *)"monitor_galera_healthcheck_max_timeout_count",
@ -1048,6 +1049,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_groupreplication_healthcheck_timeout=800;
variables.monitor_groupreplication_healthcheck_max_timeout_count=3;
variables.monitor_groupreplication_max_transactions_behind_count=3;
variables.monitor_groupreplication_max_transactions_behind_for_read_only=1;
variables.monitor_galera_healthcheck_interval=5000;
variables.monitor_galera_healthcheck_timeout=800;
variables.monitor_galera_healthcheck_max_timeout_count=3;
@ -2116,6 +2118,7 @@ char ** MySQL_Threads_Handler::get_variables_list() {
VariablesPointers_int["monitor_groupreplication_healthcheck_timeout"] = make_tuple(&variables.monitor_groupreplication_healthcheck_timeout, 100, 600*1000, false);
VariablesPointers_int["monitor_groupreplication_healthcheck_max_timeout_count"] = make_tuple(&variables.monitor_groupreplication_healthcheck_max_timeout_count, 1, 10, false);
VariablesPointers_int["monitor_groupreplication_max_transactions_behind_count"] = make_tuple(&variables.monitor_groupreplication_max_transactions_behind_count, 1, 10, false);
VariablesPointers_int["monitor_groupreplication_max_transactions_behind_for_read_only"] = make_tuple(&variables.monitor_groupreplication_max_transactions_behind_for_read_only, 0, 2, false);
VariablesPointers_int["monitor_galera_healthcheck_interval"] = make_tuple(&variables.monitor_galera_healthcheck_interval, 50, 7*24*3600*1000, false);
VariablesPointers_int["monitor_galera_healthcheck_timeout"] = make_tuple(&variables.monitor_galera_healthcheck_timeout, 50, 600*1000, false);
@ -3669,6 +3672,7 @@ void MySQL_Thread::refresh_variables() {
mysql_thread___monitor_groupreplication_healthcheck_timeout=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_timeout");
mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_groupreplication_healthcheck_max_timeout_count");
mysql_thread___monitor_groupreplication_max_transactions_behind_count=GloMTH->get_variable_int((char *)"monitor_groupreplication_max_transactions_behind_count");
mysql_thread___monitor_groupreplication_max_transaction_behind_for_read_only=GloMTH->get_variable_int((char *)"monitor_groupreplication_max_transactions_behind_for_read_only");
mysql_thread___monitor_galera_healthcheck_interval=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_interval");
mysql_thread___monitor_galera_healthcheck_timeout=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_timeout");
mysql_thread___monitor_galera_healthcheck_max_timeout_count=GloMTH->get_variable_int((char *)"monitor_galera_healthcheck_max_timeout_count");

@ -8,6 +8,7 @@
#include "MySQL_Logger.hpp"
#include "MySQL_Data_Stream.h"
#include "proxysql_utils.h"
#include "query_processor.h"
#include "SQLite3_Server.h"
@ -233,7 +234,80 @@ class sqlite3server_main_loop_listeners {
static sqlite3server_main_loop_listeners S_amll;
/**
* @brief Helper function that checks if the supplied string
* is a number.
* @param s The string to check.
* @return True if the supplied string is just composed of
* digits, false otherwise.
*/
bool is_number(const std::string& s) {
if (s.empty()) { return false; }
for (const auto& d : s) {
if (std::isdigit(d) == false) {
return false;
}
}
return true;
}
/**
* @brief Checks if the query matches an specified 'monitor_query' of the
* following format:
*
* "$MONITOR_QUERY" + " hostname:port"
*
* If the query matches, 'true' is returned, false otherwise.
*
* @param monitor_query Query that should be matched against the current
* supplied 'query'.
* @param query Current query, to be matched against the supplied
* 'monitor_query'.
* @return 'true' if the query matches, false otherwise.
*/
bool match_monitor_query(const std::string& monitor_query, const std::string& query) {
if (query.rfind(monitor_query, 0) != 0) {
return false;
}
std::string srv_address {
query.substr(monitor_query.size())
};
// Check that what is beyond this point, is just the servers address,
// written as an identifier 'n.n.n.n:n'.
std::size_t cur_mark_pos = 0;
for (int i = 0; i < 3; i++) {
std::size_t next_mark_pos = srv_address.find('.', cur_mark_pos);
if (next_mark_pos == std::string::npos) {
return false;
} else {
std::string number {
srv_address.substr(cur_mark_pos, next_mark_pos - cur_mark_pos)
};
if (is_number(number)) {
cur_mark_pos = next_mark_pos + 1;
} else {
return false;
}
}
}
// Check last part is also a valid number
cur_mark_pos = srv_address.find(':', cur_mark_pos);
if (cur_mark_pos == std::string::npos) {
return false;
} else {
std::string number {
srv_address.substr(cur_mark_pos + 1)
};
return is_number(number);
}
}
void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *pkt) {
@ -555,8 +629,41 @@ __run_query:
#ifdef TEST_GROUPREP
if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) {
pthread_mutex_lock(&GloSQLite3Server->grouprep_mutex);
GloSQLite3Server->populate_grouprep_table(sess, testLag);
if (testLag > 0) testLag--;
GloSQLite3Server->populate_grouprep_table(sess, 0);
// NOTE: This query should be in one place that can be reused by
// 'ProxySQL_Monitor' module.
const std::string grouprep_monitor_test_query_start {
"SELECT viable_candidate,read_only,transactions_behind "
"FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS "
};
// If the query matches 'grouprep_monitor_test_query_start', it
// means that the query has been issued by `ProxySQL_Monitor` and
// we need to fetch for the proper values and replace the query
// with one holding the values from `grouprep_map`.
if (match_monitor_query(grouprep_monitor_test_query_start, query_no_space)) {
std::string srv_addr {
query_no_space + grouprep_monitor_test_query_start.size()
};
const group_rep_status& gr_srv_status =
GloSQLite3Server->grouprep_test_value(srv_addr);
free(query);
std::string t_select_as_query {
"SELECT '%s' AS viable_candidate, '%s' AS read_only, %d AS transactions_behind"
};
std::string select_as_query {};
string_format(
t_select_as_query, select_as_query,
std::get<0>(gr_srv_status) ? "YES" : "NO",
std::get<1>(gr_srv_status) ? "YES" : "NO",
std::get<2>(gr_srv_status)
);
query = static_cast<char*>(malloc(select_as_query.length() + 1));
strcpy(query, select_as_query.c_str());
}
}
#endif // TEST_GROUPREP
if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) {
@ -595,20 +702,12 @@ __run_query:
#ifdef TEST_GROUPREP
if (strstr(query_no_space,(char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS")) {
pthread_mutex_unlock(&GloSQLite3Server->grouprep_mutex);
if (resultset->rows_count == 0) {
PROXY_TRACE();
}
if (strncmp("127.2.1.2", sess->client_myds->proxy_addr.addr,9) == 0) {
if (testTimeoutSequence[testIndex--])
sleep(2);
if (testIndex < 0)
testIndex = 7;
}
else {
if (rand() % 20 == 0)
sleep(2);
}
// NOTE: Enable this just in case of manual testing
// if (rand() % 100 == 0) {
// // randomly add some latency on 1% of the traffic
// sleep(2);
// }
}
#endif // TEST_GROUPREP
if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) {
@ -631,6 +730,18 @@ __run_query:
l_free(query_length,query);
}
#ifdef TEST_GROUPREP
group_rep_status SQLite3_Server::grouprep_test_value(const std::string& srv_addr) {
group_rep_status cur_srv_st { "YES", "YES", 0 };
auto it = grouprep_map.find(srv_addr);
if (it != grouprep_map.end()) {
cur_srv_st = it->second;
}
return cur_srv_st;
}
#endif
SQLite3_Session::SQLite3_Session() {
sessdb=new SQLite3DB();
@ -912,7 +1023,16 @@ void SQLite3_Server::init_grouprep_ifaces_string(std::string& s) {
pthread_mutex_init(&grouprep_mutex,NULL);
if (!s.empty())
s += ";";
s += "127.2.1.1:3306;127.2.1.2:3306;127.2.1.3:3306";
// Maximum number of servers to simulate.
max_num_grouprep_servers = 50;
for (unsigned int i=0; i < max_num_grouprep_servers; i++) {
s += "127.2.1." + std::to_string(i) + ":3306";
if (i != max_num_grouprep_servers) {
s += ";";
}
}
}
#endif // TEST_GROUPREP
@ -1094,19 +1214,77 @@ void SQLite3_Server::populate_aws_aurora_table(MySQL_Session *sess) {
#endif // TEST_AURORA
#ifdef TEST_GROUPREP
/**
* @brief Populates the 'grouprep' table if it's found empty with the default
* values for the three testing servers.
*
* NOTE: This function needs to be called with lock on grouprep_mutex already acquired
*
* @param sess The current session performing a query.
* @param txs_behind Unused parameter.
*/
void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind) {
// this function needs to be called with lock on mutex galera_mutex already acquired
GloAdmin->mysql_servers_wrlock();
// We are going to repopulate the map
this->grouprep_map.clear();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
string query { "SELECT * FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" };
sessdb->execute_statement(query.c_str(), &error, &cols, &affected_rows, &resultset);
if (resultset) {
for (const SQLite3_row* r : resultset->rows) {
std::string srv_addr { std::string(r->fields[0]) + ":" + std::string(r->fields[1]) };
const group_rep_status srv_status {
std::string { r->fields[2] } == "YES" ? true : false,
std::string { r->fields[3] } == "YES" ? true : false,
atoi(r->fields[4])
};
this->grouprep_map[srv_addr] = srv_status;
}
}
delete resultset;
// Insert some default servers for manual testing.
//
sessdb->execute("DELETE FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS");
string myip = string(sess->client_myds->proxy_addr.addr);
string server_id = myip.substr(8,1);
if (server_id == "1")
sessdb->execute("INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'NO', 0)");
else {
std::stringstream ss;
ss << "INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate, read_only, transactions_behind) values ('YES', 'YES', " << txs_behind << ")";
sessdb->execute(ss.str().c_str());
// NOTE: This logic can be improved in the future, for now it only populates
// the 'monitoring' data for the default severs. If more servers are placed
// as the default ones, more servers will be placed in their appropiated
// hostgroups with the same pattern as first ones.
if (this->grouprep_map.size() == 0) {
GloAdmin->admindb->execute_statement(
(char*)"SELECT DISTINCT hostname, port, hostgroup_id FROM mysql_servers"
" WHERE hostgroup_id BETWEEN 2700 AND 4200",
&error, &cols , &affected_rows , &resultset
);
for (const SQLite3_row* r : resultset->rows) {
std::string hostname { r->fields[0] };
int port = atoi(r->fields[1]);
int hostgroup_id = atoi(r->fields[2]);
const std::string t_insert_query {
"INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS"
" (hostname, port, viable_candidate, read_only, transactions_behind) VALUES"
" ('%s', %d, '%s', '%s', 0)"
};
std::string insert_query {};
if (hostgroup_id % 4 == 0) {
string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "NO");
sessdb->execute(insert_query.c_str());
} else {
string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "YES");
sessdb->execute(insert_query.c_str());
}
}
delete resultset;
}
GloAdmin->mysql_servers_wrunlock();
}
#endif // TEST_GALERA
@ -1179,7 +1357,11 @@ bool SQLite3_Server::init() {
tables_defs_grouprep = new std::vector<table_def_t *>;
insert_into_tables_defs(tables_defs_grouprep,
(const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS",
(const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS (viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null)");
(const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS ("
"hostname VARCHAR NOT NULL, port INT NOT NULL, viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null, PRIMARY KEY (hostname, port)"
")"
);
check_and_build_standard_tables(sessdb, tables_defs_grouprep);
GloAdmin->enable_grouprep_testing();
#endif // TEST_GALERA

Loading…
Cancel
Save