You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
proxysql/lib/PgSQL_Data_Stream.cpp

1276 lines
38 KiB

#include "proxysql.h"
#include "cpp.h"
#include <zlib.h>
#ifndef UNIX_PATH_MAX
#define UNIX_PATH_MAX 108
#endif
#include "PgSQL_PreparedStatement.h"
#include "PgSQL_Data_Stream.h"
#include "openssl/x509v3.h"
#define RESULTSET_BUFLEN_DS_16K 16000
#define RESULTSET_BUFLEN_DS_1M 1000*1024
extern PgSQL_Threads_Handler* GloPTH;
#ifdef DEBUG
static void __dump_pkt(const char* func, unsigned char* _ptr, unsigned int len) {
if (GloVars.global.gdbg == 0) return;
if (GloVars.global.gdbg_lvl[PROXY_DEBUG_PKT_ARRAY].verbosity < 8) return;
unsigned int i;
fprintf(stderr, "DUMP %d bytes FROM %s\n", len, func);
for (i = 0; i < len; i++) {
if (isprint(_ptr[i])) fprintf(stderr, "%c", _ptr[i]); else fprintf(stderr, ".");
if (i > 0 && (i % 16 == 15 || i == len - 1)) {
unsigned int j;
if (i % 16 != 15) {
j = 15 - i % 16;
while (j--) fprintf(stderr, " ");
}
fprintf(stderr, " --- ");
for (j = (i == len - 1 ? ((int)(i / 16)) * 16 : i - 15); j <= i; j++) {
fprintf(stderr, "%02x ", _ptr[j]);
}
fprintf(stderr, "\n");
}
}
fprintf(stderr, "\n\n");
}
#endif
#define queue_init(_q,_s) { \
_q.size=_s; \
_q.buffer=malloc(_q.size); \
_q.head=0; \
_q.tail=0; \
_q.partial=0; \
_q.pkt.ptr=NULL; \
_q.pkt.size=0; \
}
#define queue_destroy(_q) { \
if (_q.buffer) free(_q.buffer); \
_q.buffer=NULL; \
if (_q.pkt.ptr) { \
l_free(_q.pkt.size,_q.pkt.ptr); \
queueOUT.pkt.ptr=NULL; \
} \
}
#define queue_zero(_q) { \
if (_q.tail != 0) { \
memcpy(_q.buffer, (unsigned char *)_q.buffer + _q.tail, _q.head - _q.tail); \
} \
_q.head-=_q.tail; \
_q.tail=0; \
}
#define queue_available(_q) (_q.size-_q.head)
#define queue_data(_q) (_q.head-_q.tail)
#define queue_r(_q, _s) { \
_q.tail+=_s; \
if (_q.tail==_q.head) { \
_q.head=0; \
_q.tail=0; \
} \
}
#define queue_w(_q,_s) (_q.head+=_s)
#define queue_r_ptr(_q) ((unsigned char *)_q.buffer+_q.tail)
#define queue_w_ptr(_q) ((unsigned char *)_q.buffer+_q.head)
#define add_to_data_packet_history(_o,_p,_s) if (unlikely(GloVars.global.data_packets_history_size)) { \
if (static_cast<int>(_o.get_max_size()) != GloVars.global.data_packets_history_size) { \
_o.set_max_size(GloVars.global.data_packets_history_size); \
} \
_o.push(_p,_s);\
}
// memory deallocation responsibility is now transferred to the queue as the buffer is directly assigned to it.
// if the size of data_packet_history is 0, the memory will be released.
#define add_to_data_packet_history_without_alloc(_o,_p,_s) if (unlikely(GloVars.global.data_packets_history_size)) { \
if (static_cast<int>(_o.get_max_size()) != GloVars.global.data_packets_history_size) { \
_o.set_max_size(GloVars.global.data_packets_history_size); \
} \
_o.push<false>(_p,_s);\
} else { \
l_free(_s,_p); \
}
//enum sslstatus { SSLSTATUS_OK, SSLSTATUS_WANT_IO, SSLSTATUS_FAIL};
static enum pgsql_sslstatus get_sslstatus(SSL* ssl, int n)
{
int err = SSL_get_error(ssl, n);
ERR_clear_error();
switch (err) {
case SSL_ERROR_NONE:
return PGSQL_SSLSTATUS_OK;
case SSL_ERROR_WANT_WRITE:
case SSL_ERROR_WANT_READ:
return PGSQL_SSLSTATUS_WANT_IO;
case SSL_ERROR_ZERO_RETURN:
case SSL_ERROR_SYSCALL:
default:
return PGSQL_SSLSTATUS_FAIL;
}
}
void PgSQL_Data_Stream::queue_encrypted_bytes(const char* buf, size_t len) {
ssl_write_buf = (char*)realloc(ssl_write_buf, ssl_write_len + len);
memcpy(ssl_write_buf + ssl_write_len, buf, len);
ssl_write_len += len;
//proxy_info("New ssl_write_len size: %u\n", ssl_write_len);
}
enum pgsql_sslstatus PgSQL_Data_Stream::do_ssl_handshake() {
char buf[MY_SSL_BUFFER];
enum pgsql_sslstatus status;
int n = SSL_do_handshake(ssl);
if (n == 1) {
//proxy_info("SSL handshake completed\n");
X509* cert;
cert = SSL_get_peer_certificate(ssl);
if (cert) {
GENERAL_NAMES* alt_names = (stack_st_GENERAL_NAME*)X509_get_ext_d2i((X509*)cert, NID_subject_alt_name, 0, 0);
int alt_name_count = sk_GENERAL_NAME_num(alt_names);
// Iterate all the SAN names, looking for SPIFFE identifier
for (int i = 0; i < alt_name_count; i++) {
GENERAL_NAME* san = sk_GENERAL_NAME_value(alt_names, i);
// We only care about URI names
if (san->type == GEN_URI) {
if (san->d.uniformResourceIdentifier->data) {
const char* resource_data =
reinterpret_cast<const char*>(san->d.uniformResourceIdentifier->data);
const char* spiffe_loc = strstr(resource_data, "spiffe");
// First name starting with 'spiffe' is considered the match.
if (spiffe_loc == resource_data) {
x509_subject_alt_name = strdup(resource_data);
}
}
}
}
sk_GENERAL_NAME_pop_free(alt_names, GENERAL_NAME_free);
X509_free(cert);
}
else {
// we currently disable this annoying error
// in future we can configure this as per user level, specifying if the certificate is mandatory or not
// see issue #3424
//proxy_error("X509 error: no required certificate sent by client\n");
}
// In case the supplied certificate has a 'SAN'-'URI' identifier
// starting with 'spiffe', client certificate verification is performed.
if (x509_subject_alt_name != NULL) {
long rc = SSL_get_verify_result(ssl);
if (rc != X509_V_OK) {
proxy_error("Disconnecting %s:%d: X509 client SSL certificate verify error: (%ld:%s)\n", addr.addr, addr.port, rc, X509_verify_cert_error_string(rc));
return PGSQL_SSLSTATUS_FAIL;
}
}
}
status = get_sslstatus(ssl, n);
//proxy_info("SSL status = %d\n", status);
/* Did SSL request to write bytes? */
if (status == PGSQL_SSLSTATUS_WANT_IO) {
//proxy_info("SSL status is WANT_IO %d\n", status);
do {
n = BIO_read(wbio_ssl, buf, sizeof(buf));
//proxy_info("BIO read = %d\n", n);
if (n > 0) {
//proxy_info("Queuing %d encrypted bytes\n", n);
queue_encrypted_bytes(buf, n);
}
else if (!BIO_should_retry(wbio_ssl)) {
//proxy_info("BIO_should_retry failed\n");
return PGSQL_SSLSTATUS_FAIL;
}
} while (n > 0);
}
return status;
}
void* PgSQL_Data_Stream::operator new(size_t size) {
return l_alloc(size);
}
void PgSQL_Data_Stream::operator delete(void* ptr) {
l_free(sizeof(PgSQL_Data_Stream), ptr);
}
// Constructor
PgSQL_Data_Stream::PgSQL_Data_Stream() {
bytes_info.bytes_recv = 0;
bytes_info.bytes_sent = 0;
pkts_recv = 0;
pkts_sent = 0;
client_addr = NULL;
addr.addr = NULL;
addr.port = 0;
proxy_addr.addr = NULL;
proxy_addr.port = 0;
sess = NULL;
pgsql_real_query.pkt.ptr = NULL;
pgsql_real_query.pkt.size = 0;
pgsql_real_query.QueryPtr = NULL;
pgsql_real_query.QuerySize = 0;
query_retries_on_failure = 0;
connect_retries_on_failure = 0;
max_connect_time = 0;
wait_until = 0;
pause_until = 0;
kill_type = 0;
cancel_query = false;
connect_tries = 0;
poll_fds_idx = -1;
//resultset_length = 0;
revents = 0;
PSarrayIN = NULL;
PSarrayOUT = NULL;
//resultset = NULL;
queue_init(queueIN, QUEUE_T_DEFAULT_SIZE);
queue_init(queueOUT, QUEUE_T_DEFAULT_SIZE);
mybe = NULL;
active = 1;
mypolls = NULL;
myconn = NULL; // 20141011
DSS = STATE_NOT_CONNECTED;
encrypted = false;
switching_auth_stage = 0;
switching_auth_type = 0;
x509_subject_alt_name = NULL;
ssl = NULL;
rbio_ssl = NULL;
wbio_ssl = NULL;
ssl_write_len = 0;
ssl_write_buf = NULL;
net_failure = false;
CompPktIN.pkt.ptr = NULL;
CompPktIN.pkt.size = 0;
CompPktIN.partial = 0;
CompPktOUT.pkt.ptr = NULL;
CompPktOUT.pkt.size = 0;
CompPktOUT.partial = 0;
//multi_pkt.ptr = NULL;
//multi_pkt.size = 0;
statuses.questions = 0;
statuses.pgconnpoll_get = 0;
statuses.pgconnpoll_put = 0;
com_field_wild = NULL;
scram_state = nullptr;
}
// Destructor
PgSQL_Data_Stream::~PgSQL_Data_Stream() {
queue_destroy(queueIN);
queue_destroy(queueOUT);
if (client_addr) {
free(client_addr);
client_addr = NULL;
}
if (addr.addr) {
free(addr.addr);
addr.addr = NULL;
}
if (proxy_addr.addr) {
free(proxy_addr.addr);
proxy_addr.addr = NULL;
}
free_pgsql_real_query();
if (com_field_wild) {
free(com_field_wild);
com_field_wild = NULL;
}
proxy_debug(PROXY_DEBUG_NET, 1, "Shutdown Data Stream. Session=%p, DataStream=%p\n", sess, this);
PtrSize_t pkt;
if (PSarrayIN) {
while (PSarrayIN->len) {
PSarrayIN->remove_index_fast(0, &pkt);
l_free(pkt.size, pkt.ptr);
}
delete PSarrayIN;
}
if (PSarrayOUT) {
while (PSarrayOUT->len) {
PSarrayOUT->remove_index_fast(0, &pkt);
l_free(pkt.size, pkt.ptr);
}
delete PSarrayOUT;
}
if (mypolls) mypolls->remove_index_fast(poll_fds_idx);
if (fd > 0) {
// // Changing logic here. The socket should be closed only if it is not a backend
if (myds_type == MYDS_FRONTEND) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess:%p , MYDS:%p , PgSQL_Connection %p %s: shutdown socket\n", sess, this, myconn, (myconn ? "not reusable" : "is empty"));
shut_hard();
}
}
// Commenting the follow line of code and adding an assert. We should ensure that if a myconn exists it should be removed *before*
if (myds_type == MYDS_BACKEND || myds_type == MYDS_BACKEND_NOT_CONNECTED) {
assert(myconn == NULL);
}
if ((myconn) && (myds_type == MYDS_FRONTEND)) { delete myconn; myconn = NULL; }
if (encrypted) {
if (ssl) {
// NOTE: SSL standard requires a final 'close_notify' alert on socket
// shutdown. But for avoiding any kind of locking IO waiting for the
// other part, we perform a 'quiet' shutdown. For more context see
// MYSQL #29579.
SSL_set_quiet_shutdown(ssl, 1);
if (SSL_shutdown(ssl) < 0)
ERR_clear_error();
}
if (ssl) SSL_free(ssl);
}
//if (multi_pkt.ptr) {
// l_free(multi_pkt.size, multi_pkt.ptr);
// multi_pkt.ptr = NULL;
// multi_pkt.size = 0;
//}
if (CompPktIN.pkt.ptr) {
l_free(CompPktIN.pkt.size, CompPktIN.pkt.ptr);
CompPktIN.pkt.ptr = NULL;
CompPktIN.pkt.size = 0;
}
if (CompPktOUT.pkt.ptr) {
l_free(CompPktOUT.pkt.size, CompPktOUT.pkt.ptr);
CompPktOUT.pkt.ptr = NULL;
CompPktOUT.pkt.size = 0;
}
if (x509_subject_alt_name) {
free(x509_subject_alt_name);
x509_subject_alt_name = NULL;
}
free_scram_state(scram_state);
}
// this function initializes a PgSQL_Data_Stream
void PgSQL_Data_Stream::init() {
if (myds_type != MYDS_LISTENER) {
proxy_debug(PROXY_DEBUG_NET, 1, "Init Data Stream. Session=%p, DataStream=%p -- type %d\n", sess, this, myds_type);
if (PSarrayIN == NULL) PSarrayIN = new PtrSizeArray();
if (PSarrayOUT == NULL) PSarrayOUT = new PtrSizeArray();
// if (PSarrayOUTpending==NULL) PSarrayOUTpending= new PtrSizeArray();
//if (resultset == NULL) resultset = new PtrSizeArray();
if (unlikely(GloVars.global.data_packets_history_size)) {
data_packets_history_IN.set_max_size(GloVars.global.data_packets_history_size);
data_packets_history_OUT.set_max_size(GloVars.global.data_packets_history_size);
}
}
if (myds_type != MYDS_FRONTEND) {
queue_destroy(queueIN);
queue_destroy(queueOUT);
}
}
void PgSQL_Data_Stream::reinit_queues() {
if (queueIN.buffer == NULL)
queue_init(queueIN, QUEUE_T_DEFAULT_SIZE);
if (queueOUT.buffer == NULL)
queue_init(queueOUT, QUEUE_T_DEFAULT_SIZE);
}
// this function initializes a PgSQL_Data_Stream with arguments
void PgSQL_Data_Stream::init(PgSQL_DS_type _type, PgSQL_Session* _sess, int _fd) {
myds_type = _type;
sess = _sess;
init();
fd = _fd;
proxy_debug(PROXY_DEBUG_NET, 1, "Initialized Data Stream. Session=%p, DataStream=%p, type=%d, fd=%d, myconn=%p\n", sess, this, myds_type, fd, myconn);
//if (myconn==NULL) myconn = new PgSQL_Connection();
if (myconn) myconn->fd = fd;
}
// Soft shutdown of socket : it only deactivate the data stream
// TODO: should check the status of the data stream, and identify if it is safe to reconnect or if the session should be destroyed
void PgSQL_Data_Stream::shut_soft() {
proxy_debug(PROXY_DEBUG_NET, 4, "Shutdown soft fd=%d. Session=%p, DataStream=%p\n", fd, sess, this);
active = 0;
set_net_failure();
//if (sess) sess->net_failure=1;
}
// Hard shutdown of socket
void PgSQL_Data_Stream::shut_hard() {
proxy_debug(PROXY_DEBUG_NET, 4, "Shutdown hard fd=%d. Session=%p, DataStream=%p\n", fd, sess, this);
set_net_failure();
if (encrypted) {
// NOTE: SSL standard requires a final 'close_notify' alert on socket
// shutdown. But for avoiding any kind of locking IO waiting for the
// other part, we perform a 'quiet' shutdown. For more context see
// MYSQL #29579.
SSL_set_quiet_shutdown(ssl, 1);
if (SSL_shutdown(ssl) < 0)
ERR_clear_error();
}
if (fd >= 0) {
shutdown(fd, SHUT_RDWR);
close(fd);
fd = -1;
}
}
void PgSQL_Data_Stream::check_data_flow() {
if ((PSarrayIN->len || queue_data(queueIN)) && (PSarrayOUT->len || queue_data(queueOUT))) {
// there is data at both sides of the data stream: this is considered a fatal error
proxy_error("Session=%p, DataStream=%p -- Data at both ends of a PgSQL data stream: IN <%d bytes %d packets> , OUT <%d bytes %d packets>\n", sess, this, PSarrayIN->len, queue_data(queueIN), PSarrayOUT->len, queue_data(queueOUT));
shut_soft();
generate_coredump();
}
if ((myds_type == MYDS_BACKEND) && myconn && (myconn->fd == 0) && (revents & POLLOUT)) {
int rc;
int error;
socklen_t len = sizeof(error);
rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
assert(rc == 0);
if (error == 0) {
myconn->fd = fd; // connect succeeded
}
else {
errno = error;
perror("check_data_flow");
shut_soft();
}
}
}
int PgSQL_Data_Stream::read_from_net() {
if (encrypted) {
//proxy_info("Entering\n");
}
if ((revents & POLLHUP) && ((revents & POLLIN) == 0)) {
// Previously this was (revents & POLLHUP) , but now
// we call shut_soft() only if POLLIN is not set .
//
// This means that if we receive data (POLLIN) we process it
// temporarily ignoring POLLHUP .
// In this way we can intercept a COM_QUIT executed by the client
// before closing the socket
shut_soft();
return -1;
}
// this check was moved after the previous one about POLLHUP,
// otherwise the previous check was never true
if ((revents & POLLIN) == 0) return 0;
int r = 0;
int s = queue_available(queueIN);
if (encrypted) {
// proxy_info("Queue available of %d bytes\n", s);
}
if (encrypted == false) {
if (pkts_recv) {
r = recv(fd, queue_w_ptr(queueIN), s, 0);
}
else {
if (queueIN.partial == 0) {
// we are reading the very first packet
// to avoid issue with SSL, we will only read the header and eventually the first packet
r = recv(fd, queue_w_ptr(queueIN), 5, 0);
if (r == 5) {
// let's try to read a whole packet
unsigned int read_pos = 0;
unsigned char* buff = (unsigned char*)queueIN.buffer;
const uint8_t type8 = buff[0];
if (type8 != 0)
read_pos++;
uint32_t length = 0;
unsigned a, b, c, d;
a = buff[read_pos++];
b = buff[read_pos++];
c = buff[read_pos++];
d = buff[read_pos++];
length = (a << 24) | (b << 16) | (c << 8) | d;
r += recv(fd, queue_w_ptr(queueIN) + 5, length, 0);
}
}
else {
r = recv(fd, queue_w_ptr(queueIN), s, 0);
}
}
}
else { // encrypted == true
/*
if (!SSL_is_init_finished(ssl)) {
int ret = SSL_do_handshake(ssl);
int ret2;
if (ret != 1) {
//ERR_print_errors_fp(stderr);
ret2 = SSL_get_error(ssl, ret);
fprintf(stderr,"%d\n",ret2);
}
return 0;
} else {
r = SSL_read (ssl, queue_w_ptr(queueIN), s);
}
*/
PROXY_TRACE();
if (s < MY_SSL_BUFFER) {
return 0; // no enough space for reads
}
char buf[MY_SSL_BUFFER];
//ssize_t n = read(fd, buf, sizeof(buf));
int n = recv(fd, buf, sizeof(buf), 0);
//proxy_info("SSL recv of %d bytes\n", n);
proxy_debug(PROXY_DEBUG_NET, 7, "Session=%p: recv() read %d bytes. num_write: %lu , num_read: %lu\n", sess, n, BIO_number_written(rbio_ssl), BIO_number_read(rbio_ssl));
if (n > 0 || BIO_number_written(rbio_ssl) > BIO_number_read(rbio_ssl)) {
//on_read_cb(buf, (size_t)n);
enum pgsql_sslstatus status;
char buf2[MY_SSL_BUFFER];
int n2;
//enum pgsql_sslstatus pgsql_status;
char* src = buf;
int len = n;
while (len > 0) {
n2 = BIO_write(rbio_ssl, src, len);
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p: write %d bytes into BIO %p, len=%d\n", sess, n2, rbio_ssl, len);
//proxy_info("BIO_write with len = %d and %d bytes\n", len , n2);
if (n2 <= 0) {
shut_soft();
return -1;
}
src += n2;
len -= n2;
if (!SSL_is_init_finished(ssl)) {
//proxy_info("SSL_is_init_finished NOT completed\n");
if (do_ssl_handshake() == PGSQL_SSLSTATUS_FAIL) {
//proxy_info("SSL_is_init_finished failed!!\n");
shut_soft();
return -1;
}
if (!SSL_is_init_finished(ssl)) {
//proxy_info("SSL_is_init_finished yet NOT completed\n");
return 0;
}
}
else {
//proxy_info("SSL_is_init_finished completed\n");
}
}
n2 = SSL_read(ssl, queue_w_ptr(queueIN), s);
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p: read %d bytes from BIO %p into a buffer with %d bytes free\n", sess, n2, rbio_ssl, s);
r = n2;
//proxy_info("Read %d bytes from SSL\n", r);
if (n2 > 0) {
}
/*
do {
n2 = SSL_read(ssl, buf2, sizeof(buf2));
if (n2 > 0) {
}
} while (n > 0);
*/
status = get_sslstatus(ssl, n2);
//proxy_info("SSL status = %d\n", status);
if (status == PGSQL_SSLSTATUS_WANT_IO) {
do {
n2 = BIO_read(wbio_ssl, buf2, sizeof(buf2));
//proxy_info("BIO_read with %d bytes\n", n2);
if (n2 > 0) {
queue_encrypted_bytes(buf2, n2);
}
else if (!BIO_should_retry(wbio_ssl)) {
shut_soft();
return -1;
}
} while (n2 > 0);
}
if (status == PGSQL_SSLSTATUS_FAIL) {
shut_soft();
return -1;
}
}
else {
r = n;
//r += SSL_read (ssl, queue_w_ptr(queueIN), s);
//proxy_info("Read %d bytes from SSL\n", r);
}
}
//__exit_read_from_next:
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p: read %d bytes from fd %d into a buffer of %d bytes free\n", sess, r, fd, s);
//proxy_error("read %d bytes from fd %d into a buffer of %d bytes free\n", r, fd, s);
if (r < 1) {
if (encrypted == false) {
int myds_errno = errno;
if (r == 0 || (r == -1 && myds_errno != EINTR && myds_errno != EAGAIN)) {
shut_soft();
}
}
else {
int ssl_ret = SSL_get_error(ssl, r);
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- session_id: %u , SSL_get_error(): %d , errno: %d\n", sess, this, sess->thread_session_id, ssl_ret, errno);
if (ssl_ret == SSL_ERROR_SYSCALL && (errno == EINTR || errno == EAGAIN)) {
// the read was interrupted, do nothing
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- SSL_get_error() is SSL_ERROR_SYSCALL, errno: %d\n", sess, this, errno);
}
else {
if (r == 0) { // we couldn't read any data
if (revents & POLLIN) {
// If revents is holding either POLLIN, or POLLIN and POLLHUP, but 'recv()' returns 0,
// reading no data, the socket has been already closed by the peer. Due to this we can
// ignore POLLHUP in this check, since we should reach here ONLY if POLLIN was set.
proxy_debug(PROXY_DEBUG_NET, 5, "Session=%p, Datastream=%p -- shutdown soft\n", sess, this);
shut_soft();
}
}
if (ssl_ret != SSL_ERROR_WANT_READ && ssl_ret != SSL_ERROR_WANT_WRITE) shut_soft();
// it seems we end in shut_soft() anyway
}
}
}
else {
queue_w(queueIN, r);
bytes_info.bytes_recv += r;
if (mypolls) mypolls->last_recv[poll_fds_idx] = sess->thread->curtime;
}
return r;
}
int PgSQL_Data_Stream::write_to_net() {
int bytes_io = 0;
int s = queue_data(queueOUT);
int n;
if (encrypted) {
//proxy_info("Data in write buffer: %d bytes\n", s);
}
if (s == 0) {
if (encrypted == false) {
return 0;
}
if (ssl_write_len == 0 && BIO_number_written(wbio_ssl) == BIO_number_read(wbio_ssl)) {
return 0;
}
}
//VALGRIND_DISABLE_ERROR_REPORTING;
// splitting the ternary operation in IF condition for better readability
if (encrypted) {
bytes_io = SSL_write(ssl, queue_r_ptr(queueOUT), s);
//proxy_info("Used SSL_write to write %d bytes\n", bytes_io);
proxy_debug(PROXY_DEBUG_NET, 7, "Session=%p, Datastream=%p: SSL_write() wrote %d bytes . queueOUT before: %u\n", sess, this, bytes_io, queue_data(queueOUT));
if (ssl_write_len || BIO_number_written(wbio_ssl) > BIO_number_read(wbio_ssl)) {
//proxy_info("ssl_write_len = %d , num_write = %d , num_read = %d\n", ssl_write_len , wbio_ssl->num_write , wbio_ssl->num_read);
char buf[MY_SSL_BUFFER];
do {
n = BIO_read(wbio_ssl, buf, sizeof(buf));
proxy_debug(PROXY_DEBUG_NET, 7, "Session=%p, Datastream=%p: BIO_read() read %d bytes\n", sess, this, n);
//proxy_info("BIO read = %d\n", n);
if (n > 0) {
//proxy_info("Setting %d byte in queue encrypted\n", n);
queue_encrypted_bytes(buf, n);
}
else if (!BIO_should_retry(wbio_ssl)) {
//proxy_info("BIO_should_retry failed\n");
shut_soft();
return -1;
}
} while (n > 0);
}
proxy_debug(PROXY_DEBUG_NET, 7, "Session=%p, Datastream=%p: current ssl_write_len is %lu bytes\n", sess, this, ssl_write_len);
if (ssl_write_len) {
n = write(fd, ssl_write_buf, ssl_write_len);
proxy_debug(PROXY_DEBUG_NET, 7, "Session=%p, Datastream=%p: write() wrote %d bytes in FD %d\n", sess, this, n, fd);
//proxy_info("Calling write() on SSL: %d\n", n);
if (n > 0) {
if ((size_t)n < ssl_write_len) {
memmove(ssl_write_buf, ssl_write_buf + n, ssl_write_len - n);
}
ssl_write_len -= n;
if (ssl_write_len) {
ssl_write_buf = (char*)realloc(ssl_write_buf, ssl_write_len);
}
else {
free(ssl_write_buf);
ssl_write_buf = NULL;
}
//proxy_info("new ssl_write_len: %u\n", ssl_write_len);
//if (ssl_write_len) {
// return n; // stop here
//} else {
// rc = n; // and continue
//}
//bytes_io += n;
}
else {
int myds_errno = errno;
if (n == 0 || (n == -1 && myds_errno != EINTR && myds_errno != EAGAIN)) {
shut_soft();
return 0;
}
else {
return -1;
}
}
}
}
else {
#ifdef __APPLE__
bytes_io = send(fd, queue_r_ptr(queueOUT), s, 0);
#else
bytes_io = send(fd, queue_r_ptr(queueOUT), s, MSG_NOSIGNAL);
#endif
proxy_debug(PROXY_DEBUG_NET, 7, "Session=%p, Datastream=%p: send() wrote %d bytes in FD %d\n", sess, this, bytes_io, fd);
}
if (encrypted) {
//proxy_info("bytes_io: %d\n", bytes_io);
}
//VALGRIND_ENABLE_ERROR_REPORTING;
if (bytes_io < 0) {
if (encrypted == false) {
if ((poll_fds_idx < 0) || (mypolls->fds[poll_fds_idx].revents & POLLOUT)) { // in write_to_net_poll() we has remove this safety
// so we enforce it here
shut_soft();
}
}
else {
int ssl_ret = SSL_get_error(ssl, bytes_io);
if (ssl_ret != SSL_ERROR_WANT_READ && ssl_ret != SSL_ERROR_WANT_WRITE) shut_soft();
}
}
else {
queue_r(queueOUT, bytes_io);
if (mypolls) mypolls->last_sent[poll_fds_idx] = sess->thread->curtime;
bytes_info.bytes_sent += bytes_io;
}
if (bytes_io > 0) {
if (myds_type == MYDS_FRONTEND) {
if (sess) {
if (sess->thread) {
sess->thread->status_variables.stvar[st_var_queries_frontends_bytes_sent] += bytes_io;
}
}
}
}
return bytes_io;
}
bool PgSQL_Data_Stream::available_data_out() {
int buflen = queue_data(queueOUT);
if (buflen || PSarrayOUT->len) {
return true;
}
return false;
}
void PgSQL_Data_Stream::remove_pollout() {
struct pollfd* _pollfd;
_pollfd = &mypolls->fds[poll_fds_idx];
_pollfd->events = 0;
}
void PgSQL_Data_Stream::set_pollout() {
struct pollfd* _pollfd;
_pollfd = &mypolls->fds[poll_fds_idx];
if (DSS > STATE_MARIADB_BEGIN && DSS < STATE_MARIADB_END) {
_pollfd->events = myconn->wait_events;
}
else {
_pollfd->events = POLLIN;
//if (PSarrayOUT->len || available_data_out() || queueOUT.partial || (encrypted && !SSL_is_init_finished(ssl))) {
if (PSarrayOUT->len || available_data_out() || queueOUT.partial) {
_pollfd->events |= POLLOUT;
}
if (encrypted) {
if (ssl_write_len || BIO_number_written(wbio_ssl) > BIO_number_read(wbio_ssl)) {
_pollfd->events |= POLLOUT;
}
else {
if (!SSL_is_init_finished(ssl)) {
//proxy_info("SSL_is_init_finished NOT completed\n");
if (do_ssl_handshake() == PGSQL_SSLSTATUS_FAIL) {
//proxy_info("SSL_is_init_finished failed!!\n");
shut_soft();
return;
}
if (!SSL_is_init_finished(ssl)) {
//proxy_info("SSL_is_init_finished yet NOT completed\n");
return;
}
_pollfd->events |= POLLOUT;
}
else {
//proxy_info("SSL_is_init_finished completed\n");
}
}
}
}
proxy_debug(PROXY_DEBUG_NET, 1, "Session=%p, DataStream=%p -- Setting poll events %d for FD %d , DSS=%d , myconn=%p\n", sess, this, _pollfd->events, fd, DSS, myconn);
}
int PgSQL_Data_Stream::write_to_net_poll() {
int rc = 0;
if (active == 0) return rc;
/*
if (encrypted && !SSL_is_init_finished(ssl)) {
int ret = SSL_do_handshake(ssl);
int ret2;
if (ret != 1) {
//ERR_print_errors_fp(stderr);
ret2 = SSL_get_error(ssl, ret);
fprintf(stderr,"%d\n",ret2);
}
return 0;
}
*/
if (encrypted) {
if (!SSL_is_init_finished(ssl)) {
//proxy_info("SSL_is_init_finished completed: NO!\n");
if (do_ssl_handshake() == PGSQL_SSLSTATUS_FAIL) {
//proxy_info("SSL_is_init_finished failed!!\n");
shut_soft();
return -1;
}
}
else {
//proxy_info("SSL_is_init_finished completed: YES\n");
}
/*
if (!SSL_is_init_finished(ssl)) {
proxy_info("SSL_is_init_finished completed: NO!\n");
if (fd>0 && sess->session_type == PROXYSQL_SESSION_PGSQL) {
set_pollout();
return 0;
}
}
*/
//proxy_info("ssl_write_len: %u\n", ssl_write_len);
if (ssl_write_len) {
int n = write(fd, ssl_write_buf, ssl_write_len);
//proxy_info("Calling write() on SSL: %d\n", n);
if (n > 0) {
if ((size_t)n < ssl_write_len) {
memmove(ssl_write_buf, ssl_write_buf + n, ssl_write_len - n);
}
ssl_write_len -= n;
if (ssl_write_len) {
ssl_write_buf = (char*)realloc(ssl_write_buf, ssl_write_len);
}
else {
free(ssl_write_buf);
ssl_write_buf = NULL;
}
//proxy_info("new ssl_write_len: %u\n", ssl_write_len);
if (ssl_write_len) {
return n; // stop here
}
else {
rc = n; // and continue
}
}
else {
int myds_errno = errno;
if (n == 0 || (n == -1 && myds_errno != EINTR && myds_errno != EAGAIN)) {
shut_soft();
return 0;
}
else {
return -1;
}
}
}
}
proxy_debug(PROXY_DEBUG_NET, 1, "Session=%p, DataStream=%p --\n", sess, this);
bool call_write_to_net = false;
if (queue_data(queueOUT)) {
call_write_to_net = true;
}
if (call_write_to_net == false) {
if (encrypted) {
if (ssl_write_len || BIO_number_written(wbio_ssl) > BIO_number_read(wbio_ssl)) {
call_write_to_net = true;
}
}
}
if (call_write_to_net) {
if (sess->session_type == PROXYSQL_SESSION_PGSQL) {
if (poll_fds_idx > -1) { // NOTE: attempt to force writes
if (net_failure == false)
rc += write_to_net();
}
}
else {
rc += write_to_net();
}
}
if (fd > 0 && sess->session_type == PROXYSQL_SESSION_PGSQL) {
// PROXYSQL_SESSION_PGSQL is a requirement, because it uses threads pool
// the other session types do not
set_pollout();
}
return rc;
}
int PgSQL_Data_Stream::read_pkts() {
int rc = 0;
int r = 0;
while ((r = buffer2array())) rc += r;
return rc;
}
int PgSQL_Data_Stream::array2buffer() {
int ret = 0;
unsigned int idx = 0;
bool cont = true;
if (sess) {
if (sess->mirror == true) { // if this is a mirror session, just empty it
idx = PSarrayOUT->len;
goto __exit_array2buffer;
}
}
while (cont) {
//VALGRIND_DISABLE_ERROR_REPORTING;
if (queue_available(queueOUT) == 0) {
goto __exit_array2buffer;
}
if (queueOUT.partial == 0) { // read a new packet
if (PSarrayOUT->len - idx) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Removing a packet from array\n", sess, this);
if (queueOUT.pkt.ptr) {
//l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
add_to_data_packet_history_without_alloc(data_packets_history_OUT, queueOUT.pkt.ptr, queueOUT.pkt.size);
queueOUT.pkt.ptr = NULL;
}
memcpy(&queueOUT.pkt, PSarrayOUT->index(idx), sizeof(PtrSize_t));
idx++;
if (DSS == STATE_CLIENT_AUTH_OK) {
DSS = STATE_SLEEP;
//explicitly disable compression
//myconn->options.compression_min_length = 0;
myconn->set_status(false, STATUS_PGSQL_CONNECTION_COMPRESSION);
}
#ifdef DEBUG
{ __dump_pkt(__func__, (unsigned char*)queueOUT.pkt.ptr, queueOUT.pkt.size); }
#endif
}
else {
cont = false;
continue;
}
}
int b = (queue_available(queueOUT) > (queueOUT.pkt.size - queueOUT.partial) ? (queueOUT.pkt.size - queueOUT.partial) : queue_available(queueOUT));
//VALGRIND_DISABLE_ERROR_REPORTING;
memcpy(queue_w_ptr(queueOUT), (unsigned char*)queueOUT.pkt.ptr + queueOUT.partial, b);
//VALGRIND_ENABLE_ERROR_REPORTING;
queue_w(queueOUT, b);
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Copied %d bytes into send buffer\n", sess, this, b);
queueOUT.partial += b;
ret = b;
if (queueOUT.partial == queueOUT.pkt.size) {
if (queueOUT.pkt.ptr) {
//l_free(queueOUT.pkt.size,queueOUT.pkt.ptr);
add_to_data_packet_history_without_alloc(data_packets_history_OUT, queueOUT.pkt.ptr, queueOUT.pkt.size);
queueOUT.pkt.ptr = NULL;
}
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . DataStream: %p -- Packet completely written into send buffer\n", sess, this);
queueOUT.partial = 0;
pkts_sent += 1;
}
}
__exit_array2buffer:
if (idx) {
PSarrayOUT->remove_index_range(0, idx);
}
return ret;
}
unsigned char* PgSQL_Data_Stream::copy_array_to_buffer(PtrSizeArray* resultset, size_t resultset_length,
bool del) {
unsigned int i;
unsigned int l = 0;
unsigned char* mybuff = (unsigned char*)l_alloc(resultset_length);
PtrSize_t* ps;
for (i = 0; i < resultset->len; i++) {
ps = resultset->index(i);
memcpy(mybuff + l, ps->ptr, ps->size);
if (del) l_free(ps->size, ps->ptr);
l += ps->size;
}
return mybuff;
};
static inline uint32_t get_uint32(const unsigned char* ptr) {
return (static_cast<uint32_t>(ptr[0]) << 24) |
(static_cast<uint32_t>(ptr[1]) << 16) |
(static_cast<uint32_t>(ptr[2]) << 8) |
static_cast<uint32_t>(ptr[3]);
}
void PgSQL_Data_Stream::copy_buffer_to_resultset(PtrSizeArray* resultset, unsigned char* ptr, uint64_t size,
char current_transaction_state) {
unsigned char* current_ptr = ptr;
unsigned char* buffer = NULL;
uint32_t data_len;
uint32_t buffer_len;
uint32_t remaining_space;
while (current_ptr < ptr + size) {
uint8_t packet_type = *current_ptr; // read packet type (unused in the code)
// Read 4-byte length
/*unsigned int a = current_ptr[read_pos++];
unsigned int b = current_ptr[read_pos++];
unsigned int c = current_ptr[read_pos++];
unsigned int d = current_ptr[read_pos++];
const uint32_t packet_length = (a << 24) | (b << 16) | (c << 8) | d;
*/
data_len = get_uint32(current_ptr + 1 /*skip type*/) + 1; // total space needed, including type
// Handle buffer space
if (buffer) {
if (remaining_space < data_len) {
// Insufficient space, add buffer to resultset
resultset->add(buffer, buffer_len - remaining_space);
buffer = NULL;
}
}
// Allocate new buffer if needed
if (buffer == NULL) {
// Set buffer size depending on available space
if (current_ptr + RESULTSET_BUFLEN_DS_1M <= ptr + size) {
buffer_len = RESULTSET_BUFLEN_DS_1M;
}
else {
buffer_len = RESULTSET_BUFLEN_DS_16K;
}
// Ensure buffer is large enough for the current packet
if (data_len > buffer_len) {
buffer_len = data_len;
}
buffer = (unsigned char*)malloc(buffer_len);
remaining_space = buffer_len;
}
// Copy data to buffer
memcpy((unsigned char*)buffer + (buffer_len - remaining_space), current_ptr, data_len);
remaining_space -= data_len;
current_ptr += data_len;
if (packet_type == 'Z') {
// if packet is a 'Z' packet (Ready Packet), we need to overwrite trasaction state
*(buffer + (buffer_len - remaining_space) - 1) = current_transaction_state;
}
}
// Add the last buffer to the resultset, if any
if (buffer) {
resultset->add(buffer, buffer_len - remaining_space);
}
}
int PgSQL_Data_Stream::array2buffer_full() {
int rc = 0;
int r = 0;
while ((r = array2buffer())) rc += r;
return rc;
}
int PgSQL_Data_Stream::assign_fd_from_pgsql_conn() {
assert(myconn);
//proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->myconn.net.fd);
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p, oldFD=%d, newFD=%d\n", this->sess, this, fd, myconn->fd);
fd = myconn->fd;
return fd;
}
void PgSQL_Data_Stream::unplug_backend() {
DSS = STATE_NOT_INITIALIZED;
myconn = NULL;
myds_type = MYDS_BACKEND_NOT_CONNECTED;
mypolls->remove_index_fast(poll_fds_idx);
mypolls = NULL;
fd = 0;
}
void PgSQL_Data_Stream::set_net_failure() {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p , myds_type:%d\n", this->sess, this, myds_type);
#ifdef DEBUG
if (myds_type != MYDS_FRONTEND) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p , myds_type:%d not frontend\n", this->sess, this, myds_type);
}
#endif /* DEBUG */
net_failure = true;
}
void PgSQL_Data_Stream::setDSS_STATE_QUERY_SENT_NET() {
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Sess=%p, myds=%p\n", this->sess, this);
DSS = STATE_QUERY_SENT_NET;
}
void PgSQL_Data_Stream::return_MySQL_Connection_To_Pool() {
PgSQL_Connection* mc = myconn;
mc->last_time_used = sess->thread->curtime;
// before detaching, check if last_HG_affected_rows matches . if yes, set it back to -1
if (mybe) {
if (mybe->hostgroup_id == sess->last_HG_affected_rows) {
sess->last_HG_affected_rows = -1;
}
}
unsigned long long intv = pgsql_thread___connection_max_age_ms;
intv *= 1000;
if (
(((intv) && (mc->last_time_used > mc->creation_time + intv)) ||
(mc->local_stmts->get_num_backend_stmts() > (unsigned int)GloPTH->variables.max_stmts_per_connection))
&&
// NOTE: If the current session if in 'PINGING_SERVER' status, there is
// no need to reset the session. The destruction and creation of a new
// session in case this session has exceeded the time specified by
// 'connection_max_age_ms' will be deferred to the next time the session
// is used outside 'PINGING_SERVER' operation. For more context see #3502.
sess->status != PINGING_SERVER
) {
sess->create_new_session_and_reset_connection(this);
} else {
detach_connection();
unplug_backend();
#ifdef STRESSTEST_POOL
PgHGM->push_MyConn_to_pool(mc); // #644
#else
sess->thread->push_MyConn_local(mc);
#endif
}
}
void PgSQL_Data_Stream::free_pgsql_real_query() {
if (pgsql_real_query.QueryPtr) {
pgsql_real_query.end();
}
}
void PgSQL_Data_Stream::destroy_queues() {
queue_destroy(queueIN);
queue_destroy(queueOUT);
}
void PgSQL_Data_Stream::destroy_MySQL_Connection_From_Pool(bool sq) {
PgSQL_Connection* mc = myconn;
PgSQL_SrvC* mysrvc = mc->parent;
if (sq && mysrvc->status == MYSQL_SERVER_STATUS_ONLINE &&
mc->async_state_machine == ASYNC_IDLE &&
mc->is_connection_in_reusable_state() == true) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset PgSQL_Connection %p, server %s:%d\n", mc, mysrvc->address, mysrvc->port);
sess->create_new_session_and_reset_connection(this);
} else {
mc->last_time_used = sess->thread->curtime;
mc->send_quit = sq;
detach_connection();
unplug_backend();
PgHGM->destroy_MyConn_from_pool(mc);
}
}
bool PgSQL_Data_Stream::data_in_rbio() {
if (BIO_number_written(rbio_ssl) > BIO_number_read(rbio_ssl)) {
return true;
}
return false;
}
void PgSQL_Data_Stream::reset_connection() {
if (myconn) {
if (pgsql_thread___multiplexing && (DSS == STATE_MARIADB_GENERIC || DSS == STATE_READY) && myconn->reusable == true &&
myconn->IsActiveTransaction() == false && myconn->MultiplexDisabled() == false && myconn->async_state_machine == ASYNC_IDLE &&
myconn->is_pipeline_active() == false) {
myconn->last_time_used = sess->thread->curtime;
return_MySQL_Connection_To_Pool();
} else {
if (sess && sess->session_fast_forward == SESSION_FORWARD_TYPE_NONE) {
// If the startup connection includes untracked session parameters (passed via options=),
// the connection must be destroyed instead of returned to the pool. Issue #5102
bool can_reuse_connection = sess->untracked_option_parameters.empty();
destroy_MySQL_Connection_From_Pool(can_reuse_connection);
} else {
destroy_MySQL_Connection_From_Pool(false);
}
}
}
}
int PgSQL_Data_Stream::buffer2array() {
int ret = 0;
{
unsigned long s = queue_data(queueIN);
if (s == 0) return ret;
if ((queueIN.pkt.size == 0) && s < 5) {
queue_zero(queueIN);
}
}
unsigned char header[5];
if ((queueIN.pkt.size == 0) && queue_data(queueIN) >= sizeof(header)) {
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . Reading the header of a new packet\n", sess);
memcpy(header, queue_r_ptr(queueIN), sizeof(header));
queue_r(queueIN, sizeof(header));
uint32_t pkgsize = 0;
unsigned int read_pos = 0;
const uint8_t type8 = header[0];
if (type8 != 0) {
read_pos++;
pkgsize++;
}
unsigned a, b, c, d;
a = header[read_pos++];
b = header[read_pos++];
c = header[read_pos++];
d = header[read_pos++];
pkgsize += (a << 24) | (b << 16) | (c << 8) | d;
queueIN.pkt.size = pkgsize;
queueIN.pkt.ptr = l_alloc(queueIN.pkt.size);
memcpy(queueIN.pkt.ptr, header, sizeof(header)); // immediately copy the header into the packet
queueIN.partial = sizeof(header);
ret += sizeof(header);
}
if ((queueIN.pkt.size > 0) && queue_data(queueIN)) {
int b = (queue_data(queueIN) > (queueIN.pkt.size - queueIN.partial) ? (queueIN.pkt.size - queueIN.partial) : queue_data(queueIN));
proxy_debug(PROXY_DEBUG_PKT_ARRAY, 5, "Session=%p . Copied %d bytes into packet\n", sess, b);
memcpy((unsigned char*)queueIN.pkt.ptr + queueIN.partial, queue_r_ptr(queueIN), b);
queue_r(queueIN, b);
queueIN.partial += b;
ret += b;
}
if ((queueIN.pkt.size > 0) && (queueIN.pkt.size == queueIN.partial)) {
PSarrayIN->add(queueIN.pkt.ptr, queueIN.pkt.size);
pkts_recv++;
queueIN.pkt.size = 0;
queueIN.pkt.ptr = NULL;
}
return ret;
}