From ea880ffaf3d8fb617b3fac106fc6f6d460ad38d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Fri, 28 Aug 2015 21:23:38 +0000 Subject: [PATCH] More development on #358 Logging on table mysql_server_replication_lag_log List of servers is retrieved from MyHGM --- include/MySQL_HostGroups_Manager.h | 9 +++-- include/MySQL_Monitor.hpp | 2 +- lib/MySQL_HostGroups_Manager.cpp | 59 ++++++++++++++++-------------- lib/MySQL_Monitor.cpp | 10 ++++- lib/ProxySQL_Admin.cpp | 20 +++++----- 5 files changed, 57 insertions(+), 43 deletions(-) diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index e05ee23ce..184c10b6a 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -3,8 +3,8 @@ #include "proxysql.h" #include "cpp.h" -#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )" -#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , PRIMARY KEY (hostgroup_id, hostname, port))" +#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )" +#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port))" class MySrvConnList; @@ -45,6 +45,7 @@ class MySrvC { // MySQL Server Container enum MySerStatus status; unsigned int compression; unsigned int max_connections; + unsigned int max_replication_lag; unsigned int connect_OK; unsigned int connect_ERR; time_t time_last_detected_error; @@ -54,7 +55,7 @@ class MySrvC { // MySQL Server Container //uint8_t charset; MySrvConnList *ConnectionsUsed; MySrvConnList *ConnectionsFree; - MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int, unsigned int _max_connections); + MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int, unsigned int _max_connections, unsigned int _max_replication_lag); ~MySrvC(); void connect_error(); }; @@ -112,7 +113,7 @@ class MySQL_HostGroups_Manager { void rdunlock(); void wrlock(); void wrunlock(); - bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100); + bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100, unsigned int _max_replication_lag=0); bool commit(); SQLite3_result * execute_query(char *query, char **error); diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index a2b424c11..0a5d02af2 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -14,7 +14,7 @@ #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG "CREATE TABLE mysql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , ping_success_time INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start))" -#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))" +#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))" class MySQL_Monitor_Connection_Pool; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index 74107ace1..00350e207 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -102,13 +102,14 @@ void MySrvConnList::drop_all_connections() { } -MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections) { +MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag) { address=strdup(add); port=p; weight=_weight; status=_status; compression=_compression; max_connections=_max_connections; + max_replication_lag=_max_replication_lag; connect_OK=0; connect_ERR=0; queries_sent=0; @@ -253,12 +254,12 @@ void MySQL_HostGroups_Manager::wrunlock() { // add a new row in mysql_servers_incoming // we always assume that the calling thread has acquired a rdlock() -bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status, unsigned int _comp /*, uint8_t _charset */, unsigned int _max_connections) { +bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status, unsigned int _comp /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag) { bool ret; - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u , status %u, %s compression, max_connections %d\n", add,p,hid,_weight,status, (_comp ? "with" : "without") /*, _charset */ , _max_connections); - char *q=(char *)"INSERT INTO mysql_servers_incoming VALUES (%u, \"%s\", %u, %u, %u, %u, %u)"; + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u , status %u, %s compression, max_connections %d, max_replication_lag %u\n", add,p,hid,_weight,status, (_comp ? "with" : "without") /*, _charset */ , _max_connections, _max_replication_lag); + char *q=(char *)"INSERT INTO mysql_servers_incoming VALUES (%u, \"%s\", %u, %u, %u, %u, %u, %u)"; char *query=(char *)malloc(strlen(q)+strlen(add)+100); - sprintf(query,q,hid,add,p,_weight,status,_comp /*,_charset */, _max_connections); + sprintf(query,q,hid,add,p,_weight,status,_comp /*,_charset */, _max_connections, _max_replication_lag); ret=mydb->execute(query); free(query); return ret; @@ -299,12 +300,12 @@ bool MySQL_HostGroups_Manager::commit() { if (resultset) { delete resultset; resultset=NULL; } // INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n"); - mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming"); +// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n"); + mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag FROM mysql_servers_incoming"); // SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet) - query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression, t2.max_connections FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections"; + query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag"; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); if (error) { @@ -312,34 +313,38 @@ bool MySQL_HostGroups_Manager::commit() { } else { for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; - long long ptr=atoll(r->fields[7]); + long long ptr=atoll(r->fields[8]); proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), ptr, atoi(r->fields[0]), atoi(r->fields[5])); //fprintf(stderr,"%lld\n", ptr); if (ptr==0) { proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Creating new server %s:%d , weight=%d, status=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]) ); - MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6])); + MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7])); proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), mysrvc, atoi(r->fields[0])); add(mysrvc,atoi(r->fields[0])); } else { MySrvC *mysrvc=(MySrvC *)ptr; - if (atoi(r->fields[3])!=atoi(r->fields[8])) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[8])); - mysrvc->weight=atoi(r->fields[8]); + if (atoi(r->fields[3])!=atoi(r->fields[9])) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[9])); + mysrvc->weight=atoi(r->fields[9]); } - if (atoi(r->fields[4])!=atoi(r->fields[9])) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[9])); - mysrvc->status=(MySerStatus)atoi(r->fields[9]); + if (atoi(r->fields[4])!=atoi(r->fields[10])) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[10])); + mysrvc->status=(MySerStatus)atoi(r->fields[10]); if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) { mysrvc->shunned_automatic=false; } } - if (atoi(r->fields[5])!=atoi(r->fields[10])) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[10])); - mysrvc->compression=atoi(r->fields[10]); + if (atoi(r->fields[5])!=atoi(r->fields[11])) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[11])); + mysrvc->compression=atoi(r->fields[11]); } - if (atoi(r->fields[6])!=atoi(r->fields[11])) { - proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_connections for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[11])); - mysrvc->max_connections=atoi(r->fields[11]); + if (atoi(r->fields[6])!=atoi(r->fields[12])) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_connections for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->max_connections , atoi(r->fields[12])); + mysrvc->max_connections=atoi(r->fields[12]); + } + if (atoi(r->fields[7])!=atoi(r->fields[13])) { + proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_replication_lag for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->max_replication_lag , atoi(r->fields[13])); + mysrvc->max_replication_lag=atoi(r->fields[13]); } } } @@ -369,9 +374,9 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() { for (unsigned int j=0; jmysrvs->servers->len; j++) { mysrvc=myhgc->mysrvs->idx(j); uintptr_t ptr=(uintptr_t)mysrvc; - char *q=(char *)"INSERT INTO mysql_servers VALUES(%d,\"%s\",%d,%d,%d,%u,%u,%llu)"; - char *query=(char *)malloc(strlen(q)+8+strlen(mysrvc->address)+8+8+8+8+16+32); - sprintf(query, q, mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, mysrvc->status, mysrvc->compression, mysrvc->max_connections, ptr); + char *q=(char *)"INSERT INTO mysql_servers VALUES(%d,\"%s\",%d,%d,%d,%u,%u,%u,%llu)"; + char *query=(char *)malloc(strlen(q)+8+strlen(mysrvc->address)+8+8+8+8+8+16+32); + sprintf(query, q, mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, mysrvc->status, mysrvc->compression, mysrvc->max_connections, mysrvc->max_replication_lag, ptr); char *st; switch (mysrvc->status) { case 0: @@ -390,7 +395,7 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() { assert(0); break; } - fprintf(stderr,"HID: %d , address: %s , port: %d , weight: %d , status: %s , max_connections: %u\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, st, mysrvc->max_connections); + fprintf(stderr,"HID: %d , address: %s , port: %d , weight: %d , status: %s , max_connections: %u , max_replication_lag: %u\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, st, mysrvc->max_connections, mysrvc->max_replication_lag); proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); //fprintf(stderr,"%s\n",query); mydb->execute(query); @@ -405,7 +410,7 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() { int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" END, compression, max_connections FROM mysql_servers"; + char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" END, compression, max_connections, max_replication_lag FROM mysql_servers"; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset); wrunlock(); diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index d9319e1de..919694fa7 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -145,6 +145,8 @@ class MySQL_Monitor_State_Data { int interr; char * mysql_error_msg; MYSQL_ROW *row; + unsigned int repl_lag; + unsigned int hostgroup_id; MySQL_Monitor_State_Data(char *h, int p, struct event_base *b) { task_id=MON_CONNECT; mysql=NULL; @@ -806,7 +808,7 @@ void * MySQL_Monitor::monitor_replication_lag() { SQLite3_result *resultset=NULL; MySQL_Monitor_State_Data **sds=NULL; int i=0; - char *query=(char *)"SELECT hostgroup_id, hostname, port FROM mysql_servers"; + char *query=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT LIKE 'OFFLINE%'"; t1=monotonic_time(); if (t1 < next_loop_at) { @@ -845,6 +847,8 @@ void * MySQL_Monitor::monitor_replication_lag() { SQLite3_row *r=*it; sds[i] = new MySQL_Monitor_State_Data(r->fields[1],atoi(r->fields[2]),libevent_base); sds[i]->task_id=MON_REPLICATION_LAG; + sds[i]->hostgroup_id=atoi(r->fields[0]); + sds[i]->repl_lag=atoi(r->fields[3]); replication_lag__num_active_connections++; total_replication_lag__num_active_connections++; MySQL_Monitor_State_Data *_mmsd=sds[i]; @@ -904,7 +908,9 @@ __end_monitor_replication_lag_loop: } if (j>-1) { MYSQL_ROW row=mysql_fetch_row(mmsd->result); - repl_lag=atoi(row[j]); + if (row[j]) { + repl_lag=atoi(row[j]); + } } if (repl_lag>=0) { rc=sqlite3_bind_int64(statement, 5, repl_lag); assert(rc==SQLITE_OK); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 4e9d32834..9fa6240df 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -52,7 +52,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define LINESIZE 2048 -#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (status IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , PRIMARY KEY (hostgroup_id, hostname, port) )" +#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (status IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )" #define ADMIN_SQLITE_TABLE_MYSQL_USERS "CREATE TABLE mysql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , default_schema VARCHAR , schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))" #define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0, match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)" #define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY , variable_value VARCHAR NOT NULL)" @@ -2708,11 +2708,11 @@ void ProxySQL_Admin::save_mysql_servers_runtime_to_database() { admindb->execute(query); SQLite3_result *resultset=MyHGM->dump_table_mysql_servers(); if (!resultset) return; - char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s)"; + char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s,%s)"; for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { SQLite3_row *r=*it; - char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+16); - sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6]); + char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+strlen(r->fields[7])+16); + sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6], r->fields[7]); proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query); //fprintf(stderr,"%s\n",query); admindb->execute(query); @@ -2727,7 +2727,7 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() { int cols=0; int affected_rows=0; SQLite3_result *resultset=NULL; - char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections FROM main.mysql_servers"; + char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag FROM main.mysql_servers"; proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); //MyHGH->wrlock(); @@ -2750,8 +2750,8 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() { } } } - proxy_debug(PROXY_DEBUG_ADMIN, 4, "hid=%d , hostname=%s , port=%d , status=%s , weight=%d , compression=%d , max_connections=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), r->fields[3], atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6])); - MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), status, atoi(r->fields[5]), atoi(r->fields[6])); + proxy_debug(PROXY_DEBUG_ADMIN, 4, "hid=%d , hostname=%s , port=%d , status=%s , weight=%d , compression=%d , max_connections=%d , max_replication_lag=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), r->fields[3], atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7])); + MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), status, atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7])); //MyHGH->server_add_hg(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3])); } } @@ -2987,7 +2987,7 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { int i; int rows=0; admindb->execute("PRAGMA foreign_keys = OFF"); - char *q=(char *)"INSERT OR REPLACE INTO mysql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d)"; + char *q=(char *)"INSERT OR REPLACE INTO mysql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections, max_replication_lag) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d, %d)"; for (i=0; i< count; i++) { const Setting &server = mysql_servers[i]; std::string address; @@ -2997,6 +2997,7 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { int weight=1; int compression=0; int max_connections=1000; // default + int max_replication_lag=0; // default if (server.lookupValue("address", address)==false) continue; if (server.lookupValue("port", port)==false) continue; if (server.lookupValue("hostgroup", hostgroup)==false) continue; @@ -3012,8 +3013,9 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() { server.lookupValue("compression", compression); server.lookupValue("weight", weight); server.lookupValue("max_connections", max_connections); + server.lookupValue("max_replication_lag", max_replication_lag); char *query=(char *)malloc(strlen(q)+strlen(status.c_str())+strlen(address.c_str())+128); - sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections); + sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag); //fprintf(stderr, "%s\n", query); admindb->execute(query); free(query);