More development on #358

Logging on table mysql_server_replication_lag_log
List of servers is retrieved from MyHGM
pull/378/head
René Cannaò 11 years ago
parent 71b26a7ae0
commit ea880ffaf3

@ -3,8 +3,8 @@
#include "proxysql.h"
#include "cpp.h"
#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , PRIMARY KEY (hostgroup_id, hostname, port))"
#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port))"
class MySrvConnList;
@ -45,6 +45,7 @@ class MySrvC { // MySQL Server Container
enum MySerStatus status;
unsigned int compression;
unsigned int max_connections;
unsigned int max_replication_lag;
unsigned int connect_OK;
unsigned int connect_ERR;
time_t time_last_detected_error;
@ -54,7 +55,7 @@ class MySrvC { // MySQL Server Container
//uint8_t charset;
MySrvConnList *ConnectionsUsed;
MySrvConnList *ConnectionsFree;
MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int, unsigned int _max_connections);
MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int, unsigned int _max_connections, unsigned int _max_replication_lag);
~MySrvC();
void connect_error();
};
@ -112,7 +113,7 @@ class MySQL_HostGroups_Manager {
void rdunlock();
void wrlock();
void wrunlock();
bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100);
bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100, unsigned int _max_replication_lag=0);
bool commit();
SQLite3_result * execute_query(char *query, char **error);

@ -14,7 +14,7 @@
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG "CREATE TABLE mysql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , ping_success_time INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
class MySQL_Monitor_Connection_Pool;

@ -102,13 +102,14 @@ void MySrvConnList::drop_all_connections() {
}
MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections) {
MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag) {
address=strdup(add);
port=p;
weight=_weight;
status=_status;
compression=_compression;
max_connections=_max_connections;
max_replication_lag=_max_replication_lag;
connect_OK=0;
connect_ERR=0;
queries_sent=0;
@ -253,12 +254,12 @@ void MySQL_HostGroups_Manager::wrunlock() {
// add a new row in mysql_servers_incoming
// we always assume that the calling thread has acquired a rdlock()
bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status, unsigned int _comp /*, uint8_t _charset */, unsigned int _max_connections) {
bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status, unsigned int _comp /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag) {
bool ret;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u , status %u, %s compression, max_connections %d\n", add,p,hid,_weight,status, (_comp ? "with" : "without") /*, _charset */ , _max_connections);
char *q=(char *)"INSERT INTO mysql_servers_incoming VALUES (%u, \"%s\", %u, %u, %u, %u, %u)";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u , status %u, %s compression, max_connections %d, max_replication_lag %u\n", add,p,hid,_weight,status, (_comp ? "with" : "without") /*, _charset */ , _max_connections, _max_replication_lag);
char *q=(char *)"INSERT INTO mysql_servers_incoming VALUES (%u, \"%s\", %u, %u, %u, %u, %u, %u)";
char *query=(char *)malloc(strlen(q)+strlen(add)+100);
sprintf(query,q,hid,add,p,_weight,status,_comp /*,_charset */, _max_connections);
sprintf(query,q,hid,add,p,_weight,status,_comp /*,_charset */, _max_connections, _max_replication_lag);
ret=mydb->execute(query);
free(query);
return ret;
@ -299,12 +300,12 @@ bool MySQL_HostGroups_Manager::commit() {
if (resultset) { delete resultset; resultset=NULL; }
// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming");
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag FROM mysql_servers_incoming");
// SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression, t2.max_connections FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections";
query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
@ -312,34 +313,38 @@ bool MySQL_HostGroups_Manager::commit() {
} else {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
long long ptr=atoll(r->fields[7]);
long long ptr=atoll(r->fields[8]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), ptr, atoi(r->fields[0]), atoi(r->fields[5]));
//fprintf(stderr,"%lld\n", ptr);
if (ptr==0) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Creating new server %s:%d , weight=%d, status=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]) );
MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]));
MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]));
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), mysrvc, atoi(r->fields[0]));
add(mysrvc,atoi(r->fields[0]));
} else {
MySrvC *mysrvc=(MySrvC *)ptr;
if (atoi(r->fields[3])!=atoi(r->fields[8])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[8]));
mysrvc->weight=atoi(r->fields[8]);
if (atoi(r->fields[3])!=atoi(r->fields[9])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[9]));
mysrvc->weight=atoi(r->fields[9]);
}
if (atoi(r->fields[4])!=atoi(r->fields[9])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[9]));
mysrvc->status=(MySerStatus)atoi(r->fields[9]);
if (atoi(r->fields[4])!=atoi(r->fields[10])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[10]));
mysrvc->status=(MySerStatus)atoi(r->fields[10]);
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->shunned_automatic=false;
}
}
if (atoi(r->fields[5])!=atoi(r->fields[10])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[10]));
mysrvc->compression=atoi(r->fields[10]);
if (atoi(r->fields[5])!=atoi(r->fields[11])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[11]));
mysrvc->compression=atoi(r->fields[11]);
}
if (atoi(r->fields[6])!=atoi(r->fields[11])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_connections for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[11]));
mysrvc->max_connections=atoi(r->fields[11]);
if (atoi(r->fields[6])!=atoi(r->fields[12])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_connections for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->max_connections , atoi(r->fields[12]));
mysrvc->max_connections=atoi(r->fields[12]);
}
if (atoi(r->fields[7])!=atoi(r->fields[13])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_replication_lag for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->max_replication_lag , atoi(r->fields[13]));
mysrvc->max_replication_lag=atoi(r->fields[13]);
}
}
}
@ -369,9 +374,9 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
uintptr_t ptr=(uintptr_t)mysrvc;
char *q=(char *)"INSERT INTO mysql_servers VALUES(%d,\"%s\",%d,%d,%d,%u,%u,%llu)";
char *query=(char *)malloc(strlen(q)+8+strlen(mysrvc->address)+8+8+8+8+16+32);
sprintf(query, q, mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, mysrvc->status, mysrvc->compression, mysrvc->max_connections, ptr);
char *q=(char *)"INSERT INTO mysql_servers VALUES(%d,\"%s\",%d,%d,%d,%u,%u,%u,%llu)";
char *query=(char *)malloc(strlen(q)+8+strlen(mysrvc->address)+8+8+8+8+8+16+32);
sprintf(query, q, mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, mysrvc->status, mysrvc->compression, mysrvc->max_connections, mysrvc->max_replication_lag, ptr);
char *st;
switch (mysrvc->status) {
case 0:
@ -390,7 +395,7 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
assert(0);
break;
}
fprintf(stderr,"HID: %d , address: %s , port: %d , weight: %d , status: %s , max_connections: %u\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, st, mysrvc->max_connections);
fprintf(stderr,"HID: %d , address: %s , port: %d , weight: %d , status: %s , max_connections: %u , max_replication_lag: %u\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, st, mysrvc->max_connections, mysrvc->max_replication_lag);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
//fprintf(stderr,"%s\n",query);
mydb->execute(query);
@ -405,7 +410,7 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" END, compression, max_connections FROM mysql_servers";
char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" END, compression, max_connections, max_replication_lag FROM mysql_servers";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();

@ -145,6 +145,8 @@ class MySQL_Monitor_State_Data {
int interr;
char * mysql_error_msg;
MYSQL_ROW *row;
unsigned int repl_lag;
unsigned int hostgroup_id;
MySQL_Monitor_State_Data(char *h, int p, struct event_base *b) {
task_id=MON_CONNECT;
mysql=NULL;
@ -806,7 +808,7 @@ void * MySQL_Monitor::monitor_replication_lag() {
SQLite3_result *resultset=NULL;
MySQL_Monitor_State_Data **sds=NULL;
int i=0;
char *query=(char *)"SELECT hostgroup_id, hostname, port FROM mysql_servers";
char *query=(char *)"SELECT hostgroup_id, hostname, port, max_replication_lag FROM mysql_servers WHERE max_replication_lag > 0 AND status NOT LIKE 'OFFLINE%'";
t1=monotonic_time();
if (t1 < next_loop_at) {
@ -845,6 +847,8 @@ void * MySQL_Monitor::monitor_replication_lag() {
SQLite3_row *r=*it;
sds[i] = new MySQL_Monitor_State_Data(r->fields[1],atoi(r->fields[2]),libevent_base);
sds[i]->task_id=MON_REPLICATION_LAG;
sds[i]->hostgroup_id=atoi(r->fields[0]);
sds[i]->repl_lag=atoi(r->fields[3]);
replication_lag__num_active_connections++;
total_replication_lag__num_active_connections++;
MySQL_Monitor_State_Data *_mmsd=sds[i];
@ -904,7 +908,9 @@ __end_monitor_replication_lag_loop:
}
if (j>-1) {
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
repl_lag=atoi(row[j]);
if (row[j]) {
repl_lag=atoi(row[j]);
}
}
if (repl_lag>=0) {
rc=sqlite3_bind_int64(statement, 5, repl_lag); assert(rc==SQLITE_OK);

@ -52,7 +52,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define LINESIZE 2048
#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (status IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (status IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_MYSQL_USERS "CREATE TABLE mysql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , default_schema VARCHAR , schema_locked INT CHECK (schema_locked IN (0,1)) NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))"
#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0, match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
#define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY , variable_value VARCHAR NOT NULL)"
@ -2708,11 +2708,11 @@ void ProxySQL_Admin::save_mysql_servers_runtime_to_database() {
admindb->execute(query);
SQLite3_result *resultset=MyHGM->dump_table_mysql_servers();
if (!resultset) return;
char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s)";
char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s,%s)";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+16);
sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6]);
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+strlen(r->fields[7])+16);
sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6], r->fields[7]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
//fprintf(stderr,"%s\n",query);
admindb->execute(query);
@ -2727,7 +2727,7 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections FROM main.mysql_servers";
char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag FROM main.mysql_servers";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
//MyHGH->wrlock();
@ -2750,8 +2750,8 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() {
}
}
}
proxy_debug(PROXY_DEBUG_ADMIN, 4, "hid=%d , hostname=%s , port=%d , status=%s , weight=%d , compression=%d , max_connections=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), r->fields[3], atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]));
MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), status, atoi(r->fields[5]), atoi(r->fields[6]));
proxy_debug(PROXY_DEBUG_ADMIN, 4, "hid=%d , hostname=%s , port=%d , status=%s , weight=%d , compression=%d , max_connections=%d , max_replication_lag=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), r->fields[3], atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]));
MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), status, atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]));
//MyHGH->server_add_hg(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]));
}
}
@ -2987,7 +2987,7 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() {
int i;
int rows=0;
admindb->execute("PRAGMA foreign_keys = OFF");
char *q=(char *)"INSERT OR REPLACE INTO mysql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d)";
char *q=(char *)"INSERT OR REPLACE INTO mysql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections, max_replication_lag) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d, %d)";
for (i=0; i< count; i++) {
const Setting &server = mysql_servers[i];
std::string address;
@ -2997,6 +2997,7 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() {
int weight=1;
int compression=0;
int max_connections=1000; // default
int max_replication_lag=0; // default
if (server.lookupValue("address", address)==false) continue;
if (server.lookupValue("port", port)==false) continue;
if (server.lookupValue("hostgroup", hostgroup)==false) continue;
@ -3012,8 +3013,9 @@ int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() {
server.lookupValue("compression", compression);
server.lookupValue("weight", weight);
server.lookupValue("max_connections", max_connections);
server.lookupValue("max_replication_lag", max_replication_lag);
char *query=(char *)malloc(strlen(q)+strlen(status.c_str())+strlen(address.c_str())+128);
sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections);
sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
free(query);

Loading…
Cancel
Save