From dca5b434fcf708741685d008bb981490b80c3d3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Tue, 28 Jul 2015 18:49:00 +0000 Subject: [PATCH] Many bug fixes: Simplified the ping within MySQL_Thread SQLite3_row::add_fields() to handle empty string prot_status was not initialized in MySQL_Protocol Session's default schema was always mysql_thread___default_schema , now fixed with correct value default_schema is now read from config file --- include/MySQL_Session.h | 14 +++++++++++++- include/MySQL_Thread.h | 4 ++-- include/sqlite3db.h | 16 +++++++++++----- lib/MySQL_Protocol.cpp | 1 + lib/MySQL_Session.cpp | 28 ++++++++++------------------ lib/MySQL_Thread.cpp | 40 +++++++++++++++++++++++++++++----------- lib/ProxySQL_Admin.cpp | 12 +++++++----- lib/mysql_backend.cpp | 14 ++++++++------ lib/mysql_connection.cpp | 34 ++++++++++++++++++++++++---------- src/proxysql.cfg | 7 ++++--- 10 files changed, 109 insertions(+), 61 deletions(-) diff --git a/include/MySQL_Session.h b/include/MySQL_Session.h index e93e6b36b..7eec70606 100644 --- a/include/MySQL_Session.h +++ b/include/MySQL_Session.h @@ -127,7 +127,19 @@ class MySQL_Session ~MySQL_Session(); void set_unhealthy(); - + + void set_status(enum session_status e) { + if (e==NONE) { + if (mybe) { + if (mybe->server_myds) { + assert(mybe->server_myds->myconn==0); + if (mybe->server_myds->myconn) + assert(mybe->server_myds->myconn->async_state_machine==ASYNC_IDLE); + } + } + } + status=e; + } //MySQL_Protocol myprot_client; //MySQL_Protocol myprot_server; int handler(); diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 14fd678a3..0fde700d0 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -175,8 +175,8 @@ class MySQL_Thread void process_all_sessions(); void refresh_variables(); void process_all_sessions_connections_handler(); - void register_session_connection_handler(MySQL_Session *_sess); - void unregister_session_connection_handler(int idx); + void register_session_connection_handler(MySQL_Session *_sess, bool _new=false); + void unregister_session_connection_handler(int idx, bool _new=false); //void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n); //void myds_backend_pause_connect(MySQL_Data_Stream *myds); //void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n); diff --git a/include/sqlite3db.h b/include/sqlite3db.h index 2d4707f86..7c8c3869d 100644 --- a/include/sqlite3db.h +++ b/include/sqlite3db.h @@ -36,14 +36,20 @@ class SQLite3_row { // }; void add_fields(sqlite3_stmt *stmt) { int i; + int t; for (i=0;iserver_myds->mysql_real_query.size=pkt.size-5; mybe->server_myds->mysql_real_query.ptr=(char *)malloc(pkt.size-5); - mybe->server_myds->wait_until=thread->curtime+mysql_thread___ping_timeout_server*1000; + mybe->server_myds->wait_until=thread->curtime+mysql_thread___ping_timeout_server*10000; mybe->server_myds->killed_at=0; //fprintf(stderr,"times: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime); memcpy(mybe->server_myds->mysql_real_query.ptr,(char *)pkt.ptr+5,pkt.size-5); @@ -469,13 +469,17 @@ handler_again: if ((myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { myds->return_MySQL_Connection_To_Pool(); } - status=NONE; - return 0; + delete mybe->server_myds; + mybe->server_myds=NULL; + set_status(NONE); + return -1; } else { if (rc==-1) { proxy_error("Detected a broken connection during ping\n"); myds->destroy_MySQL_Connection_From_Pool(); myds->fd=0; + delete mybe->server_myds; + mybe->server_myds=NULL; //thread->mypolls.remove_index_fast(myds->poll_fds_idx); return -1; } else { @@ -493,19 +497,6 @@ handler_again: if (mybe->server_myds->wait_until && thread->curtime >= mybe->server_myds->wait_until) { // query timed out MySQL_Data_Stream *myds=mybe->server_myds; -/* - char buf[80]; - sprintf(buf,"Query timeout for connection %lu", myds->myconn->mysql->thread_id); - myds->destroy_MySQL_Connection_From_Pool(); - myds->fd=0; - myds->DSS=STATE_NOT_INITIALIZED; - status=NONE; - client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,2000,(char *)"#28000",buf); - status=WAITING_CLIENT_DATA; - client_myds->DSS=STATE_SLEEP; - //wrong_pass=true; - return 0; -*/ // FIXME: make sure the connection is established first if (myds->killed_at==0) { myds->killed_at=thread->curtime; @@ -1286,7 +1277,8 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE( l_free(pkt->size,pkt->ptr); if (client_myds->encrypted==false) { if (client_myds->myconn->userinfo->schemaname==NULL) { - client_myds->myconn->userinfo->set_schemaname(mysql_thread___default_schema,strlen(mysql_thread___default_schema)); + //client_myds->myconn->userinfo->set_schemaname(mysql_thread___default_schema,strlen(mysql_thread___default_schema)); + client_myds->myconn->userinfo->set_schemaname(default_schema,strlen(default_schema)); } int free_users=0; if (admin==false) { diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index e9df9e3a7..9cb1981d7 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -959,7 +959,7 @@ bool MySQL_Thread::init() { assert(mysql_sessions_connections_handler); for (i=0; iattach_connection(mc); myds->assign_fd_from_mysql_conn(); myds->myds_type=MYDS_BACKEND; - MySQL_Session *sess=(MySQL_Session *)mysql_sessions_connections_handler->index(i); + //MySQL_Session *sess=(MySQL_Session *)mysql_sessions_connections_handler->index(i); + MySQL_Session *sess=new MySQL_Session(); myds->sess=sess; myds->init(); my_idle_myds[i]=myds; @@ -1058,7 +1059,8 @@ void MySQL_Thread::run() { // myds->DSS=STATE_QUERY_SENT_DS; sess->status=PINGING_SERVER; myds->DSS=STATE_MARIADB_PING; - + register_session_connection_handler(sess,true); + // myds->myconn->async_ping(0); // myds->myconn->async_state_machine=ASYNC_PING_START; @@ -1230,11 +1232,13 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned mypolls.last_recv[n]=curtime; myds->revents=mypolls.fds[n].revents; myds->sess->to_process=1; + assert(myds->sess->status!=NONE); } else { // no events if (myds->wait_until && curtime > myds->wait_until) { // timeout myds->sess->to_process=1; + assert(myds->sess->status!=NONE); } } if (myds->myds_type==MYDS_BACKEND && myds->sess->status!=FAST_FORWARD) { @@ -1244,6 +1248,10 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned // only if we aren't using MariaDB Client Library myds->read_from_net(); myds->read_pkts(); + } else { + if (mypolls.fds[n].revents) { + myds->myconn->handler(mypolls.fds[n].revents); + } } if ( (mypolls.fds[n].events & POLLOUT) && @@ -1253,7 +1261,7 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned } myds->check_data_flow(); - + if (myds->active==FALSE) { if (myds->sess->client_myds==myds) { @@ -1361,11 +1369,13 @@ void MySQL_Thread::process_all_sessions_connections_handler() { int rc; for (n=0; nlen; n++) { MySQL_Session *sess=(MySQL_Session *)mysql_sessions_connections_handler->index(n); -// //FIX_PING if (sess->to_process==1) { + //FIX_PING + if (sess->to_process==1) { + assert(sess->status!=NONE); rc=sess->handler(); sess->to_process=0; if (rc==-1) { - unregister_session_connection_handler(n); + unregister_session_connection_handler(n, false); n--; delete sess; //sess=new MySQL_Session(); @@ -1374,22 +1384,30 @@ void MySQL_Thread::process_all_sessions_connections_handler() { } else { sess->to_process=0; } -// } + } } } -void MySQL_Thread::register_session_connection_handler(MySQL_Session *_sess) { +void MySQL_Thread::register_session_connection_handler(MySQL_Session *_sess, bool _new) { if (mysql_sessions_connections_handler==NULL) return; - mysql_sessions_connections_handler->add(_sess); _sess->thread=this; _sess->connections_handler=true; proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Registered new session for connection handler\n", _sess->thread, _sess); + if (_new) { + mysql_sessions->add(_sess); + } else { + mysql_sessions_connections_handler->add(_sess); + } } -void MySQL_Thread::unregister_session_connection_handler(int idx) { +void MySQL_Thread::unregister_session_connection_handler(int idx, bool _new) { if (mysql_sessions_connections_handler==NULL) return; proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Unregistered session\n", this, mysql_sessions_connections_handler->index(idx)); - mysql_sessions_connections_handler->remove_index_fast(idx); + if (_new) { + mysql_sessions->remove_index_fast(idx); + } else { + mysql_sessions_connections_handler->remove_index_fast(idx); + } } diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index ad1bbdf80..03bcc13a3 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -1384,9 +1384,9 @@ bool ProxySQL_Admin::init() { if (GloVars.configfile_open) { if (GloVars.confFile->cfg) { Read_MySQL_Servers_from_configfile(); - Read_MySQL_Users_from_configfile(); Read_Global_Variables_from_configfile("admin"); Read_Global_Variables_from_configfile("mysql"); + Read_MySQL_Users_from_configfile(); __insert_or_replace_disktable_select_maintable(); } else { if (GloVars.confFile->OpenFile(GloVars.config_file)==true) { @@ -2195,7 +2195,8 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype) { usertype, // backend/frontend (strcmp(r->fields[2],"1")==0 ? true : false) , // use_ssl atoi(r->fields[3]), // default_hostgroup - (r->fields[4]==NULL ? (char *)mysql_thread___default_schema : r->fields[4]), //default_schema + //(r->fields[4]==NULL ? (char *)mysql_thread___default_schema : r->fields[4]), //default_schema + (r->fields[4]==NULL ? (char *)"" : r->fields[4]), //default_schema (strcmp(r->fields[5],"1")==0 ? true : false) , // schema_locked (strcmp(r->fields[6],"1")==0 ? true : false) , // transaction_persistent (strcmp(r->fields[7],"1")==0 ? true : false), // fast_forward @@ -2402,7 +2403,7 @@ int ProxySQL_Admin::Read_MySQL_Users_from_configfile() { std::string password=""; int active=1; int default_hostgroup=0; - std::string default_schema; + std::string default_schema=""; int schema_locked=0; int transaction_persistent=0; int fast_forward=0; @@ -2411,13 +2412,14 @@ int ProxySQL_Admin::Read_MySQL_Users_from_configfile() { user.lookupValue("password", password); user.lookupValue("hostgroup", default_hostgroup); user.lookupValue("active", active); - //if (user.lookupValue("default_schema", default_schema)==false) default_schema=mysql_thread___default_schema; + //if (user.lookupValue("default_schema", default_schema)==false) default_schema=""; + user.lookupValue("default_schema", default_schema); user.lookupValue("schema_locked", schema_locked); user.lookupValue("transaction_persistent", transaction_persistent); user.lookupValue("fast_forward", fast_forward); user.lookupValue("max_connections", max_connections); char *query=(char *)malloc(strlen(q)+strlen(username.c_str())+strlen(password.c_str())+128); - sprintf(query,q, username.c_str(), password.c_str(), active, default_hostgroup, "information_schema", schema_locked, transaction_persistent, fast_forward, max_connections); + sprintf(query,q, username.c_str(), password.c_str(), active, default_hostgroup, default_schema.c_str(), schema_locked, transaction_persistent, fast_forward, max_connections); //fprintf(stderr, "%s\n", query); admindb->execute(query); free(query); diff --git a/lib/mysql_backend.cpp b/lib/mysql_backend.cpp index dd778342b..95cf838bf 100644 --- a/lib/mysql_backend.cpp +++ b/lib/mysql_backend.cpp @@ -20,15 +20,17 @@ MySQL_Backend::~MySQL_Backend() { } void MySQL_Backend::reset() { - if (server_myds->myconn) { + if (server_myds && server_myds->myconn) { if (server_myds->DSS==STATE_READY && server_myds->myconn->reusable==true && ((server_myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) { server_myds->myconn->last_time_used=server_myds->sess->thread->curtime; - MyHGM->push_MyConn_to_pool(server_myds->myconn); - server_myds->myconn=NULL; - server_myds->unplug_backend(); + server_myds->return_MySQL_Connection_To_Pool(); + //MyHGM->push_MyConn_to_pool(server_myds->myconn); + //server_myds->myconn=NULL; + //server_myds->unplug_backend(); } else { - MyHGM->destroy_MyConn_from_pool(server_myds->myconn); - server_myds->myconn=NULL; + server_myds->destroy_MySQL_Connection_From_Pool(); + //MyHGM->destroy_MyConn_from_pool(server_myds->myconn); + //server_myds->myconn=NULL; } }; //if (mshge) { diff --git a/lib/mysql_connection.cpp b/lib/mysql_connection.cpp index 476906ca9..30b56a06f 100644 --- a/lib/mysql_connection.cpp +++ b/lib/mysql_connection.cpp @@ -19,12 +19,15 @@ void MySQL_Connection::operator delete(void *ptr) { //extern __thread char *mysql_thread___default_schema; static int -mysql_status(short event) { +mysql_status(short event, short cont) { int status= 0; if (event & POLLIN) status|= MYSQL_WAIT_READ; if (event & POLLOUT) status|= MYSQL_WAIT_WRITE; +// if (event==0 && cont==true) { +// status |= MYSQL_WAIT_TIMEOUT; +// } // FIXME: handle timeout // if (event & PROXY_TIMEOUT) // status|= MYSQL_WAIT_TIMEOUT; @@ -106,9 +109,16 @@ void MySQL_Connection_userinfo::set(MySQL_Connection_userinfo *ui) { bool MySQL_Connection_userinfo::set_schemaname(char *_new, int l) { if ((schemaname==NULL) || (strncmp(_new,schemaname,l))) { if (schemaname) free(schemaname); - schemaname=(char *)malloc(l+1); - memcpy(schemaname,_new,l); - schemaname[l]=0; + if (l) { + schemaname=(char *)malloc(l+1); + memcpy(schemaname,_new,l); + schemaname[l]=0; + } else { + int k=strlen(mysql_thread___default_schema); + schemaname=(char *)malloc(k+1); + memcpy(schemaname,mysql_thread___default_schema,k); + schemaname[k]=0; + } compute_hash(); return true; } @@ -135,6 +145,7 @@ MySQL_Connection::MySQL_Connection() { options.compression_min_length=0; options.server_version=NULL; compression_pkt_id=0; + mysql_result=NULL; proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this); }; @@ -237,17 +248,19 @@ void MySQL_Connection::connect_start() { void MySQL_Connection::connect_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); - async_exit_status = mysql_real_connect_cont(&ret_mysql, mysql, mysql_status(event)); + async_exit_status = mysql_real_connect_cont(&ret_mysql, mysql, mysql_status(event, true)); } void MySQL_Connection::ping_start() { PROXY_TRACE(); + //fprintf(stderr,"ping_start FD %d\n", fd); async_exit_status = mysql_ping_start(&interr,mysql); } void MySQL_Connection::ping_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); - async_exit_status = mysql_ping_cont(&interr,mysql, mysql_status(event)); + //fprintf(stderr,"ping_cont FD %d, event %d\n", fd, event); + async_exit_status = mysql_ping_cont(&interr,mysql, mysql_status(event, true)); } void MySQL_Connection::initdb_start() { @@ -257,7 +270,7 @@ void MySQL_Connection::initdb_start() { void MySQL_Connection::initdb_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); - async_exit_status = mysql_select_db_cont(&interr,mysql, mysql_status(event)); + async_exit_status = mysql_select_db_cont(&interr,mysql, mysql_status(event, true)); } // FIXME: UTF8 is hardcoded for now, needs to be dynamic @@ -268,7 +281,7 @@ void MySQL_Connection::set_names_start() { void MySQL_Connection::set_names_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); - async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event)); + async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event, true)); } void MySQL_Connection::set_query(char *stmt, unsigned long length) { @@ -284,7 +297,7 @@ void MySQL_Connection::real_query_start() { void MySQL_Connection::real_query_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); - async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event)); + async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event, true)); } void MySQL_Connection::store_result_start() { @@ -294,7 +307,7 @@ void MySQL_Connection::store_result_start() { void MySQL_Connection::store_result_cont(short event) { proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event); - async_exit_status = mysql_store_result_cont(&mysql_result , mysql , mysql_status(event)); + async_exit_status = mysql_store_result_cont(&mysql_result , mysql , mysql_status(event, true)); } #define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0) @@ -345,6 +358,7 @@ handler_again: } break; case ASYNC_PING_CONT: + assert(myds->sess->status==PINGING_SERVER); ping_cont(event); if (async_exit_status) { next_event(ASYNC_PING_CONT); diff --git a/src/proxysql.cfg b/src/proxysql.cfg index caad91880..303d2318c 100644 --- a/src/proxysql.cfg +++ b/src/proxysql.cfg @@ -20,7 +20,7 @@ mysql_variables= { threads=4 //threads=32 - max_connections=32 + max_connections=256 default_query_delay=10 default_query_timeout=10000 have_compress=true @@ -58,7 +58,7 @@ mysql_servers = address="127.0.0.1" port=3306 hostgroup=0 - max_connections=1000 + max_connections=10 }, # { address="127.0.0.2" , port=3306 , hostgroup=0, max_connections=5 }, { address="127.0.0.1" , port=3306 , hostgroup=1 }, @@ -82,7 +82,8 @@ mysql_users: username = "root" password = "" default_hostgroup = 0 - max_connections=32 + max_connections=1000 + default_schema="test" # active = 1 }, { username = "vegaicm" , password = "password" , default_hostgroup = 0 , active = 1 },