Implementation of thread pool on Monitor

For connect, ping and read_only
pull/631/head
René Cannaò 10 years ago
parent f3899950e5
commit d59ee332ac

@ -5,6 +5,9 @@
//#include "btree_map.h"
#include "proxysql.h"
#include "cpp.h"
#include "thread.h"
#include "wqueue.h"
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_CONNECT "CREATE TABLE mysql_server_connect (hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_since INT NOT NULL DEFAULT 0 , time_until INT NOT NULL DEFAULT 0 , connect_success_count INT NOT NULL DEFAULT 0 , connect_success_first INT NOT NULL DEFAULT 0 , connect_success_last INT NOT NULL DEFAULT 0 , connect_success_time_min INT NOT NULL DEFAULT 0 , connect_success_time_max INT NOT NULL DEFAULT 0 , connect_success_time_total INT NOT NULL DEFAULT 0 , connect_failure_count INT NOT NULL DEFAULT 0 , connect_failure_first INT NOT NULL DEFAULT 0 , connect_failure_last INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostname, port))"
@ -19,6 +22,7 @@
#define MONITOR_SQLITE_TABLE_MYSQL_SERVER_REPLICATION_LAG_LOG "CREATE TABLE mysql_server_replication_lag_log ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , time_start INT NOT NULL DEFAULT 0 , success_time INT DEFAULT 0 , repl_lag INT DEFAULT 0 , error VARCHAR , PRIMARY KEY (hostname, port, time_start))"
class MySQL_Monitor_Connection_Pool;
enum MySQL_Monitor_State_Data_Task_Type {
@ -67,6 +71,16 @@ class MySQL_Monitor_State_Data {
// void next_event(MDB_ASYNC_ST new_st);
};
class WorkItem {
public:
MySQL_Monitor_State_Data *mmsd;
void *(*routine) (void *);
WorkItem(MySQL_Monitor_State_Data *_mmsd, void *(*start_routine) (void *)) {
mmsd=_mmsd;
routine=start_routine;
}
~WorkItem() {}
};
class MySQL_Monitor {
private:
@ -77,6 +91,7 @@ class MySQL_Monitor {
void drop_tables_defs(std::vector<table_def_t *> *tables_defs);
void check_and_build_standard_tables(SQLite3DB *db, std::vector<table_def_t *> *tables_defs);
public:
wqueue<WorkItem*> queue;
MySQL_Monitor_Connection_Pool *My_Conn_Pool;
bool shutdown;
SQLite3DB *admindb; // internal database

@ -43,23 +43,44 @@ static MySQL_Monitor *GloMyMon;
static void state_machine_handler(int fd, short event, void *arg);
class ConsumerThreadPing : public Thread {
wqueue<MySQL_Monitor_State_Data*>& m_queue;
void *(*routine) (void *);
/*
class WorkItem {
public:
MySQL_Monitor_State_Data *mmsd;
void *(*routine) (void *);
WorkItem(MySQL_Monitor_State_Data *_mmsd, void *(*start_routine) (void *)) {
mmsd=_mmsd;
routine=start_routine;
}
~WorkItem() {}
};
*/
class ConsumerThread : public Thread {
//wqueue<MySQL_Monitor_State_Data*>& m_queue;
wqueue<WorkItem*>& m_queue;
//void *(*routine) (void *);
int thrn;
public:
ConsumerThreadPing(wqueue<MySQL_Monitor_State_Data*>& queue, void *(*start_routine) (void *), int _n) : m_queue(queue) {
routine=start_routine;
//ConsumerThreadPing(wqueue<MySQL_Monitor_State_Data*>& queue, void *(*start_routine) (void *), int _n) : m_queue(queue) {
ConsumerThread(wqueue<WorkItem*>& queue, int _n) : m_queue(queue) {
//routine=start_routine;
thrn=_n;
}
void* run() {
// Remove 1 item at a time and process it. Blocks if no items are
// available to process.
for (int i = 0;; i++) {
printf("thread %d, loop %d - waiting for item...\n", thrn, i);
MySQL_Monitor_State_Data* mmsd = (MySQL_Monitor_State_Data*)m_queue.remove();
printf("thread %d, loop %d - got one item\n", thrn, i);
routine((void *)mmsd);
// printf("thread %d, loop %d - waiting for item...\n", thrn, i);
//MySQL_Monitor_State_Data* mmsd = (MySQL_Monitor_State_Data*)m_queue.remove();
WorkItem* item = (WorkItem*)m_queue.remove();
if (item==NULL) {
// this is intentional to EXIT immediately
return NULL;
}
// printf("thread %d, loop %d - got one item\n", thrn, i);
item->routine((void *)item->mmsd);
//routine((void *)mmsd);
delete item;
}
return NULL;
}
@ -1157,10 +1178,13 @@ void * MySQL_Monitor::monitor_connect() {
SQLite3_row *r=*it;
MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2]));
mmsd->mondb=monitordb;
pthread_t thr_;
if ( pthread_create(&thr_, &attr, monitor_connect_thread, (void *)mmsd) != 0 ) {
perror("Thread creation monitor_connect_thread");
}
//pthread_t thr_;
//if ( pthread_create(&thr_, &attr, monitor_connect_thread, (void *)mmsd) != 0 ) {
// perror("Thread creation monitor_connect_thread");
//}
WorkItem* item;
item=new WorkItem(mmsd,monitor_connect_thread);
GloMyMon->queue.add(item);
}
}
@ -1224,13 +1248,20 @@ void * MySQL_Monitor::monitor_ping() {
unsigned long long t2;
unsigned long long start_time;
unsigned long long next_loop_at=0;
wqueue<MySQL_Monitor_State_Data*> queue;
ConsumerThreadPing **threads= (ConsumerThreadPing **)malloc(sizeof(ConsumerThreadPing *)*MONTHREADS);
// wqueue<MySQL_Monitor_State_Data*> queue;
// ConsumerThreadPing **threads= (ConsumerThreadPing **)malloc(sizeof(ConsumerThreadPing *)*MONTHREADS);
// for (int i=0;i<MONTHREADS; i++) {
// threads[i] = new ConsumerThreadPing(queue,monitor_ping_thread, i);
// threads[i]->start();
// }
/*
wqueue<WorkItem*> queue;
ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*MONTHREADS);
for (int i=0;i<MONTHREADS; i++) {
threads[i] = new ConsumerThreadPing(queue,monitor_ping_thread, i);
threads[i] = new ConsumerThread(queue, i);
threads[i]->start();
}
*/
while (shutdown==false) {
unsigned int glover;
@ -1272,7 +1303,9 @@ void * MySQL_Monitor::monitor_ping() {
// if ( pthread_create(&thr_, &attr, monitor_ping_thread, (void *)mmsd) != 0 ) {
// perror("Thread creation monitor_ping_thread");
// }
queue.add(mmsd);
WorkItem* item;
item=new WorkItem(mmsd,monitor_ping_thread);
GloMyMon->queue.add(item);
}
}
@ -1419,10 +1452,12 @@ __sleep_monitor_ping_loop:
delete mysql_thr;
mysql_thr=NULL;
}
/*
for (int i=0;i<MONTHREADS; i++) {
threads[i]->join();
}
free(threads);
*/
return NULL;
}
@ -1482,10 +1517,13 @@ void * MySQL_Monitor::monitor_read_only() {
SQLite3_row *r=*it;
MySQL_Monitor_State_Data *mmsd=new MySQL_Monitor_State_Data(r->fields[0],atoi(r->fields[1]), NULL, atoi(r->fields[2]));
mmsd->mondb=monitordb;
pthread_t thr_;
if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) {
perror("Thread creation monitor_read_only_thread");
}
//pthread_t thr_;
//if ( pthread_create(&thr_, &attr, monitor_read_only_thread, (void *)mmsd) != 0 ) {
// perror("Thread creation monitor_read_only_thread");
//}
WorkItem* item;
item=new WorkItem(mmsd,monitor_read_only_thread);
GloMyMon->queue.add(item);
}
}
@ -1725,6 +1763,12 @@ void * MySQL_Monitor::run() {
mysql_thr->curtime=monotonic_time();
MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version();
mysql_thr->refresh_variables();
//wqueue<WorkItem*> queue;
ConsumerThread **threads= (ConsumerThread **)malloc(sizeof(ConsumerThread *)*MONTHREADS);
for (int i=0;i<MONTHREADS; i++) {
threads[i] = new ConsumerThread(queue, i);
threads[i]->start();
}
std::thread * monitor_connect_thread = new std::thread(&MySQL_Monitor::monitor_connect,this);
std::thread * monitor_ping_thread = new std::thread(&MySQL_Monitor::monitor_ping,this);
std::thread * monitor_read_only_thread = new std::thread(&MySQL_Monitor::monitor_read_only,this);
@ -1739,6 +1783,13 @@ void * MySQL_Monitor::run() {
}
usleep(500000);
}
for (int i=0;i<MONTHREADS; i++) {
GloMyMon->queue.add(NULL);
}
for (int i=0;i<MONTHREADS; i++) {
threads[i]->join();
}
free(threads);
monitor_connect_thread->join();
monitor_ping_thread->join();
monitor_read_only_thread->join();

@ -131,13 +131,13 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_TABLE_DEBUG_LEVELS "CREATE TABLE debug_levels (module VARCHAR NOT NULL PRIMARY KEY , verbosity INT NOT NULL DEFAULT 0)"
#endif /* DEBUG */
/*
#define CMD1 1
#define CMD2 2
#define CMD3 3
#define CMD4 4
#define CMD5 5
*/
static char * admin_variables_names[]= {
(char *)"admin_credentials",
@ -154,13 +154,12 @@ static char * admin_variables_names[]= {
NULL
};
/*
static t_symstruct lookuptable[] = {
{ SpookyHash::Hash32("SHOW",4,0), CMD1 },
{ SpookyHash::Hash32("SET",3,0), CMD2 },
{ SpookyHash::Hash32("FLUSH",5,0), CMD3 },
};
#define NKEYS (sizeof(lookuptable)/sizeof(t_symstruct))
static uint32_t keyfromhash(uint32_t hash) {
@ -174,6 +173,7 @@ static uint32_t keyfromhash(uint32_t hash) {
}
return -1;
}
*/
static ProxySQL_Admin *SPA=NULL;
@ -2165,7 +2165,7 @@ void* child_telnet(void* arg)
}
char *eow = strchr(line, '\n');
if (eow) *eow=0;
SPA->is_command(line);
//SPA->is_command(line);
if (strncmp(line,"shutdown",8)==0) glovars.shutdown=1;
if (send(client, line, strlen(line), MSG_NOSIGNAL)==-1) break;
if (send(client, "\nOK\n", 4, MSG_NOSIGNAL)==-1) break;
@ -2624,7 +2624,7 @@ ProxySQL_Admin::~ProxySQL_Admin() {
delete (re2::RE2::Options *)match_regexes.opt;
};
/*
bool ProxySQL_Admin::is_command(std::string s) {
std::string cps;
std::size_t found = s.find_first_of("\n\r\t ");
@ -2633,7 +2633,7 @@ bool ProxySQL_Admin::is_command(std::string s) {
} else {
cps=s;
}
transform(cps.begin(), cps.end(), cps.begin(), toupper);
std::transform(cps.begin(), cps.end(), cps.begin(), std::toupper);
uint32 cmd_hash=SpookyHash::Hash32(cps.c_str(),cps.length(),0);
std::cout<<cps<<" "<<cmd_hash<<" "<<std::endl;
switch (keyfromhash(cmd_hash)) {
@ -2652,7 +2652,7 @@ bool ProxySQL_Admin::is_command(std::string s) {
}
return true;
};
*/
void ProxySQL_Admin::dump_mysql_collations() {
const CHARSET_INFO * c = compiled_charsets;

Loading…
Cancel
Save