Enable mysql_query_rules_fast_routing in ProxySQL Cluster

https://github.com/sysown/proxysql/issues/1675
pull/1704/head
Nick Vyzas 8 years ago
parent 4411f6bae4
commit da99ec0a7e

@ -7332,7 +7332,7 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() {
int cols2 = 0;
int affected_rows2 = 0;
SQLite3_result *resultset2 = NULL;
char *query2=(char *)"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM main.mysql_query_rules_fast_routing";
char *query2=(char *)"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM main.mysql_query_rules_fast_routing ORDER BY username, schemaname, flagIN";
admindb->execute_statement(query2, &error2 , &cols2 , &affected_rows2 , &resultset2);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
@ -7343,6 +7343,8 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() {
if (checksum_variables.checksum_mysql_query_rules) {
pthread_mutex_lock(&GloVars.checksum_mutex);
uint64_t hash1 = resultset->raw_checksum();
uint64_t hash2 = resultset2->raw_checksum();
hash1 += hash2;
uint32_t d32[2];
char buf[20];
memcpy(&d32, &hash1, sizeof(hash1));

@ -7,7 +7,7 @@
#else
#define DEB ""
#endif /* DEBUG */
#define PROXYSQL_CLUSTER_VERSION "0.1.0702" DEB
#define PROXYSQL_CLUSTER_VERSION "0.4.0906" DEB
#define QUERY_ERROR_RATE 20
@ -616,64 +616,113 @@ void ProxySQL_Cluster::pull_mysql_query_rules_from_peer() {
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d started\n", hostname, port);
rc_conn = mysql_real_connect(conn, hostname, username, password, NULL, port, NULL, 0);
if (rc_conn) {
MYSQL_RES *result1 = NULL;
MYSQL_RES *result2 = NULL;
rc_query = mysql_query(conn,"SELECT rule_id, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, gtid_from_hostgroup, log, apply, comment FROM runtime_mysql_query_rules");
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
GloAdmin->admindb->execute("DELETE FROM mysql_query_rules");
MYSQL_ROW row;
char *q = (char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, gtid_from_hostgroup, log, apply, comment) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31, ?32)";
sqlite3_stmt *statement1 = NULL;
sqlite3 *mydb3 = GloAdmin->admindb->get_db();
rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0);
assert(rc==SQLITE_OK);
while ((row = mysql_fetch_row(result))) {
rc=sqlite3_bind_int64(statement1, 1, atoll(row[0])); assert(rc==SQLITE_OK); // rule_id
rc=sqlite3_bind_int64(statement1, 2, 1); assert(rc==SQLITE_OK); // active
rc=sqlite3_bind_text(statement1, 3, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username
rc=sqlite3_bind_text(statement1, 4, row[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname
rc=sqlite3_bind_text(statement1, 5, row[3], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagIN
rc=sqlite3_bind_text(statement1, 6, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // client_addr
rc=sqlite3_bind_text(statement1, 7, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_addr
rc=sqlite3_bind_text(statement1, 8, row[6], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_port
rc=sqlite3_bind_text(statement1, 9, row[7], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // digest
rc=sqlite3_bind_text(statement1, 10, row[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_digest
rc=sqlite3_bind_text(statement1, 11, row[9], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_pattern
rc=sqlite3_bind_text(statement1, 12, row[10], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // negate_match_pattern
rc=sqlite3_bind_text(statement1, 13, row[11], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // re_modifiers
rc=sqlite3_bind_text(statement1, 14, row[12], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagOUT
rc=sqlite3_bind_text(statement1, 15, row[13], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // replace_pattern
rc=sqlite3_bind_text(statement1, 16, row[14], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // destination_hostgroup
rc=sqlite3_bind_text(statement1, 17, row[15], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // cache_ttl
rc=sqlite3_bind_text(statement1, 18, row[16], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // reconnect
rc=sqlite3_bind_text(statement1, 19, row[17], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // timeout
rc=sqlite3_bind_text(statement1, 20, row[18], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // retries
rc=sqlite3_bind_text(statement1, 21, row[19], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // delay
rc=sqlite3_bind_text(statement1, 22, row[20], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // next_query_flagIN
rc=sqlite3_bind_text(statement1, 23, row[21], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_flagOUT
rc=sqlite3_bind_text(statement1, 24, row[22], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_hostgroup
rc=sqlite3_bind_text(statement1, 25, row[23], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // error_msg
rc=sqlite3_bind_text(statement1, 26, row[24], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // OK_msg
rc=sqlite3_bind_text(statement1, 27, row[25], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // sticky_conn
rc=sqlite3_bind_text(statement1, 28, row[26], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // multiplex
rc=sqlite3_bind_text(statement1, 29, row[27], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // gtid_from_hostgroup
rc=sqlite3_bind_text(statement1, 30, row[28], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // log
rc=sqlite3_bind_text(statement1, 31, row[29], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // apply
rc=sqlite3_bind_text(statement1, 32, row[30], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
mysql_free_result(result);
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Loading to runtime MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->load_mysql_query_rules_to_runtime();
if (GloProxyCluster->cluster_mysql_query_rules_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_query_rules__from_memory_to_disk();
MYSQL_RES *result1 = mysql_store_result(conn);
rc_query = mysql_query(conn,"SELECT username, schemaname, flagIN, destination_hostgroup, comment FROM runtime_mysql_query_rules_fast_routing");
if ( rc_query == 0) {
result2 = mysql_store_result(conn);
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d completed\n", hostname, port);
proxy_info("Cluster: Loading to runtime MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->admindb->execute("DELETE FROM mysql_query_rules");
GloAdmin->admindb->execute("DELETE FROM mysql_query_rules_fast_routing");
MYSQL_ROW row;
char *q = (char *)"INSERT INTO mysql_query_rules (rule_id, active, username, schemaname, flagIN, client_addr, proxy_addr, proxy_port, digest, match_digest, match_pattern, negate_match_pattern, re_modifiers, flagOUT, replace_pattern, destination_hostgroup, cache_ttl, reconnect, timeout, retries, delay, next_query_flagIN, mirror_flagOUT, mirror_hostgroup, error_msg, ok_msg, sticky_conn, multiplex, log, apply, comment) VALUES (?1 , ?2 , ?3 , ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31)";
sqlite3_stmt *statement1 = NULL;
sqlite3 *mydb3 = GloAdmin->admindb->get_db();
rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0);
assert(rc==SQLITE_OK);
while ((row = mysql_fetch_row(result1))) {
rc=sqlite3_bind_int64(statement1, 1, atoll(row[0])); assert(rc==SQLITE_OK); // rule_id
rc=sqlite3_bind_int64(statement1, 2, 1); assert(rc==SQLITE_OK); // active
rc=sqlite3_bind_text(statement1, 3, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username
rc=sqlite3_bind_text(statement1, 4, row[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname
rc=sqlite3_bind_text(statement1, 5, row[3], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagIN
rc=sqlite3_bind_text(statement1, 6, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // client_addr
rc=sqlite3_bind_text(statement1, 7, row[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_addr
rc=sqlite3_bind_text(statement1, 8, row[6], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // proxy_port
rc=sqlite3_bind_text(statement1, 9, row[7], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // digest
rc=sqlite3_bind_text(statement1, 10, row[8], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_digest
rc=sqlite3_bind_text(statement1, 11, row[9], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // match_pattern
rc=sqlite3_bind_text(statement1, 12, row[10], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // negate_match_pattern
rc=sqlite3_bind_text(statement1, 13, row[11], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // re_modifiers
rc=sqlite3_bind_text(statement1, 14, row[12], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // flagOUT
rc=sqlite3_bind_text(statement1, 15, row[13], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // replace_pattern
rc=sqlite3_bind_text(statement1, 16, row[14], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // destination_hostgroup
rc=sqlite3_bind_text(statement1, 17, row[15], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // cache_ttl
rc=sqlite3_bind_text(statement1, 18, row[16], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // reconnect
rc=sqlite3_bind_text(statement1, 19, row[17], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // timeout
rc=sqlite3_bind_text(statement1, 20, row[18], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // retries
rc=sqlite3_bind_text(statement1, 21, row[19], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // delay
rc=sqlite3_bind_text(statement1, 22, row[20], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // next_query_flagIN
rc=sqlite3_bind_text(statement1, 23, row[21], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_flagOUT
rc=sqlite3_bind_text(statement1, 24, row[22], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // mirror_hostgroup
rc=sqlite3_bind_text(statement1, 25, row[23], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // error_msg
rc=sqlite3_bind_text(statement1, 26, row[24], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // OK_msg
rc=sqlite3_bind_text(statement1, 27, row[25], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // sticky_conn
rc=sqlite3_bind_text(statement1, 28, row[26], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // multiplex
rc=sqlite3_bind_text(statement1, 29, row[27], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // log
rc=sqlite3_bind_text(statement1, 30, row[28], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // apply
rc=sqlite3_bind_text(statement1, 31, row[29], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment
SAFE_SQLITE3_STEP2(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
char *q1fr = (char *)"INSERT INTO mysql_query_rules_fast_routing(username, schemaname, flagIN, destination_hostgroup, comment) VALUES (?1, ?2, ?3, ?4, ?5)";
char *q32fr = (char *)"INSERT INTO mysql_query_rules_fast_routing(username, schemaname, flagIN, destination_hostgroup, comment) VALUES (?1, ?2, ?3, ?4, ?5), (?6, ?7, ?8, ?9, ?10), (?11, ?12, ?13, ?14, ?15), (?16, ?17, ?18, ?19, ?20), (?21, ?22, ?23, ?24, ?25), (?26, ?27, ?28, ?29, ?30), (?31, ?32, ?33, ?34, ?35), (?36, ?37, ?38, ?39, ?40), (?41, ?42, ?43, ?44, ?45), (?46, ?47, ?48, ?49, ?50), (?51, ?52, ?53, ?54, ?55), (?56, ?57, ?58, ?59, ?60), (?61, ?62, ?63, ?64, ?65), (?66, ?67, ?68, ?69, ?70), (?71, ?72, ?73, ?74, ?75), (?76, ?77, ?78, ?79, ?80), (?81, ?82, ?83, ?84, ?85), (?86, ?87, ?88, ?89, ?90), (?91, ?92, ?93, ?94, ?95), (?96, ?97, ?98, ?99, ?100), (?101, ?102, ?103, ?104, ?105), (?106, ?107, ?108, ?109, ?110), (?111, ?112, ?113, ?114, ?115), (?116, ?117, ?118, ?119, ?120), (?121, ?122, ?123, ?124, ?125), (?126, ?127, ?128, ?129, ?130), (?131, ?132, ?133, ?134, ?135), (?136, ?137, ?138, ?139, ?140), (?141, ?142, ?143, ?144, ?145), (?146, ?147, ?148, ?149, ?150), (?151, ?152, ?153, ?154, ?155), (?156, ?157, ?158, ?159, ?160)";
sqlite3_stmt *statement1fr = NULL;
sqlite3_stmt *statement32fr = NULL;
rc=sqlite3_prepare_v2(mydb3, q1fr, -1, &statement1fr, 0);
assert(rc==SQLITE_OK);
rc=sqlite3_prepare_v2(mydb3, q32fr, -1, &statement32fr, 0);
assert(rc==SQLITE_OK);
int row_idx=0;
int max_bulk_row_idx=mysql_num_rows(result2)/32;
max_bulk_row_idx=max_bulk_row_idx*32;
while ((row = mysql_fetch_row(result2))) {
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
rc=sqlite3_bind_text(statement32fr, (idx*5)+1, row[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username
rc=sqlite3_bind_text(statement32fr, (idx*5)+2, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname
rc=sqlite3_bind_int64(statement32fr, (idx*5)+3, atoll(row[2])); assert(rc==SQLITE_OK); // flagIN
rc=sqlite3_bind_int64(statement32fr, (idx*5)+4, atoll(row[3])); assert(rc==SQLITE_OK); // destination_hostgroup
rc=sqlite3_bind_text(statement32fr, (idx*5)+5, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment
if (idx==31) {
SAFE_SQLITE3_STEP2(statement32fr);
rc=sqlite3_clear_bindings(statement32fr); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement32fr); assert(rc==SQLITE_OK);
}
} else { // single row
rc=sqlite3_bind_text(statement1fr, 1, row[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // username
rc=sqlite3_bind_text(statement1fr, 2, row[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // schemaname
rc=sqlite3_bind_int64(statement1fr, 3, atoll(row[2])); assert(rc==SQLITE_OK); // flagIN
rc=sqlite3_bind_int64(statement1fr, 4, atoll(row[3])); assert(rc==SQLITE_OK); // destination_hostgroup
rc=sqlite3_bind_text(statement1fr, 5, row[4], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); // comment
SAFE_SQLITE3_STEP2(statement1fr);
rc=sqlite3_clear_bindings(statement1fr); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1fr); assert(rc==SQLITE_OK);
}
row_idx++;
}
GloAdmin->load_mysql_query_rules_to_runtime();
if (GloProxyCluster->cluster_mysql_query_rules_save_to_disk == true) {
proxy_info("Cluster: Saving to disk MySQL Query Rules from peer %s:%d\n", hostname, port);
GloAdmin->flush_mysql_query_rules__from_memory_to_disk();
}
} else {
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
} else {
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}
if (result1) {
mysql_free_result(result1);
}
if (result2) {
mysql_free_result(result2);
}
} else {
proxy_info("Cluster: Fetching MySQL Query Rules from peer %s:%d failed: %s\n", hostname, port, mysql_error(conn));
}

@ -11,10 +11,22 @@ datadir="/var/lib/proxysql"
admin_variables=
{
admin_credentials="admin:admin"
admin_credentials="admin:admin;cluster1:secret1pass"
mysql_ifaces="0.0.0.0:6032"
refresh_interval=2000
debug=true
cluster_username="cluster1"
cluster_password="secret1pass"
cluster_check_interval_ms=200
cluster_check_status_frequency=100
cluster_mysql_query_rules_save_to_disk=true
cluster_mysql_servers_save_to_disk=true
cluster_mysql_users_save_to_disk=true
cluster_proxysql_servers_save_to_disk=true
cluster_mysql_query_rules_diffs_before_sync=3
cluster_mysql_servers_diffs_before_sync=3
cluster_mysql_users_diffs_before_sync=3
cluster_proxysql_servers_diffs_before_sync=3
}
mysql_variables=

Loading…
Cancel
Save