Adding some documentation

pull/4664/head
René Cannaò 2 years ago
parent 04abd43a57
commit dd67530af4

@ -122,6 +122,17 @@ public:
}
PtrSize_t* get_PtrSize(unsigned c = PG_PKT_DEFAULT_SIZE);
/**
* @brief Moves the current packet data to a PtrSizeArray.
*
* This function adds the current `ptr` and `size` to the provided
* `PtrSizeArray` (`psa`). It then resets the internal buffer (`ptr` and
* `size`) to a new buffer with a capacity of `c` if `c` is not zero.
*
* @param psa The PtrSizeArray where the current packet data will be added.
* @param c The desired capacity of the new internal buffer.
*/
void to_PtrSizeArray(PtrSizeArray* psa, unsigned c = PG_PKT_DEFAULT_SIZE);
void set_multi_pkt_mode(bool mode) {
@ -130,12 +141,82 @@ public:
if (mode == false)
pkt_offset.clear();
}
/**
* @brief Resizes the internal buffer if needed to accommodate additional data.
*
* If the current size of the internal buffer (`size`) plus the requested length
* (`len`) exceeds the buffer's capacity (`capacity`), this function reallocates
* the buffer to a new size that's the nearest power of 2 greater than or equal
* to `size + len`.
*
* If the buffer already has enough space, this function does nothing.
*
* @param len The number of bytes of additional space required.
*
* @note This function only resizes the buffer if the `ownership` flag is true,
* indicating that the buffer is owned by the `PG_pkt` object.
*/
void make_space(unsigned int len);
/**
* @brief Appends a single character to the internal buffer.
*
* This function ensures there's enough space in the buffer and then appends
* the given character (`val`) to the end of the buffer.
*
* @param val The character to append.
*/
void put_char(char val);
/**
* @brief Appends a 16-bit unsigned integer to the internal buffer.
*
* This function ensures there's enough space in the buffer and then appends
* the given 16-bit unsigned integer (`val`) in big-endian byte order.
*
* @param val The 16-bit unsigned integer to append.
*/
void put_uint16(uint16_t val);
/**
* @brief Appends a 32-bit unsigned integer to the internal buffer.
*
* This function ensures there's enough space in the buffer and then appends
* the given 32-bit unsigned integer (`val`) in big-endian byte order.
*
* @param val The 32-bit unsigned integer to append.
*/
void put_uint32(uint32_t val);
/**
* @brief Appends a 64-bit unsigned integer to the internal buffer.
*
* This function appends the given 64-bit unsigned integer (`val`) to the
* internal buffer in big-endian byte order.
*
* @param val The 64-bit unsigned integer to append.
*/
void put_uint64(uint64_t val);
/**
* @brief Appends a block of bytes to the internal buffer.
*
* This function ensures there's enough space in the buffer and then copies
* `len` bytes from the provided data pointer (`data`) to the end of the buffer.
*
* @param data A pointer to the beginning of the data to append.
* @param len The number of bytes to append.
*/
void put_bytes(const void* data, int len);
/**
* @brief Appends a null-terminated string to the internal buffer.
*
* This function appends the given null-terminated string (`str`) to the
* internal buffer, including the null terminator.
*
* @param str The null-terminated string to append.
*/
void put_string(const char* str);
void write_generic(int type, const char* pktdesc, ...);
@ -169,7 +250,25 @@ public:
void write_DataRow(const char* tupdesc, ...);
private:
/**
* @brief Initializes a new packet with a specified type.
*
* This function sets the first byte of the packet to the given `type` and
* reserves space for the packet length (which will be filled in later).
*
* @param type The type of the packet (must be a value between 0 and 255).
*/
void start_packet(int type);
/**
* @brief Completes a packet by filling in the length field.
*
* This function calculates the length of the packet (excluding the type
* byte) and writes it to the appropriate position in the packet buffer.
*
* @note If the `multiple_pkt_mode` flag is set to true, the length is
* calculated and written based on the last recorded packet offset.
*/
void finish_packet();
char* ptr;
@ -197,17 +296,175 @@ public:
PgSQL_Query_Result();
~PgSQL_Query_Result();
/**
* @brief Initializes the PgSQL_Query_Result object.
*
* This method initializes the `PgSQL_Query_Result` object with the
* provided `PgSQL_Protocol`, `PgSQL_Data_Stream`, and `PgSQL_Connection`
* objects. It also initializes the internal buffer using the
* `buffer_init` method and resets any internal state.
*
* @param _proto A pointer to the `PgSQL_Protocol` object associated with
* this query result.
* @param _myds A pointer to the `PgSQL_Data_Stream` object associated with
* this query result.
* @param _conn A pointer to the `PgSQL_Connection` object associated with
* this query result.
*
* @note This method is typically called when a new query is executed.
*/
void init(PgSQL_Protocol* _proto, PgSQL_Data_Stream* _myds, PgSQL_Connection* _conn);
/**
* @brief Adds a row description to the query result.
*
* This method adds a row description (from a `PGresult` object) to the
* query result. It copies the row description data to the internal buffer
* or to the `PSarrayOUT` if the buffer is full.
*
* @param result A pointer to a `PGresult` object containing the row
* description to add.
*
* @return The number of bytes added to the query result.
*
* @note This method is used to prepare the client for receiving rows
* with the corresponding data types and column names.
*/
unsigned int add_row_description(const PGresult* result);
/**
* @brief Adds a row of data to the query result.
*
* This method adds a row of data (from a `PGresult` object) to the query
* result. It copies the row data to the internal buffer or to the
* `PSarrayOUT` if the buffer is full.
*
* @param result A pointer to a `PGresult` object containing the row data
* to add.
*
* @return The number of bytes added to the query result.
*/
unsigned int add_row(const PGresult* result);
/**
* @brief Adds a row of data to the query result from a PSresult.
*
* This method adds a row of data (from a `PSresult` object) to the query
* result. It copies the row data to the internal buffer or to the
* `PSarrayOUT` if the buffer is full.
*
* @param result A pointer to a `PSresult` object containing the row data
* to add.
*
* @return The number of bytes added to the query result.
*/
unsigned int add_row(const PSresult* result);
/**
* @brief Adds a command completion message to the query result.
*
* This method adds a command completion message (from a `PGresult`
* object) to the query result. It extracts the command tag and affected
* rows count (if requested) and adds them to the internal buffer or the
* `PSarrayOUT` if the buffer is full.
*
* @param result A pointer to a `PGresult` object containing the command
* completion message.
* @param extract_affected_rows A boolean flag indicating whether to
* extract the affected rows count from the
* `PGresult` object.
*
* @return The number of bytes added to the query result.
*
* @note This method is used to signal the completion of a command
* (e.g., INSERT, UPDATE, DELETE) and to send the appropriate
* response to the client.
*/
unsigned int add_command_completion(const PGresult* result, bool extract_affected_rows = true);
/**
* @brief Adds an error message to the query result.
*
* This method adds an error message (from a `PGresult` object) to the
* query result. It copies the error data to the internal buffer or to the
* `PSarrayOUT` if the buffer is full.
*
* @param result A pointer to a `PGresult` object containing the error
* message to add.
*
* @return The number of bytes added to the query result.
*
* @note This method is used to handle errors that occur during query
* execution and to send the error information to the client.
*/
unsigned int add_error(const PGresult* result);
/**
* @brief Adds an empty query response to the query result.
*
* This method adds an empty query response (for example from query
* returning no rows) to the query result. It copies the empty query
* response data to the internal buffer or to the `PSarrayOUT` if the
* buffer is full.
*
* @param result A pointer to a `PGresult` object representing the empty
* response.
*
* @return The number of bytes added to the query result.
*
* @note This method is used to handle cases where a query does not
* return any rows or data, and to send the appropriate response
* to the client.
*/
unsigned int add_empty_query_response(const PGresult* result);
/**
* @brief Adds a ready status message to the query result.
*
* This method adds a ready status message to the query result, indicating
* that the server is ready for a new query. The status reflects the
* transaction state.
*
* @param txn_status The transaction status type, indicating whether a
* transaction is in progress or not.
*
* @return The number of bytes added to the query result.
*
* @note This method is used to signal to the client that the server is
* ready for a new query and that any previous query has completed.
*/
unsigned int add_ready_status(PGTransactionStatusType txn_status);
/**
* @brief Retrieves the query result set and copies it to a PtrSizeArray.
*
* This method retrieves the accumulated query result, including row
* descriptions, rows, errors, etc., and copies it to the provided
* `PtrSizeArray`. It also resets the internal state of the
* `PgSQL_Query_Result` object after the result set is copied.
*
* @param PSarrayFinal The `PtrSizeArray` where the query result will be
* copied.
*
* @return `true` if the result set is complete (i.e., a ready status
* packet has been added), `false` otherwise.
*
* @note This method is typically called when all query results have been
* accumulated and are ready to be sent to the client.
*/
bool get_resultset(PtrSizeArray* PSarrayFinal); // this also calls reset
/**
* @brief Calculates the current size of the PgSQL_Query_Result object.
*
* This method calculates the total size of the `PgSQL_Query_Result`
* object in bytes, including the size of the object itself, the internal
* buffer, and any packets stored in the `PSarrayOUT`.
*
* @return The current size of the `PgSQL_Query_Result` object in bytes.
*/
unsigned long long current_size();
inline bool is_transfer_started() const { return transfer_started; }
inline unsigned long long get_num_rows() const { return num_rows; }
inline unsigned long long get_affected_rows() const { return affected_rows; }
@ -216,10 +473,64 @@ public:
inline uint8_t get_result_packet_type() const { return result_packet_type; }
private:
/**
* @brief Initializes the internal buffer for storing query results.
*
* If the `buffer` pointer is null, this function allocates a new buffer
* of size `PGSQL_RESULTSET_BUFLEN` and assigns it to the `buffer` pointer.
* It also resets the `buffer_used` counter to 0, indicating that the
* buffer is currently empty.
*
* @note This method is called by the `init` method to ensure that the
* buffer is properly initialized before any query results are added.
*/
void buffer_init();
inline unsigned int buffer_available_capacity() const { return (PGSQL_RESULTSET_BUFLEN - buffer_used); }
/**
* @brief Reserves space in the internal buffer and returns a pointer.
*
* This method checks if there is enough space in the internal `buffer`
* to store the requested `size` of data. If there is space, it returns
* a pointer to the available location and updates `buffer_used`.
* Otherwise, it flushes the buffer to `PSarrayOUT`, allocates a new
* buffer, and returns a pointer to the available location.
*
* @param size The number of bytes of space to reserve.
*
* @return A pointer to the reserved space in the buffer, or `NULL` if
* there is not enough space.
*
* @note This method is used to efficiently manage the internal buffer
* and avoid unnecessary memory allocations.
*/
unsigned char* buffer_reserve_space(unsigned int size);
/**
* @brief Flushes the internal buffer to the PSarrayOUT.
*
* This method moves the data currently stored in the internal `buffer`
* to the `PSarrayOUT` (a `PtrSizeArray`). It then resizes the
* `buffer` to the default size `PGSQL_RESULTSET_BUFLEN` and resets
* `buffer_used` to 0.
*
* @note This method is used when the internal `buffer` is full and
* needs to be flushed to release the memory and continue adding
* more data.
*/
void buffer_to_PSarrayOut();
/**
* @brief Resets the internal state of the PgSQL_Query_Result object.
*
* This method resets the internal state of the `PgSQL_Query_Result`
* object to its initial state, including clearing the result set data,
* resetting counters, and preparing for a new query result.
*
* @note This method is typically called after the query result has been
* sent to the client and the object is ready to handle a new query.
*/
void reset();
PtrSizeArray PSarrayOUT;
@ -250,28 +561,375 @@ public:
}
PgSQL_Data_Stream* get_myds() { return *myds; }
/**
* @brief Generates the initial handshake packet for the PostgreSQL protocol.
*
* This function generates the initial handshake packet that is sent to the
* PostgreSQL client. It includes an authentication request based on the
* configured authentication method (`pgsql_thread___authentication_method`).
*
* @param send A boolean flag indicating whether to send the packet immediately
* or just generate it.
* @param _ptr A pointer to a pointer where the generated packet data will be
* stored (if `send` is false).
* @param len A pointer to an unsigned integer where the length of the
* generated packet will be stored (if `send` is false).
* @param _thread_id A pointer to a 32-bit unsigned integer where the thread ID
* will be stored.
* @param deprecate_eof_active A boolean flag to control deprecation of EOF
* active behavior.
*
* @return `true` if the packet was successfully generated, `false` otherwise.
*
* @note This function updates the authentication method and next packet type
* in the `PgSQL_Data_Stream` object. If `send` is true, it also adds
* the generated packet to the output buffer and updates the data stream
* state.
*/
bool generate_pkt_initial_handshake(bool send, void** ptr, unsigned int* len, uint32_t* thread_id, bool deprecate_eof_active) override;
/**
* @brief Processes a PostgreSQL startup packet.
*
* This function processes a PostgreSQL startup packet received from the
* client. It extracts the connection parameters, checks for SSL requests,
* and validates the user name.
*
* @param pkt A pointer to the beginning of the packet buffer.
* @param len The length of the packet buffer in bytes.
* @param ssl_request A boolean variable that is set to `true` if the client
* requests an SSL connection.
*
* @return `true` if the startup packet was successfully processed, `false`
* otherwise.
*
* @note This function updates the data stream state to `STATE_SERVER_HANDSHAKE`
* after successfully processing the startup packet. It also handles
* SSL requests and generates an error packet if the user name is
* missing.
*/
bool process_startup_packet(unsigned char* pkt, unsigned int len, bool& ssl_request);
/**
* @brief Processes a PostgreSQL handshake response packet.
*
* This function processes a handshake response packet received from the
* PostgreSQL client. It handles authentication based on the selected
* authentication method (e.g., clear text password, SCRAM-SHA-256) and
* updates the session state.
*
* @param pkt A pointer to the beginning of the packet buffer.
* @param len The length of the packet buffer in bytes.
*
* @return The execution state after processing the handshake response
* packet.
*
* @note This function validates the packet type, retrieves user credentials
* from the database, performs authentication, and updates the session
* state. It also handles errors related to authentication and invalid
* packets.
*/
EXECUTION_STATE process_handshake_response_packet(unsigned char* pkt, unsigned int len);
/**
* @brief Sends a welcome message to the PostgreSQL client.
*
* This function sends a welcome message to the PostgreSQL client after a
* successful authentication. The welcome message includes parameter status
* messages and a ready-for-query message.
*
* @note This function updates the output buffer with the welcome message
* data. It also sets the session state to `STATE_CLIENT_AUTH_OK`.
*/
void welcome_client();
/**
* @brief Generates an error packet for the PostgreSQL protocol.
*
* This function generates an error packet that is sent to the PostgreSQL
* client in case of an error. It includes the error severity, code, and
* message.
*
* @param send A boolean flag indicating whether to send the packet
* immediately or just generate it.
* @param ready A boolean flag indicating whether to generate a ready-for-query
* packet after the error.
* @param msg The error message to be included in the packet.
* @param code The error code.
* @param fatal A boolean flag indicating whether the error is fatal.
* @param track A boolean flag to control whether to track the error count.
* @param _ptr A pointer to a `PtrSize_t` structure (if `send` is false)
* where the generated packet data will be stored.
*
* @note This function updates the output buffer with the generated error
* packet. It also updates the data stream state to `STATE_ERR` if
* necessary.
*/
void generate_error_packet(bool send, bool ready, const char* msg, PGSQL_ERROR_CODES code, bool fatal, bool track = false, PtrSize_t* _ptr = NULL);
/**
* @brief Generates an "OK" packet for the PostgreSQL protocol.
*
* This function generates an "OK" packet, which is sent to the PostgreSQL
* client after a successful command execution (e.g., INSERT, UPDATE, DELETE,
* SELECT). It includes a command tag (e.g., "INSERT 0 10" for an INSERT
* command that affected 10 rows) and a ready-for-query message if `ready`
* is true.
*
* @param send A boolean flag indicating whether to send the packet
* immediately or just generate it.
* @param ready A boolean flag indicating whether to generate a ready-for-query
* packet after the "OK" packet.
* @param msg An optional message to be included in the "OK" packet.
* @param rows The number of rows affected by the command (used for
* INSERT, UPDATE, DELETE, and SELECT).
* @param query The original query string that was executed.
* @param _ptr A pointer to a `PtrSize_t` structure (if `send` is false)
* where the generated packet data will be stored.
*
* @return `true` if the packet was successfully generated, `false` otherwise.
*
* @note This function extracts the appropriate command tag based on the
* `query` string and constructs the "OK" packet accordingly. It also
* updates the output buffer with the generated packet. If `ready` is
* true, it also generates and sends a ready-for-query packet.
*/
bool generate_ok_packet(bool send, bool ready, const char* msg, int rows, const char* query, PtrSize_t* _ptr = NULL);
//bool generate_row_description(bool send, PgSQL_Query_Result* rs, const PG_Fields& fields, unsigned int size);
/**
* @brief Copies a row description from a PGresult to a PgSQL_Query_Result.
*
* This function copies the row description from a `PGresult` object (typically
* obtained from libpq) to a `PgSQL_Query_Result` object. The row description
* contains information about the columns returned by a query, such as column
* names, data types, and other metadata.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* row description will be copied.
* @param result A pointer to the `PGresult` object containing the row
* description to be copied.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*
* @note This function is used to prepare the client for receiving rows
* with the corresponding data types and column names.
*/
unsigned int copy_row_description_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
/**
* @brief Copies a row of data from a PGresult to a PgSQL_Query_Result.
*
* This function copies a row of data from a `PGresult` object (typically
* obtained from libpq) to a `PgSQL_Query_Result` object. The row data
* represents a single row from the result set of a query.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* row data will be copied.
* @param result A pointer to the `PGresult` object containing the row data
* to be copied.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*/
unsigned int copy_row_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
/**
* @brief Copies a command completion message from a PGresult to a
* PgSQL_Query_Result.
*
* This function copies a command completion message from a `PGresult` object
* (typically obtained from libpq) to a `PgSQL_Query_Result` object. The
* command completion message indicates that a command (e.g., INSERT, UPDATE,
* DELETE) has finished executing.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* command completion message will be copied.
* @param result A pointer to the `PGresult` object containing the command
* completion message to be copied.
* @param extract_affected_rows A boolean flag indicating whether to extract
* the affected rows count from the `PGresult`
* object.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*
* @note This function extracts the command tag and affected rows count (if
* requested) and copies them to the `PgSQL_Query_Result` object.
*/
unsigned int copy_command_completion_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result, bool extract_affected_rows);
/**
* @brief Copies an error message from a PGresult to a PgSQL_Query_Result.
*
* This function copies an error message from a `PGresult` object (typically
* obtained from libpq) to a `PgSQL_Query_Result` object. The error message
* contains information about an error that occurred during query execution.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* error message will be copied.
* @param result A pointer to the `PGresult` object containing the error
* message to be copied.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*
* @note This function extracts the various error fields (severity, code,
* message, detail, etc.) from the `PGresult` object and copies them
* to the `PgSQL_Query_Result` object.
*/
unsigned int copy_error_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
/**
* @brief Copies an empty query response from a PGresult to a
* PgSQL_Query_Result.
*
* This function copies an empty query response from a `PGresult` object
* (typically obtained from libpq) to a `PgSQL_Query_Result` object. The
* empty query response indicates that a query did not return any rows.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* empty query response will be copied.
* @param result A pointer to the `PGresult` object containing the empty query
* response to be copied.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*/
unsigned int copy_empty_query_response_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PGresult* result);
/**
* @brief Copies a ready status message from a PGresult to a
* PgSQL_Query_Result.
*
* This function copies a ready status message from a `PGresult` object
* (typically obtained from libpq) to a `PgSQL_Query_Result` object. The
* ready status indicates that the server is ready for a new query.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* ready status message will be copied.
* @param txn_status The transaction status type, indicating whether a
* transaction is in progress or not.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*/
unsigned int copy_ready_status_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, PGTransactionStatusType txn_status);
/**
* @brief Copies a buffer from a PSresult to a PgSQL_Query_Result.
*
* This function copies a buffer of data from a `PSresult` object to a
* `PgSQL_Query_Result` object. The buffer can contain various types of
* data, including row data or other results.
*
* @param send A boolean flag indicating whether to send the generated packet
* immediately or just generate it. (Currently not supported).
* @param pg_query_result A pointer to the `PgSQL_Query_Result` object where the
* buffer will be copied.
* @param result A pointer to the `PSresult` object containing the buffer to
* be copied.
*
* @return The number of bytes copied to the `PgSQL_Query_Result` object.
*/
unsigned int copy_buffer_to_PgSQL_Query_Result(bool send, PgSQL_Query_Result* pg_query_result, const PSresult* result);
private:
/**
* @brief Extracts the header information from a PostgreSQL packet.
*
* This function reads the header information from a received PostgreSQL
* packet and populates the `pgsql_hdr` structure with the packet type and
* length. It handles both the new (v3) and old (v2) packet formats.
*
* @param pkt A pointer to the beginning of the packet buffer.
* @param pkt_len The length of the packet buffer in bytes.
* @param hdr A pointer to a `pgsql_hdr` structure where the extracted header
* information will be stored.
*
* @return `true` if the header was successfully parsed, `false` otherwise.
*
* @note This function performs basic validation on the packet length and
* header fields to ensure that the packet is valid.
*/
bool get_header(unsigned char* pkt, unsigned int len, pgsql_hdr* hdr);
/**
* @brief Loads the connection parameters from a PostgreSQL startup packet.
*
* This function extracts the connection parameters (e.g., user, database,
* client encoding) from a PostgreSQL startup packet and stores them in the
* connection parameters object (`myconn->conn_params`).
*
* @param pkt A pointer to a `pgsql_hdr` structure containing the startup
* packet data.
* @param startup A boolean flag indicating whether this is a startup packet.
*
* @note This function iterates through the key-value pairs in the startup
* packet and stores them in the connection parameters object.
*/
void load_conn_parameters(pgsql_hdr* pkt, bool startup);
/**
* @brief Handles the client's first message in a SCRAM-SHA-256
* authentication exchange.
*
* This function receives the client's first message during the SCRAM-SHA-256
* authentication process. It parses the message, generates the server's
* first message, and sends it back to the client.
*
* @param scram_state A pointer to the `ScramState` structure that maintains
* the state of the SCRAM exchange.
* @param user A pointer to the `PgCredentials` structure containing the user
* credentials.
* @param data A pointer to the buffer containing the client's first message.
* @param datalen The length of the client's first message in bytes.
*
* @return `true` if the client's first message was successfully handled,
* `false` otherwise.
*
* @note This function performs the following steps:
* 1. Parses the client's first message to extract the authentication
* mechanism and client nonce.
* 2. Generates the server's first message, which includes the server
* nonce and salt.
* 3. Sends the server's first message to the client.
*/
bool scram_handle_client_first(ScramState* scram_state, PgCredentials* user, const unsigned char* data, uint32_t datalen);
/**
* @brief Handles the client's final message in a SCRAM-SHA-256
* authentication exchange.
*
* This function receives the client's final message during the SCRAM-SHA-256
* authentication process. It validates the client's proof, generates the
* server's final message, and sends it back to the client.
*
* @param scram_state A pointer to the `ScramState` structure that maintains
* the state of the SCRAM exchange.
* @param user A pointer to the `PgCredentials` structure containing the user
* credentials.
* @param data A pointer to the buffer containing the client's final message.
* @param datalen The length of the client's final message in bytes.
*
* @return `true` if the client's final message was successfully handled,
* `false` otherwise.
*
* @note This function performs the following steps:
* 1. Parses the client's final message to extract the client proof.
* 2. Verifies the client's proof against the expected value.
* 3. Generates the server's final message.
* 4. Sends the server's final message to the client.
*/
bool scram_handle_client_final(ScramState* scram_state, PgCredentials* user, const unsigned char* data, uint32_t datalen);
PgSQL_Data_Stream** myds;

File diff suppressed because it is too large Load Diff

@ -1522,37 +1522,6 @@ void PgSQL_Session::handler_again___new_thread_to_kill_connection() {
// true should jump to handler_again
#define NEXT_IMMEDIATE_NEW(new_st) do { set_status(new_st); return true; } while (0)
#if 0
bool PgSQL_Session::handler_again___verify_backend_multi_statement() {
if ((client_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS) != (mybe->server_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS)) {
if (client_myds->myconn->options.client_flag & CLIENT_MULTI_STATEMENTS)
mybe->server_myds->myconn->options.client_flag |= CLIENT_MULTI_STATEMENTS;
else
mybe->server_myds->myconn->options.client_flag &= ~CLIENT_MULTI_STATEMENTS;
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
NEXT_IMMEDIATE_NEW(SETTING_MULTI_STMT);
}
return false;
}
#endif // 0
bool PgSQL_Session::handler_again___verify_init_connect() {
if (mybe->server_myds->myconn->options.init_connect_sent == false) {
// we needs to set it to true
@ -1574,117 +1543,6 @@ bool PgSQL_Session::handler_again___verify_init_connect() {
return false;
}
#if 0
bool PgSQL_Session::handler_again___verify_backend_session_track_gtids() {
bool ret = false;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->options.session_track_gtids, mybe->server_myds->myconn->options.session_track_gtids);
// we first verify that the backend supports it
// if backend is old (or if it is not pgsql) ignore this setting
if ((mybe->server_myds->myconn->pgsql->server_capabilities & CLIENT_SESSION_TRACKING) == 0) {
// the backend doesn't support CLIENT_SESSION_TRACKING
return ret; // exit immediately
}
uint32_t b_int = mybe->server_myds->myconn->options.session_track_gtids_int;
uint32_t f_int = client_myds->myconn->options.session_track_gtids_int;
// we need to precompute and hardcode the values for OFF and OWN_GTID
// for performance reason we hardcoded the values
// OFF = 114160514
if (
(b_int == 114160514) // OFF
||
(b_int == 0) // not configured yet
) {
if (strcmp(mysql_thread___default_session_track_gtids, (char*)"OWN_GTID") == 0) {
// backend connection doesn't have session_track_gtids enabled
ret = true;
}
else {
if (f_int != 0 && f_int != 114160514) {
// client wants GTID
ret = true;
}
}
}
if (ret) {
// we deprecated handler_again___verify_backend__generic_variable
// and moved the logic here
if (mybe->server_myds->myconn->options.session_track_gtids) { // reset current value
free(mybe->server_myds->myconn->options.session_track_gtids);
mybe->server_myds->myconn->options.session_track_gtids = NULL;
}
// because the only two possible values are OWN_GTID and OFF
// and because we don't mind receiving GTIDs , if we reach here
// it means we are setting it to OWN_GTID, either because the client
// wants it, or because it is the default
// therefore we hardcode "OWN_GTID"
mybe->server_myds->myconn->options.session_track_gtids = strdup((char*)"OWN_GTID");
mybe->server_myds->myconn->options.session_track_gtids_int =
SpookyHash::Hash32((char*)"OWN_GTID", strlen((char*)"OWN_GTID"), 10);
// we now switch status to set session_track_gtids
// Sets the previous status of the PgSQL session according to the current status.
set_previous_status_mode3();
NEXT_IMMEDIATE_NEW(SETTING_SESSION_TRACK_GTIDS);
}
return ret;
}
bool PgSQL_Session::handler_again___verify_ldap_user_variable() {
bool ret = false;
if (mybe->server_myds->myconn->options.ldap_user_variable_sent == false) {
ret = true;
}
if (mybe->server_myds->myconn->options.ldap_user_variable_value == NULL) {
ret = true;
}
if (ret == false) {
if (mybe->server_myds->myconn->options.ldap_user_variable_sent) {
if (client_myds && client_myds->myconn) {
if (client_myds->myconn->userinfo) {
if (client_myds->myconn->userinfo->fe_username) {
if (strcmp(mybe->server_myds->myconn->options.ldap_user_variable_value, client_myds->myconn->userinfo->fe_username)) {
ret = true;
free(mybe->server_myds->myconn->options.ldap_user_variable);
mybe->server_myds->myconn->options.ldap_user_variable = NULL;
free(mybe->server_myds->myconn->options.ldap_user_variable_value);
mybe->server_myds->myconn->options.ldap_user_variable_value = NULL;
mybe->server_myds->myconn->options.ldap_user_variable_sent = false;
}
}
}
}
}
}
if (ret) {
// we needs to set it to true
mybe->server_myds->myconn->options.ldap_user_variable_sent = true;
if (mysql_thread___ldap_user_variable) {
// we send ldap user variable query only if set
mybe->server_myds->myconn->options.ldap_user_variable = strdup(mysql_thread___ldap_user_variable);
switch (status) { // this switch can be replaced with a simple previous_status.push(status), but it is here for readibility
case PROCESSING_QUERY:
previous_status.push(PROCESSING_QUERY);
break;
case PROCESSING_STMT_PREPARE:
previous_status.push(PROCESSING_STMT_PREPARE);
break;
case PROCESSING_STMT_EXECUTE:
previous_status.push(PROCESSING_STMT_EXECUTE);
break;
default:
// LCOV_EXCL_START
assert(0);
break;
// LCOV_EXCL_STOP
}
NEXT_IMMEDIATE_NEW(SETTING_LDAP_USER_VARIABLE);
}
}
return false;
}
#endif // 0
bool PgSQL_Session::handler_again___verify_backend_user_db() {
PgSQL_Data_Stream* myds = mybe->server_myds;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session %p , client: %s , backend: %s\n", this, client_myds->myconn->userinfo->username, mybe->server_myds->myconn->userinfo->username);
@ -1784,193 +1642,6 @@ bool PgSQL_Session::handler_again___status_SETTING_INIT_CONNECT(int* _rc) {
return ret;
}
#if 0
bool PgSQL_Session::handler_again___status_SETTING_LDAP_USER_VARIABLE(int* _rc) {
bool ret = false;
assert(mybe->server_myds->myconn);
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
myds->DSS = STATE_MARIADB_QUERY;
enum session_status st = status;
if (
(GloMyLdapAuth == NULL) || (use_ldap_auth == false)
||
(client_myds == NULL || client_myds->myconn == NULL || client_myds->myconn->userinfo == NULL)
) { // nothing to do
myds->revents |= POLLOUT; // we also set again POLLOUT to send a query immediately!
//myds->free_mysql_real_query();
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
}
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc;
if (myconn->async_state_machine == ASYNC_IDLE) {
char* fe = client_myds->myconn->userinfo->fe_username;
char* a = (char*)"SET @%s:='%s'";
if (fe == NULL) {
fe = (char*)"unknown";
}
if (myconn->options.ldap_user_variable_value) {
free(myconn->options.ldap_user_variable_value);
}
myconn->options.ldap_user_variable_value = strdup(fe);
char* buf = (char*)malloc(strlen(fe) + strlen(a) + strlen(myconn->options.ldap_user_variable));
sprintf(buf, a, myconn->options.ldap_user_variable, fe);
rc = myconn->async_send_simple_command(myds->revents, buf, strlen(buf));
free(buf);
}
else { // if async_state_machine is not ASYNC_IDLE , arguments are ignored
rc = myconn->async_send_simple_command(myds->revents, (char*)"", 0);
}
if (rc == 0) {
myds->revents |= POLLOUT; // we also set again POLLOUT to send a query immediately!
//myds->free_mysql_real_query();
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
}
else {
if (rc == -1) {
// the command failed
int myerr = mysql_errno(myconn->pgsql);
PgHGM->p_update_pgsql_error_counter(
p_pgsql_error_type::pgsql,
myconn->parent->myhgc->hid,
myconn->parent->address,
myconn->parent->port,
(myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV)
);
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting LDAP USER VARIABLE", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
}
*_rc = -1; // an error happened, we should destroy the Session
return ret;
}
else {
proxy_warning("Error while setting LDAP USER VARIABLE: %s:%d hg %d : %d, %s\n", myconn->parent->address, myconn->parent->port, current_hostgroup, myerr, mysql_error(myconn->pgsql));
// we won't go back to PROCESSING_QUERY
st = previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql));
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql));
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd = 0;
status = WAITING_CLIENT_DATA;
client_myds->DSS = STATE_SLEEP;
}
}
else {
// rc==1 , nothing to do for now
}
}
return ret;
}
bool PgSQL_Session::handler_again___status_SETTING_SQL_LOG_BIN(int* _rc) {
bool ret = false;
assert(mybe->server_myds->myconn);
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
myds->DSS = STATE_MARIADB_QUERY;
enum session_status st = status;
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
char* query = NULL;
unsigned long query_length = 0;
if (myconn->async_state_machine == ASYNC_IDLE) {
char* q = (char*)"SET SQL_LOG_BIN=%s";
query = (char*)malloc(strlen(q) + 8);
sprintf(query, q, pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN));
query_length = strlen(query);
}
int rc = myconn->async_send_simple_command(myds->revents, query, query_length);
if (query) {
free(query);
query = NULL;
}
if (rc == 0) {
if (!strcmp("0", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN)) || !strcasecmp("OFF", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN))) {
// Pay attention here. STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0 sets sql_log_bin to ZERO:
// - sql_log_bin=0 => true
// - sql_log_bin=1 => false
myconn->set_status(true, STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0);
}
else if (!strcmp("1", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN)) || !strcasecmp("ON", pgsql_variables.client_get_value(this, SQL_SQL_LOG_BIN))) {
myconn->set_status(false, STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0);
}
myds->revents |= POLLOUT; // we also set again POLLOUT to send a query immediately!
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
}
else {
if (rc == -1) {
// the command failed
int myerr = mysql_errno(myconn->pgsql);
PgHGM->p_update_pgsql_error_counter(
p_pgsql_error_type::pgsql,
myconn->parent->myhgc->hid,
myconn->parent->address,
myconn->parent->port,
(myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV)
);
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting SQL_LOG_BIN", myconn);
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
}
*_rc = -1; // an error happened, we should destroy the Session
return ret;
}
else {
proxy_warning("Error while setting SQL_LOG_BIN: %s:%d hg %d : %d, %s\n", myconn->parent->address, myconn->parent->port, current_hostgroup, myerr, mysql_error(myconn->pgsql));
// we won't go back to PROCESSING_QUERY
st = previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql));
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql));
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd = 0;
RequestEnd(myds);
}
}
else {
// rc==1 , nothing to do for now
}
}
return ret;
}
#endif // 0
bool PgSQL_Session::handler_again___status_CHANGING_CHARSET(int* _rc) {
assert(mybe->server_myds->myconn);
PgSQL_Data_Stream* myds = mybe->server_myds;
@ -2252,148 +1923,6 @@ bool PgSQL_Session::handler_again___status_SETTING_GENERIC_VARIABLE(int* _rc, co
return ret;
}
#if 0
bool PgSQL_Session::handler_again___status_SETTING_MULTI_STMT(int* _rc) {
assert(mybe->server_myds->myconn);
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
enum session_status st = status;
bool ret = false;
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc = myconn->async_set_option(myds->revents, myconn->options.client_flag & CLIENT_MULTI_STATEMENTS);
if (rc == 0) {
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
}
else {
if (rc == -1) {
// the command failed
int myerr = mysql_errno(myconn->pgsql);
PgHGM->p_update_pgsql_error_counter(
p_pgsql_error_type::pgsql,
myconn->parent->myhgc->hid,
myconn->parent->address,
myconn->parent->port,
(myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV)
);
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "while setting MYSQL_OPTION_MULTI_STATEMENTS", myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
}
*_rc = -1; // an error happened, we should destroy the Session
return ret;
}
else {
proxy_warning("Error during MYSQL_OPTION_MULTI_STATEMENTS : %d, %s\n", myerr, mysql_error(myconn->pgsql));
// we won't go back to PROCESSING_QUERY
st = previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql));
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql));
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd = 0;
RequestEnd(myds);
}
}
else {
// rc==1 , nothing to do for now
}
}
return ret;
}
bool PgSQL_Session::handler_again___status_SETTING_SESSION_TRACK_GTIDS(int* _rc) {
bool ret = false;
assert(mybe->server_myds->myconn);
ret = handler_again___status_SETTING_GENERIC_VARIABLE(_rc, (char*)"SESSION_TRACK_GTIDS", mybe->server_myds->myconn->options.session_track_gtids, true);
return ret;
}
bool PgSQL_Session::handler_again___status_CHANGING_SCHEMA(int* _rc) {
bool ret = false;
//fprintf(stderr,"CHANGING_SCHEMA\n");
assert(mybe->server_myds->myconn);
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
myds->DSS = STATE_MARIADB_QUERY;
enum session_status st = status;
if (myds->mypolls == NULL) {
thread->mypolls.add(POLLIN | POLLOUT, mybe->server_myds->fd, mybe->server_myds, thread->curtime);
}
int rc = myconn->async_select_db(myds->revents);
if (rc == 0) {
//__sync_fetch_and_add(&PgHGM->status.backend_init_db, 1);
myds->myconn->userinfo->set(client_myds->myconn->userinfo);
myds->DSS = STATE_MARIADB_GENERIC;
st = previous_status.top();
previous_status.pop();
NEXT_IMMEDIATE_NEW(st);
}
else {
if (rc == -1) {
// the command failed
int myerr = mysql_errno(myconn->pgsql);
PgHGM->p_update_pgsql_error_counter(
p_pgsql_error_type::pgsql,
myconn->parent->myhgc->hid,
myconn->parent->address,
myconn->parent->port,
(myerr ? myerr : ER_PROXYSQL_OFFLINE_SRV)
);
if (myerr >= 2000 || myerr == 0) {
bool retry_conn = false;
// client error, serious
detected_broken_connection(__FILE__, __LINE__, __func__, "during INIT_DB", myconn);
//if ((myds->myconn->reusable==true) && ((myds->myprot.prot_status & SERVER_STATUS_IN_TRANS)==0)) {
if ((myds->myconn->reusable == true) && myds->myconn->IsActiveTransaction() == false && myds->myconn->MultiplexDisabled() == false) {
retry_conn = true;
}
myds->destroy_MySQL_Connection_From_Pool(false);
myds->fd = 0;
if (retry_conn) {
myds->DSS = STATE_NOT_INITIALIZED;
NEXT_IMMEDIATE_NEW(CONNECTING_SERVER);
}
*_rc = -1; // an error happened, we should destroy the Session
return ret;
}
else {
proxy_warning("Error during INIT_DB: %d, %s\n", myerr, mysql_error(myconn->pgsql));
// we won't go back to PROCESSING_QUERY
st = previous_status.top();
previous_status.pop();
char sqlstate[10];
sprintf(sqlstate, "%s", mysql_sqlstate(myconn->pgsql));
client_myds->myprot.generate_pkt_ERR(true, NULL, NULL, 1, mysql_errno(myconn->pgsql), sqlstate, mysql_error(myconn->pgsql));
myds->destroy_MySQL_Connection_From_Pool(true);
myds->fd = 0;
RequestEnd(myds);
}
}
else {
// rc==1 , nothing to do for now
}
}
return false;
}
#endif // 0
bool PgSQL_Session::handler_again___status_CONNECTING_SERVER(int* _rc) {
//fprintf(stderr,"CONNECTING_SERVER\n");
unsigned long long curtime = monotonic_time();
@ -3493,15 +3022,6 @@ __get_pkts_from_client:
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->killed_at = 0;
mybe->server_myds->kill_type = 0;
#if 0
if (GloMyLdapAuth) {
if (session_type == PROXYSQL_SESSION_PGSQL) {
if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) {
add_ldap_comment_to_pkt(&pkt);
}
}
}
#endif // 0
mybe->server_myds->mysql_real_query.init(&pkt);
mybe->server_myds->statuses.questions++;
client_myds->setDSS_STATE_QUERY_SENT_NET();
@ -3713,15 +3233,6 @@ __get_pkts_from_client:
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Received query to be processed with MariaDB Client library\n");
mybe->server_myds->killed_at = 0;
mybe->server_myds->kill_type = 0;
#if 0
if (GloMyLdapAuth) {
if (session_type == PROXYSQL_SESSION_PGSQL) {
if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) {
add_ldap_comment_to_pkt(&pkt);
}
}
}
#endif // 0
mybe->server_myds->mysql_real_query.init(&pkt);
mybe->server_myds->statuses.questions++;
client_myds->setDSS_STATE_QUERY_SENT_NET();
@ -3731,142 +3242,11 @@ __get_pkts_from_client:
}
break;
case _MYSQL_COM_STMT_PREPARE:
#if 0
if (GloMyLdapAuth) {
if (session_type == PROXYSQL_SESSION_PGSQL) {
if (mysql_thread___add_ldap_user_comment && strlen(mysql_thread___add_ldap_user_comment)) {
add_ldap_comment_to_pkt(&pkt);
}
}
}
#endif // 0
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_PREPARE(pkt);
break;
case _MYSQL_COM_STMT_EXECUTE:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_EXECUTE(pkt);
break;
#if 0
case _MYSQL_COM_STMT_RESET:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(pkt);
break;
case _MYSQL_COM_STMT_CLOSE:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(pkt);
break;
case _MYSQL_COM_STMT_SEND_LONG_DATA:
handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_SEND_LONG_DATA(pkt);
break;
case _MYSQL_COM_BINLOG_DUMP:
case _MYSQL_COM_BINLOG_DUMP_GTID:
case _MYSQL_COM_REGISTER_SLAVE:
// In this switch we handle commands that download binlog events from MySQL
// servers. For these commands a lot of the features provided by ProxySQL
// aren't useful, like multiplexing, query parsing, etc. For this reason,
// ProxySQL enables fast_forward when it receives these commands. 
{
// we use a switch to write the command in the info message
std::string q = "Received command ";
switch ((enum_mysql_command)c) {
case _MYSQL_COM_BINLOG_DUMP:
q += "MYSQL_COM_BINLOG_DUMP";
break;
case _MYSQL_COM_BINLOG_DUMP_GTID:
q += "MYSQL_COM_BINLOG_DUMP_GTID";
break;
case _MYSQL_COM_REGISTER_SLAVE:
q += "MYSQL_COM_REGISTER_SLAVE";
break;
default:
assert(0);
break;
};
// we add the client details in the info message
if (client_myds && client_myds->addr.addr) {
q += " from client " + std::string(client_myds->addr.addr) + ":" + std::to_string(client_myds->addr.port);
}
q += " . Changing session fast_forward to true";
proxy_info("%s\n", q.c_str());
}
session_fast_forward = true;
if (client_myds->PSarrayIN->len) {
proxy_error("UNEXPECTED PACKET FROM CLIENT -- PLEASE REPORT A BUG\n");
assert(0);
}
client_myds->PSarrayIN->add(pkt.ptr, pkt.size);
// The following code prepares the session as if it was configured with fast
// forward before receiving the command. This way the state machine will
// handle the command automatically.
current_hostgroup = previous_hostgroup;
mybe = find_or_create_backend(current_hostgroup); // set a backend
mybe->server_myds->reinit_queues(); // reinitialize the queues in the myds . By default, they are not active
// We reinitialize the 'wait_until' since this session shouldn't wait for processing as
// we are now transitioning to 'FAST_FORWARD'.
mybe->server_myds->wait_until = 0;
if (mybe->server_myds->DSS == STATE_NOT_INITIALIZED) {
// NOTE: This section is entirely borrowed from 'STATE_SLEEP' for 'session_fast_forward'.
// Check comments there for extra information.
// =============================================================================
if (mybe->server_myds->max_connect_time == 0) {
uint64_t connect_timeout =
pgsql_thread___connect_timeout_server < pgsql_thread___connect_timeout_server_max ?
pgsql_thread___connect_timeout_server_max : pgsql_thread___connect_timeout_server;
mybe->server_myds->max_connect_time = thread->curtime + connect_timeout * 1000;
}
mybe->server_myds->connect_retries_on_failure = pgsql_thread___connect_retries_on_failure;
CurrentQuery.start_time = thread->curtime;
// =============================================================================
// we don't have a connection
previous_status.push(FAST_FORWARD); // next status will be FAST_FORWARD
set_status(CONNECTING_SERVER); // now we need a connection
}
else {
// In case of having a connection, we need to make user to reset the state machine
// for current server 'PgSQL_Data_Stream', setting it outside of any state handled
// by 'mariadb' library. Otherwise 'MySQL_Thread' will threat this
// 'PgSQL_Data_Stream' as library handled.
mybe->server_myds->DSS = STATE_READY;
// myds needs to have encrypted value set correctly
{
PgSQL_Data_Stream* myds = mybe->server_myds;
PgSQL_Connection* myconn = myds->myconn;
assert(myconn != NULL);
// PMC-10005
// if backend connection uses SSL we will set
// encrypted = true and we will start using the SSL structure
// directly from P_MARIADB_TLS structure.
MYSQL* pgsql = myconn->pgsql;
if (pgsql && myconn->ret_mysql) {
if (pgsql->options.use_ssl == 1) {
P_MARIADB_TLS* matls = (P_MARIADB_TLS*)pgsql->net.pvio->ctls;
if (matls != NULL) {
myds->encrypted = true;
myds->ssl = (SSL*)matls->ssl;
myds->rbio_ssl = BIO_new(BIO_s_mem());
myds->wbio_ssl = BIO_new(BIO_s_mem());
SSL_set_bio(myds->ssl, myds->rbio_ssl, myds->wbio_ssl);
}
else {
// if pgsql->options.use_ssl == 1 but matls == NULL
// it means that ProxySQL tried to use SSL to connect to the backend
// but the backend didn't support SSL
}
}
}
}
set_status(FAST_FORWARD); // we can set status to FAST_FORWARD
}
break;
case _MYSQL_COM_QUIT:
proxy_debug(PROXY_DEBUG_MYSQL_COM, 5, "Got COM_QUIT packet\n");
if (GloPgSQL_Logger) { GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_AUTH_QUIT, this, NULL); }
l_free(pkt.size, pkt.ptr);
handler_ret = -1;
return handler_ret;
break;
#endif // 0
default:
// in this switch we only handle the most common commands.
// The not common commands are handled by "default" , that
@ -4482,32 +3862,8 @@ handler_again:
if (handler_again___verify_init_connect()) {
goto handler_again;
}
#if 0
if (use_ldap_auth) {
if (handler_again___verify_ldap_user_variable()) {
goto handler_again;
}
}
if (handler_again___verify_backend_autocommit()) {
goto handler_again;
}
#endif // 0
if (locked_on_hostgroup == -1 || locked_on_hostgroup_and_all_variables_set == false) {
#if 0
if (handler_again___verify_backend_multi_statement()) {
goto handler_again;
}
if (handler_again___verify_backend_session_track_gtids()) {
goto handler_again;
}
#endif // 0
// Optimize network traffic when we can use 'SET NAMES'
//if (verify_set_names(this)) {
// goto handler_again;
//}
for (auto i = 0; i < SQL_NAME_LAST_LOW_WM; i++) {
auto client_hash = client_myds->myconn->var_hash[i];
#ifdef DEBUG
@ -4872,28 +4228,9 @@ bool PgSQL_Session::handler_again___multiple_statuses(int* rc) {
case RESETTING_CONNECTION_V2:
ret = handler_again___status_RESETTING_CONNECTION(rc);
break;
#if 0
case CHANGING_AUTOCOMMIT:
ret = handler_again___status_CHANGING_AUTOCOMMIT(rc);
break;
case CHANGING_SCHEMA:
ret = handler_again___status_CHANGING_SCHEMA(rc);
break;
case SETTING_LDAP_USER_VARIABLE:
ret = handler_again___status_SETTING_LDAP_USER_VARIABLE(rc);
break;
#endif // 0
case SETTING_INIT_CONNECT:
ret = handler_again___status_SETTING_INIT_CONNECT(rc);
break;
#if 0
case SETTING_MULTI_STMT:
ret = handler_again___status_SETTING_MULTI_STMT(rc);
break;
case SETTING_SESSION_TRACK_GTIDS:
ret = handler_again___status_SETTING_SESSION_TRACK_GTIDS(rc);
break;
#endif // 0
case SETTING_SET_NAMES:
ret = handler_again___status_CHANGING_CHARSET(rc);
break;
@ -4903,74 +4240,6 @@ bool PgSQL_Session::handler_again___multiple_statuses(int* rc) {
return ret;
}
/*
void PgSQL_Session::handler___status_CHANGING_USER_CLIENT___STATE_CLIENT_HANDSHAKE(PtrSize_t *pkt, bool *wrong_pass) {
// FIXME: no support for SSL yet
if (
client_myds->myprot.process_pkt_auth_swich_response((unsigned char *)pkt->ptr,pkt->size)==true
) {
l_free(pkt->size,pkt->ptr);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session=%p , DS=%p . Successful connection\n", this, client_myds);
client_myds->myprot.generate_pkt_OK(true,NULL,NULL,2,0,0,0,0,NULL);
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_CHANGE_USER_OK, this, NULL);
status=WAITING_CLIENT_DATA;
client_myds->DSS=STATE_SLEEP;
} else {
l_free(pkt->size,pkt->ptr);
*wrong_pass=true;
// FIXME: this should become close connection
client_myds->setDSS_STATE_QUERY_SENT_NET();
char *client_addr=NULL;
if (client_myds->client_addr) {
char buf[512];
switch (client_myds->client_addr->sa_family) {
case AF_INET: {
struct sockaddr_in *ipv4 = (struct sockaddr_in *)client_myds->client_addr;
if (ipv4->sin_port) {
inet_ntop(client_myds->client_addr->sa_family, &ipv4->sin_addr, buf, INET_ADDRSTRLEN);
client_addr = strdup(buf);
} else {
client_addr = strdup((char *)"localhost");
}
break;
}
case AF_INET6: {
struct sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)client_myds->client_addr;
inet_ntop(client_myds->client_addr->sa_family, &ipv6->sin6_addr, buf, INET6_ADDRSTRLEN);
client_addr = strdup(buf);
break;
}
default:
client_addr = strdup((char *)"localhost");
break;
}
} else {
client_addr = strdup((char *)"");
}
char *_s=(char *)malloc(strlen(client_myds->myconn->userinfo->username)+100+strlen(client_addr));
sprintf(_s,"ProxySQL Error: Access denied for user '%s'@'%s' (using password: %s)", client_myds->myconn->userinfo->username, client_addr, (client_myds->myconn->userinfo->password ? "YES" : "NO"));
proxy_error("ProxySQL Error: Access denied for user '%s'@'%s' (using password: %s)", client_myds->myconn->userinfo->username, client_addr, (client_myds->myconn->userinfo->password ? "YES" : "NO"));
client_myds->myprot.generate_pkt_ERR(true,NULL,NULL,2,1045,(char *)"28000", _s, true);
#ifdef DEBUG
if (client_myds->myconn->userinfo->password) {
char *tmp_pass=strdup(client_myds->myconn->userinfo->password);
int lpass = strlen(tmp_pass);
for (int i=2; i<lpass-1; i++) {
tmp_pass[i]='*';
}
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session=%p , DS=%p . Wrong credentials for frontend: %s:%s . Password=%s . Disconnecting\n", this, client_myds, client_myds->myconn->userinfo->username, client_addr, tmp_pass);
free(tmp_pass);
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Session=%p , DS=%p . Wrong credentials for frontend: %s:%s . No password. Disconnecting\n", this, client_myds, client_myds->myconn->userinfo->username, client_addr);
}
#endif //DEBUG
GloPgSQL_Logger->log_audit_entry(PROXYSQL_MYSQL_CHANGE_USER_ERR, this, NULL);
free(_s);
__sync_fetch_and_add(&PgHGM->status.access_denied_wrong_password, 1);
}
}
*/
void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(PtrSize_t* pkt, bool* wrong_pass) {
bool is_encrypted = client_myds->encrypted;
bool handshake_response_return = false;
@ -5076,16 +4345,6 @@ void PgSQL_Session::handler___status_CONNECTING_CLIENT___STATE_SERVER_HANDSHAKE(
l_free(pkt->size, pkt->ptr);
//if (client_myds->encrypted==false) {
if (client_myds->myconn->userinfo->dbname == NULL) {
#if 0
#ifdef PROXYSQLCLICKHOUSE
if (session_type == PROXYSQL_SESSION_CLICKHOUSE) {
if (strlen(default_schema) == 0) {
free(default_schema);
default_schema = strdup((char*)"default");
}
}
#endif /* PROXYSQLCLICKHOUSE */
#endif
client_myds->myconn->userinfo->set_dbname(default_schema);
}
int free_users = 0;
@ -7378,58 +6637,6 @@ bool PgSQL_Session::handle_command_query_kill(PtrSize_t* pkt) {
return false;
}
#if 0
void PgSQL_Session::add_ldap_comment_to_pkt(PtrSize_t* _pkt) {
if (GloMyLdapAuth == NULL)
return;
if (use_ldap_auth == false)
return;
if (client_myds == NULL || client_myds->myconn == NULL || client_myds->myconn->userinfo == NULL)
return;
if (client_myds->myconn->userinfo->fe_username == NULL)
return;
char* fe = client_myds->myconn->userinfo->fe_username;
char* a = (char*)" /* %s=%s */";
char* b = (char*)malloc(strlen(a) + strlen(fe) + strlen(mysql_thread___add_ldap_user_comment));
sprintf(b, a, mysql_thread___add_ldap_user_comment, fe);
PtrSize_t _new_pkt;
_new_pkt.ptr = malloc(strlen(b) + _pkt->size);
memcpy(_new_pkt.ptr, _pkt->ptr, 5);
unsigned char* _c = (unsigned char*)_new_pkt.ptr;
_c += 5;
void* idx = memchr((char*)_pkt->ptr + 5, ' ', _pkt->size - 5);
if (idx) {
size_t first_word_len = (char*)idx - (char*)_pkt->ptr - 5;
if (((char*)_pkt->ptr + 5)[0] == '/' && ((char*)_pkt->ptr + 5)[1] == '*') {
void* comment_endpos = memmem(static_cast<char*>(_pkt->ptr) + 7, _pkt->size - 7, "*/", strlen("*/"));
if (comment_endpos == NULL || idx < comment_endpos) {
b[1] = ' ';
b[2] = ' ';
b[strlen(b) - 1] = ' ';
b[strlen(b) - 2] = ' ';
}
}
memcpy(_c, (char*)_pkt->ptr + 5, first_word_len);
_c += first_word_len;
memcpy(_c, b, strlen(b));
_c += strlen(b);
memcpy(_c, (char*)idx, _pkt->size - 5 - first_word_len);
}
else {
memcpy(_c, (char*)_pkt->ptr + 5, _pkt->size - 5);
_c += _pkt->size - 5;
memcpy(_c, b, strlen(b));
}
l_free(_pkt->size, _pkt->ptr);
_pkt->size = _pkt->size + strlen(b);
_pkt->ptr = _new_pkt.ptr;
free(b);
CurrentQuery.QueryLength = _pkt->size - 5;
CurrentQuery.QueryPointer = (unsigned char*)_pkt->ptr + 5;
}
#endif // 0
void PgSQL_Session::finishQuery(PgSQL_Data_Stream* myds, PgSQL_Connection* myconn, bool prepared_stmt_with_no_params) {
myds->myconn->reduce_auto_increment_delay_token();
if (locked_on_hostgroup >= 0) {
@ -7582,54 +6789,6 @@ void PgSQL_Session::unable_to_parse_set_statement(bool* lock_hostgroup) {
}
}
#if 0
void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_RESET(PtrSize_t& pkt) {
uint32_t stmt_global_id = 0;
memcpy(&stmt_global_id, (char*)pkt.ptr + 5, sizeof(uint32_t));
SLDH->reset(stmt_global_id);
l_free(pkt.size, pkt.ptr);
client_myds->setDSS_STATE_QUERY_SENT_NET();
unsigned int nTrx = NumActiveTransactions();
uint16_t setStatus = (nTrx ? SERVER_STATUS_IN_TRANS : 0);
if (autocommit) setStatus |= SERVER_STATUS_AUTOCOMMIT;
client_myds->myprot.generate_pkt_OK(true, NULL, NULL, 1, 0, 0, setStatus, 0, NULL);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
}
void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_CLOSE(PtrSize_t& pkt) {
uint32_t client_global_id = 0;
memcpy(&client_global_id, (char*)pkt.ptr + 5, sizeof(uint32_t));
// FIXME: no input validation
uint64_t stmt_global_id = 0;
stmt_global_id = client_myds->myconn->local_stmts->find_global_stmt_id_from_client(client_global_id);
SLDH->reset(client_global_id);
if (stmt_global_id) {
sess_STMTs_meta->erase(stmt_global_id);
}
client_myds->myconn->local_stmts->client_close(client_global_id);
l_free(pkt.size, pkt.ptr);
// FIXME: this is not complete. Counters should be decreased
thread->status_variables.stvar[st_var_frontend_stmt_close]++;
thread->status_variables.stvar[st_var_queries]++;
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
}
void PgSQL_Session::handler___status_WAITING_CLIENT_DATA___STATE_SLEEP___MYSQL_COM_STMT_SEND_LONG_DATA(PtrSize_t& pkt) {
// FIXME: no input validation
uint32_t stmt_global_id = 0;
memcpy(&stmt_global_id, (char*)pkt.ptr + 5, sizeof(uint32_t));
uint32_t stmt_param_id = 0;
memcpy(&stmt_param_id, (char*)pkt.ptr + 9, sizeof(uint16_t));
SLDH->add(stmt_global_id, stmt_param_id, (char*)pkt.ptr + 11, pkt.size - 11);
client_myds->DSS = STATE_SLEEP;
status = WAITING_CLIENT_DATA;
l_free(pkt.size, pkt.ptr);
}
#endif // 0
void PgSQL_Session::detected_broken_connection(const char* file, unsigned int line, const char* func, const char* action, PgSQL_Connection* myconn, bool verbose) {
const char* code = PgSQL_Error_Helper::get_error_code(PGSQL_ERROR_CODES::ERRCODE_RAISE_EXCEPTION);;

@ -5190,9 +5190,6 @@ PgSQL_Connection* PgSQL_Thread::get_MyConn_local(unsigned int _hid, PgSQL_Sessio
if (it != parents.end()) {
// we didn't exclude this server (yet?)
bool gtid_found = false;
#if 0
gtid_found = PgHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid);
#endif // 0
if (gtid_found) { // this server has the correct GTID
c = (PgSQL_Connection*)cached_connections->remove_index_fast(i);
return c;

Loading…
Cancel
Save