From f350102f72548ad452de2cd67aa8363d96ffb5bd Mon Sep 17 00:00:00 2001 From: Rahim Kanji Date: Thu, 3 Jul 2025 00:02:01 +0500 Subject: [PATCH] Code cleanup --- include/PgSQL_Data_Stream.h | 2 +- lib/PgSQL_Data_Stream.cpp | 2 +- lib/PgSQL_Session.cpp | 294 ++---------------------------------- lib/PgSQL_Thread.cpp | 2 +- 4 files changed, 17 insertions(+), 283 deletions(-) diff --git a/include/PgSQL_Data_Stream.h b/include/PgSQL_Data_Stream.h index ef27812bc..916c05226 100644 --- a/include/PgSQL_Data_Stream.h +++ b/include/PgSQL_Data_Stream.h @@ -203,7 +203,7 @@ public: void unplug_backend(); void check_data_flow(); - int assign_fd_from_mysql_conn(); + int assign_fd_from_pgsql_conn(); static unsigned char* copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, bool del); static void copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size, diff --git a/lib/PgSQL_Data_Stream.cpp b/lib/PgSQL_Data_Stream.cpp index d220dead4..9afde9c49 100644 --- a/lib/PgSQL_Data_Stream.cpp +++ b/lib/PgSQL_Data_Stream.cpp @@ -1104,7 +1104,7 @@ int PgSQL_Data_Stream::array2buffer_full() { return rc; } -int PgSQL_Data_Stream::assign_fd_from_mysql_conn() { +int PgSQL_Data_Stream::assign_fd_from_pgsql_conn() { assert(myconn); //proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->myconn.net.fd); proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->fd); diff --git a/lib/PgSQL_Session.cpp b/lib/PgSQL_Session.cpp index e2db2c524..2bd48656d 100644 --- a/lib/PgSQL_Session.cpp +++ b/lib/PgSQL_Session.cpp @@ -1736,7 +1736,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { rc = myconn->async_connect(myds->revents); if (myds->mypolls == NULL) { // connection yet not in mypolls - myds->assign_fd_from_mysql_conn(); + myds->assign_fd_from_pgsql_conn(); thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, curtime); if (mirror) { PROXY_TRACE(); @@ -1749,7 +1749,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) { // PQconnectPoll has changed the file descriptor (FD) during the connection process. // We need to update the new FD in mypolls, replacing the old one, // Note: previous FD is closed by PQconnectPoll - myds->assign_fd_from_mysql_conn(); + myds->assign_fd_from_pgsql_conn(); thread->mypolls.update_fd_at_index(myds->poll_fds_idx, myds->fd); } } @@ -2323,8 +2323,7 @@ __get_pkts_from_client: //handler_ret = -1; return handler_ret; } - } - else { + } else { char command = c = *((unsigned char*)pkt.ptr); switch (command) { case 'Q': @@ -2368,47 +2367,6 @@ __get_pkts_from_client: (begint.tv_sec * 1000000000 + begint.tv_nsec); } assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo -#if 0 - // This block was moved from 'handler_special_queries' to support - // handling of 'USE' statements which are preceded by a comment. - // For more context check issue: #3493. - // =================================================== - if (session_type != PROXYSQL_SESSION_CLICKHOUSE) { - const char* qd = CurrentQuery.get_digest_text(); - bool use_db_query = false; - - if (qd != NULL) { - if ( - (strncasecmp((char*)"USE", qd, 3) == 0) - && - ( - (strncasecmp((char*)"USE ", qd, 4) == 0) - || - (strncasecmp((char*)"USE`", qd, 4) == 0) - ) - ) { - use_db_query = true; - } - } - else { - if (pkt.size > (5 + 4) && strncasecmp((char*)"USE ", (char*)pkt.ptr + 5, 4) == 0) { - use_db_query = true; - } - } - - if (use_db_query) { - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_USE_DB(&pkt); - - if (mirror == false) { - break; - } - else { - handler_ret = -1; - return handler_ret; - } - } - } -#endif // =================================================== if (qpo->max_lag_ms >= 0) { thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; @@ -2474,7 +2432,6 @@ __get_pkts_from_client: } } } - mybe = find_or_create_backend(current_hostgroup); status = PROCESSING_QUERY; // set query retries @@ -2515,6 +2472,7 @@ __get_pkts_from_client: handler_ret = -1; return handler_ret; break; + // Extended Query Handling case 'P': if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(pkt) == false) { handler_ret = -1; @@ -2580,225 +2538,6 @@ __get_pkts_from_client: } break; } - if (session_type == PROXYSQL_SESSION_CLICKHOUSE) { - if ((enum_mysql_command)c == _MYSQL_COM_INIT_DB) { - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB_replace_CLICKHOUSE(pkt); - c = *((unsigned char*)pkt.ptr + sizeof(mysql_hdr)); - } - } - client_myds->com_field_list = false; // default - if (c == _MYSQL_COM_FIELD_LIST) { - if (session_type == PROXYSQL_SESSION_PGSQL) { - MySQL_Protocol* myprot = &client_myds->myprot; - bool rcp = myprot->generate_COM_QUERY_from_COM_FIELD_LIST(&pkt); - if (rcp) { - // all went well - c = *((unsigned char*)pkt.ptr + sizeof(mysql_hdr)); - client_myds->com_field_list = true; - } - else { - // parsing failed, proxysql will return not suppported command - } - } - } - switch ((enum_mysql_command)c) { - case _MYSQL_COM_QUERY: - __sync_add_and_fetch(&thread->status_variables.stvar[st_var_queries], 1); - if (session_type == PROXYSQL_SESSION_PGSQL) { - bool rc_break = false; - bool lock_hostgroup = false; - if (session_fast_forward == SESSION_FORWARD_TYPE_NONE) { - // Note: CurrentQuery sees the query as sent by the client. - // shortly after, the packets it used to contain the query will be deallocated - CurrentQuery.begin((unsigned char*)pkt.ptr, pkt.size, true); - } - rc_break = handler_special_queries(&pkt, &lock_hostgroup); - if (rc_break == true) { - if (mirror == false) { - // track also special queries - //RequestEnd(NULL); - // we moved this inside handler_special_queries() - // because a pointer was becoming invalid - break; - } - else { - handler_ret = -1; - return handler_ret; - } - } - timespec begint; - timespec endt; - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint); - } - - unsigned int query_len = pkt.size - 4 - 1; // excluding header - char* query_ptr = (char*)pkt.ptr + 4 + 1; - - qpo = GloPgQPro->process_query(this, query_ptr, query_len, &CurrentQuery); - if (thread->variables.stats_time_query_processor) { - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt); - thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] + - (endt.tv_sec * 1000000000 + endt.tv_nsec) - - (begint.tv_sec * 1000000000 + begint.tv_nsec); - } - assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo -#if 0 - // This block was moved from 'handler_special_queries' to support - // handling of 'USE' statements which are preceded by a comment. - // For more context check issue: #3493. - // =================================================== - if (session_type != PROXYSQL_SESSION_CLICKHOUSE) { - const char* qd = CurrentQuery.get_digest_text(); - bool use_db_query = false; - - if (qd != NULL) { - if ( - (strncasecmp((char*)"USE", qd, 3) == 0) - && - ( - (strncasecmp((char*)"USE ", qd, 4) == 0) - || - (strncasecmp((char*)"USE`", qd, 4) == 0) - ) - ) { - use_db_query = true; - } - } - else { - if (pkt.size > (5 + 4) && strncasecmp((char*)"USE ", (char*)pkt.ptr + 5, 4) == 0) { - use_db_query = true; - } - } - - if (use_db_query) { - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_USE_DB(&pkt); - - if (mirror == false) { - break; - } - else { - handler_ret = -1; - return handler_ret; - } - } - } -#endif - // =================================================== - if (qpo->max_lag_ms >= 0) { - thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++; - } - rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup); - if (mirror == false && rc_break == false) { - if (pgsql_thread___automatic_detect_sqli) { - if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi()) { - handler_ret = -1; - return handler_ret; - } - } - } - if (rc_break == true) { - if (mirror == false) { - break; - } - else { - handler_ret = -1; - return handler_ret; - } - } - if (mirror == false) { - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session(); - } - - if (autocommit_on_hostgroup >= 0) { - } - if (pgsql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6 - if (locked_on_hostgroup < 0) { - if (lock_hostgroup) { - // we are locking on hostgroup now - if (qpo->destination_hostgroup >= 0) { - if (transaction_persistent_hostgroup == -1) { - current_hostgroup = qpo->destination_hostgroup; - } - } - locked_on_hostgroup = current_hostgroup; - thread->status_variables.stvar[st_var_hostgroup_locked]++; - thread->status_variables.stvar[st_var_hostgroup_locked_set_cmds]++; - } - } - if (locked_on_hostgroup >= 0) { - if (current_hostgroup != locked_on_hostgroup) { - client_myds->DSS = STATE_QUERY_SENT_NET; - int l = CurrentQuery.QueryLength; - char* end = (char*)""; - if (l > 256) { - l = 253; - end = (char*)"..."; - } - string nqn = string((char*)CurrentQuery.QueryPointer, l); - char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s"; - char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64); - sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end); - client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION, - false, true); - thread->status_variables.stvar[st_var_hostgroup_locked_queries]++; - RequestEnd(NULL); - free(buf); - l_free(pkt.size, pkt.ptr); - break; - } - } - } - mybe = find_or_create_backend(current_hostgroup); - status = PROCESSING_QUERY; - // set query retries - mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure; - // if a number of retries is set in mysql_query_rules, that takes priority - if (qpo) { - if (qpo->retries >= 0) { - mybe->server_myds->query_retries_on_failure = qpo->retries; - } - } - mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure; - mybe->server_myds->wait_until = 0; - pause_until = 0; - if (pgsql_thread___default_query_delay) { - pause_until = thread->curtime + pgsql_thread___default_query_delay * 1000; - } - if (qpo) { - if (qpo->delay > 0) { - if (pause_until == 0) - pause_until = thread->curtime; - pause_until += qpo->delay * 1000; - } - } - - - proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n"); - mybe->server_myds->killed_at = 0; - mybe->server_myds->kill_type = 0; - mybe->server_myds->pgsql_real_query.init(&pkt); - mybe->server_myds->statuses.questions++; - client_myds->setDSS_STATE_QUERY_SENT_NET(); - } - else { - handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___not_mysql(pkt); - } - break; - default: - // in this switch we only handle the most common commands. - // The not common commands are handled by "default" , that - // calls the following function - // handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM__various - //if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM__various(&pkt, &wrong_pass) == false) { - // If even this cannot find the command, we return an error to the client - proxy_error("RECEIVED AN UNKNOWN COMMAND: %d -- PLEASE REPORT A BUG\n", c); - l_free(pkt.size, pkt.ptr); - handler_ret = -1; // immediately drop the connection - return handler_ret; - //} - break; - } break; default: handler___status_WAITING_CLIENT_DATA___default(); @@ -3434,8 +3173,7 @@ handler_again: } RequestEnd(myds); finishQuery(myds, myconn, prepared_stmt_with_no_params); - } - else { + } else { if (rc == -1) { // the query failed const bool is_error_present = myconn->is_error_present(); // false means failure is due to server being in OFFLINE state @@ -3509,7 +3247,6 @@ handler_again: } } - // FIXME: Temporary workaround. Update the logic below when pipeline mode is implemented if (rc != 1 && pkt.size && pkt.ptr && ((char*)pkt.ptr)[0] == 'S') { // it's a sync packet // sent sync packet again to client queue, to execute sync in next iteration to handle remaining pending packets @@ -3529,9 +3266,12 @@ handler_again: } break; + case PROCESSING_EXTENDED_QUERY_SYNC: + assert(0); //no handled yet + break; + case SETTING_ISOLATION_LEVEL: case SETTING_TRANSACTION_READ: - //case SETTING_CHARSET: case SETTING_VARIABLE: case SETTING_NEXT_ISOLATION_LEVEL: case SETTING_NEXT_TRANSACTION_READ: @@ -5284,10 +5024,9 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED mybe->server_myds->DSS = STATE_MARIADB_CONNECTING; status = CONNECTING_SERVER; mybe->server_myds->myconn->reusable = true; - } - else { + } else { proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- PgSQL Connection found = %p\n", this, mybe->server_myds->myconn); - mybe->server_myds->assign_fd_from_mysql_conn(); + mybe->server_myds->assign_fd_from_pgsql_conn(); mybe->server_myds->myds_type = MYDS_BACKEND; mybe->server_myds->DSS = STATE_READY; @@ -5683,7 +5422,7 @@ void PgSQL_Session::create_new_session_and_reset_connection(PgSQL_Data_Stream* _ new_myds = new_sess->mybe->server_myds; new_myds->attach_connection(mc); - new_myds->assign_fd_from_mysql_conn(); + new_myds->assign_fd_from_pgsql_conn(); new_myds->myds_type = MYDS_BACKEND; new_sess->to_process = 1; new_myds->wait_until = thread->curtime + pgsql_thread___connect_timeout_server * 1000; // max_timeout @@ -5811,8 +5550,7 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon if (transaction_persistent == true) { transaction_persistent_hostgroup = -1; } - } - else { + } else { myconn->multiplex_delayed = false; myconn->compute_unknown_transaction_status(); myconn->async_state_machine = ASYNC_IDLE; @@ -5822,8 +5560,7 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon if (myds->myconn->IsActiveTransaction() == true) { // only active transaction is important here. Ignore other criterias transaction_persistent_hostgroup = current_hostgroup; } - } - else { + } else { if (myds->myconn->IsActiveTransaction() == false) { // a transaction just completed transaction_persistent_hostgroup = -1; } @@ -5832,7 +5569,6 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon } } - bool PgSQL_Session::known_query_for_locked_on_hostgroup(uint64_t digest) { bool ret = false; /*switch (digest) { @@ -5845,8 +5581,6 @@ bool PgSQL_Session::known_query_for_locked_on_hostgroup(uint64_t digest) { return ret; } - - void PgSQL_Session::unable_to_parse_set_statement(bool* lock_hostgroup) { // we couldn't parse the query string query_str = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength); diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 6b653c320..75cb95891 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -2941,7 +2941,7 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) { myds = sess->mybe->server_myds; myds->attach_connection(mc); - myds->assign_fd_from_mysql_conn(); + myds->assign_fd_from_pgsql_conn(); myds->myds_type = MYDS_BACKEND; sess->to_process = 1;