Add support for Group Replication (GR) autodiscovery

- Add support for autodiscovery for GR for MySQL8.
 - Allow to configure autodiscovered servers defaults via
   'mysql_hostgroup_attributes'.
 - Add support for GR autodiscovery for Cluster Simulator.
 - Improve server 'viable_candidate' detection for MySQL 8,
   'RECOVERING' state is now also considered.
pull/4208/head
Javier Jaramago Fernández 3 years ago
parent ef3d6bd5a7
commit af80944cfe

@ -585,6 +585,17 @@ class MySQL_HostGroups_Manager {
SQLite3_result *incoming_replication_hostgroups;
void generate_mysql_group_replication_hostgroups_table();
/**
* @brief Regenerates the resultset used by 'MySQL_Monitor' containing the servers to be monitored.
* @details This function is required to be called after any action that results in the addition of a new
* server that 'MySQL_Monitor' should be aware of for 'group_replication', i.e. a server added to the
* hostgroups present in any entry of 'mysql_group_replication_hostgroups'. E.g:
* - Inside 'generate_mysql_group_replication_hostgroups_table'.
* - Autodiscovery.
*
* NOTE: This is a common pattern for all the clusters monitoring.
*/
void generate_mysql_group_replication_hostgroups_monitor_resultset();
SQLite3_result *incoming_group_replication_hostgroups;
pthread_mutex_t Group_Replication_Info_mutex;
@ -812,6 +823,31 @@ class MySQL_HostGroups_Manager {
void update_group_replication_set_offline(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_read_only(char *_hostname, int _port, int _writer_hostgroup, char *error);
void update_group_replication_set_writer(char *_hostname, int _port, int _writer_hostgroup);
/**
* @brief Tries to add a new server found during GR autodiscovery to the supplied hostgroup.
* @details For adding the new server, several actions are performed:
* 1. Lookup the target server in the corresponding MyHGC for the supplied hostgroup.
* 2. If server is found, and it's status isn't 'OFFLINE_HARD' do nothing. Otherwise:
* - If server is found as 'OFFLINE_HARD', re-enable the server, log the action.
* - If server isn't found, create it in the corresponding reader hostgroup of the supplied writer
* hostgroup, setting all 'servers_defaults' params as '-1', log the action.pasalo
* - After any of the two previous actions, always regenerate servers data structures.
*
* NOTE: Server data structures regeneration requires:
* 1. Purging the 'mysql_servers_table' (Lazy removal of 'OFFLINE_HARD' servers.)
* 2. Regenerate the actual 'myhgm::mysql_servers' table from memory structures.
* 3. Update the 'mysql_servers' resultset used for monitoring. This resultset is used for general
* monitoring actions like 'ping', 'connect'.
* 4. Regenerate the specific resultset for 'Group Replication' monitoring. This resultset is the way to
* communicate back to the main monitoring thread that servers config has changed, and a new thread
* shall be created with the new servers config. This same principle is used for Aurora.
*
* @param _host Server address.
* @param _port Server port.
* @param _wr_hg Writer hostgroup of the cluster being monitored. Autodiscovered servers are always added
* to the reader hostgroup by default, later monitoring actions will re-position the server is required.
*/
void update_group_replication_add_autodiscovered(const std::string& _host, int _port, int _wr_hg);
void converge_group_replication_config(int _writer_hostgroup);
/**
* @brief Set the supplied server as SHUNNED, this function shall be called

@ -200,6 +200,17 @@ enum class MySQL_Monitor_State_Data_Task_Result {
TASK_RESULT_PENDING
};
/**
* @brief Holds the info from a GR server definition.
*/
struct gr_host_def_t {
string host;
int port;
int use_ssl;
bool writer_is_also_reader;
int max_transactions_behind;
int max_transactions_behind_count;
};
class MySQL_Monitor_State_Data {
public:
@ -237,6 +248,11 @@ public:
* @details Currently only used by 'group_replication'.
*/
uint64_t init_time = 0;
/**
* @brief Used by GroupReplication to determine if servers reported by cluster 'members' are already monitored.
* @details This way we avoid non-needed locking on 'MySQL_HostGroups_Manager' for server search.
*/
const std::vector<gr_host_def_t>* cur_monitored_gr_srvs = nullptr;
MySQL_Monitor_State_Data(MySQL_Monitor_State_Data_Task_Type task_type, char* h, int p, bool _use_ssl = 0, int g = 0);
~MySQL_Monitor_State_Data();

@ -5,6 +5,7 @@
#include "proxysql.h"
#include "cpp.h"
#include <vector>
#include <string>
class SQLite3_Session {
public:
@ -14,7 +15,7 @@ class SQLite3_Session {
};
#ifdef TEST_GROUPREP
using group_rep_status = std::tuple<bool, bool, uint32_t>;
using group_rep_status = std::tuple<bool, bool, uint32_t, std::string>;
#endif
class SQLite3_Server {

@ -203,6 +203,13 @@ uint64_t get_timestamp_us();
*/
std::string replace_str(const std::string& str, const std::string& match, const std::string& repl);
/**
* @brief Split a string into a vector of strings with the provided 'char' delimiter.
* @param s String to be split.
* @param delimiter Delimiter to be used.
* @return Vector with the string splits. Empty if none is found.
*/
std::vector<std::string> split_str(const std::string& s, char delimiter);
std::string generate_multi_rows_query(int rows, int params);
#endif

@ -2427,6 +2427,11 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table
// it is now time to build a new structure in Monitor
generate_mysql_group_replication_hostgroups_monitor_resultset();
pthread_mutex_unlock(&Group_Replication_Info_mutex);
}
void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_monitor_resultset() {
pthread_mutex_lock(&GloMyMon->group_replication_mutex);
{
char *error=NULL;
@ -2445,8 +2450,6 @@ void MySQL_HostGroups_Manager::generate_mysql_group_replication_hostgroups_table
}
}
pthread_mutex_unlock(&GloMyMon->group_replication_mutex);
pthread_mutex_unlock(&Group_Replication_Info_mutex);
}
void MySQL_HostGroups_Manager::generate_mysql_galera_hostgroups_table() {
@ -5645,6 +5648,69 @@ void MySQL_HostGroups_Manager::converge_group_replication_config(int _writer_hos
pthread_mutex_unlock(&Group_Replication_Info_mutex);
}
void MySQL_HostGroups_Manager::update_group_replication_add_autodiscovered(
const string& _host, int _port, int _wr_hg
) {
pthread_mutex_lock(&Group_Replication_Info_mutex);
const auto gr_info_map_it = this->Group_Replication_Info_Map.find(_wr_hg);
int32_t reader_hg = -1;
if (gr_info_map_it == Group_Replication_Info_Map.end()) {
assert(0);
} else {
reader_hg = gr_info_map_it->second->reader_hostgroup;
}
pthread_mutex_unlock(&Group_Replication_Info_mutex);
wrlock();
MyHGC *myhgc = MyHGC_lookup(reader_hg);
bool srv_found = false;
bool srv_found_offline = false;
for (uint32_t j = 0; j < myhgc->mysrvs->cnt(); j++) {
MySrvC* mysrvc = static_cast<MySrvC*>(myhgc->mysrvs->servers->index(j));
// TODO: Motivation for this logic needs to be better described
if (strcmp(mysrvc->address,_host.c_str())==0 && mysrvc->port==_port) {
srv_found = true;
if (mysrvc->status == MYSQL_SERVER_STATUS_OFFLINE_HARD) {
proxy_info(
"Found healthy previously discovered GR node %s:%d as 'OFFLINE_HARD', setting back as 'ONLINE' with:"
" hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n",
_host.c_str(), _port, reader_hg, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl
);
mysrvc->status = MYSQL_SERVER_STATUS_ONLINE;
srv_found_offline = true;
}
}
}
if (srv_found == false) {
MySrvC* mysrvc = new MySrvC(
const_cast<char*>(_host.c_str()), _port, 0, -1, MYSQL_SERVER_STATUS_ONLINE, 0, -1, 0, -1, 0, const_cast<char*>("")
);
add(mysrvc, reader_hg);
proxy_info(
"Adding new discovered GR node %s:%d with: hostgroup=%d, weight=%ld, max_connections=%ld, use_ssl=%d\n",
_host.c_str(), _port, reader_hg, mysrvc->weight, mysrvc->max_connections, mysrvc->use_ssl
);
}
if (srv_found == false || srv_found_offline) {
purge_mysql_servers_table();
mydb->execute("DELETE FROM mysql_servers");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
generate_mysql_servers_table();
update_table_mysql_servers_for_monitor(false);
generate_mysql_group_replication_hostgroups_monitor_resultset();
}
wrunlock();
}
Galera_Info::Galera_Info(int w, int b, int r, int o, int mw, int mtb, bool _a, int _w, char *c) {
comment=NULL;
if (c) {

@ -495,19 +495,23 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS
/**
* @brief MySQL 8 status query for Group Replication members.
* @details Since 'MySQL 8' we rely on 'COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE', deprecating the previously
* required 'sys.gr_member_routing_candidate_status' view.
* required 'sys.gr_member_routing_candidate_status' view. Another additions:
* - A new field 'members' has been added to the query, containing the current cluster members as seen by the
* queried node. This field is used for auto discovery.
* - Server state 'RECOVERING' is now also considered when detecting if a member is a 'viable' candidate.
*/
const char MYSQL_8_GR_QUERY[] {
"SELECT (SELECT IF ("
"MEMBER_STATE='ONLINE' AND ("
"(SELECT COUNT(*) FROM performance_schema.replication_group_members WHERE MEMBER_STATE != 'ONLINE') >="
"(SELECT COUNT(*) FROM performance_schema.replication_group_members WHERE MEMBER_STATE NOT IN ('ONLINE', 'RECOVERING')) >="
" ((SELECT COUNT(*) FROM performance_schema.replication_group_members)/2) = 0)"
", 'YES', 'NO')) AS viable_candidate,"
" (SELECT IF (@@read_only, 'YES', 'NO')) as read_only,"
" COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE AS transactions_behind "
" COUNT_TRANSACTIONS_REMOTE_IN_APPLIER_QUEUE AS transactions_behind, "
" (SELECT GROUP_CONCAT(CONCAT(member_host, \":\", member_port)) FROM performance_schema.replication_group_members) AS members "
"FROM "
"performance_schema.replication_group_members "
"JOIN performance_schema.replication_group_member_stats rgms USING(member_id)"
"JOIN performance_schema.replication_group_member_stats rgms USING(member_id) "
"WHERE rgms.MEMBER_ID=@@SERVER_UUID"
};
@ -616,7 +620,7 @@ void MySQL_Monitor_State_Data::init_async() {
async_state_machine_ = ASYNC_QUERY_START;
#ifdef TEST_GROUPREP
{
query_ = "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS ";
query_ = "SELECT viable_candidate,read_only,transactions_behind,members FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS ";
query_ += std::string(hostname) + ":" + std::to_string(port);
}
#else
@ -1867,7 +1871,7 @@ void * monitor_group_replication_thread(void *arg) {
mmsd->interr=0; // reset the value
#ifdef TEST_GROUPREP
{
std::string s { "SELECT viable_candidate,read_only,transactions_behind FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" };
std::string s { "SELECT viable_candidate,read_only,transactions_behind,members FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS" };
s += " " + std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port);
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,s.c_str());
}
@ -3373,18 +3377,6 @@ set<uint32_t> extract_writer_hgs(SQLite3_result* Group_Replication_Hosts_results
return writer_hgs;
}
/**
* @brief Holds the info from a GR server definition.
*/
typedef struct _gr_host_def_t {
string host;
int port;
int use_ssl;
bool writer_is_also_reader;
int max_transactions_behind;
int max_transactions_behind_count;
} gr_host_def_t;
/**
* @brief Extracts a 'MySQL_Monitor_State_Data' from the provided 'SQLite3_result*'.
* @details The expected contents of the provided 'SQLite3_result*' are the ones generated by
@ -3505,13 +3497,67 @@ unique_ptr<MySQL_Monitor_State_Data> init_mmsd_with_conn(
return mmsd;
}
using gr_srv_addr_t = pair<string,int32_t>;
struct gr_srv_st_t {
bool viable_candidate = false;
bool read_only = true;
int64_t transactions_behind = -1;
bool inv_srv_state = false;
vector<gr_srv_addr_t> gr_members {};
};
#define GR_MEMBER_ENTRY_ERR "%s '%s' in 'members' field from GR query to server '%s:%d'. Autodiscovery action aborted.\n"
vector<pair<string,int32_t>> parse_gr_members_addrs(
const MySQL_Monitor_State_Data* mmsd, const vector<string>& gr_cluster_members
) {
#ifdef DEBUG
nlohmann::ordered_json members { gr_cluster_members };
proxy_debug(
PROXY_DEBUG_MONITOR, 7, "Received 'members' field '%s' from GR query to server '%s:%d'\n", members.dump().c_str(),
mmsd->hostname, mmsd->port
);
#endif
vector<pair<string,int32_t>> result {};
for (const auto& cluster_member : gr_cluster_members) {
const vector<string> gr_member_host_port { split_str(cluster_member, ':') };
if (gr_member_host_port.size() != 2) {
proxy_error(GR_MEMBER_ENTRY_ERR, "Invalid server entry", cluster_member.c_str(), mmsd->hostname, mmsd->port);
break;
}
const string srv_host { gr_member_host_port[0] };
const char* c_str_port { gr_member_host_port[1].c_str() };
int32_t srv_port = -1;
{
char* p_end = nullptr;
long port = std::strtol(c_str_port, &p_end, 10);
if (c_str_port == p_end) {
proxy_error(
GR_MEMBER_ENTRY_ERR, "Failed to parse port for server entry", cluster_member.c_str(), mmsd->hostname, mmsd->port
);
break;
} else {
srv_port = port;
}
}
result.push_back({srv_host, srv_port});
}
// If any entry fails to parse, we invalidate the whole action
if (gr_cluster_members.size() != result.size()) {
return {};
} else {
return result;
}
}
gr_srv_st_t extract_gr_srv_st(MySQL_Monitor_State_Data* mmsd) {
gr_srv_st_t gr_srv_st {};
@ -3522,7 +3568,7 @@ gr_srv_st_t extract_gr_srv_st(MySQL_Monitor_State_Data* mmsd) {
num_fields = mysql_num_fields(mmsd->result);
num_rows = mysql_num_rows(mmsd->result);
if (fields == NULL || num_fields!=3 || num_rows!=1) {
if (fields == NULL || num_fields!=4 || num_rows!=1) {
proxy_error(
"'mysql_fetch_fields' returns 'NULL', or 'mysql_num_fields(%d)', or 'mysql_num_rows(%d)' are incorrect."
" Server %s:%d. See bug #1994\n",
@ -3543,11 +3589,17 @@ gr_srv_st_t extract_gr_srv_st(MySQL_Monitor_State_Data* mmsd) {
if (row[2]) {
gr_srv_st.transactions_behind=atol(row[2]);
}
if (mmsd->cur_monitored_gr_srvs && row[3]) {
const string str_members_addrs { row[3] };
const vector<string> members_addrs { split_str(str_members_addrs, ',') };
gr_srv_st.gr_members = parse_gr_members_addrs(mmsd, members_addrs);
}
}
}
proxy_debug(
PROXY_DEBUG_MONITOR, 4,
PROXY_DEBUG_MONITOR, 7,
"Fetched %u:%s:%d info - interr: %d, error: %s, viable_candidate:'%d', read_only:'%d',"
" transactions_behind:'%ld'\n",
mmsd->hostgroup_id, mmsd->hostname, mmsd->port, mmsd->interr, mmsd->mysql_error_msg,
@ -3578,7 +3630,6 @@ gr_node_info_t gr_update_hosts_map(
// NOTE: This isn't specified in the initializer list due to current standard limitations
gr_node_info_t node_info {};
node_info.srv_st = gr_srv_st;
MySQL_Monitor_State_Data_Task_Result task_result = mmsd->get_task_result();
// Consider 'time_now' to be 'now - fetch_duration'
unsigned long long time_now=realtime_time();
@ -3671,6 +3722,24 @@ void gr_mon_action_over_resp_srv(MySQL_Monitor_State_Data* mmsd, const gr_node_i
MyHGM->group_replication_lag_action(
mmsd->writer_hostgroup, mmsd->hostname, mmsd->port, node_info.lag_counts, node_info.srv_st.read_only, enable
);
if (mmsd->cur_monitored_gr_srvs && node_info.srv_st.gr_members.empty() == false) {
for (const gr_srv_addr_t& gr_member : node_info.srv_st.gr_members) {
const string& srv_host { gr_member.first };
const int32_t srv_port { gr_member.second };
bool found = false;
for (const gr_host_def_t& host_def : *mmsd->cur_monitored_gr_srvs) {
if (srv_host == host_def.host && srv_port == host_def.port) {
found = true;
}
}
if (found == false) {
MyHGM->update_group_replication_add_autodiscovered(srv_host, srv_port, mmsd->writer_hostgroup);
}
}
}
}
}
}
@ -3905,6 +3974,11 @@ void* monitor_GR_thread_HG(void *arg) {
}
}
int rnd_discoverer = conn_mmsds.size() == 0 ? -1 : rand() % conn_mmsds.size();
if (rnd_discoverer != -1) {
conn_mmsds[rnd_discoverer]->cur_monitored_gr_srvs = &hosts_defs;
}
// TODO: This needs to be reworked once we change the way monitoring actions work on clusters, taking
// the full cluster fetch data to avoid transient states. For now, since we perform the monitoring
// actions independently, we workaround the limitation of 'Monitor_Poll' of only handling
@ -3934,6 +4008,10 @@ void* monitor_GR_thread_HG(void *arg) {
///////////////////////////////////////////////////////////////////////////////////////
if (rnd_discoverer != -1) {
conn_mmsds[rnd_discoverer]->cur_monitored_gr_srvs = nullptr;
}
// Set the time for the next iteration
next_check_time = curtime + mysql_thread___monitor_groupreplication_healthcheck_interval * 1000;
}

@ -362,6 +362,18 @@ int wexecvp(
return child_err;
}
std::vector<std::string> split_str(const std::string& s, char delimiter) {
std::vector<std::string> tokens {};
std::string token {};
std::istringstream tokenStream(s);
while (std::getline(tokenStream, token, delimiter)) {
tokens.push_back(token);
}
return tokens;
}
std::string replace_str(const std::string& str, const std::string& match, const std::string& repl) {
if(match.empty()) {
return str;

@ -751,7 +751,7 @@ __run_query:
// NOTE: This query should be in one place that can be reused by
// 'ProxySQL_Monitor' module.
const std::string grouprep_monitor_test_query_start {
"SELECT viable_candidate,read_only,transactions_behind "
"SELECT viable_candidate,read_only,transactions_behind,members "
"FROM GR_MEMBER_ROUTING_CANDIDATE_STATUS "
};
@ -769,14 +769,15 @@ __run_query:
free(query);
std::string t_select_as_query {
"SELECT '%s' AS viable_candidate, '%s' AS read_only, %d AS transactions_behind"
"SELECT '%s' AS viable_candidate, '%s' AS read_only, %d AS transactions_behind, '%s' AS members"
};
std::string select_as_query {};
string_format(
t_select_as_query, select_as_query,
std::get<0>(gr_srv_status) ? "YES" : "NO",
std::get<1>(gr_srv_status) ? "YES" : "NO",
std::get<2>(gr_srv_status)
std::get<2>(gr_srv_status),
std::get<3>(gr_srv_status).c_str()
);
query = static_cast<char*>(malloc(select_as_query.length() + 1));
@ -913,7 +914,7 @@ __run_query:
#ifdef TEST_GROUPREP
group_rep_status SQLite3_Server::grouprep_test_value(const std::string& srv_addr) {
group_rep_status cur_srv_st { "YES", "YES", 0 };
group_rep_status cur_srv_st { "YES", "YES", 0, "" };
auto it = grouprep_map.find(srv_addr);
if (it != grouprep_map.end()) {
@ -1471,7 +1472,8 @@ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind
const group_rep_status srv_status {
std::string { r->fields[2] } == "YES" ? true : false,
std::string { r->fields[3] } == "YES" ? true : false,
atoi(r->fields[4])
atoi(r->fields[4]),
std::string { r->fields[5] }
};
this->grouprep_map[srv_addr] = srv_status;
@ -1498,16 +1500,16 @@ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind
int hostgroup_id = atoi(r->fields[2]);
const std::string t_insert_query {
"INSERT INTO GR_MEMBER_ROUTING_CANDIDATE_STATUS"
" (hostname, port, viable_candidate, read_only, transactions_behind) VALUES"
" ('%s', %d, '%s', '%s', 0)"
" (hostname, port, viable_candidate, read_only, transactions_behind, members) VALUES"
" ('%s', %d, '%s', '%s', 0, '%s')"
};
std::string insert_query {};
if (hostgroup_id % 4 == 0) {
string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "NO");
string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "NO", "");
sessdb->execute(insert_query.c_str());
} else {
string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "YES");
string_format(t_insert_query, insert_query, hostname.c_str(), port, "YES", "YES", "");
sessdb->execute(insert_query.c_str());
}
}
@ -1588,7 +1590,7 @@ bool SQLite3_Server::init() {
insert_into_tables_defs(tables_defs_grouprep,
(const char *)"GR_MEMBER_ROUTING_CANDIDATE_STATUS",
(const char*)"CREATE TABLE GR_MEMBER_ROUTING_CANDIDATE_STATUS ("
"hostname VARCHAR NOT NULL, port INT NOT NULL, viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null, PRIMARY KEY (hostname, port)"
"hostname VARCHAR NOT NULL, port INT NOT NULL, viable_candidate varchar not null, read_only varchar not null, transactions_behind int not null, members VARCHAR NOT NULL, PRIMARY KEY (hostname, port)"
")"
);

Loading…
Cancel
Save