|
|
|
|
@ -9,6 +9,7 @@
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#include <map>
|
|
|
|
|
#include <mutex>
|
|
|
|
|
#include <thread>
|
|
|
|
|
#include "proxysql.h"
|
|
|
|
|
#include "cpp.h"
|
|
|
|
|
@ -115,124 +116,63 @@ static void close_mysql(MYSQL *my) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
struct cmp_str {
|
|
|
|
|
bool operator()(char const *a, char const *b) const
|
|
|
|
|
{
|
|
|
|
|
return strcmp(a, b) < 0;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class MySQL_Monitor_Connection_Pool {
|
|
|
|
|
private:
|
|
|
|
|
pthread_mutex_t mutex;
|
|
|
|
|
int size;
|
|
|
|
|
std::map<char *, PtrArray *, cmp_str> my_connections;
|
|
|
|
|
public:
|
|
|
|
|
MySQL_Monitor_Connection_Pool();
|
|
|
|
|
~MySQL_Monitor_Connection_Pool();
|
|
|
|
|
private:
|
|
|
|
|
std::mutex mutex;
|
|
|
|
|
std::map<std::pair<std::string, int>, std::vector<MYSQL*> > my_connections;
|
|
|
|
|
public:
|
|
|
|
|
MYSQL * get_connection(char *hostname, int port);
|
|
|
|
|
void put_connection(char *hostname, int port, MYSQL *my);
|
|
|
|
|
void purge_idle_connections();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
MySQL_Monitor_Connection_Pool::MySQL_Monitor_Connection_Pool() {
|
|
|
|
|
size=0;
|
|
|
|
|
pthread_mutex_init(&mutex,NULL);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MySQL_Monitor_Connection_Pool::~MySQL_Monitor_Connection_Pool() {
|
|
|
|
|
// FIXME: destructor is yet not completed
|
|
|
|
|
std::map<char *, PtrArray *>::iterator it;
|
|
|
|
|
for(it = my_connections.begin(); it != my_connections.end(); it++) {
|
|
|
|
|
PtrArray *lst=it->second;
|
|
|
|
|
delete lst;
|
|
|
|
|
char *host=it->first;
|
|
|
|
|
free(host);
|
|
|
|
|
}
|
|
|
|
|
my_connections.erase(my_connections.begin(),my_connections.end());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MySQL_Monitor_Connection_Pool::purge_idle_connections() {
|
|
|
|
|
unsigned long long now=monotonic_time();
|
|
|
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
|
std::map<char *, PtrArray *>::iterator it;
|
|
|
|
|
//fprintf(stderr,"conn pool size: %d\n",my_connections.size());
|
|
|
|
|
unsigned int totconn;
|
|
|
|
|
totconn=0;
|
|
|
|
|
for(it = my_connections.begin(); it != my_connections.end(); it++) {
|
|
|
|
|
PtrArray *lst=it->second;
|
|
|
|
|
totconn+=lst->len;
|
|
|
|
|
}
|
|
|
|
|
__loop_purge_idle_connections:
|
|
|
|
|
//fprintf(stderr,"tot conn in pool: %d\n",totconn);
|
|
|
|
|
for(it = my_connections.begin(); it != my_connections.end(); it++) {
|
|
|
|
|
PtrArray *lst=it->second;
|
|
|
|
|
if (lst->len) {
|
|
|
|
|
unsigned int it3;
|
|
|
|
|
for(it3 = 0; it3 < lst->len; it3++) {
|
|
|
|
|
MYSQL *my=(MYSQL *)(lst->index(it3));
|
|
|
|
|
unsigned long long then=0;
|
|
|
|
|
memcpy(&then,my->net.buff,sizeof(unsigned long long));
|
|
|
|
|
if (now > (then + mysql_thread___monitor_ping_interval*1000 * 3)) {
|
|
|
|
|
MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false);
|
|
|
|
|
mmsd->mysql=my;
|
|
|
|
|
WorkItem *item;
|
|
|
|
|
item=new WorkItem(mmsd,NULL);
|
|
|
|
|
GloMyMon->queue.add(item);
|
|
|
|
|
lst->remove_index_fast(it3);
|
|
|
|
|
it3--;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
unsigned long long now = monotonic_time();
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
for(auto it = my_connections.begin(); it != my_connections.end();) {
|
|
|
|
|
auto& lst = it->second;
|
|
|
|
|
for(auto it3 = lst.begin(); it3 != lst.end();) {
|
|
|
|
|
MYSQL *my = *it3;
|
|
|
|
|
unsigned long long then = *(unsigned long long*)my->net.buff;
|
|
|
|
|
if (now > (then + mysql_thread___monitor_ping_interval*1000 * 3)) {
|
|
|
|
|
MySQL_Monitor_State_Data *mmsd= new MySQL_Monitor_State_Data((char *)"",0,NULL,false);
|
|
|
|
|
mmsd->mysql=my;
|
|
|
|
|
GloMyMon->queue.add(new WorkItem(mmsd,NULL));
|
|
|
|
|
std::swap(*it3, lst.back());
|
|
|
|
|
if(it3 == lst.end() - 1)
|
|
|
|
|
it3 = lst.erase(it3);
|
|
|
|
|
else
|
|
|
|
|
lst.pop_back();
|
|
|
|
|
} else
|
|
|
|
|
++it3;
|
|
|
|
|
}
|
|
|
|
|
if (lst.size()) {
|
|
|
|
|
++it;
|
|
|
|
|
} else {
|
|
|
|
|
my_connections.erase(it);
|
|
|
|
|
goto __loop_purge_idle_connections;
|
|
|
|
|
it = my_connections.erase(it);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MYSQL * MySQL_Monitor_Connection_Pool::get_connection(char *hostname, int port) {
|
|
|
|
|
std::map<char *, PtrArray * , cmp_str >::iterator it;
|
|
|
|
|
char *buf=(char *)malloc(16+strlen(hostname));
|
|
|
|
|
sprintf(buf,"%s:%d",hostname,port);
|
|
|
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
|
it = my_connections.find(buf);
|
|
|
|
|
free(buf);
|
|
|
|
|
if (it != my_connections.end()) {
|
|
|
|
|
PtrArray *lst=it->second;
|
|
|
|
|
if (lst->len) {
|
|
|
|
|
MYSQL *ret=(MYSQL *)lst->remove_index_fast(0);
|
|
|
|
|
size--;
|
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
|
memset(ret->net.buff,0,sizeof(unsigned long long)); // reset what was polluted
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
|
return NULL;
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
auto it = my_connections.find(std::make_pair(hostname, port));
|
|
|
|
|
if (it == my_connections.end() || !it->second.size())
|
|
|
|
|
return NULL;
|
|
|
|
|
MYSQL *my = it->second.back();
|
|
|
|
|
it->second.pop_back();
|
|
|
|
|
*(unsigned long long*)my->net.buff = 0;
|
|
|
|
|
return my;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void MySQL_Monitor_Connection_Pool::put_connection(char *hostname, int port, MYSQL *my) {
|
|
|
|
|
size++;
|
|
|
|
|
std::map<char *, PtrArray * , cmp_str >::iterator it;
|
|
|
|
|
char * buf=(char *)malloc(16+strlen(hostname));
|
|
|
|
|
sprintf(buf,"%s:%d",hostname,port);
|
|
|
|
|
unsigned long long now=monotonic_time();
|
|
|
|
|
memcpy(my->net.buff,&now,sizeof(unsigned long long)); //mark insert time
|
|
|
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
|
it = my_connections.find(buf);
|
|
|
|
|
PtrArray *lst=NULL;
|
|
|
|
|
if (it==my_connections.end()) {
|
|
|
|
|
lst=new PtrArray();
|
|
|
|
|
my_connections.insert(my_connections.begin(), std::pair<char *, PtrArray *>(buf,lst));
|
|
|
|
|
} else {
|
|
|
|
|
free(buf);
|
|
|
|
|
lst=it->second;
|
|
|
|
|
}
|
|
|
|
|
lst->add(my);
|
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
|
unsigned long long now = monotonic_time();
|
|
|
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
|
|
|
*(unsigned long long*)my->net.buff = now;
|
|
|
|
|
auto it = my_connections.emplace(std::piecewise_construct,
|
|
|
|
|
std::forward_as_tuple(hostname, port), std::forward_as_tuple()).first;
|
|
|
|
|
it->second.push_back(my);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
MySQL_Monitor_State_Data::MySQL_Monitor_State_Data(char *h, int p, struct event_base *b, bool _use_ssl, int g) {
|
|
|
|
|
|