mirror of https://github.com/sysown/proxysql
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
228 lines
6.0 KiB
228 lines
6.0 KiB
#include "proxysql.h"
|
|
#include "cpp.h"
|
|
|
|
//static void cleanup(const void *data, size_t len, void *arg) {
|
|
// free(arg);
|
|
//}
|
|
|
|
// Constructor
|
|
MySQL_Data_Stream::MySQL_Data_Stream() {
|
|
bytes_info.bytes_recv=0;
|
|
bytes_info.bytes_sent=0;
|
|
pkts_recv=0;
|
|
pkts_sent=0;
|
|
evbIN=NULL;
|
|
evbOUT=NULL;
|
|
|
|
QarrayIN=NULL;
|
|
QarrayOUT=NULL;
|
|
queueIN=NULL;
|
|
queueOUT=NULL;
|
|
listener=0;
|
|
mybe=NULL;
|
|
active=TRUE;
|
|
memset(&ev,0,sizeof(struct epoll_event));
|
|
}
|
|
|
|
|
|
// Destructor
|
|
MySQL_Data_Stream::~MySQL_Data_Stream() {
|
|
if (evbIN) evbuffer_free(evbIN);
|
|
if (evbIN) evbuffer_free(evbOUT);
|
|
if (queueIN) g_queue_free_full(queueIN,free);
|
|
if (queueOUT) g_queue_free_full(queueOUT,free);
|
|
//destroy_databuffer(&bufferOUT);
|
|
proxy_debug(PROXY_DEBUG_NET,1, "Shutdown Data Stream %p, Session %p\n" , this, sess);
|
|
}
|
|
|
|
|
|
|
|
// this function initialze a MySQL_Data_Stream
|
|
// needs to be called only if listener=0
|
|
void MySQL_Data_Stream::init() {
|
|
if (listener==0) {
|
|
evbIN=evbuffer_new();
|
|
assert(evbIN!=NULL);
|
|
evbOUT=evbuffer_new();
|
|
assert(evbOUT!=NULL);
|
|
queueIN=g_queue_new();
|
|
assert(queueIN!=NULL);
|
|
queueOUT=g_queue_new();
|
|
assert(queueOUT!=NULL);
|
|
QarrayIN=g_ptr_array_new();
|
|
assert(QarrayIN);
|
|
QarrayOUT=g_ptr_array_new();
|
|
assert(QarrayOUT);
|
|
//create_databuffer(&bufferOUT, 8192);
|
|
//printf("%p %p\n, queueIN, queueOUT");
|
|
}
|
|
}
|
|
|
|
// Soft shutdown of socket : it only deactivate the data stream
|
|
void MySQL_Data_Stream::shut_soft() {
|
|
proxy_debug(PROXY_DEBUG_NET, 4, "Shutdown soft %d\n", fd);
|
|
active=FALSE;
|
|
sess->net_failure=1;
|
|
}
|
|
|
|
// Hard shutdown of socket
|
|
void MySQL_Data_Stream::shut_hard() {
|
|
proxy_debug(PROXY_DEBUG_NET, 4, "Shutdown hard %d\n", fd);
|
|
if (fd >= 0) {
|
|
shutdown(fd, SHUT_RDWR);
|
|
close(fd);
|
|
fd = -1;
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
void MySQL_Data_Stream::create_databuffer(mysql_data_buffer_t *databuf, unsigned int size) {
|
|
memset(databuf, 0, sizeof(mysql_data_buffer_t));
|
|
databuf->size=size;
|
|
databuf->buffer=malloc(size);
|
|
assert(databuf->buffer);
|
|
}
|
|
|
|
void MySQL_Data_Stream::destroy_databuffer(mysql_data_buffer_t *databuf) {
|
|
free(databuf->buffer);
|
|
if (databuf->pkt) free(databuf->pkt);
|
|
memset(databuf, 0, sizeof(mysql_data_buffer_t));
|
|
}
|
|
*/
|
|
|
|
/*
|
|
* reads data from file descriptor and write it into evbIN .
|
|
* reads only if rev->events has EPOLLIN enabled
|
|
* in case of failure calls shut_soft()
|
|
*/
|
|
int MySQL_Data_Stream::read_from_net() {
|
|
if ((rev->events & EPOLLIN)==0) return 0; // there is no data to read, return immediately
|
|
int bytes_io=0;
|
|
int rc;
|
|
while (( rc = evbuffer_read(evbIN, fd, -1)) > 0) {
|
|
proxy_debug(PROXY_DEBUG_NET,1,"Read %d bytes from FD %d\n", rc, fd);
|
|
bytes_io+=rc;
|
|
}
|
|
if (rc==-1) proxy_debug(PROXY_DEBUG_NET,1,"Total: Read %d bytes from FD %d, errno %d\n", bytes_io, fd, errno);
|
|
if (rc==0) proxy_debug(PROXY_DEBUG_NET,1,"Total: Read %d bytes from FD %d\n", bytes_io, fd);
|
|
//if (bytes_io < 1) {
|
|
// if (bytes_io==0 || (bytes_io==-1 && errno != EINTR && errno != EAGAIN)) {
|
|
if (rc < 1) {
|
|
if (rc==0 || (rc==-1 && errno != EINTR && errno != EAGAIN)) {
|
|
shut_soft();
|
|
}
|
|
} else {
|
|
bytes_info.bytes_recv+=bytes_io;
|
|
if (mybe) {
|
|
// __sync_fetch_and_add(&mybe->mshge->server_bytes.bytes_recv,r);
|
|
}
|
|
}
|
|
return bytes_io;
|
|
}
|
|
|
|
int MySQL_Data_Stream::write_to_net() {
|
|
int bytes_io;
|
|
bytes_io = evbuffer_write(evbOUT, fd);
|
|
proxy_debug(PROXY_DEBUG_NET,1,"Written %d bytes into FD %d\n", bytes_io, fd);
|
|
if (bytes_io < 0) {
|
|
shut_soft();
|
|
} else {
|
|
bytes_info.bytes_sent+=bytes_io;
|
|
if (mybe) {
|
|
// __sync_fetch_and_add(&myds->mybe->mshge->server_bytes.bytes_sent,r);
|
|
}
|
|
}
|
|
return bytes_io;
|
|
}
|
|
|
|
|
|
void MySQL_Data_Stream::set_epollout() {
|
|
|
|
struct epoll_event evtmp;
|
|
|
|
int buflen=evbuffer_get_length(evbOUT);
|
|
if (buflen) {
|
|
//evtmp.events = EPOLLIN | EPOLLOUT | EPOLLERR | EPOLLHUP ;
|
|
evtmp.events = EPOLLIN | EPOLLOUT;
|
|
} else {
|
|
//evtmp.events = EPOLLIN | EPOLLERR | EPOLLHUP ;
|
|
evtmp.events = EPOLLIN;
|
|
}
|
|
//printf("buflen=%d , events=%d , newevents=%d\n", buflen, ev.events, evtmp.events);
|
|
if (evtmp.events != ev.events) {
|
|
ev.events=evtmp.events;
|
|
epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &ev);
|
|
proxy_debug(PROXY_DEBUG_NET,1,"Setting events %d for FD %d\n", ev.events, fd);
|
|
}
|
|
}
|
|
|
|
|
|
int MySQL_Data_Stream::write_to_net_epoll() {
|
|
int rc=0;
|
|
if (evbuffer_get_length(evbOUT) && (rev->events & EPOLLOUT)) {
|
|
rc=write_to_net();
|
|
}
|
|
set_epollout();
|
|
return rc;
|
|
}
|
|
|
|
int MySQL_Data_Stream::read_pkts() {
|
|
int pkts=0;
|
|
int rc;
|
|
int pkt_totlen, pkt_length, buffer_len;
|
|
unsigned char *pkt;
|
|
buffer_len=evbuffer_get_length(evbIN);
|
|
if (buffer_len==0) return 0; // there is no data, return immediately
|
|
while ((pkt=evbuffer_pullup(evbIN, 4))!=NULL) {
|
|
// read pkt length
|
|
pkt_length=CPY3(pkt);
|
|
pkt_totlen=pkt_length+sizeof(mysql_hdr); // totlen = pkt len + header len
|
|
buffer_len = evbuffer_get_length(evbIN); // total amount of data in evbuffer
|
|
if (buffer_len < pkt_totlen) {
|
|
goto read_pkts_return; // returns if there is not enough data in evbuffer
|
|
}
|
|
pkt=(unsigned char *)malloc(pkt_totlen); // allocate a buffer for the whole packet, header included
|
|
assert(pkt);
|
|
rc=evbuffer_remove(evbIN,pkt,pkt_totlen); // copy the packet
|
|
assert(rc==pkt_totlen);
|
|
//parse_mysql_pkt(pkt);
|
|
proxy_debug(PROXY_DEBUG_NET,1,"Read one packet from FD %d, size %d\n", fd, pkt_totlen);
|
|
g_ptr_array_add(QarrayIN,pkt);
|
|
//g_queue_push_tail(queueIN,pkt); // add the packet in the queue
|
|
|
|
pkts++;
|
|
}
|
|
read_pkts_return:
|
|
pkts_recv+=pkts;
|
|
return pkts;
|
|
}
|
|
|
|
int MySQL_Data_Stream::write_pkts() {
|
|
int pkts=0;
|
|
int rc;
|
|
int pkt_totlen, pkt_length;
|
|
unsigned char *pkt;
|
|
//while ((pkt=(unsigned char *)g_queue_pop_head(queueOUT))!=NULL) {
|
|
while (QarrayOUT->len) {
|
|
pkt=(unsigned char *)g_ptr_array_remove_index(QarrayOUT,0);
|
|
pkt_length=CPY3(pkt);
|
|
pkt_totlen=pkt_length+sizeof(mysql_hdr); // totlen = pkt len + header len
|
|
|
|
// add copying the packet
|
|
rc=evbuffer_add(evbOUT, pkt, pkt_totlen);
|
|
assert(rc==0);
|
|
proxy_debug(PROXY_DEBUG_NET,1,"Write one packet for FD %d, size %d\n", fd, pkt_totlen);
|
|
free(pkt);
|
|
|
|
/*
|
|
// add by reference
|
|
rc=evbuffer_add_reference(evbOUT, pkt, pkt_totlen, cleanup, pkt);
|
|
assert(rc==0);
|
|
*/
|
|
pkts++;
|
|
}
|
|
pkts_sent+=pkts;
|
|
return pkts;
|
|
}
|