mirror of https://github.com/sysown/proxysql
First commit to split MySQL_HostGroups_Manager.cpp into multiple files. More to split.pull/4516/head
parent
0a2cd03672
commit
e7aa5ff4c5
@ -0,0 +1,27 @@
|
||||
#ifndef CLASS_GTID_Server_Data_H
|
||||
#define CLASS_GTID_Server_Data_H
|
||||
class GTID_Server_Data {
|
||||
public:
|
||||
char *address;
|
||||
uint16_t port;
|
||||
uint16_t mysql_port;
|
||||
char *data;
|
||||
size_t len;
|
||||
size_t size;
|
||||
size_t pos;
|
||||
struct ev_io *w;
|
||||
char uuid_server[64];
|
||||
unsigned long long events_read;
|
||||
gtid_set_t gtid_executed;
|
||||
bool active;
|
||||
GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port);
|
||||
void resize(size_t _s);
|
||||
~GTID_Server_Data();
|
||||
bool readall();
|
||||
bool writeout();
|
||||
bool read_next_gtid();
|
||||
bool gtid_exists(char *gtid_uuid, uint64_t gtid_trxid);
|
||||
void read_all_gtids();
|
||||
void dump();
|
||||
};
|
||||
#endif // CLASS_GTID_Server_Data_H
|
||||
@ -0,0 +1,469 @@
|
||||
#include "MySQL_HostGroups_Manager.h"
|
||||
|
||||
#include "ev.h"
|
||||
#include <iterator>
|
||||
|
||||
|
||||
extern ProxySQL_Admin *GloAdmin;
|
||||
|
||||
extern MySQL_Threads_Handler *GloMTH;
|
||||
|
||||
extern MySQL_Monitor *GloMyMon;
|
||||
|
||||
static pthread_mutex_t ev_loop_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
static void gtid_async_cb(struct ev_loop *loop, struct ev_async *watcher, int revents) {
|
||||
if (glovars.shutdown) {
|
||||
ev_break(loop);
|
||||
}
|
||||
pthread_mutex_lock(&ev_loop_mutex);
|
||||
MyHGM->gtid_missing_nodes = false;
|
||||
MyHGM->generate_mysql_gtid_executed_tables();
|
||||
pthread_mutex_unlock(&ev_loop_mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
static void gtid_timer_cb (struct ev_loop *loop, struct ev_timer *timer, int revents) {
|
||||
if (GloMTH == nullptr) { return; }
|
||||
ev_timer_stop(loop, timer);
|
||||
ev_timer_set(timer, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0);
|
||||
if (glovars.shutdown) {
|
||||
ev_break(loop);
|
||||
}
|
||||
if (MyHGM->gtid_missing_nodes) {
|
||||
pthread_mutex_lock(&ev_loop_mutex);
|
||||
MyHGM->gtid_missing_nodes = false;
|
||||
MyHGM->generate_mysql_gtid_executed_tables();
|
||||
pthread_mutex_unlock(&ev_loop_mutex);
|
||||
}
|
||||
ev_timer_start(loop, timer);
|
||||
return;
|
||||
}
|
||||
|
||||
void reader_cb(struct ev_loop *loop, struct ev_io *w, int revents) {
|
||||
pthread_mutex_lock(&ev_loop_mutex);
|
||||
if (revents & EV_READ) {
|
||||
GTID_Server_Data *sd = (GTID_Server_Data *)w->data;
|
||||
bool rc = true;
|
||||
rc = sd->readall();
|
||||
if (rc == false) {
|
||||
//delete sd;
|
||||
std::string s1 = sd->address;
|
||||
s1.append(":");
|
||||
s1.append(std::to_string(sd->mysql_port));
|
||||
MyHGM->gtid_missing_nodes = true;
|
||||
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
|
||||
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
|
||||
it2 = MyHGM->gtid_map.find(s1);
|
||||
if (it2 != MyHGM->gtid_map.end()) {
|
||||
//MyHGM->gtid_map.erase(it2);
|
||||
it2->second = NULL;
|
||||
delete sd;
|
||||
}
|
||||
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
||||
free(w);
|
||||
} else {
|
||||
sd->dump();
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&ev_loop_mutex);
|
||||
}
|
||||
|
||||
void connect_cb(EV_P_ ev_io *w, int revents) {
|
||||
pthread_mutex_lock(&ev_loop_mutex);
|
||||
struct ev_io * c = w;
|
||||
if (revents & EV_WRITE) {
|
||||
int optval = 0;
|
||||
socklen_t optlen = sizeof(optval);
|
||||
if ((getsockopt(w->fd, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) ||
|
||||
(optval != 0)) {
|
||||
/* Connection failed; try the next address in the list. */
|
||||
//int errnum = optval ? optval : errno;
|
||||
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
||||
close(w->fd);
|
||||
MyHGM->gtid_missing_nodes = true;
|
||||
GTID_Server_Data * custom_data = (GTID_Server_Data *)w->data;
|
||||
GTID_Server_Data *sd = custom_data;
|
||||
std::string s1 = sd->address;
|
||||
s1.append(":");
|
||||
s1.append(std::to_string(sd->mysql_port));
|
||||
proxy_warning("GTID: failed to connect to ProxySQL binlog reader on port %d for server %s:%d\n", sd->port, sd->address, sd->mysql_port);
|
||||
std::unordered_map <string, GTID_Server_Data *>::iterator it2;
|
||||
it2 = MyHGM->gtid_map.find(s1);
|
||||
if (it2 != MyHGM->gtid_map.end()) {
|
||||
//MyHGM->gtid_map.erase(it2);
|
||||
it2->second = NULL;
|
||||
delete sd;
|
||||
}
|
||||
//delete custom_data;
|
||||
free(c);
|
||||
} else {
|
||||
ev_io_stop(MyHGM->gtid_ev_loop, w);
|
||||
int fd=w->fd;
|
||||
struct ev_io * new_w = (struct ev_io*) malloc(sizeof(struct ev_io));
|
||||
new_w->data = w->data;
|
||||
GTID_Server_Data * custom_data = (GTID_Server_Data *)new_w->data;
|
||||
custom_data->w = new_w;
|
||||
free(w);
|
||||
ev_io_init(new_w, reader_cb, fd, EV_READ);
|
||||
ev_io_start(MyHGM->gtid_ev_loop, new_w);
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&ev_loop_mutex);
|
||||
}
|
||||
|
||||
struct ev_io * new_connector(char *address, uint16_t gtid_port, uint16_t mysql_port) {
|
||||
//struct sockaddr_in a;
|
||||
int s;
|
||||
|
||||
if ((s = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
|
||||
perror("socket");
|
||||
close(s);
|
||||
return NULL;
|
||||
}
|
||||
/*
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.sin_port = htons(gtid_port);
|
||||
a.sin_family = AF_INET;
|
||||
if (!inet_aton(address, (struct in_addr *) &a.sin_addr.s_addr)) {
|
||||
perror("bad IP address format");
|
||||
close(s);
|
||||
return NULL;
|
||||
}
|
||||
*/
|
||||
ioctl_FIONBIO(s,1);
|
||||
|
||||
struct addrinfo hints;
|
||||
struct addrinfo *res = NULL;
|
||||
memset(&hints, 0, sizeof(hints));
|
||||
hints.ai_protocol= IPPROTO_TCP;
|
||||
hints.ai_family= AF_UNSPEC;
|
||||
hints.ai_socktype= SOCK_STREAM;
|
||||
|
||||
char str_port[NI_MAXSERV+1];
|
||||
sprintf(str_port,"%d", gtid_port);
|
||||
int gai_rc = getaddrinfo(address, str_port, &hints, &res);
|
||||
if (gai_rc) {
|
||||
freeaddrinfo(res);
|
||||
//exit here
|
||||
return NULL;
|
||||
}
|
||||
|
||||
//int status = connect(s, (struct sockaddr *) &a, sizeof(a));
|
||||
int status = connect(s, res->ai_addr, res->ai_addrlen);
|
||||
if ((status == 0) || ((status == -1) && (errno == EINPROGRESS))) {
|
||||
struct ev_io *c = (struct ev_io *)malloc(sizeof(struct ev_io));
|
||||
if (c) {
|
||||
ev_io_init(c, connect_cb, s, EV_WRITE);
|
||||
GTID_Server_Data * custom_data = new GTID_Server_Data(c, address, gtid_port, mysql_port);
|
||||
c->data = (void *)custom_data;
|
||||
return c;
|
||||
}
|
||||
/* else error */
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
GTID_Server_Data::GTID_Server_Data(struct ev_io *_w, char *_address, uint16_t _port, uint16_t _mysql_port) {
|
||||
active = true;
|
||||
w = _w;
|
||||
size = 1024; // 1KB buffer
|
||||
data = (char *)malloc(size);
|
||||
memset(uuid_server, 0, sizeof(uuid_server));
|
||||
pos = 0;
|
||||
len = 0;
|
||||
address = strdup(_address);
|
||||
port = _port;
|
||||
mysql_port = _mysql_port;
|
||||
events_read = 0;
|
||||
}
|
||||
|
||||
void GTID_Server_Data::resize(size_t _s) {
|
||||
char *data_ = (char *)malloc(_s);
|
||||
memcpy(data_, data, (_s > size ? size : _s));
|
||||
size = _s;
|
||||
free(data);
|
||||
data = data_;
|
||||
}
|
||||
|
||||
GTID_Server_Data::~GTID_Server_Data() {
|
||||
free(address);
|
||||
free(data);
|
||||
}
|
||||
|
||||
bool GTID_Server_Data::readall() {
|
||||
bool ret = true;
|
||||
if (size == len) {
|
||||
// buffer is full, expand
|
||||
resize(len*2);
|
||||
}
|
||||
int rc = 0;
|
||||
rc = read(w->fd,data+len,size-len);
|
||||
if (rc > 0) {
|
||||
len += rc;
|
||||
} else {
|
||||
int myerr = errno;
|
||||
proxy_error("Read returned %d bytes, error %d\n", rc, myerr);
|
||||
if (
|
||||
(rc == 0) ||
|
||||
(rc==-1 && myerr != EINTR && myerr != EAGAIN)
|
||||
) {
|
||||
ret = false;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) {
|
||||
std::string s = gtid_uuid;
|
||||
auto it = gtid_executed.find(s);
|
||||
// fprintf(stderr,"Checking if server %s:%d has GTID %s:%lu ... ", address, port, gtid_uuid, gtid_trxid);
|
||||
if (it == gtid_executed.end()) {
|
||||
// fprintf(stderr,"NO\n");
|
||||
return false;
|
||||
}
|
||||
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
|
||||
if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) {
|
||||
// fprintf(stderr,"YES\n");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// fprintf(stderr,"NO\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
void GTID_Server_Data::read_all_gtids() {
|
||||
while (read_next_gtid()) {
|
||||
}
|
||||
}
|
||||
|
||||
void GTID_Server_Data::dump() {
|
||||
if (len==0) {
|
||||
return;
|
||||
}
|
||||
read_all_gtids();
|
||||
//int rc = write(1,data+pos,len-pos);
|
||||
fflush(stdout);
|
||||
///pos += rc;
|
||||
if (pos >= len/2) {
|
||||
memmove(data,data+pos,len-pos);
|
||||
len = len-pos;
|
||||
pos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool GTID_Server_Data::writeout() {
|
||||
bool ret = true;
|
||||
if (len==0) {
|
||||
return ret;
|
||||
}
|
||||
int rc = 0;
|
||||
rc = write(w->fd,data+pos,len-pos);
|
||||
if (rc > 0) {
|
||||
pos += rc;
|
||||
if (pos >= len/2) {
|
||||
memmove(data,data+pos,len-pos);
|
||||
len = len-pos;
|
||||
pos = 0;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool GTID_Server_Data::read_next_gtid() {
|
||||
if (len==0) {
|
||||
return false;
|
||||
}
|
||||
void *nlp = NULL;
|
||||
nlp = memchr(data+pos,'\n',len-pos);
|
||||
if (nlp == NULL) {
|
||||
return false;
|
||||
}
|
||||
int l = (char *)nlp - (data+pos);
|
||||
char rec_msg[80];
|
||||
if (strncmp(data+pos,(char *)"ST=",3)==0) {
|
||||
// we are reading the bootstrap
|
||||
char *bs = (char *)malloc(l+1-3); // length + 1 (null byte) - 3 (header)
|
||||
memcpy(bs, data+pos+3, l-3);
|
||||
bs[l-3] = '\0';
|
||||
char *saveptr1=NULL;
|
||||
char *saveptr2=NULL;
|
||||
//char *saveptr3=NULL;
|
||||
char *token = NULL;
|
||||
char *subtoken = NULL;
|
||||
//char *subtoken2 = NULL;
|
||||
char *str1 = NULL;
|
||||
char *str2 = NULL;
|
||||
//char *str3 = NULL;
|
||||
for (str1 = bs; ; str1 = NULL) {
|
||||
token = strtok_r(str1, ",", &saveptr1);
|
||||
if (token == NULL) {
|
||||
break;
|
||||
}
|
||||
int j = 0;
|
||||
for (str2 = token; ; str2 = NULL) {
|
||||
subtoken = strtok_r(str2, ":", &saveptr2);
|
||||
if (subtoken == NULL) {
|
||||
break;
|
||||
}
|
||||
j++;
|
||||
if (j%2 == 1) { // we are reading the uuid
|
||||
char *p = uuid_server;
|
||||
for (unsigned int k=0; k<strlen(subtoken); k++) {
|
||||
if (subtoken[k]!='-') {
|
||||
*p = subtoken[k];
|
||||
p++;
|
||||
}
|
||||
}
|
||||
//fprintf(stdout,"BS from %s\n", uuid_server);
|
||||
} else { // we are reading the trxids
|
||||
uint64_t trx_from;
|
||||
uint64_t trx_to;
|
||||
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
|
||||
//fprintf(stdout,"BS from %s:%lu-%lu\n", uuid_server, trx_from, trx_to);
|
||||
std::string s = uuid_server;
|
||||
gtid_executed[s].emplace_back(trx_from, trx_to);
|
||||
}
|
||||
}
|
||||
}
|
||||
pos += l+1;
|
||||
free(bs);
|
||||
//return true;
|
||||
} else {
|
||||
strncpy(rec_msg,data+pos,l);
|
||||
pos += l+1;
|
||||
rec_msg[l] = 0;
|
||||
//int rc = write(1,data+pos,l+1);
|
||||
//fprintf(stdout,"%s\n", rec_msg);
|
||||
if (rec_msg[0]=='I') {
|
||||
//char rec_uuid[80];
|
||||
uint64_t rec_trxid = 0;
|
||||
char *a = NULL;
|
||||
int ul = 0;
|
||||
switch (rec_msg[1]) {
|
||||
case '1':
|
||||
//sscanf(rec_msg+3,"%s\:%lu",uuid_server,&rec_trxid);
|
||||
a = strchr(rec_msg+3,':');
|
||||
ul = a-rec_msg-3;
|
||||
strncpy(uuid_server,rec_msg+3,ul);
|
||||
uuid_server[ul] = 0;
|
||||
rec_trxid=atoll(a+1);
|
||||
break;
|
||||
case '2':
|
||||
//sscanf(rec_msg+3,"%lu",&rec_trxid);
|
||||
rec_trxid=atoll(rec_msg+3);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
//fprintf(stdout,"%s:%lu\n", uuid_server, rec_trxid);
|
||||
std::string s = uuid_server;
|
||||
gtid_t new_gtid = std::make_pair(s,rec_trxid);
|
||||
addGtid(new_gtid,gtid_executed);
|
||||
events_read++;
|
||||
//return true;
|
||||
}
|
||||
}
|
||||
//std::cout << "current pos " << gtid_executed_to_string(gtid_executed) << std::endl << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
|
||||
std::string gtid_set;
|
||||
for (auto it=gtid_executed.begin(); it!=gtid_executed.end(); ++it) {
|
||||
std::string s = it->first;
|
||||
s.insert(8,"-");
|
||||
s.insert(13,"-");
|
||||
s.insert(18,"-");
|
||||
s.insert(23,"-");
|
||||
s = s + ":";
|
||||
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
|
||||
std::string s2 = s;
|
||||
s2 = s2 + std::to_string(itr->first);
|
||||
s2 = s2 + "-";
|
||||
s2 = s2 + std::to_string(itr->second);
|
||||
s2 = s2 + ",";
|
||||
gtid_set = gtid_set + s2;
|
||||
}
|
||||
}
|
||||
// Extract latest comma only in case 'gtid_executed' isn't empty
|
||||
if (gtid_set.empty() == false) {
|
||||
gtid_set.pop_back();
|
||||
}
|
||||
return gtid_set;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
|
||||
auto it = gtid_executed.find(gtid.first);
|
||||
if (it == gtid_executed.end())
|
||||
{
|
||||
gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second);
|
||||
return;
|
||||
}
|
||||
|
||||
bool flag = true;
|
||||
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
|
||||
{
|
||||
if (gtid.second >= itr->first && gtid.second <= itr->second)
|
||||
return;
|
||||
if (gtid.second + 1 == itr->first)
|
||||
{
|
||||
--itr->first;
|
||||
flag = false;
|
||||
break;
|
||||
}
|
||||
else if (gtid.second == itr->second + 1)
|
||||
{
|
||||
++itr->second;
|
||||
flag = false;
|
||||
break;
|
||||
}
|
||||
else if (gtid.second < itr->first)
|
||||
{
|
||||
it->second.emplace(itr, gtid.second, gtid.second);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (flag)
|
||||
it->second.emplace_back(gtid.second, gtid.second);
|
||||
|
||||
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
|
||||
{
|
||||
auto next_itr = std::next(itr);
|
||||
if (next_itr != it->second.end() && itr->second + 1 == next_itr->first)
|
||||
{
|
||||
itr->second = next_itr->second;
|
||||
it->second.erase(next_itr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void * GTID_syncer_run() {
|
||||
//struct ev_loop * gtid_ev_loop;
|
||||
//gtid_ev_loop = NULL;
|
||||
MyHGM->gtid_ev_loop = ev_loop_new (EVBACKEND_POLL | EVFLAG_NOENV);
|
||||
if (MyHGM->gtid_ev_loop == NULL) {
|
||||
proxy_error("could not initialise GTID sync loop\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
//ev_async_init(gtid_ev_async, gtid_async_cb);
|
||||
//ev_async_start(gtid_ev_loop, gtid_ev_async);
|
||||
MyHGM->gtid_ev_timer = (struct ev_timer *)malloc(sizeof(struct ev_timer));
|
||||
ev_async_init(MyHGM->gtid_ev_async, gtid_async_cb);
|
||||
ev_async_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_async);
|
||||
//ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, __sync_add_and_fetch(&GloMTH->variables.binlog_reader_connect_retry_msec,0)/1000, 0);
|
||||
ev_timer_init(MyHGM->gtid_ev_timer, gtid_timer_cb, 3, 0);
|
||||
ev_timer_start(MyHGM->gtid_ev_loop, MyHGM->gtid_ev_timer);
|
||||
//ev_ref(gtid_ev_loop);
|
||||
ev_run(MyHGM->gtid_ev_loop, 0);
|
||||
//sleep(1000);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -0,0 +1,384 @@
|
||||
#include "MySQL_HostGroups_Manager.h"
|
||||
|
||||
|
||||
extern MySQL_Threads_Handler *GloMTH;
|
||||
|
||||
MyHGC::MyHGC(int _hid) {
|
||||
hid=_hid;
|
||||
mysrvs=new MySrvList(this);
|
||||
current_time_now = 0;
|
||||
new_connections_now = 0;
|
||||
attributes.initialized = false;
|
||||
reset_attributes();
|
||||
// Uninitialized server defaults. Should later be initialized via 'mysql_hostgroup_attributes'.
|
||||
servers_defaults.weight = -1;
|
||||
servers_defaults.max_connections = -1;
|
||||
servers_defaults.use_ssl = -1;
|
||||
}
|
||||
|
||||
void MyHGC::reset_attributes() {
|
||||
if (attributes.initialized == false) {
|
||||
attributes.init_connect = NULL;
|
||||
attributes.comment = NULL;
|
||||
attributes.ignore_session_variables_text = NULL;
|
||||
}
|
||||
attributes.initialized = true;
|
||||
attributes.configured = false;
|
||||
attributes.max_num_online_servers = 1000000;
|
||||
attributes.throttle_connections_per_sec = 1000000;
|
||||
attributes.autocommit = -1;
|
||||
attributes.free_connections_pct = 10;
|
||||
attributes.handle_warnings = -1;
|
||||
attributes.multiplex = true;
|
||||
attributes.connection_warming = false;
|
||||
free(attributes.init_connect);
|
||||
attributes.init_connect = NULL;
|
||||
free(attributes.comment);
|
||||
attributes.comment = NULL;
|
||||
free(attributes.ignore_session_variables_text);
|
||||
attributes.ignore_session_variables_text = NULL;
|
||||
attributes.ignore_session_variables_json = json();
|
||||
}
|
||||
|
||||
MyHGC::~MyHGC() {
|
||||
reset_attributes(); // free all memory
|
||||
delete mysrvs;
|
||||
}
|
||||
|
||||
MySrvC *MyHGC::get_random_MySrvC(char * gtid_uuid, uint64_t gtid_trxid, int max_lag_ms, MySQL_Session *sess) {
|
||||
MySrvC *mysrvc=NULL;
|
||||
unsigned int j;
|
||||
unsigned int sum=0;
|
||||
unsigned int TotalUsedConn=0;
|
||||
unsigned int l=mysrvs->cnt();
|
||||
static time_t last_hg_log = 0;
|
||||
#ifdef TEST_AURORA
|
||||
unsigned long long a1 = array_mysrvc_total/10000;
|
||||
array_mysrvc_total += l;
|
||||
unsigned long long a2 = array_mysrvc_total/10000;
|
||||
if (a2 > a1) {
|
||||
fprintf(stderr, "Total: %llu, Candidates: %llu\n", array_mysrvc_total-l, array_mysrvc_cands);
|
||||
}
|
||||
#endif // TEST_AURORA
|
||||
MySrvC *mysrvcCandidates_static[32];
|
||||
MySrvC **mysrvcCandidates = mysrvcCandidates_static;
|
||||
unsigned int num_candidates = 0;
|
||||
bool max_connections_reached = false;
|
||||
if (l>32) {
|
||||
mysrvcCandidates = (MySrvC **)malloc(sizeof(MySrvC *)*l);
|
||||
}
|
||||
if (l) {
|
||||
//int j=0;
|
||||
for (j=0; j<l; j++) {
|
||||
mysrvc=mysrvs->idx(j);
|
||||
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { // consider this server only if ONLINE
|
||||
if (mysrvc->ConnectionsUsed->conns_length() < mysrvc->max_connections) { // consider this server only if didn't reach max_connections
|
||||
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
|
||||
if (gtid_trxid) {
|
||||
if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
} else {
|
||||
if (max_lag_ms >= 0) {
|
||||
if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
} else {
|
||||
sess->thread->status_variables.stvar[st_var_aws_aurora_replicas_skipped_during_query]++;
|
||||
}
|
||||
} else {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
max_connections_reached = true;
|
||||
}
|
||||
} else {
|
||||
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
|
||||
// try to recover shunned servers
|
||||
if (mysrvc->shunned_automatic && mysql_thread___shun_recovery_time_sec) {
|
||||
time_t t;
|
||||
t=time(NULL);
|
||||
// we do all these changes without locking . We assume the server is not used from long
|
||||
// even if the server is still in used and any of the follow command fails it is not critical
|
||||
// because this is only an attempt to recover a server that is probably dead anyway
|
||||
|
||||
// the next few lines of code try to solve issue #530
|
||||
int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? mysql_thread___connect_timeout_server_max/1000 - 1 : mysql_thread___shun_recovery_time_sec );
|
||||
if (max_wait_sec < 1) { // min wait time should be at least 1 second
|
||||
max_wait_sec = 1;
|
||||
}
|
||||
if (t > mysrvc->time_last_detected_error && (t - mysrvc->time_last_detected_error) > max_wait_sec) {
|
||||
if (
|
||||
(mysrvc->shunned_and_kill_all_connections==false) // it is safe to bring it back online
|
||||
||
|
||||
(mysrvc->shunned_and_kill_all_connections==true && mysrvc->ConnectionsUsed->conns_length()==0 && mysrvc->ConnectionsFree->conns_length()==0) // if shunned_and_kill_all_connections is set, ensure all connections are already dropped
|
||||
) {
|
||||
#ifdef DEBUG
|
||||
if (GloMTH->variables.hostgroup_manager_verbose >= 3) {
|
||||
proxy_info("Unshunning server %s:%d.\n", mysrvc->address, mysrvc->port);
|
||||
}
|
||||
#endif
|
||||
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
|
||||
mysrvc->shunned_automatic=false;
|
||||
mysrvc->shunned_and_kill_all_connections=false;
|
||||
mysrvc->connect_ERR_at_time_last_detected_error=0;
|
||||
mysrvc->time_last_detected_error=0;
|
||||
// note: the following function scans all the hostgroups.
|
||||
// This is ok for now because we only have a global mutex.
|
||||
// If one day we implement a mutex per hostgroup (unlikely,
|
||||
// but possible), this must be taken into consideration
|
||||
if (mysql_thread___unshun_algorithm == 1) {
|
||||
MyHGM->unshun_server_all_hostgroups(mysrvc->address, mysrvc->port, t, max_wait_sec, &mysrvc->myhgc->hid);
|
||||
}
|
||||
// if a server is taken back online, consider it immediately
|
||||
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
|
||||
if (gtid_trxid) {
|
||||
if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
} else {
|
||||
if (max_lag_ms >= 0) {
|
||||
if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
} else {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (max_lag_ms > 0) { // we are using AWS Aurora, as this logic is implemented only here
|
||||
unsigned int min_num_replicas = sess->thread->variables.aurora_max_lag_ms_only_read_from_replicas;
|
||||
if (min_num_replicas) {
|
||||
if (num_candidates >= min_num_replicas) { // there are at least N replicas
|
||||
// we try to remove the writer
|
||||
unsigned int total_aws_aurora_current_lag_us=0;
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
total_aws_aurora_current_lag_us += mysrvc->aws_aurora_current_lag_us;
|
||||
}
|
||||
if (total_aws_aurora_current_lag_us) { // we are just double checking that we don't have all servers with aws_aurora_current_lag_us==0
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
if (mysrvc->aws_aurora_current_lag_us==0) {
|
||||
sum-=mysrvc->weight;
|
||||
TotalUsedConn-=mysrvc->ConnectionsUsed->conns_length();
|
||||
if (j < num_candidates-1) {
|
||||
mysrvcCandidates[j]=mysrvcCandidates[num_candidates-1];
|
||||
}
|
||||
num_candidates--;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (sum==0) {
|
||||
// per issue #531 , we try a desperate attempt to bring back online any shunned server
|
||||
// we do this lowering the maximum wait time to 10%
|
||||
// most of the follow code is copied from few lines above
|
||||
time_t t;
|
||||
t=time(NULL);
|
||||
int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? mysql_thread___connect_timeout_server_max/10000 - 1 : mysql_thread___shun_recovery_time_sec/10 );
|
||||
if (max_wait_sec < 1) { // min wait time should be at least 1 second
|
||||
max_wait_sec = 1;
|
||||
}
|
||||
if (t - last_hg_log > 1) { // log this at most once per second to avoid spamming the logs
|
||||
last_hg_log = time(NULL);
|
||||
|
||||
if (gtid_trxid) {
|
||||
proxy_error("Hostgroup %u has no servers ready for GTID '%s:%ld'. Waiting for replication...\n", hid, gtid_uuid, gtid_trxid);
|
||||
} else {
|
||||
proxy_error("Hostgroup %u has no servers available%s! Checking servers shunned for more than %u second%s\n", hid,
|
||||
(max_connections_reached ? " or max_connections reached for all servers" : ""), max_wait_sec, max_wait_sec == 1 ? "" : "s");
|
||||
}
|
||||
}
|
||||
for (j=0; j<l; j++) {
|
||||
mysrvc=mysrvs->idx(j);
|
||||
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED && mysrvc->shunned_automatic==true) {
|
||||
if ((t - mysrvc->time_last_detected_error) > max_wait_sec) {
|
||||
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
|
||||
mysrvc->shunned_automatic=false;
|
||||
mysrvc->connect_ERR_at_time_last_detected_error=0;
|
||||
mysrvc->time_last_detected_error=0;
|
||||
// if a server is taken back online, consider it immediately
|
||||
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
|
||||
if (gtid_trxid) {
|
||||
if (MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid)) {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
} else {
|
||||
if (max_lag_ms >= 0) {
|
||||
if ((unsigned int)max_lag_ms >= mysrvc->aws_aurora_current_lag_us/1000) {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
} else {
|
||||
sum+=mysrvc->weight;
|
||||
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
|
||||
mysrvcCandidates[num_candidates]=mysrvc;
|
||||
num_candidates++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (sum==0) {
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n");
|
||||
if (l>32) {
|
||||
free(mysrvcCandidates);
|
||||
}
|
||||
#ifdef TEST_AURORA
|
||||
array_mysrvc_cands += num_candidates;
|
||||
#endif // TEST_AURORA
|
||||
return NULL; // if we reach here, we couldn't find any target
|
||||
}
|
||||
|
||||
/*
|
||||
unsigned int New_sum=0;
|
||||
unsigned int New_TotalUsedConn=0;
|
||||
// we will now scan again to ignore overloaded servers
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
unsigned int len=mysrvc->ConnectionsUsed->conns_length();
|
||||
if ((len * sum) <= (TotalUsedConn * mysrvc->weight * 1.5 + 1)) {
|
||||
|
||||
New_sum+=mysrvc->weight;
|
||||
New_TotalUsedConn+=len;
|
||||
} else {
|
||||
// remove the candidate
|
||||
if (j+1 < num_candidates) {
|
||||
mysrvcCandidates[j] = mysrvcCandidates[num_candidates-1];
|
||||
}
|
||||
j--;
|
||||
num_candidates--;
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
unsigned int New_sum=sum;
|
||||
|
||||
if (New_sum==0) {
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n");
|
||||
if (l>32) {
|
||||
free(mysrvcCandidates);
|
||||
}
|
||||
#ifdef TEST_AURORA
|
||||
array_mysrvc_cands += num_candidates;
|
||||
#endif // TEST_AURORA
|
||||
return NULL; // if we reach here, we couldn't find any target
|
||||
}
|
||||
|
||||
// latency awareness algorithm is enabled only when compiled with USE_MYSRVC_ARRAY
|
||||
if (sess && sess->thread->variables.min_num_servers_lantency_awareness) {
|
||||
if ((int) num_candidates >= sess->thread->variables.min_num_servers_lantency_awareness) {
|
||||
unsigned int servers_with_latency = 0;
|
||||
unsigned int total_latency_us = 0;
|
||||
// scan and verify that all servers have some latency
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
if (mysrvc->current_latency_us) {
|
||||
servers_with_latency++;
|
||||
total_latency_us += mysrvc->current_latency_us;
|
||||
}
|
||||
}
|
||||
if (servers_with_latency == num_candidates) {
|
||||
// all servers have some latency.
|
||||
// That is good. If any server have no latency, something is wrong
|
||||
// and we will skip this algorithm
|
||||
sess->thread->status_variables.stvar[st_var_ConnPool_get_conn_latency_awareness]++;
|
||||
unsigned int avg_latency_us = 0;
|
||||
avg_latency_us = total_latency_us/num_candidates;
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
if (mysrvc->current_latency_us > avg_latency_us) {
|
||||
// remove the candidate
|
||||
if (j+1 < num_candidates) {
|
||||
mysrvcCandidates[j] = mysrvcCandidates[num_candidates-1];
|
||||
}
|
||||
j--;
|
||||
num_candidates--;
|
||||
}
|
||||
}
|
||||
// we scan again to adjust weight
|
||||
New_sum = 0;
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
New_sum+=mysrvc->weight;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
unsigned int k;
|
||||
if (New_sum > 32768) {
|
||||
k=rand()%New_sum;
|
||||
} else {
|
||||
k=fastrand()%New_sum;
|
||||
}
|
||||
k++;
|
||||
New_sum=0;
|
||||
|
||||
for (j=0; j<num_candidates; j++) {
|
||||
mysrvc = mysrvcCandidates[j];
|
||||
New_sum+=mysrvc->weight;
|
||||
if (k<=New_sum) {
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC %p, server %s:%d\n", mysrvc, mysrvc->address, mysrvc->port);
|
||||
if (l>32) {
|
||||
free(mysrvcCandidates);
|
||||
}
|
||||
#ifdef TEST_AURORA
|
||||
array_mysrvc_cands += num_candidates;
|
||||
#endif // TEST_AURORA
|
||||
return mysrvc;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
time_t t = time(NULL);
|
||||
|
||||
if (t - last_hg_log > 1) {
|
||||
last_hg_log = time(NULL);
|
||||
proxy_error("Hostgroup %u has no servers available!\n", hid);
|
||||
}
|
||||
}
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL\n");
|
||||
if (l>32) {
|
||||
free(mysrvcCandidates);
|
||||
}
|
||||
#ifdef TEST_AURORA
|
||||
array_mysrvc_cands += num_candidates;
|
||||
#endif // TEST_AURORA
|
||||
return NULL; // if we reach here, we couldn't find any target
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,198 @@
|
||||
#include "MySQL_HostGroups_Manager.h"
|
||||
/*
|
||||
#include "proxysql.h"
|
||||
#include "cpp.h"
|
||||
|
||||
#include "MySQL_PreparedStatement.h"
|
||||
#include "MySQL_Data_Stream.h"
|
||||
|
||||
#include <memory>
|
||||
#include <pthread.h>
|
||||
#include <string>
|
||||
|
||||
#include <prometheus/counter.h>
|
||||
#include <prometheus/detail/builder.h>
|
||||
#include <prometheus/family.h>
|
||||
#include <prometheus/gauge.h>
|
||||
|
||||
#include "prometheus_helpers.h"
|
||||
#include "proxysql_utils.h"
|
||||
|
||||
#define char_malloc (char *)malloc
|
||||
#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); }
|
||||
|
||||
#include "thread.h"
|
||||
#include "wqueue.h"
|
||||
|
||||
#include "ev.h"
|
||||
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <type_traits>
|
||||
|
||||
using std::function;
|
||||
|
||||
#ifdef TEST_AURORA
|
||||
static unsigned long long array_mysrvc_total = 0;
|
||||
static unsigned long long array_mysrvc_cands = 0;
|
||||
#endif // TEST_AURORA
|
||||
|
||||
#define SAFE_SQLITE3_STEP(_stmt) do {\
|
||||
do {\
|
||||
rc=(*proxy_sqlite3_step)(_stmt);\
|
||||
if (rc!=SQLITE_DONE) {\
|
||||
assert(rc==SQLITE_LOCKED);\
|
||||
usleep(100);\
|
||||
}\
|
||||
} while (rc!=SQLITE_DONE);\
|
||||
} while (0)
|
||||
|
||||
extern ProxySQL_Admin *GloAdmin;
|
||||
|
||||
extern MySQL_Threads_Handler *GloMTH;
|
||||
|
||||
extern MySQL_Monitor *GloMyMon;
|
||||
*/
|
||||
|
||||
class MySrvConnList;
|
||||
class MySrvC;
|
||||
class MySrvList;
|
||||
class MyHGC;
|
||||
|
||||
MySrvC::MySrvC(
|
||||
char* add, uint16_t p, uint16_t gp, int64_t _weight, enum MySerStatus _status, unsigned int _compression,
|
||||
int64_t _max_connections, unsigned int _max_replication_lag, int32_t _use_ssl, unsigned int _max_latency_ms,
|
||||
char* _comment
|
||||
) {
|
||||
address=strdup(add);
|
||||
port=p;
|
||||
gtid_port=gp;
|
||||
weight=_weight;
|
||||
status=_status;
|
||||
compression=_compression;
|
||||
max_connections=_max_connections;
|
||||
max_replication_lag=_max_replication_lag;
|
||||
use_ssl=_use_ssl;
|
||||
cur_replication_lag=0;
|
||||
cur_replication_lag_count=0;
|
||||
max_latency_us=_max_latency_ms*1000;
|
||||
current_latency_us=0;
|
||||
aws_aurora_current_lag_us = 0;
|
||||
connect_OK=0;
|
||||
connect_ERR=0;
|
||||
queries_sent=0;
|
||||
bytes_sent=0;
|
||||
bytes_recv=0;
|
||||
max_connections_used=0;
|
||||
queries_gtid_sync=0;
|
||||
time_last_detected_error=0;
|
||||
connect_ERR_at_time_last_detected_error=0;
|
||||
shunned_automatic=false;
|
||||
shunned_and_kill_all_connections=false; // false to default
|
||||
//charset=_charset;
|
||||
myhgc=NULL;
|
||||
comment=strdup(_comment);
|
||||
ConnectionsUsed=new MySrvConnList(this);
|
||||
ConnectionsFree=new MySrvConnList(this);
|
||||
}
|
||||
|
||||
void MySrvC::connect_error(int err_num, bool get_mutex) {
|
||||
// NOTE: this function operates without any mutex
|
||||
// although, it is not extremely important if any counter is lost
|
||||
// as a single connection failure won't make a significant difference
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 5, "Connect failed with code '%d'\n", err_num);
|
||||
__sync_fetch_and_add(&connect_ERR,1);
|
||||
__sync_fetch_and_add(&MyHGM->status.server_connections_aborted,1);
|
||||
if (err_num >= 1048 && err_num <= 1052)
|
||||
return;
|
||||
if (err_num >= 1054 && err_num <= 1075)
|
||||
return;
|
||||
if (err_num >= 1099 && err_num <= 1104)
|
||||
return;
|
||||
if (err_num >= 1106 && err_num <= 1113)
|
||||
return;
|
||||
if (err_num >= 1116 && err_num <= 1118)
|
||||
return;
|
||||
if (err_num == 1136 || (err_num >= 1138 && err_num <= 1149))
|
||||
return;
|
||||
switch (err_num) {
|
||||
case 1007: // Can't create database
|
||||
case 1008: // Can't drop database
|
||||
case 1044: // access denied
|
||||
case 1045: // access denied
|
||||
/*
|
||||
case 1048: // Column cannot be null
|
||||
case 1049: // Unknown database
|
||||
case 1050: // Table already exists
|
||||
case 1051: // Unknown table
|
||||
case 1052: // Column is ambiguous
|
||||
*/
|
||||
case 1120:
|
||||
case 1203: // User %s already has more than 'max_user_connections' active connections
|
||||
case 1226: // User '%s' has exceeded the '%s' resource (current value: %ld)
|
||||
case 3118: // Access denied for user '%s'. Account is locked..
|
||||
return;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
time_t t=time(NULL);
|
||||
if (t > time_last_detected_error) {
|
||||
time_last_detected_error=t;
|
||||
connect_ERR_at_time_last_detected_error=1;
|
||||
} else {
|
||||
if (t < time_last_detected_error) {
|
||||
// time_last_detected_error is in the future
|
||||
// this means that monitor has a ping interval too big and tuned that in the future
|
||||
return;
|
||||
}
|
||||
// same time
|
||||
/**
|
||||
* @brief The expected configured retries set by 'mysql-connect_retries_on_failure' + '2' extra expected
|
||||
* connection errors.
|
||||
* @details This two extra connections errors are expected:
|
||||
* 1. An initial connection error generated by the datastream and the connection when being created,
|
||||
* this is, right after the session has requested a connection to the connection pool. This error takes
|
||||
* places directly in the state machine from 'MySQL_Connection'. Because of this, we consider this
|
||||
* additional error to be a consequence of the two states machines, and it's not considered for
|
||||
* 'connect_retries'.
|
||||
* 2. A second connection connection error, which is the initial connection error generated by 'MySQL_Session'
|
||||
* when already in the 'CONNECTING_SERVER' state. This error is an 'extra error' to always consider, since
|
||||
* it's not part of the retries specified by 'mysql_thread___connect_retries_on_failure', thus, we set the
|
||||
* 'connect_retries' to be 'mysql_thread___connect_retries_on_failure + 1'.
|
||||
*/
|
||||
int connect_retries = mysql_thread___connect_retries_on_failure + 1;
|
||||
int max_failures = mysql_thread___shun_on_failures > connect_retries ? connect_retries : mysql_thread___shun_on_failures;
|
||||
|
||||
if (__sync_add_and_fetch(&connect_ERR_at_time_last_detected_error,1) >= (unsigned int)max_failures) {
|
||||
bool _shu=false;
|
||||
if (get_mutex==true)
|
||||
MyHGM->wrlock(); // to prevent race conditions, lock here. See #627
|
||||
if (status==MYSQL_SERVER_STATUS_ONLINE) {
|
||||
status=MYSQL_SERVER_STATUS_SHUNNED;
|
||||
shunned_automatic=true;
|
||||
_shu=true;
|
||||
} else {
|
||||
_shu=false;
|
||||
}
|
||||
if (get_mutex==true)
|
||||
MyHGM->wrunlock();
|
||||
if (_shu) {
|
||||
proxy_error("Shunning server %s:%d with %u errors/sec. Shunning for %u seconds\n", address, port, connect_ERR_at_time_last_detected_error , mysql_thread___shun_recovery_time_sec);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void MySrvC::shun_and_killall() {
|
||||
status=MYSQL_SERVER_STATUS_SHUNNED;
|
||||
shunned_automatic=true;
|
||||
shunned_and_kill_all_connections=true;
|
||||
}
|
||||
|
||||
MySrvC::~MySrvC() {
|
||||
if (address) free(address);
|
||||
if (comment) free(comment);
|
||||
delete ConnectionsUsed;
|
||||
delete ConnectionsFree;
|
||||
}
|
||||
@ -0,0 +1,256 @@
|
||||
#include "MySQL_HostGroups_Manager.h"
|
||||
|
||||
#include "MySQL_Data_Stream.h"
|
||||
|
||||
extern ProxySQL_Admin *GloAdmin;
|
||||
|
||||
extern MySQL_Threads_Handler *GloMTH;
|
||||
|
||||
extern MySQL_Monitor *GloMyMon;
|
||||
|
||||
class MySrvConnList;
|
||||
class MySrvC;
|
||||
class MySrvList;
|
||||
class MyHGC;
|
||||
|
||||
MySQL_Connection *MySrvConnList::index(unsigned int _k) {
|
||||
return (MySQL_Connection *)conns->index(_k);
|
||||
}
|
||||
|
||||
MySQL_Connection * MySrvConnList::remove(int _k) {
|
||||
return (MySQL_Connection *)conns->remove_index_fast(_k);
|
||||
}
|
||||
|
||||
MySrvConnList::MySrvConnList(MySrvC *_mysrvc) {
|
||||
mysrvc=_mysrvc;
|
||||
conns=new PtrArray();
|
||||
}
|
||||
|
||||
void MySrvConnList::add(MySQL_Connection *c) {
|
||||
conns->add(c);
|
||||
}
|
||||
|
||||
MySrvConnList::~MySrvConnList() {
|
||||
mysrvc=NULL;
|
||||
while (conns_length()) {
|
||||
MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0);
|
||||
delete conn;
|
||||
}
|
||||
delete conns;
|
||||
}
|
||||
|
||||
void MySrvConnList::drop_all_connections() {
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Dropping all connections (%u total) on MySrvConnList %p for server %s:%d , hostgroup=%d , status=%d\n", conns_length(), this, mysrvc->address, mysrvc->port, mysrvc->myhgc->hid, mysrvc->status);
|
||||
while (conns_length()) {
|
||||
MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0);
|
||||
delete conn;
|
||||
}
|
||||
}
|
||||
|
||||
void MySrvConnList::get_random_MyConn_inner_search(unsigned int start, unsigned int end, unsigned int& conn_found_idx, unsigned int& connection_quality_level, unsigned int& number_of_matching_session_variables, const MySQL_Connection * client_conn) {
|
||||
char *schema = client_conn->userinfo->schemaname;
|
||||
MySQL_Connection * conn=NULL;
|
||||
unsigned int k;
|
||||
for (k = start; k < end; k++) {
|
||||
conn = (MySQL_Connection *)conns->index(k);
|
||||
if (conn->match_tracked_options(client_conn)) {
|
||||
if (connection_quality_level == 0) {
|
||||
// this is our best candidate so far
|
||||
connection_quality_level = 1;
|
||||
conn_found_idx = k;
|
||||
}
|
||||
if (conn->requires_CHANGE_USER(client_conn)==false) {
|
||||
if (connection_quality_level == 1) {
|
||||
// this is our best candidate so far
|
||||
connection_quality_level = 2;
|
||||
conn_found_idx = k;
|
||||
}
|
||||
unsigned int cnt_match = 0; // number of matching session variables
|
||||
unsigned int not_match = 0; // number of not matching session variables
|
||||
cnt_match = conn->number_of_matching_session_variables(client_conn, not_match);
|
||||
if (strcmp(conn->userinfo->schemaname,schema)==0) {
|
||||
cnt_match++;
|
||||
} else {
|
||||
not_match++;
|
||||
}
|
||||
if (not_match==0) {
|
||||
// it seems we found the perfect connection
|
||||
number_of_matching_session_variables = cnt_match;
|
||||
connection_quality_level = 3;
|
||||
conn_found_idx = k;
|
||||
return; // exit immediately, we found the perfect connection
|
||||
} else {
|
||||
// we didn't find the perfect connection
|
||||
// but maybe is better than what we have so far?
|
||||
if (cnt_match > number_of_matching_session_variables) {
|
||||
// this is our best candidate so far
|
||||
number_of_matching_session_variables = cnt_match;
|
||||
conn_found_idx = k;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (connection_quality_level == 1) {
|
||||
int rca = mysql_thread___reset_connection_algorithm;
|
||||
if (rca==1) {
|
||||
int ql = GloMTH->variables.connpoll_reset_queue_length;
|
||||
if (ql==0) {
|
||||
// if:
|
||||
// mysql-reset_connection_algorithm=1 and
|
||||
// mysql-connpoll_reset_queue_length=0
|
||||
// we will not return a connection with connection_quality_level == 1
|
||||
// because we want to run COM_CHANGE_USER
|
||||
// This change was introduced to work around Galera bug
|
||||
// https://github.com/codership/galera/issues/613
|
||||
connection_quality_level = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff) {
|
||||
MySQL_Connection * conn=NULL;
|
||||
unsigned int i;
|
||||
unsigned int conn_found_idx;
|
||||
unsigned int l=conns_length();
|
||||
unsigned int connection_quality_level = 0;
|
||||
bool needs_warming = false;
|
||||
// connection_quality_level:
|
||||
// 0 : not found any good connection, tracked options are not OK
|
||||
// 1 : tracked options are OK , but CHANGE USER is required
|
||||
// 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed
|
||||
// 3 : tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required
|
||||
unsigned int number_of_matching_session_variables = 0; // this includes session variables AND schema
|
||||
bool connection_warming = mysql_thread___connection_warming;
|
||||
int free_connections_pct = mysql_thread___free_connections_pct;
|
||||
if (mysrvc->myhgc->attributes.configured == true) {
|
||||
// mysql_hostgroup_attributes takes priority
|
||||
connection_warming = mysrvc->myhgc->attributes.connection_warming;
|
||||
free_connections_pct = mysrvc->myhgc->attributes.free_connections_pct;
|
||||
}
|
||||
if (connection_warming == true) {
|
||||
unsigned int total_connections = mysrvc->ConnectionsFree->conns_length()+mysrvc->ConnectionsUsed->conns_length();
|
||||
unsigned int expected_warm_connections = free_connections_pct*mysrvc->max_connections/100;
|
||||
if (total_connections < expected_warm_connections) {
|
||||
needs_warming = true;
|
||||
}
|
||||
}
|
||||
if (l && ff==false && needs_warming==false) {
|
||||
if (l>32768) {
|
||||
i=rand()%l;
|
||||
} else {
|
||||
i=fastrand()%l;
|
||||
}
|
||||
if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) {
|
||||
MySQL_Connection * client_conn = sess->client_myds->myconn;
|
||||
get_random_MyConn_inner_search(i, l, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn);
|
||||
if (connection_quality_level !=3 ) { // we didn't find the perfect connection
|
||||
get_random_MyConn_inner_search(0, i, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn);
|
||||
}
|
||||
// connection_quality_level:
|
||||
// 1 : tracked options are OK , but CHANGE USER is required
|
||||
// 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed
|
||||
switch (connection_quality_level) {
|
||||
case 0: // not found any good connection, tracked options are not OK
|
||||
// we must check if connections need to be freed before
|
||||
// creating a new connection
|
||||
{
|
||||
unsigned int conns_free = mysrvc->ConnectionsFree->conns_length();
|
||||
unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length();
|
||||
unsigned int pct_max_connections = (3 * mysrvc->max_connections) / 4;
|
||||
unsigned int connections_to_free = 0;
|
||||
|
||||
if (conns_free >= 1) {
|
||||
// connection cleanup is triggered when connections exceed 3/4 of the total
|
||||
// allowed max connections, this cleanup ensures that at least *one connection*
|
||||
// will be freed.
|
||||
if (pct_max_connections <= (conns_free + conns_used)) {
|
||||
connections_to_free = (conns_free + conns_used) - pct_max_connections;
|
||||
if (connections_to_free == 0) connections_to_free = 1;
|
||||
}
|
||||
|
||||
while (conns_free && connections_to_free) {
|
||||
MySQL_Connection* conn = mysrvc->ConnectionsFree->remove(0);
|
||||
delete conn;
|
||||
|
||||
conns_free = mysrvc->ConnectionsFree->conns_length();
|
||||
connections_to_free -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
// we must create a new connection
|
||||
conn = new MySQL_Connection();
|
||||
conn->parent=mysrvc;
|
||||
// if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa
|
||||
conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG);
|
||||
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
|
||||
}
|
||||
break;
|
||||
case 1: //tracked options are OK , but CHANGE USER is required
|
||||
// we may consider creating a new connection
|
||||
{
|
||||
unsigned int conns_free = mysrvc->ConnectionsFree->conns_length();
|
||||
unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length();
|
||||
if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) {
|
||||
conn = new MySQL_Connection();
|
||||
conn->parent=mysrvc;
|
||||
// if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa
|
||||
conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG);
|
||||
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
|
||||
} else {
|
||||
conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 2: // tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed
|
||||
case 3: // tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required
|
||||
// here we return the best connection we have, no matter if connection_quality_level is 2 or 3
|
||||
conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx);
|
||||
break;
|
||||
default: // this should never happen
|
||||
// LCOV_EXCL_START
|
||||
assert(0);
|
||||
break;
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
} else {
|
||||
conn=(MySQL_Connection *)conns->remove_index_fast(i);
|
||||
}
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
|
||||
return conn;
|
||||
} else {
|
||||
unsigned long long curtime = monotonic_time();
|
||||
curtime = curtime / 1000 / 1000; // convert to second
|
||||
MyHGC *_myhgc = mysrvc->myhgc;
|
||||
if (curtime > _myhgc->current_time_now) {
|
||||
_myhgc->current_time_now = curtime;
|
||||
_myhgc->new_connections_now = 0;
|
||||
}
|
||||
_myhgc->new_connections_now++;
|
||||
unsigned int throttle_connections_per_sec_to_hostgroup = (unsigned int) mysql_thread___throttle_connections_per_sec_to_hostgroup;
|
||||
if (_myhgc->attributes.configured == true) {
|
||||
// mysql_hostgroup_attributes takes priority
|
||||
throttle_connections_per_sec_to_hostgroup = _myhgc->attributes.throttle_connections_per_sec;
|
||||
}
|
||||
if (_myhgc->new_connections_now > (unsigned int) throttle_connections_per_sec_to_hostgroup) {
|
||||
__sync_fetch_and_add(&MyHGM->status.server_connections_delayed, 1);
|
||||
return NULL;
|
||||
} else {
|
||||
conn = new MySQL_Connection();
|
||||
conn->parent=mysrvc;
|
||||
// if attributes.multiplex == true , STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG is set to false. And vice-versa
|
||||
conn->set_status(!conn->parent->myhgc->attributes.multiplex, STATUS_MYSQL_CONNECTION_NO_MULTIPLEX_HG);
|
||||
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
|
||||
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
return NULL; // never reach here
|
||||
}
|
||||
|
||||
@ -0,0 +1,44 @@
|
||||
#include "MySQL_HostGroups_Manager.h"
|
||||
|
||||
class MySrvConnList;
|
||||
class MySrvC;
|
||||
class MySrvList;
|
||||
class MyHGC;
|
||||
|
||||
MySrvList::MySrvList(MyHGC *_myhgc) {
|
||||
myhgc=_myhgc;
|
||||
servers=new PtrArray();
|
||||
}
|
||||
|
||||
void MySrvList::add(MySrvC *s) {
|
||||
if (s->myhgc==NULL) {
|
||||
s->myhgc=myhgc;
|
||||
}
|
||||
servers->add(s);
|
||||
}
|
||||
|
||||
|
||||
int MySrvList::find_idx(MySrvC *s) {
|
||||
for (unsigned int i=0; i<servers->len; i++) {
|
||||
MySrvC *mysrv=(MySrvC *)servers->index(i);
|
||||
if (mysrv==s) {
|
||||
return (unsigned int)i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
void MySrvList::remove(MySrvC *s) {
|
||||
int i=find_idx(s);
|
||||
assert(i>=0);
|
||||
servers->remove_index_fast((unsigned int)i);
|
||||
}
|
||||
|
||||
MySrvList::~MySrvList() {
|
||||
myhgc=NULL;
|
||||
while (servers->len) {
|
||||
MySrvC *mysrvc=(MySrvC *)servers->remove_index_fast(0);
|
||||
delete mysrvc;
|
||||
}
|
||||
delete servers;
|
||||
}
|
||||
Loading…
Reference in new issue