* Refactored PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo.

* Fixed a crash occurring during session destruction.
* Query rules will now apply only to the first message in an extended query frame.
* OK message will apply to execute message.
* Query rewrite, error messages, and large packet handling will apply to parse message.
* Added query processing support for the Bind message.
pull/5044/head
Rahim Kanji 8 months ago
parent 4c832a9694
commit 9e4b76d6d1

@ -28,9 +28,9 @@ private:
void* re;
char* s;
public:
Session_Regex(char* p);
Session_Regex(const char* p);
~Session_Regex();
bool match(char* m);
bool match(const char* m);
};
class MySQL_Thread;

@ -50,6 +50,19 @@ public:
return _data;
}
/**
* @brief Returns a reference to the internal packet data.
*
* @return Reference to the PtrSize_t structure containing packet data.
*/
inline const PtrSize_t& get_raw_pkt() const noexcept {
return _pkt;
}
inline PtrSize_t& get_raw_pkt() noexcept {
return _pkt;
}
protected:
/**
* @brief Provides mutable access to the internal data.
@ -161,7 +174,6 @@ private:
uint16_t remaining; ///< Number of fields remaining to read.
};
struct PgSQL_Parse_Data {
const char* stmt_name; // The name of the prepared statement
const char* query_string; // The query string to be prepared
@ -171,6 +183,7 @@ private:
const unsigned char* param_types_start_ptr; // Array of parameter types (can be nullptr if none)
friend class PgSQL_Parse_Message;
friend class PgSQL_Session; // need it for void PgSQL_Session::handler_WCD_SS_MCQ_qpo_QueryRewrite(PtrSize_t* pkt);
};
class PgSQL_Parse_Message : public Base_Extended_Query_Message<PgSQL_Parse_Data,PgSQL_Parse_Message> {

@ -34,6 +34,8 @@ enum PgSQL_Extended_Query_Type : uint8_t {
PGSQL_EXTENDED_QUERY_TYPE_PARSE = 0x01,
PGSQL_EXTENDED_QUERY_TYPE_DESCRIBE = 0x02,
PGSQL_EXTENDED_QUERY_TYPE_EXECUTE = 0x04,
PGSQL_EXTENDED_QUERY_TYPE_BIND = 0x08,
PGSQL_EXTENDED_QUERY_TYPE_CLOSE = 0x10,
};
/* Enumerated types for output format and date order */
@ -188,6 +190,7 @@ private:
using PktType = std::variant<std::unique_ptr<PgSQL_Parse_Message>,std::unique_ptr<PgSQL_Describe_Message>,
std::unique_ptr<PgSQL_Close_Message>, std::unique_ptr<PgSQL_Bind_Message>, std::unique_ptr<PgSQL_Execute_Message>>;
bool extended_query_exec_qp = false;
std::queue<PktType> extended_query_frame;
std::unique_ptr<const PgSQL_Bind_Message> bind_waiting_for_execute;
@ -236,6 +239,13 @@ private:
#endif
void handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED__get_connection();
bool is_multi_statement_command(const char* cmd);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_SET_command(const char* dig, bool* lock_hostgroup);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_RESET_command(const char* dig, bool* lock_hostgroup);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DISCARD_command(const char* dig);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_DEALLOCATE_command(const char* dig);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___handle_special_commands(const char* dig, bool* lock_hostgroup);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_QUERY_qpo(PtrSize_t*, bool* lock_hostgroup,
PgSQL_Extended_Query_Type stmt_type = PGSQL_EXTENDED_QUERY_TYPE_NOT_SET);
bool handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___PGSQL_PARSE(PtrSize_t& pkt);

@ -683,7 +683,7 @@ int Base_Session<S,DS,B,T>::FindOneActiveTransaction(bool check_savepoint) {
return ret;
}
Session_Regex::Session_Regex(char* p) {
Session_Regex::Session_Regex(const char* p) {
s = strdup(p);
re2::RE2::Options* opt2 = new re2::RE2::Options(RE2::Quiet);
opt2->set_case_sensitive(false);
@ -697,7 +697,7 @@ Session_Regex::~Session_Regex() {
delete (re2::RE2::Options*)opt;
}
bool Session_Regex::match(char* m) {
bool Session_Regex::match(const char* m) {
bool rc = false;
rc = RE2::PartialMatch(m, *(RE2*)re);
return rc;

@ -702,6 +702,9 @@ handler_again:
break;
case ASYNC_STMT_PREPARE_START:
stmt_prepare_start();
__sync_fetch_and_add(&parent->queries_sent, 1);
update_bytes_sent(query.length + 5);
statuses.questions++;
if (async_exit_status) {
next_event(ASYNC_STMT_PREPARE_CONT);
}

@ -1480,27 +1480,34 @@ bool PgSQL_Protocol::generate_ok_packet(bool send, bool ready, const char* msg,
pgpkt.set_multi_pkt_mode(true);
}
char* tag = extract_tag_from_query(query);
assert(tag);
char tmpbuf[128];
if (strcmp(tag, "INSERT") == 0) {
sprintf(tmpbuf, "%s 0 %d", tag, rows);
pgpkt.write_CommandComplete(tmpbuf);
} else if (strcmp(tag, "UPDATE") == 0 ||
strcmp(tag, "DELETE") == 0 ||
strcmp(tag, "MERGE") == 0 ||
strcmp(tag, "MOVE") == 0 ||
strcmp(tag, "FETCH") == 0 ||
strcmp(tag, "COPY") == 0 ||
strcmp(tag, "SELECT") == 0) {
sprintf(tmpbuf, "%s %d", tag, rows);
pgpkt.write_CommandComplete(tmpbuf);
} else {
pgpkt.write_CommandComplete(tag);
if (query) {
char* tag = extract_tag_from_query(query);
assert(tag);
char tmpbuf[128];
if (strcmp(tag, "INSERT") == 0) {
sprintf(tmpbuf, "%s 0 %d", tag, rows);
pgpkt.write_CommandComplete(tmpbuf);
}
else if (strcmp(tag, "UPDATE") == 0 ||
strcmp(tag, "DELETE") == 0 ||
strcmp(tag, "MERGE") == 0 ||
strcmp(tag, "MOVE") == 0 ||
strcmp(tag, "FETCH") == 0 ||
strcmp(tag, "COPY") == 0 ||
strcmp(tag, "SELECT") == 0) {
sprintf(tmpbuf, "%s %d", tag, rows);
pgpkt.write_CommandComplete(tmpbuf);
}
else {
pgpkt.write_CommandComplete(tag);
}
free(tag);
} else if (msg) {
// if no query, but message is provided, use it as tag
pgpkt.write_CommandComplete(msg);
}
free(tag);
for (auto& [param_name, param_value] : param_status) {
pgpkt.write_ParameterStatus(param_name.c_str(), param_value.c_str());
}

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save