diff --git a/include/proxysql_admin.h b/include/proxysql_admin.h index 0a3d55fb6..5d208ec28 100644 --- a/include/proxysql_admin.h +++ b/include/proxysql_admin.h @@ -27,12 +27,16 @@ class Scheduler_Row { class ProxySQL_External_Scheduler { + private: + unsigned long long next_run; public: + unsigned int last_version; + unsigned int version; rwlock_t rwlock; std::vector Scheduler_Rows; ProxySQL_External_Scheduler(); ~ProxySQL_External_Scheduler(); - void run_once(); + unsigned long long run_once(); void update_table(SQLite3_result *result); }; @@ -187,5 +191,9 @@ class ProxySQL_Admin { void mysql_servers_wrlock(); void mysql_servers_wrunlock(); + + // wrapper to call a private function + unsigned long long scheduler_run_once() { return scheduler->run_once(); } + }; #endif /* __CLASS_PROXYSQL_ADMIN_H */ diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index b61aae641..218864ef1 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -2150,7 +2150,17 @@ static void * admin_main_loop(void *arg) socklen_t addr_size = sizeof(addr); pthread_t child; size_t stacks; - rc=poll(fds,nfds,1000); + unsigned long long curtime=monotonic_time(); + unsigned long long next_run=GloAdmin->scheduler_run_once(); + unsigned long long poll_wait=500000; + if (next_run < curtime + 500000) { + poll_wait=next_run-curtime; + } + if (poll_wait > 500000) { + poll_wait=500000; + } + poll_wait=poll_wait/1000; // conversion to millisecond + rc=poll(fds,nfds,poll_wait); //if (__sync_fetch_and_add(&GloVars.global.nostart,0)==0) { // __sync_fetch_and_add(&GloVars.global.nostart,1); if ((nostart_ && __sync_val_compare_and_swap(&GloVars.global.nostart,0,1)==0) || __sync_fetch_and_add(&glovars.shutdown,0)==1) { @@ -4346,6 +4356,9 @@ Scheduler_Row::~Scheduler_Row() { ProxySQL_External_Scheduler::ProxySQL_External_Scheduler() { spinlock_rwlock_init(&rwlock); + last_version=0; + version=0; + next_run=0; } ProxySQL_External_Scheduler::~ProxySQL_External_Scheduler() { @@ -4372,5 +4385,75 @@ void ProxySQL_External_Scheduler::update_table(SQLite3_result *resultset) { ); Scheduler_Rows.push_back(sr); } + // increase version + __sync_fetch_and_add(&version,1); + // unlock spin_wrunlock(&rwlock); } + +unsigned long long ProxySQL_External_Scheduler::run_once() { + Scheduler_Row *sr=NULL; + unsigned long long curtime=monotonic_time(); + curtime=curtime/1000; + spin_rdlock(&rwlock); + if (__sync_add_and_fetch(&version,0) > last_version) { // version was changed + next_run=0; + last_version=version; + for (std::vector::iterator it=Scheduler_Rows.begin(); it!=Scheduler_Rows.end(); ++it) { + sr=*it; + sr->next=curtime+sr->interval_ms; + if (next_run==0) { + next_run=sr->next; + } else { + if (sr->next < next_run) { // we try to find the first event that needs to be executed + next_run=sr->next; + } + } + } + } + if (curtime >= next_run) { + next_run=0; + for (std::vector::iterator it=Scheduler_Rows.begin(); it!=Scheduler_Rows.end(); ++it) { + sr=*it; + if (curtime >= sr->next) { + // the event is scheduled for execution + sr->next=curtime+sr->interval_ms; + char **newargs=(char **)malloc(7*sizeof(char *)); + for (int i=1;i<7;i++) { + newargs[i]=sr->args[i-1]; + } + newargs[0]=sr->filename; + pid_t cpid; + cpid = fork(); + if (cpid == -1) { + perror("fork"); + exit(EXIT_FAILURE); + } + if (cpid == 0) { + char *newenviron[] = { NULL }; + int rc; + rc=execve(sr->filename, newargs, newenviron); + if (rc) { + exit(EXIT_FAILURE); + } + } + free(newargs); + } + if (next_run==0) { + next_run=sr->next; + } else { + if (sr->next < next_run) { // we try to find the first event that needs to be executed + next_run=sr->next; + } + } + } + } + // find the smaller next_run + for (std::vector::iterator it=Scheduler_Rows.begin(); it!=Scheduler_Rows.end(); ++it) { + sr=*it; + if (next_run==0) { + } + } + spin_rdunlock(&rwlock); + return next_run; +}