processlist shows client info

pull/317/head
René Cannaò 11 years ago
parent 7f27c500a7
commit fa881667e9

@ -59,6 +59,9 @@ class MySQL_Data_Stream
bytes_stats_t bytes_info; // bytes statistics
int fd; // file descriptor
struct sockaddr *client_addr;
socklen_t client_addrlen;
unsigned long long wait_until;
unsigned long long killed_at;
unsigned long long max_connect_time;

@ -1449,7 +1449,10 @@ void MySQL_Thread::unregister_session_connection_handler(int idx, bool _new) {
void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsigned int n) {
int c;
c=accept(myds->fd, NULL, NULL);
struct sockaddr *addr=(struct sockaddr *)malloc(sizeof(struct sockaddr));
socklen_t addrlen=sizeof(struct sockaddr);
memset(addr, 0, sizeof(struct sockaddr));
c=accept(myds->fd, addr, &addrlen);
if (c>-1) { // accept() succeeded
// create a new client connection
mypolls.fds[n].revents=0;
@ -1459,11 +1462,14 @@ void MySQL_Thread::listener_handle_new_connection(MySQL_Data_Stream *myds, unsig
if (__sync_add_and_fetch(&MyHGM->status.client_connections,1) > mysql_thread___max_connections) {
sess->max_connections_reached=true;
}
sess->client_myds->client_addrlen=addrlen;
sess->client_myds->client_addr=addr;
sess->client_myds->myprot.generate_pkt_initial_handshake(true,NULL,NULL, &sess->thread_session_id);
ioctl_FIONBIO(sess->client_myds->fd, 1);
mypolls.add(POLLIN|POLLOUT, sess->client_myds->fd, sess->client_myds, curtime);
proxy_debug(PROXY_DEBUG_NET,1,"Session=%p -- Adding client FD %d\n", sess, sess->client_myds->fd);
} else {
free(addr);
// if we arrive here, accept() failed
// because multiple threads try to handle the same incoming connection, this is OK
}
@ -1574,11 +1580,13 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Threads_status(MySQL_Session *sess)
}
SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() {
const int colnum=6;
const int colnum=8;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping MySQL Processlist\n");
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"ThreadID");
result->add_column_definition(SQLITE_TEXT,"SessionID");
result->add_column_definition(SQLITE_TEXT,"cli_host");
result->add_column_definition(SQLITE_TEXT,"cli_port");
result->add_column_definition(SQLITE_TEXT,"hostgroup");
result->add_column_definition(SQLITE_TEXT,"srv_host");
result->add_column_definition(SQLITE_TEXT,"srv_port");
@ -1602,25 +1610,34 @@ SQLite3_result * MySQL_Threads_Handler::SQL3_Processlist() {
pta[0]=strdup(buf);
sprintf(buf,"%u", sess->thread_session_id);
pta[1]=strdup(buf);
if (sess->client_myds->client_addr->sa_family==AF_INET) {
struct sockaddr_in * ipv4addr=(struct sockaddr_in *)sess->client_myds->client_addr;
pta[2]=strdup(inet_ntoa(ipv4addr->sin_addr));
sprintf(buf,"%d", htons(ipv4addr->sin_port));
pta[3]=strdup(buf);
} else {
pta[2]=strdup("localhost");
pta[3]=NULL;
}
sprintf(buf,"%d", sess->current_hostgroup);
pta[2]=strdup(buf);
pta[4]=strdup(buf);
if (sess->mybe && sess->mybe->server_myds && sess->mybe->server_myds->myconn) {
MySQL_Connection *mc=sess->mybe->server_myds->myconn;
sprintf(buf,"%s", mc->parent->address);
pta[3]=strdup(buf);
pta[5]=strdup(buf);
sprintf(buf,"%d", mc->parent->port);
pta[4]=strdup(buf);
pta[6]=strdup(buf);
if (mc->query.length) {
pta[5]=(char *)malloc(mc->query.length+1);
strncpy(pta[5],mc->query.ptr,mc->query.length);
pta[5][mc->query.length]='\0';
pta[7]=(char *)malloc(mc->query.length+1);
strncpy(pta[7],mc->query.ptr,mc->query.length);
pta[7][mc->query.length]='\0';
} else {
pta[5]=NULL;
pta[7]=NULL;
}
} else {
pta[3]=NULL;
pta[4]=NULL;
pta[5]=NULL;
pta[6]=NULL;
pta[7]=NULL;
}
result->add_row(pta);
unsigned int k;

@ -46,7 +46,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define STATS_SQLITE_TABLE_MYSQL_QUERY_RULES "CREATE TABLE stats_mysql_query_rules (rule_id INTEGER PRIMARY KEY, hits INT NOT NULL)"
#define STATS_SQLITE_TABLE_MYSQL_COMMANDS_COUNTERS "CREATE TABLE stats_mysql_commands_counters ( Command VARCHAR NOT NULL PRIMARY KEY, Total_Time_us INT NOT NULL, Total_cnt INT NOT NULL, cnt_100us INT NOT NULL, cnt_500us INT NOT NULL, cnt_1ms INT NOT NULL, cnt_5ms INT NOT NULL, cnt_10ms INT NOT NULL, cnt_50ms INT NOT NULL, cnt_100ms INT NOT NULL, cnt_500ms INT NOT NULL, cnt_1s INT NOT NULL, cnt_5s INT NOT NULL, cnt_10s INT NOT NULL, cnt_INFs)"
#define STATS_SQLITE_TABLE_MYSQL_PROCESSLIST "CREATE TABLE stats_mysql_processlist (ThreadID INT NOT NULL, SessionID INTEGER PRIMARY KEY, hostgroup VARCHAR, srv_host VARCHAR, srv_port VARCHAR, info VARCHAR)"
#define STATS_SQLITE_TABLE_MYSQL_PROCESSLIST "CREATE TABLE stats_mysql_processlist (ThreadID INT NOT NULL, SessionID INTEGER PRIMARY KEY, cli_host VARCHAR, cli_port VARCHAR, hostgroup VARCHAR, srv_host VARCHAR, srv_port VARCHAR, info VARCHAR)"
@ -1869,15 +1869,15 @@ void ProxySQL_Admin::stats___mysql_processlist() {
if (resultset==NULL) return;
statsdb->execute("BEGIN");
statsdb->execute("DELETE FROM stats_mysql_processlist");
char *a=(char *)"INSERT INTO stats_mysql_processlist VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\")";
char *a=(char *)"INSERT INTO stats_mysql_processlist VALUES (\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\")";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<6; i++) {
for (int i=0; i<8; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5]);
sprintf(query,a,r->fields[0],r->fields[1],r->fields[2],r->fields[3],r->fields[4],r->fields[5],r->fields[6],r->fields[7]);
statsdb->execute(query);
free(query);
}

@ -94,6 +94,7 @@ MySQL_Data_Stream::MySQL_Data_Stream() {
bytes_info.bytes_sent=0;
pkts_recv=0;
pkts_sent=0;
client_addr=NULL;
mysql_real_query.ptr=NULL;
mysql_real_query.size=0;
@ -132,6 +133,10 @@ MySQL_Data_Stream::~MySQL_Data_Stream() {
queue_destroy(queueIN);
queue_destroy(queueOUT);
if (client_addr) {
free(client_addr);
client_addr=NULL;
}
free_mysql_real_query();

Loading…
Cancel
Save