From 51c723f6fc9545bcb6de409e0c443ec1fe773baa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Mon, 16 Jan 2023 10:57:18 +0100 Subject: [PATCH] DRAFT: Monitor 'Group Replication' rework This is a draft of a rework for Monitor 'Group Replication'. This rework reuses the monitoring model already adopted for AWS Aurora. --- include/MySQL_Monitor.hpp | 3 + include/proxysql_structs.h | 2 + lib/MySQL_HostGroups_Manager.cpp | 10 +- lib/MySQL_Monitor.cpp | 808 ++++++++++++++++++++++++++++++- lib/debug.cpp | 1 + 5 files changed, 822 insertions(+), 2 deletions(-) diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index d3308bdea..3d6341549 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -192,7 +192,9 @@ class MySQL_Monitor_State_Data { public: MySQL_Monitor_State_Data_Task_Type task_id; struct timeval tv_out; + /* @brief Time prior to fetch operations. 'Start time' of the monitoring check. */ unsigned long long t1; + /* @brief Time after fetch operations. Current time before performing local monitoring actions. */ unsigned long long t2; int ST; char *hostname; @@ -404,6 +406,7 @@ class MySQL_Monitor { void * monitor_ping(); void * monitor_read_only(); void * monitor_group_replication(); + void * monitor_group_replication_2(); void * monitor_galera(); void * monitor_aws_aurora(); void * monitor_replication_lag(); diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index a55ee54c9..9a1348c69 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -129,6 +129,7 @@ enum debug_module { PROXY_DEBUG_QUERY_CACHE, PROXY_DEBUG_QUERY_STATISTICS, PROXY_DEBUG_RESTAPI, + PROXY_DEBUG_MONITOR, PROXY_DEBUG_UNKNOWN // this module doesn't exist. It is used only to define the last possible module }; @@ -437,6 +438,7 @@ enum PROXYSQL_MYSQL_ERR { ER_PROXYSQL_LAGGING_SRV = 9005, ER_PROXYSQL_PING_TIMEOUT = 9006, ER_PROXYSQL_CHANGE_USER_TIMEOUT = 9007, + ER_PROXYSQL_GR_HEALTH_CONN_CHECK_TIMEOUT = 9020, ER_PROXYSQL_GR_HEALTH_CHECK_TIMEOUT = 9008, ER_PROXYSQL_GR_HEALTH_CHECKS_MISSED = 9009, ER_PROXYSQL_READ_ONLY_CHECK_CONN_TIMEOUT = 9010, diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index c517553aa..9fefcdc11 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -3632,9 +3632,18 @@ void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, u */ void MySQL_HostGroups_Manager::group_replication_lag_action_set_server_status(MyHGC* myhgc, char* address, int port, int lag_count, bool enable) { if (myhgc == NULL || address == NULL) return; + proxy_debug( + PROXY_DEBUG_MONITOR, 5, "Params - address: %s, port: %d, lag_count: %d, enable: %d\n", address, port, + lag_count, enable + ); for (int j=0; j<(int)myhgc->mysrvs->cnt(); j++) { MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j); + proxy_debug( + PROXY_DEBUG_MONITOR, 6, "Server 'MySrvC' - address: %s, port: %d, status: %d\n", mysrvc->address, + mysrvc->port, mysrvc->status + ); + if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) { if (enable == true) { @@ -3692,7 +3701,6 @@ void MySQL_HostGroups_Manager::group_replication_lag_action( rhid_row = rhid_res->rows[0]; reader_hostgroup = atoi(rhid_row->fields[0]); - // writer_is_also_reader = atoi(rhid_row->fields[1]); { MyHGC* myhgc = nullptr; diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 32acbf9d9..d2ec4f682 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -8,6 
+8,7 @@ * original implementation */ #include +#include #include #include #include @@ -58,6 +59,11 @@ static MySQL_Monitor *GloMyMon; } while (rc==SQLITE_LOCKED || rc==SQLITE_BUSY);\ } while (0) +using std::string; +using std::set; +using std::vector; +using std::unique_ptr; + template class ConsumerThread : public Thread { wqueue*>& m_queue; @@ -559,7 +565,8 @@ void * monitor_group_replication_pthread(void *arg) { usleep(50000); } usleep(100000); - GloMyMon->monitor_group_replication(); + // GloMyMon->monitor_group_replication(); + GloMyMon->monitor_group_replication_2(); return NULL; } @@ -3156,6 +3163,805 @@ __sleep_monitor_read_only: return NULL; } +set extract_writer_hgs(SQLite3_result* Group_Replication_Hosts_resultset) { + set writer_hgs {}; + + // NOTE: This operation should be at worst `N * log(N)` + if (Group_Replication_Hosts_resultset->rows_count) { + for (SQLite3_row* sqlite_row : Group_Replication_Hosts_resultset->rows) { + writer_hgs.insert(atoi(sqlite_row->fields[0])); + } + } + + return writer_hgs; +} + +/** + * @brief Holds the info from a GR server definition. + */ +typedef struct _gr_host_def_t { + string host; + int port; + int use_ssl; + bool writer_is_also_reader; + int max_transactions_behind; + int max_transactions_behind_count; +} gr_host_def_t; + +/** + * @brief Extracts a 'MySQL_Monitor_State_Data' from the provided 'SQLite3_result*'. + * @details The expected contents of the provided 'SQLite3_result*' are the ones generated by + * 'MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table'. + * @param Group_Replication_Hosts_resultset Resultset held by 'MySQL_Monitor' and generated by + * 'MySQL_HostGroups_Manager' to be used to build a 'MySQL_Monitor_State_Data'. + * @return Vector with the GR servers configurations. + */ +vector extract_gr_host_defs( + uint32_t tg_writer_hg, SQLite3_result* Group_Replication_Hosts_resultset +) { + vector result {}; + + for (SQLite3_row* row : Group_Replication_Hosts_resultset->rows) { + uint32_t writer_hg = atoi(row->fields[0]); + + if (tg_writer_hg == writer_hg) { + char* hostname = row->fields[1]; + int port = atoi(row->fields[2]); + bool use_ssl = atoi(row->fields[3]); + bool wr_is_also_rd = atoi(row->fields[4]); + int max_trx_behind = atoi(row->fields[5]); + int max_trx_behind_count = mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count; + + result.push_back({ hostname, port, use_ssl, wr_is_also_rd, max_trx_behind, max_trx_behind_count }); + } + } + + return result; +} + +/** + * @brief Filter the responsive servers from the supplied hosts definitions. + * @details Responsive servers are servers not exceeding 'mysql_thread___monitor_ping_max_failures'. + * @param hosts_defs Hosts definitions to filter + * @return Responsive servers found in the supplied hosts definitions. + */ +vector find_resp_srvs(const vector& hosts_defs) { + vector resp_srvs {}; + + for (const gr_host_def_t& host_def : hosts_defs) { + char* c_hostname = const_cast(host_def.host.c_str()); + + if (GloMyMon->server_responds_to_ping(c_hostname, host_def.port)) { + resp_srvs.push_back(host_def); + } + } + + return resp_srvs; +} + +/** + * @brief Simple wrapper struct holding a 'mmsd' and extra information about it's creation. + */ +struct srv_mmsd_t { + bool created_conn; + unique_ptr mmsd; +}; + +string create_conn_err_msg(const unique_ptr& mmsd) { + const char ACCESS_DENIED_MSG[] { "Access denied for user" }; + + const char* srv_overload = "If the server is overload, increase mysql-monitor_connect_timeout. 
"; + if (strncmp(mmsd->mysql_error_msg, ACCESS_DENIED_MSG, strlen(ACCESS_DENIED_MSG)) == 0) { + srv_overload = ""; + } + + cfmt_t err_fmt = cstr_format( + "%sError: timeout or error in creating new connection: %s", srv_overload, mmsd->mysql_error_msg + ); + + return err_fmt.str; +} + +/** + * @brief Initializes a 'MySQL_Monitor_State_Data' with a MySQL conn. + * + * @param srv_def The server info for the initialization. + * @param writer_hg The writer_hostgroup to specify. + * @param start_time The time at which this conn creation operation was started. + * + * @return A wrapper over the created 'mmsd' with the conn creation info. + */ +srv_mmsd_t init_mmsd_with_conn(const gr_host_def_t srv_def, uint32_t writer_hg, uint64_t start_time) { + char* c_hostname = const_cast(srv_def.host.c_str()); + unique_ptr mmsd { + new MySQL_Monitor_State_Data { c_hostname, srv_def.port, NULL, static_cast(srv_def.use_ssl), } + }; + mmsd->t1 = start_time; + mmsd->writer_hostgroup = writer_hg; + mmsd->writer_is_also_reader = srv_def.writer_is_also_reader; + mmsd->max_transactions_behind = srv_def.max_transactions_behind; + mmsd->max_transactions_behind_count = srv_def.max_transactions_behind_count; + mmsd->mysql = GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port, mmsd.get()); + bool created_conn = false; + + if (mmsd->mysql == NULL) { + bool rc = mmsd->create_new_connection(); + + if (rc && mmsd->mysql) { + GloMyMon->My_Conn_Pool->conn_register(mmsd.get()); + created_conn = true; + } else { + uint64_t now = monotonic_time(); + string err_msg = create_conn_err_msg(mmsd); + + proxy_error( + "Error on Group Replication check for %s:%d after %lldms. Unable to create a connection. %s.\n", + mmsd->hostname, mmsd->port, (now - mmsd->t1)/1000, err_msg.c_str() + ); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, + ER_PROXYSQL_GR_HEALTH_CONN_CHECK_TIMEOUT + ); + + // Update 'mmsd' error message to report connection creating failure + cfmt_t conn_err_msg { + cstr_format("timeout or error in creating new connection: %s", mmsd->mysql_error_msg) + }; + mmsd->mysql_error_msg = strdup(conn_err_msg.str.c_str()); + } + } + + return { created_conn, std::move(mmsd) }; +} + +/// TODO: TO BE REPLACED BY ASYNC IMPL +/// /////////////////////////////////////////////////////////////////////////// + +struct async_mysql_action { + enum action { + query, + store + }; +}; + +int cont_async_mysql_action( + async_mysql_action::action action, unique_ptr& mmsd +) { + while (mmsd->async_exit_status) { + mmsd->async_exit_status = wait_for_mysql(mmsd->mysql, mmsd->async_exit_status); + + // Check if 'shutdown' have been received after blocking actions + if (GloMyMon->shutdown == true) { + return -2; + } + + uint64_t now = monotonic_time(); + if (now > mmsd->t1 + mysql_thread___monitor_groupreplication_healthcheck_timeout * 1000) { + mmsd->mysql_error_msg = strdup("timeout check"); + proxy_error( + "Timeout on group replication health check for %s:%d after %lldms. If the server is overload, increase" + " mysql-monitor_groupreplication_healthcheck_timeout. 
Assuming viable_candidate=NO and read_only=YES\n", + mmsd->hostname, mmsd->port, (now - mmsd->t1)/1000 + ); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, + ER_PROXYSQL_GR_HEALTH_CHECK_TIMEOUT + ); + + return -1; + } + + if (mmsd->interr) { + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql) + ); + + return -1; + } + + if ((mmsd->async_exit_status & MYSQL_WAIT_TIMEOUT) == 0) { + if (action == async_mysql_action::query) { + mmsd->async_exit_status = mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status); + } else { + mmsd->async_exit_status = mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status); + } + } + } + + if (action == async_mysql_action::query) { + if (mmsd->interr) { + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql) + ); + return -1; + } + } else { + if (mysql_errno(mmsd->mysql)) { + mmsd->mysql_error_msg = strdup(mysql_error(mmsd->mysql)); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql) + ); + return -1; + } + } + + return EXIT_SUCCESS; +} + +/// /////////////////////////////////////////////////////////////////////////// + +/** + * @brief MySQL 8 status query for Group Replication members. + * @details Since 'MySQL 8' we rely on 'COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE', deprecating the previously + * required 'sys.gr_member_routing_candidate_status' view. + */ +const char MYSQL_8_GR_QUERY[] { + "SELECT (SELECT IF (" + "MEMBER_STATE='ONLINE' AND (" + "(SELECT COUNT(*) FROM performance_schema.replication_group_members WHERE MEMBER_STATE != 'ONLINE') >=" + " ((SELECT COUNT(*) FROM performance_schema.replication_group_members)/2) = 0)" + ", 'YES', 'NO')) AS viable_candidate," + " (SELECT IF (@@read_only, 'YES', 'NO')) as read_only," + " COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE AS transactions_behind " + "FROM " + "performance_schema.replication_group_members " + "JOIN performance_schema.replication_group_member_stats rgms USING(member_id)" + "WHERE rgms.MEMBER_ID=@@SERVER_UUID" +}; + +void gr_fetch_srv_status(srv_mmsd_t& ini_mmsd) { + unique_ptr& mmsd { ini_mmsd.mmsd }; + + if (ini_mmsd.mmsd->mysql == nullptr) { + proxy_warning( + "Skipping fetching for server %s:%d due to error '%s'\n", + ini_mmsd.mmsd->hostname, ini_mmsd.mmsd->port, ini_mmsd.mmsd->mysql_error_msg + ); + return; + } + +#ifdef TEST_GROUPREP + string query { "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" }; + query += " " + std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port); +#else + string query {}; + + // MySQL-8: Query dependent on 'COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE'; deprecating the previously + // used `sys.gr_member_routing_candidate_status` view. 
+ if (strncasecmp(mmsd->mysql->server_version, "8", 1) == 0) { + query = MYSQL_8_GR_QUERY; + } else { + // If not MySQL 8 we default back to the old check + query = "SELECT viable_candidate,read_only,transactions_behind FROM sys.gr_member_routing_candidate_status"; + } +#endif + + mmsd->async_exit_status = mysql_query_start(&mmsd->interr, mmsd->mysql, query.c_str()); + int my_async_rc = cont_async_mysql_action(async_mysql_action::query, mmsd); + + if (my_async_rc != EXIT_SUCCESS) { + proxy_error( + "Got error: mmsd %p , MYSQL %p , FD %d : %s\n", mmsd.get(), mmsd->mysql, mmsd->mysql->net.fd, + mmsd->mysql_error_msg + ); + return; + } + + mmsd->async_exit_status = mysql_store_result_start(&mmsd->result, mmsd->mysql); + my_async_rc = cont_async_mysql_action(async_mysql_action::store, mmsd); + + if (my_async_rc != EXIT_SUCCESS) { + proxy_error( + "Got error: mmsd %p , MYSQL %p , FD %d : %s\n", mmsd.get(), mmsd->mysql, mmsd->mysql->net.fd, + mmsd->mysql_error_msg + ); + } +} + +struct gr_srv_st_t { + bool viable_candidate = false; + bool read_only = true; + int64_t transactions_behind = -1; + bool inv_srv_state = false; +}; + +gr_srv_st_t extract_gr_srv_st(unique_ptr& mmsd) { + gr_srv_st_t gr_srv_st {}; + + if (mmsd->interr == 0 && mmsd->result) { + int num_fields=0; + int num_rows=0; + MYSQL_FIELD * fields = mysql_fetch_fields(mmsd->result); + num_fields = mysql_num_fields(mmsd->result); + num_rows = mysql_num_rows(mmsd->result); + + if (fields == NULL || num_fields!=3 || num_rows!=1) { + proxy_error( + "'mysql_fetch_fields' returns 'NULL', or 'mysql_num_fields(%d)', or 'mysql_num_rows(%d)' are incorrect." + " Server %s:%d. See bug #1994\n", + num_fields, num_rows, mmsd->hostname, mmsd->port + ); + if (mmsd->mysql_error_msg == NULL) { + mmsd->mysql_error_msg = strdup("Unknown error"); + } + gr_srv_st.inv_srv_state = true; + } else { + MYSQL_ROW row=mysql_fetch_row(mmsd->result); + if (row[0] && !strcasecmp(row[0],"YES")) { + gr_srv_st.viable_candidate=true; + } + if (row[1] && !strcasecmp(row[1],"NO")) { + gr_srv_st.read_only=false; + } + if (row[2]) { + gr_srv_st.transactions_behind=atol(row[2]); + } + } + } + + proxy_debug( + PROXY_DEBUG_MONITOR, 4, + "Fetched %u:%s:%d info - interr: %d, error: %s, viable_candidate:'%d', read_only:'%d'," + " transactions_behind:'%ld'\n", + mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mmsd->interr, mmsd->mysql_error_msg, + gr_srv_st.viable_candidate, gr_srv_st.read_only, gr_srv_st.transactions_behind + ); + + if (mmsd->result) { + mysql_free_result(mmsd->result); + mmsd->result=NULL; + } + + return gr_srv_st; +} + +/** + * @brief Holds all info required for performing monitoring actions over the GR node. 
+ */ +struct gr_node_info_t { + gr_srv_st_t srv_st; + bool unresp_server = false; + int num_timeouts = 0; + int lag_counts = 0; +}; + +gr_node_info_t gr_update_hosts_map( + uint64_t start_time, const gr_srv_st_t& gr_srv_st, unique_ptr& mmsd +) { + // NOTE: This isn't specified in the initializer list due to current standard limitations + gr_node_info_t node_info {}; + node_info.srv_st = gr_srv_st; + + // Consider 'time_now' to be 'now - fetch_duration' + unsigned long long time_now=realtime_time(); + time_now=time_now-(mmsd->t2 - start_time); + cfmt_t fmt_srv_addr { cstr_format("%s:%d", mmsd->hostname, mmsd->port) }; + + pthread_mutex_lock(&GloMyMon->group_replication_mutex); + + std::map::iterator it2; + it2 = GloMyMon->Group_Replication_Hosts_Map.find(fmt_srv_addr.str); + MyGR_monitor_node *node=NULL; + + if (it2!=GloMyMon->Group_Replication_Hosts_Map.end()) { + node=it2->second; + node->add_entry( + time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1), gr_srv_st.transactions_behind, + gr_srv_st.viable_candidate, gr_srv_st.read_only,mmsd->mysql_error_msg + ); + } else { + node = new MyGR_monitor_node(mmsd->hostname,mmsd->port,mmsd->writer_hostgroup); + node->add_entry( + time_now, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1), gr_srv_st.transactions_behind, + gr_srv_st.viable_candidate, gr_srv_st.read_only,mmsd->mysql_error_msg + ); + GloMyMon->Group_Replication_Hosts_Map.insert(std::make_pair(fmt_srv_addr.str,node)); + } + if (mmsd->mysql_error_msg) { + if (strncasecmp(mmsd->mysql_error_msg, (char *)"timeout", 7) == 0) { + node_info.num_timeouts = node->get_timeout_count(); + } + } + // NOTE: Previously 'lag_counts' was only updated for 'read_only' + // because 'writers' were never selected for being set 'OFFLINE' due to + // replication lag. Since the change of this behavior to 'SHUNNING' + // with replication lag, no matter it's 'read_only' value, 'lag_counts' + // is computed everytime. + node_info.lag_counts = node->get_lag_behind_count(mmsd->max_transactions_behind); + + pthread_mutex_unlock(&GloMyMon->group_replication_mutex); + + return node_info; +} + +/** + * @brief Perform the actual monitoring action on the server based on the 'mmsd' info. + * + * @param mmsd The 'mmsd' holding info about fetching errors. + * @param node_info The fetched server information itself. + */ +void gr_mon_action_over_resp_srv(unique_ptr& mmsd, const gr_node_info_t& node_info) { + // NOTE: We update MyHGM outside the mutex group_replication_mutex + if (mmsd->mysql_error_msg) { // there was an error checking the status of the server, surely we need to reconfigure GR + if (node_info.num_timeouts == 0) { + // it wasn't a timeout, reconfigure immediately + MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg); + } else { + proxy_warning("%s:%d : group replication health check timeout count %d. Max threshold %d.\n", + mmsd->hostname, mmsd->port, node_info.num_timeouts, mmsd->max_transactions_behind_count); + + // It was a timeout. Check if we are having consecutive timeout + if (node_info.num_timeouts == mysql_thread___monitor_groupreplication_healthcheck_max_timeout_count) { + proxy_error("Server %s:%d missed %d group replication checks. 
Number retries %d, Assuming offline\n", + mmsd->hostname, mmsd->port, node_info.num_timeouts, node_info.num_timeouts); + MyHGM->p_update_mysql_error_counter(p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, ER_PROXYSQL_GR_HEALTH_CHECKS_MISSED); + MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, mmsd->mysql_error_msg); + } + } + } else { + if (node_info.srv_st.viable_candidate==false) { + MyHGM->update_group_replication_set_offline(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"viable_candidate=NO"); + } else { + if (node_info.srv_st.read_only==true) { + MyHGM->update_group_replication_set_read_only(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup, (char *)"read_only=YES"); + } else { + // the node is a writer + // TODO: for now we don't care about the number of writers + MyHGM->update_group_replication_set_writer(mmsd->hostname, mmsd->port, mmsd->writer_hostgroup); + } + + // NOTE: Replication lag action should takes place **after** the + // servers have been placed in the correct hostgroups, otherwise + // during the reconfiguration of the servers due to 'update_group_replication_set_writer' + // there would be a small window in which the 'SHUNNED' server + // will be treat as 'ONLINE' letting some new connections to + // take places, before it becomes 'SHUNNED' again. + bool enable = true; + if (node_info.lag_counts >= mysql_thread___monitor_groupreplication_max_transactions_behind_count) { + enable = false; + } + MyHGM->group_replication_lag_action( + mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, node_info.lag_counts, node_info.srv_st.read_only, enable + ); + } + } +} + +/** + * @brief NOTE: Currently unused. Unresponsive servers are SHUNNED by monitoring PING actions, and no further + * monitoring actions are performed on them. + * + * @param hosts_defs Unresponsive hosts. + * @param wr_hg The writer hostgroup from the unresponsive hosts. + */ +void gr_handle_actions_over_unresp_srvs(const vector& hosts_defs, uint32_t wr_hg) { + char unresp_err_msg[] = "Server unresponsive to PING requests"; + + for (const gr_host_def_t& host_def : hosts_defs) { + char* c_hostname = const_cast(host_def.host.c_str()); + + proxy_warning( + "%s:%d: Server considered OFFLINE due to unresponsiveness to PING requests", c_hostname, host_def.port + ); + MyHGM->update_group_replication_set_offline(c_hostname, host_def.port, wr_hg, unresp_err_msg); + } +} + +/** + * @brief Handles the return of the 'MySQL' conn used by the 'mmsd' to Monitor 'ConnectionPool'. + * @details Connections are returned to the 'ConnectionPool' if no errors took place during the fetching. If + * the connection is a new created connection, we try to configured it with the proper 'set_wait_timeout' + * before placing the connection back into the 'ConnectionPool', on failure, we discard the connection. + * @param srv_mmsd The mmsd wrapper holding all information for returning the connection. + */ +void handle_mmsd_mysql_conn(srv_mmsd_t& srv_mmsd) { + if (srv_mmsd.mmsd == nullptr) return; + unique_ptr& mmsd = srv_mmsd.mmsd; + + if (mmsd->mysql) { + if (mmsd->interr || mmsd->mysql_error_msg) { + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); + } else { + if (srv_mmsd.created_conn) { + bool rc = mmsd->set_wait_timeout(); + if (rc) { + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + } else { + proxy_error( + "Error by 'set_wait_timeout' for new connection. 
mmsd %p , MYSQL %p , FD %d : %s\n", + mmsd.get(), mmsd->mysql, mmsd->mysql->net.fd, mmsd->mysql_error_msg + ); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql) + ); + GloMyMon->My_Conn_Pool->conn_unregister(mmsd.get()); + mysql_close(mmsd->mysql); + } + } else { + GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql); + } + } + + mmsd->mysql=NULL; + } +} + +/** + * @brief Report the fetching errors of the supplied 'mmsd' and increase the corresponding counter. + * @param srv_mmsd The 'mmsd' whose failures are to be reported. + */ +void gr_report_fetching_errs(srv_mmsd_t& srv_mmsd) { + unique_ptr<MySQL_Monitor_State_Data>& mmsd = srv_mmsd.mmsd; + + if (mmsd->mysql) { + if (mmsd->interr || mmsd->mysql_error_msg) { + proxy_error( + "Got error. mmsd %p , MYSQL %p , FD %d : %s\n", mmsd.get(), mmsd->mysql, + mmsd->mysql->net.fd, mmsd->mysql_error_msg + ); + MyHGM->p_update_mysql_error_counter( + p_mysql_error_type::proxysql, mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mysql_errno(mmsd->mysql) + ); + } + } +} + +/** + * @brief Performs the corresponding monitoring actions over the supplied 'MySQL_Monitor_State_Data'. + * @details This function expects to be called when the fetching operation has completed for the supplied + * 'MySQL_Monitor_State_Data' holding a final 'MYSQL_RES' or an error. Otherwise servers will be set to + * 'OFFLINE_HARD' due to defaults on 'gr_srv_st_t'. Actions taken are: + * 1. Extract fetching results from the supplied 'MySQL_Monitor_State_Data' into 'gr_srv_st_t'. + * 2. Update 'Group_Replication_Hosts_Map' and build a resulting 'gr_node_info_t' with the required info + * for performing the monitoring actions. + * 3. Perform any required actions on the servers through 'MySQL_HostGroups_Manager'. + * + * NOTE: We only perform monitoring actions over responsive servers; unresponsive servers are SHUNNED + * by monitoring PING actions, and no further monitoring actions should be performed on them. + * + * @param start_time The time at which this complete 'fetch + actions' monitoring cycle started. + * @param srv_mmsd The server 'MySQL_Monitor_State_Data' after the fetching is completed. It should either + * hold a valid 'MYSQL_RES' or an error. + */ +void async_gr_mon_actions_handler(uint64_t start_time, srv_mmsd_t& srv_mmsd) { + unique_ptr<MySQL_Monitor_State_Data>& mmsd = srv_mmsd.mmsd; + + // Extract the server status from the 'mmsd'. Reports if invalid data is received + gr_srv_st_t gr_srv_st { extract_gr_srv_st(srv_mmsd.mmsd) }; + + // Report fetch errors; logs should report 'cause -> effect' + gr_report_fetching_errs(srv_mmsd); + + // Perform monitoring actions; only if the response wasn't ill-formed + if (gr_srv_st.inv_srv_state == false) { + gr_node_info_t node_info { gr_update_hosts_map(start_time, gr_srv_st, mmsd) }; + gr_mon_action_over_resp_srv(mmsd, node_info); + } + + // Handle 'mmsd' MySQL conn return to 'ConnectionPool' + handle_mmsd_mysql_conn(srv_mmsd); +} + +/** + * @brief Initializes the structures related to a MySQL_Thread. + * @details It doesn't initialize a real thread, just the structures associated with it. + * @return The created and initialized 'MySQL_Thread'. 
+ */ +unique_ptr init_mysql_thread_struct() { + unique_ptr mysql_thr { new MySQL_Thread() }; + mysql_thr->curtime = monotonic_time(); + mysql_thr->refresh_variables(); + + return mysql_thr; +} + +struct thread_info_t { + pthread_t pthread; + uint32_t writer_hg; +}; + +void* monitor_GR_thread_HG(void *arg) { + uint32_t wr_hg = *(static_cast(arg)); + proxy_info("Started Monitor thread for Group Replication writer HG %u\n", wr_hg); + + // Quick exit during shutdown/restart + if (!GloMTH) { return NULL; } + + // Initial Monitor thread variables version + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version(); + // MySQL thread structure used for variable refreshing + unique_ptr mysql_thr { init_mysql_thread_struct() }; + + pthread_mutex_lock(&GloMyMon->group_replication_mutex); + // Get the initial config checksum; this thread must exist on any config changes + uint64_t initial_raw_checksum = GloMyMon->Group_Replication_Hosts_resultset->raw_checksum(); + // Extract the monitoring data required for the target writer hostgroup + vector hosts_defs { extract_gr_host_defs(wr_hg, GloMyMon->Group_Replication_Hosts_resultset) }; + pthread_mutex_unlock(&GloMyMon->group_replication_mutex); + + uint64_t next_check_time = 0; + uint64_t MAX_CHECK_DELAY_US = 500000; + + while (GloMyMon->shutdown == false && mysql_thread___monitor_enabled == true) { + if (!GloMTH) { break; } // quick exit during shutdown/restart + + // Config check; Exit if config has been altered + { + pthread_mutex_lock(&GloMyMon->group_replication_mutex); + uint64_t current_raw_checksum = GloMyMon->Group_Replication_Hosts_resultset->raw_checksum(); + pthread_mutex_unlock(&GloMyMon->group_replication_mutex); + + if (current_raw_checksum != initial_raw_checksum) { + break; + } + } + + // Check variable version changes; refresh if needed and don't delay next check + unsigned int glover = GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover; + mysql_thr->refresh_variables(); + next_check_time = 0; + } + + uint64_t curtime = monotonic_time(); + + // Delay the next check if needed + if (curtime < next_check_time) { + uint64_t time_left = next_check_time - curtime; + uint64_t next_check_delay = 0; + + if (time_left > MAX_CHECK_DELAY_US) { + next_check_delay = MAX_CHECK_DELAY_US; + } else { + next_check_delay = time_left; + } + + usleep(next_check_delay); + continue; + } + + // Get the current 'pingable' status for the servers. + const vector& resp_srvs { find_resp_srvs(hosts_defs) }; + if (resp_srvs.empty()) { + proxy_error("No node is pingable for Group Replication cluster with writer HG %u\n", wr_hg); + next_check_time = curtime + mysql_thread___monitor_groupreplication_healthcheck_interval * 1000; + continue; + } + + // Initialize the 'MMSD' for data fetching for responsive servers + vector srvs_mmsds {}; + for (const gr_host_def_t& host_def : resp_srvs) { + srvs_mmsds.push_back(init_mmsd_with_conn(host_def, wr_hg, curtime)); + } + + // Update 't1' for subsequent fetch operations and reset errors + for (const srv_mmsd_t& srv_mmsd : srvs_mmsds) { + if (srv_mmsd.mmsd->mysql) { + srv_mmsd.mmsd->t1 = monotonic_time(); + srv_mmsd.mmsd->interr = 0; + } + } + + // TODO: This section will be replaced by 'async' operations + // ////////////////////////////////////////////////////////////////////////// + // Perform the fetching operations, all the info is contained within the 'MMSD' themselves. 
+ for (srv_mmsd_t& srv_mmsd : srvs_mmsds) { + // NOTE: Just required due the sync nature of the operations. To be replaced. + srv_mmsd.mmsd->t1 = monotonic_time(); + gr_fetch_srv_status(srv_mmsd); + srv_mmsd.mmsd->t2 = monotonic_time(); + } + // ////////////////////////////////////////////////////////////////////////// + + // Extract the fetched data from resp servers and perform monitoring actions + for (srv_mmsd_t& srv_mmsd : srvs_mmsds) { + // TODO: This is the idea for the async handler to be used when fetching is rewritten + async_gr_mon_actions_handler(curtime, srv_mmsd); + } + + // Set the time for the next iteration + next_check_time = curtime + mysql_thread___monitor_groupreplication_healthcheck_interval * 1000; + } + + proxy_info("Stopping Monitor thread for Group Replication writer HG %u\n", wr_hg); + return NULL; +} + +/** + * @brief Creates a monitoring thread for each 'GroupReplication' cluster determined by writer hostgroups. + * @param writer_hgs The writer hostgroups to use when creating the threads. + * @return A vector of 'thread_info_t' holding info of the created threads. + */ +vector create_group_replication_worker_threads(const set& writer_hgs) { + proxy_info("Activating Monitoring of %lu Group Replication clusters\n", writer_hgs.size()); + + vector threads_info {}; + + for (const uint32_t writer_hg : writer_hgs) { + threads_info.push_back({pthread_t {}, writer_hg}); + } + + for (thread_info_t& thread_info : threads_info) { + proxy_info("Starting Monitor thread for Group Replication writer HG %u\n", thread_info.writer_hg); + int err = pthread_create(&thread_info.pthread, NULL, monitor_GR_thread_HG, &thread_info.writer_hg); + + if (err) { + proxy_error("Thread creation failed with error '%s'\n", strerror(err)); + assert(0); + } + } + + return threads_info; +} + + +void* MySQL_Monitor::monitor_group_replication_2() { + uint64_t last_raw_checksum = 0; + + // Quick exit during shutdown/restart + if (!GloMTH) return NULL; + + // Initial Monitor thread variables version + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version = 0; + MySQL_Monitor__thread_MySQL_Thread_Variables_version = GloMTH->get_global_version(); + + // MySQL thread structure used for variable refreshing + unique_ptr mysql_thr { init_mysql_thread_struct() }; + + // Info of the current GR monitoring threads: handle + writer_hg + vector threads_info {}; + + while (GloMyMon->shutdown == false && mysql_thread___monitor_enabled == true) { + // Quick exit during shutdown/restart + if (!GloMTH) { return NULL; } + + // Check variable version changes; refresh if needed + unsigned int glover = GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version = glover; + mysql_thr->refresh_variables(); + } + + // Config check; Wait for all threads to stop before relaunch in case servers or options changed + pthread_mutex_lock(&group_replication_mutex); + uint64_t new_raw_checksum = Group_Replication_Hosts_resultset->raw_checksum(); + pthread_mutex_unlock(&group_replication_mutex); + + if (new_raw_checksum != last_raw_checksum) { + proxy_info("Detected new/changed definition for Group Replication monitoring\n"); + // Update the new checksum + last_raw_checksum = new_raw_checksum; + + // Wait for the threads to terminate; Threads should exit on config change + if (threads_info.empty() == false) { + for (const thread_info_t& thread_info : threads_info) { + pthread_join(thread_info.pthread, NULL); + proxy_info("Stopped 
Monitor thread for Group Replication writer HG %u\n", thread_info.writer_hg); + } + } + + pthread_mutex_lock(&group_replication_mutex); + set wr_hgs_set = extract_writer_hgs(Group_Replication_Hosts_resultset); + threads_info = create_group_replication_worker_threads(wr_hgs_set); + pthread_mutex_unlock(&group_replication_mutex); + } + + usleep(10000); + } + + // Signal monitor worker threads to stop + for (unsigned int i=0;i *item=NULL; + GloMyMon->queue->add(item); + } + + return NULL; +} + void * MySQL_Monitor::monitor_group_replication() { // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) // struct event_base *libevent_base; diff --git a/lib/debug.cpp b/lib/debug.cpp index b963115a9..54007fd92 100644 --- a/lib/debug.cpp +++ b/lib/debug.cpp @@ -402,6 +402,7 @@ void init_debug_struct() { GloVars.global.gdbg_lvl[PROXY_DEBUG_QUERY_CACHE].name=(char *)"debug_query_cache"; GloVars.global.gdbg_lvl[PROXY_DEBUG_QUERY_STATISTICS].name=(char *)"debug_query_statistics"; GloVars.global.gdbg_lvl[PROXY_DEBUG_RESTAPI].name=(char *)"debug_restapi"; + GloVars.global.gdbg_lvl[PROXY_DEBUG_MONITOR].name=(char *)"debug_monitor"; for (i=0;i