Many bug fixes:

Simplified the ping within MySQL_Thread
SQLite3_row::add_fields() to handle empty string
prot_status was not initialized in MySQL_Protocol
Session's default schema was always mysql_thread___default_schema , now fixed with correct value
default_schema is now read from config file
pull/317/head
René Cannaò 11 years ago
parent 76edfb126f
commit dca5b434fc

@ -127,7 +127,19 @@ class MySQL_Session
~MySQL_Session();
void set_unhealthy();
void set_status(enum session_status e) {
if (e==NONE) {
if (mybe) {
if (mybe->server_myds) {
assert(mybe->server_myds->myconn==0);
if (mybe->server_myds->myconn)
assert(mybe->server_myds->myconn->async_state_machine==ASYNC_IDLE);
}
}
}
status=e;
}
//MySQL_Protocol myprot_client;
//MySQL_Protocol myprot_server;
int handler();

@ -175,8 +175,8 @@ class MySQL_Thread
void process_all_sessions();
void refresh_variables();
void process_all_sessions_connections_handler();
void register_session_connection_handler(MySQL_Session *_sess);
void unregister_session_connection_handler(int idx);
void register_session_connection_handler(MySQL_Session *_sess, bool _new=false);
void unregister_session_connection_handler(int idx, bool _new=false);
//void myds_backend_set_failed_connect(MySQL_Data_Stream *myds, unsigned int n);
//void myds_backend_pause_connect(MySQL_Data_Stream *myds);
//void myds_backend_first_packet_after_connect(MySQL_Data_Stream *myds, unsigned int n);

@ -36,14 +36,20 @@ class SQLite3_row {
// };
void add_fields(sqlite3_stmt *stmt) {
int i;
int t;
for (i=0;i<cnt;i++) {
sizes[i]=sqlite3_column_bytes(stmt,i);
if (sizes[i]) {
const char *c=(char *)sqlite3_column_text(stmt,i);
fields[i]=strdup(c);
} else {
t=sqlite3_column_type(stmt,i);
const char *c=(char *)sqlite3_column_text(stmt,i);
if (t==SQLITE_NULL) {
sizes[i]=0;
fields[i]=NULL;
} else {
sizes[i]=sqlite3_column_bytes(stmt,i);
fields[i]=strdup(c);
}
// } else {
// fields[i]=NULL;
// }
//fields[i]=(sizes[i] ? strdup((char *)sqlite3_column_text(stmt,i)) : NULL);
}
};

@ -457,6 +457,7 @@ void MySQL_Protocol::init(MySQL_Data_Stream **__myds, MySQL_Connection_userinfo
userinfo=__userinfo;
sess=__sess;
current_PreStmt=NULL;
prot_status=0;
}
int MySQL_Protocol::pkt_handshake_client(unsigned char *pkt, unsigned int length) {

@ -120,7 +120,6 @@ void MySQL_Session::operator delete(void *ptr) {
MySQL_Session::MySQL_Session() {
pause=0;
pause_until=0;
status=NONE;
qpo=NULL;
command_counters=new StatCounters(15,10,false);
healthy=1;
@ -140,6 +139,7 @@ MySQL_Session::MySQL_Session() {
to_process=0;
mybe=NULL;
mybes= new (true) PtrArray(4,true);
set_status(NONE);
current_hostgroup=-1;
default_hostgroup=-1;
@ -349,7 +349,7 @@ int MySQL_Session::handler() {
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->mysql_real_query.size=pkt.size-5;
mybe->server_myds->mysql_real_query.ptr=(char *)malloc(pkt.size-5);
mybe->server_myds->wait_until=thread->curtime+mysql_thread___ping_timeout_server*1000;
mybe->server_myds->wait_until=thread->curtime+mysql_thread___ping_timeout_server*10000;
mybe->server_myds->killed_at=0;
//fprintf(stderr,"times: %llu, %llu\n", mybe->server_myds->wait_until, thread->curtime);
memcpy(mybe->server_myds->mysql_real_query.ptr,(char *)pkt.ptr+5,pkt.size-5);
@ -469,13 +469,17 @@ handler_again:
if ((myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
myds->return_MySQL_Connection_To_Pool();
}
status=NONE;
return 0;
delete mybe->server_myds;
mybe->server_myds=NULL;
set_status(NONE);
return -1;
} else {
if (rc==-1) {
proxy_error("Detected a broken connection during ping\n");
myds->destroy_MySQL_Connection_From_Pool();
myds->fd=0;
delete mybe->server_myds;
mybe->server_myds=NULL;
//thread->mypolls.remove_index_fast(myds->poll_fds_idx);
return -1;
} else {
@ -493,19 +497,6 @@ handler_again:
if (mybe->server_myds->wait_until && thread->curtime >= mybe->server_myds->wait_until) {
// query timed out
MySQL_Data_Stream *myds=mybe->server_myds;
/*
char buf[80];
sprintf(buf,"Query timeout for connection %lu", myds->myconn->mysql->thread_id);
myds->destroy_MySQL_Connection_From_Pool();
myds->fd=0;
myds->DSS=STATE_NOT_INITIALIZED;
status=NONE;
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,1,2000,(char *)"#28000",buf);
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
//wrong_pass=true;
return 0;
*/
// FIXME: make sure the connection is established first
if (myds->killed_at==0) {
myds->killed_at=thread->curtime;
@ -1286,7 +1277,8 @@ void MySQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
l_free(pkt->size,pkt->ptr);
if (client_myds->encrypted==false) {
if (client_myds->myconn->userinfo->schemaname==NULL) {
client_myds->myconn->userinfo->set_schemaname(mysql_thread___default_schema,strlen(mysql_thread___default_schema));
//client_myds->myconn->userinfo->set_schemaname(mysql_thread___default_schema,strlen(mysql_thread___default_schema));
client_myds->myconn->userinfo->set_schemaname(default_schema,strlen(default_schema));
}
int free_users=0;
if (admin==false) {

@ -959,7 +959,7 @@ bool MySQL_Thread::init() {
assert(mysql_sessions_connections_handler);
for (i=0; i<SESSIONS_FOR_CONNECTIONS_HANDLER;i++) {
MySQL_Session *sess=new MySQL_Session();
register_session_connection_handler(sess);
register_session_connection_handler(sess, false);
}
shutdown=0;
my_idle_conns=(MySQL_Connection **)malloc(sizeof(MySQL_Connection *)*SESSIONS_FOR_CONNECTIONS_HANDLER);
@ -1041,7 +1041,8 @@ void MySQL_Thread::run() {
myds->attach_connection(mc);
myds->assign_fd_from_mysql_conn();
myds->myds_type=MYDS_BACKEND;
MySQL_Session *sess=(MySQL_Session *)mysql_sessions_connections_handler->index(i);
//MySQL_Session *sess=(MySQL_Session *)mysql_sessions_connections_handler->index(i);
MySQL_Session *sess=new MySQL_Session();
myds->sess=sess;
myds->init();
my_idle_myds[i]=myds;
@ -1058,7 +1059,8 @@ void MySQL_Thread::run() {
// myds->DSS=STATE_QUERY_SENT_DS;
sess->status=PINGING_SERVER;
myds->DSS=STATE_MARIADB_PING;
register_session_connection_handler(sess,true);
// myds->myconn->async_ping(0);
// myds->myconn->async_state_machine=ASYNC_PING_START;
@ -1230,11 +1232,13 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
mypolls.last_recv[n]=curtime;
myds->revents=mypolls.fds[n].revents;
myds->sess->to_process=1;
assert(myds->sess->status!=NONE);
} else {
// no events
if (myds->wait_until && curtime > myds->wait_until) {
// timeout
myds->sess->to_process=1;
assert(myds->sess->status!=NONE);
}
}
if (myds->myds_type==MYDS_BACKEND && myds->sess->status!=FAST_FORWARD) {
@ -1244,6 +1248,10 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
// only if we aren't using MariaDB Client Library
myds->read_from_net();
myds->read_pkts();
} else {
if (mypolls.fds[n].revents) {
myds->myconn->handler(mypolls.fds[n].revents);
}
}
if ( (mypolls.fds[n].events & POLLOUT)
&&
@ -1253,7 +1261,7 @@ void MySQL_Thread::process_data_on_data_stream(MySQL_Data_Stream *myds, unsigned
}
myds->check_data_flow();
if (myds->active==FALSE) {
if (myds->sess->client_myds==myds) {
@ -1361,11 +1369,13 @@ void MySQL_Thread::process_all_sessions_connections_handler() {
int rc;
for (n=0; n<mysql_sessions_connections_handler->len; n++) {
MySQL_Session *sess=(MySQL_Session *)mysql_sessions_connections_handler->index(n);
// //FIX_PING if (sess->to_process==1) {
//FIX_PING
if (sess->to_process==1) {
assert(sess->status!=NONE);
rc=sess->handler();
sess->to_process=0;
if (rc==-1) {
unregister_session_connection_handler(n);
unregister_session_connection_handler(n, false);
n--;
delete sess;
//sess=new MySQL_Session();
@ -1374,22 +1384,30 @@ void MySQL_Thread::process_all_sessions_connections_handler() {
} else {
sess->to_process=0;
}
// }
}
}
}
void MySQL_Thread::register_session_connection_handler(MySQL_Session *_sess) {
void MySQL_Thread::register_session_connection_handler(MySQL_Session *_sess, bool _new) {
if (mysql_sessions_connections_handler==NULL) return;
mysql_sessions_connections_handler->add(_sess);
_sess->thread=this;
_sess->connections_handler=true;
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Registered new session for connection handler\n", _sess->thread, _sess);
if (_new) {
mysql_sessions->add(_sess);
} else {
mysql_sessions_connections_handler->add(_sess);
}
}
void MySQL_Thread::unregister_session_connection_handler(int idx) {
void MySQL_Thread::unregister_session_connection_handler(int idx, bool _new) {
if (mysql_sessions_connections_handler==NULL) return;
proxy_debug(PROXY_DEBUG_NET,1,"Thread=%p, Session=%p -- Unregistered session\n", this, mysql_sessions_connections_handler->index(idx));
mysql_sessions_connections_handler->remove_index_fast(idx);
if (_new) {
mysql_sessions->remove_index_fast(idx);
} else {
mysql_sessions_connections_handler->remove_index_fast(idx);
}
}

@ -1384,9 +1384,9 @@ bool ProxySQL_Admin::init() {
if (GloVars.configfile_open) {
if (GloVars.confFile->cfg) {
Read_MySQL_Servers_from_configfile();
Read_MySQL_Users_from_configfile();
Read_Global_Variables_from_configfile("admin");
Read_Global_Variables_from_configfile("mysql");
Read_MySQL_Users_from_configfile();
__insert_or_replace_disktable_select_maintable();
} else {
if (GloVars.confFile->OpenFile(GloVars.config_file)==true) {
@ -2195,7 +2195,8 @@ void ProxySQL_Admin::__add_active_users(enum cred_username_type usertype) {
usertype, // backend/frontend
(strcmp(r->fields[2],"1")==0 ? true : false) , // use_ssl
atoi(r->fields[3]), // default_hostgroup
(r->fields[4]==NULL ? (char *)mysql_thread___default_schema : r->fields[4]), //default_schema
//(r->fields[4]==NULL ? (char *)mysql_thread___default_schema : r->fields[4]), //default_schema
(r->fields[4]==NULL ? (char *)"" : r->fields[4]), //default_schema
(strcmp(r->fields[5],"1")==0 ? true : false) , // schema_locked
(strcmp(r->fields[6],"1")==0 ? true : false) , // transaction_persistent
(strcmp(r->fields[7],"1")==0 ? true : false), // fast_forward
@ -2402,7 +2403,7 @@ int ProxySQL_Admin::Read_MySQL_Users_from_configfile() {
std::string password="";
int active=1;
int default_hostgroup=0;
std::string default_schema;
std::string default_schema="";
int schema_locked=0;
int transaction_persistent=0;
int fast_forward=0;
@ -2411,13 +2412,14 @@ int ProxySQL_Admin::Read_MySQL_Users_from_configfile() {
user.lookupValue("password", password);
user.lookupValue("hostgroup", default_hostgroup);
user.lookupValue("active", active);
//if (user.lookupValue("default_schema", default_schema)==false) default_schema=mysql_thread___default_schema;
//if (user.lookupValue("default_schema", default_schema)==false) default_schema="";
user.lookupValue("default_schema", default_schema);
user.lookupValue("schema_locked", schema_locked);
user.lookupValue("transaction_persistent", transaction_persistent);
user.lookupValue("fast_forward", fast_forward);
user.lookupValue("max_connections", max_connections);
char *query=(char *)malloc(strlen(q)+strlen(username.c_str())+strlen(password.c_str())+128);
sprintf(query,q, username.c_str(), password.c_str(), active, default_hostgroup, "information_schema", schema_locked, transaction_persistent, fast_forward, max_connections);
sprintf(query,q, username.c_str(), password.c_str(), active, default_hostgroup, default_schema.c_str(), schema_locked, transaction_persistent, fast_forward, max_connections);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
free(query);

@ -20,15 +20,17 @@ MySQL_Backend::~MySQL_Backend() {
}
void MySQL_Backend::reset() {
if (server_myds->myconn) {
if (server_myds && server_myds->myconn) {
if (server_myds->DSS==STATE_READY && server_myds->myconn->reusable==true && ((server_myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
server_myds->myconn->last_time_used=server_myds->sess->thread->curtime;
MyHGM->push_MyConn_to_pool(server_myds->myconn);
server_myds->myconn=NULL;
server_myds->unplug_backend();
server_myds->return_MySQL_Connection_To_Pool();
//MyHGM->push_MyConn_to_pool(server_myds->myconn);
//server_myds->myconn=NULL;
//server_myds->unplug_backend();
} else {
MyHGM->destroy_MyConn_from_pool(server_myds->myconn);
server_myds->myconn=NULL;
server_myds->destroy_MySQL_Connection_From_Pool();
//MyHGM->destroy_MyConn_from_pool(server_myds->myconn);
//server_myds->myconn=NULL;
}
};
//if (mshge) {

@ -19,12 +19,15 @@ void MySQL_Connection::operator delete(void *ptr) {
//extern __thread char *mysql_thread___default_schema;
static int
mysql_status(short event) {
mysql_status(short event, short cont) {
int status= 0;
if (event & POLLIN)
status|= MYSQL_WAIT_READ;
if (event & POLLOUT)
status|= MYSQL_WAIT_WRITE;
// if (event==0 && cont==true) {
// status |= MYSQL_WAIT_TIMEOUT;
// }
// FIXME: handle timeout
// if (event & PROXY_TIMEOUT)
// status|= MYSQL_WAIT_TIMEOUT;
@ -106,9 +109,16 @@ void MySQL_Connection_userinfo::set(MySQL_Connection_userinfo *ui) {
bool MySQL_Connection_userinfo::set_schemaname(char *_new, int l) {
if ((schemaname==NULL) || (strncmp(_new,schemaname,l))) {
if (schemaname) free(schemaname);
schemaname=(char *)malloc(l+1);
memcpy(schemaname,_new,l);
schemaname[l]=0;
if (l) {
schemaname=(char *)malloc(l+1);
memcpy(schemaname,_new,l);
schemaname[l]=0;
} else {
int k=strlen(mysql_thread___default_schema);
schemaname=(char *)malloc(k+1);
memcpy(schemaname,mysql_thread___default_schema,k);
schemaname[k]=0;
}
compute_hash();
return true;
}
@ -135,6 +145,7 @@ MySQL_Connection::MySQL_Connection() {
options.compression_min_length=0;
options.server_version=NULL;
compression_pkt_id=0;
mysql_result=NULL;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "Creating new MySQL_Connection %p\n", this);
};
@ -237,17 +248,19 @@ void MySQL_Connection::connect_start() {
void MySQL_Connection::connect_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_real_connect_cont(&ret_mysql, mysql, mysql_status(event));
async_exit_status = mysql_real_connect_cont(&ret_mysql, mysql, mysql_status(event, true));
}
void MySQL_Connection::ping_start() {
PROXY_TRACE();
//fprintf(stderr,"ping_start FD %d\n", fd);
async_exit_status = mysql_ping_start(&interr,mysql);
}
void MySQL_Connection::ping_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_ping_cont(&interr,mysql, mysql_status(event));
//fprintf(stderr,"ping_cont FD %d, event %d\n", fd, event);
async_exit_status = mysql_ping_cont(&interr,mysql, mysql_status(event, true));
}
void MySQL_Connection::initdb_start() {
@ -257,7 +270,7 @@ void MySQL_Connection::initdb_start() {
void MySQL_Connection::initdb_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_select_db_cont(&interr,mysql, mysql_status(event));
async_exit_status = mysql_select_db_cont(&interr,mysql, mysql_status(event, true));
}
// FIXME: UTF8 is hardcoded for now, needs to be dynamic
@ -268,7 +281,7 @@ void MySQL_Connection::set_names_start() {
void MySQL_Connection::set_names_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event));
async_exit_status = mysql_set_character_set_cont(&interr,mysql, mysql_status(event, true));
}
void MySQL_Connection::set_query(char *stmt, unsigned long length) {
@ -284,7 +297,7 @@ void MySQL_Connection::real_query_start() {
void MySQL_Connection::real_query_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event));
async_exit_status = mysql_real_query_cont(&interr ,mysql , mysql_status(event, true));
}
void MySQL_Connection::store_result_start() {
@ -294,7 +307,7 @@ void MySQL_Connection::store_result_start() {
void MySQL_Connection::store_result_cont(short event) {
proxy_debug(PROXY_DEBUG_MYSQL_PROTOCOL, 6,"event=%d\n", event);
async_exit_status = mysql_store_result_cont(&mysql_result , mysql , mysql_status(event));
async_exit_status = mysql_store_result_cont(&mysql_result , mysql , mysql_status(event, true));
}
#define NEXT_IMMEDIATE(new_st) do { async_state_machine = new_st; goto handler_again; } while (0)
@ -345,6 +358,7 @@ handler_again:
}
break;
case ASYNC_PING_CONT:
assert(myds->sess->status==PINGING_SERVER);
ping_cont(event);
if (async_exit_status) {
next_event(ASYNC_PING_CONT);

@ -20,7 +20,7 @@ mysql_variables=
{
threads=4
//threads=32
max_connections=32
max_connections=256
default_query_delay=10
default_query_timeout=10000
have_compress=true
@ -58,7 +58,7 @@ mysql_servers =
address="127.0.0.1"
port=3306
hostgroup=0
max_connections=1000
max_connections=10
},
# { address="127.0.0.2" , port=3306 , hostgroup=0, max_connections=5 },
{ address="127.0.0.1" , port=3306 , hostgroup=1 },
@ -82,7 +82,8 @@ mysql_users:
username = "root"
password = ""
default_hostgroup = 0
max_connections=32
max_connections=1000
default_schema="test"
# active = 1
},
{ username = "vegaicm" , password = "password" , default_hostgroup = 0 , active = 1 },

Loading…
Cancel
Save