Merge pull request #340 from renecannao/master

Bugfixes, enhancements, and code cleanup
pull/350/head
Andrei-Adnan Ismail 11 years ago
commit 1d7ae25567

@ -168,12 +168,12 @@ class MySQL_Data_Stream
}
void free_mysql_real_query();
void destroy_MySQL_Connection() {
MySQL_Connection *mc=myconn;
detach_connection();
unplug_backend();
delete mc;
}
// void destroy_MySQL_Connection() {
// MySQL_Connection *mc=myconn;
// detach_connection();
// unplug_backend();
// delete mc;
// }
};
#endif /* __CLASS_MYSQL_DATA_STREAM_H */

@ -45,11 +45,18 @@ class MySrvC { // MySQL Server Container
enum MySerStatus status;
unsigned int compression;
unsigned int max_connections;
unsigned int connect_OK;
unsigned int connect_ERR;
time_t time_last_detected_error;
unsigned int connect_ERR_at_time_last_detected_error;
unsigned long long queries_sent;
bool shunned_automatic;
//uint8_t charset;
MySrvConnList *ConnectionsUsed;
MySrvConnList *ConnectionsFree;
MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int, unsigned int _max_connections);
~MySrvC();
void connect_error();
};
class MySrvList { // MySQL Server List
@ -90,6 +97,8 @@ class MySQL_HostGroups_Manager {
public:
struct {
unsigned long client_connections_aborted;
unsigned long client_connections_created;
int client_connections;
unsigned long myconnpoll_get;
unsigned long myconnpoll_get_ok;

@ -42,10 +42,10 @@ class MySQL_Session
// bool handler___status_CHANGING_SCHEMA(PtrSize_t *);
bool handler___status_CHANGING_USER_SERVER(PtrSize_t *);
// bool handler___status_CHANGING_CHARSET(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrSize_t *);
// void handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrSize_t *);
// void handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *);
void handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *);
// void handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *);
// void handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *);
//void handler___status_CONNECTING_SERVER___STATE_NOT_CONNECTED(PtrSize_t *);
//void handler___status_CONNECTING_SERVER___STATE_CLIENT_HANDSHAKE(PtrSize_t *, bool *);
void handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t *, bool *);
@ -158,6 +158,8 @@ class MySQL_Session
void SQLite3_to_MySQL(SQLite3_result *, char *, int , MySQL_Protocol *);
void MySQL_Result_to_MySQL_wire(MYSQL *mysql, MYSQL_RES *result, MySQL_Protocol *myprot);
SQLite3_result * SQL3_Session_status();
unsigned int NumActiveTransactions();
unsigned long long IdleTime();
void reset_all_backends();
void writeout();

@ -210,6 +210,7 @@ class iface_info {
~iface_info() {
free(iface);
free(address);
close(fd);
}
};
@ -253,6 +254,8 @@ class MySQL_Threads_Handler
bool monitor_timer_cached;
int ping_interval_server;
int ping_timeout_server;
int shun_on_failures;
int shun_recovery_time;
int connect_retries_on_failure;
int connect_retries_delay;
int connect_timeout_server;
@ -270,6 +273,7 @@ class MySQL_Threads_Handler
bool default_reconnect;
bool have_compress;
int max_transaction_time;
int wait_timeout;
int max_connections;
int default_query_delay;
int default_query_timeout;
@ -312,8 +316,10 @@ class MySQL_Threads_Handler
int listener_del(const char *iface);
int listener_del(const char *address, int port);
void start_listeners();
void stop_listeners();
void signal_all_threads(unsigned char _c=0);
SQLite3_result * SQL3_Processlist();
SQLite3_result * SQL3_GlobalStatus();
bool kill_session(uint32_t _thread_session_id);
unsigned long long get_total_queries();
unsigned long long get_slow_queries();

@ -125,6 +125,7 @@ class ProxySQL_Admin {
void stats___mysql_commands_counters();
void stats___mysql_processlist();
void stats___mysql_connection_pool();
void stats___mysql_global();
int Read_Global_Variables_from_configfile(const char *prefix);
int Read_MySQL_Users_from_configfile();

@ -668,6 +668,7 @@ MySQL_HostGroups_Manager *MyHGM;
__thread char *mysql_thread___default_schema;
__thread char *mysql_thread___server_version;
__thread int mysql_thread___max_transaction_time;
__thread int mysql_thread___wait_timeout;
__thread int mysql_thread___max_connections;
__thread int mysql_thread___default_query_delay;
__thread int mysql_thread___default_query_timeout;
@ -675,6 +676,8 @@ __thread int mysql_thread___long_query_time;
__thread int mysql_thread___free_connections_pct;
__thread int mysql_thread___ping_interval_server;
__thread int mysql_thread___ping_timeout_server;
__thread int mysql_thread___shun_on_failures;
__thread int mysql_thread___shun_recovery_time;
__thread int mysql_thread___connect_retries_on_failure;
__thread int mysql_thread___connect_retries_delay;
__thread int mysql_thread___connect_timeout_server;
@ -717,6 +720,7 @@ extern MySQL_HostGroups_Manager *MyHGM;
extern __thread char *mysql_thread___default_schema;
extern __thread char *mysql_thread___server_version;
extern __thread int mysql_thread___max_transaction_time;
extern __thread int mysql_thread___wait_timeout;
extern __thread int mysql_thread___max_connections;
extern __thread int mysql_thread___default_query_delay;
extern __thread int mysql_thread___default_query_timeout;
@ -724,6 +728,8 @@ extern __thread int mysql_thread___long_query_time;
extern __thread int mysql_thread___free_connections_pct;
extern __thread int mysql_thread___ping_interval_server;
extern __thread int mysql_thread___ping_timeout_server;
extern __thread int mysql_thread___shun_on_failures;
extern __thread int mysql_thread___shun_recovery_time;
extern __thread int mysql_thread___connect_retries_on_failure;
extern __thread int mysql_thread___connect_retries_delay;
extern __thread int mysql_thread___connect_timeout_server;

@ -109,12 +109,36 @@ MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _st
status=_status;
compression=_compression;
max_connections=_max_connections;
connect_OK=0;
connect_ERR=0;
queries_sent=0;
time_last_detected_error=0;
connect_ERR_at_time_last_detected_error=0;
shunned_automatic=false;
//charset=_charset;
myhgc=NULL;
ConnectionsUsed=new MySrvConnList(this);
ConnectionsFree=new MySrvConnList(this);
}
void MySrvC::connect_error() {
// NOTE: this function operates without any mutex
// although, it is not extremely important if any counter is lost
// as a single connection failure won't make a significant difference
__sync_fetch_and_add(&connect_ERR,1);
time_t t=time(NULL);
if (t!=time_last_detected_error) {
time_last_detected_error=t;
connect_ERR_at_time_last_detected_error=1;
} else {
int max_failures = ( mysql_thread___shun_on_failures > mysql_thread___connect_retries_on_failure ? mysql_thread___connect_retries_on_failure : mysql_thread___shun_on_failures) ;
if (__sync_add_and_fetch(&connect_ERR_at_time_last_detected_error,1) >= (unsigned int)max_failures) {
status=MYSQL_SERVER_STATUS_SHUNNED;
shunned_automatic=true;
}
}
}
MySrvC::~MySrvC() {
if (address) free(address);
delete ConnectionsUsed;
@ -185,6 +209,8 @@ class MyHGC { // MySQL Host Group Container
MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
status.client_connections=0;
status.client_connections_aborted=0;
status.client_connections_created=0;
status.myconnpoll_get=0;
status.myconnpoll_get_ok=0;
status.myconnpoll_get_ping=0;
@ -294,6 +320,9 @@ bool MySQL_HostGroups_Manager::commit() {
if (atoi(r->fields[4])!=atoi(r->fields[9])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[9]));
mysrvc->status=(MySerStatus)atoi(r->fields[9]);
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->shunned_automatic=false;
}
}
if (atoi(r->fields[5])!=atoi(r->fields[10])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[10]));
@ -426,7 +455,12 @@ void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
mysrvc->ConnectionsUsed->remove(c);
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) {
mysrvc->ConnectionsFree->add(c);
if (c->async_state_machine==ASYNC_IDLE) {
mysrvc->ConnectionsFree->add(c);
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
delete c;
}
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
delete c;
@ -449,6 +483,23 @@ MySrvC *MyHGC::get_random_MySrvC() {
if (mysrvc->ConnectionsUsed->conns->len < mysrvc->max_connections) { // consider this server only if didn't reach max_connections
sum+=mysrvc->weight;
}
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
// try to recover shunned servers
if (mysrvc->shunned_automatic && mysql_thread___shun_recovery_time) {
time_t t;
t=time(NULL);
// we do all these changes without locking . We assume the server is not used from long
// even if the server is still in used and any of the follow command fails it is not critical
// because this is only an attempt to recover a server that is probably dead anyway
if ((t - mysrvc->time_last_detected_error) > mysql_thread___shun_recovery_time) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
mysrvc->shunned_automatic=false;
mysrvc->connect_ERR_at_time_last_detected_error=0;
mysrvc->time_last_detected_error=0;
}
}
}
}
}
if (sum==0) {
@ -600,7 +651,7 @@ __exit_get_multiple_idle_connections:
}
SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
const int colnum=6;
const int colnum=9;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping Connection Pool\n");
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"hostgroup");
@ -609,6 +660,9 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
result->add_column_definition(SQLITE_TEXT,"status");
result->add_column_definition(SQLITE_TEXT,"ConnUsed");
result->add_column_definition(SQLITE_TEXT,"ConnFree");
result->add_column_definition(SQLITE_TEXT,"ConnOK");
result->add_column_definition(SQLITE_TEXT,"ConnERR");
result->add_column_definition(SQLITE_TEXT,"Queries");
wrlock();
int i,j, k;
@ -653,6 +707,12 @@ SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
pta[4]=strdup(buf);
sprintf(buf,"%u", mysrvc->ConnectionsFree->conns->len);
pta[5]=strdup(buf);
sprintf(buf,"%u", mysrvc->connect_OK);
pta[6]=strdup(buf);
sprintf(buf,"%u", mysrvc->connect_ERR);
pta[7]=strdup(buf);
sprintf(buf,"%llu", mysrvc->queries_sent);
pta[8]=strdup(buf);
result->add_row(pta);
for (k=0; k<colnum; k++) {
if (pta[k])

@ -1266,80 +1266,10 @@ __exit_DSS__STATE_NOT_INITIALIZED:
case PROCESSING_QUERY:
break;
case PINGING_SERVER:
/*
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_PING_SUCCESSFUL) {
myds->DSS=STATE_READY;
/// multi-plexing attempt
if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
myds->myconn->last_time_used=thread->curtime;
myds->myconn->async_state_machine=ASYNC_IDLE;
MyHGM->push_MyConn_to_pool(myds->myconn);
//MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn);
//mybe->server_myds->myconn=NULL;
myds->detach_connection();
myds->unplug_backend();
}
// multi-plexing attempt
status=NONE;
}
}
*/
break;
case CHANGING_SCHEMA:
/*
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_INITDB_SUCCESSFUL) {
myds->DSS=STATE_READY;
status=WAITING_CLIENT_DATA;
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
myds->PSarrayOUTpending->remove_index(0,&pkt2);
myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
myds->DSS=STATE_QUERY_SENT_DS;
}
}
if (myconn->async_state_machine==ASYNC_INITDB_FAILED) {
set_unhealthy();
myds->myconn->reusable=false;
return -1;
}
}
*/
break;
case CHANGING_CHARSET:
/*
if (myds->revents) {
myconn->handler(myds->revents);
if (myconn->async_state_machine==ASYNC_SET_NAMES_SUCCESSFUL) {
#ifdef EXPMARIA
myds->DSS=STATE_MARIADB_QUERY;
status=PROCESSING_QUERY;
myds->myconn->async_state_machine=ASYNC_QUERY_START;
myds->myconn->set_query(myds->mysql_real_query.ptr,myds->mysql_real_query.size);
myds->myconn->handler(0);
#else
myds->DSS=STATE_READY;
status=WAITING_CLIENT_DATA;
#endif // EXPMARIA
unsigned int k;
PtrSize_t pkt2;
for (k=0; k<mybe->server_myds->PSarrayOUTpending->len;) {
myds->PSarrayOUTpending->remove_index(0,&pkt2);
myds->PSarrayOUT->add(pkt2.ptr, pkt2.size);
myds->DSS=STATE_QUERY_SENT_DS;
}
}
if (myconn->async_state_machine==ASYNC_SET_NAMES_FAILED) {
set_unhealthy();
myds->myconn->reusable=false;
return -1;
}
}
*/
break;
default:
assert(0);
@ -1353,55 +1283,11 @@ __exit_DSS__STATE_NOT_INITIALIZED:
/*
ATTEMPT TO COMMENT THIS BLOCK
leaving ONLY FAST_FORWARD for now
for (j=0; j<mybe->server_myds->PSarrayIN->len;) {
mybe->server_myds->PSarrayIN->remove_index(0,&pkt);
switch (status) {
case WAITING_SERVER_DATA:
switch (mybe->server_myds->DSS) {
// case STATE_PING_SENT_NET:
// handler___status_WAITING_SERVER_DATA___STATE_PING_SENT(&pkt);
// break;
case STATE_QUERY_SENT_NET:
handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(&pkt);
break;
case STATE_ROW:
handler___status_WAITING_SERVER_DATA___STATE_ROW(&pkt);
break;
case STATE_EOF1:
handler___status_WAITING_SERVER_DATA___STATE_EOF1(&pkt);
break;
case STATE_READING_COM_STMT_PREPARE_RESPONSE:
handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(&pkt);
break;
default:
assert(0);
}
break;
// case CHANGING_SCHEMA:
// if (handler___status_CHANGING_SCHEMA(&pkt)==false) {
// return -1;
// }
// break;
case CHANGING_USER_SERVER:
if (handler___status_CHANGING_USER_SERVER(&pkt)==false) {
return -1;
}
break;
// case CHANGING_CHARSET:
// if (handler___status_CHANGING_CHARSET(&pkt)==false) {
// return -1;
// }
// break;
case FAST_FORWARD:
client_myds->PSarrayOUT->add(pkt.ptr, pkt.size);
break;
@ -1418,73 +1304,6 @@ __exit_DSS__STATE_NOT_INITIALIZED:
writeout();
// FIXME: see bug #211
if (
mybe
&&
mybe->server_myds
&&
mybe->server_myds->DSS==STATE_QUERY_SENT_DS
&&
mybe->server_myds->PSarrayOUT->len==0
&&
mybe->server_myds->PSarrayOUTpending->len==0
&&
mybe->server_myds->net_failure==false
&&
mybe->server_myds->available_data_out()==false
) {
if (connections_handler) {
//fprintf(stderr,"time=%llu\n",monotonic_time());
//mybe->server_myds->timeout=thread->curtime+100;
//mybe->server_myds->DSS=STATE_PING_SENT_NET;
} else {
mybe->server_myds->setDSS_STATE_QUERY_SENT_NET();
}
}
if (mybe && mybe->server_myds) {
if (mybe->server_myds->net_failure) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess:%p , MYDS:%p , myds_type=%d, DSS=%d , myconn:%p\n" , this, mybe->server_myds , mybe->server_myds->myds_type , mybe->server_myds->DSS, mybe->server_myds->myconn);
if (( mybe->server_myds->DSS==STATE_READY || mybe->server_myds->DSS==STATE_QUERY_SENT_DS ) && mybe->server_myds->myds_type==MYDS_BACKEND) {
//mybe->server_myds->myconn=NULL;
mybe->server_myds->detach_connection();
mybe->server_myds->DSS=STATE_NOT_INITIALIZED;
mybe->server_myds->move_from_OUT_to_OUTpending();
if (mybe->server_myds->myconn) {
MyHGM->destroy_MyConn_from_pool(mybe->server_myds->myconn);
//mybe->server_myds->myconn=NULL;
mybe->server_myds->detach_connection();
}
if (mybe->server_myds->fd) {
mybe->server_myds->shut_hard();
// shutdown(mybe->server_myds->fd,SHUT_RDWR);
// close(mybe->server_myds->fd);
mybe->server_myds->fd=0;
thread->mypolls.remove_index_fast(mybe->server_myds->poll_fds_idx);
//server_fd=0;
}
mybe->server_myds->clean_net_failure();
mybe->server_myds->active=1;
goto __get_a_backend;
} else {
set_unhealthy();
}
}
}
//writeout();
/*
if ( // FIXME: this implementation is horrible
(server_myds ? server_myds->PSarrayIN->len==0 : 1 ) &&
(server_myds ? server_myds->PSarrayOUT->len==0 : 1 ) &&
(client_myds ? client_myds->PSarrayIN->len==0 : 1 ) &&
(client_myds ? client_myds->PSarrayOUT->len==0 : 1 )
)
{
to_process=0;
}
*/
if (wrong_pass==true) {
client_myds->array2buffer_full();
client_myds->write_to_net();
@ -1523,6 +1342,7 @@ bool MySQL_Session::handler___status_CHANGING_USER_SERVER(PtrSize_t *pkt) {
return false;
}
/*
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrSize_t *pkt) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Statuses: WAITING_SERVER_DATA - STATE_QUERY_SENT\n");
unsigned char c;
@ -1530,7 +1350,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS
if (mybe->server_myds->myconn->processing_prepared_statement_prepare==false && mybe->server_myds->myconn->processing_prepared_statement_execute==false) {
if (c==0 || c==0xff) {
mybe->server_myds->DSS=STATE_READY;
/* multi-plexing attempt */
// multi-plexing attempt
if (c==0) {
mybe->server_myds->myprot.process_pkt_OK((unsigned char *)pkt->ptr,pkt->size);
if ((mybe->server_myds->myconn->reusable==true) && ((mybe->server_myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
@ -1542,7 +1362,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS
// mybe->server_myds->unplug_backend();
}
}
/* multi-plexing attempt */
// multi-plexing attempt
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
@ -1616,6 +1436,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_QUERY_SENT(PtrS
}
}
}
*/
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STMT_PREPARE_RESPONSE(PtrSize_t *pkt) {
unsigned char c;
@ -1644,6 +1465,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_READING_COM_STM
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
}
/*
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *pkt) {
unsigned char c;
c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
@ -1657,8 +1479,8 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_ROW(PtrSize_t *
client_myds->PSarrayOUT->add(pkt->ptr, pkt->size);
}
}
*/
/*
void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t *pkt) {
unsigned char c;
c=*((unsigned char *)pkt->ptr+sizeof(mysql_hdr));
@ -1676,7 +1498,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t
client_myds->DSS=STATE_SLEEP;
/* multi-plexing attempt */
// multi-plexing attempt
if (c==0xfe) {
mybe->server_myds->myprot.process_pkt_EOF((unsigned char *)pkt->ptr,pkt->size);
//fprintf(stderr,"hid=%d status=%d\n", mybe->hostgroup_id, server_myds->myprot.prot_status);
@ -1689,7 +1511,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t
// mybe->server_myds->unplug_backend();
}
}
/* multi-plexing attempt */
// multi-plexing attempt
if (qpo) {
if (qpo->cache_ttl>0) { // Fixed bug #145
@ -1746,7 +1568,7 @@ void MySQL_Session::handler___status_WAITING_SERVER_DATA___STATE_EOF1(PtrSize_t
}
}
}
*/
void MySQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) {
// FIXME: no support for SSL yet
@ -1797,6 +1619,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
*wrong_pass=true;
client_myds->setDSS_STATE_QUERY_SENT_NET();
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1040,(char *)"#HY000", (char *)"Too many connections");
__sync_add_and_fetch(&MyHGM->status.client_connections_aborted,1);
client_myds->DSS=STATE_SLEEP;
} else {
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL);
@ -1837,6 +1660,7 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100);
sprintf(_s,"Access denied for user '%s' (using password: %s)", client_myds->myconn->userinfo->username, (client_myds->myconn->userinfo->password ? "YES" : "NO"));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"#28000", _s);
__sync_add_and_fetch(&MyHGM->status.client_connections_aborted,1);
free(_s);
client_myds->DSS=STATE_SLEEP;
//return -1;
@ -2401,3 +2225,28 @@ void MySQL_Session::destroy_MySQL_Connection(MySQL_Data_Stream *myds) {
MyHGM->destroy_MyConn_from_pool(myconn);
}
*/
unsigned int MySQL_Session::NumActiveTransactions() {
unsigned int ret=0;
if (mybes==0) return ret;
MySQL_Backend *_mybe;
unsigned int i;
for (i=0; i < mybes->len; i++) {
_mybe=(MySQL_Backend *)mybes->index(i);
if (_mybe->server_myds)
if (_mybe->server_myds->myconn)
if (_mybe->server_myds->myconn->IsActiveTransaction())
ret++;
}
return ret;
}
unsigned long long MySQL_Session::IdleTime() {
if (client_myds==0) return 0;
if (status!=WAITING_CLIENT_DATA) return 0;
int idx=client_myds->poll_fds_idx;
unsigned long long last_sent=thread->mypolls.last_sent[idx];
unsigned long long last_recv=thread->mypolls.last_recv[idx];
unsigned long long last_time=(last_sent > last_recv ? last_sent : last_recv);
return thread->curtime - last_time;
}

@ -134,6 +134,8 @@ void MySQL_Listeners_Manager::del(unsigned int idx) {
}
static char * mysql_thread_variables_names[]= {
(char *)"shun_on_failures",
(char *)"shun_recovery_time",
(char *)"connect_retries_on_failure",
(char *)"connect_retries_delay",
(char *)"connect_timeout_server",
@ -156,6 +158,7 @@ static char * mysql_thread_variables_names[]= {
(char *)"monitor_query_timeout",
(char *)"monitor_timer_cached",
(char *)"max_transaction_time",
(char *)"wait_timeout",
(char *)"max_connections",
(char *)"default_query_delay",
(char *)"default_query_timeout",
@ -195,6 +198,8 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
shutdown_=0;
spinlock_rwlock_init(&rwlock);
pthread_attr_init(&attr);
variables.shun_on_failures=5;
variables.shun_recovery_time=10;
variables.connect_retries_on_failure=5;
variables.connect_timeout_server=1000;
variables.connect_timeout_server_max=10000;
@ -213,6 +218,7 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() {
variables.monitor_query_status=strdup((char *)"SELECT * FROM INFORMATION_SCHEMA.GLOBAL_STATUS");
variables.monitor_timer_cached=true;
variables.max_transaction_time=4*3600*1000;
variables.wait_timeout=8*3600*1000;
variables.max_connections=10*1000;
variables.default_query_delay=0;
variables.default_query_timeout=24*3600*1000;
@ -286,7 +292,7 @@ int MySQL_Threads_Handler::listener_del(const char *iface) {
}
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_fetch_and_add(&thr->mypolls.pending_listener_del,0));
while(__sync_fetch_and_add(&thr->mypolls.pending_listener_del,0));
}
MLM->del(idx);
shutdown(fd,SHUT_RDWR);
@ -350,11 +356,14 @@ int MySQL_Threads_Handler::get_variable_int(char *name) {
if (!strcasecmp(name,"monitor_query_timeout")) return (int)variables.monitor_query_timeout;
if (!strcasecmp(name,"monitor_timer_cached")) return (int)variables.monitor_timer_cached;
}
if (!strcasecmp(name,"shun_on_failures")) return (int)variables.shun_on_failures;
if (!strcasecmp(name,"shun_recovery_time")) return (int)variables.shun_recovery_time;
if (!strcasecmp(name,"connect_retries_on_failure")) return (int)variables.connect_retries_on_failure;
if (!strcasecmp(name,"connect_timeout_server")) return (int)variables.connect_timeout_server;
if (!strcasecmp(name,"connect_timeout_server_max")) return (int)variables.connect_timeout_server_max;
if (!strcasecmp(name,"connect_retries_delay")) return (int)variables.connect_retries_delay;
if (!strcasecmp(name,"max_transaction_time")) return (int)variables.max_transaction_time;
if (!strcasecmp(name,"wait_timeout")) return (int)variables.wait_timeout;
if (!strcasecmp(name,"max_connections")) return (int)variables.max_connections;
if (!strcasecmp(name,"default_query_delay")) return (int)variables.default_query_delay;
if (!strcasecmp(name,"default_query_timeout")) return (int)variables.default_query_timeout;
@ -433,6 +442,14 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
}
return strdup(c->csname);
}
if (!strcasecmp(name,"shun_on_failures")) {
sprintf(intbuf,"%d",variables.shun_on_failures);
return strdup(intbuf);
}
if (!strcasecmp(name,"shun_recovery_time")) {
sprintf(intbuf,"%d",variables.shun_recovery_time);
return strdup(intbuf);
}
if (!strcasecmp(name,"connect_retries_on_failure")) {
sprintf(intbuf,"%d",variables.connect_retries_on_failure);
return strdup(intbuf);
@ -457,6 +474,10 @@ char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public f
sprintf(intbuf,"%d",variables.max_transaction_time);
return strdup(intbuf);
}
if (!strcasecmp(name,"wait_timeout")) {
sprintf(intbuf,"%d",variables.wait_timeout);
return strdup(intbuf);
}
if (!strcasecmp(name,"max_connections")) {
sprintf(intbuf,"%d",variables.max_connections);
return strdup(intbuf);
@ -659,6 +680,15 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
return false;
}
}
if (!strcasecmp(name,"wait_timeout")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 20*24*3600*1000) {
variables.wait_timeout=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"free_connections_pct")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 100) {
@ -722,6 +752,24 @@ bool MySQL_Threads_Handler::set_variable(char *name, char *value) { // this is t
return false;
}
}
if (!strcasecmp(name,"shun_on_failures")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 10000000) {
variables.shun_on_failures=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"shun_recovery_time")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 3600*24*365) {
variables.shun_recovery_time=intv;
return true;
} else {
return false;
}
}
if (!strcasecmp(name,"connect_retries_on_failure")) {
int intv=atoi(value);
if (intv >= 0 && intv <= 1000) {
@ -1003,6 +1051,17 @@ void MySQL_Threads_Handler::start_listeners() {
free_tokenizer( &tok );
}
void MySQL_Threads_Handler::stop_listeners() {
if (variables.interfaces==NULL || strlen(variables.interfaces)==0)
return;
tokenizer_t tok = tokenizer( variables.interfaces, ";", TOKENIZER_NO_EMPTIES );
const char* token;
for (token = tokenize( &tok ); token; token = tokenize( &tok )) {
listener_del((char *)token);
}
free_tokenizer( &tok );
}
MySQL_Threads_Handler::~MySQL_Threads_Handler() {
if (variables.connect_timeout_server_error) free(variables.connect_timeout_server_error);
if (variables.default_schema) free(variables.default_schema);
@ -1288,10 +1347,11 @@ void MySQL_Thread::run() {
// }
}
while ((n=__sync_add_and_fetch(&mypolls.pending_listener_del,0))) { // spin here
poll_listener_del(n);
assert(__sync_bool_compare_and_swap(&mypolls.pending_listener_del,n,0));
if (mysql_thread___wait_timeout==0) {
// we should be going into PAUSE mode
if (mypolls.poll_timeout==0 || mypolls.poll_timeout > 100000) {
mypolls.poll_timeout=100000;
}
}
//this is the only portion of code not protected by a global mutex
@ -1301,6 +1361,11 @@ void MySQL_Thread::run() {
rc=poll(mypolls.fds,mypolls.len, ( mypolls.poll_timeout ? ( mypolls.poll_timeout/1000 < (unsigned int) mysql_thread___poll_timeout ? mypolls.poll_timeout/1000 : mysql_thread___poll_timeout ) : mysql_thread___poll_timeout ) );
proxy_debug(PROXY_DEBUG_NET,5,"%s\n", "Returning poll");
while ((n=__sync_add_and_fetch(&mypolls.pending_listener_del,0))) { // spin here
poll_listener_del(n);
assert(__sync_bool_compare_and_swap(&mypolls.pending_listener_del,n,0));
}
curtime=monotonic_time();
spin_wrlock(&thread_mutex);
@ -1462,6 +1527,15 @@ void MySQL_Thread::process_all_sessions() {
}
for (n=0; n<mysql_sessions->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions->index(n);
unsigned long long sess_time = sess->IdleTime();
unsigned int numTrx = sess->NumActiveTransactions();
if (numTrx) {
// the session has idle transactions, kill it
if (sess_time/1000 > (unsigned long long)mysql_thread___max_transaction_time) sess->killed=true;
} else {
// the session is idle, kill it
if (sess_time/1000 > (unsigned long long)mysql_thread___wait_timeout) sess->killed=true;
}
if (sess->healthy==0) {
unregister_session(n);
n--;
@ -1494,12 +1568,15 @@ void MySQL_Thread::refresh_variables() {
GloMTH->wrlock();
__thread_MySQL_Thread_Variables_version=__global_MySQL_Thread_Variables_version;
mysql_thread___max_transaction_time=GloMTH->get_variable_int((char *)"max_transaction_time");
mysql_thread___wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout");
mysql_thread___max_connections=GloMTH->get_variable_int((char *)"max_connections");
mysql_thread___default_query_delay=GloMTH->get_variable_int((char *)"default_query_delay");
mysql_thread___default_query_timeout=GloMTH->get_variable_int((char *)"default_query_timeout");
mysql_thread___long_query_time=GloMTH->get_variable_int((char *)"long_query_time");
mysql_thread___ping_interval_server=GloMTH->get_variable_int((char *)"ping_interval_server");
mysql_thread___ping_timeout_server=GloMTH->get_variable_int((char *)"ping_timeout_server");
mysql_thread___shun_on_failures=GloMTH->get_variable_int((char *)"shun_on_failures");
mysql_thread___shun_recovery_time=GloMTH->get_variable_int((char *)"shun_recovery_time");
mysql_thread___connect_retries_on_failure=GloMTH->get_variable_int((char *)"connect_retries_on_failure");
mysql_thread___connect_timeout_server=GloMTH->get_variable_int((char *)"connect_timeout_server");
mysql_thread___connect_timeout_server_max=GloMTH->get_variable_int((char *)"connect_timeout_server_max");
@ -1629,6 +1706,7 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig
MySQL_Session *sess=create_new_session_and_client_data_stream(c);
//sess->myprot_client.generate_pkt_initial_handshake(sess->client_myds,true,NULL,NULL);
//sess->myprot_client.generate_pkt_initial_handshake(true,NULL,NULL);
__sync_add_and_fetch(&MyHGM->status.client_connections_created,1);
if (__sync_add_and_fetch(&MyHGM->status.client_connections,1) > mysql_thread___max_connections) {
sess->max_connections_reached=true;
}
@ -1749,6 +1827,49 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Threads_status(MySQL_Session *sess)
return result;
}
SQLite3_result * MySQL_Threads_Handler::SQL3_GlobalStatus() {
const int colnum=2;
char buf[256];
char **pta=(char **)malloc(sizeof(char *)*colnum);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping MySQL Global Status\n");
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"Variable_Name");
result->add_column_definition(SQLITE_TEXT,"Variable_Value");
// NOTE: as there is no string copy, we do NOT free pta[0] and pta[1]
{ // Connections created
pta[0]=(char *)"Client_Connections_aborted";
sprintf(buf,"%lu",MyHGM->status.client_connections_aborted);
pta[1]=buf;
result->add_row(pta);
}
{ // Connections
pta[0]=(char *)"Client_Connections_connected";
sprintf(buf,"%d",MyHGM->status.client_connections);
pta[1]=buf;
result->add_row(pta);
}
{ // Connections created
pta[0]=(char *)"Client_Connections_created";
sprintf(buf,"%lu",MyHGM->status.client_connections_created);
pta[1]=buf;
result->add_row(pta);
}
{ // Queries
pta[0]=(char *)"Questions";
sprintf(buf,"%llu",get_total_queries());
pta[1]=buf;
result->add_row(pta);
}
{ // Slow queries
pta[0]=(char *)"Slow_queries";
sprintf(buf,"%llu",get_slow_queries());
pta[1]=buf;
result->add_row(pta);
}
free(pta);
return result;
}
SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() {
const int colnum=14;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping MySQL Processlist\n");
@ -1944,8 +2065,11 @@ unsigned long long MySQL_Threads_Handler::get_total_queries() {
unsigned long long q=0;
unsigned int i;
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
q+=__sync_fetch_and_add(&thr->status_variables.queries,0);
if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.queries,0);
}
}
return q;
}
@ -1954,8 +2078,11 @@ unsigned long long MySQL_Threads_Handler::get_slow_queries() {
unsigned long long q=0;
unsigned int i;
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
q+=__sync_fetch_and_add(&thr->status_variables.queries_slow,0);
if (mysql_threads) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
if (thr)
q+=__sync_fetch_and_add(&thr->status_variables.queries_slow,0);
}
}
return q;
}

@ -35,6 +35,8 @@ static volatile bool nostart_=false;
static int __admin_refresh_interval=0;
static bool proxysql_mysql_paused=false;
static int old_wait_timeout;
extern MySQL_Authentication *GloMyAuth;
extern ProxySQL_Admin *GloAdmin;
@ -60,12 +62,14 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY , hits INT NOT NULL)"
#define STATS_SQLITE_TABLE_MYSQL_COMMANDS_COUNTERS "CREATE TABLE stats_mysql_commands_counters (Command VARCHAR NOT NULL PRIMARY KEY , Total_Time_us INT NOT NULL , Total_cnt INT NOT NULL , cnt_100us INT NOT NULL , cnt_500us INT NOT NULL , cnt_1ms INT NOT NULL , cnt_5ms INT NOT NULL , cnt_10ms INT NOT NULL , cnt_50ms INT NOT NULL , cnt_100ms INT NOT NULL , cnt_500ms INT NOT NULL , cnt_1s INT NOT NULL , cnt_5s INT NOT NULL , cnt_10s INT NOT NULL , cnt_INFs)"
#define STATS_SQLITE_TABLE_MYSQL_PROCESSLIST "CREATE TABLE stats_mysql_processlist (ThreadID INT NOT NULL , SessionID INTEGER PRIMARY KEY , user VARCHAR , db VARCHAR , cli_host VARCHAR , cli_port VARCHAR , hostgroup VARCHAR , l_srv_host VARCHAR , l_srv_port VARCHAR , srv_host VARCHAR , srv_port VARCHAR , command VARCHAR , time_ms INT NOT NULL , info VARCHAR)"
#define STATS_SQLITE_TABLE_MYSQL_CONNECTION_POOL "CREATE TABLE stats_mysql_connection_pool (hostgroup VARCHAR , srv_host VARCHAR , srv_port VARCHAR , status VARCHAR , ConnUsed INT , ConnFree INT)"
#define STATS_SQLITE_TABLE_MYSQL_CONNECTION_POOL "CREATE TABLE stats_mysql_connection_pool (hostgroup VARCHAR , srv_host VARCHAR , srv_port VARCHAR , status VARCHAR , ConnUsed INT , ConnFree INT , ConnOK INT , ConnERR INT , Queries INT)"
#define STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST "CREATE TABLE stats_mysql_query_digest (schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , PRIMARY KEY(schemaname, username, digest))"
#define STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST_RESET "CREATE TABLE stats_mysql_query_digest_reset (schemaname VARCHAR NOT NULL , username VARCHAR NOT NULL , digest VARCHAR NOT NULL , digest_text VARCHAR NOT NULL , count_star INTEGER NOT NULL , first_seen INTEGER NOT NULL , last_seen INTEGER NOT NULL , sum_time INTEGER NOT NULL , min_time INTEGER NOT NULL , max_time INTEGER NOT NULL , PRIMARY KEY(schemaname, username, digest))"
#define STATS_SQLITE_TABLE_MYSQL_GLOBAL "CREATE TABLE stats_mysql_global (Variable_Name VARCHAR NOT NULL PRIMARY KEY , Variable_Value VARCHAR NOT NULL)"
#ifdef DEBUG
#define ADMIN_SQLITE_TABLE_DEBUG_LEVELS "CREATE TABLE debug_levels (module VARCHAR NOT NULL PRIMARY KEY , verbosity INT NOT NULL DEFAULT 0)"
#endif /* DEBUG */
@ -309,8 +313,68 @@ bool admin_handler_command_proxysql(char *query_no_space, unsigned int query_no_
if (query_no_space_length==strlen("PROXYSQL STOP") && !strncasecmp("PROXYSQL STOP",query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL STOP command\n");
__sync_bool_compare_and_swap(&glovars.shutdown,0,1);
// to speed up this process we first change wait_timeout to 0
// MySQL_thread will call poll() with a maximum timeout of 100ms
old_wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout");
GloMTH->set_variable((char *)"wait_timeout",(char *)"0");
GloMTH->commit();
GloMTH->signal_all_threads(0);
GloMTH->stop_listeners();
char buf[32];
sprintf(buf,"%d",old_wait_timeout);
GloMTH->set_variable((char *)"wait_timeout",buf);
GloMTH->commit();
glovars.reload=2;
__sync_bool_compare_and_swap(&glovars.shutdown,0,1);
return false;
}
if (query_no_space_length==strlen("PROXYSQL PAUSE") && !strncasecmp("PROXYSQL PAUSE",query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL PAUSE command\n");
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
if (nostart_) {
if (__sync_fetch_and_add(&GloVars.global.nostart,0)) {
SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"ProxySQL MySQL module not running, impossible to pause");
return false;
}
}
if (proxysql_mysql_paused==false) {
old_wait_timeout=GloMTH->get_variable_int((char *)"wait_timeout");
GloMTH->set_variable((char *)"wait_timeout",(char *)"0");
GloMTH->commit();
// to speed up this process we first change wait_timeout to 0
// MySQL_thread will call poll() with a maximum timeout of 100ms
GloMTH->signal_all_threads(0);
GloMTH->stop_listeners();
proxysql_mysql_paused=true;
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL);
} else {
SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"ProxySQL MySQL module is already paused, impossible to pause");
}
return false;
}
if (query_no_space_length==strlen("PROXYSQL RESUME") && !strncasecmp("PROXYSQL RESUME",query_no_space, query_no_space_length)) {
proxy_info("Received PROXYSQL RESUME command\n");
ProxySQL_Admin *SPA=(ProxySQL_Admin *)pa;
if (nostart_) {
if (__sync_fetch_and_add(&GloVars.global.nostart,0)) {
SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"ProxySQL MySQL module not running, impossible to resume");
return false;
}
}
if (proxysql_mysql_paused==true) {
// to speed up the process we add the listeners while poll() is called with a maximum timeout of of 100ms
GloMTH->start_listeners();
char buf[32];
sprintf(buf,"%d",old_wait_timeout);
GloMTH->set_variable((char *)"wait_timeout",buf);
GloMTH->commit();
proxysql_mysql_paused=false;
SPA->send_MySQL_OK(&sess->client_myds->myprot, NULL);
} else {
SPA->send_MySQL_ERR(&sess->client_myds->myprot, (char *)"ProxySQL MySQL module is not paused, impossible to resume");
}
return false;
}
@ -872,6 +936,7 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
bool stats_mysql_connection_pool=false;
bool stats_mysql_query_digest=false;
bool stats_mysql_query_digest_reset=false;
bool stats_mysql_global=false;
bool dump_global_variables=false;
if (strcasestr(query_no_space,"processlist"))
@ -884,6 +949,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
{ stats_mysql_query_digest=true; refresh=true; }
if (strstr(query_no_space,"stats_mysql_query_digest_reset"))
{ stats_mysql_query_digest_reset=true; refresh=true; }
if (strstr(query_no_space,"stats_mysql_global"))
{ stats_mysql_global=true; refresh=true; }
if (strstr(query_no_space,"stats_mysql_connection_pool"))
{ stats_mysql_connection_pool=true; refresh=true; }
if (admin) {
@ -902,6 +969,8 @@ void ProxySQL_Admin::GenericRefreshStatistics(const char *query_no_space, unsign
stats___mysql_query_digests_reset();
if (stats_mysql_connection_pool)
stats___mysql_connection_pool();
if (stats_mysql_global)
stats___mysql_global();
if (admin) {
if (dump_global_variables) {
flush_admin_variables___runtime_to_database(admindb, false, false, false);
@ -945,7 +1014,9 @@ void admin_session_handler(MySQL_Session *sess, ProxySQL_Admin *pa, PtrSize_t *p
if (sess->stats==false) {
if ((query_no_space_length>8) && (!strncasecmp("PROXYSQL ", query_no_space, 8))) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "Received PROXYSQL command\n");
pthread_mutex_lock(&admin_mutex);
run_query=admin_handler_command_proxysql(query_no_space, query_no_space_length, sess, pa);
pthread_mutex_unlock(&admin_mutex);
goto __run_query;
}
if ((query_no_space_length>5) && ( (!strncasecmp("SAVE ", query_no_space, 5)) || (!strncasecmp("LOAD ", query_no_space, 5))) ) {
@ -1091,6 +1162,14 @@ void admin_session_handler(MySQL_Session *sess, ProxySQL_Admin *pa, PtrSize_t *p
goto __run_query;
}
if (query_no_space_length==strlen("SHOW MYSQL STATUS") && !strncasecmp("SHOW MYSQL STATUS",query_no_space, query_no_space_length)) {
l_free(query_length,query);
query=l_strdup("SELECT Variable_Name AS Variable_name, Variable_Value AS Value FROM stats_mysql_global ORDER BY variable_name");
query_length=strlen(query)+1;
GloAdmin->stats___mysql_global();
goto __run_query;
}
strA=(char *)"SHOW CREATE TABLE ";
strB=(char *)"SELECT name AS 'table' , REPLACE(REPLACE(sql,' , ', X'2C0A'),'CREATE TABLE %s (','CREATE TABLE %s ('||X'0A') AS 'Create Table' FROM %s.sqlite_master WHERE type='table' AND name='%s'";
strAl=strlen(strA);
@ -1134,7 +1213,7 @@ void admin_session_handler(MySQL_Session *sess, ProxySQL_Admin *pa, PtrSize_t *p
if (query_no_space_length==strlen("SHOW PROCESSLIST") && !strncasecmp("SHOW PROCESSLIST",query_no_space, query_no_space_length)) {
l_free(query_length,query);
query=l_strdup("SELECT SessionID, user, db, hostgroup, command, time_ms, SUBSTR(info,0,100) info FROM stats_mysql_processlist;");
query=l_strdup("SELECT SessionID, user, db, hostgroup, command, time_ms, SUBSTR(info,0,100) info FROM stats_mysql_processlist");
query_length=strlen(query)+1;
goto __run_query;
}
@ -1551,6 +1630,7 @@ bool ProxySQL_Admin::init() {
insert_into_tables_defs(tables_defs_stats,"stats_mysql_connection_pool", STATS_SQLITE_TABLE_MYSQL_CONNECTION_POOL);
insert_into_tables_defs(tables_defs_stats,"stats_mysql_query_digest", STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST);
insert_into_tables_defs(tables_defs_stats,"stats_mysql_query_digest_reset", STATS_SQLITE_TABLE_MYSQL_QUERY_DIGEST_RESET);
insert_into_tables_defs(tables_defs_stats,"stats_mysql_global", STATS_SQLITE_TABLE_MYSQL_GLOBAL);
check_and_build_standard_tables(admindb, tables_defs_admin);
@ -2088,6 +2168,28 @@ bool ProxySQL_Admin::set_variable(char *name, char *value) { // this is the pub
void ProxySQL_Admin::stats___mysql_global() {
if (!GloMTH) return;
SQLite3_result * resultset=GloMTH->SQL3_GlobalStatus();
if (resultset==NULL) return;
statsdb->execute("BEGIN");
statsdb->execute("DELETE FROM stats_mysql_global");
char *a=(char *)"INSERT INTO stats_mysql_global VALUES (\"%s\",\"%s\")";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<2; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1]);
statsdb->execute(query);
free(query);
}
statsdb->execute("COMMIT");
delete resultset;
}
void ProxySQL_Admin::stats___mysql_processlist() {
if (!GloMTH) return;
SQLite3_result * resultset=GloMTH->SQL3_Processlist();
@ -2117,15 +2219,15 @@ void ProxySQL_Admin::stats___mysql_connection_pool() {
if (resultset==NULL) return;
statsdb->execute("BEGIN");
statsdb->execute("DELETE FROM stats_mysql_connection_pool");
char *a=(char *)"INSERT INTO stats_mysql_connection_pool VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
char *a=(char *)"INSERT INTO stats_mysql_connection_pool VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\")";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<6; i++) {
for (int i=0; i<9; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5]);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5],r->fields[6],r->fields[7],r->fields[8]);
statsdb->execute(query);
free(query);
}

@ -386,11 +386,14 @@ handler_again:
}
break;
case ASYNC_CONNECT_SUCCESSFUL:
__sync_fetch_and_add(&parent->connect_OK,1);
break;
case ASYNC_CONNECT_FAILED:
parent->connect_error();
break;
case ASYNC_CONNECT_TIMEOUT:
proxy_error("Connect timeout on %s:%d : %llu - %llu = %llu\n", parent->address, parent->port, myds->sess->thread->curtime , myds->wait_until, myds->sess->thread->curtime - myds->wait_until);
parent->connect_error();
break;
case ASYNC_CHANGE_USER_START:
change_user_start();
@ -451,6 +454,7 @@ handler_again:
break;
case ASYNC_QUERY_START:
real_query_start();
__sync_fetch_and_add(&parent->queries_sent,1);
if (async_exit_status) {
next_event(ASYNC_QUERY_CONT);
} else {

Loading…
Cancel
Save