@ -72,6 +72,56 @@ static void __dump_pkt(const char *func, unsigned char *_ptr, unsigned int len)
//enum sslstatus { SSLSTATUS_OK, SSLSTATUS_WANT_IO, SSLSTATUS_FAIL};
static enum sslstatus get_sslstatus ( SSL * ssl , int n )
{
int err = SSL_get_error ( ssl , n ) ;
switch ( err ) {
case SSL_ERROR_NONE :
return SSLSTATUS_OK ;
case SSL_ERROR_WANT_WRITE :
case SSL_ERROR_WANT_READ :
return SSLSTATUS_WANT_IO ;
case SSL_ERROR_ZERO_RETURN :
case SSL_ERROR_SYSCALL :
default :
return SSLSTATUS_FAIL ;
}
}
void MySQL_Data_Stream : : queue_encrypted_bytes ( const char * buf , size_t len ) {
ssl_write_buf = ( char * ) realloc ( ssl_write_buf , ssl_write_len + len ) ;
memcpy ( ssl_write_buf + ssl_write_len , buf , len ) ;
ssl_write_len + = len ;
//proxy_info("New ssl_write_len size: %u\n", ssl_write_len);
}
enum sslstatus MySQL_Data_Stream : : do_ssl_handshake ( ) {
char buf [ MY_SSL_BUFFER ] ;
enum sslstatus status ;
int n = SSL_do_handshake ( ssl ) ;
status = get_sslstatus ( ssl , n ) ;
//proxy_info("SSL status = %d\n", status);
/* Did SSL request to write bytes? */
if ( status = = SSLSTATUS_WANT_IO ) {
//proxy_info("SSL status is WANT_IO %d\n", status);
do {
n = BIO_read ( wbio_ssl , buf , sizeof ( buf ) ) ;
//proxy_info("BIO read = %d\n", n);
if ( n > 0 ) {
//proxy_info("Queuing %d encrypted bytes\n", n);
queue_encrypted_bytes ( buf , n ) ;
} else if ( ! BIO_should_retry ( wbio_ssl ) ) {
//proxy_info("BIO_should_retry failed\n");
return SSLSTATUS_FAIL ;
}
} while ( n > 0 ) ;
}
return status ;
}
void * MySQL_Data_Stream : : operator new ( size_t size ) {
return l_alloc ( size ) ;
}
@ -122,6 +172,10 @@ MySQL_Data_Stream::MySQL_Data_Stream() {
DSS = STATE_NOT_CONNECTED ;
encrypted = false ;
ssl = NULL ;
rbio_ssl = NULL ;
wbio_ssl = NULL ;
ssl_write_len = 0 ;
ssl_write_buf = NULL ;
net_failure = false ;
CompPktIN . pkt . ptr = NULL ;
CompPktIN . pkt . size = 0 ;
@ -193,6 +247,15 @@ MySQL_Data_Stream::~MySQL_Data_Stream() {
if ( ( myconn ) & & ( myds_type = = MYDS_FRONTEND ) ) { delete myconn ; myconn = NULL ; }
if ( encrypted ) {
if ( ssl ) SSL_free ( ssl ) ;
/*
SSL_free ( ) should also take care of these
if ( rbio_ssl ) {
BIO_free ( rbio_ssl ) ;
}
if ( wbio_ssl ) {
BIO_free ( wbio_ssl ) ;
}
*/
}
if ( multi_pkt . ptr ) {
l_free ( multi_pkt . size , multi_pkt . ptr ) ;
@ -257,6 +320,9 @@ void MySQL_Data_Stream::shut_soft() {
void MySQL_Data_Stream : : shut_hard ( ) {
proxy_debug ( PROXY_DEBUG_NET , 4 , " Shutdown hard fd=%d. Session=%p, DataStream=%p \n " , fd , sess , this ) ;
set_net_failure ( ) ;
if ( encrypted ) {
SSL_set_quiet_shutdown ( ssl , 1 ) ;
}
if ( fd > = 0 ) {
shutdown ( fd , SHUT_RDWR ) ;
close ( fd ) ;
@ -287,10 +353,125 @@ void MySQL_Data_Stream::check_data_flow() {
}
int MySQL_Data_Stream : : read_from_net ( ) {
if ( encrypted ) {
//proxy_info("Entering\n");
}
if ( ( revents & POLLIN ) = = 0 ) return 0 ;
int r ;
int r = 0 ;
int s = queue_available ( queueIN ) ;
r = ( encrypted ? SSL_read ( ssl , queue_w_ptr ( queueIN ) , s ) : recv ( fd , queue_w_ptr ( queueIN ) , s , 0 ) ) ;
if ( encrypted ) {
// proxy_info("Queue available of %d bytes\n", s);
}
if ( encrypted = = false ) {
if ( pkts_recv ) {
r = recv ( fd , queue_w_ptr ( queueIN ) , s , 0 ) ;
} else {
if ( queueIN . partial = = 0 ) {
// we are reading the very first packet
// to avoid issue with SSL, we will only read the header and eventually the first packet
r = recv ( fd , queue_w_ptr ( queueIN ) , 4 , 0 ) ;
if ( r = = 4 ) {
// let's try to read a whole packet
mysql_hdr Hdr ;
memcpy ( & Hdr , queueIN . buffer , sizeof ( mysql_hdr ) ) ;
r + = recv ( fd , queue_w_ptr ( queueIN ) + 4 , Hdr . pkt_length , 0 ) ;
}
} else {
r = recv ( fd , queue_w_ptr ( queueIN ) , s , 0 ) ;
}
}
} else {
/*
if ( ! SSL_is_init_finished ( ssl ) ) {
int ret = SSL_do_handshake ( ssl ) ;
int ret2 ;
if ( ret ! = 1 ) {
//ERR_print_errors_fp(stderr);
ret2 = SSL_get_error ( ssl , ret ) ;
fprintf ( stderr , " %d \n " , ret2 ) ;
}
return 0 ;
} else {
r = SSL_read ( ssl , queue_w_ptr ( queueIN ) , s ) ;
}
*/
if ( s < MY_SSL_BUFFER ) {
return 0 ; // no enough space for reads
}
char buf [ MY_SSL_BUFFER ] ;
//ssize_t n = read(fd, buf, sizeof(buf));
int n = recv ( fd , buf , sizeof ( buf ) , 0 ) ;
//proxy_info("SSL recv of %d bytes\n", n);
if ( n > 0 ) {
//on_read_cb(buf, (size_t)n);
char buf2 [ MY_SSL_BUFFER ] ;
int n2 ;
enum sslstatus status ;
char * src = buf ;
int len = n ;
while ( len ) {
n2 = BIO_write ( rbio_ssl , src , len ) ;
//proxy_info("BIO_write with len = %d and %d bytes\n", len , n2);
if ( n2 < = 0 ) {
shut_soft ( ) ;
return - 1 ;
}
src + = n2 ;
len - = n2 ;
if ( ! SSL_is_init_finished ( ssl ) ) {
//proxy_info("SSL_is_init_finished NOT completed\n");
if ( do_ssl_handshake ( ) = = SSLSTATUS_FAIL ) {
//proxy_info("SSL_is_init_finished failed!!\n");
shut_soft ( ) ;
return - 1 ;
}
if ( ! SSL_is_init_finished ( ssl ) ) {
//proxy_info("SSL_is_init_finished yet NOT completed\n");
return 0 ;
}
} else {
//proxy_info("SSL_is_init_finished completed\n");
}
}
n2 = SSL_read ( ssl , queue_w_ptr ( queueIN ) , s ) ;
r = n2 ;
//proxy_info("Read %d bytes from SSL\n", r);
if ( n2 > 0 ) {
}
/*
do {
n2 = SSL_read ( ssl , buf2 , sizeof ( buf2 ) ) ;
if ( n2 > 0 ) {
}
} while ( n > 0 ) ;
*/
status = get_sslstatus ( ssl , n2 ) ;
//proxy_info("SSL status = %d\n", status);
if ( status = = SSLSTATUS_WANT_IO ) {
do {
n2 = BIO_read ( wbio_ssl , buf2 , sizeof ( buf2 ) ) ;
//proxy_info("BIO_read with %d bytes\n", n2);
if ( n2 > 0 ) {
queue_encrypted_bytes ( buf2 , n2 ) ;
} else if ( ! BIO_should_retry ( wbio_ssl ) ) {
shut_soft ( ) ;
return - 1 ;
}
} while ( n2 > 0 ) ;
}
if ( status = = SSLSTATUS_FAIL ) {
shut_soft ( ) ;
return - 1 ;
}
} else {
r = n ;
//r += SSL_read (ssl, queue_w_ptr(queueIN), s);
//proxy_info("Read %d bytes from SSL\n", r);
}
}
//__exit_read_from_next:
proxy_debug ( PROXY_DEBUG_NET , 5 , " read %d bytes from fd %d into a buffer of %d bytes free \n " , r , fd , s ) ;
//proxy_error("read %d bytes from fd %d into a buffer of %d bytes free\n", r, fd, s);
if ( r < 1 ) {
@ -314,11 +495,61 @@ int MySQL_Data_Stream::read_from_net() {
int MySQL_Data_Stream : : write_to_net ( ) {
int bytes_io = 0 ;
int s = queue_data ( queueOUT ) ;
if ( s = = 0 ) return 0 ;
int n ;
if ( encrypted ) {
//proxy_info("Data in write buffer: %d bytes\n", s);
}
if ( s = = 0 ) {
if ( encrypted = = false ) {
return 0 ;
}
if ( ssl_write_len = = 0 & & wbio_ssl - > num_write = = wbio_ssl - > num_read ) {
return 0 ;
}
}
VALGRIND_DISABLE_ERROR_REPORTING ;
// splitting the ternary operation in IF condition for better readability
if ( encrypted ) {
bytes_io = SSL_write ( ssl , queue_r_ptr ( queueOUT ) , s ) ;
//proxy_info("Used SSL_write to write %d bytes\n", bytes_io);
if ( ssl_write_len | | wbio_ssl - > num_write > wbio_ssl - > num_read ) {
//proxy_info("ssl_write_len = %d , num_write = %d , num_read = %d\n", ssl_write_len , wbio_ssl->num_write , wbio_ssl->num_read);
char buf [ MY_SSL_BUFFER ] ;
do {
n = BIO_read ( wbio_ssl , buf , sizeof ( buf ) ) ;
//proxy_info("BIO read = %d\n", n);
if ( n > 0 ) {
//proxy_info("Setting %d byte in queue encrypted\n", n);
queue_encrypted_bytes ( buf , n ) ;
}
else if ( ! BIO_should_retry ( wbio_ssl ) ) {
//proxy_info("BIO_should_retry failed\n");
shut_soft ( ) ;
return - 1 ;
}
} while ( n > 0 ) ;
}
if ( ssl_write_len ) {
n = write ( fd , ssl_write_buf , ssl_write_len ) ;
//proxy_info("Calling write() on SSL: %d\n", n);
if ( n > 0 ) {
if ( ( size_t ) n < ssl_write_len ) {
memmove ( ssl_write_buf , ssl_write_buf + n , ssl_write_len - n ) ;
}
ssl_write_len - = n ;
ssl_write_buf = ( char * ) realloc ( ssl_write_buf , ssl_write_len ) ;
//proxy_info("new ssl_write_len: %u\n", ssl_write_len);
//if (ssl_write_len) {
// return n; // stop here
//} else {
// rc = n; // and continue
//}
//bytes_io += n;
} else {
shut_soft ( ) ;
return - 1 ;
}
}
} else {
# ifdef __APPLE__
bytes_io = send ( fd , queue_r_ptr ( queueOUT ) , s , 0 ) ;
@ -326,6 +557,9 @@ int MySQL_Data_Stream::write_to_net() {
bytes_io = send ( fd , queue_r_ptr ( queueOUT ) , s , MSG_NOSIGNAL ) ;
# endif
}
if ( encrypted ) {
//proxy_info("bytes_io: %d\n", bytes_io);
}
VALGRIND_ENABLE_ERROR_REPORTING ;
if ( bytes_io < 0 ) {
if ( encrypted = = false ) {
@ -374,10 +608,31 @@ void MySQL_Data_Stream::set_pollout() {
if ( DSS > STATE_MARIADB_BEGIN & & DSS < STATE_MARIADB_END ) {
_pollfd - > events = myconn - > wait_events ;
} else {
_pollfd - > events = POLLIN ;
//if (PSarrayOUT->len || available_data_out() || queueOUT.partial || (encrypted && !SSL_is_init_finished(ssl))) {
if ( PSarrayOUT - > len | | available_data_out ( ) | | queueOUT . partial ) {
_pollfd - > events = POLLIN | POLLOUT ;
} else {
_pollfd - > events = POLLIN ;
_pollfd - > events | = POLLOUT ;
}
if ( encrypted ) {
if ( ssl_write_len | | wbio_ssl - > num_write > wbio_ssl - > num_read ) {
_pollfd - > events | = POLLOUT ;
} else {
if ( ! SSL_is_init_finished ( ssl ) ) {
//proxy_info("SSL_is_init_finished NOT completed\n");
if ( do_ssl_handshake ( ) = = SSLSTATUS_FAIL ) {
//proxy_info("SSL_is_init_finished failed!!\n");
shut_soft ( ) ;
return ;
}
if ( ! SSL_is_init_finished ( ssl ) ) {
//proxy_info("SSL_is_init_finished yet NOT completed\n");
return ;
}
} else {
//proxy_info("SSL_is_init_finished completed\n");
}
_pollfd - > events | = POLLOUT ;
}
}
}
proxy_debug ( PROXY_DEBUG_NET , 1 , " Session=%p, DataStream=%p -- Setting poll events %d for FD %d , DSS=%d , myconn=%p \n " , sess , this , _pollfd - > events , fd , DSS , myconn ) ;
@ -386,18 +641,87 @@ void MySQL_Data_Stream::set_pollout() {
int MySQL_Data_Stream : : write_to_net_poll ( ) {
int rc = 0 ;
if ( active = = 0 ) return rc ;
/*
if ( encrypted & & ! SSL_is_init_finished ( ssl ) ) {
int ret = SSL_do_handshake ( ssl ) ;
int ret2 ;
if ( ret ! = 1 ) {
//ERR_print_errors_fp(stderr);
ret2 = SSL_get_error ( ssl , ret ) ;
fprintf ( stderr , " %d \n " , ret2 ) ;
}
return 0 ;
}
*/
if ( encrypted ) {
if ( ! SSL_is_init_finished ( ssl ) ) {
//proxy_info("SSL_is_init_finished completed: NO!\n");
if ( do_ssl_handshake ( ) = = SSLSTATUS_FAIL ) {
//proxy_info("SSL_is_init_finished failed!!\n");
shut_soft ( ) ;
return - 1 ;
}
} else {
//proxy_info("SSL_is_init_finished completed: YES\n");
}
/*
if ( ! SSL_is_init_finished ( ssl ) ) {
proxy_info ( " SSL_is_init_finished completed: NO! \n " ) ;
if ( fd > 0 & & sess - > session_type = = PROXYSQL_SESSION_MYSQL ) {
set_pollout ( ) ;
return 0 ;
}
}
*/
//proxy_info("ssl_write_len: %u\n", ssl_write_len);
if ( ssl_write_len ) {
int n = write ( fd , ssl_write_buf , ssl_write_len ) ;
//proxy_info("Calling write() on SSL: %d\n", n);
if ( n > 0 ) {
if ( ( size_t ) n < ssl_write_len ) {
memmove ( ssl_write_buf , ssl_write_buf + n , ssl_write_len - n ) ;
}
ssl_write_len - = n ;
ssl_write_buf = ( char * ) realloc ( ssl_write_buf , ssl_write_len ) ;
//proxy_info("new ssl_write_len: %u\n", ssl_write_len);
if ( ssl_write_len ) {
return n ; // stop here
} else {
rc = n ; // and continue
}
} else {
shut_soft ( ) ;
return - 1 ;
}
}
}
proxy_debug ( PROXY_DEBUG_NET , 1 , " Session=%p, DataStream=%p -- \n " , sess , this ) ;
bool call_write_to_net = false ;
if ( queue_data ( queueOUT ) ) {
call_write_to_net = true ;
}
if ( call_write_to_net = = false ) {
if ( encrypted ) {
if ( ssl_write_len | | wbio_ssl - > num_write > wbio_ssl - > num_read ) {
call_write_to_net = true ;
}
}
}
if ( call_write_to_net ) {
if ( sess - > session_type = = PROXYSQL_SESSION_MYSQL ) {
if ( poll_fds_idx > - 1 ) { // NOTE: attempt to force writes
if ( net_failure = = false )
rc = write_to_net ( ) ;
rc + = write_to_net ( ) ;
}
} else {
rc = write_to_net ( ) ;
rc + = write_to_net ( ) ;
}
}
if ( fd > 0 & & sess - > session_type = = PROXYSQL_SESSION_MYSQL ) set_pollout ( ) ;
if ( fd > 0 & & sess - > session_type = = PROXYSQL_SESSION_MYSQL ) {
// PROXYSQL_SESSION_MYSQL is a requirement, because it uses threads pool
// the other session types do not
set_pollout ( ) ;
}
return rc ;
}
@ -463,6 +787,9 @@ int MySQL_Data_Stream::buffer2array() {
memcpy ( ( unsigned char * ) queueIN . pkt . ptr + queueIN . partial , queue_r_ptr ( queueIN ) , b ) ;
queue_r ( queueIN , b ) ;
queueIN . partial + = b ;
if ( queueIN . partial = = 80 ) {
proxy_info ( " Breakpoint \n " ) ;
}
ret + = b ;
}
if ( ( queueIN . pkt . size > 0 ) & & ( queueIN . pkt . size = = queueIN . partial ) ) {