New implementation of read_only check #628

pull/631/head
René Cannaò 10 years ago
parent b1292f8ba4
commit af69092d84

@ -54,6 +54,7 @@ class MySQL_Monitor_State_Data {
void next_event(int new_st, int status);
void unregister();
SQLite3DB *mondb;
bool create_new_connection();
// we are copying these from MySQL_Connection
// short wait_events;
// unsigned long long timeout;

@ -123,7 +123,7 @@ static int ping__num_active_connections;
static int replication_lag__num_active_connections;
static int total_replication_lag__num_active_connections=0;
static int read_only__num_active_connections;
static int total_read_only__num_active_connections=0;
//static int total_read_only__num_active_connections=0;
struct cmp_str {
@ -815,24 +815,8 @@ void * monitor_connect_thread(void *arg) {
mysql_thr->curtime=monotonic_time();
mysql_thr->refresh_variables();
mmsd->mysql=mysql_init(NULL);
assert(mmsd->mysql);
if (mmsd->use_ssl) {
mysql_ssl_set(mmsd->mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher);
}
unsigned int timeout=mysql_thread___monitor_connect_timeout/1000;
mysql_options(mmsd->mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(mmsd->mysql, MYSQL_OPT_READ_TIMEOUT, &timeout);
mysql_options(mmsd->mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
MYSQL *myrc=NULL;
if (mmsd->port) {
myrc=mysql_real_connect(mmsd->mysql, mmsd->hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, mmsd->port, NULL, 0);
} else {
myrc=mysql_real_connect(mmsd->mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, mmsd->hostname, 0);
}
if (myrc==NULL) {
mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql));
}
mmsd->create_new_connection();
unsigned long long start_time=mysql_thr->curtime;
mmsd->t1=start_time;
@ -872,34 +856,10 @@ void * monitor_ping_thread(void *arg) {
mmsd->t1=start_time;
if (mmsd->mysql==NULL) { // we don't have a connection, let's create it
mmsd->mysql=mysql_init(NULL);
assert(mmsd->mysql);
if (mmsd->use_ssl) {
mysql_ssl_set(mmsd->mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher);
}
unsigned int timeout=mysql_thread___monitor_connect_timeout/1000;
mysql_options(mmsd->mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
mysql_options(mmsd->mysql, MYSQL_OPT_READ_TIMEOUT, &timeout);
mysql_options(mmsd->mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
MYSQL *myrc=NULL;
if (mmsd->port) {
myrc=mysql_real_connect(mmsd->mysql, mmsd->hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, mmsd->port, NULL, 0);
} else {
myrc=mysql_real_connect(mmsd->mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, mmsd->hostname, 0);
}
if (myrc==NULL) {
mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql));
bool rc;
rc=mmsd->create_new_connection();
if (rc==false) {
goto __exit_monitor_ping_thread;
} else {
// mariadb client library disables NONBLOCK for SSL connections ... re-enable it!
mysql_options(mmsd->mysql, MYSQL_OPT_NONBLOCK, 0);
int f=fcntl(mmsd->mysql->net.fd, F_GETFL);
#ifdef FD_CLOEXEC
// asynchronously set also FD_CLOEXEC , this to prevent then when a fork happens the FD are duplicated to new process
fcntl(mmsd->mysql->net.fd, F_SETFL, f|O_NONBLOCK|FD_CLOEXEC);
#else
fcntl(mmsd->mysql->net.fd, F_SETFL, f|O_NONBLOCK);
#endif /* FD_CLOEXEC */
}
}
@ -924,7 +884,6 @@ void * monitor_ping_thread(void *arg) {
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
mmsd->mysql=NULL;
}
mmsd->t2=monotonic_time();
__exit_monitor_ping_thread:
mmsd->t2=monotonic_time();
@ -945,23 +904,166 @@ __exit_monitor_ping_thread:
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
sqlite3_finalize(statement);
/*
free(sds);
query=(char *)"INSERT OR REPLACE INTO mysql_server_connect_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)";
}
__fast_exit_monitor_ping_thread:
if (mmsd->mysql) {
mysql_close(mmsd->mysql); // if we reached here we didn't put the connection back
}
delete mysql_thr;
return NULL;
}
bool MySQL_Monitor_State_Data::create_new_connection() {
mysql=mysql_init(NULL);
assert(mysql);
if (use_ssl) {
mysql_ssl_set(mysql, mysql_thread___ssl_p2s_key, mysql_thread___ssl_p2s_cert, mysql_thread___ssl_p2s_ca, NULL, mysql_thread___ssl_p2s_cipher);
}
unsigned int timeout=mysql_thread___monitor_connect_timeout/1000;
mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, &timeout);
// mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, &timeout);
// mysql_options(mysql, MYSQL_OPT_WRITE_TIMEOUT, &timeout);
MYSQL *myrc=NULL;
if (port) {
myrc=mysql_real_connect(mysql, hostname, mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, port, NULL, 0);
} else {
myrc=mysql_real_connect(mysql, "localhost", mysql_thread___monitor_username, mysql_thread___monitor_password, NULL, 0, hostname, 0);
}
if (myrc==NULL) {
mysql_error_msg=strdup(mysql_error(mysql));
return false;
} else {
// mariadb client library disables NONBLOCK for SSL connections ... re-enable it!
mysql_options(mysql, MYSQL_OPT_NONBLOCK, 0);
int f=fcntl(mysql->net.fd, F_GETFL);
#ifdef FD_CLOEXEC
// asynchronously set also FD_CLOEXEC , this to prevent then when a fork happens the FD are duplicated to new process
fcntl(mysql->net.fd, F_SETFL, f|O_NONBLOCK|FD_CLOEXEC);
#else
fcntl(mysql->net.fd, F_SETFL, f|O_NONBLOCK);
#endif /* FD_CLOEXEC */
}
return true;
}
void * monitor_read_only_thread(void *arg) {
MySQL_Monitor_State_Data *mmsd=(MySQL_Monitor_State_Data *)arg;
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
mysql_thr->refresh_variables();
mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port);
unsigned long long start_time=mysql_thr->curtime;
mmsd->t1=start_time;
if (mmsd->mysql==NULL) { // we don't have a connection, let's create it
bool rc;
rc=mmsd->create_new_connection();
if (rc==false) {
goto __exit_monitor_read_only_thread;
}
}
mmsd->t1=monotonic_time();
//async_exit_status=mysql_change_user_start(&ret_bool, mysql,"msandbox2","msandbox2","information_schema");
//mmsd->async_exit_status=mysql_ping_start(&mmsd->interr,mmsd->mysql);
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SHOW GLOBAL VARIABLES LIKE 'read_only'");
while (mmsd->async_exit_status) {
mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
mmsd->async_exit_status=mysql_query_cont(&mmsd->interr, mmsd->mysql, mmsd->async_exit_status);
unsigned long long now=monotonic_time();
if (now > mmsd->t1 + mysql_thread___monitor_ping_timeout * 1000) {
mmsd->mysql_error_msg=strdup("timeout check");
goto __exit_monitor_read_only_thread;
}
if (GloMyMon->shutdown==true) {
goto __fast_exit_monitor_read_only_thread; // exit immediately
}
}
mmsd->async_exit_status=mysql_store_result_start(&mmsd->result,mmsd->mysql);
while (mmsd->async_exit_status) {
mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
mmsd->async_exit_status=mysql_store_result_cont(&mmsd->result, mmsd->mysql, mmsd->async_exit_status);
unsigned long long now=monotonic_time();
if (now > mmsd->t1 + mysql_thread___monitor_ping_timeout * 1000) {
mmsd->mysql_error_msg=strdup("timeout check");
goto __exit_monitor_read_only_thread;
}
if (GloMyMon->shutdown==true) {
goto __fast_exit_monitor_read_only_thread; // exit immediately
}
}
if (mmsd->interr) { // ping failed
mmsd->mysql_error_msg=strdup(mysql_error(mmsd->mysql));
// } else {
// GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
// mmsd->mysql=NULL;
}
__exit_monitor_read_only_thread:
mmsd->t2=monotonic_time();
{
sqlite3_stmt *statement;
sqlite3 *mondb=mmsd->mondb->get_db();
int rc;
char *query=NULL;
query=(char *)"INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
int read_only=1; // as a safety mechanism , read_only=1 is the default
rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
if (mmsd->result) {
int num_fields=0;
int k=0;
MYSQL_FIELD *fields=NULL;
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
for(k = 0; k < num_fields; k++) {
//if (strcmp("VARIABLE_NAME", fields[k].name)==0) {
if (strcmp("Value", fields[k].name)==0) {
j=k;
}
}
if (j>-1) {
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
if (row) {
if (row[j]) {
if (!strcmp(row[j],"0") || !strcasecmp(row[j],"OFF"))
read_only=0;
}
}
}
// if (repl_lag>=0) {
rc=sqlite3_bind_int64(statement, 5, read_only); assert(rc==SQLITE_OK);
// } else {
// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
// }
mysql_free_result(mmsd->result);
mmsd->result=NULL;
} else {
rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
}
rc=sqlite3_bind_text(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
MyHGM->read_only_action(mmsd->hostname, mmsd->port, read_only);
sqlite3_finalize(statement);
*/
}
__fast_exit_monitor_ping_thread:
if (mmsd->interr) { // check failed
} else {
GloMyMon->My_Conn_Pool->put_connection(mmsd->hostname,mmsd->port,mmsd->mysql);
mmsd->mysql=NULL;
}
__fast_exit_monitor_read_only_thread:
if (mmsd->mysql) {
mysql_close(mmsd->mysql); // if we reached here we didn't put the connection back
}
@ -1096,9 +1198,6 @@ void * MySQL_Monitor::monitor_ping() {
unsigned long long t2;
unsigned long long start_time;
unsigned long long next_loop_at=0;
//unsigned int t1;
//unsigned int t2;
//t1=monotonic_time();
while (shutdown==false) {
@ -1107,10 +1206,6 @@ void * MySQL_Monitor::monitor_ping() {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
// MySQL_Monitor_State_Data **sds=NULL;
//int i=0;
//char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD'";
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers WHERE status!='OFFLINE_HARD' GROUP BY hostname, port";
t1=monotonic_time();
@ -1126,15 +1221,7 @@ void * MySQL_Monitor::monitor_ping() {
}
next_loop_at=t1+1000*mysql_thread___monitor_ping_interval;
/*
struct timeval tv_out;
evutil_gettimeofday(&tv_out, NULL);
start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec);
*/
start_time=monotonic_time();
// ping__num_active_connections=0;
// // create libevent base
// libevent_base= event_base_new();
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
@ -1145,39 +1232,17 @@ void * MySQL_Monitor::monitor_ping() {
if (resultset->rows_count==0) {
goto __end_monitor_ping_loop;
}
// sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *));
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
MySQL_Monitor_State_Data *mmsd = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2]));
/*
sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2]));
sds[i]->task_id=MON_PING;
ping__num_active_connections++;
total_ping__num_active_connections++;
MySQL_Monitor_State_Data *_mmsd=sds[i];
*/
// mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(mmsd->hostname, mmsd->port);
mmsd->mondb=monitordb;
pthread_t thr_;
if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) {
perror("Thread creation monitor_ping_thread");
}
/*
if (_mmsd->mysql==NULL) {
state_machine_handler(-1,-1,_mmsd);
} else {
int fd=mysql_get_socket(_mmsd->mysql);
_mmsd->ST=7;
state_machine_handler(fd,-1,_mmsd);
}
i++;
*/
}
}
// start libevent loop
// event_base_dispatch(libevent_base);
__end_monitor_ping_loop:
/* if (sds) */ {
sqlite3_stmt *statement;
@ -1196,276 +1261,6 @@ __end_monitor_ping_loop:
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
sqlite3_finalize(statement);
/*
query=(char *)"INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
while (i>0) {
i--;
MySQL_Monitor_State_Data *mmsd=sds[i];
rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
delete mmsd;
}
sqlite3_finalize(statement);
free(sds);
*/
}
if (resultset) {
delete resultset;
resultset=NULL;
}
// event_base_free(libevent_base);
// now it is time to shun all problematic hosts
query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NOT NULL";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
// get all addresses and ports
int i=0;
int j=0;
char **addresses=(char **)malloc(resultset->rows_count * sizeof(char *));
char **ports=(char **)malloc(resultset->rows_count * sizeof(char *));
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
addresses[i]=strdup(r->fields[0]);
ports[i]=strdup(r->fields[1]);
i++;
}
if (resultset) {
delete resultset;
resultset=NULL;
}
char *new_query=NULL;
new_query=(char *)"SELECT 1 FROM (SELECT hostname,port,ping_error FROM mysql_server_ping_log WHERE hostname='%s' AND port='%s' ORDER BY time_start DESC LIMIT %d) a WHERE ping_error IS NOT NULL GROUP BY hostname,port HAVING COUNT(*)=%d";
for (j=0;j<i;j++) {
char *buff=(char *)malloc(strlen(new_query)+strlen(addresses[j])+strlen(ports[j])+16);
int max_failures=mysql_thread___monitor_ping_max_failures;
sprintf(buff,new_query,addresses[j],ports[j],max_failures,max_failures);
monitordb->execute_statement(buff, &error , &cols , &affected_rows , &resultset);
if (!error) {
if (resultset) {
if (resultset->rows_count) {
// disable host
proxy_error("Server %s:%s missed %d heartbeats, shunning it and killing all the connections\n", addresses[j], ports[j], max_failures);
MyHGM->shun_and_killall(addresses[j],atoi(ports[j]));
}
delete resultset;
resultset=NULL;
}
} else {
proxy_error("Error on %s : %s\n", query, error);
}
free(buff);
}
while (i) { // now free all the addresses/ports
i--;
free(addresses[i]);
free(ports[i]);
}
free(addresses);
free(ports);
}
// now it is time to update current_lantency_ms
query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NULL";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
// get all addresses and ports
int i=0;
int j=0;
char **addresses=(char **)malloc(resultset->rows_count * sizeof(char *));
char **ports=(char **)malloc(resultset->rows_count * sizeof(char *));
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
addresses[i]=strdup(r->fields[0]);
ports[i]=strdup(r->fields[1]);
i++;
}
if (resultset) {
delete resultset;
resultset=NULL;
}
char *new_query=NULL;
new_query=(char *)"SELECT hostname,port,COALESCE(CAST(AVG(ping_success_time) AS INTEGER),10000) FROM (SELECT hostname,port,ping_success_time,ping_error FROM mysql_server_ping_log WHERE hostname='%s' AND port='%s' ORDER BY time_start DESC LIMIT 3) a WHERE ping_error IS NULL GROUP BY hostname,port";
for (j=0;j<i;j++) {
char *buff=(char *)malloc(strlen(new_query)+strlen(addresses[j])+strlen(ports[j])+16);
sprintf(buff,new_query,addresses[j],ports[j]);
monitordb->execute_statement(buff, &error , &cols , &affected_rows , &resultset);
if (!error) {
if (resultset) {
if (resultset->rows_count) {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it; // this should be called just once, but we create a generic for loop
// update current_latency_ms
MyHGM->set_server_current_latency_us(addresses[j],atoi(ports[j]), atoi(r->fields[2]));
}
}
delete resultset;
resultset=NULL;
}
} else {
proxy_error("Error on %s : %s\n", query, error);
}
free(buff);
}
}
__sleep_monitor_ping_loop:
t2=monotonic_time();
if (t2<next_loop_at) {
unsigned long long st=0;
st=next_loop_at-t2;
if (st > 500000) {
st = 500000;
}
usleep(st);
}
}
if (mysql_thr) {
delete mysql_thr;
mysql_thr=NULL;
}
return NULL;
}
/*
void * MySQL_Monitor::monitor_ping() {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
struct event_base *libevent_base;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version();
mysql_thr->refresh_variables();
unsigned long long t1;
unsigned long long t2;
unsigned long long start_time;
unsigned long long next_loop_at=0;
//unsigned int t1;
//unsigned int t2;
//t1=monotonic_time();
while (shutdown==false) {
unsigned int glover;
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
MySQL_Monitor_State_Data **sds=NULL;
int i=0;
//char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE status!='OFFLINE_HARD'";
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers WHERE status!='OFFLINE_HARD' GROUP BY hostname, port";
t1=monotonic_time();
glover=GloMTH->get_global_version();
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover;
mysql_thr->refresh_variables();
next_loop_at=0;
}
if (t1 < next_loop_at) {
goto __sleep_monitor_ping_loop;
}
next_loop_at=t1+1000*mysql_thread___monitor_ping_interval;
struct timeval tv_out;
evutil_gettimeofday(&tv_out, NULL);
start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec);
ping__num_active_connections=0;
// create libevent base
libevent_base= event_base_new();
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
goto __end_monitor_ping_loop;
} else {
if (resultset->rows_count==0) {
goto __end_monitor_ping_loop;
}
sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *));
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2]));
sds[i]->task_id=MON_PING;
ping__num_active_connections++;
total_ping__num_active_connections++;
MySQL_Monitor_State_Data *_mmsd=sds[i];
_mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(_mmsd->hostname, _mmsd->port);
if (_mmsd->mysql==NULL) {
state_machine_handler(-1,-1,_mmsd);
} else {
int fd=mysql_get_socket(_mmsd->mysql);
_mmsd->ST=7;
state_machine_handler(fd,-1,_mmsd);
}
i++;
}
}
// start libevent loop
event_base_dispatch(libevent_base);
__end_monitor_ping_loop:
if (sds) {
sqlite3_stmt *statement;
sqlite3 *mondb=monitordb->get_db();
int rc;
char *query=NULL;
query=(char *)"DELETE FROM mysql_server_ping_log WHERE time_start < ?1";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
if (mysql_thread___monitor_history < mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 )) { // issue #626
if (mysql_thread___monitor_ping_interval < 3600000)
mysql_thread___monitor_history = mysql_thread___monitor_ping_interval * (mysql_thread___monitor_ping_max_failures + 1 );
}
rc=sqlite3_bind_int64(statement, 1, start_time-mysql_thread___monitor_history*1000); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
sqlite3_finalize(statement);
query=(char *)"INSERT OR REPLACE INTO mysql_server_ping_log VALUES (?1 , ?2 , ?3 , ?4 , ?5)";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
while (i>0) {
i--;
MySQL_Monitor_State_Data *mmsd=sds[i];
rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 5, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
delete mmsd;
}
sqlite3_finalize(statement);
free(sds);
}
if (resultset) {
@ -1473,8 +1268,6 @@ __end_monitor_ping_loop:
resultset=NULL;
}
event_base_free(libevent_base);
// now it is time to shun all problematic hosts
query=(char *)"SELECT DISTINCT a.hostname, a.port FROM mysql_servers a JOIN monitor.mysql_server_ping_log b ON a.hostname=b.hostname WHERE status!='OFFLINE_HARD' AND b.ping_error IS NOT NULL";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
@ -1595,38 +1388,36 @@ __sleep_monitor_ping_loop:
}
return NULL;
}
*/
void * MySQL_Monitor::monitor_read_only() {
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
struct event_base *libevent_base;
// struct event_base *libevent_base;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
mysql_thr->curtime=monotonic_time();
MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version();
mysql_thr->refresh_variables();
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
// pthread_attr_setstacksize (&attr, 192*1024);
unsigned long long t1;
unsigned long long t2;
unsigned long long start_time;
unsigned long long next_loop_at=0;
unsigned int num_fields=0;
unsigned int k=0;
MYSQL_FIELD *fields=NULL;
while (shutdown==false) {
unsigned int glover;
char *error=NULL;
// int cols=0;
// int affected_rows=0;
SQLite3_result *resultset=NULL;
MySQL_Monitor_State_Data **sds=NULL;
int i=0;
//char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status!='OFFLINE_HARD'";
// add support for SSL
char *query=(char *)"SELECT hostname, port, MAX(use_ssl) use_ssl FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup WHERE status!='OFFLINE_HARD' GROUP BY hostname, port";
t1=monotonic_time();
start_time=t1;
glover=GloMTH->get_global_version();
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
@ -1639,15 +1430,6 @@ void * MySQL_Monitor::monitor_read_only() {
goto __sleep_monitor_read_only;
}
next_loop_at=t1+1000*mysql_thread___monitor_read_only_interval;
struct timeval tv_out;
evutil_gettimeofday(&tv_out, NULL);
start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec);
read_only__num_active_connections=0;
// create libevent base
libevent_base= event_base_new();
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
// admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
resultset = MyHGM->execute_query(query, &error);
@ -1659,33 +1441,19 @@ void * MySQL_Monitor::monitor_read_only() {
if (resultset->rows_count==0) {
goto __end_monitor_read_only_loop;
}
sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *));
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base, atoi(r->fields[2]));
sds[i]->task_id=MON_READ_ONLY;
// sds[i]->hostgroup_id=atoi(r->fields[0]);
// sds[i]->repl_lag=atoi(r->fields[3]);
read_only__num_active_connections++;
total_read_only__num_active_connections++;
MySQL_Monitor_State_Data *_mmsd=sds[i];
_mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(_mmsd->hostname, _mmsd->port);
if (_mmsd->mysql==NULL) {
state_machine_handler(-1,-1,_mmsd);
} else {
int fd=mysql_get_socket(_mmsd->mysql);
_mmsd->ST=20;
state_machine_handler(fd,-1,_mmsd);
MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2]));
mmsd->mondb=monitordb;
pthread_t thr_;
if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) {
perror("Thread creation monitor_read_only_thread");
}
i++;
}
}
// start libevent loop
event_base_dispatch(libevent_base);
__end_monitor_read_only_loop:
if (sds) {
/* if (sds) */ {
sqlite3_stmt *statement=NULL;
sqlite3 *mondb=monitordb->get_db();
int rc;
@ -1702,70 +1470,12 @@ __end_monitor_read_only_loop:
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
sqlite3_finalize(statement);
query=(char *)"INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)";
rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
while (i>0) {
i--;
int read_only=1; // as a safety mechanism , read_only=1 is the default
MySQL_Monitor_State_Data *mmsd=NULL;
mmsd=sds[i];
rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK);
if (mmsd->result) {
num_fields=0;
k=0;
fields=NULL;
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
for(k = 0; k < num_fields; k++) {
//if (strcmp("VARIABLE_NAME", fields[k].name)==0) {
if (strcmp("Value", fields[k].name)==0) {
j=k;
}
}
if (j>-1) {
MYSQL_ROW row=mysql_fetch_row(mmsd->result);
if (row) {
if (row[j]) {
if (!strcmp(row[j],"0") || !strcasecmp(row[j],"OFF"))
read_only=0;
}
}
}
// if (repl_lag>=0) {
rc=sqlite3_bind_int64(statement, 5, read_only); assert(rc==SQLITE_OK);
// } else {
// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
// }
mysql_free_result(mmsd->result);
mmsd->result=NULL;
} else {
rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK);
}
rc=sqlite3_bind_text(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
MyHGM->read_only_action(mmsd->hostname, mmsd->port, read_only);
delete mmsd;
}
sqlite3_finalize(statement);
free(sds);
}
if (resultset)
delete resultset;
event_base_free(libevent_base);
__sleep_monitor_read_only:
@ -1980,7 +1690,7 @@ void * MySQL_Monitor::run() {
mysql_thr->refresh_variables();
std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this);
std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this);
// std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this);
std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this);
// std::thread * monitor_replication_lag_thread = new std::thread(&MySQL_Monitor::monitor_replication_lag,this);
while (shutdown==false) {
unsigned int glover=GloMTH->get_global_version();
@ -1994,7 +1704,7 @@ void * MySQL_Monitor::run() {
}
monitor_connect_thread->join();
monitor_ping_thread->join();
// monitor_read_only_thread->join();
monitor_read_only_thread->join();
// monitor_replication_lag_thread->join();
if (mysql_thr) {
delete mysql_thr;

Loading…
Cancel
Save