diff --git a/include/MySQL_HostGroups_Manager.h b/include/MySQL_HostGroups_Manager.h index a8edfc2bd..c1a8de603 100644 --- a/include/MySQL_HostGroups_Manager.h +++ b/include/MySQL_HostGroups_Manager.h @@ -138,6 +138,7 @@ class MySQL_HostGroups_Manager { void destroy_MyConn_from_pool(MySQL_Connection *); void replication_lag_action(int, char*, unsigned int, int); + void read_only_action(SQLite3_result *result); }; #endif /* __CLASS_MYSQL_HOSTGROUPS_MANAGER_H */ diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index 0a5d02af2..a98f45b92 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -14,6 +14,8 @@ #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG "CREATE TABLE mysql_server_ping_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , ping_success_time INT DEFAULT 0 , ping_error VARCHAR , PRIMARY KEY (hostname, port, time_start))" +#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG "CREATE TABLE mysql_server_read_only_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , read_only INT DEFAULT 1 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))" + #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))" @@ -37,6 +39,7 @@ class MySQL_Monitor { void print_version(); void * monitor_connect(); void * monitor_ping(); + void * monitor_read_only(); void * monitor_replication_lag(); void * run(); }; diff --git a/lib/MySQL_HostGroups_Manager.cpp b/lib/MySQL_HostGroups_Manager.cpp index a6367f934..4233f6c30 100644 --- a/lib/MySQL_HostGroups_Manager.cpp +++ b/lib/MySQL_HostGroups_Manager.cpp @@ -823,3 +823,8 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() { wrunlock(); return result; } + +void MySQL_HostGroups_Manager::read_only_action(SQLite3_result *result) { + wrlock(); + wrunlock(); +} diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index ffebddf8a..cc7dc382a 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -35,7 +35,8 @@ static MySQL_Monitor *GloMyMon; static void state_machine_handler(int fd, short event, void *arg); - +// FIXME: +const int mysql_thread___monitor_read_only_interval=1000; /* struct state_data { int ST; @@ -58,6 +59,8 @@ static int ping__num_active_connections; static int total_ping__num_active_connections=0; static int replication_lag__num_active_connections; static int total_replication_lag__num_active_connections=0; +static int read_only__num_active_connections; +static int total_read_only__num_active_connections=0; struct cmp_str { @@ -125,6 +128,7 @@ void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYS enum MySQL_Monitor_State_Data_Task_Type { MON_CONNECT, MON_PING, + MON_READ_ONLY, MON_REPLICATION_LAG }; @@ -223,6 +227,9 @@ again: case MON_PING: NEXT_IMMEDIATE(7); break; + case MON_READ_ONLY: + NEXT_IMMEDIATE(20); + break; case MON_REPLICATION_LAG: NEXT_IMMEDIATE(10); break; @@ -337,6 +344,72 @@ again: } break; + case 20: + if (mysql_thread___monitor_timer_cached==true) { + event_base_gettimeofday_cached(base, &tv_out); + } else { + evutil_gettimeofday(&tv_out, NULL); + } + t1=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); + status=mysql_query_start(&interr,mysql,"SHOW GLOBAL VARIABLES LIKE 'read_only'"); + if (status) + next_event(21,status); + else + NEXT_IMMEDIATE(22); + break; + + case 21: + status=mysql_query_cont(&interr,mysql, mysql_status(event)); + if (status) + next_event(21,status); + else + NEXT_IMMEDIATE(22); + break; + + case 22: + if (interr) { + mysql_error_msg=strdup(mysql_error(mysql)); + mysql_close(mysql); + mysql=NULL; + NEXT_IMMEDIATE(50); + } else { + status=mysql_store_result_start(&result, mysql); + if (status) + next_event(23,status); + else + NEXT_IMMEDIATE(24); + } + break; + + case 23: + status=mysql_store_result_cont(&result, mysql, mysql_status(event)); + if (status) + next_event(23,status); + else + NEXT_IMMEDIATE(24); + break; + + case 24: + if (result) { + if (mysql_thread___monitor_timer_cached==true) { + event_base_gettimeofday_cached(base, &tv_out); + } else { + evutil_gettimeofday(&tv_out, NULL); + } + t2=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); + GloMyMon->My_Conn_Pool->put_connection(hostname,port,mysql); + mysql=NULL; + return -1; + } else { + // no resultset, consider it an error + // FIXME: if this happen, should be logged + mysql_error_msg=strdup(mysql_error(mysql)); + mysql_close(mysql); + mysql=NULL; + NEXT_IMMEDIATE(50); + } + break; + case 39: if (mysql_thread___monitor_timer_cached==true) { event_base_gettimeofday_cached(base, &tv_out); @@ -440,6 +513,11 @@ state_machine_handler(int fd __attribute__((unused)), short event, void *arg) { if (ping__num_active_connections == 0) event_base_loopbreak(base); break; + case MON_READ_ONLY: + read_only__num_active_connections--; + if (read_only__num_active_connections == 0) + event_base_loopbreak(base); + break; case MON_REPLICATION_LAG: replication_lag__num_active_connections--; if (replication_lag__num_active_connections == 0) @@ -472,11 +550,13 @@ MySQL_Monitor::MySQL_Monitor() { insert_into_tables_defs(tables_defs_monitor,"mysql_server_connect_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_CONNECT_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING); insert_into_tables_defs(tables_defs_monitor,"mysql_server_ping_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_PING_LOG); + insert_into_tables_defs(tables_defs_monitor,"mysql_server_read_only_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_READ_ONLY_LOG); insert_into_tables_defs(tables_defs_monitor,"mysql_server_replication_lag_log", MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG); // create monitoring tables check_and_build_standard_tables(monitordb, tables_defs_monitor); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_connect_log_time_start ON mysql_server_connect_log (time_start)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_ping_log_time_start ON mysql_server_ping_log (time_start)"); + monitordb->execute("CREATE INDEX IF NOT EXISTS idx_read_only_log_time_start ON mysql_server_read_only_log (time_start)"); monitordb->execute("CREATE INDEX IF NOT EXISTS idx_replication_lag_log_time_start ON mysql_server_replication_lag_log (time_start)"); @@ -785,6 +865,197 @@ __sleep_monitor_ping_loop: } return NULL; } + +void * MySQL_Monitor::monitor_read_only() { + // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) + struct event_base *libevent_base; + unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version; + MySQL_Thread * mysql_thr = new MySQL_Thread(); + mysql_thr->curtime=monotonic_time(); + MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); + mysql_thr->refresh_variables(); + + unsigned long long t1; + unsigned long long t2; + unsigned long long start_time; + unsigned long long next_loop_at=0; + + while (shutdown==false) { + + unsigned int glover; + char *error=NULL; +// int cols=0; +// int affected_rows=0; + SQLite3_result *resultset=NULL; + MySQL_Monitor_State_Data **sds=NULL; + int i=0; + char *query=(char *)"SELECT DISTINCT hostname, port FROM mysql_servers JOIN mysql_replication_hostgroups ON hostgroup_id=writer_hostgroup OR hostgroup_id=reader_hostgroup"; + t1=monotonic_time(); + + if (t1 < next_loop_at) { + goto __sleep_monitor_read_only; + } + next_loop_at=t1+1000*mysql_thread___monitor_read_only_interval; + + struct timeval tv_out; + evutil_gettimeofday(&tv_out, NULL); + start_time=(((unsigned long long) tv_out.tv_sec) * 1000000) + (tv_out.tv_usec); + + read_only__num_active_connections=0; + // create libevent base + libevent_base= event_base_new(); + + glover=GloMTH->get_global_version(); + if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) { + MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover; + mysql_thr->refresh_variables(); + //proxy_error("%s\n","MySQL_Monitor - PING - refreshing variables"); + } + + proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query); +// admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset); + resultset = MyHGM->execute_query(query, &error); + assert(resultset); + if (error) { + proxy_error("Error on %s : %s\n", query, error); + goto __end_monitor_read_only_loop; + } else { + if (resultset->rows_count==0) { + goto __end_monitor_read_only_loop; + } + sds=(MySQL_Monitor_State_Data **)malloc(resultset->rows_count * sizeof(MySQL_Monitor_State_Data *)); + for (std::vector::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) { + SQLite3_row *r=*it; + sds[i] = new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]),libevent_base); + sds[i]->task_id=MON_READ_ONLY; +// sds[i]->hostgroup_id=atoi(r->fields[0]); +// sds[i]->repl_lag=atoi(r->fields[3]); + read_only__num_active_connections++; + total_read_only__num_active_connections++; + MySQL_Monitor_State_Data *_mmsd=sds[i]; + _mmsd->mysql=GloMyMon->My_Conn_Pool->get_connection(_mmsd->hostname, _mmsd->port); + if (_mmsd->mysql==NULL) { + state_machine_handler(-1,-1,_mmsd); + } else { + int fd=mysql_get_socket(_mmsd->mysql); + _mmsd->ST=20; + state_machine_handler(fd,-1,_mmsd); + } + i++; + } + } + + // start libevent loop + event_base_dispatch(libevent_base); + +__end_monitor_read_only_loop: + if (sds) { + sqlite3_stmt *statement; + sqlite3 *mondb=monitordb->get_db(); + int rc; + char *query=NULL; + query=(char *)"DELETE FROM mysql_server_read_only_log WHERE time_start < ?1"; + rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 1, start_time-mysql_thread___monitor_history*1000); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); + sqlite3_finalize(statement); + + query=(char *)"INSERT OR REPLACE INTO mysql_server_read_only_log VALUES (?1 , ?2 , ?3 , ?4 , ?5 , ?6)"; + rc=sqlite3_prepare_v2(mondb, query, -1, &statement, 0); + assert(rc==SQLITE_OK); + SQLite3_result *result=new SQLite3_result(3); + result->add_column_definition(SQLITE_TEXT,"Host"); + result->add_column_definition(SQLITE_TEXT,"Port"); + result->add_column_definition(SQLITE_TEXT,"RO"); + while (i>0) { + i--; + int read_only=1; // as a safety mechanism , read_only=1 is the default + MySQL_Monitor_State_Data *mmsd=sds[i]; + rc=sqlite3_bind_text(statement, 1, mmsd->hostname, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int(statement, 2, mmsd->port); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 3, start_time); assert(rc==SQLITE_OK); + rc=sqlite3_bind_int64(statement, 4, (mmsd->mysql_error_msg ? 0 : mmsd->t2-mmsd->t1)); assert(rc==SQLITE_OK); + if (mmsd->result) { + unsigned int num_fields; + unsigned int k; + MYSQL_FIELD *fields; + int j=-1; + num_fields = mysql_num_fields(mmsd->result); + fields = mysql_fetch_fields(mmsd->result); + for(k = 0; k < num_fields; k++) { + //if (strcmp("VARIABLE_NAME", fields[k].name)==0) { + if (strcmp("Value", fields[k].name)==0) { + j=k; + } + } + if (j>-1) { + MYSQL_ROW row=mysql_fetch_row(mmsd->result); + if (row) { + if (row[j]) { + if (!strcmp(row[j],"0") || !strcasecmp(row[j],"OFF")) + read_only=0; + } + } + } +// if (repl_lag>=0) { + rc=sqlite3_bind_int64(statement, 5, read_only); assert(rc==SQLITE_OK); +// } else { +// rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); +// } + mysql_free_result(mmsd->result); + mmsd->result=NULL; + } else { + rc=sqlite3_bind_null(statement, 5); assert(rc==SQLITE_OK); + } + rc=sqlite3_bind_text(statement, 6, mmsd->mysql_error_msg, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK); + SAFE_SQLITE3_STEP(statement); + rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK); + rc=sqlite3_reset(statement); assert(rc==SQLITE_OK); +// FIXME: MyHGM->replication_lag_action(mmsd->hostgroup_id, mmsd->hostname, mmsd->port, (repl_lag==-1 ? 0 : repl_lag)); + + char *pta[3]; + char roport[10]; + char roval[10]; + pta[0]=mmsd->hostname; + sprintf(roport,"%d",mmsd->port); + pta[1]=roport; + sprintf(roval,"%d",read_only); + pta[2]=roval; + result->add_row(pta); + delete mmsd; + } + sqlite3_finalize(statement); + free(sds); + + // we now have a resultset we will sent to MyHGM to perform all the switches + MyHGM->read_only_action(result); + delete result; + } + + + if (resultset) + delete resultset; + + event_base_free(libevent_base); + + +__sleep_monitor_read_only: + t2=monotonic_time(); + if (t2 500000) { + st = 500000; + } + usleep(st); + } + } + return NULL; +} + void * MySQL_Monitor::monitor_replication_lag() { // initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it) struct event_base *libevent_base; @@ -964,6 +1235,7 @@ void * MySQL_Monitor::run() { mysql_thr->refresh_variables(); std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this); std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this); + std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this); std::thread * monitor_replication_lag_thread = new std::thread(&MySQL_Monitor::monitor_replication_lag,this); while (shutdown==false) { unsigned int glover=GloMTH->get_global_version(); @@ -976,6 +1248,7 @@ void * MySQL_Monitor::run() { } monitor_connect_thread->join(); monitor_ping_thread->join(); + monitor_read_only_thread->join(); monitor_replication_lag_thread->join(); return NULL; };