Merge pull request #412 from renecannao/replication_hostgroups

Replication hostgroups
1.0
René Cannaò 11 years ago
commit 8dfde1735d

@ -5,7 +5,7 @@
#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port))"
#define MYHGM_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))"
class MySrvConnList;
class MySrvC;
@ -89,6 +89,7 @@ class MyHGC { // MySQL Host Group Container
class MySQL_HostGroups_Manager {
private:
SQLite3DB *admindb;
SQLite3DB *mydb;
rwlock_t rwlock;
PtrArray *MyHostGroups;
@ -97,7 +98,10 @@ class MySQL_HostGroups_Manager {
MyHGC * MyHGC_create(unsigned int);
void add(MySrvC *, unsigned int);
void purge_mysql_servers_table();
void generate_mysql_servers_table();
void generate_mysql_replication_hostgroups_table();
SQLite3_result *incoming_replication_hostgroups;
public:
struct {
@ -119,8 +123,10 @@ class MySQL_HostGroups_Manager {
bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, unsigned int _comp=0, unsigned int _max_connections=100, unsigned int _max_replication_lag=0);
bool commit();
void set_incoming_replication_hostgroups(SQLite3_result *);
SQLite3_result * execute_query(char *query, char **error);
SQLite3_result *dump_table_mysql_servers();
SQLite3_result *dump_table_mysql_replication_hostgroups();
MyHGC * MyHGC_lookup(unsigned int);
void MyConn_add_to_pool(MySQL_Connection *);
@ -134,6 +140,7 @@ class MySQL_HostGroups_Manager {
void destroy_MyConn_from_pool(MySQL_Connection *);
void replication_lag_action(int, char*, unsigned int, int);
void read_only_action(char *hostname, int port, int read_only);
};
#endif /* __CLASS_MYSQL_HOSTGROUPS_MANAGER_H */

@ -14,6 +14,8 @@
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG "CREATE TABLE mysql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , ping_success_time INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG "CREATE TABLE mysql_server_read_only_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , read_only INT DEFAULT 1 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
@ -37,6 +39,7 @@ class MySQL_Monitor {
void print_version();
void * monitor_connect();
void * monitor_ping();
void * monitor_read_only();
void * monitor_replication_lag();
void * run();
};

@ -249,6 +249,9 @@ class MySQL_Threads_Handler
int monitor_connect_timeout;
int monitor_ping_interval;
int monitor_ping_timeout;
int monitor_read_only_interval;
int monitor_read_only_timeout;
bool monitor_writer_is_also_reader;
int monitor_replication_lag_interval;
int monitor_replication_lag_timeout;
int monitor_query_interval;

@ -705,6 +705,9 @@ __thread int mysql_thread___monitor_connect_interval;
__thread int mysql_thread___monitor_connect_timeout;
__thread int mysql_thread___monitor_ping_interval;
__thread int mysql_thread___monitor_ping_timeout;
__thread int mysql_thread___monitor_read_only_interval;
__thread int mysql_thread___monitor_read_only_timeout;
__thread bool mysql_thread___monitor_writer_is_also_reader;
__thread int mysql_thread___monitor_replication_lag_interval;
__thread int mysql_thread___monitor_replication_lag_timeout;
__thread int mysql_thread___monitor_query_interval;
@ -760,6 +763,9 @@ extern __thread int mysql_thread___monitor_connect_interval;
extern __thread int mysql_thread___monitor_connect_timeout;
extern __thread int mysql_thread___monitor_ping_interval;
extern __thread int mysql_thread___monitor_ping_timeout;
extern __thread int mysql_thread___monitor_read_only_interval;
extern __thread int mysql_thread___monitor_read_only_timeout;
extern __thread bool mysql_thread___monitor_writer_is_also_reader;
extern __thread int mysql_thread___monitor_replication_lag_interval;
extern __thread int mysql_thread___monitor_replication_lag_timeout;
extern __thread int mysql_thread___monitor_query_interval;

@ -8,6 +8,7 @@
//#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, PRIMARY KEY (hostgroup_id, hostname, port) )"
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Threads_Handler *GloMTH;
@ -221,11 +222,14 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
status.myconnpoll_push=0;
status.myconnpoll_destroy=0;
spinlock_rwlock_init(&rwlock);
admindb=NULL; // initialized only if needed
mydb=new SQLite3DB();
mydb->open((char *)"file:mem_mydb?mode=memory&cache=shared", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
mydb->execute(MYHGM_MYSQL_SERVERS);
mydb->execute(MYHGM_MYSQL_SERVERS_INCOMING);
mydb->execute(MYHGM_MYSQL_REPLICATION_HOSTGROUPS);
MyHostGroups=new PtrArray();
incoming_replication_hostgroups=NULL;
}
MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() {
@ -235,6 +239,9 @@ MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() {
}
delete MyHostGroups;
delete mydb;
if (admindb) {
delete admindb;
}
}
void MySQL_HostGroups_Manager::rdlock() {
@ -280,6 +287,14 @@ SQLite3_result * MySQL_HostGroups_Manager::execute_query(char *query, char **err
}
bool MySQL_HostGroups_Manager::commit() {
// purge table
purge_mysql_servers_table();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
char *error=NULL;
int cols=0;
int affected_rows=0;
@ -302,6 +317,9 @@ bool MySQL_HostGroups_Manager::commit() {
}
if (resultset) { delete resultset; resultset=NULL; }
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag FROM mysql_servers_incoming");
@ -359,8 +377,11 @@ bool MySQL_HostGroups_Manager::commit() {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
mydb->execute("DELETE FROM mysql_servers");
// FIXME: scan all servers and recreate mysql_servers
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_replication_hostgroups");
generate_mysql_servers_table();
generate_mysql_replication_hostgroups_table();
wrunlock();
if (GloMTH) {
@ -369,6 +390,24 @@ bool MySQL_HostGroups_Manager::commit() {
return true;
}
void MySQL_HostGroups_Manager::purge_mysql_servers_table() {
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->status==MYSQL_SERVER_STATUS_OFFLINE_HARD) {
if (mysrvc->ConnectionsUsed->conns->len==0 && mysrvc->ConnectionsFree->conns->len==0) {
// no more connections for OFFLINE_HARD server, removing it
mysrvc=(MySrvC *)myhgc->mysrvs->servers->remove_index_fast(j);
delete mysrvc;
}
}
}
}
}
void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
proxy_info("New mysql_servers table\n");
for (unsigned int i=0; i<MyHostGroups->len; i++) {
@ -405,8 +444,30 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
}
}
void MySQL_HostGroups_Manager::generate_mysql_replication_hostgroups_table() {
if (incoming_replication_hostgroups==NULL)
return;
proxy_info("New mysql_replication_hostgroups table\n");
for (std::vector<SQLite3_row *>::iterator it = incoming_replication_hostgroups->rows.begin() ; it != incoming_replication_hostgroups->rows.end(); ++it) {
SQLite3_row *r=*it;
char query[256];
sprintf(query,"INSERT INTO mysql_replication_hostgroups VALUES(%s,%s)",r->fields[0],r->fields[1]);
mydb->execute(query);
fprintf(stderr,"writer_hostgroup: %s , reader_hostgroup: %s\n", r->fields[0],r->fields[1]);
}
incoming_replication_hostgroups=NULL;
}
SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() {
wrlock();
// purge table
purge_mysql_servers_table();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
char *error=NULL;
int cols=0;
int affected_rows=0;
@ -418,6 +479,19 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() {
return resultset;
}
SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_replication_hostgroups() {
wrlock();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT writer_hostgroup, reader_hostgroup FROM mysql_replication_hostgroups";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
return resultset;
}
MyHGC * MySQL_HostGroups_Manager::MyHGC_create(unsigned int _hid) {
MyHGC *myhgc=new MyHGC(_hid);
return myhgc;
@ -705,6 +779,10 @@ __exit_get_multiple_idle_connections:
return num_conn_current;
}
void MySQL_HostGroups_Manager::set_incoming_replication_hostgroups(SQLite3_result *s) {
incoming_replication_hostgroups=s;
}
SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
const int colnum=11;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping Connection Pool\n");
@ -787,3 +865,74 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
wrunlock();
return result;
}
void MySQL_HostGroups_Manager::read_only_action(char *hostname, int port, int read_only) {
// define queries
const char *Q1=(char *)"SELECT hostgroup_id FROM mysql_servers join mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup WHERE hostname='%s' AND port=%d AND status=0";
const char *Q2=(char *)"UPDATE OR IGNORE mysql_servers SET hostgroup_id=(SELECT writer_hostgroup FROM mysql_replication_hostgroups WHERE reader_hostgroup=mysql_servers.hostgroup_id) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT reader_hostgroup FROM mysql_replication_hostgroups WHERE reader_hostgroup=mysql_servers.hostgroup_id)";
const char *Q3A=(char *)"INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, status, weight, max_connections, max_replication_lag) SELECT reader_hostgroup, hostname, port, status, weight, max_connections, max_replication_lag FROM mysql_servers JOIN mysql_replication_hostgroups ON mysql_servers.hostgroup_id=mysql_replication_hostgroups.writer_hostgroup WHERE hostname='%s' AND port=%d";
const char *Q3B=(char *)"DELETE FROM mysql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT reader_hostgroup FROM mysql_replication_hostgroups WHERE reader_hostgroup=mysql_servers.hostgroup_id)";
const char *Q4=(char *)"UPDATE OR IGNORE mysql_servers SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_replication_hostgroups WHERE writer_hostgroup=mysql_servers.hostgroup_id) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM mysql_replication_hostgroups WHERE writer_hostgroup=mysql_servers.hostgroup_id)";
const char *Q5=(char *)"DELETE FROM mysql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM mysql_replication_hostgroups WHERE writer_hostgroup=mysql_servers.hostgroup_id)";
// define a buffer that will be used for all queries
char *query=(char *)malloc(strlen(hostname)+strlen(Q3A)+32);
sprintf(query,Q1,hostname,port);
int cols=0;
char *error=NULL;
int affected_rows=0;
SQLite3_result *resultset=NULL;
wrlock();
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
int num_rows=0;
if (resultset) {
num_rows=resultset->rows_count;
delete resultset;
}
if (GloAdmin==NULL) {
// quick exit
free(query);
return;
}
if (admindb==NULL) { // we initialize admindb only if needed
admindb=new SQLite3DB();
admindb->open((char *)"file:mem_admindb?mode=memory&cache=shared", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
}
switch (read_only) {
case 0:
if (num_rows==0) {
// the server has read_only=0 , but we can't find any writer, so we perform a swap
GloAdmin->save_mysql_servers_runtime_to_database(); // SAVE MYSQL SERVERS FROM RUNTIME
sprintf(query,Q2,hostname,port);
admindb->execute(query);
if (mysql_thread___monitor_writer_is_also_reader) {
sprintf(query,Q3A,hostname,port);
} else {
sprintf(query,Q3B,hostname,port);
}
admindb->execute(query);
GloAdmin->load_mysql_servers_to_runtime(); // LOAD MYSQL SERVERS TO RUNTIME
}
break;
case 1:
if (num_rows) {
// the server has read_only=1 , but we find it as writer, so we perform a swap
GloAdmin->save_mysql_servers_runtime_to_database(); // SAVE MYSQL SERVERS FROM RUNTIME
sprintf(query,Q4,hostname,port);
admindb->execute(query);
sprintf(query,Q5,hostname,port);
admindb->execute(query);
GloAdmin->load_mysql_servers_to_runtime(); // LOAD MYSQL SERVERS TO RUNTIME
}
break;
default:
assert(0);
break;
}
free(query);
}

@ -35,7 +35,6 @@ static MySQL_Monitor *GloMyMon;
static void state_machine_handler(int fd, short event, void *arg);
/*
struct state_data {
int ST;
@ -58,6 +57,8 @@ static int ping__num_active_connections;
static int total_ping__num_active_connections=0;
static int replication_lag__num_active_connections;
static int total_replication_lag__num_active_connections=0;
static int read_only__num_active_connections;
static int total_read_only__num_active_connections=0;
struct cmp_str {
@ -125,6 +126,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS
enum MySQL_Monitor_State_Data_Task_Type {
MON_CONNECT,
MON_PING,
MON_READ_ONLY,
MON_REPLICATION_LAG
};
@ -223,6 +225,9 @@ again:
case MON_PING:
NEXT_IMMEDIATE(7);
break;
case MON_READ_ONLY:
NEXT_IMMEDIATE(20);
break;
case MON_REPLICATION_LAG:
NEXT_IMMEDIATE(10);
break;
@ -337,6 +342,72 @@ again:
}
break;
case 20:
if (mysql_thread___monitor_timer_cached==true) {
event_base_gettimeofday_cached(base, &tv_out);
} else {
evutil_gettimeofday(&tv_out, NULL);
}
t1=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec);
status=mysql_query_start(&interr,mysql,"SHOW GLOBAL VARIABLES LIKE 'read_only'");
if (status)
next_event(21,status);
else
NEXT_IMMEDIATE(22);
break;
case 21:
status=mysql_query_cont(&interr,mysql, mysql_status(event));
if (status)
next_event(21,status);
else
NEXT_IMMEDIATE(22);
break;
case 22:
if (interr) {
mysql_error_msg=strdup(mysql_error(mysql));
mysql_close(mysql);
mysql=NULL;
NEXT_IMMEDIATE(50);
} else {
status=mysql_store_result_start(&result, mysql);
if (status)
next_event(23,status);
else
NEXT_IMMEDIATE(24);
}
break;
case 23:
status=mysql_store_result_cont(&result, mysql, mysql_status(event));
if (status)
next_event(23,status);
else
NEXT_IMMEDIATE(24);
break;
case 24:
if (result) {
if (mysql_thread___monitor_timer_cached==true) {
event_base_gettimeofday_cached(base, &tv_out);
} else {
evutil_gettimeofday(&tv_out, NULL);
}
t2=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec);
GloMyMon->My_Conn_Pool->put_connection(hostname,port,mysql);
mysql=NULL;
return -1;
} else {
// no resultset, consider it an error
// FIXME: if this happen, should be logged
mysql_error_msg=strdup(mysql_error(mysql));
mysql_close(mysql);
mysql=NULL;
NEXT_IMMEDIATE(50);
}
break;
case 39:
if (mysql_thread___monitor_timer_cached==true) {
event_base_gettimeofday_cached(base, &tv_out);
@ -440,6 +511,11 @@ state_machine_handler(int fd __attribute__((unused)), short event, void *arg) {
if (ping__num_active_connections == 0)
event_base_loopbreak(base);
break;
case MON_READ_ONLY:
read_only__num_active_connections--;
if (read_only__num_active_connections == 0)
event_base_loopbreak(base);
break;
case MON_REPLICATION_LAG:
replication_lag__num_active_connections--;
if (replication_lag__num_active_connections == 0)
@ -472,11 +548,13 @@ MySQL_Monitor::MySQL_Monitor() {
insert_into_tables_defs(tables_defs_monitor,"mysql_server_connect_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_CONNECT_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_read_only_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG);
insert_into_tables_defs(tables_defs_monitor,"mysql_server_replication_lag_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG);
// create monitoring tables
check_and_build_standard_tables(monitordb, tables_defs_monitor);
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON mysql_server_connect_log (time_start)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON mysql_server_ping_log (time_start)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_read_only_log_time_start ON mysql_server_read_only_log (time_start)");
monitordb->execute("CREATE INDEX IF NOT EXISTS idx_replication_lag_log_time_start ON mysql_server_replication_lag_log (time_start)");
@ -785,6 +863,182 @@ __sleep_monitor_ping_loop:
}
return NULL;
}
void * MySQL_Monitor::monitor_read_only() {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
struct event_base *libevent_base;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version();
mysql_thr->refresh_variables();
unsigned long long t1;
unsigned long long t2;
unsigned long long start_time;
unsigned long long next_loop_at=0;
while (shutdown==false) {
unsigned int glover;
char *error=NULL;
// int cols=0;
// int affected_rows=0;
SQLite3_result *resultset=NULL;
MySQL_Monitor_State_Data **sds=NULL;
int i=0;
char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup";
t1=monotonic_time();
if (t1 < next_loop_at) {
goto __sleep_monitor_read_only;
}
next_loop_at=t1+1000*mysql_thread___monitor_read_only_interval;
struct timeval tv_out;
evutil_gettimeofday(&tv_out, NULL);
start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec);
read_only__num_active_connections=0;
// create libevent base
libevent_base= event_base_new();
glover=GloMTH->get_global_version();
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover;
mysql_thr->refresh_variables();
//proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables");
}
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
// admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
resultset = MyHGM->execute_query(query, &error);
assert(resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
goto __end_monitor_read_only_loop;
} else {
if (resultset->rows_count==0) {
goto __end_monitor_read_only_loop;
}
sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *));
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base);
sds[i]->task_id=MON_READ_ONLY;
// sds[i]->hostgroup_id=atoi(r->fields[0]);
// sds[i]->repl_lag=atoi(r->fields[3]);
read_only__num_active_connections++;
total_read_only__num_active_connections++;
MySQL_Monitor_State_Data *_mmsd=sds[i];
_mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(_mmsd->hostname, _mmsd->port);
if (_mmsd->mysql==NULL) {
state_machine_handler(-1,-1,_mmsd);
} else {
int fd=mysql_get_socket(_mmsd->mysql);
_mmsd->ST=20;
state_machine_handler(fd,-1,_mmsd);
}
i++;
}
}
// start libevent loop
event_base_dispatch(libevent_base);
__end_monitor_read_only_loop:
if (sds) {
sqlite3_stmt *statement;
sqlite3 *mondb=monitordb->get_db();
int rc;
char *query=NULL;
query=(char *)"DELETE FROM mysql_server_read_only_log WHERE time_start < ?1";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 1, start_time-mysql_thread___monitor_history*1000); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
sqlite3_finalize(statement);
query=(char *)"INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
while (i>0) {
i--;
int read_only=1; // as a safety mechanism , read_only=1 is the default
MySQL_Monitor_State_Data *mmsd=sds[i];
rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK);
if (mmsd->result) {
unsigned int num_fields;
unsigned int k;
MYSQL_FIELD *fields;
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
for(k = 0; k < num_fields; k++) {
//if (strcmp("VARIABLE_NAME", fields[k].name)==0) {
if (strcmp("Value", fields[k].name)==0) {
j=k;
}
}
if (j>-1) {
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
if (row) {
if (row[j]) {
if (!strcmp(row[j],"0") || !strcasecmp(row[j],"OFF"))
read_only=0;
}
}
}
// if (repl_lag>=0) {
rc=sqlite3_bind_int64(statement, 5, read_only); assert(rc==SQLITE_OK);
// } else {
// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
// }
mysql_free_result(mmsd->result);
mmsd->result=NULL;
} else {
rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
}
rc=sqlite3_bind_text(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
MyHGM->read_only_action(mmsd->hostname, mmsd->port, read_only);
delete mmsd;
}
sqlite3_finalize(statement);
free(sds);
}
if (resultset)
delete resultset;
event_base_free(libevent_base);
__sleep_monitor_read_only:
t2=monotonic_time();
if (t2<next_loop_at) {
unsigned long long st=0;
st=next_loop_at-t2;
if (st > 500000) {
st = 500000;
}
usleep(st);
}
}
return NULL;
}
void * MySQL_Monitor::monitor_replication_lag() {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
struct event_base *libevent_base;
@ -964,6 +1218,7 @@ void * MySQL_Monitor::run() {
mysql_thr->refresh_variables();
std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this);
std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this);
std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this);
std::thread * monitor_replication_lag_thread = new std::thread(&MySQL_Monitor::monitor_replication_lag,this);
while (shutdown==false) {
unsigned int glover=GloMTH->get_global_version();
@ -976,6 +1231,7 @@ void * MySQL_Monitor::run() {
}
monitor_connect_thread->join();
monitor_ping_thread->join();
monitor_read_only_thread->join();
monitor_replication_lag_thread->join();
return NULL;
};

@ -797,6 +797,27 @@ handler_again:
proxy_warning("Error during query: %d, %s\n", myerr, mysql_error(myconn->mysql));
// FIXME: deprecate old MySQL_Result_to_MySQL_wire , not completed yet
//MySQL_Result_to_MySQL_wire(myconn->mysql,myconn->mysql_result,&client_myds->myprot);
bool retry_conn=false;
switch (myerr) {
case 1290: // read-only
if ((myds->myconn->reusable==true) && myds->myconn->IsActiveTransaction()==false) {
retry_conn=true;
}
myds->destroy_MySQL_Connection_From_Pool();
myds->fd=0;
if (retry_conn) {
myds->DSS=STATE_NOT_INITIALIZED;
previous_status.push(PROCESSING_QUERY);
NEXT_IMMEDIATE(CONNECTING_SERVER);
}
return -1;
break;
default:
break; // continue normally
}
MySQL_Result_to_MySQL_wire(myconn->mysql, myconn->MyRS);
CurrentQuery.end();
GloQPro->delete_QP_out(qpo);

@ -149,6 +149,8 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_connect_timeout",
(char *)"monitor_ping_interval",
(char *)"monitor_ping_timeout",
(char *)"monitor_read_only_interval",
(char *)"monitor_read_only_timeout",
(char *)"monitor_replication_lag_interval",
(char *)"monitor_replication_lag_timeout",
(char *)"monitor_username",
@ -158,6 +160,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_query_interval",
(char *)"monitor_query_timeout",
(char *)"monitor_timer_cached",
(char *)"monitor_writer_is_also_reader",
(char *)"max_transaction_time",
(char *)"threshold_query_length",
(char *)"threshold_resultset_size",
@ -213,6 +216,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_connect_timeout=200;
variables.monitor_ping_interval=60000;
variables.monitor_ping_timeout=100;
variables.monitor_read_only_interval=1000;
variables.monitor_read_only_timeout=100;
variables.monitor_replication_lag_interval=10000;
variables.monitor_replication_lag_timeout=1000;
variables.monitor_query_interval=60000;
@ -222,6 +227,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_query_variables=strdup((char *)"SELECT * FROM INFORMATION_SCHEMA.GLOBAL_VARIABLES");
variables.monitor_query_status=strdup((char *)"SELECT * FROM INFORMATION_SCHEMA.GLOBAL_STATUS");
variables.monitor_timer_cached=true;
variables.monitor_writer_is_also_reader=true;
variables.max_transaction_time=4*3600*1000;
variables.threshold_query_length=512*1024;
variables.threshold_resultset_size=4*1024*1024;
@ -357,11 +363,14 @@ int MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"monitor_connect_timeout")) return (int)variables.monitor_connect_timeout;
if (!strcasecmp(name,"monitor_ping_interval")) return (int)variables.monitor_ping_interval;
if (!strcasecmp(name,"monitor_ping_timeout")) return (int)variables.monitor_ping_timeout;
if (!strcasecmp(name,"monitor_read_only_interval")) return (int)variables.monitor_read_only_interval;
if (!strcasecmp(name,"monitor_read_only_timeout")) return (int)variables.monitor_read_only_timeout;
if (!strcasecmp(name,"monitor_replication_lag_interval")) return (int)variables.monitor_replication_lag_interval;
if (!strcasecmp(name,"monitor_replication_lag_timeout")) return (int)variables.monitor_replication_lag_timeout;
if (!strcasecmp(name,"monitor_query_interval")) return (int)variables.monitor_query_interval;
if (!strcasecmp(name,"monitor_query_timeout")) return (int)variables.monitor_query_timeout;
if (!strcasecmp(name,"monitor_timer_cached")) return (int)variables.monitor_timer_cached;
if (!strcasecmp(name,"monitor_writer_is_also_reader")) return (int)variables.monitor_writer_is_also_reader;
}
if (!strcasecmp(name,"shun_on_failures")) return (int)variables.shun_on_failures;
if (!strcasecmp(name,"shun_recovery_time")) return (int)variables.shun_recovery_time;
@ -430,6 +439,14 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
sprintf(intbuf,"%d",variables.monitor_ping_timeout);
return strdup(intbuf);
}
if (!strcasecmp(name,"monitor_read_only_interval")) {
sprintf(intbuf,"%d",variables.monitor_read_only_interval);
return strdup(intbuf);
}
if (!strcasecmp(name,"monitor_read_only_timeout")) {
sprintf(intbuf,"%d",variables.monitor_read_only_timeout);
return strdup(intbuf);
}
if (!strcasecmp(name,"monitor_replication_lag_interval")) {
sprintf(intbuf,"%d",variables.monitor_replication_lag_interval);
return strdup(intbuf);
@ -449,6 +466,9 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
if (!strcasecmp(name,"monitor_timer_cached")) {
return strdup((variables.monitor_timer_cached ? "true" : "false"));
}
if (!strcasecmp(name,"monitor_writer_is_also_reader")) {
return strdup((variables.monitor_writer_is_also_reader ? "true" : "false"));
}
}
if (!strcasecmp(name,"default_charset")) {
const CHARSET_INFO *c = proxysql_find_charset_nr(variables.default_charset);
@ -665,6 +685,24 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
return false;
}
}
if (!strcasecmp(name,"monitor_read_only_interval")) {
int intv=atoi(value);
if (intv >= 100 && intv <= 7*24*3600*1000) {
variables.monitor_read_only_interval=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"monitor_read_only_timeout")) {
int intv=atoi(value);
if (intv >= 100 && intv <= 600*1000) {
variables.monitor_read_only_timeout=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"monitor_replication_lag_interval")) {
int intv=atoi(value);
if (intv >= 100 && intv <= 7*24*3600*1000) {
@ -712,6 +750,17 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
}
return false;
}
if (!strcasecmp(name,"monitor_writer_is_also_reader")) {
if (strcasecmp(value,"true")==0 || strcasecmp(value,"1")==0) {
variables.monitor_writer_is_also_reader=true;
return true;
}
if (strcasecmp(value,"false")==0 || strcasecmp(value,"0")==0) {
variables.monitor_writer_is_also_reader=false;
return true;
}
return false;
}
}
if (!strcasecmp(name,"max_transaction_time")) {
int intv=atoi(value);
@ -1646,11 +1695,14 @@ void MySQL_Thread::refresh_variables() {
if (mysql_thread___monitor_query_status) free(mysql_thread___monitor_query_status);
mysql_thread___monitor_query_status=GloMTH->get_variable_string((char *)"monitor_query_status");
mysql_thread___monitor_timer_cached=(bool)GloMTH->get_variable_int((char *)"monitor_timer_cached");
mysql_thread___monitor_writer_is_also_reader=(bool)GloMTH->get_variable_int((char *)"monitor_writer_is_also_reader");
mysql_thread___monitor_history=GloMTH->get_variable_int((char *)"monitor_history");
mysql_thread___monitor_connect_interval=GloMTH->get_variable_int((char *)"monitor_connect_interval");
mysql_thread___monitor_connect_timeout=GloMTH->get_variable_int((char *)"monitor_connect_timeout");
mysql_thread___monitor_ping_interval=GloMTH->get_variable_int((char *)"monitor_ping_interval");
mysql_thread___monitor_ping_timeout=GloMTH->get_variable_int((char *)"monitor_ping_timeout");
mysql_thread___monitor_read_only_interval=GloMTH->get_variable_int((char *)"monitor_read_only_interval");
mysql_thread___monitor_read_only_timeout=GloMTH->get_variable_int((char *)"monitor_read_only_timeout");
mysql_thread___monitor_replication_lag_interval=GloMTH->get_variable_int((char *)"monitor_replication_lag_interval");
mysql_thread___monitor_replication_lag_timeout=GloMTH->get_variable_int((char *)"monitor_replication_lag_timeout");
mysql_thread___monitor_query_interval=GloMTH->get_variable_int((char *)"monitor_query_interval");

@ -57,6 +57,8 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE mysql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT NOT NULL DEFAULT 0 , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , flagOUT INT , replace_pattern VARCHAR , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED , delay INT UNSIGNED , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0)"
#define ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES "CREATE TABLE global_variables (variable_name VARCHAR NOT NULL PRIMARY KEY , variable_value VARCHAR NOT NULL)"
#define ADMIN_SQLITE_TABLE_MYSQL_REPLICATION_HOSTGROUPS "CREATE TABLE mysql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>0) , UNIQUE (reader_hostgroup))"
#define ADMIN_SQLITE_TABLE_MYSQL_COLLATIONS "CREATE TABLE mysql_collations (Id INTEGER NOT NULL PRIMARY KEY , Collation VARCHAR NOT NULL , Charset VARCHAR NOT NULL , `Default` VARCHAR NOT NULL)"
#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)"
@ -1901,6 +1903,7 @@ bool ProxySQL_Admin::init() {
insert_into_tables_defs(tables_defs_admin,"mysql_servers", ADMIN_SQLITE_TABLE_MYSQL_SERVERS);
insert_into_tables_defs(tables_defs_admin,"mysql_users", ADMIN_SQLITE_TABLE_MYSQL_USERS);
insert_into_tables_defs(tables_defs_admin,"mysql_replication_hostgroups", ADMIN_SQLITE_TABLE_MYSQL_REPLICATION_HOSTGROUPS);
insert_into_tables_defs(tables_defs_admin,"mysql_query_rules", ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES);
insert_into_tables_defs(tables_defs_admin,"global_variables", ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES);
insert_into_tables_defs(tables_defs_admin,"mysql_collations", ADMIN_SQLITE_TABLE_MYSQL_COLLATIONS);
@ -1910,6 +1913,7 @@ bool ProxySQL_Admin::init() {
insert_into_tables_defs(tables_defs_config,"mysql_servers", ADMIN_SQLITE_TABLE_MYSQL_SERVERS);
insert_into_tables_defs(tables_defs_config,"mysql_users", ADMIN_SQLITE_TABLE_MYSQL_USERS);
insert_into_tables_defs(tables_defs_config,"mysql_replication_hostgroups", ADMIN_SQLITE_TABLE_MYSQL_REPLICATION_HOSTGROUPS);
insert_into_tables_defs(tables_defs_config,"mysql_query_rules", ADMIN_SQLITE_TABLE_MYSQL_QUERY_RULES);
insert_into_tables_defs(tables_defs_config,"global_variables", ADMIN_SQLITE_TABLE_GLOBAL_VARIABLES);
insert_into_tables_defs(tables_defs_config,"mysql_collations", ADMIN_SQLITE_TABLE_MYSQL_COLLATIONS);
@ -2793,6 +2797,7 @@ int ProxySQL_Admin::flush_debug_levels_database_to_runtime(SQLite3DB *db) {
void ProxySQL_Admin::__insert_or_ignore_maintable_select_disktable() {
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("INSERT OR IGNORE INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT OR IGNORE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT OR IGNORE INTO main.mysql_users SELECT * FROM disk.mysql_users");
admindb->execute("INSERT OR IGNORE INTO main.mysql_query_rules SELECT * FROM disk.mysql_query_rules");
admindb->execute("INSERT OR IGNORE INTO main.global_variables SELECT * FROM disk.global_variables");
@ -2805,6 +2810,7 @@ void ProxySQL_Admin::__insert_or_ignore_maintable_select_disktable() {
void ProxySQL_Admin::__insert_or_replace_maintable_select_disktable() {
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("INSERT OR REPLACE INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT OR REPLACE INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO main.mysql_users SELECT * FROM disk.mysql_users");
admindb->execute("INSERT OR REPLACE INTO main.mysql_query_rules SELECT * FROM disk.mysql_query_rules");
admindb->execute("INSERT OR REPLACE INTO main.global_variables SELECT * FROM disk.global_variables");
@ -2816,6 +2822,7 @@ void ProxySQL_Admin::__insert_or_replace_maintable_select_disktable() {
void ProxySQL_Admin::__delete_disktable() {
admindb->execute("DELETE FROM disk.mysql_servers");
admindb->execute("DELETE FROM disk.mysql_replication_hostgroups");
admindb->execute("DELETE FROM disk.mysql_users");
admindb->execute("DELETE FROM disk.mysql_query_rules");
admindb->execute("DELETE FROM disk.global_variables");
@ -2826,6 +2833,7 @@ void ProxySQL_Admin::__delete_disktable() {
void ProxySQL_Admin::__insert_or_replace_disktable_select_maintable() {
admindb->execute("INSERT OR REPLACE INTO disk.mysql_servers SELECT * FROM main.mysql_servers");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_replication_hostgroups SELECT * FROM main.mysql_replication_hostgroups");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_users SELECT * FROM main.mysql_users");
admindb->execute("INSERT OR REPLACE INTO disk.mysql_query_rules SELECT * FROM main.mysql_query_rules");
@ -2858,7 +2866,9 @@ void ProxySQL_Admin::flush_mysql_servers__from_disk_to_memory() {
admindb->wrlock();
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("DELETE FROM main.mysql_servers");
admindb->execute("DELETE FROM main.mysql_replication_hostgroups");
admindb->execute("INSERT INTO main.mysql_servers SELECT * FROM disk.mysql_servers");
admindb->execute("INSERT INTO main.mysql_replication_hostgroups SELECT * FROM disk.mysql_replication_hostgroups");
admindb->execute("PRAGMA foreign_keys = ON");
admindb->wrunlock();
}
@ -2867,7 +2877,9 @@ void ProxySQL_Admin::flush_mysql_servers__from_memory_to_disk() {
admindb->wrlock();
admindb->execute("PRAGMA foreign_keys = OFF");
admindb->execute("DELETE FROM disk.mysql_servers");
admindb->execute("DELETE FROM disk.mysql_replication_hostgroups");
admindb->execute("INSERT INTO disk.mysql_servers SELECT * FROM main.mysql_servers");
admindb->execute("INSERT INTO disk.mysql_replication_hostgroups SELECT * FROM main.mysql_replication_hostgroups");
admindb->execute("PRAGMA foreign_keys = ON");
admindb->wrunlock();
}
@ -3055,22 +3067,45 @@ void ProxySQL_Admin::save_mysql_users_runtime_to_database() {
}
void ProxySQL_Admin::save_mysql_servers_runtime_to_database() {
char *query=(char *)"DELETE FROM main.mysql_servers";
char *query=NULL;
SQLite3_result *resultset=NULL;
// dump mysql_servers
query=(char *)"DELETE FROM main.mysql_servers";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute(query);
SQLite3_result *resultset=MyHGM->dump_table_mysql_servers();
if (!resultset) return;
char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s,%s)";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+strlen(r->fields[7])+16);
sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6], r->fields[7]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
//fprintf(stderr,"%s\n",query);
admindb->execute(query);
free(query);
resultset=MyHGM->dump_table_mysql_servers();
if (resultset) {
char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s,%s)";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+strlen(r->fields[7])+16);
sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6], r->fields[7]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
admindb->execute(query);
free(query);
}
}
if(resultset) delete resultset;
resultset=NULL;
// dump mysql_replication_hostgroups
query=(char *)"DELETE FROM main.mysql_replication_hostgroups";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute(query);
resultset=MyHGM->dump_table_mysql_replication_hostgroups();
if (resultset) {
char *q=(char *)"INSERT INTO mysql_replication_hostgroups VALUES(%s,%s)";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+16);
sprintf(query, q, r->fields[0], r->fields[1]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
admindb->execute(query);
free(query);
}
}
if(resultset) delete resultset;
resultset=NULL;
}
@ -3107,7 +3142,33 @@ void ProxySQL_Admin::load_mysql_servers_to_runtime() {
//MyHGH->server_add_hg(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]));
}
}
//MyHGH->wrunlock();
if (resultset) delete resultset;
resultset=NULL;
query=(char *)"SELECT a.* FROM mysql_replication_hostgroups a JOIN mysql_replication_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
proxy_error("Incompatible entry in mysql_replication_hostgroups will be ignored : ( %s , %s )\n", r->fields[0], r->fields[1]);
}
}
if (resultset) delete resultset;
resultset=NULL;
query=(char *)"SELECT a.* FROM mysql_replication_hostgroups a LEFT JOIN mysql_replication_hostgroups b ON a.writer_hostgroup=b.reader_hostgroup WHERE b.reader_hostgroup IS NULL";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
//MyHGH->wrlock();
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
MyHGM->set_incoming_replication_hostgroups(resultset);
}
MyHGM->commit();
if (resultset) delete resultset;
}

@ -57,7 +57,7 @@ int listen_on_unix(char *path, int backlog) {
// remove the socket
r=unlink(path);
if ( (r==-1) && (errno!=ENOENT) ) {
proxy_error("Error unlink Unix Socket %s", path);
proxy_error("Error unlink Unix Socket %s\n", path);
return -1;
}

Loading…
Cancel
Save