From 09f76511b1f2caa7f928ac2e6aa1d04b2e94aa19 Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 6 Apr 2023 16:36:33 +0500 Subject: [PATCH] * Added runtime mysql server logic in commit. * Changed commit signature. * Few fixes --- include/MySQL_HostGroups_Manager.h | 13 +- lib/MySQL_HostGroups_Manager.cpp | 832 +++++++++++++++-------------- lib/ProxySQL_Admin.cpp | 2 +- lib/ProxySQL_Cluster.cpp | 4 +- 4 files changed, 438 insertions(+), 413 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index 332fd161d..ee313df31 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -393,14 +393,14 @@ class MySQL_HostGroups_Manager { #endif enum HGM_TABLES { - MYSQL_SERVERS = 0, + MYSQL_SERVERS_INCOMING = 0, MYSQL_REPLICATION_HOSTGROUPS, MYSQL_GROUP_REPLICATION_HOSTGROUPS, MYSQL_GALERA_HOSTGROUPS, MYSQL_AWS_AURORA_HOSTGROUPS, MYSQL_HOSTGROUP_ATTRIBUTES, + MYSQL_SERVERS, - MYSQL_SERVERS_INCOMING, __HGM_TABLES_SIZE }; @@ -698,9 +698,10 @@ class MySQL_HostGroups_Manager { void wrlock(); void wrunlock(); int servers_add(SQLite3_result *resultset); - void update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server); - bool commit(SQLite3_result* runtime_mysql_servers = nullptr, SQLite3_result* mysql_servers_incoming = nullptr, - const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server = {}, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming = {}); + //void update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server); + bool commit(SQLite3_result* runtime_mysql_servers = nullptr, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server = {}, + SQLite3_result* mysql_servers_incoming = nullptr, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming = {}, + bool only_commit_runtime_mysql_servers = false); void commit_update_checksums_from_tables(); void CUCFT1(const string& TableName, const string& ColumnName, uint64_t& raw_checksum); // used by commit_update_checksums_from_tables() @@ -826,6 +827,8 @@ class MySQL_HostGroups_Manager { MySrvC* find_server_in_hg(unsigned int _hid, const std::string& addr, int port); private: + static uint64_t compute_mysql_servers_raw_checksum(const SQLite3_result* runtime_mysql_servers); + void update_hostgroup_manager_mappings(); uint64_t get_mysql_servers_checksum(SQLite3_result* runtime_mysql_servers = nullptr); uint64_t get_mysql_servers_incoming_checksum(SQLite3_result* incoming_mysql_servers, bool use_precalculated_checksum = true); diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 50f39ff46..3fb6fa1da 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -1495,235 +1495,234 @@ unsigned int MySQL_HostGroups_Manager::get_servers_table_version() { return __sync_fetch_and_add(&status.servers_table_version,0); } -/** - * @brief Generates runtime mysql server records and checksum. - * - * IMPORTANT: It's assumed that the previous queries were successful and that the resultsets are received in - * the specified order. - * @param can be null or previously generated runtime_mysql_servers resultset can be passed. - */ -void MySQL_HostGroups_Manager::update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers, - const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server) { - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; - // if any server has gtid_port enabled, use_gtid is set to true - // and then has_gtid_port is set too - bool use_gtid = false; - - mydb->execute("DELETE FROM mysql_servers"); - generate_mysql_servers_table(); - - const char* query = "SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL"; - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); - if (error) { - proxy_error("Error on %s : %s\n", query, error); - } else { - if (GloMTH->variables.hostgroup_manager_verbose) { - proxy_info("Dumping mysql_servers LEFT JOIN mysql_servers_incoming\n"); - resultset->dump_to_stderr(); - } - - for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { - SQLite3_row* r = *it; - long long ptr = atoll(r->fields[0]); - proxy_warning("Removed server at address %lld, hostgroup %s, address %s port %s. Setting status OFFLINE HARD and immediately dropping all free connections. Used connections will be dropped when trying to use them\n", ptr, r->fields[1], r->fields[2], r->fields[3]); - MySrvC* mysrvc = (MySrvC*)ptr; - mysrvc->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; - mysrvc->ConnectionsFree->drop_all_connections(); - char* q1 = (char*)"DELETE FROM mysql_servers WHERE mem_pointer=%lld"; - char* q2 = (char*)malloc(strlen(q1) + 32); - sprintf(q2, q1, ptr); - mydb->execute(q2); - free(q2); - } - } - if (resultset) { delete resultset; resultset = NULL; } - - mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming"); - - // SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet) - query = (char*)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment"; - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); - if (error) { - proxy_error("Error on %s : %s\n", query, error); - } else { - - if (GloMTH->variables.hostgroup_manager_verbose) { - proxy_info("Dumping mysql_servers JOIN mysql_servers_incoming\n"); - resultset->dump_to_stderr(); - } - // optimization #829 - int rc; - sqlite3_stmt* statement1 = NULL; - sqlite3_stmt* statement2 = NULL; - //sqlite3 *mydb3=mydb->get_db(); - char* query1 = (char*)"UPDATE mysql_servers SET mem_pointer = ?1 WHERE hostgroup_id = ?2 AND hostname = ?3 AND port = ?4"; - //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0); - rc = mydb->prepare_v2(query1, &statement1); - ASSERT_SQLITE_OK(rc, mydb); - char* query2 = (char*)"UPDATE mysql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 , gtid_port = ?9 WHERE hostgroup_id = ?10 AND hostname = ?11 AND port = ?12"; - //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query2, -1, &statement2, 0); - rc = mydb->prepare_v2(query2, &statement2); - ASSERT_SQLITE_OK(rc, mydb); - - for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { - SQLite3_row* r = *it; - long long ptr = atoll(r->fields[12]); // increase this index every time a new column is added - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), ptr, atoi(r->fields[0]), atoi(r->fields[6])); - //fprintf(stderr,"%lld\n", ptr); - if (ptr == 0) { - if (GloMTH->variables.hostgroup_manager_verbose) { - proxy_info("Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%d, status=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5])); - } - MySrvC* mysrvc = new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), atoi(r->fields[10]), r->fields[11]); // add new fields here if adding more columns in mysql_servers - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), mysrvc, atoi(r->fields[0])); - MyHGM->add(mysrvc, atoi(r->fields[0])); - ptr = (uintptr_t)mysrvc; - rc = (*proxy_sqlite3_bind_int64)(statement1, 1, ptr); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement1, 2, atoi(r->fields[0])); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_text)(statement1, 3, r->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r->fields[2])); ASSERT_SQLITE_OK(rc, mydb); - SAFE_SQLITE3_STEP2(statement1); - rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, mydb); - if (mysrvc->gtid_port) { - // this server has gtid_port configured, we set use_gtid - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port); - use_gtid = true; - } - } else { - bool run_update = false; - MySrvC* mysrvc = (MySrvC*)ptr; - // carefully increase the 2nd index by 1 for every new column added - if (atoi(r->fields[3]) != atoi(r->fields[13])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing gtid_port for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), mysrvc->gtid_port, atoi(r->fields[13])); - mysrvc->gtid_port = atoi(r->fields[13]); - } - - if (atoi(r->fields[4]) != atoi(r->fields[14])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), mysrvc->weight, atoi(r->fields[14])); - mysrvc->weight = atoi(r->fields[14]); - } - if (atoi(r->fields[5]) != atoi(r->fields[15])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing status for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[5]), mysrvc->status, atoi(r->fields[15])); - mysrvc->status = (MySerStatus)atoi(r->fields[15]); - if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED) { - mysrvc->shunned_automatic = false; - } - } - if (atoi(r->fields[6]) != atoi(r->fields[16])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing compression for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[6]), mysrvc->compression, atoi(r->fields[16])); - mysrvc->compression = atoi(r->fields[16]); - } - if (atoi(r->fields[7]) != atoi(r->fields[17])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing max_connections for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[7]), mysrvc->max_connections, atoi(r->fields[17])); - mysrvc->max_connections = atoi(r->fields[17]); - } - if (atoi(r->fields[8]) != atoi(r->fields[18])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing max_replication_lag for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[8]), mysrvc->max_replication_lag, atoi(r->fields[18])); - mysrvc->max_replication_lag = atoi(r->fields[18]); - if (mysrvc->max_replication_lag == 0) { // we just changed it to 0 - if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { - // the server is currently shunned due to replication lag - // but we reset max_replication_lag to 0 - // therefore we immediately reset the status too - mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; - } - } - } - if (atoi(r->fields[9]) != atoi(r->fields[19])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing use_ssl for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[9]), mysrvc->use_ssl, atoi(r->fields[19])); - mysrvc->use_ssl = atoi(r->fields[19]); - } - if (atoi(r->fields[10]) != atoi(r->fields[20])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing max_latency_ms for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[10]), mysrvc->max_latency_us / 1000, atoi(r->fields[20])); - mysrvc->max_latency_us = 1000 * atoi(r->fields[20]); - } - if (strcmp(r->fields[11], r->fields[21])) { - if (GloMTH->variables.hostgroup_manager_verbose) - proxy_info("Changing comment for server %d:%s:%d (%s:%d) from '%s' to '%s'\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[11], r->fields[21]); - free(mysrvc->comment); - mysrvc->comment = strdup(r->fields[21]); - } - if (run_update) { - rc = (*proxy_sqlite3_bind_int64)(statement2, 1, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 2, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 3, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 4, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 5, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 6, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 7, mysrvc->max_latency_us / 1000); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_text)(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 9, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 10, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_text)(statement2, 11, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_bind_int64)(statement2, 12, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb); - SAFE_SQLITE3_STEP2(statement2); - rc = (*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, mydb); - rc = (*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, mydb); - } - if (mysrvc->gtid_port) { - // this server has gtid_port configured, we set use_gtid - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port); - use_gtid = true; - } - } - } - (*proxy_sqlite3_finalize)(statement1); - (*proxy_sqlite3_finalize)(statement2); - } - if (use_gtid) { - has_gtid_port = true; - } else { - has_gtid_port = false; - } - if (resultset) { delete resultset; resultset = NULL; } - - purge_mysql_servers_table(); - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); - mydb->execute("DELETE FROM mysql_servers"); - generate_mysql_servers_table(); - - const auto mysql_servers_checksum = get_mysql_servers_checksum(); - - update_hostgroup_manager_mappings(); - - char buf[80]; - uint32_t d32[2]; - memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum)); - sprintf(buf, "0x%0X%0X", d32[0], d32[1]); - pthread_mutex_lock(&GloVars.checksum_mutex); - GloVars.checksums_values.mysql_servers.set_checksum(buf); - GloVars.checksums_values.mysql_servers.version++; - //struct timespec ts; - //clock_gettime(CLOCK_REALTIME, &ts); - time_t t = time(NULL); - - if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false && - GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) { - GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch; - } else { - GloVars.checksums_values.mysql_servers.epoch = t; - } - - GloVars.checksums_values.updates_cnt++; - GloVars.generate_global_checksum(); - GloVars.epoch_version = t; - pthread_mutex_unlock(&GloVars.checksum_mutex); - - update_table_mysql_servers_for_monitor(false); -} +///** +// * @brief Generates runtime mysql server records and checksum. +// * +// * IMPORTANT: It's assumed that the previous queries were successful and that the resultsets are received in +// * the specified order. +// * @param can be null or previously generated runtime_mysql_servers resultset can be passed. +// */ +//void MySQL_HostGroups_Manager::update_runtime_mysql_servers_table(SQLite3_result* runtime_mysql_servers, +// const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server) { +// char* error = NULL; +// int cols = 0; +// int affected_rows = 0; +// SQLite3_result* resultset = NULL; +// // if any server has gtid_port enabled, use_gtid is set to true +// // and then has_gtid_port is set too +// bool use_gtid = false; +// +// mydb->execute("DELETE FROM mysql_servers"); +// generate_mysql_servers_table(); +// +// const char* query = "SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL"; +// mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); +// if (error) { +// proxy_error("Error on %s : %s\n", query, error); +// } else { +// if (GloMTH->variables.hostgroup_manager_verbose) { +// proxy_info("Dumping mysql_servers LEFT JOIN mysql_servers_incoming\n"); +// resultset->dump_to_stderr(); +// } +// +// for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { +// SQLite3_row* r = *it; +// long long ptr = atoll(r->fields[0]); +// proxy_warning("Removed server at address %lld, hostgroup %s, address %s port %s. Setting status OFFLINE HARD and immediately dropping all free connections. Used connections will be dropped when trying to use them\n", ptr, r->fields[1], r->fields[2], r->fields[3]); +// MySrvC* mysrvc = (MySrvC*)ptr; +// mysrvc->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; +// mysrvc->ConnectionsFree->drop_all_connections(); +// char* q1 = (char*)"DELETE FROM mysql_servers WHERE mem_pointer=%lld"; +// char* q2 = (char*)malloc(strlen(q1) + 32); +// sprintf(q2, q1, ptr); +// mydb->execute(q2); +// free(q2); +// } +// } +// if (resultset) { delete resultset; resultset = NULL; } +// +// mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming"); +// +// // SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet) +// query = (char*)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment"; +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); +// mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); +// if (error) { +// proxy_error("Error on %s : %s\n", query, error); +// } else { +// +// if (GloMTH->variables.hostgroup_manager_verbose) { +// proxy_info("Dumping mysql_servers JOIN mysql_servers_incoming\n"); +// resultset->dump_to_stderr(); +// } +// // optimization #829 +// int rc; +// sqlite3_stmt* statement1 = NULL; +// sqlite3_stmt* statement2 = NULL; +// //sqlite3 *mydb3=mydb->get_db(); +// char* query1 = (char*)"UPDATE mysql_servers SET mem_pointer = ?1 WHERE hostgroup_id = ?2 AND hostname = ?3 AND port = ?4"; +// //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0); +// rc = mydb->prepare_v2(query1, &statement1); +// ASSERT_SQLITE_OK(rc, mydb); +// char* query2 = (char*)"UPDATE mysql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 , gtid_port = ?9 WHERE hostgroup_id = ?10 AND hostname = ?11 AND port = ?12"; +// //rc=(*proxy_sqlite3_prepare_v2)(mydb3, query2, -1, &statement2, 0); +// rc = mydb->prepare_v2(query2, &statement2); +// ASSERT_SQLITE_OK(rc, mydb); +// +// for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { +// SQLite3_row* r = *it; +// long long ptr = atoll(r->fields[12]); // increase this index every time a new column is added +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), ptr, atoi(r->fields[0]), atoi(r->fields[6])); +// //fprintf(stderr,"%lld\n", ptr); +// if (ptr == 0) { +// if (GloMTH->variables.hostgroup_manager_verbose) { +// proxy_info("Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%d, status=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5])); +// } +// MySrvC* mysrvc = new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), atoi(r->fields[10]), r->fields[11]); // add new fields here if adding more columns in mysql_servers +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus)atoi(r->fields[5]), mysrvc, atoi(r->fields[0])); +// MyHGM->add(mysrvc, atoi(r->fields[0])); +// ptr = (uintptr_t)mysrvc; +// rc = (*proxy_sqlite3_bind_int64)(statement1, 1, ptr); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement1, 2, atoi(r->fields[0])); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_text)(statement1, 3, r->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r->fields[2])); ASSERT_SQLITE_OK(rc, mydb); +// SAFE_SQLITE3_STEP2(statement1); +// rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, mydb); +// if (mysrvc->gtid_port) { +// // this server has gtid_port configured, we set use_gtid +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port); +// use_gtid = true; +// } +// } else { +// bool run_update = false; +// MySrvC* mysrvc = (MySrvC*)ptr; +// // carefully increase the 2nd index by 1 for every new column added +// if (atoi(r->fields[3]) != atoi(r->fields[13])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing gtid_port for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), mysrvc->gtid_port, atoi(r->fields[13])); +// mysrvc->gtid_port = atoi(r->fields[13]); +// } +// +// if (atoi(r->fields[4]) != atoi(r->fields[14])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), mysrvc->weight, atoi(r->fields[14])); +// mysrvc->weight = atoi(r->fields[14]); +// } +// if (atoi(r->fields[5]) != atoi(r->fields[15])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing status for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[5]), mysrvc->status, atoi(r->fields[15])); +// mysrvc->status = (MySerStatus)atoi(r->fields[15]); +// if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED) { +// mysrvc->shunned_automatic = false; +// } +// } +// if (atoi(r->fields[6]) != atoi(r->fields[16])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing compression for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[6]), mysrvc->compression, atoi(r->fields[16])); +// mysrvc->compression = atoi(r->fields[16]); +// } +// if (atoi(r->fields[7]) != atoi(r->fields[17])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing max_connections for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[7]), mysrvc->max_connections, atoi(r->fields[17])); +// mysrvc->max_connections = atoi(r->fields[17]); +// } +// if (atoi(r->fields[8]) != atoi(r->fields[18])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing max_replication_lag for server %u:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[8]), mysrvc->max_replication_lag, atoi(r->fields[18])); +// mysrvc->max_replication_lag = atoi(r->fields[18]); +// if (mysrvc->max_replication_lag == 0) { // we just changed it to 0 +// if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) { +// // the server is currently shunned due to replication lag +// // but we reset max_replication_lag to 0 +// // therefore we immediately reset the status too +// mysrvc->status = MYSQL_SERVER_STATUS_ONLINE; +// } +// } +// } +// if (atoi(r->fields[9]) != atoi(r->fields[19])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing use_ssl for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[9]), mysrvc->use_ssl, atoi(r->fields[19])); +// mysrvc->use_ssl = atoi(r->fields[19]); +// } +// if (atoi(r->fields[10]) != atoi(r->fields[20])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing max_latency_ms for server %d:%s:%d (%s:%d) from %d (%d) to %d\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[10]), mysrvc->max_latency_us / 1000, atoi(r->fields[20])); +// mysrvc->max_latency_us = 1000 * atoi(r->fields[20]); +// } +// if (strcmp(r->fields[11], r->fields[21])) { +// if (GloMTH->variables.hostgroup_manager_verbose) +// proxy_info("Changing comment for server %d:%s:%d (%s:%d) from '%s' to '%s'\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[11], r->fields[21]); +// free(mysrvc->comment); +// mysrvc->comment = strdup(r->fields[21]); +// } +// if (run_update) { +// rc = (*proxy_sqlite3_bind_int64)(statement2, 1, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 2, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 3, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 4, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 5, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 6, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 7, mysrvc->max_latency_us / 1000); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_text)(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 9, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 10, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_text)(statement2, 11, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_bind_int64)(statement2, 12, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb); +// SAFE_SQLITE3_STEP2(statement2); +// rc = (*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, mydb); +// rc = (*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, mydb); +// } +// if (mysrvc->gtid_port) { +// // this server has gtid_port configured, we set use_gtid +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port); +// use_gtid = true; +// } +// } +// } +// (*proxy_sqlite3_finalize)(statement1); +// (*proxy_sqlite3_finalize)(statement2); +// } +// if (use_gtid) { +// has_gtid_port = true; +// } else { +// has_gtid_port = false; +// } +// if (resultset) { delete resultset; resultset = NULL; } +// +// purge_mysql_servers_table(); +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n"); +// mydb->execute("DELETE FROM mysql_servers"); +// generate_mysql_servers_table(); +// +// const auto mysql_servers_checksum = get_mysql_servers_checksum(); +// +// char buf[80]; +// uint32_t d32[2]; +// memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum)); +// sprintf(buf, "0x%0X%0X", d32[0], d32[1]); +// pthread_mutex_lock(&GloVars.checksum_mutex); +// GloVars.checksums_values.mysql_servers.set_checksum(buf); +// GloVars.checksums_values.mysql_servers.version++; +// //struct timespec ts; +// //clock_gettime(CLOCK_REALTIME, &ts); +// time_t t = time(NULL); +// +// if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false && +// GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) { +// GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch; +// } else { +// GloVars.checksums_values.mysql_servers.epoch = t; +// } +// +// GloVars.checksums_values.updates_cnt++; +// GloVars.generate_global_checksum(); +// GloVars.epoch_version = t; +// pthread_mutex_unlock(&GloVars.checksum_mutex); +// +// update_hostgroup_manager_mappings(); +// update_table_mysql_servers_for_monitor(false); +//} // we always assume that the calling thread has acquired a rdlock() int MySQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) { @@ -1908,9 +1907,11 @@ void MySQL_HostGroups_Manager::update_hostgroup_manager_mappings() { } bool MySQL_HostGroups_Manager::commit( - SQLite3_result* runtime_mysql_servers, SQLite3_result* mysql_servers_incoming, - const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming + SQLite3_result* runtime_mysql_servers, const runtime_mysql_servers_checksum_t& peer_runtime_mysql_server, + SQLite3_result* mysql_servers_incoming, const mysql_servers_incoming_checksum_t& peer_mysql_server_incoming, + bool only_commit_runtime_mysql_servers ) { + // if only_commit_runtime_mysql_servers is true, mysql_servers_incoming resultset will not be entertained and will cause memory leak. unsigned long long curtime1=monotonic_time(); wrlock(); @@ -1939,9 +1940,9 @@ bool MySQL_HostGroups_Manager::commit( } if (resultset) { delete resultset; resultset=NULL; } } - char *query=NULL; + char *query=NULL; query=(char *)"SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL"; - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); } else { @@ -1969,15 +1970,14 @@ bool MySQL_HostGroups_Manager::commit( //mydb->execute("DELETE FROM mysql_servers"); //generate_mysql_servers_table(); -// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming -// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n"); + // INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming + // proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n"); mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming"); - // SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet) query=(char *)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment"; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); - mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { proxy_error("Error on %s : %s\n", query, error); } else { @@ -2120,74 +2120,75 @@ bool MySQL_HostGroups_Manager::commit( has_gtid_port = false; } if (resultset) { delete resultset; resultset=NULL; } - //proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n"); - //mydb->execute("DELETE FROM mysql_servers_incoming"); + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n"); + mydb->execute("DELETE FROM mysql_servers_incoming"); - // replication - if (incoming_replication_hostgroups) { // this IF is extremely important, otherwise replication hostgroups may disappear - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_replication_hostgroups\n"); - mydb->execute("DELETE FROM mysql_replication_hostgroups"); - generate_mysql_replication_hostgroups_table(); - } - // group replication - if (incoming_group_replication_hostgroups) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n"); - mydb->execute("DELETE FROM mysql_group_replication_hostgroups"); - generate_mysql_group_replication_hostgroups_table(); - } + uint64_t mysql_servers_incoming_checksum = 0; - // galera - if (incoming_galera_hostgroups) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_galera_hostgroups\n"); - mydb->execute("DELETE FROM mysql_galera_hostgroups"); - generate_mysql_galera_hostgroups_table(); - } + if (only_commit_runtime_mysql_servers == false) { + // replication + if (incoming_replication_hostgroups) { // this IF is extremely important, otherwise replication hostgroups may disappear + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_replication_hostgroups\n"); + mydb->execute("DELETE FROM mysql_replication_hostgroups"); + generate_mysql_replication_hostgroups_table(); + } - // AWS Aurora - if (incoming_aws_aurora_hostgroups) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_aws_aurora_hostgroups\n"); - mydb->execute("DELETE FROM mysql_aws_aurora_hostgroups"); - generate_mysql_aws_aurora_hostgroups_table(); - } + // group replication + if (incoming_group_replication_hostgroups) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_group_replication_hostgroups\n"); + mydb->execute("DELETE FROM mysql_group_replication_hostgroups"); + generate_mysql_group_replication_hostgroups_table(); + } - // hostgroup attributes - if (incoming_hostgroup_attributes) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_hostgroup_attributes\n"); - mydb->execute("DELETE FROM mysql_hostgroup_attributes"); - generate_mysql_hostgroup_attributes_table(); - } + // galera + if (incoming_galera_hostgroups) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_galera_hostgroups\n"); + mydb->execute("DELETE FROM mysql_galera_hostgroups"); + generate_mysql_galera_hostgroups_table(); + } + // AWS Aurora + if (incoming_aws_aurora_hostgroups) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_aws_aurora_hostgroups\n"); + mydb->execute("DELETE FROM mysql_aws_aurora_hostgroups"); + generate_mysql_aws_aurora_hostgroups_table(); + } - //if (GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers) - { - mydb->execute("DELETE FROM mysql_servers"); - generate_mysql_servers_table(); + // hostgroup attributes + if (incoming_hostgroup_attributes) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_hostgroup_attributes\n"); + mydb->execute("DELETE FROM mysql_hostgroup_attributes"); + generate_mysql_hostgroup_attributes_table(); + } - const auto mysql_servers_checksum = get_mysql_servers_checksum(runtime_mysql_servers); - const auto mysql_servers_incoming_checksum = get_mysql_servers_incoming_checksum(mysql_servers_incoming, false); + mysql_servers_incoming_checksum = get_mysql_servers_incoming_checksum(mysql_servers_incoming, false); + } - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n"); - mydb->execute("DELETE FROM mysql_servers_incoming"); + mydb->execute("DELETE FROM mysql_servers"); + generate_mysql_servers_table(); - char buf[80]; - uint32_t d32[2]; - const time_t t = time(NULL); + const auto mysql_servers_checksum = get_mysql_servers_checksum(runtime_mysql_servers); - memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum)); - sprintf(buf, "0x%0X%0X", d32[0], d32[1]); - pthread_mutex_lock(&GloVars.checksum_mutex); - GloVars.checksums_values.mysql_servers.set_checksum(buf); - GloVars.checksums_values.mysql_servers.version++; - //struct timespec ts; - //clock_gettime(CLOCK_REALTIME, &ts); - if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false && - GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) { - GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch; - } else { - GloVars.checksums_values.mysql_servers.epoch = t; - } + char buf[80]; + uint32_t d32[2]; + const time_t t = time(NULL); + memcpy(&d32, &mysql_servers_checksum, sizeof(mysql_servers_checksum)); + sprintf(buf, "0x%0X%0X", d32[0], d32[1]); + pthread_mutex_lock(&GloVars.checksum_mutex); + GloVars.checksums_values.mysql_servers.set_checksum(buf); + GloVars.checksums_values.mysql_servers.version++; + //struct timespec ts; + //clock_gettime(CLOCK_REALTIME, &ts); + if (peer_runtime_mysql_server.epoch != 0 && peer_runtime_mysql_server.checksum.empty() == false && + GloVars.checksums_values.mysql_servers.checksum == peer_runtime_mysql_server.checksum) { + GloVars.checksums_values.mysql_servers.epoch = peer_runtime_mysql_server.epoch; + } else { + GloVars.checksums_values.mysql_servers.epoch = t; + } + + if (only_commit_runtime_mysql_servers == false) { memcpy(&d32, &mysql_servers_incoming_checksum, sizeof(mysql_servers_incoming_checksum)); sprintf(buf, "0x%0X%0X", d32[0], d32[1]); GloVars.checksums_values.mysql_servers_incoming.set_checksum(buf); @@ -2199,13 +2200,13 @@ bool MySQL_HostGroups_Manager::commit( } else { GloVars.checksums_values.mysql_servers_incoming.epoch = t; } - - GloVars.checksums_values.updates_cnt++; - GloVars.generate_global_checksum(); - GloVars.epoch_version = t; - pthread_mutex_unlock(&GloVars.checksum_mutex); } + GloVars.checksums_values.updates_cnt++; + GloVars.generate_global_checksum(); + GloVars.epoch_version = t; + pthread_mutex_unlock(&GloVars.checksum_mutex); + // fill Hostgroup_Manager_Mapping with latest records update_hostgroup_manager_mappings(); @@ -2244,56 +2245,67 @@ bool MySQL_HostGroups_Manager::commit( uint64_t MySQL_HostGroups_Manager::get_mysql_servers_checksum(SQLite3_result* runtime_mysql_servers) { //Note: GloVars.checksum_mutex needs to be locked - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; - char* query = (char*)"SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port"; - mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + SQLite3_result* resultset = nullptr; if (runtime_mysql_servers == nullptr) { char* error = NULL; int cols = 0; int affected_rows = 0; - SQLite3_result* resultset = NULL; mydb->execute_statement(MYHGM_GEN_ADMIN_RUNTIME_SERVERS, &error, &cols, &affected_rows, &resultset); - // Remove 'OFFLINE_HARD' servers since they are not relevant to propagate to other Cluster - // nodes, or relevant for checksum computation. - const size_t init_row_count = resultset->rows_count; - size_t rm_rows_count = 0; - const auto is_offline_server = [&rm_rows_count](SQLite3_row* row) { - if (strcasecmp(row->fields[4], "OFFLINE_HARD") == 0) { - rm_rows_count += 1; - return true; + if (resultset) { + if (resultset->rows_count) { + // Remove 'OFFLINE_HARD' servers since they are not relevant to propagate to other Cluster + // nodes, or relevant for checksum computation. + const size_t init_row_count = resultset->rows_count; + size_t rm_rows_count = 0; + const auto is_offline_server = [&rm_rows_count](SQLite3_row* row) { + if (strcasecmp(row->fields[4], "OFFLINE_HARD") == 0) { + rm_rows_count += 1; + return true; + } else { + return false; + } + }; + resultset->rows.erase( + std::remove_if(resultset->rows.begin(), resultset->rows.end(), is_offline_server), + resultset->rows.end() + ); + resultset->rows_count = init_row_count - rm_rows_count; + + save_runtime_mysql_servers(resultset); } else { - return false; + delete resultset; + resultset = nullptr; } - }; - resultset->rows.erase( - std::remove_if(resultset->rows.begin(), resultset->rows.end(), is_offline_server), - resultset->rows.end() - ); - resultset->rows_count = init_row_count - rm_rows_count; - - save_runtime_mysql_servers(resultset); + } else { + proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0); + } } else { + resultset = runtime_mysql_servers; save_runtime_mysql_servers(runtime_mysql_servers); } - table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = 0; - - if (resultset) { - if (resultset->rows_count) { - uint64_t hash1_ = resultset->raw_checksum(); - table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = hash1_; - proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", hash1_); - } - delete resultset; - } else { - proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0); - } + table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = compute_mysql_servers_raw_checksum(resultset); + proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]); + + //char* error = NULL; + //int cols = 0; + //int affected_rows = 0; + //SQLite3_result* resultset = NULL; + //char* query = (char*)"SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers WHERE status<>3 ORDER BY hostgroup_id, hostname, port"; + //mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); + //if (resultset) { + // if (resultset->rows_count) { + // uint64_t hash1_ = resultset->raw_checksum(); + // table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS] = hash1_; + // proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", hash1_); + // } + // delete resultset; + //} else { + // proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers", (long unsigned int)0); + //} return table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS]; } @@ -2302,14 +2314,14 @@ uint64_t MySQL_HostGroups_Manager::get_mysql_servers_incoming_checksum(SQLite3_r bool use_precalculated_checksum) { //Note: GloVars.checksum_mutex needs to be locked - - char* error = NULL; - int cols = 0; - int affected_rows = 0; - SQLite3_result* resultset = NULL; + SQLite3_result* resultset = nullptr; if (incoming_mysql_servers == nullptr) { + char* error = nullptr; + int cols = 0; + int affected_rows = 0; + mydb->execute_statement(MYHGM_GEN_INCOMING_MYSQL_SERVERS, &error, &cols, &affected_rows, &resultset); if (resultset) { if (resultset->rows_count) { @@ -2331,80 +2343,31 @@ uint64_t MySQL_HostGroups_Manager::get_mysql_servers_incoming_checksum(SQLite3_r resultset->rows_count = init_row_count - rm_rows_count; save_mysql_servers_incoming(resultset); + } else { + delete resultset; + resultset = nullptr; } } else { proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers_incoming", (long unsigned int)0); } } else { + resultset = incoming_mysql_servers; save_mysql_servers_incoming(incoming_mysql_servers); } if (use_precalculated_checksum == false) { - // reset all checksum - table_resultset_checksum.fill(0); + // reset checksum + //table_resultset_checksum[MYSQL_SERVERS_INCOMING] = 0; + table_resultset_checksum[MYSQL_REPLICATION_HOSTGROUPS] = 0; + table_resultset_checksum[MYSQL_GROUP_REPLICATION_HOSTGROUPS] = 0; + table_resultset_checksum[MYSQL_GALERA_HOSTGROUPS] = 0; + table_resultset_checksum[MYSQL_AWS_AURORA_HOSTGROUPS] = 0; + table_resultset_checksum[MYSQL_HOSTGROUP_ATTRIBUTES] = 0; } - resultset = get_current_mysql_table("mysql_servers_incoming"); - - if (resultset) { - int status_idx = -1; - - for (int i = 0; i < resultset->columns; i++) { - if (resultset->column_definition[i] && resultset->column_definition[i]->name && - strcasecmp(resultset->column_definition[i]->name, "status") == 0) { - status_idx = i; - break; - } - } - - if (resultset->rows_count) { - uint64_t hash1 = 0, hash2 = 0; - SpookyHash myhash; - myhash.Init(19, 3); - - for (std::vector::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) { - SQLite3_row* r = *it; - - const char* status_conv = nullptr; - const char* status = r->fields[status_idx]; - - if (status) { - if (strcasecmp(status, "OFFLINE_HARD") == 0) - continue; - - if (strcasecmp(status, "ONLINE") == 0 || - strcasecmp(status, "SHUNNED") == 0) { - status_conv = "0"; - } else if (strcasecmp(status, "OFFLINE_SOFT") == 0) { - status_conv = "2"; - } - } - - for (int i = 0; i < resultset->columns; i++) { - - if (i != status_idx) { - if (r->fields[i]) { - myhash.Update(r->fields[i], r->sizes[i]); - } else { - myhash.Update("", 0); - } - } else { - if (status_conv) { - myhash.Update(status_conv, strlen(status_conv)); - } else { - myhash.Update("", 0); - } - } - } - } - myhash.Final(&hash1, &hash2); - - table_resultset_checksum[HGM_TABLES::MYSQL_SERVERS_INCOMING] = hash1; - proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers_incoming", hash1); - } - } + table_resultset_checksum[MYSQL_SERVERS_INCOMING] = compute_mysql_servers_raw_checksum(resultset); + proxy_info("Checksum for table %s is 0x%lX\n", "mysql_servers_incoming", table_resultset_checksum[MYSQL_SERVERS_INCOMING]); - /*char* query = (char*)"SELECT hostgroup_id,hostname,port,gtid_port,CASE status WHEN 0 OR 1 OR 4 THEN 0 ELSE status END status, \ weight,compression,max_connections, max_replication_lag,use_ssl,max_latency_ms,comment FROM mysql_servers_incoming WHERE status<>3 ORDER BY hostgroup_id, hostname, port"; mydb->execute_statement(query, &error, &cols, &affected_rows, &resultset); @@ -5154,11 +5117,7 @@ void MySQL_HostGroups_Manager::read_only_action_v2(const std::listexecute("DELETE FROM mysql_servers"); generate_mysql_servers_table(); - // reset hgsm_mysql_servers_checksum checksum - hgsm_mysql_servers_checksum = 0; - const auto mysql_servers_checksum = get_mysql_servers_checksum(); - hgsm_mysql_servers_checksum = mysql_servers_checksum; char buf[80]; @@ -8169,3 +8128,66 @@ void MySQL_HostGroups_Manager::HostGroup_Server_Mapping::remove_HGM(MySrvC* srv) srv->status = MYSQL_SERVER_STATUS_OFFLINE_HARD; srv->ConnectionsFree->drop_all_connections(); } + +/** + * @brief This function computes the checksum of the mysql_servers resultset. +* As the checksum is being calculated, the function replaces the status values with their respective integer values. + * + * @param mysql_servers resultset of mysql_servers or mysql_servers_incoming. + */ +uint64_t MySQL_HostGroups_Manager::compute_mysql_servers_raw_checksum(const SQLite3_result* mysql_servers) { + + if (!mysql_servers || mysql_servers->rows_count == 0) + return 0; + + int status_idx = -1; + + for (int i = 0; i < mysql_servers->columns; i++) { + if (mysql_servers->column_definition[i] && mysql_servers->column_definition[i]->name && + strcmp(mysql_servers->column_definition[i]->name, "status") == 0) { + status_idx = i; + break; + } + } + + if (status_idx == -1) assert(0); + + SpookyHash myhash; + myhash.Init(19, 3); + + for (const SQLite3_row* r : mysql_servers->rows) { + + const char* mapped_status = ""; + const char* status = r->fields[status_idx]; + + if (status) { + if (strcasecmp(status, "OFFLINE_HARD") == 0) + continue; + + if (strcasecmp(status, "ONLINE") == 0 || + strcasecmp(status, "SHUNNED") == 0) { + mapped_status = "0"; + } else if (strcasecmp(status, "OFFLINE_SOFT") == 0) { + mapped_status = "2"; + } + } + + for (int i = 0; i < mysql_servers->columns; i++) { + + if (r->fields[i]) { + if (i != status_idx) { + myhash.Update(r->fields[i], r->sizes[i]); + } else { + myhash.Update(mapped_status, strlen(mapped_status)); + } + } else { + myhash.Update("", 0); + } + } + } + + uint64_t res_hash = 0, hash2 = 0; + myhash.Final(&res_hash, &hash2); + + return res_hash; +} \ No newline at end of file diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 0e53edd36..5b351ff8e 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -12059,7 +12059,7 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime(const incoming_servers_t& inc } // commit all the changes - MyHGM->commit(runtime_mysql_servers, incoming_mysql_servers, peer_runtime_mysql_server, peer_mysql_server_incoming); + MyHGM->commit(runtime_mysql_servers, peer_runtime_mysql_server, incoming_mysql_servers, peer_mysql_server_incoming); // quering runtime table will update and return latest records, so this is not needed. // GloAdmin->save_mysql_servers_runtime_to_database(true); diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index 292e8f6fa..d47ebc287 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -1736,7 +1736,7 @@ void ProxySQL_Cluster::pull_runtime_mysql_servers_from_peer(const runtime_mysql_ proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Loading runtime_mysql_servers from peer %s:%d into mysql_servers_incoming", hostname, port); MyHGM->servers_add(runtime_mysql_servers_resultset.get()); proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Updating runtime_mysql_servers from peer %s:%d", hostname, port); - MyHGM->update_runtime_mysql_servers_table(runtime_mysql_servers_resultset.get(), peer_runtime_mysql_server); + MyHGM->commit(runtime_mysql_servers_resultset.release(), peer_runtime_mysql_server, nullptr, {}, true); MyHGM->wrunlock(); // free result @@ -2847,7 +2847,7 @@ bool ProxySQL_Cluster_Nodes::Update_Global_Checksum(char * _h, uint16_t _p, MYSQ proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Global checksum 0x%llX for peer %s:%d matches\n", v, node->get_hostname(), node->get_port()); ret = false; } else { - proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Global checksum for peer %s:%d is different from fetched one. Local checksum:[0x%llX] Fetched checksum:[0x%llX]\n", node->get_hostname(), node->get_port(), node->global_checksum, v); + proxy_debug(PROXY_DEBUG_CLUSTER, 5, "Global checksum for peer %s:%d is different from fetched one. Local checksum:[0x%lX] Fetched checksum:[0x%lX]\n", node->get_hostname(), node->get_port(), node->global_checksum, v); node->global_checksum = v; } }