* Added total poll timeout in case poll is waiting on stale connection.

* Initialized few uninitialized variable.
pull/4063/head
Rahim Kanji 3 years ago
parent 7fa44cedc1
commit ebc7ebf501

@ -229,6 +229,7 @@ class MySQL_Monitor_State_Data {
MySQL_Monitor_State_Data(MySQL_Monitor_State_Data_Task_Type task_type, char* h, int p, bool _use_ssl = 0, int g = 0);
~MySQL_Monitor_State_Data();
void init_async();
void mark_task_as_timeout();
bool create_new_connection();
int async_exit_status;

@ -497,6 +497,15 @@ MySQL_Monitor_State_Data::MySQL_Monitor_State_Data(MySQL_Monitor_State_Data_Task
interr = 0;
timeout_ = 0;
async_state_machine_ = ASYNC_IDLE;
writer_hostgroup = 0;
writer_is_also_reader = 0;
max_transactions_behind = 0;
max_transactions_behind_count = 0;
aws_aurora_max_lag_ms = 0;
aws_aurora_check_timeout_ms = 0;
aws_aurora_add_lag_ms = 0;
aws_aurora_min_lag_ms = 0;
aws_aurora_lag_num_checks = 0;
}
MySQL_Monitor_State_Data::~MySQL_Monitor_State_Data() {
@ -619,6 +628,23 @@ void MySQL_Monitor_State_Data::init_async() {
}
}
void MySQL_Monitor_State_Data::mark_task_as_timeout() {
task_result_ = MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_TIMEOUT;
if (mysql_error_msg)
free(mysql_error_msg);
if (task_id_ == MON_PING) {
async_state_machine_ = ASYNC_PING_TIMEOUT;
mysql_error_msg = strdup("timeout during ping");
}
else {
async_state_machine_ = ASYNC_STORE_RESULT_TIMEOUT;
mysql_error_msg = strdup("timeout check");
}
}
void * monitor_connect_pthread(void *arg) {
#ifndef NOJEM
bool cache=false;
@ -2852,6 +2878,7 @@ __sleep_monitor_connect_loop:
}
void * MySQL_Monitor::monitor_ping() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
// struct event_base *libevent_base;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
@ -3112,6 +3139,7 @@ VALGRIND_ENABLE_ERROR_REPORTING;
}
void * MySQL_Monitor::monitor_read_only() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
@ -3210,6 +3238,7 @@ __sleep_monitor_read_only:
}
void * MySQL_Monitor::monitor_group_replication() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
// struct event_base *libevent_base;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
@ -3323,6 +3352,7 @@ __sleep_monitor_group_replication:
return NULL;
}
void * MySQL_Monitor::monitor_galera() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
// struct event_base *libevent_base;
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
@ -3396,6 +3426,7 @@ __sleep_monitor_galera:
}
void * MySQL_Monitor::monitor_replication_lag() {
mysql_close(mysql_init(NULL));
// initialize the MySQL Thread (note: this is not a real thread, just the structures associated with it)
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
@ -5869,44 +5900,44 @@ mysql_status(short event) {
class Monitor_Poll {
public:
Monitor_Poll() {
len = 0;
size = MIN_POLL_LEN;
fds = (struct pollfd*)malloc(size * sizeof(struct pollfd));
mmsds = (MySQL_Monitor_State_Data**)malloc(size * sizeof(MySQL_Monitor_State_Data*));
len_ = 0;
size_ = MIN_POLL_LEN;
fds_ = (struct pollfd*)malloc(size_ * sizeof(struct pollfd));
mmsds_ = (MySQL_Monitor_State_Data**)malloc(size_ * sizeof(MySQL_Monitor_State_Data*));
}
~Monitor_Poll() {
free(fds);
free(mmsds);
free(fds_);
free(mmsds_);
}
void shrink() {
unsigned int new_size = near_pow_2(len + 1);
fds = (struct pollfd*)realloc(fds, new_size * sizeof(struct pollfd));
mmsds = (MySQL_Monitor_State_Data**)realloc(mmsds, new_size * sizeof(MySQL_Monitor_State_Data*));
size = new_size;
unsigned int new_size = near_pow_2(len_ + 1);
fds_ = (struct pollfd*)realloc(fds_, new_size * sizeof(struct pollfd));
mmsds_ = (MySQL_Monitor_State_Data**)realloc(mmsds_, new_size * sizeof(MySQL_Monitor_State_Data*));
size_ = new_size;
}
void expand(unsigned int more) {
if ((len + more) > size) {
unsigned int new_size = near_pow_2(len + more);
fds = (struct pollfd*)realloc(fds, new_size * sizeof(struct pollfd));
mmsds = (MySQL_Monitor_State_Data**)realloc(mmsds, new_size * sizeof(MySQL_Monitor_State_Data*));
size = new_size;
if ((len_ + more) > size_) {
unsigned int new_size = near_pow_2(len_ + more);
fds_ = (struct pollfd*)realloc(fds_, new_size * sizeof(struct pollfd));
mmsds_ = (MySQL_Monitor_State_Data**)realloc(mmsds_, new_size * sizeof(MySQL_Monitor_State_Data*));
size_ = new_size;
}
}
void add(short _events, MySQL_Monitor_State_Data* mmsd) {
assert(mmsd->mysql);
if (len == size) {
if (len_ == size_) {
expand(1);
}
fds[len].fd = mysql_get_socket(mmsd->mysql);
fds[len].events = _events;
fds[len].revents = 0;
mmsds[len] = mmsd;
len++;
fds_[len_].fd = mysql_get_socket(mmsd->mysql);
fds_[len_].events = _events;
fds_[len_].revents = 0;
mmsds_[len_] = mmsd;
len_++;
mmsd->init_async();
mmsd->task_handler(-1, _events);
@ -5915,22 +5946,22 @@ public:
void remove_index_fast(unsigned int i) {
if ((int)i == -1) return;
if (i != (len - 1)) {
fds[i].fd = fds[len - 1].fd;
fds[i].events = fds[len - 1].events;
fds[i].revents = fds[len - 1].revents;
mmsds[i] = mmsds[len - 1];
if (i != (len_ - 1)) {
fds_[i].fd = fds_[len_ - 1].fd;
fds_[i].events = fds_[len_ - 1].events;
fds_[i].revents = fds_[len_ - 1].revents;
mmsds_[i] = mmsds_[len_ - 1];
}
len--;
if ((len > MIN_POLL_LEN) && (size > len * MIN_POLL_DELETE_RATIO)) {
len_--;
if ((len_ > MIN_POLL_LEN) && (size_ > len_ * MIN_POLL_DELETE_RATIO)) {
shrink();
}
}
int find_index(int fd) {
unsigned int i;
for (i = 0; i < len; i++) {
if (fds[i].fd == fd) {
for (i = 0; i < len_; i++) {
if (fds_[i].fd == fd) {
return i;
}
}
@ -5938,20 +5969,22 @@ public:
}
bool event_loop(int poll_timeout) {
bool event_loop(int poll_timeout_ms) {
int rc = 0;
while (len || GloMyMon->shutdown == true) {
poll_timeout /= len;
if (count() == 0)
return false;
if (poll_timeout < 1000)
poll_timeout = 1000;
int rc = 0;
auto t1 = 0;
auto t2 = monotonic_time()/1000;
rc = poll(fds, len, poll_timeout);
while (len_ || GloMyMon->shutdown == true) {
if (rc == 0)
continue;
rc = poll(fds_, len_, poll_timeout_ms);
if (rc == 0) {
break;
}
if (rc == -1) {
if (errno == EINTR) {
@ -5962,28 +5995,42 @@ public:
}
}
for (unsigned int i = 0; i < len;) {
for (unsigned int i = 0; i < len_;) {
if (fds[i].revents != 0) {
if (fds_[i].revents != 0) {
if (mmsds[i]->task_handler(fds[i].revents, fds[i].events) == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING) {
assert(fds[i].events != 0);
if (mmsds_[i]->task_handler(fds_[i].revents, fds_[i].events) == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_PENDING) {
assert(fds_[i].events != 0);
}
else {
remove_index_fast(i);
continue;
}
}
fds[i].revents = 0;
fds_[i].revents = 0;
i++;
}
t1 = t2;
t2 = monotonic_time()/1000;
poll_timeout_ms -= static_cast<int>(t2 - t1);
if (poll_timeout_ms <= 0) {
break;
}
}
for (unsigned int i = 0; i < len_; i++) {
mmsds_[i]->mark_task_as_timeout();
}
return true;
}
unsigned int len;
unsigned int size;
inline
unsigned int count() const {
return len_;
}
private:
static unsigned int near_pow_2(unsigned int n) {
@ -5992,8 +6039,10 @@ private:
return i ? i : n;
}
struct pollfd* fds;
MySQL_Monitor_State_Data** mmsds;
unsigned int len_;
unsigned int size_;
struct pollfd* fds_;
MySQL_Monitor_State_Data** mmsds_;
};
MySQL_Monitor_State_Data_Task_Result MySQL_Monitor_State_Data::ping_handler(short event_, short& wait_event) {
@ -6091,7 +6140,7 @@ void MySQL_Monitor::monitor_ping_async(SQLite3DB* monitordb, SQLite3_result* res
if (shutdown) return;
}
if (monitor_poll.event_loop(mysql_thread___monitor_ping_interval / 2) == false) {
if (monitor_poll.event_loop(mysql_thread___monitor_ping_interval) == false) {
return;
}
@ -6323,7 +6372,7 @@ void MySQL_Monitor::monitor_read_only_async(SQLite3DB* monitordb, SQLite3_result
if (shutdown) return;
}
if (monitor_poll.event_loop(mysql_thread___monitor_read_only_interval/2) == false) {
if (monitor_poll.event_loop(mysql_thread___monitor_read_only_interval) == false) {
return;
}
@ -6513,7 +6562,7 @@ void MySQL_Monitor::monitor_group_replication_async(SQLite3DB* monitordb) {
}
pthread_mutex_unlock(&group_replication_mutex);
if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_interval/2) == false) {
if (monitor_poll.event_loop(mysql_thread___monitor_groupreplication_healthcheck_interval) == false) {
return;
}
@ -6786,7 +6835,7 @@ void MySQL_Monitor::monitor_replication_lag_async(SQLite3DB* monitordb, SQLite3_
if (shutdown) return;
}
if (monitor_poll.event_loop(mysql_thread___monitor_replication_lag_interval/2) == false) {
if (monitor_poll.event_loop(mysql_thread___monitor_replication_lag_interval) == false) {
return;
}
@ -6953,7 +7002,7 @@ void MySQL_Monitor::monitor_galera_async(SQLite3DB* monitordb) {
}
pthread_mutex_unlock(&galera_mutex);
if (monitor_poll.event_loop(mysql_thread___monitor_galera_healthcheck_interval/2) == false) {
if (monitor_poll.event_loop(mysql_thread___monitor_galera_healthcheck_interval) == false) {
return;
}
@ -6961,9 +7010,21 @@ void MySQL_Monitor::monitor_galera_async(SQLite3DB* monitordb) {
const auto task_result = mmsd->get_task_result();
if (task_result == MySQL_Monitor_State_Data_Task_Result::TASK_RESULT_SUCCESS) {
#ifdef TEST_GALERA
if (rand() % 3 == 0) { // drop the connection once every 3 checks
My_Conn_Pool->conn_unregister(mmsd.get());
mysql_close(mmsd->mysql);
mmsd->mysql = NULL;
}
else {
My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
mmsd->mysql = NULL;
}
#else
My_Conn_Pool->put_connection(mmsd->hostname, mmsd->port, mmsd->mysql);
mmsd->mysql = NULL;
#endif
if (shutdown == true) {
goto __fast_exit_monitor_galera_thread;
}

Loading…
Cancel
Save