Merge branch 'v1.3.1-connthr' into v1.3.1-dev

v1.3.1-dev-mem
René Cannaò 10 years ago
commit 0ff1327620

@ -3,6 +3,10 @@
#include "proxysql.h"
#include "cpp.h"
#include <thread>
#include "thread.h"
#include "wqueue.h"
/*
Enabling STRESSTEST_POOL ProxySQL will do a lot of loops in the connection pool
@ -139,6 +143,8 @@ class MySQL_HostGroups_Manager {
void generate_mysql_replication_hostgroups_table();
SQLite3_result *incoming_replication_hostgroups;
std::thread *HGCU_thread;
public:
struct {
unsigned int servers_table_version;
@ -160,6 +166,7 @@ class MySQL_HostGroups_Manager {
unsigned long long commit_cnt_filtered;
unsigned long long rollback_cnt_filtered;
} status;
wqueue<MySQL_Connection *> queue;
MySQL_HostGroups_Manager();
~MySQL_HostGroups_Manager();
// void rdlock();

@ -280,6 +280,9 @@ class MySQL_Threads_Handler
size_t stacksize;
pthread_attr_t attr;
rwlock_t rwlock;
PtrArray *bind_fds;
MySQL_Listeners_Manager *MLM;
public:
struct {
int monitor_history;
int monitor_connect_interval;
@ -356,9 +359,6 @@ class MySQL_Threads_Handler
char * ssl_p2s_cipher;
int query_cache_size_MB;
} variables;
PtrArray *bind_fds;
MySQL_Listeners_Manager *MLM;
public:
unsigned int num_threads;
proxysql_mysql_thread_t *mysql_threads;
proxysql_mysql_thread_t *mysql_threads_idles;

@ -154,5 +154,6 @@ class MySQL_Connection {
void set_is_client(); // used for local_stmts
void reset();
};
#endif /* __CLASS_MYSQL_CONNECTION_H */

@ -4,6 +4,10 @@
#define char_malloc (char *)malloc
#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); }
#include "thread.h"
#include "wqueue.h"
//#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, PRIMARY KEY (hostgroup_id, hostname, port) )"
@ -19,6 +23,76 @@ class MySrvList;
class MyHGC;
static void * HGCU_thread_run() {
PtrArray *conn_array=new PtrArray();
while(1) {
MySQL_Connection *myconn=(MySQL_Connection *)MyHGM->queue.remove();
if (myconn==NULL) {
// intentionally exit immediately
return NULL;
}
conn_array->add(myconn);
while (MyHGM->queue.size()) {
myconn=(MySQL_Connection *)MyHGM->queue.remove();
if (myconn==NULL) return NULL;
conn_array->add(myconn);
}
unsigned int l=conn_array->len;
int *errs=(int *)malloc(sizeof(int)*l);
int *statuses=(int *)malloc(sizeof(int)*l);
my_bool *ret=(my_bool *)malloc(sizeof(my_bool)*l);
int i;
for (i=0;i<(int)l;i++) {
myconn=(MySQL_Connection *)conn_array->index(i);
statuses[i]=mysql_change_user_start(&ret[i], myconn->mysql, myconn->userinfo->password, myconn->userinfo->password, myconn->userinfo->schemaname);
}
for (i=0;i<(int)conn_array->len;i++) {
if (statuses[i]==0) {
myconn=(MySQL_Connection *)conn_array->remove_index_fast(i);
if (!ret[i]) {
myconn->reset();
MyHGM->push_MyConn_to_pool(myconn);
} else {
myconn->send_quit=false;
MyHGM->destroy_MyConn_from_pool(myconn);
}
i--;
}
}
unsigned long long now=monotonic_time();
while (conn_array->len && ((monotonic_time() - now) < 1000000)) {
usleep(50);
for (i=0;i<(int)conn_array->len;i++) {
myconn=(MySQL_Connection *)conn_array->index(i);
statuses[i]=mysql_change_user_cont(&ret[i], myconn->mysql, statuses[i]);
}
for (i=0;i<(int)conn_array->len;i++) {
if (statuses[i]==0) {
myconn=(MySQL_Connection *)conn_array->remove_index_fast(i);
if (!ret[i]) {
myconn->reset();
MyHGM->push_MyConn_to_pool(myconn);
} else {
myconn->send_quit=false;
MyHGM->destroy_MyConn_from_pool(myconn);
}
i--;
}
}
}
while (conn_array->len) {
// we reached here, and there are still connections
myconn=(MySQL_Connection *)conn_array->remove_index_fast(0);
myconn->send_quit=false;
MyHGM->destroy_MyConn_from_pool(myconn);
}
free(statuses);
free(errs);
}
}
MySQL_Connection *MySrvConnList::index(unsigned int _k) {
return (MySQL_Connection *)conns->index(_k);
}
@ -293,6 +367,7 @@ MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
mydb->execute(MYHGM_MYSQL_REPLICATION_HOSTGROUPS);
MyHostGroups=new PtrArray();
incoming_replication_hostgroups=NULL;
HGCU_thread = new std::thread(&HGCU_thread_run);
}
MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() {
@ -305,6 +380,8 @@ MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() {
if (admindb) {
delete admindb;
}
queue.add(NULL);
HGCU_thread->join();
#ifdef MHM_PTHREAD_MUTEX
pthread_mutex_destroy(&lock);
#endif
@ -661,14 +738,16 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lo
mysrvc=(MySrvC *)c->parent;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
mysrvc->ConnectionsUsed->remove(c);
if (c->largest_query_length > (unsigned int)mysql_thread___threshold_query_length) {
//if (c->largest_query_length > (unsigned int)mysql_thread___threshold_query_length) {
if (c->largest_query_length > (unsigned int)GloMTH->variables.threshold_query_length) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d . largest_query_length = %lu\n", c, mysrvc->address, mysrvc->port, mysrvc->status, c->largest_query_length);
delete c;
goto __exit_push_MyConn_to_pool;
}
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) {
if (c->async_state_machine==ASYNC_IDLE) {
if (c->local_stmts->get_num_entries() > mysql_thread___max_stmts_per_connection) {
//if (c->local_stmts->get_num_entries() > mysql_thread___max_stmts_per_connection) {
if (c->local_stmts->get_num_entries() > (unsigned int)GloMTH->variables.max_stmts_per_connection) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d because has too many prepared statements\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
delete c;
} else {
@ -897,14 +976,25 @@ MySQL_Connection * MySQL_HostGroups_Manager::get_MyConn_from_pool(unsigned int _
}
void MySQL_HostGroups_Manager::destroy_MyConn_from_pool(MySQL_Connection *c) {
wrlock();
bool to_del=true; // the default, legacy behavior
MySrvC *mysrvc=(MySrvC *)c->parent;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port);
mysrvc->ConnectionsUsed->remove(c);
status.myconnpoll_destroy++;
//status.server_connections_connected--;
wrunlock();
delete c;
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit && queue.size() < 100) {
// overall, the backend seems healthy and so it is the connection. Try to reset it
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port);
to_del=false;
c->userinfo->set(mysql_thread___monitor_username,mysql_thread___monitor_password,mysql_thread___default_schema,NULL);
queue.add(c);
} else {
// we lock only this part of the code because we need to remove the connection from ConnectionsUsed
wrlock();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port);
mysrvc->ConnectionsUsed->remove(c);
status.myconnpoll_destroy++;
wrunlock();
}
if (to_del) {
delete c;
}
}

@ -1587,3 +1587,10 @@ int MySQL_Connection::async_send_simple_command(short event, char *stmt, unsigne
}
return 1;
}
void MySQL_Connection::reset() {
status_flags=0;
reusable=true;
delete local_stmts;
local_stmts=new MySQL_STMTs_local(false);
}

Loading…
Cancel
Save