For on Scheduler , issue #578

Scheduler is almost completed.
proxysql forks and call external scripts
connleak
René Cannaò 10 years ago
parent bda2cf4350
commit 79d70b3136

@ -27,12 +27,16 @@ class Scheduler_Row {
class ProxySQL_External_Scheduler {
private:
unsigned long long next_run;
public:
unsigned int last_version;
unsigned int version;
rwlock_t rwlock;
std::vector<Scheduler_Row *> Scheduler_Rows;
ProxySQL_External_Scheduler();
~ProxySQL_External_Scheduler();
void run_once();
unsigned long long run_once();
void update_table(SQLite3_result *result);
};
@ -187,5 +191,9 @@ class ProxySQL_Admin {
void mysql_servers_wrlock();
void mysql_servers_wrunlock();
// wrapper to call a private function
unsigned long long scheduler_run_once() { return scheduler->run_once(); }
};
#endif /* __CLASS_PROXYSQL_ADMIN_H */

@ -2150,7 +2150,17 @@ static void * admin_main_loop(void *arg)
socklen_t addr_size = sizeof(addr);
pthread_t child;
size_t stacks;
rc=poll(fds,nfds,1000);
unsigned long long curtime=monotonic_time();
unsigned long long next_run=GloAdmin->scheduler_run_once();
unsigned long long poll_wait=500000;
if (next_run < curtime + 500000) {
poll_wait=next_run-curtime;
}
if (poll_wait > 500000) {
poll_wait=500000;
}
poll_wait=poll_wait/1000; // conversion to millisecond
rc=poll(fds,nfds,poll_wait);
//if (__sync_fetch_and_add(&GloVars.global.nostart,0)==0) {
// __sync_fetch_and_add(&GloVars.global.nostart,1);
if ((nostart_ && __sync_val_compare_and_swap(&GloVars.global.nostart,0,1)==0) || __sync_fetch_and_add(&glovars.shutdown,0)==1) {
@ -4346,6 +4356,9 @@ Scheduler_Row::~Scheduler_Row() {
ProxySQL_External_Scheduler::ProxySQL_External_Scheduler() {
spinlock_rwlock_init(&rwlock);
last_version=0;
version=0;
next_run=0;
}
ProxySQL_External_Scheduler::~ProxySQL_External_Scheduler() {
@ -4372,5 +4385,75 @@ void ProxySQL_External_Scheduler::update_table(SQLite3_result *resultset) {
);
Scheduler_Rows.push_back(sr);
}
// increase version
__sync_fetch_and_add(&version,1);
// unlock
spin_wrunlock(&rwlock);
}
unsigned long long ProxySQL_External_Scheduler::run_once() {
Scheduler_Row *sr=NULL;
unsigned long long curtime=monotonic_time();
curtime=curtime/1000;
spin_rdlock(&rwlock);
if (__sync_add_and_fetch(&version,0) > last_version) { // version was changed
next_run=0;
last_version=version;
for (std::vector<Scheduler_Row *>::iterator it=Scheduler_Rows.begin(); it!=Scheduler_Rows.end(); ++it) {
sr=*it;
sr->next=curtime+sr->interval_ms;
if (next_run==0) {
next_run=sr->next;
} else {
if (sr->next < next_run) { // we try to find the first event that needs to be executed
next_run=sr->next;
}
}
}
}
if (curtime >= next_run) {
next_run=0;
for (std::vector<Scheduler_Row *>::iterator it=Scheduler_Rows.begin(); it!=Scheduler_Rows.end(); ++it) {
sr=*it;
if (curtime >= sr->next) {
// the event is scheduled for execution
sr->next=curtime+sr->interval_ms;
char **newargs=(char **)malloc(7*sizeof(char *));
for (int i=1;i<7;i++) {
newargs[i]=sr->args[i-1];
}
newargs[0]=sr->filename;
pid_t cpid;
cpid = fork();
if (cpid == -1) {
perror("fork");
exit(EXIT_FAILURE);
}
if (cpid == 0) {
char *newenviron[] = { NULL };
int rc;
rc=execve(sr->filename, newargs, newenviron);
if (rc) {
exit(EXIT_FAILURE);
}
}
free(newargs);
}
if (next_run==0) {
next_run=sr->next;
} else {
if (sr->next < next_run) { // we try to find the first event that needs to be executed
next_run=sr->next;
}
}
}
}
// find the smaller next_run
for (std::vector<Scheduler_Row *>::iterator it=Scheduler_Rows.begin(); it!=Scheduler_Rows.end(); ++it) {
sr=*it;
if (next_run==0) {
}
}
spin_rdunlock(&rwlock);
return next_run;
}

Loading…
Cancel
Save