Implements synchronization of: 'mysql_galera_hostgroups', 'group_replication_hostgroups' and 'mysql_aws_aurora_hostgroups'

- Implements synchronization for the mentioned tables.
- Creates new version of 'proxy_info' that accepts a 'const char*'
  as a parameter, instead of a 'string literal'.
pull/2833/head
Javier Jaramago Fernández 6 years ago
parent 0bdaa0b67b
commit e6907127a3

@ -105,6 +105,7 @@ enum MySQL_response_type mysql_response(unsigned char *, unsigned int);
void proxy_error_func(const char *, ...);
void print_backtrace(void);
void proxy_info_(const char* msg, ...);
#ifdef DEBUG
void init_debug_struct();

@ -92,6 +92,9 @@ extern int gdbg;
strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \
proxy_error_func("%s %s:%d:%s(): [INFO] " fmt, __buffer, __FILE__, __LINE__, __func__ , ## __VA_ARGS__); \
} while(0)
#define proxy_info__(fmt, ...) proxy_info_(fmt, __FILE__, __LINE__, __func__ , ## __VA_ARGS__)
#else
#define proxy_info(fmt, ...) \
do { \
@ -103,6 +106,9 @@ extern int gdbg;
strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info); \
proxy_error_func("%s [INFO] " fmt , __buffer , ## __VA_ARGS__); \
} while(0)
#define proxy_info__(fmt, ...) proxy_info_(fmt, ## __VA_ARGS__)
#endif
#ifdef DEBUG

@ -1,4 +1,5 @@
#include "proxysql.h"
#include "proxysql_utils.h"
#include "cpp.h"
#include "SpookyV2.h"
@ -863,6 +864,58 @@ __exit_pull_mysql_users_from_peer:
pthread_mutex_unlock(&GloProxyCluster->update_mysql_users_mutex);
}
/**
* @brief Simple struct for holding a query, and three messages to report
* the progress of the query execution.
*/
struct fetch_query {
const char* query;
std::string msgs[3];
};
/**
* @brief Makes a query with the supplied connection and stores the result in the
* 'MYSQL_RES' passed as a parameter.
*
* @param conn The MYSQL connectionn in which to perform the queries.
* @param f_query A struct holding the query and three messages:
* 1. Message to display before performing the query.
* 2. Message to display when the operation is complete.
* 3. Message to display in case the query fails to be executed.
* @param result The result of the executed query.
* @return int The errno in case fo the query execution not being successful,
* zero otherwise.
*/
int fetch_and_store(MYSQL* conn, const fetch_query& f_query, MYSQL_RES** result) {
const auto& msgs = f_query.msgs;
const auto& query = f_query.query;
// report operation to be performed
if (!msgs[0].empty()) {
proxy_info__(msgs[0].c_str());
}
int query_res = mysql_query(conn, query);
if (query_res == 0) {
*result = mysql_store_result(conn);
query_res = mysql_errno(conn);
} else {
// report error
if (!msgs[2].empty()) {
std::string f_err = msgs[2] + mysql_error(conn);
proxy_info__(f_err.c_str());
}
}
// report finish msg
if (!msgs[1].empty()) {
proxy_info__(msgs[1].c_str());
}
return query_res;
}
void ProxySQL_Cluster::pull_mysql_servers_from_peer() {
char * hostname = NULL;
uint16_t port = 0;
@ -874,7 +927,6 @@ void ProxySQL_Cluster::pull_mysql_servers_from_peer() {
char *password = NULL;
// bool rc_bool = true;
MYSQL *rc_conn;
int rc_query;
MYSQL *conn = mysql_init(NULL);
if (conn==NULL) {
proxy_error("Unable to run mysql_init()\n");
@ -891,107 +943,215 @@ void ProxySQL_Cluster::pull_mysql_servers_from_peer() {
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d started. Expected checksum %s\n", hostname, port, peer_checksum);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
MYSQL_RES *result1 = NULL;
MYSQL_RES *result2 = NULL;
GloAdmin->mysql_servers_wrlock();
//rc_query = mysql_query(conn,"SELECT hostgroup_id, hostname, port, status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM runtime_mysql_servers WHERE status<>'OFFLINE_HARD'");
rc_query = mysql_query(conn,CLUSTER_QUERY_MYSQL_SERVERS); // for bug #1188 , ProxySQL Admin needs to know the exact query
if ( rc_query == 0 ) {
result1 = mysql_store_result(conn);
std::vector<MYSQL_RES*> results {};
// Server query messages
std::string fetch_servers_done = "";
string_format("Cluster: Fetching MySQL Servers from peer %s:%d completed\n", fetch_servers_done, hostname, port);
std::string fetch_servers_err = "";
string_format("Cluster: Fetching MySQL Servers from peer %s:%d failed: \n", fetch_servers_err, hostname, port);
// group_replication_hostgroups query and messages
const char* CLUSTER_QUERY_MYSQL_GROUP_REPLICATION_HOSTGROUPS =
"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, "
"max_writers, writer_is_also_reader, max_transactions_behind, comment FROM mysql_group_replication_hostgroups";
std::string fetch_group_replication_hostgroups = "";
string_format("Cluster: Fetching 'MySQL Group Replication Hostgroups' from peer %s:%d\n", fetch_group_replication_hostgroups, hostname, port);
std::string fetch_group_replication_hostgroups_err = "";
string_format("Cluster: Fetching 'MySQL Group Replication Hostgroups' from peer %s:%d failed: \n", fetch_group_replication_hostgroups_err, hostname, port);
// AWS Aurora query and messages
const char* CLUSTER_QUERY_MYSQL_AWS_AURORA =
"SELECT writer_hostgroup, reader_hostgroup, active, aurora_port, domain_name, max_lag_ms, check_interval_ms, "
"check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment FROM mysql_aws_aurora_hostgroups";
std::string fetch_aws_aurora_start = "";
string_format("Cluster: Fetching 'MySQL Aurora Hostgroups' from peer %s:%d\n", fetch_aws_aurora_start, hostname, port);
std::string fetch_aws_aurora_err = "";
string_format("Cluster: Fetching 'MySQL Aurora Hostgroups' from peer %s:%d failed: \n", fetch_aws_aurora_err, hostname, port);
// Galera query and messages
const char* CLUSTER_QUERY_MYSQL_GALERA =
"SELECT writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, "
"max_writers, writer_is_also_reader, max_transactions_behind, comment FROM mysql_galera_hostgroups";
std::string fetch_galera_start = "";
string_format("Cluster: Fetching 'MySQL Galera Hostgroups' from peer %s:%d\n", fetch_galera_start, hostname, port);
std::string fetch_galera_err = "";
string_format("Cluster: Fetching 'MySQL Galera Hostgroups' from peer %s:%d failed: \n", fetch_galera_err, hostname, port);
// Checksums query and messages
const char* CLUSTER_QUERY_RUNTIME_CHECKS = "SELECT * FROM runtime_checksums_values WHERE name='mysql_servers' LIMIT 1";
std::string fetch_checksums_start = "";
string_format("Cluster: Fetching checksum for MySQL Servers from peer %s:%d before proceessing\n", fetch_checksums_start, hostname, port);
std::string fetch_checksums_err = "";
string_format("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed: \n", fetch_checksums_err, hostname, port);
// Create fetching queries
fetch_query queries[] = {
{ CLUSTER_QUERY_MYSQL_SERVERS, { "", "", fetch_servers_err } },
{ CLUSTER_QUERY_MYSQL_REPLICATION_HOSTGROUPS, { "", "", fetch_servers_err } },
{ CLUSTER_QUERY_MYSQL_GROUP_REPLICATION_HOSTGROUPS, { fetch_group_replication_hostgroups, "", fetch_group_replication_hostgroups_err } },
{ CLUSTER_QUERY_MYSQL_GALERA , { fetch_galera_start, "", fetch_galera_err } },
{ CLUSTER_QUERY_MYSQL_AWS_AURORA , { fetch_aws_aurora_start, "", fetch_aws_aurora_err } },
{ CLUSTER_QUERY_RUNTIME_CHECKS, { fetch_checksums_start, "", fetch_checksums_err } }
};
bool fetching_error = false;
for (size_t i = 0; i < sizeof(queries) / sizeof(fetch_query); i++) {
MYSQL_RES* fetch_res = nullptr;
int it_err = fetch_and_store(conn, queries[i], &fetch_res);
if (it_err == 0) {
results.push_back(fetch_res);
} else {
fetching_error = true;
break;
}
}
//rc_query = mysql_query(conn,"SELECT writer_hostgroup, reader_hostgroup, comment FROM runtime_mysql_replication_hostgroups");
rc_query = mysql_query(conn,CLUSTER_QUERY_MYSQL_REPLICATION_HOSTGROUPS);
if ( rc_query == 0 ) {
result2 = mysql_store_result(conn);
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d before proceessing\n", hostname, port);
rc_query = mysql_query(conn,"SELECT * FROM runtime_checksums_values WHERE name='mysql_servers' LIMIT 1");
if ( rc_query == 0) {
MYSQL_RES *result3 = mysql_store_result(conn);
MYSQL_ROW row;
char *checks = NULL;
while ((row = mysql_fetch_row(result3))) {
if (checks) { // health check
free(checks);
checks = NULL;
}
if (row[3]) {
checks = strdup(row[3]); // checksum
}
if (fetching_error == false) {
MYSQL_ROW row;
char *checks = NULL;
while ((row = mysql_fetch_row(results[5]))) {
if (checks) { // health check
free(checks);
checks = NULL;
}
if (row[3]) {
checks = strdup(row[3]); // checksum
}
}
if (checks && strcmp(checks,peer_checksum)==0) {
// we are OK to sync!
proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d successful. Checksum: %s\n", hostname, port, checks);
// sync mysql_servers
proxy_info("Cluster: Writing mysql_servers table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_servers");
MYSQL_ROW row;
char *q=(char *)"INSERT INTO mysql_servers (hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (%s, \"%s\", %s, %s, %s, \"%s\", %s, %s, %s, %s, %s, '%s')";
while ((row = mysql_fetch_row(results[0]))) {
int i;
int l=0;
for (i=0; i<11; i++) {
l+=strlen(row[i]);
}
if (checks) {
if(strcmp(checks,peer_checksum)==0) {
// we are OK to sync!
proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d successful. Checksum: %s\n", hostname, port, checks);
proxy_info("Cluster: Writing mysql_servers table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_servers");
MYSQL_ROW row;
char *q=(char *)"INSERT INTO mysql_servers (hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (%s, \"%s\", %s, %s, %s, \"%s\", %s, %s, %s, %s, %s, '%s')";
while ((row = mysql_fetch_row(result1))) {
int i;
int l=0;
for (i=0; i<11; i++) {
l+=strlen(row[i]);
}
char *o=escape_string_single_quotes(row[11],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
sprintf(query,q,row[0],row[1],row[2],row[3], row[4], ( strcmp(row[5],"SHUNNED")==0 ? "ONLINE" : row[5] ), row[6],row[7],row[8],row[9],row[10],o);
if (o!=row[11]) { // there was a copy
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
char *o=escape_string_single_quotes(row[11],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
proxy_info("Cluster: Writing mysql_replication_hostgroups table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_replication_hostgroups");
q=(char *)"INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, check_type, comment) VALUES (%s, %s, '%s', '%s')";
while ((row = mysql_fetch_row(result2))) {
int i;
int l=0;
for (i=0; i<3; i++) {
l+=strlen(row[i]);
}
char *o=escape_string_single_quotes(row[3],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
sprintf(query,q,row[0],row[1],row[2],o);
if (o!=row[3]) { // there was a copy
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
//mysql_free_result(result2);
proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_mysql_servers_to_runtime();
if (GloProxyCluster->cluster_mysql_servers_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_servers__from_memory_to_disk();
} else {
proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed. Checksum: %s\n", hostname, port, checks);
}
}
sprintf(query,q,row[0],row[1],row[2],row[3], row[4], ( strcmp(row[5],"SHUNNED")==0 ? "ONLINE" : row[5] ), row[6],row[7],row[8],row[9],row[10],o);
if (o!=row[11]) { // there was a copy
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
// sync mysql_replication_hostgroups
proxy_info("Cluster: Writing mysql_replication_hostgroups table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_replication_hostgroups");
q=(char *)"INSERT INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, check_type, comment) VALUES (%s, %s, '%s', '%s')";
while ((row = mysql_fetch_row(results[1]))) {
int i;
int l=0;
for (i=0; i<3; i++) {
l+=strlen(row[i]);
}
if (result3) {
mysql_free_result(result3);
char *o=escape_string_single_quotes(row[3],false);
char *query = (char *)malloc(strlen(q)+i+strlen(o)+64);
sprintf(query,q,row[0],row[1],row[2],o);
if (o!=row[3]) { // there was a copy
free(o);
}
} else {
proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
GloAdmin->admindb->execute(query);
free(query);
}
if (result2) {
mysql_free_result(result2);
// sync mysql_group_replication_hostgroups
proxy_info("Cluster: Writing mysql_group_replication_hostgroups table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_group_replication_hostgroups");
q=(char *)"INSERT INTO mysql_group_replication_hostgroups ( "
"writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, "
"max_writers, writer_is_also_reader, max_transactions_behind, comment) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s')";
while ((row = mysql_fetch_row(results[2]))) {
int i;
int l = 0;
for (i = 0; i < 8; i++) {
l += strlen(row[i]);
}
char *o = escape_string_single_quotes(row[8], false);
char *query = (char *)malloc(strlen(q) + i + strlen(o) + 64);
sprintf(query, q, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], o);
// free in case of 'o' being a copy
if (o != row[8]) {
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
// sync mysql_galera_hostgroups
proxy_info("Cluster: Writing mysql_galera_hostgroups table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_galera_hostgroups");
q=(char *)"INSERT INTO mysql_galera_hostgroups ( "
"writer_hostgroup, backup_writer_hostgroup, reader_hostgroup, offline_hostgroup, active, "
"max_writers, writer_is_also_reader, max_transactions_behind, comment) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '%s')";
while ((row = mysql_fetch_row(results[3]))) {
int i;
int l = 0;
for (i = 0; i < 8; i++) {
l += strlen(row[i]);
}
char *o = escape_string_single_quotes(row[8], false);
char *query = (char *)malloc(strlen(q) + i + strlen(o) + 64);
sprintf(query, q, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], o);
// free in case of 'o' being a copy
if (o != row[8]) {
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
// sync mysql_aws_aurora_hostgroups
proxy_info("Cluster: Writing mysql_aws_aurora_hostgroups table\n");
GloAdmin->admindb->execute("DELETE FROM mysql_aws_aurora_hostgroups");
q=(char *)"INSERT INTO mysql_aws_aurora_hostgroups ( "
"writer_hostgroup, reader_hostgroup, active, aurora_port, domain_name, max_lag_ms, check_interval_ms, "
"check_timeout_ms, writer_is_also_reader, new_reader_weight, add_lag_ms, min_lag_ms, lag_num_checks, comment) "
"VALUES (%s, %s, %s, %s, '%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s')";
while ((row = mysql_fetch_row(results[4]))) {
int i;
int l = 0;
for (i = 0; i < 13; i++) {
l += strlen(row[i]);
}
char *o = escape_string_single_quotes(row[13], false);
char *query = (char *)malloc(strlen(q) + i + strlen(o) + 64);
sprintf(query, q, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9], row[10], row[11], row[12], o);
// free in case of 'o' being a copy
if (o != row[13]) {
free(o);
}
GloAdmin->admindb->execute(query);
free(query);
}
proxy_info("Cluster: Loading to runtime MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->load_mysql_servers_to_runtime();
if (GloProxyCluster->cluster_mysql_servers_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Servers from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_servers__from_memory_to_disk();
} else {
// TODO: Change this message
proxy_info("Cluster: Fetching checksum for MySQL Servers from peer %s:%d failed. Checksum: %s\n", hostname, port, checks);
}
} else {
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
if (result1) {
mysql_free_result(result1);
// free results
for (MYSQL_RES* result : results) {
mysql_free_result(result);
}
} else {
proxy_info("Cluster: Fetching MySQL Servers from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
GloAdmin->mysql_servers_wrunlock();
} else {

@ -190,3 +190,35 @@ void init_debug_struct_from_cmdline() {
}
}
#endif /* DEBUG */
void proxy_info_(const char* msg, ...) {
va_list args;
va_start(args, msg);
time_t __timer;
char __buffer[25];
// create the time info
struct tm *__tm_info;
time(&__timer);
__tm_info = localtime(&__timer);
strftime(__buffer, 25, "%Y-%m-%d %H:%M:%S", __tm_info);
// format the message
#ifdef DEBUG
const char* debug_msg_fmt = " %s:%d:%s(): [INFO] ";
#else
const char* debug_msg_fmt = " [INFO]";
#endif /* DEBUG */
std::size_t msg_len = strlen(msg);
char* str_buf = (char*)malloc(25 + strlen(debug_msg_fmt) + msg_len + 1);
strcpy(str_buf, __buffer);
strcat(str_buf, debug_msg_fmt);
strcat(str_buf, msg);
vfprintf(stderr, str_buf, args);
free(str_buf);
va_end(args);
}

Loading…
Cancel
Save