* Added runtime mysql server logic in commit.

* Changed commit signature.
* Few fixes
v2.x_refactor_cluster_mysql_servers
Rahim Kanji 3 years ago
parent 32f9f727d0
commit 09f76511b1

@ -393,14 +393,14 @@ class MySQL_HostGroups_Manager {
#endif
enum HGM_TABLES {
MYSQL_SERVERS = 0,
MYSQL_SERVERS_INCOMING = 0,
MYSQL_REPLICATION_HOSTGROUPS,
MYSQL_GROUP_REPLICATION_HOSTGROUPS,
MYSQL_GALERA_HOSTGROUPS,
MYSQL_AWS_AURORA_HOSTGROUPS,
MYSQL_HOSTGROUP_ATTRIBUTES,
MYSQL_SERVERS,
MYSQL_SERVERS_INCOMING,
__HGM_TABLES_SIZE
};
@ -698,9 +698,10 @@ class MySQL_HostGroups_Manager {
void wrlock();
void wrunlock();
int servers_add(SQLite3_result *resultset);
void update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server);
bool commit(SQLite3_result* runtime_mysql_servers = nullptr, SQLite3_result* mysql_servers_incoming = nullptr,
const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server = {}, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming = {});
//void update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server);
bool commit(SQLite3_result* runtime_mysql_servers = nullptr, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server = {},
SQLite3_result* mysql_servers_incoming = nullptr, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming = {},
bool only_commit_runtime_mysql_servers = false);
void commit_update_checksums_from_tables();
void CUCFT1(const string& TableName, const string& ColumnName, uint64_t& raw_checksum); // used by commit_update_checksums_from_tables()
@ -826,6 +827,8 @@ class MySQL_HostGroups_Manager {
MySrvC* find_server_in_hg(unsigned int _hid, const std::string& addr, int port);
private:
static uint64_t compute_mysql_servers_raw_checksum(const SQLite3_result* runtime_mysql_servers);
void update_hostgroup_manager_mappings();
uint64_t get_mysql_servers_checksum(SQLite3_result* runtime_mysql_servers = nullptr);
uint64_t get_mysql_servers_incoming_checksum(SQLite3_result* incoming_mysql_servers, bool use_precalculated_checksum = true);

@ -1495,235 +1495,234 @@ unsigned int MySQL_HostGroups_Manager::get_servers_table_version() {
return __sync_fetch_and_add(&status.servers_table_version,0);
}
/**
* @brief Generates runtime mysql server records and checksum.
*
* IMPORTANT: It's assumed that the previous queries were successful and that the resultsets are received in
* the specified order.
* @param can be null or previously generated runtime_mysql_servers resultset can be passed.
*/
void MySQL_HostGroups_Manager::update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers,
const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server) {
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
// if any server has gtid_port enabled, use_gtid is set to true
// and then has_gtid_port is set too
bool use_gtid = false;
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
const char* query = "SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL";
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
if (GloMTH->variables.hostgroup_manager_verbose) {
proxy_info("Dumping mysql_servers LEFT JOIN mysql_servers_incoming\n");
resultset->dump_to_stderr();
}
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
long long ptr = atoll(r->fields[0]);
proxy_warning("Removed server at address %lld, hostgroup %s, address %s port %s. Setting status OFFLINE HARD and immediately dropping all free connections. Used connections will be dropped when trying to use them\n", ptr, r->fields[1], r->fields[2], r->fields[3]);
MySrvC* mysrvc = (MySrvC*)ptr;
mysrvc->status = MYSQL_SERVER_STATUS_OFFLINE_HARD;
mysrvc->ConnectionsFree->drop_all_connections();
char* q1 = (char*)"DELETE FROM mysql_servers WHERE mem_pointer=%lld";
char* q2 = (char*)malloc(strlen(q1) + 32);
sprintf(q2, q1, ptr);
mydb->execute(q2);
free(q2);
}
}
if (resultset) { delete resultset; resultset = NULL; }
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming");
// SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
query = (char*)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
if (GloMTH->variables.hostgroup_manager_verbose) {
proxy_info("Dumping mysql_servers JOIN mysql_servers_incoming\n");
resultset->dump_to_stderr();
}
// optimization #829
int rc;
sqlite3_stmt* statement1 = NULL;
sqlite3_stmt* statement2 = NULL;
//sqlite3 *mydb3=mydb->get_db();
char* query1 = (char*)"UPDATE mysql_servers SET mem_pointer = ?1 WHERE hostgroup_id = ?2 AND hostname = ?3 AND port = ?4";
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0);
rc = mydb->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, mydb);
char* query2 = (char*)"UPDATE mysql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 , gtid_port = ?9 WHERE hostgroup_id = ?10 AND hostname = ?11 AND port = ?12";
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query2, -1, &statement2, 0);
rc = mydb->prepare_v2(query2, &statement2);
ASSERT_SQLITE_OK(rc, mydb);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
long long ptr = atoll(r->fields[12]); // increase this index every time a new column is added
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), ptr, atoi(r->fields[0]), atoi(r->fields[6]));
//fprintf(stderr,"%lld\n", ptr);
if (ptr == 0) {
if (GloMTH->variables.hostgroup_manager_verbose) {
proxy_info("Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%d, status=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]));
}
MySrvC* mysrvc = new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), atoi(r->fields[10]), r->fields[11]); // add new fields here if adding more columns in mysql_servers
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), mysrvc, atoi(r->fields[0]));
MyHGM->add(mysrvc, atoi(r->fields[0]));
ptr = (uintptr_t)mysrvc;
rc = (*proxy_sqlite3_bind_int64)(statement1, 1, ptr); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 2, atoi(r->fields[0])); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_text)(statement1, 3, r->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r->fields[2])); ASSERT_SQLITE_OK(rc, mydb);
SAFE_SQLITE3_STEP2(statement1);
rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, mydb);
if (mysrvc->gtid_port) {
// this server has gtid_port configured, we set use_gtid
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port);
use_gtid = true;
}
} else {
bool run_update = false;
MySrvC* mysrvc = (MySrvC*)ptr;
// carefully increase the 2nd index by 1 for every new column added
if (atoi(r->fields[3]) != atoi(r->fields[13])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing gtid_port for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), mysrvc->gtid_port, atoi(r->fields[13]));
mysrvc->gtid_port = atoi(r->fields[13]);
}
if (atoi(r->fields[4]) != atoi(r->fields[14])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), mysrvc->weight, atoi(r->fields[14]));
mysrvc->weight = atoi(r->fields[14]);
}
if (atoi(r->fields[5]) != atoi(r->fields[15])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing status for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[5]), mysrvc->status, atoi(r->fields[15]));
mysrvc->status = (MySerStatus)atoi(r->fields[15]);
if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->shunned_automatic = false;
}
}
if (atoi(r->fields[6]) != atoi(r->fields[16])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing compression for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[6]), mysrvc->compression, atoi(r->fields[16]));
mysrvc->compression = atoi(r->fields[16]);
}
if (atoi(r->fields[7]) != atoi(r->fields[17])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing max_connections for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[7]), mysrvc->max_connections, atoi(r->fields[17]));
mysrvc->max_connections = atoi(r->fields[17]);
}
if (atoi(r->fields[8]) != atoi(r->fields[18])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing max_replication_lag for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[8]), mysrvc->max_replication_lag, atoi(r->fields[18]));
mysrvc->max_replication_lag = atoi(r->fields[18]);
if (mysrvc->max_replication_lag == 0) { // we just changed it to 0
if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
// the server is currently shunned due to replication lag
// but we reset max_replication_lag to 0
// therefore we immediately reset the status too
mysrvc->status = MYSQL_SERVER_STATUS_ONLINE;
}
}
}
if (atoi(r->fields[9]) != atoi(r->fields[19])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing use_ssl for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[9]), mysrvc->use_ssl, atoi(r->fields[19]));
mysrvc->use_ssl = atoi(r->fields[19]);
}
if (atoi(r->fields[10]) != atoi(r->fields[20])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing max_latency_ms for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[10]), mysrvc->max_latency_us / 1000, atoi(r->fields[20]));
mysrvc->max_latency_us = 1000 * atoi(r->fields[20]);
}
if (strcmp(r->fields[11], r->fields[21])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing comment for server %d:%s:%d (%s:%d) from '%s' to '%s'\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[11], r->fields[21]);
free(mysrvc->comment);
mysrvc->comment = strdup(r->fields[21]);
}
if (run_update) {
rc = (*proxy_sqlite3_bind_int64)(statement2, 1, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 2, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 3, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 4, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 5, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 6, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 7, mysrvc->max_latency_us / 1000); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_text)(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 9, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 10, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_text)(statement2, 11, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_bind_int64)(statement2, 12, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
SAFE_SQLITE3_STEP2(statement2);
rc = (*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, mydb);
rc = (*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, mydb);
}
if (mysrvc->gtid_port) {
// this server has gtid_port configured, we set use_gtid
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port);
use_gtid = true;
}
}
}
(*proxy_sqlite3_finalize)(statement1);
(*proxy_sqlite3_finalize)(statement2);
}
if (use_gtid) {
has_gtid_port = true;
} else {
has_gtid_port = false;
}
if (resultset) { delete resultset; resultset = NULL; }
purge_mysql_servers_table();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
const auto mysql_servers_checksum = get_mysql_servers_checksum();
update_hostgroup_manager_mappings();
char buf[80];
uint32_t d32[2];
memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum));
sprintf(buf, "0x%0X%0X", d32[0], d32[1]);
pthread_mutex_lock(&GloVars.checksum_mutex);
GloVars.checksums_values.mysql_servers.set_checksum(buf);
GloVars.checksums_values.mysql_servers.version++;
//struct timespec ts;
//clock_gettime(CLOCK_REALTIME, &ts);
time_t t = time(NULL);
if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false &&
GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) {
GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch;
} else {
GloVars.checksums_values.mysql_servers.epoch = t;
}
GloVars.checksums_values.updates_cnt++;
GloVars.generate_global_checksum();
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
update_table_mysql_servers_for_monitor(false);
}
///**
// * @brief Generates runtime mysql server records and checksum.
// *
// * IMPORTANT: It's assumed that the previous queries were successful and that the resultsets are received in
// * the specified order.
// * @param can be null or previously generated runtime_mysql_servers resultset can be passed.
// */
//void MySQL_HostGroups_Manager::update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers,
// const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server) {
// char* error = NULL;
// int cols = 0;
// int affected_rows = 0;
// SQLite3_result* resultset = NULL;
// // if any server has gtid_port enabled, use_gtid is set to true
// // and then has_gtid_port is set too
// bool use_gtid = false;
//
// mydb->execute("DELETE FROM mysql_servers");
// generate_mysql_servers_table();
//
// const char* query = "SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL";
// mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
// if (error) {
// proxy_error("Error on %s : %s\n", query, error);
// } else {
// if (GloMTH->variables.hostgroup_manager_verbose) {
// proxy_info("Dumping mysql_servers LEFT JOIN mysql_servers_incoming\n");
// resultset->dump_to_stderr();
// }
//
// for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
// SQLite3_row* r = *it;
// long long ptr = atoll(r->fields[0]);
// proxy_warning("Removed server at address %lld, hostgroup %s, address %s port %s. Setting status OFFLINE HARD and immediately dropping all free connections. Used connections will be dropped when trying to use them\n", ptr, r->fields[1], r->fields[2], r->fields[3]);
// MySrvC* mysrvc = (MySrvC*)ptr;
// mysrvc->status = MYSQL_SERVER_STATUS_OFFLINE_HARD;
// mysrvc->ConnectionsFree->drop_all_connections();
// char* q1 = (char*)"DELETE FROM mysql_servers WHERE mem_pointer=%lld";
// char* q2 = (char*)malloc(strlen(q1) + 32);
// sprintf(q2, q1, ptr);
// mydb->execute(q2);
// free(q2);
// }
// }
// if (resultset) { delete resultset; resultset = NULL; }
//
// mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming");
//
// // SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
// query = (char*)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment";
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
// mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
// if (error) {
// proxy_error("Error on %s : %s\n", query, error);
// } else {
//
// if (GloMTH->variables.hostgroup_manager_verbose) {
// proxy_info("Dumping mysql_servers JOIN mysql_servers_incoming\n");
// resultset->dump_to_stderr();
// }
// // optimization #829
// int rc;
// sqlite3_stmt* statement1 = NULL;
// sqlite3_stmt* statement2 = NULL;
// //sqlite3 *mydb3=mydb->get_db();
// char* query1 = (char*)"UPDATE mysql_servers SET mem_pointer = ?1 WHERE hostgroup_id = ?2 AND hostname = ?3 AND port = ?4";
// //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0);
// rc = mydb->prepare_v2(query1, &statement1);
// ASSERT_SQLITE_OK(rc, mydb);
// char* query2 = (char*)"UPDATE mysql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 , gtid_port = ?9 WHERE hostgroup_id = ?10 AND hostname = ?11 AND port = ?12";
// //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query2, -1, &statement2, 0);
// rc = mydb->prepare_v2(query2, &statement2);
// ASSERT_SQLITE_OK(rc, mydb);
//
// for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
// SQLite3_row* r = *it;
// long long ptr = atoll(r->fields[12]); // increase this index every time a new column is added
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), ptr, atoi(r->fields[0]), atoi(r->fields[6]));
// //fprintf(stderr,"%lld\n", ptr);
// if (ptr == 0) {
// if (GloMTH->variables.hostgroup_manager_verbose) {
// proxy_info("Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%d, status=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]));
// }
// MySrvC* mysrvc = new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), atoi(r->fields[10]), r->fields[11]); // add new fields here if adding more columns in mysql_servers
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), mysrvc, atoi(r->fields[0]));
// MyHGM->add(mysrvc, atoi(r->fields[0]));
// ptr = (uintptr_t)mysrvc;
// rc = (*proxy_sqlite3_bind_int64)(statement1, 1, ptr); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement1, 2, atoi(r->fields[0])); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_text)(statement1, 3, r->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r->fields[2])); ASSERT_SQLITE_OK(rc, mydb);
// SAFE_SQLITE3_STEP2(statement1);
// rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, mydb);
// if (mysrvc->gtid_port) {
// // this server has gtid_port configured, we set use_gtid
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port);
// use_gtid = true;
// }
// } else {
// bool run_update = false;
// MySrvC* mysrvc = (MySrvC*)ptr;
// // carefully increase the 2nd index by 1 for every new column added
// if (atoi(r->fields[3]) != atoi(r->fields[13])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing gtid_port for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), mysrvc->gtid_port, atoi(r->fields[13]));
// mysrvc->gtid_port = atoi(r->fields[13]);
// }
//
// if (atoi(r->fields[4]) != atoi(r->fields[14])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), mysrvc->weight, atoi(r->fields[14]));
// mysrvc->weight = atoi(r->fields[14]);
// }
// if (atoi(r->fields[5]) != atoi(r->fields[15])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing status for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[5]), mysrvc->status, atoi(r->fields[15]));
// mysrvc->status = (MySerStatus)atoi(r->fields[15]);
// if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED) {
// mysrvc->shunned_automatic = false;
// }
// }
// if (atoi(r->fields[6]) != atoi(r->fields[16])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing compression for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[6]), mysrvc->compression, atoi(r->fields[16]));
// mysrvc->compression = atoi(r->fields[16]);
// }
// if (atoi(r->fields[7]) != atoi(r->fields[17])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing max_connections for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[7]), mysrvc->max_connections, atoi(r->fields[17]));
// mysrvc->max_connections = atoi(r->fields[17]);
// }
// if (atoi(r->fields[8]) != atoi(r->fields[18])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing max_replication_lag for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[8]), mysrvc->max_replication_lag, atoi(r->fields[18]));
// mysrvc->max_replication_lag = atoi(r->fields[18]);
// if (mysrvc->max_replication_lag == 0) { // we just changed it to 0
// if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
// // the server is currently shunned due to replication lag
// // but we reset max_replication_lag to 0
// // therefore we immediately reset the status too
// mysrvc->status = MYSQL_SERVER_STATUS_ONLINE;
// }
// }
// }
// if (atoi(r->fields[9]) != atoi(r->fields[19])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing use_ssl for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[9]), mysrvc->use_ssl, atoi(r->fields[19]));
// mysrvc->use_ssl = atoi(r->fields[19]);
// }
// if (atoi(r->fields[10]) != atoi(r->fields[20])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing max_latency_ms for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[10]), mysrvc->max_latency_us / 1000, atoi(r->fields[20]));
// mysrvc->max_latency_us = 1000 * atoi(r->fields[20]);
// }
// if (strcmp(r->fields[11], r->fields[21])) {
// if (GloMTH->variables.hostgroup_manager_verbose)
// proxy_info("Changing comment for server %d:%s:%d (%s:%d) from '%s' to '%s'\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[11], r->fields[21]);
// free(mysrvc->comment);
// mysrvc->comment = strdup(r->fields[21]);
// }
// if (run_update) {
// rc = (*proxy_sqlite3_bind_int64)(statement2, 1, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 2, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 3, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 4, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 5, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 6, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 7, mysrvc->max_latency_us / 1000); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_text)(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 9, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 10, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_text)(statement2, 11, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_bind_int64)(statement2, 12, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
// SAFE_SQLITE3_STEP2(statement2);
// rc = (*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, mydb);
// rc = (*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, mydb);
// }
// if (mysrvc->gtid_port) {
// // this server has gtid_port configured, we set use_gtid
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port);
// use_gtid = true;
// }
// }
// }
// (*proxy_sqlite3_finalize)(statement1);
// (*proxy_sqlite3_finalize)(statement2);
// }
// if (use_gtid) {
// has_gtid_port = true;
// } else {
// has_gtid_port = false;
// }
// if (resultset) { delete resultset; resultset = NULL; }
//
// purge_mysql_servers_table();
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
// mydb->execute("DELETE FROM mysql_servers");
// generate_mysql_servers_table();
//
// const auto mysql_servers_checksum = get_mysql_servers_checksum();
//
// char buf[80];
// uint32_t d32[2];
// memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum));
// sprintf(buf, "0x%0X%0X", d32[0], d32[1]);
// pthread_mutex_lock(&GloVars.checksum_mutex);
// GloVars.checksums_values.mysql_servers.set_checksum(buf);
// GloVars.checksums_values.mysql_servers.version++;
// //struct timespec ts;
// //clock_gettime(CLOCK_REALTIME, &ts);
// time_t t = time(NULL);
//
// if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false &&
// GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) {
// GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch;
// } else {
// GloVars.checksums_values.mysql_servers.epoch = t;
// }
//
// GloVars.checksums_values.updates_cnt++;
// GloVars.generate_global_checksum();
// GloVars.epoch_version = t;
// pthread_mutex_unlock(&GloVars.checksum_mutex);
//
// update_hostgroup_manager_mappings();
// update_table_mysql_servers_for_monitor(false);
//}
// we always assume that the calling thread has acquired a rdlock()
int MySQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) {
@ -1908,9 +1907,11 @@ void MySQL_HostGroups_Manager::update_hostgroup_manager_mappings() {
}
bool MySQL_HostGroups_Manager::commit(
SQLite3_result* runtime_mysql_servers, SQLite3_result* mysql_servers_incoming,
const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming
SQLite3_result* runtime_mysql_servers, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server,
SQLite3_result* mysql_servers_incoming, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming,
bool only_commit_runtime_mysql_servers
) {
// if only_commit_runtime_mysql_servers is true, mysql_servers_incoming resultset will not be entertained and will cause memory leak.
unsigned long long curtime1=monotonic_time();
wrlock();
@ -1939,9 +1940,9 @@ bool MySQL_HostGroups_Manager::commit(
}
if (resultset) { delete resultset; resultset=NULL; }
}
char *query=NULL;
char *query=NULL;
query=(char *)"SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
@ -1969,15 +1970,14 @@ bool MySQL_HostGroups_Manager::commit(
//mydb->execute("DELETE FROM mysql_servers");
//generate_mysql_servers_table();
// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n");
// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming");
// SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
query=(char *)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
@ -2120,74 +2120,75 @@ bool MySQL_HostGroups_Manager::commit(
has_gtid_port = false;
}
if (resultset) { delete resultset; resultset=NULL; }
//proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n");
//mydb->execute("DELETE FROM mysql_servers_incoming");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n");
mydb->execute("DELETE FROM mysql_servers_incoming");
// replication
if (incoming_replication_hostgroups) { // this IF is extremely important, otherwise replication hostgroups may disappear
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_replication_hostgroups");
generate_mysql_replication_hostgroups_table();
}
// group replication
if (incoming_group_replication_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_group_replication_hostgroups");
generate_mysql_group_replication_hostgroups_table();
}
uint64_t mysql_servers_incoming_checksum = 0;
// galera
if (incoming_galera_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_galera_hostgroups\n");
mydb->execute("DELETE FROM mysql_galera_hostgroups");
generate_mysql_galera_hostgroups_table();
}
if (only_commit_runtime_mysql_servers == false) {
// replication
if (incoming_replication_hostgroups) { // this IF is extremely important, otherwise replication hostgroups may disappear
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_replication_hostgroups");
generate_mysql_replication_hostgroups_table();
}
// AWS Aurora
if (incoming_aws_aurora_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_aws_aurora_hostgroups\n");
mydb->execute("DELETE FROM mysql_aws_aurora_hostgroups");
generate_mysql_aws_aurora_hostgroups_table();
}
// group replication
if (incoming_group_replication_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_group_replication_hostgroups");
generate_mysql_group_replication_hostgroups_table();
}
// hostgroup attributes
if (incoming_hostgroup_attributes) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_hostgroup_attributes\n");
mydb->execute("DELETE FROM mysql_hostgroup_attributes");
generate_mysql_hostgroup_attributes_table();
}
// galera
if (incoming_galera_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_galera_hostgroups\n");
mydb->execute("DELETE FROM mysql_galera_hostgroups");
generate_mysql_galera_hostgroups_table();
}
// AWS Aurora
if (incoming_aws_aurora_hostgroups) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_aws_aurora_hostgroups\n");
mydb->execute("DELETE FROM mysql_aws_aurora_hostgroups");
generate_mysql_aws_aurora_hostgroups_table();
}
//if (GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers)
{
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
// hostgroup attributes
if (incoming_hostgroup_attributes) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_hostgroup_attributes\n");
mydb->execute("DELETE FROM mysql_hostgroup_attributes");
generate_mysql_hostgroup_attributes_table();
}
const auto mysql_servers_checksum = get_mysql_servers_checksum(runtime_mysql_servers);
const auto mysql_servers_incoming_checksum = get_mysql_servers_incoming_checksum(mysql_servers_incoming, false);
mysql_servers_incoming_checksum = get_mysql_servers_incoming_checksum(mysql_servers_incoming, false);
}
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n");
mydb->execute("DELETE FROM mysql_servers_incoming");
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
char buf[80];
uint32_t d32[2];
const time_t t = time(NULL);
const auto mysql_servers_checksum = get_mysql_servers_checksum(runtime_mysql_servers);
memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum));
sprintf(buf, "0x%0X%0X", d32[0], d32[1]);
pthread_mutex_lock(&GloVars.checksum_mutex);
GloVars.checksums_values.mysql_servers.set_checksum(buf);
GloVars.checksums_values.mysql_servers.version++;
//struct timespec ts;
//clock_gettime(CLOCK_REALTIME, &ts);
if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false &&
GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) {
GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch;
} else {
GloVars.checksums_values.mysql_servers.epoch = t;
}
char buf[80];
uint32_t d32[2];
const time_t t = time(NULL);
memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum));
sprintf(buf, "0x%0X%0X", d32[0], d32[1]);
pthread_mutex_lock(&GloVars.checksum_mutex);
GloVars.checksums_values.mysql_servers.set_checksum(buf);
GloVars.checksums_values.mysql_servers.version++;
//struct timespec ts;
//clock_gettime(CLOCK_REALTIME, &ts);
if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false &&
GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) {
GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch;
} else {
GloVars.checksums_values.mysql_servers.epoch = t;
}
if (only_commit_runtime_mysql_servers == false) {
memcpy(&d32, &mysql_servers_incoming_checksum, sizeof(mysql_servers_incoming_checksum));
sprintf(buf, "0x%0X%0X", d32[0], d32[1]);
GloVars.checksums_values.mysql_servers_incoming.set_checksum(buf);
@ -2199,13 +2200,13 @@ bool MySQL_HostGroups_Manager::commit(
} else {
GloVars.checksums_values.mysql_servers_incoming.epoch = t;
}
GloVars.checksums_values.updates_cnt++;
GloVars.generate_global_checksum();
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
GloVars.checksums_values.updates_cnt++;
GloVars.generate_global_checksum();
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
// fill Hostgroup_Manager_Mapping with latest records
update_hostgroup_manager_mappings();
@ -2244,56 +2245,67 @@ bool MySQL_HostGroups_Manager::commit(
uint64_t MySQL_HostGroups_Manager::get_mysql_servers_checksum(SQLite3_result* runtime_mysql_servers) {
//Note: GloVars.checksum_mutex needs to be locked
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
char* query = (char*)"SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port";
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
SQLite3_result* resultset = nullptr;
if (runtime_mysql_servers == nullptr) {
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
mydb->execute_statement(MYHGM_GEN_ADMIN_RUNTIME_SERVERS, &error, &cols, &affected_rows, &resultset);
// Remove 'OFFLINE_HARD' servers since they are not relevant to propagate to other Cluster
// nodes, or relevant for checksum computation.
const size_t init_row_count = resultset->rows_count;
size_t rm_rows_count = 0;
const auto is_offline_server = [&rm_rows_count](SQLite3_row* row) {
if (strcasecmp(row->fields[4], "OFFLINE_HARD") == 0) {
rm_rows_count += 1;
return true;
if (resultset) {
if (resultset->rows_count) {
// Remove 'OFFLINE_HARD' servers since they are not relevant to propagate to other Cluster
// nodes, or relevant for checksum computation.
const size_t init_row_count = resultset->rows_count;
size_t rm_rows_count = 0;
const auto is_offline_server = [&rm_rows_count](SQLite3_row* row) {
if (strcasecmp(row->fields[4], "OFFLINE_HARD") == 0) {
rm_rows_count += 1;
return true;
} else {
return false;
}
};
resultset->rows.erase(
std::remove_if(resultset->rows.begin(), resultset->rows.end(), is_offline_server),
resultset->rows.end()
);
resultset->rows_count = init_row_count - rm_rows_count;
save_runtime_mysql_servers(resultset);
} else {
return false;
delete resultset;
resultset = nullptr;
}
};
resultset->rows.erase(
std::remove_if(resultset->rows.begin(), resultset->rows.end(), is_offline_server),
resultset->rows.end()
);
resultset->rows_count = init_row_count - rm_rows_count;
save_runtime_mysql_servers(resultset);
} else {
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0);
}
} else {
resultset = runtime_mysql_servers;
save_runtime_mysql_servers(runtime_mysql_servers);
}
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = 0;
if (resultset) {
if (resultset->rows_count) {
uint64_t hash1_ = resultset->raw_checksum();
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = hash1_;
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", hash1_);
}
delete resultset;
} else {
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0);
}
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = compute_mysql_servers_raw_checksum(resultset);
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]);
//char* error = NULL;
//int cols = 0;
//int affected_rows = 0;
//SQLite3_result* resultset = NULL;
//char* query = (char*)"SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port";
//mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
//if (resultset) {
// if (resultset->rows_count) {
// uint64_t hash1_ = resultset->raw_checksum();
// table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = hash1_;
// proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", hash1_);
// }
// delete resultset;
//} else {
// proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0);
//}
return table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS];
}
@ -2302,14 +2314,14 @@ uint64_t MySQL_HostGroups_Manager::get_mysql_servers_incoming_checksum(SQLite3_r
bool use_precalculated_checksum) {
//Note: GloVars.checksum_mutex needs to be locked
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
SQLite3_result* resultset = nullptr;
if (incoming_mysql_servers == nullptr)
{
char* error = nullptr;
int cols = 0;
int affected_rows = 0;
mydb->execute_statement(MYHGM_GEN_INCOMING_MYSQL_SERVERS, &error, &cols, &affected_rows, &resultset);
if (resultset) {
if (resultset->rows_count) {
@ -2331,80 +2343,31 @@ uint64_t MySQL_HostGroups_Manager::get_mysql_servers_incoming_checksum(SQLite3_r
resultset->rows_count = init_row_count - rm_rows_count;
save_mysql_servers_incoming(resultset);
} else {
delete resultset;
resultset = nullptr;
}
} else {
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers_incoming", (long unsigned int)0);
}
} else {
resultset = incoming_mysql_servers;
save_mysql_servers_incoming(incoming_mysql_servers);
}
if (use_precalculated_checksum == false) {
// reset all checksum
table_resultset_checksum.fill(0);
// reset checksum
//table_resultset_checksum[MYSQL_SERVERS_INCOMING] = 0;
table_resultset_checksum[MYSQL_REPLICATION_HOSTGROUPS] = 0;
table_resultset_checksum[MYSQL_GROUP_REPLICATION_HOSTGROUPS] = 0;
table_resultset_checksum[MYSQL_GALERA_HOSTGROUPS] = 0;
table_resultset_checksum[MYSQL_AWS_AURORA_HOSTGROUPS] = 0;
table_resultset_checksum[MYSQL_HOSTGROUP_ATTRIBUTES] = 0;
}
resultset = get_current_mysql_table("mysql_servers_incoming");
if (resultset) {
int status_idx = -1;
for (int i = 0; i < resultset->columns; i++) {
if (resultset->column_definition[i] && resultset->column_definition[i]->name &&
strcasecmp(resultset->column_definition[i]->name, "status") == 0) {
status_idx = i;
break;
}
}
if (resultset->rows_count) {
uint64_t hash1 = 0, hash2 = 0;
SpookyHash myhash;
myhash.Init(19, 3);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const char* status_conv = nullptr;
const char* status = r->fields[status_idx];
if (status) {
if (strcasecmp(status, "OFFLINE_HARD") == 0)
continue;
if (strcasecmp(status, "ONLINE") == 0 ||
strcasecmp(status, "SHUNNED") == 0) {
status_conv = "0";
} else if (strcasecmp(status, "OFFLINE_SOFT") == 0) {
status_conv = "2";
}
}
for (int i = 0; i < resultset->columns; i++) {
if (i != status_idx) {
if (r->fields[i]) {
myhash.Update(r->fields[i], r->sizes[i]);
} else {
myhash.Update("", 0);
}
} else {
if (status_conv) {
myhash.Update(status_conv, strlen(status_conv));
} else {
myhash.Update("", 0);
}
}
}
}
myhash.Final(&hash1, &hash2);
table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_INCOMING] = hash1;
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers_incoming", hash1);
}
}
table_resultset_checksum[MYSQL_SERVERS_INCOMING] = compute_mysql_servers_raw_checksum(resultset);
proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers_incoming", table_resultset_checksum[MYSQL_SERVERS_INCOMING]);
/*char* query = (char*)"SELECT hostgroup_id,hostname,port,gtid_port,CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, \
weight,compression,max_connections, max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE status<>3 ORDER BY hostgroup_id, hostname, port";
mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset);
@ -5154,11 +5117,7 @@ void MySQL_HostGroups_Manager::read_only_action_v2(const std::list<std::tuple<st
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
// reset hgsm_mysql_servers_checksum checksum
hgsm_mysql_servers_checksum = 0;
const auto mysql_servers_checksum = get_mysql_servers_checksum();
hgsm_mysql_servers_checksum = mysql_servers_checksum;
char buf[80];
@ -8169,3 +8128,66 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv)
srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD;
srv->ConnectionsFree->drop_all_connections();
}
/**
* @brief This function computes the checksum of the mysql_servers resultset.
* As the checksum is being calculated, the function replaces the status values with their respective integer values.
*
* @param mysql_servers resultset of mysql_servers or mysql_servers_incoming.
*/
uint64_t MySQL_HostGroups_Manager::compute_mysql_servers_raw_checksum(const SQLite3_result* mysql_servers) {
if (!mysql_servers || mysql_servers->rows_count == 0)
return 0;
int status_idx = -1;
for (int i = 0; i < mysql_servers->columns; i++) {
if (mysql_servers->column_definition[i] && mysql_servers->column_definition[i]->name &&
strcmp(mysql_servers->column_definition[i]->name, "status") == 0) {
status_idx = i;
break;
}
}
if (status_idx == -1) assert(0);
SpookyHash myhash;
myhash.Init(19, 3);
for (const SQLite3_row* r : mysql_servers->rows) {
const char* mapped_status = "";
const char* status = r->fields[status_idx];
if (status) {
if (strcasecmp(status, "OFFLINE_HARD") == 0)
continue;
if (strcasecmp(status, "ONLINE") == 0 ||
strcasecmp(status, "SHUNNED") == 0) {
mapped_status = "0";
} else if (strcasecmp(status, "OFFLINE_SOFT") == 0) {
mapped_status = "2";
}
}
for (int i = 0; i < mysql_servers->columns; i++) {
if (r->fields[i]) {
if (i != status_idx) {
myhash.Update(r->fields[i], r->sizes[i]);
} else {
myhash.Update(mapped_status, strlen(mapped_status));
}
} else {
myhash.Update("", 0);
}
}
}
uint64_t res_hash = 0, hash2 = 0;
myhash.Final(&res_hash, &hash2);
return res_hash;
}

@ -12059,7 +12059,7 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime(const incoming_servers_t& inc
}
// commit all the changes
MyHGM->commit(runtime_mysql_servers, incoming_mysql_servers, peer_runtime_mysql_server, peer_mysql_server_incoming);
MyHGM->commit(runtime_mysql_servers, peer_runtime_mysql_server, incoming_mysql_servers, peer_mysql_server_incoming);
// quering runtime table will update and return latest records, so this is not needed.
// GloAdmin->save_mysql_servers_runtime_to_database(true);

@ -1736,7 +1736,7 @@ void ProxySQL_Cluster::pull_runtime_mysql_servers_from_peer(const runtime_mysql_
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Loading runtime_mysql_servers from peer %s:%d into mysql_servers_incoming", hostname, port);
MyHGM->servers_add(runtime_mysql_servers_resultset.get());
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Updating runtime_mysql_servers from peer %s:%d", hostname, port);
MyHGM->update_runtime_mysql_servers_table(runtime_mysql_servers_resultset.get(), peer_runtime_mysql_server);
MyHGM->commit(runtime_mysql_servers_resultset.release(), peer_runtime_mysql_server, nullptr, {}, true);
MyHGM->wrunlock();
// free result
@ -2847,7 +2847,7 @@ bool ProxySQL_Cluster_Nodes::Update_Global_Checksum(char * _h, uint16_t _p, MYSQ
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Global checksum 0x%llX for peer %s:%d matches\n", v, node->get_hostname(), node->get_port());
ret = false;
} else {
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Global checksum for peer %s:%d is different from fetched one. Local checksum:[0x%llX] Fetched checksum:[0x%llX]\n", node->get_hostname(), node->get_port(), node->global_checksum, v);
proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Global checksum for peer %s:%d is different from fetched one. Local checksum:[0x%lX] Fetched checksum:[0x%lX]\n", node->get_hostname(), node->get_port(), node->global_checksum, v);
node->global_checksum = v;
}
}

Loading…
Cancel
Save