From e6907127a32d22f8803e05561f4c384d76a080e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Javier=20Jaramago=20Fern=C3=A1ndez?= Date: Tue, 19 May 2020 16:32:42 +0200 Subject: [PATCH] Implements synchronization of: 'mysql_galera_hostgroups', 'group_replication_hostgroups' and 'mysql_aws_aurora_hostgroups' - Implements synchronization for the mentioned tables. - Creates new version of 'proxy_info' that accepts a 'const char*' as a parameter, instead of a 'string literal'. --- include/proxysql.h | 1 + include/proxysql_debug.h | 6 + lib/ProxySQL_Cluster.cpp | 346 ++++++++++++++++++++++++++++----------- lib/debug.cpp | 32 ++++ 4 files changed, 292 insertions(+), 93 deletions(-) diff --git a/include/proxysql.h b/include/proxysql.h index 886e59eb2..89645feb7 100644 --- a/include/proxysql.h +++ b/include/proxysql.h @@ -105,6 +105,7 @@ enum MySQL_response_type mysql_response(unsigned char *, unsigned int); void proxy_error_func(const char *, ...); void print_backtrace(void); +void proxy_info_(const char* msg, ...); #ifdef DEBUG void init_debug_struct(); diff --git a/include/proxysql_debug.h b/include/proxysql_debug.h index d85676c98..7416df6b7 100644 --- a/include/proxysql_debug.h +++ b/include/proxysql_debug.h @@ -92,6 +92,9 @@ extern int gdbg; strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \ proxy_error_func("%s %s:%d:%s(): [INFO] " fmt, __buffer, __FILE__, __LINE__, __func__ , ## __VA_ARGS__); \ } while(0) + +#define proxy_info__(fmt, ...) proxy_info_(fmt, __FILE__, __LINE__, __func__ , ## __VA_ARGS__) + #else #define proxy_info(fmt, ...) \ do { \ @@ -103,6 +106,9 @@ extern int gdbg; strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \ proxy_error_func("%s [INFO] " fmt , __buffer , ## __VA_ARGS__); \ } while(0) + +#define proxy_info__(fmt, ...) proxy_info_(fmt, ## __VA_ARGS__) + #endif #ifdef DEBUG diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index bbd54397f..f50e1766a 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -1,4 +1,5 @@ #include "proxysql.h" +#include "proxysql_utils.h" #include "cpp.h" #include "SpookyV2.h" @@ -863,6 +864,58 @@ __exit_pull_mysql_users_from_peer: pthread_mutex_unlock(&GloProxyCluster->update_mysql_users_mutex); } +/** + * @brief Simple struct for holding a query, and three messages to report + * the progress of the query execution. + */ +struct fetch_query { + const char* query; + std::string msgs[3]; +}; + +/** + * @brief Makes a query with the supplied connection and stores the result in the + * 'MYSQL_RES' passed as a parameter. + * + * @param conn The MYSQL connectionn in which to perform the queries. + * @param f_query A struct holding the query and three messages: + * 1. Message to display before performing the query. + * 2. Message to display when the operation is complete. + * 3. Message to display in case the query fails to be executed. + * @param result The result of the executed query. + * @return int The errno in case fo the query execution not being successful, + * zero otherwise. + */ +int fetch_and_store(MYSQL* conn, const fetch_query& f_query, MYSQL_RES** result) { + const auto& msgs = f_query.msgs; + const auto& query = f_query.query; + + // report operation to be performed + if (!msgs[0].empty()) { + proxy_info__(msgs[0].c_str()); + } + + int query_res = mysql_query(conn, query); + + if (query_res == 0) { + *result = mysql_store_result(conn); + query_res = mysql_errno(conn); + } else { + // report error + if (!msgs[2].empty()) { + std::string f_err = msgs[2] + mysql_error(conn); + proxy_info__(f_err.c_str()); + } + } + + // report finish msg + if (!msgs[1].empty()) { + proxy_info__(msgs[1].c_str()); + } + + return query_res; +} + void ProxySQL_Cluster::pull_mysql_servers_from_peer() { char * hostname = NULL; uint16_t port = 0; @@ -874,7 +927,6 @@ void ProxySQL_Cluster::pull_mysql_servers_from_peer() { char *password = NULL; // bool rc_bool = true; MYSQL *rc_conn; - int rc_query; MYSQL *conn = mysql_init(NULL); if (conn==NULL) { proxy_error("Unable to run mysql_init()\n"); @@ -891,107 +943,215 @@ void ProxySQL_Cluster::pull_mysql_servers_from_peer() { proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d started. Expected checksum %s\n", hostname, port, peer_checksum); rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); if (rc_conn) { - MYSQL_RES *result1 = NULL; - MYSQL_RES *result2 = NULL; - GloAdmin->mysql_servers_wrlock(); - //rc_query = mysql_query(conn,"SELECT hostgroup_id, hostname, port, status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM runtime_mysql_servers WHERE status<>'OFFLINE_HARD'"); - rc_query = mysql_query(conn,CLUSTER_QUERY_MYSQL_SERVERS); // for bug #1188 , ProxySQL Admin needs to know the exact query - if ( rc_query == 0 ) { - result1 = mysql_store_result(conn); + std::vector results {}; + + // Server query messages + std::string fetch_servers_done = ""; + string_format("Cluster: Fetching MySQL Servers from peer %s:%d completed\n", fetch_servers_done, hostname, port); + std::string fetch_servers_err = ""; + string_format("Cluster: Fetching MySQL Servers from peer %s:%d failed: \n", fetch_servers_err, hostname, port); + + // group_replication_hostgroups query and messages + const char* CLUSTER_QUERY_MYSQL_GROUP_REPLICATION_HOSTGROUPS = + "SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, " + "max_writers, writer_is_also_reader, max_transactions_behind, comment FROM mysql_group_replication_hostgroups"; + std::string fetch_group_replication_hostgroups = ""; + string_format("Cluster: Fetching 'MySQL Group Replication Hostgroups' from peer %s:%d\n", fetch_group_replication_hostgroups, hostname, port); + std::string fetch_group_replication_hostgroups_err = ""; + string_format("Cluster: Fetching 'MySQL Group Replication Hostgroups' from peer %s:%d failed: \n", fetch_group_replication_hostgroups_err, hostname, port); + + // AWS Aurora query and messages + const char* CLUSTER_QUERY_MYSQL_AWS_AURORA = + "SELECT writer_hostgroup, reader_hostgroup, active, aurora_port, domain_name, max_lag_ms, check_interval_ms, " + "check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment FROM mysql_aws_aurora_hostgroups"; + std::string fetch_aws_aurora_start = ""; + string_format("Cluster: Fetching 'MySQL Aurora Hostgroups' from peer %s:%d\n", fetch_aws_aurora_start, hostname, port); + std::string fetch_aws_aurora_err = ""; + string_format("Cluster: Fetching 'MySQL Aurora Hostgroups' from peer %s:%d failed: \n", fetch_aws_aurora_err, hostname, port); + + // Galera query and messages + const char* CLUSTER_QUERY_MYSQL_GALERA = + "SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, " + "max_writers, writer_is_also_reader, max_transactions_behind, comment FROM mysql_galera_hostgroups"; + std::string fetch_galera_start = ""; + string_format("Cluster: Fetching 'MySQL Galera Hostgroups' from peer %s:%d\n", fetch_galera_start, hostname, port); + std::string fetch_galera_err = ""; + string_format("Cluster: Fetching 'MySQL Galera Hostgroups' from peer %s:%d failed: \n", fetch_galera_err, hostname, port); + + // Checksums query and messages + const char* CLUSTER_QUERY_RUNTIME_CHECKS = "SELECT * FROM runtime_checksums_values WHERE name='mysql_servers' LIMIT 1"; + std::string fetch_checksums_start = ""; + string_format("Cluster: Fetching checksum for MySQL Servers from peer %s:%d before proceessing\n", fetch_checksums_start, hostname, port); + std::string fetch_checksums_err = ""; + string_format("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed: \n", fetch_checksums_err, hostname, port); + + // Create fetching queries + fetch_query queries[] = { + { CLUSTER_QUERY_MYSQL_SERVERS, { "", "", fetch_servers_err } }, + { CLUSTER_QUERY_MYSQL_REPLICATION_HOSTGROUPS, { "", "", fetch_servers_err } }, + { CLUSTER_QUERY_MYSQL_GROUP_REPLICATION_HOSTGROUPS, { fetch_group_replication_hostgroups, "", fetch_group_replication_hostgroups_err } }, + { CLUSTER_QUERY_MYSQL_GALERA , { fetch_galera_start, "", fetch_galera_err } }, + { CLUSTER_QUERY_MYSQL_AWS_AURORA , { fetch_aws_aurora_start, "", fetch_aws_aurora_err } }, + { CLUSTER_QUERY_RUNTIME_CHECKS, { fetch_checksums_start, "", fetch_checksums_err } } + }; + + bool fetching_error = false; + for (size_t i = 0; i < sizeof(queries) / sizeof(fetch_query); i++) { + MYSQL_RES* fetch_res = nullptr; + int it_err = fetch_and_store(conn, queries[i], &fetch_res); + + if (it_err == 0) { + results.push_back(fetch_res); + } else { + fetching_error = true; + break; + } + } - //rc_query = mysql_query(conn,"SELECT writer_hostgroup, reader_hostgroup, comment FROM runtime_mysql_replication_hostgroups"); - rc_query = mysql_query(conn,CLUSTER_QUERY_MYSQL_REPLICATION_HOSTGROUPS); - if ( rc_query == 0 ) { - result2 = mysql_store_result(conn); - proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d completed\n", hostname, port); - proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d before proceessing\n", hostname, port); - rc_query = mysql_query(conn,"SELECT * FROM runtime_checksums_values WHERE name='mysql_servers' LIMIT 1"); - if ( rc_query == 0) { - MYSQL_RES *result3 = mysql_store_result(conn); - MYSQL_ROW row; - char *checks = NULL; - while ((row = mysql_fetch_row(result3))) { - if (checks) { // health check - free(checks); - checks = NULL; - } - if (row[3]) { - checks = strdup(row[3]); // checksum - } + if (fetching_error == false) { + MYSQL_ROW row; + char *checks = NULL; + while ((row = mysql_fetch_row(results[5]))) { + if (checks) { // health check + free(checks); + checks = NULL; + } + if (row[3]) { + checks = strdup(row[3]); // checksum + } + } + if (checks && strcmp(checks,peer_checksum)==0) { + // we are OK to sync! + proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d successful. Checksum: %s\n", hostname, port, checks); + // sync mysql_servers + proxy_info("Cluster: Writing mysql_servers table\n"); + GloAdmin->admindb->execute("DELETE FROM mysql_servers"); + MYSQL_ROW row; + char *q=(char *)"INSERT INTO mysql_servers (hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (%s, \"%s\", %s, %s, %s, \"%s\", %s, %s, %s, %s, %s, '%s')"; + while ((row = mysql_fetch_row(results[0]))) { + int i; + int l=0; + for (i=0; i<11; i++) { + l+=strlen(row[i]); } - if (checks) { - if(strcmp(checks,peer_checksum)==0) { - // we are OK to sync! - proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d successful. Checksum: %s\n", hostname, port, checks); - - proxy_info("Cluster: Writing mysql_servers table\n"); - GloAdmin->admindb->execute("DELETE FROM mysql_servers"); - MYSQL_ROW row; - char *q=(char *)"INSERT INTO mysql_servers (hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (%s, \"%s\", %s, %s, %s, \"%s\", %s, %s, %s, %s, %s, '%s')"; - while ((row = mysql_fetch_row(result1))) { - int i; - int l=0; - for (i=0; i<11; i++) { - l+=strlen(row[i]); - } - char *o=escape_string_single_quotes(row[11],false); - char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); - - sprintf(query,q,row[0],row[1],row[2],row[3], row[4], ( strcmp(row[5],"SHUNNED")==0 ? "ONLINE" : row[5] ), row[6],row[7],row[8],row[9],row[10],o); - if (o!=row[11]) { // there was a copy - free(o); - } - GloAdmin->admindb->execute(query); - free(query); - } + char *o=escape_string_single_quotes(row[11],false); + char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); - proxy_info("Cluster: Writing mysql_replication_hostgroups table\n"); - GloAdmin->admindb->execute("DELETE FROM mysql_replication_hostgroups"); - q=(char *)"INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, check_type, comment) VALUES (%s, %s, '%s', '%s')"; - while ((row = mysql_fetch_row(result2))) { - int i; - int l=0; - for (i=0; i<3; i++) { - l+=strlen(row[i]); - } - char *o=escape_string_single_quotes(row[3],false); - char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); - sprintf(query,q,row[0],row[1],row[2],o); - if (o!=row[3]) { // there was a copy - free(o); - } - GloAdmin->admindb->execute(query); - free(query); - } - //mysql_free_result(result2); - - proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port); - GloAdmin->load_mysql_servers_to_runtime(); - if (GloProxyCluster->cluster_mysql_servers_save_to_disk == true) { - proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port); - GloAdmin->flush_mysql_servers__from_memory_to_disk(); - } else { - proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed. Checksum: %s\n", hostname, port, checks); - } - } + sprintf(query,q,row[0],row[1],row[2],row[3], row[4], ( strcmp(row[5],"SHUNNED")==0 ? "ONLINE" : row[5] ), row[6],row[7],row[8],row[9],row[10],o); + if (o!=row[11]) { // there was a copy + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + + // sync mysql_replication_hostgroups + proxy_info("Cluster: Writing mysql_replication_hostgroups table\n"); + GloAdmin->admindb->execute("DELETE FROM mysql_replication_hostgroups"); + q=(char *)"INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, check_type, comment) VALUES (%s, %s, '%s', '%s')"; + while ((row = mysql_fetch_row(results[1]))) { + int i; + int l=0; + for (i=0; i<3; i++) { + l+=strlen(row[i]); } - if (result3) { - mysql_free_result(result3); + char *o=escape_string_single_quotes(row[3],false); + char *query = (char *)malloc(strlen(q)+i+strlen(o)+64); + sprintf(query,q,row[0],row[1],row[2],o); + if (o!=row[3]) { // there was a copy + free(o); } - } else { - proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); + GloAdmin->admindb->execute(query); + free(query); } - if (result2) { - mysql_free_result(result2); + + // sync mysql_group_replication_hostgroups + proxy_info("Cluster: Writing mysql_group_replication_hostgroups table\n"); + GloAdmin->admindb->execute("DELETE FROM mysql_group_replication_hostgroups"); + q=(char *)"INSERT INTO mysql_group_replication_hostgroups ( " + "writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, " + "max_writers, writer_is_also_reader, max_transactions_behind, comment) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s')"; + while ((row = mysql_fetch_row(results[2]))) { + int i; + int l = 0; + for (i = 0; i < 8; i++) { + l += strlen(row[i]); + } + char *o = escape_string_single_quotes(row[8], false); + char *query = (char *)malloc(strlen(q) + i + strlen(o) + 64); + sprintf(query, q, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], o); + // free in case of 'o' being a copy + if (o != row[8]) { + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + + // sync mysql_galera_hostgroups + proxy_info("Cluster: Writing mysql_galera_hostgroups table\n"); + GloAdmin->admindb->execute("DELETE FROM mysql_galera_hostgroups"); + q=(char *)"INSERT INTO mysql_galera_hostgroups ( " + "writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, " + "max_writers, writer_is_also_reader, max_transactions_behind, comment) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s')"; + while ((row = mysql_fetch_row(results[3]))) { + int i; + int l = 0; + for (i = 0; i < 8; i++) { + l += strlen(row[i]); + } + char *o = escape_string_single_quotes(row[8], false); + char *query = (char *)malloc(strlen(q) + i + strlen(o) + 64); + sprintf(query, q, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], o); + // free in case of 'o' being a copy + if (o != row[8]) { + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + + // sync mysql_aws_aurora_hostgroups + proxy_info("Cluster: Writing mysql_aws_aurora_hostgroups table\n"); + GloAdmin->admindb->execute("DELETE FROM mysql_aws_aurora_hostgroups"); + q=(char *)"INSERT INTO mysql_aws_aurora_hostgroups ( " + "writer_hostgroup, reader_hostgroup, active, aurora_port, domain_name, max_lag_ms, check_interval_ms, " + "check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) " + "VALUES (%s, %s, %s, %s, '%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s')"; + while ((row = mysql_fetch_row(results[4]))) { + int i; + int l = 0; + for (i = 0; i < 13; i++) { + l += strlen(row[i]); + } + char *o = escape_string_single_quotes(row[13], false); + char *query = (char *)malloc(strlen(q) + i + strlen(o) + 64); + sprintf(query, q, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], o); + // free in case of 'o' being a copy + if (o != row[13]) { + free(o); + } + GloAdmin->admindb->execute(query); + free(query); + } + + proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->load_mysql_servers_to_runtime(); + if (GloProxyCluster->cluster_mysql_servers_save_to_disk == true) { + proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port); + GloAdmin->flush_mysql_servers__from_memory_to_disk(); + } else { + // TODO: Change this message + proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed. Checksum: %s\n", hostname, port, checks); } - } else { - proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); } - if (result1) { - mysql_free_result(result1); + + // free results + for (MYSQL_RES* result : results) { + mysql_free_result(result); } - } else { - proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); } GloAdmin->mysql_servers_wrunlock(); } else { diff --git a/lib/debug.cpp b/lib/debug.cpp index dd896aea4..90cb610e9 100644 --- a/lib/debug.cpp +++ b/lib/debug.cpp @@ -190,3 +190,35 @@ void init_debug_struct_from_cmdline() { } } #endif /* DEBUG */ + +void proxy_info_(const char* msg, ...) { + va_list args; + va_start(args, msg); + + time_t __timer; + char __buffer[25]; + + // create the time info + struct tm *__tm_info; + time(&__timer); + __tm_info = localtime(&__timer); + strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); + + // format the message +#ifdef DEBUG + const char* debug_msg_fmt = " %s:%d:%s(): [INFO] "; +#else + const char* debug_msg_fmt = " [INFO]"; +#endif /* DEBUG */ + + std::size_t msg_len = strlen(msg); + char* str_buf = (char*)malloc(25 + strlen(debug_msg_fmt) + msg_len + 1); + strcpy(str_buf, __buffer); + strcat(str_buf, debug_msg_fmt); + strcat(str_buf, msg); + + vfprintf(stderr, str_buf, args); + + free(str_buf); + va_end(args); +}