From d59ee332ac0256029d7e96c3f1da7670cca572ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sat, 23 Jul 2016 13:14:58 +0000 Subject: [PATCH] Implementation of thread pool on Monitor For connect, ping and read_only --- include/MySQL_Monitor.hpp | 15 +++++++ lib/MySQL_Monitor.cpp | 95 ++++++++++++++++++++++++++++++--------- lib/ProxySQL_Admin.cpp | 16 +++---- 3 files changed, 96 insertions(+), 30 deletions(-) diff --git a/include/MySQL_Monitor.hpp b/include/MySQL_Monitor.hpp index e05247940..54ba44983 100644 --- a/include/MySQL_Monitor.hpp +++ b/include/MySQL_Monitor.hpp @@ -5,6 +5,9 @@ //#include "btree_map.h" #include "proxysql.h" #include "cpp.h" +#include "thread.h" +#include "wqueue.h" + #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_CONNECT "CREATE TABLE mysql_server_connect (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_since INT NOT NULL DEFAULT 0 , time_until INT NOT NULL DEFAULT 0 , connect_success_count INT NOT NULL DEFAULT 0 , connect_success_first INT NOT NULL DEFAULT 0 , connect_success_last INT NOT NULL DEFAULT 0 , connect_success_time_min INT NOT NULL DEFAULT 0 , connect_success_time_max INT NOT NULL DEFAULT 0 , connect_success_time_total INT NOT NULL DEFAULT 0 , connect_failure_count INT NOT NULL DEFAULT 0 , connect_failure_first INT NOT NULL DEFAULT 0 , connect_failure_last INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port))" @@ -19,6 +22,7 @@ #define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))" + class MySQL_Monitor_Connection_Pool; enum MySQL_Monitor_State_Data_Task_Type { @@ -67,6 +71,16 @@ class MySQL_Monitor_State_Data { // void next_event(MDB_ASYNC_ST new_st); }; +class WorkItem { + public: + MySQL_Monitor_State_Data *mmsd; + void *(*routine) (void *); + WorkItem(MySQL_Monitor_State_Data *_mmsd, void *(*start_routine) (void *)) { + mmsd=_mmsd; + routine=start_routine; + } + ~WorkItem() {} +}; class MySQL_Monitor { private: @@ -77,6 +91,7 @@ class MySQL_Monitor { void drop_tables_defs(std::vector *tables_defs); void check_and_build_standard_tables(SQLite3DB *db, std::vector *tables_defs); public: + wqueue queue; MySQL_Monitor_Connection_Pool *My_Conn_Pool; bool shutdown; SQLite3DB *admindb; // internal database diff --git a/lib/MySQL_Monitor.cpp b/lib/MySQL_Monitor.cpp index 05dd20de0..da3fe9970 100644 --- a/lib/MySQL_Monitor.cpp +++ b/lib/MySQL_Monitor.cpp @@ -43,23 +43,44 @@ static MySQL_Monitor *GloMyMon; static void state_machine_handler(int fd, short event, void *arg); -class ConsumerThreadPing : public Thread { - wqueue& m_queue; - void *(*routine) (void *); +/* +class WorkItem { + public: + MySQL_Monitor_State_Data *mmsd; + void *(*routine) (void *); + WorkItem(MySQL_Monitor_State_Data *_mmsd, void *(*start_routine) (void *)) { + mmsd=_mmsd; + routine=start_routine; + } + ~WorkItem() {} +}; +*/ +class ConsumerThread : public Thread { + //wqueue& m_queue; + wqueue& m_queue; + //void *(*routine) (void *); int thrn; public: - ConsumerThreadPing(wqueue& queue, void *(*start_routine) (void *), int _n) : m_queue(queue) { - routine=start_routine; + //ConsumerThreadPing(wqueue& queue, void *(*start_routine) (void *), int _n) : m_queue(queue) { + ConsumerThread(wqueue& queue, int _n) : m_queue(queue) { + //routine=start_routine; thrn=_n; } void* run() { // Remove 1 item at a time and process it. Blocks if no items are // available to process. for (int i = 0;; i++) { - printf("thread %d, loop %d - waiting for item...\n", thrn, i); - MySQL_Monitor_State_Data* mmsd = (MySQL_Monitor_State_Data*)m_queue.remove(); - printf("thread %d, loop %d - got one item\n", thrn, i); - routine((void *)mmsd); +// printf("thread %d, loop %d - waiting for item...\n", thrn, i); + //MySQL_Monitor_State_Data* mmsd = (MySQL_Monitor_State_Data*)m_queue.remove(); + WorkItem* item = (WorkItem*)m_queue.remove(); + if (item==NULL) { + // this is intentional to EXIT immediately + return NULL; + } +// printf("thread %d, loop %d - got one item\n", thrn, i); + item->routine((void *)item->mmsd); + //routine((void *)mmsd); + delete item; } return NULL; } @@ -1157,10 +1178,13 @@ void * MySQL_Monitor::monitor_connect() { SQLite3_row *r=*it; MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); mmsd->mondb=monitordb; - pthread_t thr_; - if ( pthread_create(&thr_, &attr, monitor_connect_thread, (void *)mmsd) != 0 ) { - perror("Thread creation monitor_connect_thread"); - } + //pthread_t thr_; + //if ( pthread_create(&thr_, &attr, monitor_connect_thread, (void *)mmsd) != 0 ) { + // perror("Thread creation monitor_connect_thread"); + //} + WorkItem* item; + item=new WorkItem(mmsd,monitor_connect_thread); + GloMyMon->queue.add(item); } } @@ -1224,13 +1248,20 @@ void * MySQL_Monitor::monitor_ping() { unsigned long long t2; unsigned long long start_time; unsigned long long next_loop_at=0; - wqueue queue; - ConsumerThreadPing **threads= (ConsumerThreadPing **)malloc(sizeof(ConsumerThreadPing *)*MONTHREADS); +// wqueue queue; +// ConsumerThreadPing **threads= (ConsumerThreadPing **)malloc(sizeof(ConsumerThreadPing *)*MONTHREADS); +// for (int i=0;istart(); +// } +/* + wqueue queue; + ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*MONTHREADS); for (int i=0;istart(); } - +*/ while (shutdown==false) { unsigned int glover; @@ -1272,7 +1303,9 @@ void * MySQL_Monitor::monitor_ping() { // if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) { // perror("Thread creation monitor_ping_thread"); // } - queue.add(mmsd); + WorkItem* item; + item=new WorkItem(mmsd,monitor_ping_thread); + GloMyMon->queue.add(item); } } @@ -1419,10 +1452,12 @@ __sleep_monitor_ping_loop: delete mysql_thr; mysql_thr=NULL; } +/* for (int i=0;ijoin(); } free(threads); +*/ return NULL; } @@ -1482,10 +1517,13 @@ void * MySQL_Monitor::monitor_read_only() { SQLite3_row *r=*it; MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2])); mmsd->mondb=monitordb; - pthread_t thr_; - if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) { - perror("Thread creation monitor_read_only_thread"); - } + //pthread_t thr_; + //if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) { + // perror("Thread creation monitor_read_only_thread"); + //} + WorkItem* item; + item=new WorkItem(mmsd,monitor_read_only_thread); + GloMyMon->queue.add(item); } } @@ -1725,6 +1763,12 @@ void * MySQL_Monitor::run() { mysql_thr->curtime=monotonic_time(); MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version(); mysql_thr->refresh_variables(); + //wqueue queue; + ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*MONTHREADS); + for (int i=0;istart(); + } std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this); std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this); std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this); @@ -1739,6 +1783,13 @@ void * MySQL_Monitor::run() { } usleep(500000); } + for (int i=0;iqueue.add(NULL); + } + for (int i=0;ijoin(); + } + free(threads); monitor_connect_thread->join(); monitor_ping_thread->join(); monitor_read_only_thread->join(); diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index f4a587c70..2b6acc415 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -131,13 +131,13 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER; #define ADMIN_SQLITE_TABLE_DEBUG_LEVELS "CREATE TABLE debug_levels (module VARCHAR NOT NULL PRIMARY KEY , verbosity INT NOT NULL DEFAULT 0)" #endif /* DEBUG */ - +/* #define CMD1 1 #define CMD2 2 #define CMD3 3 #define CMD4 4 #define CMD5 5 - +*/ static char * admin_variables_names[]= { (char *)"admin_credentials", @@ -154,13 +154,12 @@ static char * admin_variables_names[]= { NULL }; - +/* static t_symstruct lookuptable[] = { { SpookyHash::Hash32("SHOW",4,0), CMD1 }, { SpookyHash::Hash32("SET",3,0), CMD2 }, { SpookyHash::Hash32("FLUSH",5,0), CMD3 }, }; - #define NKEYS (sizeof(lookuptable)/sizeof(t_symstruct)) static uint32_t keyfromhash(uint32_t hash) { @@ -174,6 +173,7 @@ static uint32_t keyfromhash(uint32_t hash) { } return -1; } +*/ static ProxySQL_Admin *SPA=NULL; @@ -2165,7 +2165,7 @@ void* child_telnet(void* arg) } char *eow = strchr(line, '\n'); if (eow) *eow=0; - SPA->is_command(line); + //SPA->is_command(line); if (strncmp(line,"shutdown",8)==0) glovars.shutdown=1; if (send(client, line, strlen(line), MSG_NOSIGNAL)==-1) break; if (send(client, "\nOK\n", 4, MSG_NOSIGNAL)==-1) break; @@ -2624,7 +2624,7 @@ ProxySQL_Admin::~ProxySQL_Admin() { delete (re2::RE2::Options *)match_regexes.opt; }; - +/* bool ProxySQL_Admin::is_command(std::string s) { std::string cps; std::size_t found = s.find_first_of("\n\r\t "); @@ -2633,7 +2633,7 @@ bool ProxySQL_Admin::is_command(std::string s) { } else { cps=s; } - transform(cps.begin(), cps.end(), cps.begin(), toupper); + std::transform(cps.begin(), cps.end(), cps.begin(), std::toupper); uint32 cmd_hash=SpookyHash::Hash32(cps.c_str(),cps.length(),0); std::cout<