Enhancement on Cluster solution

This commit introduces:

2 new tables:
* `runtime_checksums_values` : stores checksums of configurations in runtime. For now for `mysql_query_rules`, `mysql_servers` and `mysql_users`
* `stats_proxysql_servers_checksums` : when clustering is enabled, it collects all metrics from `runtime_checksums_values` from all its peers
3 new global variables that defines it checksum needs to be generated during `LOAD ... TO RUNTIME`
* `admin-checksum_mysql_query_rules`
* `admin-checksum_mysql_servers`
* `admin-checksum_mysql_users`

ProxySQL Cluster connections now have timeouts:
* 1 second timeout for CONNECT and WRITE
* 60 seconds timeout for READ (useful for long poll)
pull/1169/head
René Cannaò 9 years ago
parent 3eda76379e
commit ee8110a515

@ -39,6 +39,7 @@ class ProxySQL_Node_Entry {
int metrics_idx_prev;
int metrics_idx;
ProxySQL_Node_Metrics **metrics;
public:
uint64_t get_hash();
ProxySQL_Node_Entry(char *_hostname, uint16_t _port, uint64_t _weight, char *_comment);
@ -52,6 +53,7 @@ class ProxySQL_Node_Entry {
}
void set_comment(char *a); // note, this is strdup()
void set_metrics(MYSQL_RES *_r, unsigned long long _response_time);
void set_checksums(MYSQL_RES *_r);
char *get_hostname() { // note, NO strdup()
return hostname;
}
@ -60,6 +62,14 @@ class ProxySQL_Node_Entry {
}
ProxySQL_Node_Metrics * get_metrics_curr();
ProxySQL_Node_Metrics * get_metrics_prev();
struct {
ProxySQL_Checksum_Value admin_variables;
ProxySQL_Checksum_Value mysql_variables;
ProxySQL_Checksum_Value mysql_query_rules;
ProxySQL_Checksum_Value mysql_servers;
ProxySQL_Checksum_Value mysql_users;
ProxySQL_Checksum_Value proxysql_servers;
} checksums_values;
};
class ProxySQL_Cluster_Nodes {
@ -74,7 +84,9 @@ class ProxySQL_Cluster_Nodes {
~ProxySQL_Cluster_Nodes();
void load_servers_list(SQLite3_result *);
bool Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_RES *_r, unsigned long long _response_time);
bool Update_Node_Checksums(char * _h, uint16_t _p, MYSQL_RES *_r);
SQLite3_result * dump_table_proxysql_servers();
SQLite3_result * stats_proxysql_servers_checksums();
SQLite3_result * stats_proxysql_servers_metrics();
};
@ -101,9 +113,15 @@ class ProxySQL_Cluster {
bool Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_RES *_r, unsigned long long _response_time) {
return nodes.Update_Node_Metrics(_h, _p, _r, _response_time);
}
bool Update_Node_Checksums(char * _h, uint16_t _p, MYSQL_RES *_r) {
return nodes.Update_Node_Checksums(_h, _p, _r);
}
SQLite3_result *dump_table_proxysql_servers() {
return nodes.dump_table_proxysql_servers();
}
SQLite3_result * get_stats_proxysql_servers_checksums() {
return nodes.stats_proxysql_servers_checksums();
}
SQLite3_result * get_stats_proxysql_servers_metrics() {
return nodes.stats_proxysql_servers_metrics();
}

@ -111,7 +111,7 @@ class ProxySQL_Admin {
void __insert_or_replace_disktable_select_maintable();
void __attach_db(SQLite3DB *db1, SQLite3DB *db2, char *alias);
void __add_active_users(enum cred_username_type usertype, char *user=NULL);
void __add_active_users(enum cred_username_type usertype, char *user=NULL, uint64_t *hash1 = NULL);
void __delete_inactive_users(enum cred_username_type usertype);
void add_admin_users();
void __refresh_users();
@ -141,6 +141,11 @@ class ProxySQL_Admin {
void *opt;
void **re;
} match_regexes;
struct {
bool checksum_mysql_query_rules;
bool checksum_mysql_servers;
bool checksum_mysql_users;
} checksum_variables;
void public_add_active_users(enum cred_username_type usertype, char *user=NULL) {
__add_active_users(usertype, user);
}
@ -201,6 +206,7 @@ class ProxySQL_Admin {
void stats___mysql_global();
void stats___mysql_users();
void stats___proxysql_servers_checksums();
void stats___proxysql_servers_metrics();
int Read_Global_Variables_from_configfile(const char *prefix);
@ -227,6 +233,7 @@ class ProxySQL_Admin {
void flush_proxysql_servers__from_memory_to_disk();
void flush_proxysql_servers__from_disk_to_memory();
void save_proxysql_servers_runtime_to_database(bool);
void dump_checksums_values_table();
};
#endif /* __CLASS_PROXYSQL_ADMIN_H */

@ -8,6 +8,33 @@ namespace ez {
class ezOptionParser;
};
class ProxySQL_Checksum_Value {
public:
char *checksum;
unsigned long long version;
unsigned long long epoch;
ProxySQL_Checksum_Value() {
checksum = (char *)malloc(20);
memset(checksum,0,20);
version = 0;
epoch = 0;
}
void set_checksum(char *c) {
memset(checksum,0,20);
strncpy(checksum,c,18);
for (int i=2; i<18; i++) {
if (checksum[i]==' ' || checksum[i]==0) {
checksum[i]='0';
}
}
}
~ProxySQL_Checksum_Value() {
free(checksum);
checksum = NULL;
}
};
class ProxySQL_GlobalVariables {
public:
ez::ezOptionParser *opt;
@ -48,7 +75,7 @@ class ProxySQL_GlobalVariables {
char *pidfile;
bool restart_on_error;
int restart_delay;
SSL_CTX *ssl_ctx;
SSL_CTX *ssl_ctx;
} global;
struct mysql {
char *server_version;
@ -59,6 +86,16 @@ class ProxySQL_GlobalVariables {
unsigned long stack_memory_admin_threads;
unsigned long stack_memory_cluster_threads;
} statuses;
pthread_mutex_t checksum_mutex;
time_t epoch_version;
struct {
ProxySQL_Checksum_Value admin_variables;
ProxySQL_Checksum_Value mysql_query_rules;
ProxySQL_Checksum_Value mysql_servers;
ProxySQL_Checksum_Value mysql_users;
ProxySQL_Checksum_Value mysql_variables;
ProxySQL_Checksum_Value proxysql_servers;
} checksums_values;
ProxySQL_GlobalVariables();
~ProxySQL_GlobalVariables();
void process_opts_pre();

@ -110,7 +110,7 @@ class SQLite3_result {
int columns;
int rows_count;
char *checksum();
int64_t raw_checksum();
uint64_t raw_checksum();
std::vector<SQLite3_column *> column_definition;
std::vector<SQLite3_row *> rows;

@ -1,5 +1,6 @@
#include "proxysql.h"
#include "cpp.h"
#include "SpookyV2.h"
#define char_malloc (char *)malloc
#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); }
@ -698,6 +699,69 @@ bool MySQL_HostGroups_Manager::commit() {
mydb->execute("DELETE FROM mysql_group_replication_hostgroups");
generate_mysql_group_replication_hostgroups_table();
}
if ( GloAdmin && GloAdmin->checksum_variables.checksum_mysql_servers ) {
uint64_t hash1, hash2;
SpookyHash myhash;
char buf[80];
myhash.Init(19,3);
MySrvC *mysrvc=NULL;
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
// hostgroup
sprintf(buf,"%u",mysrvc->myhgc->hid);
myhash.Update(buf,strlen(buf));
// hoatname
if (mysrvc->address) {
myhash.Update(mysrvc->address,strlen(mysrvc->address));
} else { myhash.Update("",0); }
// port
sprintf(buf,"%u",mysrvc->port);
myhash.Update(buf,strlen(buf));
// status
sprintf(buf,"%u",mysrvc->status);
myhash.Update(buf,strlen(buf));
// weight
sprintf(buf,"%u",mysrvc->weight);
myhash.Update(buf,strlen(buf));
// compression
sprintf(buf,"%u",mysrvc->compression);
myhash.Update(buf,strlen(buf));
// max_connections
sprintf(buf,"%u",mysrvc->max_connections);
myhash.Update(buf,strlen(buf));
// max_replication_lag
sprintf(buf,"%u",mysrvc->max_replication_lag);
myhash.Update(buf,strlen(buf));
// use_ssl
sprintf(buf,"%u",mysrvc->use_ssl);
myhash.Update(buf,strlen(buf));
// max_latency_ms
sprintf(buf,"%u",mysrvc->max_latency_us);
myhash.Update(buf,strlen(buf));
if (mysrvc->comment) {
myhash.Update(mysrvc->comment,strlen(mysrvc->comment));
} else { myhash.Update("",0); }
}
}
myhash.Final(&hash1, &hash2);
uint32_t d32[2];
memcpy(&d32,&hash1,sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);
pthread_mutex_lock(&GloVars.checksum_mutex);
GloVars.checksums_values.mysql_servers.set_checksum(buf);
GloVars.checksums_values.mysql_servers.version++;
//struct timespec ts;
//clock_gettime(CLOCK_REALTIME, &ts);
time_t t = time(NULL);
GloVars.checksums_values.mysql_servers.epoch = t;
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
__sync_fetch_and_add(&status.servers_table_version,1);
pthread_cond_broadcast(&status.servers_table_version_cond);
pthread_mutex_unlock(&status.servers_table_version_lock);
@ -726,6 +790,8 @@ void MySQL_HostGroups_Manager::purge_mysql_servers_table() {
}
}
void MySQL_HostGroups_Manager::generate_mysql_servers_table(int *_onlyhg) {
int rc;
sqlite3_stmt *statement1=NULL;

@ -170,6 +170,8 @@ pthread_mutex_t users_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_RUNTIME_MYSQL_USERS "CREATE TABLE runtime_mysql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , default_schema VARCHAR , schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 1 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))"
#define ADMIN_SQLITE_RUNTIME_CHECKSUMS_VALUES "CREATE TABLE runtime_checksums_values (name VARCHAR NOT NULL , version INT NOT NULL , epoch INT NOT NULL , checksum VARCHAR NOT NULL , PRIMARY KEY (name))"
// mysql_query_rules in v1.1.0
#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES_V1_1_0 "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , error_msg VARCHAR , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
@ -262,6 +264,8 @@ pthread_mutex_t users_mutex = PTHREAD_MUTEX_INITIALIZER;
#define STATS_SQLITE_TABLE_PROXYSQL_SERVERS_METRICS "CREATE TABLE stats_proxysql_servers_metrics (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , response_time_ms INT NOT NULL , Uptime_s INT NOT NULL , last_check_ms INT NOT NULL , Queries INT NOT NULL , Client_Connections_connected INT NOT NULL , Client_Connections_created INT NOT NULL , PRIMARY KEY (hostname, port) )"
#define STATS_SQLITE_TABLE_PROXYSQL_SERVERS_CHECKSUMS "CREATE TABLE stats_proxysql_servers_checksums (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 6032 , name VARCHAR NOT NULL , version INT NOT NULL , epoch INT NOT NULL , checksum VARCHAR NOT NULL , PRIMARY KEY (hostname, port, name) )"
static char * admin_variables_names[]= {
@ -277,6 +281,9 @@ static char * admin_variables_names[]= {
(char *)"cluster_username",
(char *)"cluster_password",
(char *)"cluster_check_interval_ms",
(char *)"checksum_mysql_query_rules",
(char *)"checksum_mysql_servers",
(char *)"checksum_mysql_users",
#ifdef DEBUG
(char *)"debug",
#endif /* DEBUG */
@ -1470,9 +1477,11 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
bool runtime_mysql_query_rules=false;
bool runtime_proxysql_servers=false;
bool runtime_checksums_values=false;
bool monitor_mysql_server_group_replication_log=false;
bool stats_proxysql_servers_checksums = false;
bool stats_proxysql_servers_metrics = false;
bool stats_proxysql_servers_status = false;
@ -1504,6 +1513,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
if (strstr(query_no_space,"stats_mysql_users"))
{ stats_mysql_users=true; refresh=true; }
if (strstr(query_no_space,"stats_proxysql_servers_checksums"))
{ stats_proxysql_servers_checksums = true; refresh = true; }
if (strstr(query_no_space,"stats_proxysql_servers_metrics"))
{ stats_proxysql_servers_metrics = true; refresh = true; }
if (strstr(query_no_space,"stats_proxysql_servers_status"))
@ -1534,6 +1545,9 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
if (strstr(query_no_space,"runtime_proxysql_servers")) {
runtime_proxysql_servers=true; refresh=true;
}
if (strstr(query_no_space,"runtime_checksums_values")) {
runtime_checksums_values=true; refresh=true;
}
}
}
if (strstr(query_no_space,"mysql_server_group_replication_log")) {
@ -1570,6 +1584,9 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
if (stats_proxysql_servers_metrics) {
stats___proxysql_servers_metrics();
}
if (stats_proxysql_servers_checksums) {
stats___proxysql_servers_checksums();
}
// if (stats_proxysql_servers_status) {
// stats___proxysql_servers_status();
// }
@ -1599,6 +1616,9 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
if (runtime_scheduler) {
save_scheduler_runtime_to_database(true);
}
if (runtime_checksums_values) {
dump_checksums_values_table();
}
}
if (monitor_mysql_server_group_replication_log) {
if (GloMyMon) {
@ -2738,6 +2758,9 @@ ProxySQL_Admin::ProxySQL_Admin() {
variables.cluster_username=strdup((char *)"");
variables.cluster_password=strdup((char *)"");
variables.cluster_check_interval_ms=1000;
checksum_variables.checksum_mysql_query_rules = true;
checksum_variables.checksum_mysql_servers = true;
checksum_variables.checksum_mysql_users = true;
#ifdef DEBUG
variables.debug=GloVars.global.gdbg;
#endif /* DEBUG */
@ -2842,6 +2865,7 @@ bool ProxySQL_Admin::init() {
insert_into_tables_defs(tables_defs_admin,"runtime_mysql_servers", ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_SERVERS);
insert_into_tables_defs(tables_defs_admin,"mysql_users", ADMIN_SQLITE_TABLE_MYSQL_USERS);
insert_into_tables_defs(tables_defs_admin,"runtime_mysql_users", ADMIN_SQLITE_RUNTIME_MYSQL_USERS);
insert_into_tables_defs(tables_defs_admin,"runtime_checksums_values", ADMIN_SQLITE_RUNTIME_CHECKSUMS_VALUES);
insert_into_tables_defs(tables_defs_admin,"runtime_mysql_replication_hostgroups", ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_REPLICATION_HOSTGROUPS);
insert_into_tables_defs(tables_defs_admin,"mysql_replication_hostgroups", ADMIN_SQLITE_TABLE_MYSQL_REPLICATION_HOSTGROUPS);
insert_into_tables_defs(tables_defs_admin,"mysql_group_replication_hostgroups", ADMIN_SQLITE_TABLE_MYSQL_GROUP_REPLICATION_HOSTGROUPS);
@ -2887,6 +2911,7 @@ bool ProxySQL_Admin::init() {
insert_into_tables_defs(tables_defs_admin,"proxysql_servers", ADMIN_SQLITE_TABLE_PROXYSQL_SERVERS);
insert_into_tables_defs(tables_defs_config,"proxysql_servers", ADMIN_SQLITE_TABLE_PROXYSQL_SERVERS);
insert_into_tables_defs(tables_defs_admin,"runtime_proxysql_servers", ADMIN_SQLITE_TABLE_RUNTIME_PROXYSQL_SERVERS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_servers_checksums", STATS_SQLITE_TABLE_PROXYSQL_SERVERS_CHECKSUMS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_servers_metrics", STATS_SQLITE_TABLE_PROXYSQL_SERVERS_METRICS);
insert_into_tables_defs(tables_defs_stats,"stats_proxysql_servers_status", STATS_SQLITE_TABLE_PROXYSQL_SERVERS_STATUS);
@ -3326,6 +3351,15 @@ char * ProxySQL_Admin::get_variable(char *name) {
if (!strcasecmp(name,"hash_passwords")) {
return strdup((variables.hash_passwords ? "true" : "false"));
}
if (!strcasecmp(name,"checksum_mysql_query_rules")) {
return strdup((checksum_variables.checksum_mysql_query_rules ? "true" : "false"));
}
if (!strcasecmp(name,"checksum_mysql_servers")) {
return strdup((checksum_variables.checksum_mysql_servers ? "true" : "false"));
}
if (!strcasecmp(name,"checksum_mysql_users")) {
return strdup((checksum_variables.checksum_mysql_users ? "true" : "false"));
}
#ifdef DEBUG
if (!strcasecmp(name,"debug")) {
return strdup((variables.debug ? "true" : "false"));
@ -3535,6 +3569,39 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub
}
return false;
}
if (!strcasecmp(name,"checksum_mysql_query_rules")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
checksum_variables.checksum_mysql_query_rules=true;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
checksum_variables.checksum_mysql_query_rules=false;
return true;
}
return false;
}
if (!strcasecmp(name,"checksum_mysql_servers")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
checksum_variables.checksum_mysql_servers=true;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
checksum_variables.checksum_mysql_servers=false;
return true;
}
return false;
}
if (!strcasecmp(name,"checksum_mysql_users")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
checksum_variables.checksum_mysql_users=true;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
checksum_variables.checksum_mysql_users=false;
return true;
}
return false;
}
if (!strcasecmp(name,"read_only")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.admin_read_only=true;
@ -3943,6 +4010,37 @@ void ProxySQL_Admin::stats___mysql_query_rules() {
delete resultset;
}
void ProxySQL_Admin::stats___proxysql_servers_checksums() {
statsdb->execute("BEGIN");
statsdb->execute("DELETE FROM stats_proxysql_servers_checksums");
SQLite3_result *resultset=NULL;
resultset=GloProxyCluster->get_stats_proxysql_servers_checksums();
if (resultset) {
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3 *mydb3=statsdb->get_db();
char *query1=NULL;
query1=(char *)"INSERT INTO stats_proxysql_servers_checksums VALUES (?1, ?2, ?3, ?4, ?5, ?6)";
rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0);
assert(rc==SQLITE_OK);
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r1=*it;
rc=sqlite3_bind_text(statement1, 1, r1->fields[0], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, atoi(r1->fields[1])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 3, r1->fields[2], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 4, atoi(r1->fields[3])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 5, atoi(r1->fields[4])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 6, r1->fields[5], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
sqlite3_finalize(statement1);
}
statsdb->execute("COMMIT");
delete resultset;
}
void ProxySQL_Admin::stats___proxysql_servers_metrics() {
//SQLite3_result * resultset=GloProxyCluster->get_stats_proxysql_servers_metrics();
//if (resultset==NULL) return;
@ -4464,16 +4562,41 @@ void ProxySQL_Admin::add_admin_users() {
}
void ProxySQL_Admin::__refresh_users() {
bool calculate_checksum = false;
if (checksum_variables.checksum_mysql_servers) {
calculate_checksum = true;
}
if (calculate_checksum)
pthread_mutex_lock(&GloVars.checksum_mutex);
__delete_inactive_users(USERNAME_BACKEND);
__delete_inactive_users(USERNAME_FRONTEND);
GloMyAuth->set_all_inactive(USERNAME_BACKEND);
GloMyAuth->set_all_inactive(USERNAME_FRONTEND);
add_admin_users();
__add_active_users(USERNAME_BACKEND);
__add_active_users(USERNAME_FRONTEND);
uint64_t hashB, hashF;
if (calculate_checksum) {
__add_active_users(USERNAME_BACKEND, NULL, &hashB);
__add_active_users(USERNAME_FRONTEND, NULL, &hashF);
} else {
__add_active_users(USERNAME_BACKEND);
__add_active_users(USERNAME_FRONTEND);
}
GloMyAuth->remove_inactives(USERNAME_BACKEND);
GloMyAuth->remove_inactives(USERNAME_FRONTEND);
set_variable((char *)"admin_credentials",(char *)"");
if (calculate_checksum) {
uint64_t hash1 = hashB + hashF; // overflow allowed
uint32_t d32[2];
char buf[20];
memcpy(&d32, &hash1, sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);
GloVars.checksums_values.mysql_users.set_checksum(buf);
GloVars.checksums_values.mysql_users.version++;
time_t t = time(NULL);
GloVars.checksums_values.mysql_users.epoch = t;
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
}
void ProxySQL_Admin::send_MySQL_OK(MySQL_Protocol *myprot, char *msg, int rows) {
@ -4514,10 +4637,14 @@ void ProxySQL_Admin::__delete_inactive_users(enum cred_username_type usertype) {
}
#define ADDUSER_STMT_RAW
void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *__user) {
void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *__user, uint64_t *hash1) {
char *error=NULL;
int cols=0;
int affected_rows=0;
SpookyHash myhash;
if (hash1) {
myhash.Init(19,3);
}
#ifdef ADDUSER_STMT_RAW
sqlite3_stmt *statement=NULL;
#else
@ -4526,7 +4653,11 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
char *str=NULL;
char *query=NULL;
if (__user==NULL) {
str=(char *)"SELECT username,password,use_ssl,default_hostgroup,default_schema,schema_locked,transaction_persistent,fast_forward,max_connections FROM main.mysql_users WHERE %s=1 AND active=1 AND default_hostgroup>=0";
if (hash1) {
str=(char *)"SELECT username,password,use_ssl,default_hostgroup,default_schema,schema_locked,transaction_persistent,fast_forward,max_connections FROM main.mysql_users WHERE %s=1 AND active=1 AND default_hostgroup>=0 ORDER BY username";
} else {
str=(char *)"SELECT username,password,use_ssl,default_hostgroup,default_schema,schema_locked,transaction_persistent,fast_forward,max_connections FROM main.mysql_users WHERE %s=1 AND active=1 AND default_hostgroup>=0";
}
query=(char *)malloc(strlen(str)+15);
sprintf(query,str,(usertype==USERNAME_BACKEND ? "backend" : "frontend"));
} else {
@ -4547,6 +4678,16 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
while ((rc=sqlite3_step(statement))==SQLITE_ROW) {
SQLite3_row *r=new SQLite3_row(cols);
r->add_fields(statement);
if (hash1) {
for (int i=0; i<cols;i++) {
if (r->fields[i]) {
myhash.Update(r->fields[i],r->sizes[i]);
} else {
myhash.Update("",0);
}
}
}
#else
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
@ -4603,6 +4744,11 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
if (statement) {
sqlite3_finalize(statement);
}
if (hash1) {
uint64_t h1, h2;
myhash.Final(&h1, &h2);
*hash1 = h1;
}
#else
if (resultset) delete resultset;
#endif
@ -4610,6 +4756,69 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype, char *
}
void ProxySQL_Admin::dump_checksums_values_table() {
char *q = (char *)"REPLACE INTO runtime_checksums_values VALUES (?1 , ?2 , ?3 , ?4)";
sqlite3_stmt *statement1 = NULL;
sqlite3 *mydb3 = admindb->get_db();
rc=sqlite3_prepare_v2(mydb3, q, -1, &statement1, 0);
assert(rc==SQLITE_OK);
pthread_mutex_lock(&GloVars.checksum_mutex);
admindb->execute((char *)"BEGIN");
admindb->execute((char *)"DELETE FROM runtime_checksums_values");
rc=sqlite3_bind_text(statement1, 1, "admin_variables", -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, GloVars.checksums_values.admin_variables.version); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, GloVars.checksums_values.admin_variables.epoch); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 4, GloVars.checksums_values.admin_variables.checksum, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 1, "mysql_query_rules", -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, GloVars.checksums_values.mysql_query_rules.version); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, GloVars.checksums_values.mysql_query_rules.epoch); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 4, GloVars.checksums_values.mysql_query_rules.checksum, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 1, "mysql_servers", -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, GloVars.checksums_values.mysql_servers.version); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, GloVars.checksums_values.mysql_servers.epoch); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 4, GloVars.checksums_values.mysql_servers.checksum, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 1, "mysql_users", -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, GloVars.checksums_values.mysql_users.version); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, GloVars.checksums_values.mysql_users.epoch); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 4, GloVars.checksums_values.mysql_users.checksum, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 1, "mysql_variables", -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, GloVars.checksums_values.mysql_variables.version); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, GloVars.checksums_values.mysql_variables.epoch); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 4, GloVars.checksums_values.mysql_variables.checksum, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 1, "proxysql_servers", -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, GloVars.checksums_values.proxysql_servers.version); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, GloVars.checksums_values.proxysql_servers.epoch); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 4, GloVars.checksums_values.proxysql_servers.checksum, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
admindb->execute((char *)"COMMIT");
pthread_mutex_unlock(&GloVars.checksum_mutex);
sqlite3_finalize(statement1);
}
void ProxySQL_Admin::save_mysql_users_runtime_to_database(bool _runtime) {
char *query=NULL;
if (_runtime) {
@ -5104,6 +5313,20 @@ char * ProxySQL_Admin::load_mysql_query_rules_to_runtime() {
proxy_error("Error on %s : %s\n", query, error);
} else {
GloQPro->wrlock();
if (checksum_variables.checksum_mysql_query_rules) {
pthread_mutex_lock(&GloVars.checksum_mutex);
uint64_t hash1 = resultset->raw_checksum();
uint32_t d32[2];
char buf[20];
memcpy(&d32, &hash1, sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);
GloVars.checksums_values.mysql_query_rules.set_checksum(buf);
GloVars.checksums_values.mysql_query_rules.version++;
time_t t = time(NULL);
GloVars.checksums_values.mysql_query_rules.epoch = t;
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
}
GloQPro->reset_all(false);
QP_rule_t * nqpr;
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
@ -6151,6 +6374,20 @@ void ProxySQL_Admin::load_proxysql_servers_to_runtime() {
proxy_error("Error on %s : %s\n", query, error);
} else {
GloProxyCluster->load_servers_list(resultset);
// if (checksum_variables.checksum_mysql_query_rules) {
pthread_mutex_lock(&GloVars.checksum_mutex);
uint64_t hash1 = resultset->raw_checksum();
uint32_t d32[2];
char buf[20];
memcpy(&d32, &hash1, sizeof(hash1));
sprintf(buf,"0x%0X%0X", d32[0], d32[1]);
GloVars.checksums_values.proxysql_servers.set_checksum(buf);
GloVars.checksums_values.proxysql_servers.version++;
time_t t = time(NULL);
GloVars.checksums_values.proxysql_servers.epoch = t;
GloVars.epoch_version = t;
pthread_mutex_unlock(&GloVars.checksum_mutex);
// }
}
if (resultset) delete resultset;
resultset=NULL;

@ -37,6 +37,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) {
//char *query1 = (char *)"SELECT 1"; // in future this will be used for "light check"
char *query2 = (char *)"SELECT * FROM stats_mysql_global ORDER BY Variable_Name";
char *query3 = (char *)"SELECT * FROM runtime_checksums_values ORDER BY name";
char *username = NULL;
char *password = NULL;
bool rc_bool = true;
@ -54,6 +55,11 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) {
GloProxyCluster->get_credentials(&username, &password);
// TODO: add options, like timeout
if (strlen(username)) { // do not monitor if the username is empty
unsigned int timeout = 1;
unsigned int timeout_long = 60;
mysql_options(conn, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(conn, MYSQL_OPT_READ_TIMEOUT, &timeout_long);
mysql_options(conn, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
rc_conn = mysql_real_connect(conn, node->hostname, username, password, NULL, node->port, NULL, 0);
char *query = query2;
if (rc_conn) {
@ -67,6 +73,7 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) {
rc_bool = GloProxyCluster->Update_Node_Metrics(node->hostname, node->port, result, elapsed_time_us);
mysql_free_result(result);
unsigned long long elapsed_time_ms = elapsed_time_us / 1000;
/*
int e_ms = (int)elapsed_time_ms;
//fprintf(stderr,"Elapsed time = %d ms\n", e_ms);
int ci = __sync_fetch_and_add(&GloProxyCluster->cluster_check_interval_ms,0);
@ -75,8 +82,28 @@ void * ProxySQL_Cluster_Monitor_thread(void *args) {
usleep((ci-e_ms)*1000); // remember, usleep is in us
}
}
*/
query = query3;
unsigned long long before_query_time2=monotonic_time();
rc_query = mysql_query(conn,query);
if ( rc_query == 0 ) {
MYSQL_RES *result = mysql_store_result(conn);
unsigned long long after_query_time2=monotonic_time();
unsigned long long elapsed_time_us2 = (after_query_time2 - before_query_time2);
rc_bool = GloProxyCluster->Update_Node_Checksums(node->hostname, node->port, result);
mysql_free_result(result);
unsigned long long elapsed_time_ms2 = elapsed_time_us2 / 1000;
int e_ms = (int)elapsed_time_ms + int(elapsed_time_ms2);
//fprintf(stderr,"Elapsed time = %d ms\n", e_ms);
int ci = __sync_fetch_and_add(&GloProxyCluster->cluster_check_interval_ms,0);
if (ci > e_ms) {
if (rc_bool) {
usleep((ci-e_ms)*1000); // remember, usleep is in us
}
}
}
}
}
if (glovars.shutdown == 0) {
// we arent' shutting down, but the query failed
@ -204,6 +231,47 @@ ProxySQL_Node_Metrics * ProxySQL_Node_Entry::get_metrics_prev() {
return m;
}
void ProxySQL_Node_Entry::set_checksums(MYSQL_RES *_r) {
MYSQL_ROW row;
while ((row = mysql_fetch_row(_r))) {
if (strcmp(row[0],"admin_variables")==0) {
checksums_values.admin_variables.version = atoll(row[1]);
checksums_values.admin_variables.epoch = atoll(row[2]);
strcpy(checksums_values.admin_variables.checksum, row[3]);
continue;
}
if (strcmp(row[0],"mysql_query_rules")==0) {
checksums_values.mysql_query_rules.version = atoll(row[1]);
checksums_values.mysql_query_rules.epoch = atoll(row[2]);
strcpy(checksums_values.mysql_query_rules.checksum, row[3]);
continue;
}
if (strcmp(row[0],"mysql_servers")==0) {
checksums_values.mysql_servers.version = atoll(row[1]);
checksums_values.mysql_servers.epoch = atoll(row[2]);
strcpy(checksums_values.mysql_servers.checksum, row[3]);
continue;
}
if (strcmp(row[0],"mysql_users")==0) {
checksums_values.mysql_users.version = atoll(row[1]);
checksums_values.mysql_users.epoch = atoll(row[2]);
strcpy(checksums_values.mysql_users.checksum, row[3]);
continue;
}
if (strcmp(row[0],"mysql_variables")==0) {
checksums_values.mysql_variables.version = atoll(row[1]);
checksums_values.mysql_variables.epoch = atoll(row[2]);
strcpy(checksums_values.mysql_variables.checksum, row[3]);
continue;
}
if (strcmp(row[0],"proxysql_servers")==0) {
checksums_values.proxysql_servers.version = atoll(row[1]);
checksums_values.proxysql_servers.epoch = atoll(row[2]);
strcpy(checksums_values.proxysql_servers.checksum, row[3]);
continue;
}
}
}
void ProxySQL_Node_Entry::set_metrics(MYSQL_RES *_r, unsigned long long _response_time) {
MYSQL_ROW row;
metrics_idx_prev = metrics_idx;
@ -323,6 +391,21 @@ void ProxySQL_Cluster_Nodes::load_servers_list(SQLite3_result *resultset) {
pthread_mutex_unlock(&mutex);
}
// if it returns false , the node doesn't exist anymore and the monitor should stop
bool ProxySQL_Cluster_Nodes::Update_Node_Checksums(char * _h, uint16_t _p, MYSQL_RES *_r) {
bool ret = false;
uint64_t hash_ = generate_hash(_h, _p);
pthread_mutex_lock(&mutex);
std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator ite = umap_proxy_nodes.find(hash_);
if (ite != umap_proxy_nodes.end()) {
ProxySQL_Node_Entry * node = ite->second;
node->set_checksums(_r);
ret = true;
}
pthread_mutex_unlock(&mutex);
return ret;
}
// if it returns false , the node doesn't exist anymore and the monitor should stop
bool ProxySQL_Cluster_Nodes::Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_RES *_r, unsigned long long _response_time) {
bool ret = false;
@ -338,6 +421,78 @@ bool ProxySQL_Cluster_Nodes::Update_Node_Metrics(char * _h, uint16_t _p, MYSQL_R
return ret;
}
SQLite3_result * ProxySQL_Cluster_Nodes::stats_proxysql_servers_checksums() {
const int colnum=6;
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"hostname");
result->add_column_definition(SQLITE_TEXT,"port");
result->add_column_definition(SQLITE_TEXT,"name");
result->add_column_definition(SQLITE_TEXT,"version");
result->add_column_definition(SQLITE_TEXT,"epoch");
result->add_column_definition(SQLITE_TEXT,"checksum");
char buf[32];
int k;
pthread_mutex_lock(&mutex);
unsigned long long curtime = monotonic_time();
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
ProxySQL_Checksum_Value * vals[6];
vals[0] = &node->checksums_values.admin_variables;
vals[1] = &node->checksums_values.mysql_query_rules;
vals[2] = &node->checksums_values.mysql_servers;
vals[3] = &node->checksums_values.mysql_users;
vals[4] = &node->checksums_values.mysql_variables;
vals[5] = &node->checksums_values.proxysql_servers;
for (int i=0; i<6 ; i++) {
ProxySQL_Checksum_Value *v = vals[i];
char **pta=(char **)malloc(sizeof(char *)*colnum);
pta[0]=strdup(node->get_hostname());
sprintf(buf,"%d", node->get_port());
pta[1]=strdup(buf);
switch (i) {
case 0:
pta[2]=strdup((char *)"admin_variables");
break;
case 1:
pta[2]=strdup((char *)"mysql_query_rules");
break;
case 2:
pta[2]=strdup((char *)"mysql_servers");
break;
case 3:
pta[2]=strdup((char *)"mysql_users");
break;
case 4:
pta[2]=strdup((char *)"mysql_variables");
break;
case 5:
pta[2]=strdup((char *)"proxysql_servers");
break;
default:
break;
}
sprintf(buf,"%llu", v->version);
pta[3]=strdup(buf);
sprintf(buf,"%llu", v->epoch);
pta[4]=strdup(buf);
pta[5]=strdup(v->checksum);
result->add_row(pta);
for (k=0; k<colnum; k++) {
if (pta[k])
free(pta[k]);
}
free(pta);
}
it++;
}
pthread_mutex_unlock(&mutex);
return result;
}
SQLite3_result * ProxySQL_Cluster_Nodes::stats_proxysql_servers_metrics() {
const int colnum=10;
SQLite3_result *result=new SQLite3_result(colnum);
@ -351,12 +506,12 @@ SQLite3_result * ProxySQL_Cluster_Nodes::stats_proxysql_servers_metrics() {
result->add_column_definition(SQLITE_TEXT,"Queries");
result->add_column_definition(SQLITE_TEXT,"Client_Connections_connected");
result->add_column_definition(SQLITE_TEXT,"Client_Connections_created");
char buf[32];
int k;
pthread_mutex_lock(&mutex);
unsigned long long curtime = monotonic_time();
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
for( std::unordered_map<uint64_t, ProxySQL_Node_Entry *>::iterator it = umap_proxy_nodes.begin(); it != umap_proxy_nodes.end(); ) {
ProxySQL_Node_Entry * node = it->second;
char **pta=(char **)malloc(sizeof(char *)*colnum);
pta[0]=strdup(node->get_hostname());
@ -379,7 +534,7 @@ SQLite3_result * ProxySQL_Cluster_Nodes::stats_proxysql_servers_metrics() {
pta[8]=strdup(buf);
sprintf(buf,"%llu", curr->Client_Connections_created);
pta[9]=strdup(buf);
result->add_row(pta);
for (k=0; k<colnum; k++) {
if (pta[k])

@ -65,6 +65,8 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() {
#endif /* SO_REUSEPORT */
// global.use_proxysql_mem=false;
pthread_mutex_init(&global.start_mutex,NULL);
pthread_mutex_init(&checksum_mutex,NULL);
epoch_version = 0;
#ifdef DEBUG
global.gdb=0;
#endif

@ -252,7 +252,7 @@ void SQLite3DB::wrunlock() {
#endif
}
int64_t SQLite3_result::raw_checksum() {
uint64_t SQLite3_result::raw_checksum() {
if (this->rows_count==0) return 0;
uint64_t hash1, hash2;
SpookyHash myhash;
@ -276,6 +276,7 @@ int64_t SQLite3_result::raw_checksum() {
char *SQLite3_result::checksum() {
uint64_t hash1=raw_checksum();
char buf[128];
memset(buf,'0',128);
uint32_t d32[2];
memcpy(&d32,&hash1,sizeof(hash1));
sprintf(buf,"0x%X%X", d32[0], d32[1]);

Loading…
Cancel
Save