Further testing implementation of SO_REUSEPORT

pull/738/head
René Cannaò 10 years ago
parent adbdb5b2e3
commit b93cc4b6bc

@ -241,8 +241,8 @@ class MySQL_Listeners_Manager {
public:
MySQL_Listeners_Manager();
~MySQL_Listeners_Manager();
int add(const char *iface);
int add(const char *address, int port);
int add(const char *iface, unsigned int num_threads, int *perthrsocks);
//int add(const char *address, int port);
int find_idx(const char *iface);
int find_idx(const char *address, int port);
iface_info * find_iface_from_fd(int fd);

@ -111,7 +111,7 @@ extern "C" {
#endif /* __cplusplus */
//mysql_data_stream_t * mysql_data_stream_New(mysql_session_t *, int, mysql_backend_t *);
int listen_on_port(char *, uint16_t, int);
int listen_on_port(char *ip, uint16_t port, int backlog, bool reuseport=false);
int listen_on_unix(char *, int);
int connect_socket(char *, int);
//void process_global_variables_from_file(GKeyFile *, int );

@ -36,6 +36,9 @@ class ProxySQL_GlobalVariables {
bool gdbg;
bool nostart;
bool monitor;
#ifdef SO_REUSEPORT
bool reuseport;
#endif /* SO_REUSEPORT */
// bool use_proxysql_mem;
pthread_mutex_t start_mutex;
bool foreground;

@ -86,7 +86,7 @@ MySQL_Listeners_Manager::~MySQL_Listeners_Manager() {
ifaces=NULL;
}
int MySQL_Listeners_Manager::add(const char *iface) {
int MySQL_Listeners_Manager::add(const char *iface, unsigned int num_threads, int *perthrsocks) {
for (unsigned int i=0; i<ifaces->len; i++) {
iface_info *ifi=(iface_info *)ifaces->index(i);
if (strcmp(ifi->iface,iface)==0) {
@ -95,14 +95,40 @@ int MySQL_Listeners_Manager::add(const char *iface) {
}
char *address=NULL; char *port=NULL;
c_split_2(iface, ":" , &address, &port);
int s = ( atoi(port) ? listen_on_port(address, atoi(port), PROXYSQL_LISTEN_LEN) : listen_on_unix(address, PROXYSQL_LISTEN_LEN));
int s=-1;
#ifdef SO_REUSEPORT
if (GloVars.global.reuseport==false) {
s = ( atoi(port) ? listen_on_port(address, atoi(port), PROXYSQL_LISTEN_LEN) : listen_on_unix(address, PROXYSQL_LISTEN_LEN));
} else {
if (atoi(port)==0) {
s = listen_on_unix(address, PROXYSQL_LISTEN_LEN);
} else {
// for TCP we will use SO_REUSEPORT
perthrsocks=(int *)malloc(sizeof(int)*num_threads);
int i;
for (i=0;i<num_threads;i++) {
s=listen_on_port(address, atoi(port), PROXYSQL_LISTEN_LEN, true);
ioctl_FIONBIO(s,1);
iface_info *ifi=new iface_info((char *)iface, address, atoi(port), s);
ifaces->add(ifi);
perthrsocks[i]=s;
}
s=0;
}
}
#else
s = ( atoi(port) ? listen_on_port(address, atoi(port), PROXYSQL_LISTEN_LEN) : listen_on_unix(address, PROXYSQL_LISTEN_LEN));
#endif /* SO_REUSEPORT */
if (s==-1) return s;
ioctl_FIONBIO(s,1);
iface_info *ifi=new iface_info((char *)iface, address, atoi(port), s);
ifaces->add(ifi);
if (s>0) {
ioctl_FIONBIO(s,1);
iface_info *ifi=new iface_info((char *)iface, address, atoi(port), s);
ifaces->add(ifi);
}
return s;
}
/* unused ?
int MySQL_Listeners_Manager::add(const char *address, int port) {
char *s=(char *)malloc(strlen(address)+32);
sprintf(s,"%s:%d",address,port);
@ -110,6 +136,7 @@ int MySQL_Listeners_Manager::add(const char *address, int port) {
free(s);
return ret;
}
*/
int MySQL_Listeners_Manager::find_idx(const char *iface) {
for (unsigned int i=0; i<ifaces->len; i++) {
@ -333,14 +360,26 @@ int MySQL_Threads_Handler::listener_add(const char *address, int port) {
int MySQL_Threads_Handler::listener_add(const char *iface) {
int rc;
rc=MLM->add(iface);
int *perthrsocks=NULL;;
rc=MLM->add(iface, num_threads, perthrsocks);
if (rc>-1) {
unsigned int i;
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc)) {
usleep(10); // pause a bit
if (perthrsocks==NULL) {
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,rc)) {
usleep(10); // pause a bit
}
}
} else {
for (i=0;i<num_threads;i++) {
MySQL_Thread *thr=(MySQL_Thread *)mysql_threads[i].worker;
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_add,0,perthrsocks[i])) {
usleep(10); // pause a bit
}
}
free(perthrsocks);
}
/*
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_change,0,1)) { cpu_relax_pa(); }
while(__sync_fetch_and_add(&thr->mypolls.pending_listener_change,0)==1) { cpu_relax_pa(); }
@ -349,7 +388,6 @@ int MySQL_Threads_Handler::listener_add(const char *iface) {
while(!__sync_bool_compare_and_swap(&thr->mypolls.pending_listener_change,2,0));
// spin_wrunlock(&thr->thread_mutex);
*/
}
}
return rc;
}

@ -58,6 +58,9 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() {
global.nostart=false;
global.foreground=false;
global.monitor=true;
#ifdef SO_REUSEPORT
global.reuseport=true;
#endif /* SO_REUSEPORT */
// global.use_proxysql_mem=false;
pthread_mutex_init(&global.start_mutex,NULL);
#ifdef DEBUG
@ -78,6 +81,9 @@ ProxySQL_GlobalVariables::ProxySQL_GlobalVariables() {
opt->add((const char *)"",0,0,0,(const char *)"Starts only the admin service",(const char *)"-n",(const char *)"--no-start");
opt->add((const char *)"",0,0,0,(const char *)"Do not start Monitor Module",(const char *)"-M",(const char *)"--no-monitor");
opt->add((const char *)"",0,0,0,(const char *)"Run in foreground",(const char *)"-f",(const char *)"--foreground");
#ifdef SO_REUSEPORT
opt->add((const char *)"",0,0,0,(const char *)"Use SO_REUSEPORT",(const char *)"-r",(const char *)"--reuseport");
#endif /* SO_REUSEPORT */
opt->add((const char *)"",0,0,0,(const char *)"Do not restart ProxySQL if crashes",(const char *)"-e",(const char *)"--exit-on-error");
opt->add((const char *)"~/proxysql.cnf",0,1,0,(const char *)"Configuraton file",(const char *)"-c",(const char *)"--config");
//opt->add((const char *)"",0,0,0,(const char *)"Enable custom memory allocator",(const char *)"-m",(const char *)"--custom-memory");
@ -189,6 +195,12 @@ void ProxySQL_GlobalVariables::process_opts_post() {
global.monitor=false;
}
#ifdef SO_REUSEPORT
if (opt->isSet("-r")) {
global.reuseport=true;
}
#endif /* SO_REUSEPORT */
if (opt->isSet("-S")) {
std::string admin_socket;
opt->get("-S")->getString(admin_socket);

@ -4,7 +4,7 @@
* create a socket and listen on a specified IP and port
* returns the socket
*/
int listen_on_port(char *ip, uint16_t port, int backlog) {
int listen_on_port(char *ip, uint16_t port, int backlog, bool reuseport) {
int rc, arg_on=1;
struct sockaddr_in addr;
int sd;
@ -16,18 +16,22 @@ int listen_on_port(char *ip, uint16_t port, int backlog) {
return -1;
}
#ifdef SO_REUSEPORT
// set SO_REUSEADDR and SO_REUSEPORT
rc = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, (char *)&arg_on, sizeof(arg_on));
#else
// set SO_REUSEADDR
rc = setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg_on, sizeof(arg_on));
#endif /* SO_REUSEPORT */
if (rc < 0) {
proxy_error("setsockopt() failed\n");
}
#ifdef SO_REUSEPORT
// set SO_REUSEPORT
if (reuseport) {
rc = setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, (char *)&arg_on, sizeof(arg_on));
if (rc < 0) {
proxy_error("setsockopt() failed\n");
}
}
#endif /* SO_REUSEPORT */
// define addr with the specified IP and port
addr.sin_family = AF_INET;
addr.sin_port = htons(port);

Loading…
Cancel
Save