Code cleanup

v3.0_extended_query_protocol
Rahim Kanji 8 months ago
parent 8bb248cef5
commit f350102f72

@ -203,7 +203,7 @@ public:
void unplug_backend();
void check_data_flow();
int assign_fd_from_mysql_conn();
int assign_fd_from_pgsql_conn();
static unsigned char* copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length, bool del);
static void copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size,

@ -1104,7 +1104,7 @@ int PgSQL_Data_Stream::array2buffer_full() {
return rc;
}
int PgSQL_Data_Stream::assign_fd_from_mysql_conn() {
int PgSQL_Data_Stream::assign_fd_from_pgsql_conn() {
assert(myconn);
//proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->myconn.net.fd);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->fd);

@ -1736,7 +1736,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) {
rc = myconn->async_connect(myds->revents);
if (myds->mypolls == NULL) {
// connection yet not in mypolls
myds->assign_fd_from_mysql_conn();
myds->assign_fd_from_pgsql_conn();
thread->mypolls.add(POLLIN | POLLOUT, myds->fd, myds, curtime);
if (mirror) {
PROXY_TRACE();
@ -1749,7 +1749,7 @@ bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) {
// PQconnectPoll has changed the file descriptor (FD) during the connection process.
// We need to update the new FD in mypolls, replacing the old one,
// Note: previous FD is closed by PQconnectPoll
myds->assign_fd_from_mysql_conn();
myds->assign_fd_from_pgsql_conn();
thread->mypolls.update_fd_at_index(myds->poll_fds_idx, myds->fd);
}
}
@ -2323,8 +2323,7 @@ __get_pkts_from_client:
//handler_ret = -1;
return handler_ret;
}
}
else {
} else {
char command = c = *((unsigned char*)pkt.ptr);
switch (command) {
case 'Q':
@ -2368,47 +2367,6 @@ __get_pkts_from_client:
(begint.tv_sec * 1000000000 + begint.tv_nsec);
}
assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo
#if 0
// This block was moved from 'handler_special_queries' to support
// handling of 'USE' statements which are preceded by a comment.
// For more context check issue: #3493.
// ===================================================
if (session_type != PROXYSQL_SESSION_CLICKHOUSE) {
const char* qd = CurrentQuery.get_digest_text();
bool use_db_query = false;
if (qd != NULL) {
if (
(strncasecmp((char*)"USE", qd, 3) == 0)
&&
(
(strncasecmp((char*)"USE ", qd, 4) == 0)
||
(strncasecmp((char*)"USE`", qd, 4) == 0)
)
) {
use_db_query = true;
}
}
else {
if (pkt.size > (5 + 4) && strncasecmp((char*)"USE ", (char*)pkt.ptr + 5, 4) == 0) {
use_db_query = true;
}
}
if (use_db_query) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_USE_DB(&pkt);
if (mirror == false) {
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
}
#endif
// ===================================================
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
@ -2474,7 +2432,6 @@ __get_pkts_from_client:
}
}
}
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
@ -2515,6 +2472,7 @@ __get_pkts_from_client:
handler_ret = -1;
return handler_ret;
break;
// Extended Query Handling
case 'P':
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(pkt) == false) {
handler_ret = -1;
@ -2580,225 +2538,6 @@ __get_pkts_from_client:
}
break;
}
if (session_type == PROXYSQL_SESSION_CLICKHOUSE) {
if ((enum_mysql_command)c == _MYSQL_COM_INIT_DB) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_INIT_DB_replace_CLICKHOUSE(pkt);
c = *((unsigned char*)pkt.ptr + sizeof(mysql_hdr));
}
}
client_myds->com_field_list = false; // default
if (c == _MYSQL_COM_FIELD_LIST) {
if (session_type == PROXYSQL_SESSION_PGSQL) {
MySQL_Protocol* myprot = &client_myds->myprot;
bool rcp = myprot->generate_COM_QUERY_from_COM_FIELD_LIST(&pkt);
if (rcp) {
// all went well
c = *((unsigned char*)pkt.ptr + sizeof(mysql_hdr));
client_myds->com_field_list = true;
}
else {
// parsing failed, proxysql will return not suppported command
}
}
}
switch ((enum_mysql_command)c) {
case _MYSQL_COM_QUERY:
__sync_add_and_fetch(&thread->status_variables.stvar[st_var_queries], 1);
if (session_type == PROXYSQL_SESSION_PGSQL) {
bool rc_break = false;
bool lock_hostgroup = false;
if (session_fast_forward == SESSION_FORWARD_TYPE_NONE) {
// Note: CurrentQuery sees the query as sent by the client.
// shortly after, the packets it used to contain the query will be deallocated
CurrentQuery.begin((unsigned char*)pkt.ptr, pkt.size, true);
}
rc_break = handler_special_queries(&pkt, &lock_hostgroup);
if (rc_break == true) {
if (mirror == false) {
// track also special queries
//RequestEnd(NULL);
// we moved this inside handler_special_queries()
// because a pointer was becoming invalid
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
timespec begint;
timespec endt;
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &begint);
}
unsigned int query_len = pkt.size - 4 - 1; // excluding header
char* query_ptr = (char*)pkt.ptr + 4 + 1;
qpo = GloPgQPro->process_query(this, query_ptr, query_len, &CurrentQuery);
if (thread->variables.stats_time_query_processor) {
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &endt);
thread->status_variables.stvar[st_var_query_processor_time] = thread->status_variables.stvar[st_var_query_processor_time] +
(endt.tv_sec * 1000000000 + endt.tv_nsec) -
(begint.tv_sec * 1000000000 + begint.tv_nsec);
}
assert(qpo); // GloPgQPro->process_mysql_query() should always return a qpo
#if 0
// This block was moved from 'handler_special_queries' to support
// handling of 'USE' statements which are preceded by a comment.
// For more context check issue: #3493.
// ===================================================
if (session_type != PROXYSQL_SESSION_CLICKHOUSE) {
const char* qd = CurrentQuery.get_digest_text();
bool use_db_query = false;
if (qd != NULL) {
if (
(strncasecmp((char*)"USE", qd, 3) == 0)
&&
(
(strncasecmp((char*)"USE ", qd, 4) == 0)
||
(strncasecmp((char*)"USE`", qd, 4) == 0)
)
) {
use_db_query = true;
}
}
else {
if (pkt.size > (5 + 4) && strncasecmp((char*)"USE ", (char*)pkt.ptr + 5, 4) == 0) {
use_db_query = true;
}
}
if (use_db_query) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_USE_DB(&pkt);
if (mirror == false) {
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
}
#endif
// ===================================================
if (qpo->max_lag_ms >= 0) {
thread->status_variables.stvar[st_var_queries_with_max_lag_ms]++;
}
rc_break = handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_qpo(&pkt, &lock_hostgroup);
if (mirror == false && rc_break == false) {
if (pgsql_thread___automatic_detect_sqli) {
if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY_detect_SQLi()) {
handler_ret = -1;
return handler_ret;
}
}
}
if (rc_break == true) {
if (mirror == false) {
break;
}
else {
handler_ret = -1;
return handler_ret;
}
}
if (mirror == false) {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___create_mirror_session();
}
if (autocommit_on_hostgroup >= 0) {
}
if (pgsql_thread___set_query_lock_on_hostgroup == 1) { // algorithm introduced in 2.0.6
if (locked_on_hostgroup < 0) {
if (lock_hostgroup) {
// we are locking on hostgroup now
if (qpo->destination_hostgroup >= 0) {
if (transaction_persistent_hostgroup == -1) {
current_hostgroup = qpo->destination_hostgroup;
}
}
locked_on_hostgroup = current_hostgroup;
thread->status_variables.stvar[st_var_hostgroup_locked]++;
thread->status_variables.stvar[st_var_hostgroup_locked_set_cmds]++;
}
}
if (locked_on_hostgroup >= 0) {
if (current_hostgroup != locked_on_hostgroup) {
client_myds->DSS = STATE_QUERY_SENT_NET;
int l = CurrentQuery.QueryLength;
char* end = (char*)"";
if (l > 256) {
l = 253;
end = (char*)"...";
}
string nqn = string((char*)CurrentQuery.QueryPointer, l);
char* err_msg = (char*)"Session trying to reach HG %d while locked on HG %d . Rejecting query: %s";
char* buf = (char*)malloc(strlen(err_msg) + strlen(nqn.c_str()) + strlen(end) + 64);
sprintf(buf, err_msg, current_hostgroup, locked_on_hostgroup, nqn.c_str(), end);
client_myds->myprot.generate_error_packet(true, true, buf, PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION,
false, true);
thread->status_variables.stvar[st_var_hostgroup_locked_queries]++;
RequestEnd(NULL);
free(buf);
l_free(pkt.size, pkt.ptr);
break;
}
}
}
mybe = find_or_create_backend(current_hostgroup);
status = PROCESSING_QUERY;
// set query retries
mybe->server_myds->query_retries_on_failure = pgsql_thread___query_retries_on_failure;
// if a number of retries is set in mysql_query_rules, that takes priority
if (qpo) {
if (qpo->retries >= 0) {
mybe->server_myds->query_retries_on_failure = qpo->retries;
}
}
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
mybe->server_myds->wait_until = 0;
pause_until = 0;
if (pgsql_thread___default_query_delay) {
pause_until = thread->curtime + pgsql_thread___default_query_delay * 1000;
}
if (qpo) {
if (qpo->delay > 0) {
if (pause_until == 0)
pause_until = thread->curtime;
pause_until += qpo->delay * 1000;
}
}
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->killed_at = 0;
mybe->server_myds->kill_type = 0;
mybe->server_myds->pgsql_real_query.init(&pkt);
mybe->server_myds->statuses.questions++;
client_myds->setDSS_STATE_QUERY_SENT_NET();
}
else {
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_QUERY___not_mysql(pkt);
}
break;
default:
// in this switch we only handle the most common commands.
// The not common commands are handled by "default" , that
// calls the following function
// handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM__various
//if (handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM__various(&pkt, &wrong_pass) == false) {
// If even this cannot find the command, we return an error to the client
proxy_error("RECEIVED AN UNKNOWN COMMAND: %d -- PLEASE REPORT A BUG\n", c);
l_free(pkt.size, pkt.ptr);
handler_ret = -1; // immediately drop the connection
return handler_ret;
//}
break;
}
break;
default:
handler___status_WAITING_CLIENT_DATA___default();
@ -3434,8 +3173,7 @@ handler_again:
}
RequestEnd(myds);
finishQuery(myds, myconn, prepared_stmt_with_no_params);
}
else {
} else {
if (rc == -1) {
// the query failed
const bool is_error_present = myconn->is_error_present(); // false means failure is due to server being in OFFLINE state
@ -3509,7 +3247,6 @@ handler_again:
}
}
// FIXME: Temporary workaround. Update the logic below when pipeline mode is implemented
if (rc != 1 && pkt.size && pkt.ptr && ((char*)pkt.ptr)[0] == 'S') { // it's a sync packet
// sent sync packet again to client queue, to execute sync in next iteration to handle remaining pending packets
@ -3529,9 +3266,12 @@ handler_again:
}
break;
case PROCESSING_EXTENDED_QUERY_SYNC:
assert(0); //no handled yet
break;
case SETTING_ISOLATION_LEVEL:
case SETTING_TRANSACTION_READ:
//case SETTING_CHARSET:
case SETTING_VARIABLE:
case SETTING_NEXT_ISOLATION_LEVEL:
case SETTING_NEXT_TRANSACTION_READ:
@ -5284,10 +5024,9 @@ void PgSQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
mybe->server_myds->DSS = STATE_MARIADB_CONNECTING;
status = CONNECTING_SERVER;
mybe->server_myds->myconn->reusable = true;
}
else {
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p -- PgSQL Connection found = %p\n", this, mybe->server_myds->myconn);
mybe->server_myds->assign_fd_from_mysql_conn();
mybe->server_myds->assign_fd_from_pgsql_conn();
mybe->server_myds->myds_type = MYDS_BACKEND;
mybe->server_myds->DSS = STATE_READY;
@ -5683,7 +5422,7 @@ void PgSQL_Session::create_new_session_and_reset_connection(PgSQL_Data_Stream* _
new_myds = new_sess->mybe->server_myds;
new_myds->attach_connection(mc);
new_myds->assign_fd_from_mysql_conn();
new_myds->assign_fd_from_pgsql_conn();
new_myds->myds_type = MYDS_BACKEND;
new_sess->to_process = 1;
new_myds->wait_until = thread->curtime + pgsql_thread___connect_timeout_server * 1000; // max_timeout
@ -5811,8 +5550,7 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon
if (transaction_persistent == true) {
transaction_persistent_hostgroup = -1;
}
}
else {
} else {
myconn->multiplex_delayed = false;
myconn->compute_unknown_transaction_status();
myconn->async_state_machine = ASYNC_IDLE;
@ -5822,8 +5560,7 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon
if (myds->myconn->IsActiveTransaction() == true) { // only active transaction is important here. Ignore other criterias
transaction_persistent_hostgroup = current_hostgroup;
}
}
else {
} else {
if (myds->myconn->IsActiveTransaction() == false) { // a transaction just completed
transaction_persistent_hostgroup = -1;
}
@ -5832,7 +5569,6 @@ void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* mycon
}
}
bool PgSQL_Session::known_query_for_locked_on_hostgroup(uint64_t digest) {
bool ret = false;
/*switch (digest) {
@ -5845,8 +5581,6 @@ bool PgSQL_Session::known_query_for_locked_on_hostgroup(uint64_t digest) {
return ret;
}
void PgSQL_Session::unable_to_parse_set_statement(bool* lock_hostgroup) {
// we couldn't parse the query
string query_str = string((char*)CurrentQuery.QueryPointer, CurrentQuery.QueryLength);

@ -2941,7 +2941,7 @@ void PgSQL_Thread::run___get_multiple_idle_connections(int& num_idles) {
myds = sess->mybe->server_myds;
myds->attach_connection(mc);
myds->assign_fd_from_mysql_conn();
myds->assign_fd_from_pgsql_conn();
myds->myds_type = MYDS_BACKEND;
sess->to_process = 1;

Loading…
Cancel
Save