From d2f8952a83cfe6f36711e4223bf0e388d59c2c86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Canna=C3=B2?= Date: Sun, 17 Feb 2019 15:11:22 +1100 Subject: [PATCH] Fix crashes when fast_forward is used #1891 Memory was incorrectly freed more than once. Also, connections related to fast_forward were attempted to be reset. --- lib/MySQL_Session.cpp | 2 ++ lib/MySQL_Thread.cpp | 16 ++++++++-------- lib/mysql_backend.cpp | 9 +++++---- lib/mysql_data_stream.cpp | 19 +++++++++++-------- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/lib/MySQL_Session.cpp b/lib/MySQL_Session.cpp index f1c4887d4..447abac0f 100644 --- a/lib/MySQL_Session.cpp +++ b/lib/MySQL_Session.cpp @@ -1843,6 +1843,8 @@ bool MySQL_Session::handler_again___status_CONNECTING_SERVER(int *_rc) { // we have a successful connection and session_fast_forward enabled // set DSS=STATE_SLEEP or it will believe it have to use MARIADB client library myds->DSS=STATE_SLEEP; + myds->myconn->send_quit = false; + myds->myconn->reusable = false; } NEXT_IMMEDIATE_NEW(st); break; diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 4703a3bbe..1da275e78 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -620,7 +620,7 @@ uint8_t MySQL_Threads_Handler::get_variable_uint8(char *name) { } int MySQL_Threads_Handler::get_variable_int(const char *name) { -VALGRIND_DISABLE_ERROR_REPORTING; +//VALGRIND_DISABLE_ERROR_REPORTING; #ifdef DEBUG if (!strcasecmp(name,"session_debug")) return (int)variables.session_debug; #endif /* DEBUG */ @@ -782,11 +782,11 @@ VALGRIND_DISABLE_ERROR_REPORTING; if (!strcasecmp(name,"client_multi_statements")) return (int)variables.client_multi_statements; proxy_error("Not existing variable: %s\n", name); assert(0); return 0; -VALGRIND_ENABLE_ERROR_REPORTING; +//VALGRIND_ENABLE_ERROR_REPORTING; } char * MySQL_Threads_Handler::get_variable(char *name) { // this is the public function, accessible from admin -VALGRIND_DISABLE_ERROR_REPORTING; +//VALGRIND_DISABLE_ERROR_REPORTING; #define INTBUFSIZE 4096 char intbuf[INTBUFSIZE]; if (!strcasecmp(name,"init_connect")) { @@ -1221,7 +1221,7 @@ VALGRIND_DISABLE_ERROR_REPORTING; return strdup((variables.default_reconnect ? "true" : "false")); } return NULL; -VALGRIND_ENABLE_ERROR_REPORTING; +//VALGRIND_ENABLE_ERROR_REPORTING; } @@ -3317,9 +3317,9 @@ __run_skip_2: } } } else { - VALGRIND_DISABLE_ERROR_REPORTING; + //VALGRIND_DISABLE_ERROR_REPORTING; pthread_mutex_lock(&thr->myexchange.mutex_resumes); - VALGRIND_ENABLE_ERROR_REPORTING; + //VALGRIND_ENABLE_ERROR_REPORTING; if (shutdown==0 && thr->shutdown==0 && thr->myexchange.resume_mysql_sessions->len) { unsigned char c=0; int fd=thr->pipefd[1]; @@ -3327,9 +3327,9 @@ __run_skip_2: //proxy_error("Error while signaling maintenance thread\n"); } } - VALGRIND_DISABLE_ERROR_REPORTING; + //VALGRIND_DISABLE_ERROR_REPORTING; pthread_mutex_unlock(&thr->myexchange.mutex_resumes); - VALGRIND_ENABLE_ERROR_REPORTING; + //VALGRIND_ENABLE_ERROR_REPORTING; } } else { #endif // IDLE_THREADS diff --git a/lib/mysql_backend.cpp b/lib/mysql_backend.cpp index 4347d3900..4c0ad5d84 100644 --- a/lib/mysql_backend.cpp +++ b/lib/mysql_backend.cpp @@ -28,12 +28,13 @@ void MySQL_Backend::reset() { server_myds->myconn->last_time_used=server_myds->sess->thread->curtime; server_myds->return_MySQL_Connection_To_Pool(); } else { - server_myds->destroy_MySQL_Connection_From_Pool(true); + if (server_myds->sess && server_myds->sess->session_fast_forward == false) { + server_myds->destroy_MySQL_Connection_From_Pool(true); + } else { + server_myds->destroy_MySQL_Connection_From_Pool(false); + } } }; - //if (mshge) { - // FIXME: what to do with it? - //} if (server_myds) { delete server_myds; } diff --git a/lib/mysql_data_stream.cpp b/lib/mysql_data_stream.cpp index f8dcf1826..a76f28cd8 100644 --- a/lib/mysql_data_stream.cpp +++ b/lib/mysql_data_stream.cpp @@ -539,7 +539,7 @@ int MySQL_Data_Stream::write_to_net() { return 0; } } - VALGRIND_DISABLE_ERROR_REPORTING; + //VALGRIND_DISABLE_ERROR_REPORTING; // splitting the ternary operation in IF condition for better readability if (encrypted) { bytes_io = SSL_write (ssl, queue_r_ptr(queueOUT), s); @@ -597,7 +597,7 @@ int MySQL_Data_Stream::write_to_net() { if (encrypted) { //proxy_info("bytes_io: %d\n", bytes_io); } - VALGRIND_ENABLE_ERROR_REPORTING; + //VALGRIND_ENABLE_ERROR_REPORTING; if (bytes_io < 0) { if (encrypted==false) { if ((poll_fds_idx < 0) || (mypolls->fds[poll_fds_idx].revents & POLLOUT)) { // in write_to_net_poll() we has remove this safety @@ -791,6 +791,7 @@ int MySQL_Data_Stream::buffer2array() { memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size); queue_r(queueIN, queueIN.pkt.size); PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + queueIN.pkt.ptr = NULL; } else { if (PSarrayIN->len == 0) { // it is empty, create a new block @@ -800,6 +801,7 @@ int MySQL_Data_Stream::buffer2array() { memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size); queue_r(queueIN, queueIN.pkt.size); PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + queueIN.pkt.ptr = NULL; } else { // get a pointer to the last entry in PSarrayIN PtrSize_t *last_pkt = PSarrayIN->index(PSarrayIN->len - 1); @@ -811,6 +813,7 @@ int MySQL_Data_Stream::buffer2array() { memcpy(queueIN.pkt.ptr, queue_r_ptr(queueIN) , queueIN.pkt.size); queue_r(queueIN, queueIN.pkt.size); PSarrayIN->add(queueIN.pkt.ptr,queueIN.pkt.size); + queueIN.pkt.ptr = NULL; } else { // we append the packet at the end of the previous packet memcpy((char *)last_pkt->ptr+last_pkt->size, queue_r_ptr(queueIN) , queueIN.pkt.size); @@ -1079,7 +1082,7 @@ int MySQL_Data_Stream::array2buffer() { } } while (cont) { - VALGRIND_DISABLE_ERROR_REPORTING; + //VALGRIND_DISABLE_ERROR_REPORTING; if (queue_available(queueOUT)==0) { goto __exit_array2buffer; } @@ -1090,15 +1093,15 @@ int MySQL_Data_Stream::array2buffer() { l_free(queueOUT.pkt.size,queueOUT.pkt.ptr); queueOUT.pkt.ptr=NULL; } - VALGRIND_ENABLE_ERROR_REPORTING; + //VALGRIND_ENABLE_ERROR_REPORTING; if (myconn->get_status_compression()==true) { proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "DataStream: %p -- Compression enabled\n", this); generate_compressed_packet(); // it is copied directly into queueOUT.pkt } else { - VALGRIND_DISABLE_ERROR_REPORTING; + //VALGRIND_DISABLE_ERROR_REPORTING; memcpy(&queueOUT.pkt,PSarrayOUT->index(idx), sizeof(PtrSize_t)); idx++; - VALGRIND_ENABLE_ERROR_REPORTING; + //VALGRIND_ENABLE_ERROR_REPORTING; // this is a special case, needed because compression is enabled *after* the first OK if (DSS==STATE_CLIENT_AUTH_OK) { DSS=STATE_SLEEP; @@ -1123,9 +1126,9 @@ int MySQL_Data_Stream::array2buffer() { } } int b= ( queue_available(queueOUT) > (queueOUT.pkt.size - queueOUT.partial) ? (queueOUT.pkt.size - queueOUT.partial) : queue_available(queueOUT) ); - VALGRIND_DISABLE_ERROR_REPORTING; + //VALGRIND_DISABLE_ERROR_REPORTING; memcpy(queue_w_ptr(queueOUT), (unsigned char *)queueOUT.pkt.ptr + queueOUT.partial, b); - VALGRIND_ENABLE_ERROR_REPORTING; + //VALGRIND_ENABLE_ERROR_REPORTING; queue_w(queueOUT,b); proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "DataStream: %p -- Copied %d bytes into send buffer\n", this, b); queueOUT.partial+=b;