From da99ec0a7e25bffd01807198ce560f858cfe43f3 Mon Sep 17 00:00:00 2001 From: Nick Vyzas Date: Fri, 21 Sep 2018 22:09:11 +0300 Subject: [PATCH] Enable mysql_query_rules_fast_routing in ProxySQL Cluster https://github.com/sysown/proxysql/issues/1675 --- lib/ProxySQL_Admin.cpp | 4 +- lib/ProxySQL_Cluster.cpp | 155 ++++++++++++++++++++++++++------------- src/proxysql.cfg | 14 +++- 3 files changed, 118 insertions(+), 55 deletions(-) diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index c20d48b39..cde98bb69 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -7332,7 +7332,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { int cols2 = 0; int affected_rows2 = 0; SQLite3_result *resultset2 = NULL; - char *query2=(char *)"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM main.mysql_query_rules_fast_routing"; + char *query2=(char *)"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM main.mysql_query_rules_fast_routing ORDER BY username, schemaname, flagIN"; admindb->execute_statement(query2, &error2 , &cols2 , &affected_rows2 , &resultset2); if (error) { proxy_error("Error on %s : %s\n", query, error); @@ -7343,6 +7343,8 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() { if (checksum_variables.checksum_mysql_query_rules) { pthread_mutex_lock(&GloVars.checksum_mutex); uint64_t hash1 = resultset->raw_checksum(); + uint64_t hash2 = resultset2->raw_checksum(); + hash1 += hash2; uint32_t d32[2]; char buf[20]; memcpy(&d32, &hash1, sizeof(hash1)); diff --git a/lib/ProxySQL_Cluster.cpp b/lib/ProxySQL_Cluster.cpp index edc5626ad..3aff986e7 100644 --- a/lib/ProxySQL_Cluster.cpp +++ b/lib/ProxySQL_Cluster.cpp @@ -7,7 +7,7 @@ #else #define DEB "" #endif /* DEBUG */ -#define PROXYSQL_CLUSTER_VERSION "0.1.0702" DEB +#define PROXYSQL_CLUSTER_VERSION "0.4.0906" DEB #define QUERY_ERROR_RATE 20 @@ -616,64 +616,113 @@ void ProxySQL_Cluster::pull_mysql_query_rules_from_peer() { proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d started\n", hostname, port); rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0); if (rc_conn) { + MYSQL_RES *result1 = NULL; + MYSQL_RES *result2 = NULL; rc_query = mysql_query(conn,"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, gtid_from_hostgroup, log, apply, comment FROM runtime_mysql_query_rules"); if ( rc_query == 0 ) { - MYSQL_RES *result = mysql_store_result(conn); - GloAdmin->admindb->execute("DELETE FROM mysql_query_rules"); - MYSQL_ROW row; - char *q = (char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, gtid_from_hostgroup, log, apply, comment) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31, ?32)"; - sqlite3_stmt *statement1 = NULL; - sqlite3 *mydb3 = GloAdmin->admindb->get_db(); - rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0); - assert(rc==SQLITE_OK); - while ((row = mysql_fetch_row(result))) { - rc=sqlite3_bind_int64(statement1, 1, atoll(row[0])); assert(rc==SQLITE_OK); // rule_id - rc=sqlite3_bind_int64(statement1, 2, 1); assert(rc==SQLITE_OK); // active - rc=sqlite3_bind_text(statement1, 3, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username - rc=sqlite3_bind_text(statement1, 4, row[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname - rc=sqlite3_bind_text(statement1, 5, row[3], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagIN - rc=sqlite3_bind_text(statement1, 6, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // client_addr - rc=sqlite3_bind_text(statement1, 7, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_addr - rc=sqlite3_bind_text(statement1, 8, row[6], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_port - rc=sqlite3_bind_text(statement1, 9, row[7], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // digest - rc=sqlite3_bind_text(statement1, 10, row[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_digest - rc=sqlite3_bind_text(statement1, 11, row[9], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_pattern - rc=sqlite3_bind_text(statement1, 12, row[10], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // negate_match_pattern - rc=sqlite3_bind_text(statement1, 13, row[11], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // re_modifiers - rc=sqlite3_bind_text(statement1, 14, row[12], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagOUT - rc=sqlite3_bind_text(statement1, 15, row[13], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // replace_pattern - rc=sqlite3_bind_text(statement1, 16, row[14], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // destination_hostgroup - rc=sqlite3_bind_text(statement1, 17, row[15], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // cache_ttl - rc=sqlite3_bind_text(statement1, 18, row[16], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // reconnect - rc=sqlite3_bind_text(statement1, 19, row[17], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // timeout - rc=sqlite3_bind_text(statement1, 20, row[18], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // retries - rc=sqlite3_bind_text(statement1, 21, row[19], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // delay - rc=sqlite3_bind_text(statement1, 22, row[20], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // next_query_flagIN - rc=sqlite3_bind_text(statement1, 23, row[21], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_flagOUT - rc=sqlite3_bind_text(statement1, 24, row[22], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_hostgroup - rc=sqlite3_bind_text(statement1, 25, row[23], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // error_msg - rc=sqlite3_bind_text(statement1, 26, row[24], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // OK_msg - rc=sqlite3_bind_text(statement1, 27, row[25], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // sticky_conn - rc=sqlite3_bind_text(statement1, 28, row[26], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // multiplex - rc=sqlite3_bind_text(statement1, 29, row[27], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // gtid_from_hostgroup - rc=sqlite3_bind_text(statement1, 30, row[28], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // log - rc=sqlite3_bind_text(statement1, 31, row[29], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // apply - rc=sqlite3_bind_text(statement1, 32, row[30], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment - SAFE_SQLITE3_STEP(statement1); - rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK); - rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK); - } - mysql_free_result(result); - proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d completed\n", hostname, port); - proxy_info("Cluster: Loading to runtime MySQL Query Rules from peer %s:%d\n", hostname, port); - GloAdmin->load_mysql_query_rules_to_runtime(); - if (GloProxyCluster->cluster_mysql_query_rules_save_to_disk == true) { - proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port); - GloAdmin->flush_mysql_query_rules__from_memory_to_disk(); + MYSQL_RES *result1 = mysql_store_result(conn); + rc_query = mysql_query(conn,"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM runtime_mysql_query_rules_fast_routing"); + if ( rc_query == 0) { + result2 = mysql_store_result(conn); + proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d completed\n", hostname, port); + proxy_info("Cluster: Loading to runtime MySQL Query Rules from peer %s:%d\n", hostname, port); + GloAdmin->admindb->execute("DELETE FROM mysql_query_rules"); + GloAdmin->admindb->execute("DELETE FROM mysql_query_rules_fast_routing"); + MYSQL_ROW row; + char *q = (char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31)"; + sqlite3_stmt *statement1 = NULL; + sqlite3 *mydb3 = GloAdmin->admindb->get_db(); + rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0); + assert(rc==SQLITE_OK); + while ((row = mysql_fetch_row(result1))) { + rc=sqlite3_bind_int64(statement1, 1, atoll(row[0])); assert(rc==SQLITE_OK); // rule_id + rc=sqlite3_bind_int64(statement1, 2, 1); assert(rc==SQLITE_OK); // active + rc=sqlite3_bind_text(statement1, 3, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username + rc=sqlite3_bind_text(statement1, 4, row[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname + rc=sqlite3_bind_text(statement1, 5, row[3], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagIN + rc=sqlite3_bind_text(statement1, 6, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // client_addr + rc=sqlite3_bind_text(statement1, 7, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_addr + rc=sqlite3_bind_text(statement1, 8, row[6], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_port + rc=sqlite3_bind_text(statement1, 9, row[7], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // digest + rc=sqlite3_bind_text(statement1, 10, row[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_digest + rc=sqlite3_bind_text(statement1, 11, row[9], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_pattern + rc=sqlite3_bind_text(statement1, 12, row[10], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // negate_match_pattern + rc=sqlite3_bind_text(statement1, 13, row[11], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // re_modifiers + rc=sqlite3_bind_text(statement1, 14, row[12], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagOUT + rc=sqlite3_bind_text(statement1, 15, row[13], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // replace_pattern + rc=sqlite3_bind_text(statement1, 16, row[14], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // destination_hostgroup + rc=sqlite3_bind_text(statement1, 17, row[15], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // cache_ttl + rc=sqlite3_bind_text(statement1, 18, row[16], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // reconnect + rc=sqlite3_bind_text(statement1, 19, row[17], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // timeout + rc=sqlite3_bind_text(statement1, 20, row[18], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // retries + rc=sqlite3_bind_text(statement1, 21, row[19], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // delay + rc=sqlite3_bind_text(statement1, 22, row[20], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // next_query_flagIN + rc=sqlite3_bind_text(statement1, 23, row[21], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_flagOUT + rc=sqlite3_bind_text(statement1, 24, row[22], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_hostgroup + rc=sqlite3_bind_text(statement1, 25, row[23], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // error_msg + rc=sqlite3_bind_text(statement1, 26, row[24], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // OK_msg + rc=sqlite3_bind_text(statement1, 27, row[25], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // sticky_conn + rc=sqlite3_bind_text(statement1, 28, row[26], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // multiplex + rc=sqlite3_bind_text(statement1, 29, row[27], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // log + rc=sqlite3_bind_text(statement1, 30, row[28], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // apply + rc=sqlite3_bind_text(statement1, 31, row[29], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment + SAFE_SQLITE3_STEP2(statement1); + rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK); + } + char *q1fr = (char *)"INSERT INTO mysql_query_rules_fast_routing(username, schemaname, flagIN, destination_hostgroup, comment) VALUES (?1, ?2, ?3, ?4, ?5)"; + char *q32fr = (char *)"INSERT INTO mysql_query_rules_fast_routing(username, schemaname, flagIN, destination_hostgroup, comment) VALUES (?1, ?2, ?3, ?4, ?5), (?6, ?7, ?8, ?9, ?10), (?11, ?12, ?13, ?14, ?15), (?16, ?17, ?18, ?19, ?20), (?21, ?22, ?23, ?24, ?25), (?26, ?27, ?28, ?29, ?30), (?31, ?32, ?33, ?34, ?35), (?36, ?37, ?38, ?39, ?40), (?41, ?42, ?43, ?44, ?45), (?46, ?47, ?48, ?49, ?50), (?51, ?52, ?53, ?54, ?55), (?56, ?57, ?58, ?59, ?60), (?61, ?62, ?63, ?64, ?65), (?66, ?67, ?68, ?69, ?70), (?71, ?72, ?73, ?74, ?75), (?76, ?77, ?78, ?79, ?80), (?81, ?82, ?83, ?84, ?85), (?86, ?87, ?88, ?89, ?90), (?91, ?92, ?93, ?94, ?95), (?96, ?97, ?98, ?99, ?100), (?101, ?102, ?103, ?104, ?105), (?106, ?107, ?108, ?109, ?110), (?111, ?112, ?113, ?114, ?115), (?116, ?117, ?118, ?119, ?120), (?121, ?122, ?123, ?124, ?125), (?126, ?127, ?128, ?129, ?130), (?131, ?132, ?133, ?134, ?135), (?136, ?137, ?138, ?139, ?140), (?141, ?142, ?143, ?144, ?145), (?146, ?147, ?148, ?149, ?150), (?151, ?152, ?153, ?154, ?155), (?156, ?157, ?158, ?159, ?160)"; + sqlite3_stmt *statement1fr = NULL; + sqlite3_stmt *statement32fr = NULL; + rc=sqlite3_prepare_v2(mydb3, q1fr, -1, &statement1fr, 0); + assert(rc==SQLITE_OK); + rc=sqlite3_prepare_v2(mydb3, q32fr, -1, &statement32fr, 0); + assert(rc==SQLITE_OK); + int row_idx=0; + int max_bulk_row_idx=mysql_num_rows(result2)/32; + max_bulk_row_idx=max_bulk_row_idx*32; + while ((row = mysql_fetch_row(result2))) { + int idx=row_idx%32; + if (row_idxload_mysql_query_rules_to_runtime(); + if (GloProxyCluster->cluster_mysql_query_rules_save_to_disk == true) { + proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port); + GloAdmin->flush_mysql_query_rules__from_memory_to_disk(); + } + } else { + proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); } } else { proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); } + if (result1) { + mysql_free_result(result1); + } + if (result2) { + mysql_free_result(result2); + } } else { proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn)); } diff --git a/src/proxysql.cfg b/src/proxysql.cfg index 332344051..3c869ab21 100644 --- a/src/proxysql.cfg +++ b/src/proxysql.cfg @@ -11,10 +11,22 @@ datadir="/var/lib/proxysql" admin_variables= { - admin_credentials="admin:admin" + admin_credentials="admin:admin;cluster1:secret1pass" mysql_ifaces="0.0.0.0:6032" refresh_interval=2000 debug=true + cluster_username="cluster1" + cluster_password="secret1pass" + cluster_check_interval_ms=200 + cluster_check_status_frequency=100 + cluster_mysql_query_rules_save_to_disk=true + cluster_mysql_servers_save_to_disk=true + cluster_mysql_users_save_to_disk=true + cluster_proxysql_servers_save_to_disk=true + cluster_mysql_query_rules_diffs_before_sync=3 + cluster_mysql_servers_diffs_before_sync=3 + cluster_mysql_users_diffs_before_sync=3 + cluster_proxysql_servers_diffs_before_sync=3 } mysql_variables=