DRAFT: Monitor 'Group replication' rework

This is a draft of a rework for Monitor 'Group Replication'. This
rework reuses the monitoring model already adopted for AWS Aurora for
'Group Replication'.
v2.x-group_replication_rework-SHUNNED_promotion
Javier Jaramago Fernández 3 years ago
parent 22ff0feea4
commit 51c723f6fc

@ -192,7 +192,9 @@ class MySQL_Monitor_State_Data {
public:
MySQL_Monitor_State_Data_Task_Type task_id;
struct timeval tv_out;
/* @brief Time prior fetch operations. 'Start time' of the monitoring check. */
unsigned long long t1;
/* @brief Time post fetch operations. Current time before performing local monitoring actions. */
unsigned long long t2;
int ST;
char *hostname;
@ -404,6 +406,7 @@ class MySQL_Monitor {
void * monitor_ping();
void * monitor_read_only();
void * monitor_group_replication();
void * monitor_group_replication_2();
void * monitor_galera();
void * monitor_aws_aurora();
void * monitor_replication_lag();

@ -129,6 +129,7 @@ enum debug_module {
PROXY_DEBUG_QUERY_CACHE,
PROXY_DEBUG_QUERY_STATISTICS,
PROXY_DEBUG_RESTAPI,
PROXY_DEBUG_MONITOR,
PROXY_DEBUG_UNKNOWN // this module doesn't exist. It is used only to define the last possible module
};
@ -437,6 +438,7 @@ enum PROXYSQL_MYSQL_ERR {
ER_PROXYSQL_LAGGING_SRV = 9005,
ER_PROXYSQL_PING_TIMEOUT = 9006,
ER_PROXYSQL_CHANGE_USER_TIMEOUT = 9007,
ER_PROXYSQL_GR_HEALTH_CONN_CHECK_TIMEOUT = 9020,
ER_PROXYSQL_GR_HEALTH_CHECK_TIMEOUT = 9008,
ER_PROXYSQL_GR_HEALTH_CHECKS_MISSED = 9009,
ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT = 9010,

@ -3632,9 +3632,18 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u
*/
void MySQL_HostGroups_Manager::group_replication_lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) {
if (myhgc == NULL || address == NULL) return;
proxy_debug(
PROXY_DEBUG_MONITOR, 5, "Params - address: %s, port: %d, lag_count: %d, enable: %d\n", address, port,
lag_count, enable
);
for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
proxy_debug(
PROXY_DEBUG_MONITOR, 6, "Server 'MySrvC' - address: %s, port: %d, status: %d\n", mysrvc->address,
mysrvc->port, mysrvc->status
);
if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) {
if (enable == true) {
@ -3692,7 +3701,6 @@ void MySQL_HostGroups_Manager::group_replication_lag_action(
rhid_row = rhid_res->rows[0];
reader_hostgroup = atoi(rhid_row->fields[0]);
// writer_is_also_reader = atoi(rhid_row->fields[1]);
{
MyHGC* myhgc = nullptr;

@ -8,6 +8,7 @@
* original implementation
*/
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <future>
@ -58,6 +59,11 @@ static MySQL_Monitor *GloMyMon;
} while (rc==SQLITE_LOCKED || rc==SQLITE_BUSY);\
} while (0)
using std::string;
using std::set;
using std::vector;
using std::unique_ptr;
template<typename T, bool check_monitor_enabled_flag = true>
class ConsumerThread : public Thread {
wqueue<WorkItem<T>*>& m_queue;
@ -559,7 +565,8 @@ void * monitor_group_replication_pthread(void *arg) {
usleep(50000);
}
usleep(100000);
GloMyMon->monitor_group_replication();
// GloMyMon->monitor_group_replication();
GloMyMon->monitor_group_replication_2();
return NULL;
}
@ -3156,6 +3163,805 @@ __sleep_monitor_read_only:
return NULL;
}
/**
 * @brief Extracts the set of distinct writer hostgroup ids found in the GR hosts resultset.
 * @param Group_Replication_Hosts_resultset Resultset generated by
 *  'MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table'; field 0 is the
 *  'writer_hostgroup'.
 * @return Set with one entry per distinct writer hostgroup. Each entry identifies a GR cluster.
 */
set<uint32_t> extract_writer_hgs(SQLite3_result* Group_Replication_Hosts_resultset) {
	set<uint32_t> writer_hgs {};
	// NOTE: This operation should be at worst `N * log(N)`. The previous guard on
	// 'rows_count' was redundant: iterating an empty 'rows' container is a no-op.
	for (SQLite3_row* sqlite_row : Group_Replication_Hosts_resultset->rows) {
		writer_hgs.insert(atoi(sqlite_row->fields[0]));
	}
	return writer_hgs;
}
/**
 * @brief Holds the info from a GR server definition.
 * @details Field order matters: instances are built via aggregate initialization in
 *  'extract_gr_host_defs'.
 */
typedef struct _gr_host_def_t {
	string host;                       // server address
	int port;                          // server port
	int use_ssl;                       // whether to use SSL for the monitoring conn
	bool writer_is_also_reader;        // GR cluster option: writers also placed in the reader HG
	int max_transactions_behind;       // replication-lag threshold for the server
	int max_transactions_behind_count; // consecutive-timeouts threshold (from global monitor variable)
} gr_host_def_t;
/**
 * @brief Extracts the 'gr_host_def_t' host definitions of one GR cluster from the provided 'SQLite3_result*'.
 * @details The expected contents of the provided 'SQLite3_result*' are the ones generated by
 *  'MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table'. Only rows whose
 *  'writer_hostgroup' (field 0) matches 'tg_writer_hg' are extracted.
 * @param tg_writer_hg Target writer hostgroup identifying the cluster whose servers are extracted.
 * @param Group_Replication_Hosts_resultset Resultset held by 'MySQL_Monitor' and generated by
 *  'MySQL_HostGroups_Manager'.
 * @return Vector with the GR servers configurations.
 */
vector<gr_host_def_t> extract_gr_host_defs(
	uint32_t tg_writer_hg, SQLite3_result* Group_Replication_Hosts_resultset
) {
	vector<gr_host_def_t> result {};
	for (SQLite3_row* row : Group_Replication_Hosts_resultset->rows) {
		uint32_t writer_hg = atoi(row->fields[0]);
		if (tg_writer_hg == writer_hg) {
			char* hostname = row->fields[1];
			int port = atoi(row->fields[2]);
			bool use_ssl = atoi(row->fields[3]);
			bool wr_is_also_rd = atoi(row->fields[4]);
			int max_trx_behind = atoi(row->fields[5]);
			// NOTE: Taken from the global monitor variable, not from the resultset
			int max_trx_behind_count = mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count;
			result.push_back({ hostname, port, use_ssl, wr_is_also_rd, max_trx_behind, max_trx_behind_count });
		}
	}
	return result;
}
/**
 * @brief Filter the responsive servers from the supplied hosts definitions.
 * @details A server is considered responsive when 'MySQL_Monitor::server_responds_to_ping'
 *  reports it as such, i.e. it hasn't exceeded 'mysql_thread___monitor_ping_max_failures'.
 * @param hosts_defs Hosts definitions to filter.
 * @return Responsive servers found in the supplied hosts definitions.
 */
vector<gr_host_def_t> find_resp_srvs(const vector<gr_host_def_t>& hosts_defs) {
	vector<gr_host_def_t> responsive {};
	for (const gr_host_def_t& def : hosts_defs) {
		// 'server_responds_to_ping' takes a non-const 'char*'; the string isn't modified
		char* addr = const_cast<char*>(def.host.c_str());
		const bool pingable = GloMyMon->server_responds_to_ping(addr, def.port);
		if (pingable) {
			responsive.push_back(def);
		}
	}
	return responsive;
}
/**
 * @brief Simple wrapper struct holding a 'mmsd' and extra information about it's creation.
 */
struct srv_mmsd_t {
	// True when a brand-new conn was created for the 'mmsd' (i.e. none was available in
	// Monitor 'ConnectionPool'); relevant when returning the conn to the pool later.
	bool created_conn;
	// The monitoring state data itself; owns the fetch state and (possibly) a MySQL conn.
	unique_ptr<MySQL_Monitor_State_Data> mmsd;
};
/**
 * @brief Builds the error message reported when conn creation for a 'mmsd' failed.
 * @details The hint about 'mysql-monitor_connect_timeout' is only included when the failure
 *  wasn't an authentication error ('Access denied for user').
 * @param mmsd The 'mmsd' whose 'mysql_error_msg' holds the underlying failure cause.
 * @return The formatted error message.
 */
string create_conn_err_msg(const unique_ptr<MySQL_Monitor_State_Data>& mmsd) {
	const char ACCESS_DENIED_MSG[] { "Access denied for user" };
	const bool access_denied =
		strncmp(mmsd->mysql_error_msg, ACCESS_DENIED_MSG, strlen(ACCESS_DENIED_MSG)) == 0;
	// Suggest tuning the connect timeout only for non-auth failures
	const char* srv_overload =
		access_denied ? "" : "If the server is overload, increase mysql-monitor_connect_timeout. ";
	cfmt_t err_fmt = cstr_format(
		"%sError: timeout or error in creating new connection: %s", srv_overload, mmsd->mysql_error_msg
	);
	return err_fmt.str;
}
/**
 * @brief Initializes a 'MySQL_Monitor_State_Data' with a MySQL conn.
 * @details The conn is first looked up in Monitor 'ConnectionPool'; a new one is created (and
 *  registered in the pool) only on miss. On conn creation failure the error is logged, the
 *  Prometheus counter 'ER_PROXYSQL_GR_HEALTH_CONN_CHECK_TIMEOUT' is increased, and the 'mmsd'
 *  is returned with 'mysql_error_msg' set and 'mysql' being 'NULL'.
 *
 * @param srv_def The server info for the initialization. NOTE: Fix — previously taken by value,
 *  which copied the whole struct (including 'std::string host') on every call.
 * @param writer_hg The writer_hostgroup to specify.
 * @param start_time The time at which this conn creation operation was started.
 *
 * @return A wrapper over the created 'mmsd' with the conn creation info.
 */
srv_mmsd_t init_mmsd_with_conn(const gr_host_def_t& srv_def, uint32_t writer_hg, uint64_t start_time) {
	// 'MySQL_Monitor_State_Data' ctor takes a non-const 'char*'; the string isn't modified
	char* c_hostname = const_cast<char*>(srv_def.host.c_str());
	unique_ptr<MySQL_Monitor_State_Data> mmsd {
		new MySQL_Monitor_State_Data { c_hostname, srv_def.port, NULL, static_cast<bool>(srv_def.use_ssl), }
	};
	mmsd->t1 = start_time;
	mmsd->writer_hostgroup = writer_hg;
	mmsd->writer_is_also_reader = srv_def.writer_is_also_reader;
	mmsd->max_transactions_behind = srv_def.max_transactions_behind;
	mmsd->max_transactions_behind_count = srv_def.max_transactions_behind_count;
	// Try to reuse a conn from Monitor 'ConnectionPool' first
	mmsd->mysql = GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get());
	bool created_conn = false;
	if (mmsd->mysql == NULL) {
		bool rc = mmsd->create_new_connection();
		if (rc && mmsd->mysql) {
			GloMyMon->My_Conn_Pool->conn_register(mmsd.get());
			created_conn = true;
		} else {
			uint64_t now = monotonic_time();
			string err_msg = create_conn_err_msg(mmsd);
			proxy_error(
				"Error on Group Replication check for %s:%d after %lldms. Unable to create a connection. %s.\n",
				mmsd->hostname, mmsd->port, (now - mmsd->t1)/1000, err_msg.c_str()
			);
			MyHGM->p_update_mysql_error_counter(
				p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port,
				ER_PROXYSQL_GR_HEALTH_CONN_CHECK_TIMEOUT
			);
			// Update 'mmsd' error message to report connection creating failure
			cfmt_t conn_err_msg {
				cstr_format("timeout or error in creating new connection: %s", mmsd->mysql_error_msg)
			};
			mmsd->mysql_error_msg = strdup(conn_err_msg.str.c_str());
		}
	}
	return { created_conn, std::move(mmsd) };
}
/// TODO: TO BE REPLACED BY ASYNC IMPL
/// ///////////////////////////////////////////////////////////////////////////
/**
 * @brief Namespaced enum distinguishing the two async MySQL operations driven by
 *  'cont_async_mysql_action'.
 */
struct async_mysql_action {
	enum action {
		query, // continue via 'mysql_query_cont'
		store  // continue via 'mysql_store_result_cont'
	};
};
/**
 * @brief Drives a previously started async MySQL operation ('mysql_query_start' or
 *  'mysql_store_result_start') to completion, enforcing the GR healthcheck timeout.
 * @details Loops while 'mmsd->async_exit_status' reports pending work: waits on the conn FD,
 *  then continues the operation via 'mysql_query_cont'/'mysql_store_result_cont' depending on
 *  'action'. Timeouts and MySQL errors set 'mmsd->mysql_error_msg' and bump the corresponding
 *  Prometheus error counter.
 * @param action Which async operation is being continued ('query' or 'store').
 * @param mmsd The 'mmsd' holding the conn and the async state.
 * @return 'EXIT_SUCCESS' on completion; '-1' on timeout or MySQL error (error message set);
 *  '-2' when monitor shutdown was detected mid-operation.
 */
int cont_async_mysql_action(
	async_mysql_action::action action, unique_ptr<MySQL_Monitor_State_Data>& mmsd
) {
	while (mmsd->async_exit_status) {
		mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
		// Check if 'shutdown' have been received after blocking actions
		if (GloMyMon->shutdown == true) {
			return -2;
		}
		// Timeout check: 't1' is in microseconds, the timeout variable in milliseconds
		uint64_t now = monotonic_time();
		if (now > mmsd->t1 + mysql_thread___monitor_groupreplication_healthcheck_timeout * 1000) {
			mmsd->mysql_error_msg = strdup("timeout check");
			proxy_error(
				"Timeout on group replication health check for %s:%d after %lldms. If the server is overload, increase"
				" mysql-monitor_groupreplication_healthcheck_timeout. Assuming viable_candidate=NO and read_only=YES\n",
				mmsd->hostname, mmsd->port, (now - mmsd->t1)/1000
			);
			MyHGM->p_update_mysql_error_counter(
				p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port,
				ER_PROXYSQL_GR_HEALTH_CHECK_TIMEOUT
			);
			return -1;
		}
		// A MySQL-side error detected while the operation is still in-flight
		if (mmsd->interr) {
			mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql));
			MyHGM->p_update_mysql_error_counter(
				p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)
			);
			return -1;
		}
		// Only continue the operation if the wait didn't just time out
		if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) {
			if (action == async_mysql_action::query) {
				mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status);
			} else {
				mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status);
			}
		}
	}
	// Operation finished; perform the final error check, which differs per action:
	// 'query' reports via 'mmsd->interr', 'store' via 'mysql_errno'
	if (action == async_mysql_action::query) {
		if (mmsd->interr) {
			mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql));
			MyHGM->p_update_mysql_error_counter(
				p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)
			);
			return -1;
		}
	} else {
		if (mysql_errno(mmsd->mysql)) {
			mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql));
			MyHGM->p_update_mysql_error_counter(
				p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)
			);
			return -1;
		}
	}
	return EXIT_SUCCESS;
}
/// ///////////////////////////////////////////////////////////////////////////
/**
 * @brief MySQL 8 status query for Group Replication members.
 * @details Since 'MySQL 8' we rely on 'COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE', deprecating the previously
 *  required 'sys.gr_member_routing_candidate_status' view. Returns exactly one row with three fields:
 *  'viable_candidate' ('YES'/'NO'), 'read_only' ('YES'/'NO') and 'transactions_behind' (integer),
 *  for the member identified by '@@SERVER_UUID'.
 */
const char MYSQL_8_GR_QUERY[] {
	"SELECT (SELECT IF ("
		"MEMBER_STATE='ONLINE' AND ("
			"(SELECT COUNT(*) FROM performance_schema.replication_group_members WHERE MEMBER_STATE != 'ONLINE') >="
			" ((SELECT COUNT(*) FROM performance_schema.replication_group_members)/2) = 0)"
		", 'YES', 'NO')) AS viable_candidate,"
	" (SELECT IF (@@read_only, 'YES', 'NO')) as read_only,"
	" COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE AS transactions_behind "
	"FROM "
		"performance_schema.replication_group_members "
		"JOIN performance_schema.replication_group_member_stats rgms USING(member_id)"
	"WHERE rgms.MEMBER_ID=@@SERVER_UUID"
};
/**
 * @brief Fetches the GR status of the server held by the supplied 'mmsd' wrapper.
 * @details Fetching is skipped (with a warning) when the 'mmsd' holds no conn, i.e. conn
 *  creation failed earlier. The query used depends on the server version: 'MYSQL_8_GR_QUERY'
 *  for MySQL 8, otherwise the legacy 'sys.gr_member_routing_candidate_status' view. On success
 *  'mmsd->result' holds the resultset; on failure 'mmsd->mysql_error_msg' describes the error.
 * @param ini_mmsd Wrapper holding the 'mmsd' with the conn to be used for fetching.
 */
void gr_fetch_srv_status(srv_mmsd_t& ini_mmsd) {
	unique_ptr<MySQL_Monitor_State_Data>& mmsd { ini_mmsd.mmsd };
	// No conn to fetch from; conn creation already reported its own error
	if (ini_mmsd.mmsd->mysql == nullptr) {
		proxy_warning(
			"Skipping fetching for server %s:%d due to error '%s'\n",
			ini_mmsd.mmsd->hostname, ini_mmsd.mmsd->port, ini_mmsd.mmsd->mysql_error_msg
		);
		return;
	}
#ifdef TEST_GROUPREP
	// Test build: query a fake table, encoding the target server in the query text
	string query { "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" };
	query += " " + std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port);
#else
	string query {};
	// MySQL-8: Query dependent on 'COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE'; deprecating the previously
	// used `sys.gr_member_routing_candidate_status` view.
	if (strncasecmp(mmsd->mysql->server_version, "8", 1) == 0) {
		query = MYSQL_8_GR_QUERY;
	} else {
		// If not MySQL 8 we default back to the old check
		query = "SELECT viable_candidate,read_only,transactions_behind FROM sys.gr_member_routing_candidate_status";
	}
#endif
	// Issue the query and drive it to completion
	mmsd->async_exit_status = mysql_query_start(&mmsd->interr, mmsd->mysql, query.c_str());
	int my_async_rc = cont_async_mysql_action(async_mysql_action::query, mmsd);
	if (my_async_rc != EXIT_SUCCESS) {
		proxy_error(
			"Got error: mmsd %p , MYSQL %p , FD %d : %s\n", mmsd.get(), mmsd->mysql, mmsd->mysql->net.fd,
			mmsd->mysql_error_msg
		);
		return;
	}
	// Store the resultset, again driving the async operation to completion
	mmsd->async_exit_status = mysql_store_result_start(&mmsd->result, mmsd->mysql);
	my_async_rc = cont_async_mysql_action(async_mysql_action::store, mmsd);
	if (my_async_rc != EXIT_SUCCESS) {
		proxy_error(
			"Got error: mmsd %p , MYSQL %p , FD %d : %s\n", mmsd.get(), mmsd->mysql, mmsd->mysql->net.fd,
			mmsd->mysql_error_msg
		);
	}
}
/**
 * @brief Fetched GR status of a single server. Defaults are the pessimistic values assumed
 *  when fetching failed ('viable_candidate=NO', 'read_only=YES').
 */
struct gr_srv_st_t {
	bool viable_candidate = false;    // 'viable_candidate' field ('YES' -> true)
	bool read_only = true;            // 'read_only' field ('NO' -> false)
	int64_t transactions_behind = -1; // 'transactions_behind' field; -1 when not fetched
	bool inv_srv_state = false;       // true when the resultset was malformed (see bug #1994)
};
/**
 * @brief Extracts the fetched GR server status ('gr_srv_st_t') from the supplied 'mmsd'.
 * @details Expects the resultset to hold exactly one row with three fields
 *  ('viable_candidate', 'read_only', 'transactions_behind'). A malformed resultset is
 *  reported, sets 'inv_srv_state=true' and ensures 'mmsd->mysql_error_msg' is non-NULL.
 *  When no result is present (fetch error), the pessimistic 'gr_srv_st_t' defaults are
 *  returned. The held 'MYSQL_RES' is always freed before returning.
 * @param mmsd The 'mmsd' after the fetch operation has completed.
 * @return The extracted server status.
 */
gr_srv_st_t extract_gr_srv_st(unique_ptr<MySQL_Monitor_State_Data>& mmsd) {
	gr_srv_st_t gr_srv_st {};
	if (mmsd->interr == 0 && mmsd->result) {
		int num_fields=0;
		int num_rows=0;
		MYSQL_FIELD * fields = mysql_fetch_fields(mmsd->result);
		num_fields = mysql_num_fields(mmsd->result);
		num_rows = mysql_num_rows(mmsd->result);
		// Sanity check the resultset shape before touching any row
		if (fields == NULL || num_fields!=3 || num_rows!=1) {
			proxy_error(
				"'mysql_fetch_fields' returns 'NULL', or 'mysql_num_fields(%d)', or 'mysql_num_rows(%d)' are incorrect."
					" Server %s:%d. See bug #1994\n",
				num_fields, num_rows, mmsd->hostname, mmsd->port
			);
			if (mmsd->mysql_error_msg == NULL) {
				mmsd->mysql_error_msg = strdup("Unknown error");
			}
			gr_srv_st.inv_srv_state = true;
		} else {
			MYSQL_ROW row=mysql_fetch_row(mmsd->result);
			if (row[0] && !strcasecmp(row[0],"YES")) {
				gr_srv_st.viable_candidate=true;
			}
			if (row[1] && !strcasecmp(row[1],"NO")) {
				gr_srv_st.read_only=false;
			}
			if (row[2]) {
				gr_srv_st.transactions_behind=atol(row[2]);
			}
		}
	}
	proxy_debug(
		PROXY_DEBUG_MONITOR, 4,
		"Fetched %u:%s:%d info - interr: %d, error: %s, viable_candidate:'%d', read_only:'%d',"
			" transactions_behind:'%ld'\n",
		mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mmsd->interr, mmsd->mysql_error_msg,
		gr_srv_st.viable_candidate, gr_srv_st.read_only, gr_srv_st.transactions_behind
	);
	// Always release the resultset; the extracted status is all callers need
	if (mmsd->result) {
		mysql_free_result(mmsd->result);
		mmsd->result=NULL;
	}
	return gr_srv_st;
}
/**
 * @brief Holds all info required for performing monitoring actions over the GR node.
 */
struct gr_node_info_t {
	gr_srv_st_t srv_st;        // fetched server status
	bool unresp_server = false; // NOTE: set by no code path here; presumably reserved — TODO confirm
	int num_timeouts = 0;       // consecutive healthcheck timeouts (non-zero only on 'timeout' errors)
	int lag_counts = 0;         // consecutive checks exceeding 'max_transactions_behind'
};
/**
 * @brief Records the fetched server status into 'Group_Replication_Hosts_Map' and derives the
 *  per-node counters needed for monitoring actions.
 * @details Under 'group_replication_mutex': finds (or creates) the 'MyGR_monitor_node' for
 *  'hostname:port', appends an entry with the fetched status, and reads back the timeout and
 *  lag counters.
 * @param start_time The time at which the monitoring cycle started; used to back-date the
 *  entry by the fetch duration.
 * @param gr_srv_st The fetched server status.
 * @param mmsd The 'mmsd' used for the fetch ('t1'/'t2' delimit the fetch duration).
 * @return The node info with the fetched status and the derived counters.
 */
gr_node_info_t gr_update_hosts_map(
	uint64_t start_time, const gr_srv_st_t& gr_srv_st, unique_ptr<MySQL_Monitor_State_Data>& mmsd
) {
	// NOTE: This isn't specified in the initializer list due to current standard limitations
	gr_node_info_t node_info {};
	node_info.srv_st = gr_srv_st;
	// Consider 'time_now' to be 'now - fetch_duration'
	unsigned long long time_now=realtime_time();
	time_now=time_now-(mmsd->t2 - start_time);
	cfmt_t fmt_srv_addr { cstr_format("%s:%d", mmsd->hostname, mmsd->port) };
	pthread_mutex_lock(&GloMyMon->group_replication_mutex);
	std::map<std::string, MyGR_monitor_node *>::iterator it2;
	it2 = GloMyMon->Group_Replication_Hosts_Map.find(fmt_srv_addr.str);
	MyGR_monitor_node *node=NULL;
	if (it2!=GloMyMon->Group_Replication_Hosts_Map.end()) {
		// Known node: append the new entry. A zero check-duration is recorded on error
		node=it2->second;
		node->add_entry(
			time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1), gr_srv_st.transactions_behind,
			gr_srv_st.viable_candidate, gr_srv_st.read_only,mmsd->mysql_error_msg
		);
	} else {
		// First sighting of this node: create it and register it in the map
		node = new MyGR_monitor_node(mmsd->hostname,mmsd->port,mmsd->writer_hostgroup);
		node->add_entry(
			time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1), gr_srv_st.transactions_behind,
			gr_srv_st.viable_candidate, gr_srv_st.read_only,mmsd->mysql_error_msg
		);
		GloMyMon->Group_Replication_Hosts_Map.insert(std::make_pair(fmt_srv_addr.str,node));
	}
	// Timeout counter only advances when the error message reports a timeout
	if (mmsd->mysql_error_msg) {
		if (strncasecmp(mmsd->mysql_error_msg, (char *)"timeout", 7) == 0) {
			node_info.num_timeouts = node->get_timeout_count();
		}
	}
	// NOTE: Previously 'lag_counts' was only updated for 'read_only'
	// because 'writers' were never selected for being set 'OFFLINE' due to
	// replication lag. Since the change of this behavior to 'SHUNNING'
	// with replication lag, no matter it's 'read_only' value, 'lag_counts'
	// is computed everytime.
	node_info.lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind);
	pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
	return node_info;
}
/**
 * @brief Perform the actual monitoring action on the server based on the 'mmsd' info.
 * @details Decision tree:
 *  - Fetch error that wasn't a timeout: set the server OFFLINE immediately.
 *  - Timeout: set OFFLINE only after the consecutive-timeouts threshold is reached.
 *  - 'viable_candidate=NO': set OFFLINE.
 *  - 'read_only=YES': move to the reader hostgroup.
 *  - Otherwise: confirm as writer, then apply the replication-lag (SHUN) action.
 *
 * @param mmsd The 'mmsd' holding info about fetching errors.
 * @param node_info The fetched server information itself.
 */
void gr_mon_action_over_resp_srv(unique_ptr<MySQL_Monitor_State_Data>& mmsd, const gr_node_info_t& node_info) {
	// NOTE: We update MyHGM outside the mutex group_replication_mutex
	if (mmsd->mysql_error_msg) { // there was an error checking the status of the server, surely we need to reconfigure GR
		if (node_info.num_timeouts == 0) {
			// it wasn't a timeout, reconfigure immediately
			MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg);
		} else {
			proxy_warning("%s:%d : group replication health check timeout count %d. Max threshold %d.\n",
				mmsd->hostname, mmsd->port, node_info.num_timeouts, mmsd->max_transactions_behind_count);
			// It was a timeout. Check if we are having consecutive timeout
			if (node_info.num_timeouts == mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count) {
				proxy_error("Server %s:%d missed %d group replication checks. Number retries %d, Assuming offline\n",
					mmsd->hostname, mmsd->port, node_info.num_timeouts, node_info.num_timeouts);
				MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_GR_HEALTH_CHECKS_MISSED);
				MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg);
			}
		}
	} else {
		if (node_info.srv_st.viable_candidate==false) {
			MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO");
		} else {
			if (node_info.srv_st.read_only==true) {
				MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES");
			} else {
				// the node is a writer
				// TODO: for now we don't care about the number of writers
				MyHGM->update_group_replication_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup);
			}
			// NOTE: Replication lag action should takes place **after** the
			// servers have been placed in the correct hostgroups, otherwise
			// during the reconfiguration of the servers due to 'update_group_replication_set_writer'
			// there would be a small window in which the 'SHUNNED' server
			// will be treat as 'ONLINE' letting some new connections to
			// take places, before it becomes 'SHUNNED' again.
			bool enable = true;
			if (node_info.lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) {
				enable = false;
			}
			MyHGM->group_replication_lag_action(
				mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, node_info.lag_counts, node_info.srv_st.read_only, enable
			);
		}
	}
}
/**
 * @brief NOTE: Currently unused. Unresponsive servers are SHUNNED by monitoring PING actions, and no further
 *  monitoring actions are performed on them.
 * @details Sets each supplied host OFFLINE via
 *  'MySQL_HostGroups_Manager::update_group_replication_set_offline', logging a warning per host.
 *
 * @param hosts_defs Unresponsive hosts.
 * @param wr_hg The writer hostgroup from the unresponsive hosts.
 */
void gr_handle_actions_over_unresp_srvs(const vector<gr_host_def_t>& hosts_defs, uint32_t wr_hg) {
	char unresp_err_msg[] = "Server unresponsive to PING requests";
	for (const gr_host_def_t& host_def : hosts_defs) {
		char* c_hostname = const_cast<char*>(host_def.host.c_str());
		// Fix: log line previously lacked the trailing newline every other log call in this
		// module carries, fusing it with the next log entry.
		proxy_warning(
			"%s:%d: Server considered OFFLINE due to unresponsiveness to PING requests\n", c_hostname, host_def.port
		);
		MyHGM->update_group_replication_set_offline(c_hostname, host_def.port, wr_hg, unresp_err_msg);
	}
}
/**
 * @brief Handles the return of the 'MySQL' conn used by the 'mmsd' to Monitor 'ConnectionPool'.
 * @details Connections are returned to the 'ConnectionPool' if no errors took place during the fetching. If
 *  the connection is a newly created connection, we try to configure it with the proper 'set_wait_timeout'
 *  before placing the connection back into the 'ConnectionPool'; on failure, we discard the connection.
 *  In every path 'mmsd->mysql' ends up NULL, so the 'mmsd' never keeps a dangling conn.
 * @param srv_mmsd The mmsd wrapper holding all information for returning the connection.
 */
void handle_mmsd_mysql_conn(srv_mmsd_t& srv_mmsd) {
	if (srv_mmsd.mmsd == nullptr) return;
	unique_ptr<MySQL_Monitor_State_Data>& mmsd = srv_mmsd.mmsd;
	if (mmsd->mysql) {
		if (mmsd->interr || mmsd->mysql_error_msg) {
			// Fetch failed: the conn state is unknown; unregister and discard it
			GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get());
			mysql_close(mmsd->mysql);
		} else {
			if (srv_mmsd.created_conn) {
				// Newly created conn: configure its wait timeout before pooling it
				bool rc = mmsd->set_wait_timeout();
				if (rc) {
					GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
				} else {
					proxy_error(
						"Error by 'set_wait_timeout' for new connection. mmsd %p , MYSQL %p , FD %d : %s\n",
						mmsd.get(), mmsd->mysql, mmsd->mysql->net.fd, mmsd->mysql_error_msg
					);
					MyHGM->p_update_mysql_error_counter(
						p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)
					);
					GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get());
					mysql_close(mmsd->mysql);
				}
			} else {
				// Conn came from the pool and fetch succeeded: return it as-is
				GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
			}
		}
		mmsd->mysql=NULL;
	}
}
/**
 * @brief Report the fetching errors of the supplied 'mmsd' and increase the corresponding counter.
 * @details A report only happens when the 'mmsd' still holds a conn and either 'interr' or
 *  'mysql_error_msg' signals a failure.
 * @param srv_mmsd The 'mmsd' which failures are to be reported.
 */
void gr_report_fetching_errs(srv_mmsd_t& srv_mmsd) {
	unique_ptr<MySQL_Monitor_State_Data>& mmsd = srv_mmsd.mmsd;
	// Nothing to report without a conn
	if (mmsd->mysql == NULL) {
		return;
	}
	// Nothing to report when the fetch completed cleanly
	if (mmsd->interr == 0 && mmsd->mysql_error_msg == NULL) {
		return;
	}
	proxy_error(
		"Got error. mmsd %p , MYSQL %p , FD %d : %s\n", mmsd.get(), mmsd->mysql,
		mmsd->mysql->net.fd, mmsd->mysql_error_msg
	);
	MyHGM->p_update_mysql_error_counter(
		p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql)
	);
}
/**
* @brief Performs the corresponding monitoring actions over the supplied 'MySQL_Monitor_State_Data'.
* @details This function expects to be called when the fetching operation has completed for the supplied
* 'MySQL_Monitor_State_Data' holding a final 'MYSQL_RES' or an error. Otherwise servers will be set to
* 'OFFLINE_HARD' due to defaults on 'gr_srv_st_t'. Actions taken are:
* 1. Extract fetching results from the supplied 'MySQL_Monitor_State_Data' into 'gr_srv_st_t'.
* 2. Update 'Group_Replication_Hosts_Map' and build a resulting 'gr_node_info_t' with the required info
* for performing the monitoring actions.
* 3. Perform any required actions to the servers through 'MySQL_HostGroups_Manager'.
*
* NOTE: We only perform monitoring actions over responsive servers, unresponsive servers are SHUNNED
* by monitoring PING actions, and no further monitoring actions should be performed on them.
*
* @param start_time The time at which this complete 'fetch + actions' monitoring cycle started.
* @param srv_mmsd The server 'MySQL_Monitor_State_Data' after the fetching is completed. It should either
* hold a valid 'MYSQL_RES' or an error.
*/
void async_gr_mon_actions_handler(uint64_t start_time, srv_mmsd_t& srv_mmsd) {
unique_ptr<MySQL_Monitor_State_Data>& mmsd = srv_mmsd.mmsd;
// Extract the server status from the 'mmsd'. Reports if invalid data is received
gr_srv_st_t gr_srv_st { extract_gr_srv_st(srv_mmsd.mmsd) };
// Report fetch errors; logs should report 'cause -> effect'
gr_report_fetching_errs(srv_mmsd);
// Perform monitoring actions; only if the response wasn't illformed
if (gr_srv_st.inv_srv_state == false) {
gr_node_info_t node_info { gr_update_hosts_map(start_time, gr_srv_st, mmsd) };
gr_mon_action_over_resp_srv(mmsd, node_info);
}
// Handle 'mmsd' MySQL conn return to 'ConnectionPool'
handle_mmsd_mysql_conn(srv_mmsd);
}
/**
 * @brief Initializes the structures related with a MySQL_Thread.
 * @details No real thread is spawned; only the per-thread structures are built, with
 *  'curtime' set and the thread variables refreshed.
 * @return The created and initialized 'MySQL_Thread'.
 */
unique_ptr<MySQL_Thread> init_mysql_thread_struct() {
	unique_ptr<MySQL_Thread> thread_struct { new MySQL_Thread() };
	thread_struct->curtime = monotonic_time();
	thread_struct->refresh_variables();
	return thread_struct;
}
/**
 * @brief Info of one GR cluster monitoring thread: its handle plus the writer hostgroup it
 *  monitors. The 'writer_hg' member's address is passed as the thread argument, so instances
 *  must stay at a stable address while the thread runs.
 */
struct thread_info_t {
	pthread_t pthread;  // handle of the monitoring thread
	uint32_t writer_hg; // writer hostgroup (cluster id) monitored by the thread
};
/**
 * @brief Thread entry point monitoring one Group Replication cluster (one writer hostgroup).
 * @details Snapshots the cluster's host definitions from 'Group_Replication_Hosts_resultset',
 *  then loops: on each cycle it filters the pingable servers, fetches their GR status and
 *  applies monitoring actions via 'async_gr_mon_actions_handler'. The thread exits when the
 *  resultset checksum changes (config change), on shutdown, or when monitoring is disabled;
 *  the coordinator ('monitor_group_replication_2') is responsible for respawning it.
 * @param arg Pointer to the 'writer_hg' member of a 'thread_info_t' owned by the coordinator;
 *  must outlive this thread.
 * @return Always NULL.
 */
void* monitor_GR_thread_HG(void *arg) {
	uint32_t wr_hg = *(static_cast<uint32_t*>(arg));
	proxy_info("Started Monitor thread for Group Replication writer HG %u\n", wr_hg);
	// Quick exit during shutdown/restart
	if (!GloMTH) { return NULL; }
	// Initial Monitor thread variables version
	unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version();
	// MySQL thread structure used for variable refreshing
	unique_ptr<MySQL_Thread> mysql_thr { init_mysql_thread_struct() };
	pthread_mutex_lock(&GloMyMon->group_replication_mutex);
	// Get the initial config checksum; this thread must exist on any config changes
	uint64_t initial_raw_checksum = GloMyMon->Group_Replication_Hosts_resultset->raw_checksum();
	// Extract the monitoring data required for the target writer hostgroup
	vector<gr_host_def_t> hosts_defs { extract_gr_host_defs(wr_hg, GloMyMon->Group_Replication_Hosts_resultset) };
	pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
	// Next-cycle deadline (µs); 0 forces an immediate first cycle
	uint64_t next_check_time = 0;
	// Upper bound for a single sleep, so shutdown/config changes are noticed promptly
	uint64_t MAX_CHECK_DELAY_US = 500000;
	while (GloMyMon->shutdown == false && mysql_thread___monitor_enabled == true) {
		if (!GloMTH) { break; } // quick exit during shutdown/restart
		// Config check; Exit if config has been altered
		{
			pthread_mutex_lock(&GloMyMon->group_replication_mutex);
			uint64_t current_raw_checksum = GloMyMon->Group_Replication_Hosts_resultset->raw_checksum();
			pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
			if (current_raw_checksum != initial_raw_checksum) {
				break;
			}
		}
		// Check variable version changes; refresh if needed and don't delay next check
		unsigned int glover = GloMTH->get_global_version();
		if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) {
			MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover;
			mysql_thr->refresh_variables();
			next_check_time = 0;
		}
		uint64_t curtime = monotonic_time();
		// Delay the next check if needed; sleep in bounded slices
		if (curtime < next_check_time) {
			uint64_t time_left = next_check_time - curtime;
			uint64_t next_check_delay = 0;
			if (time_left > MAX_CHECK_DELAY_US) {
				next_check_delay = MAX_CHECK_DELAY_US;
			} else {
				next_check_delay = time_left;
			}
			usleep(next_check_delay);
			continue;
		}
		// Get the current 'pingable' status for the servers.
		const vector<gr_host_def_t>& resp_srvs { find_resp_srvs(hosts_defs) };
		if (resp_srvs.empty()) {
			proxy_error("No node is pingable for Group Replication cluster with writer HG %u\n", wr_hg);
			next_check_time = curtime + mysql_thread___monitor_groupreplication_healthcheck_interval * 1000;
			continue;
		}
		// Initialize the 'MMSD' for data fetching for responsive servers
		vector<srv_mmsd_t> srvs_mmsds {};
		for (const gr_host_def_t& host_def : resp_srvs) {
			srvs_mmsds.push_back(init_mmsd_with_conn(host_def, wr_hg, curtime));
		}
		// Update 't1' for subsequent fetch operations and reset errors
		for (const srv_mmsd_t& srv_mmsd : srvs_mmsds) {
			if (srv_mmsd.mmsd->mysql) {
				srv_mmsd.mmsd->t1 = monotonic_time();
				srv_mmsd.mmsd->interr = 0;
			}
		}
		// TODO: This section will be replaced by 'async' operations
		// //////////////////////////////////////////////////////////////////////////
		// Perform the fetching operations, all the info is contained within the 'MMSD' themselves.
		for (srv_mmsd_t& srv_mmsd : srvs_mmsds) {
			// NOTE: Just required due the sync nature of the operations. To be replaced.
			// NOTE(review): 't1' was already set in the loop above; this per-server reset
			// overrides it so each sync fetch gets its own timeout window — confirm intended.
			srv_mmsd.mmsd->t1 = monotonic_time();
			gr_fetch_srv_status(srv_mmsd);
			srv_mmsd.mmsd->t2 = monotonic_time();
		}
		// //////////////////////////////////////////////////////////////////////////
		// Extract the fetched data from resp servers and perform monitoring actions
		for (srv_mmsd_t& srv_mmsd : srvs_mmsds) {
			// TODO: This is the idea for the async handler to be used when fetching is rewritten
			async_gr_mon_actions_handler(curtime, srv_mmsd);
		}
		// Set the time for the next iteration
		next_check_time = curtime + mysql_thread___monitor_groupreplication_healthcheck_interval * 1000;
	}
	proxy_info("Stopping Monitor thread for Group Replication writer HG %u\n", wr_hg);
	return NULL;
}
/**
 * @brief Creates a monitoring thread for each 'GroupReplication' cluster determined by writer hostgroups.
 * @details Entries are fully materialized before any thread is spawned: each thread receives a
 *  pointer to its entry's 'writer_hg', so the vector must not reallocate once the first thread
 *  starts. Thread creation failure is fatal.
 * @param writer_hgs The writer hostgroups to use when creating the threads.
 * @return A vector of 'thread_info_t' holding info of the created threads.
 */
vector<thread_info_t> create_group_replication_worker_threads(const set<uint32_t>& writer_hgs) {
	proxy_info("Activating Monitoring of %lu Group Replication clusters\n", writer_hgs.size());
	vector<thread_info_t> threads_info {};
	// Phase 1: build every entry; addresses must be stable before threads are spawned
	threads_info.reserve(writer_hgs.size());
	for (const uint32_t hg : writer_hgs) {
		threads_info.push_back({ pthread_t {}, hg });
	}
	// Phase 2: spawn one monitoring thread per cluster
	for (thread_info_t& info : threads_info) {
		proxy_info("Starting Monitor thread for Group Replication writer HG %u\n", info.writer_hg);
		const int err = pthread_create(&info.pthread, NULL, monitor_GR_thread_HG, &info.writer_hg);
		if (err) {
			proxy_error("Thread creation failed with error '%s'\n", strerror(err));
			assert(0);
		}
	}
	return threads_info;
}
/**
 * @brief Coordinator loop for the reworked Group Replication monitoring.
 * @details Watches 'Group_Replication_Hosts_resultset' for config changes (via its raw
 *  checksum). On every change: joins all currently running per-cluster threads (they exit on
 *  their own when they detect the same checksum change), then spawns a fresh set of threads,
 *  one per writer hostgroup. On shutdown, pushes one NULL work item per worker into the
 *  monitor queue to unblock them.
 * @return Always NULL.
 */
void* MySQL_Monitor::monitor_group_replication_2() {
	// Last seen config checksum; 0 forces thread creation on the first changed-config check
	uint64_t last_raw_checksum = 0;
	// Quick exit during shutdown/restart
	if (!GloMTH) return NULL;
	// Initial Monitor thread variables version
	unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version = 0;
	MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version();
	// MySQL thread structure used for variable refreshing
	unique_ptr<MySQL_Thread> mysql_thr { init_mysql_thread_struct() };
	// Info of the current GR monitoring threads: handle + writer_hg
	vector<thread_info_t> threads_info {};
	while (GloMyMon->shutdown == false && mysql_thread___monitor_enabled == true) {
		// Quick exit during shutdown/restart
		if (!GloMTH) { return NULL; }
		// Check variable version changes; refresh if needed
		unsigned int glover = GloMTH->get_global_version();
		if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) {
			MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover;
			mysql_thr->refresh_variables();
		}
		// Config check; Wait for all threads to stop before relaunch in case servers or options changed
		pthread_mutex_lock(&group_replication_mutex);
		uint64_t new_raw_checksum = Group_Replication_Hosts_resultset->raw_checksum();
		pthread_mutex_unlock(&group_replication_mutex);
		if (new_raw_checksum != last_raw_checksum) {
			proxy_info("Detected new/changed definition for Group Replication monitoring\n");
			// Update the new checksum
			last_raw_checksum = new_raw_checksum;
			// Wait for the threads to terminate; Threads should exit on config change
			if (threads_info.empty() == false) {
				for (const thread_info_t& thread_info : threads_info) {
					pthread_join(thread_info.pthread, NULL);
					proxy_info("Stopped Monitor thread for Group Replication writer HG %u\n", thread_info.writer_hg);
				}
			}
			// Relaunch one monitoring thread per writer hostgroup found in the new config
			pthread_mutex_lock(&group_replication_mutex);
			set<uint32_t> wr_hgs_set = extract_writer_hgs(Group_Replication_Hosts_resultset);
			threads_info = create_group_replication_worker_threads(wr_hgs_set);
			pthread_mutex_unlock(&group_replication_mutex);
		}
		usleep(10000);
	}
	// Signal monitor worker threads to stop
	for (unsigned int i=0;i<num_threads; i++) {
		WorkItem<MySQL_Monitor_State_Data> *item=NULL;
		GloMyMon->queue->add(item);
	}
	return NULL;
}
void * MySQL_Monitor::monitor_group_replication() {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
// struct event_base *libevent_base;

@ -402,6 +402,7 @@ void init_debug_struct() {
GloVars.global.gdbg_lvl[PROXY_DEBUG_QUERY_CACHE].name=(char *)"debug_query_cache";
GloVars.global.gdbg_lvl[PROXY_DEBUG_QUERY_STATISTICS].name=(char *)"debug_query_statistics";
GloVars.global.gdbg_lvl[PROXY_DEBUG_RESTAPI].name=(char *)"debug_restapi";
GloVars.global.gdbg_lvl[PROXY_DEBUG_MONITOR].name=(char *)"debug_monitor";
for (i=0;i<PROXY_DEBUG_UNKNOWN;i++) {
// if this happen, the above table is not populated correctly

Loading…
Cancel
Save