Added Replication Lag simulator

pull/4144/head
Rahim Kanji 3 years ago
parent 0d86d0aa48
commit 380e582075

@ -88,6 +88,9 @@ testgrouprep: build_deps_debug build_lib_testgrouprep build_src_testgrouprep
.PHONY: testreadonly
testreadonly: build_deps_debug build_lib_testreadonly build_src_testreadonly
.PHONY: testreplicationlag
testreplicationlag: build_deps_debug build_lib_testreplicationlag build_src_testreplicationlag
.PHONY: testall
testall: build_deps_debug build_lib_testall build_src_testall
@ -151,13 +154,21 @@ build_src_testreadonly: build_deps build_lib_testreadonly
build_lib_testreadonly: build_deps_debug
cd lib && OPTZ="${O0} -ggdb -DDEBUG -DTEST_READONLY" CC=${CC} CXX=${CXX} ${MAKE}
.PHONY: build_src_testreplicationlag
build_src_testreplicationlag: build_deps build_lib_testreplicationlag
cd src && OPTZ="${O0} -ggdb -DDEBUG -DTEST_REPLICATIONLAG" CC=${CC} CXX=${CXX} ${MAKE}
.PHONY: build_lib_testreplicationlag
build_lib_testreplicationlag: build_deps_debug
cd lib && OPTZ="${O0} -ggdb -DDEBUG -DTEST_REPLICATIONLAG" CC=${CC} CXX=${CXX} ${MAKE}
.PHONY: build_src_testall
build_src_testall: build_deps build_lib_testall
cd src && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA -DTEST_GALERA -DTEST_GROUPREP -DTEST_READONLY" CC=${CC} CXX=${CXX} ${MAKE}
cd src && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA -DTEST_GALERA -DTEST_GROUPREP -DTEST_READONLY -DTEST_REPLICATIONLAG" CC=${CC} CXX=${CXX} ${MAKE}
.PHONY: build_lib_testall
build_lib_testall: build_deps_debug
cd lib && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA -DTEST_GALERA -DTEST_GROUPREP -DTEST_READONLY" CC=${CC} CXX=${CXX} ${MAKE}
cd lib && OPTZ="${O0} -ggdb -DDEBUG -DTEST_AURORA -DTEST_GALERA -DTEST_GROUPREP -DTEST_READONLY -DTEST_REPLICATIONLAG" CC=${CC} CXX=${CXX} ${MAKE}
.PHONY: build_tap_test
build_tap_test: build_src

@ -54,7 +54,11 @@ class SQLite3_Server {
std::unordered_map<std::string, bool> readonly_map;
std::vector<table_def_t *> *tables_defs_readonly;
#endif // TEST_READONLY
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#ifdef TEST_REPLICATIONLAG
std::unordered_map<std::string, int> replicationlag_map;
std::vector<table_def_t*>* tables_defs_replicationlag;
#endif // TEST_REPLICATIONLAG
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
void insert_into_tables_defs(std::vector<table_def_t *> *, const char *table_name, const char *table_def);
void drop_tables_defs(std::vector<table_def_t *> *tables_defs);
void check_and_build_standard_tables(SQLite3DB *db, std::vector<table_def_t *> *tables_defs);
@ -92,6 +96,14 @@ class SQLite3_Server {
return readonly_map.size();
}
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
pthread_mutex_t test_replicationlag_mutex;
void load_replicationlag_table(MySQL_Session* sess);
int replicationlag_test_value(const char* p);
int replicationlag_map_size() {
return replicationlag_map.size();
}
#endif // TEST_REPLICATIONLAG
SQLite3_Server();
~SQLite3_Server();
char **get_variables_list();

@ -538,6 +538,10 @@ class ProxySQL_Admin {
void enable_readonly_testing();
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
void enable_replicationlag_testing();
#endif // TEST_REPLICATIONLAG
unsigned int ProxySQL_Test___GenerateRandom_mysql_query_rules_fast_routing(unsigned int, bool);
bool ProxySQL_Test___Verify_mysql_query_rules_fast_routing(int *ret1, int *ret2, int cnt, int dual);
void ProxySQL_Test___MySQL_HostGroups_Manager_generate_many_clusters();

@ -632,6 +632,10 @@ void MySQL_Monitor_State_Data::init_async() {
break;
case MON_REPLICATION_LAG:
async_state_machine_ = ASYNC_QUERY_START;
#ifdef TEST_REPLICATIONLAG
query_ = "SELECT SLAVE STATUS "; // replaced SHOW with SELECT to avoid breaking simulator logic
query_ += std::string(hostname) + ":" + std::to_string(port);
#else
if (mysql_thread___monitor_replication_lag_use_percona_heartbeat &&
mysql_thread___monitor_replication_lag_use_percona_heartbeat[0] != '\0') {
use_percona_heartbeat = true;
@ -640,6 +644,7 @@ void MySQL_Monitor_State_Data::init_async() {
} else {
query_ = "SHOW SLAVE STATUS";
}
#endif
task_timeout_ = mysql_thread___monitor_replication_lag_timeout;
task_handler_ = &MySQL_Monitor_State_Data::replication_lag_handler;
break;
@ -2617,6 +2622,14 @@ void * monitor_replication_lag_thread(void *arg) {
mmsd->t1=monotonic_time();
mmsd->interr=0; // reset the value
#ifdef TEST_REPLICATIONLAG
{
std::string s = "SELECT SLAVE STATUS "; // replaced SHOW with SELECT to avoid breaking simulator logic
s += std::string(mmsd->hostname) + ":" + std::to_string(mmsd->port);
mmsd->async_exit_status = mysql_query_start(&mmsd->interr, mmsd->mysql, s.c_str());
}
#else
if (percona_heartbeat_table) {
int l = strlen(percona_heartbeat_table);
if (l) {
@ -2631,6 +2644,7 @@ void * monitor_replication_lag_thread(void *arg) {
if (use_percona_heartbeat == false) {
mmsd->async_exit_status=mysql_query_start(&mmsd->interr,mmsd->mysql,"SHOW SLAVE STATUS");
}
#endif // TEST_REPLICATIONLAG
while (mmsd->async_exit_status) {
mmsd->async_exit_status=wait_for_mysql(mmsd->mysql, mmsd->async_exit_status);
#ifdef DEBUG
@ -2721,13 +2735,18 @@ __exit_monitor_replication_lag_thread:
int j=-1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
#ifdef TEST_REPLICATIONLAG
if (fields && num_fields == 1 )
#else
if (
fields && (
( num_fields == 1 && use_percona_heartbeat == true )
||
( num_fields > 30 && use_percona_heartbeat == false )
)
) {
)
#endif
{
for(k = 0; k < num_fields; k++) {
if (fields[k].name) {
if (strcmp("Seconds_Behind_Master", fields[k].name)==0) {
@ -7591,13 +7610,18 @@ bool MySQL_Monitor::monitor_replication_lag_process_ready_tasks(const std::vecto
int j = -1;
num_fields = mysql_num_fields(mmsd->result);
fields = mysql_fetch_fields(mmsd->result);
#ifdef TEST_REPLICATIONLAG
if (fields && num_fields == 1)
#else
if (
fields && (
(num_fields == 1 && mmsd->use_percona_heartbeat == true)
||
(num_fields > 30 && mmsd->use_percona_heartbeat == false)
)
) {
)
#endif
{
for (k = 0; k < num_fields; k++) {
if (fields[k].name) {
if (strcmp("Seconds_Behind_Master", fields[k].name) == 0) {

@ -13429,6 +13429,18 @@ void ProxySQL_Admin::enable_readonly_testing() {
}
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
void ProxySQL_Admin::enable_replicationlag_testing() {
proxy_info("Admin is enabling Replication Lag Testing using SQLite3 Server and HGs from 5201 to 5800\n");
mysql_servers_wrlock();
admindb->execute("DELETE FROM mysql_servers WHERE hostgroup_id BETWEEN 5201 AND 5800");
load_mysql_servers_to_runtime();
mysql_servers_wrunlock();
}
#endif // TEST_REPLICATIONLAG
void ProxySQL_Admin::ProxySQL_Test___MySQL_HostGroups_Manager_generate_many_clusters() {
mysql_servers_wrlock();
admindb->execute("DELETE FROM mysql_servers WHERE hostgroup_id BETWEEN 10001 AND 20000");

@ -333,7 +333,7 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p
memcpy(query,(char *)pkt->ptr+sizeof(mysql_hdr)+1,query_length-1);
query[query_length-1]=0;
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
if (sess->client_myds->proxy_addr.addr == NULL) {
struct sockaddr addr;
socklen_t addr_len=sizeof(struct sockaddr);
@ -363,7 +363,7 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p
sess->client_myds->proxy_addr.addr = strdup("unknown");
}
}
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY || TEST_REPLICATIONLAG
char *query_no_space=(char *)l_alloc(query_length);
memcpy(query_no_space,query,query_length);
@ -534,13 +534,13 @@ void SQLite3_Server_session_handler(MySQL_Session *sess, void *_pa, PtrSize_t *p
if (query_no_space_length==SELECT_VERSION_COMMENT_LEN) {
if (!strncasecmp(SELECT_VERSION_COMMENT, query_no_space, query_no_space_length)) {
l_free(query_length,query);
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
char *a = (char *)"SELECT '(ProxySQL Automated Test Server) - %s'";
query = (char *)malloc(strlen(a)+strlen(sess->client_myds->proxy_addr.addr));
sprintf(query,a,sess->client_myds->proxy_addr.addr);
#else
query=l_strdup("SELECT '(ProxySQL SQLite3 Server)'");
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY || TEST_REPLICATIONLAG
query_length=strlen(query)+1;
goto __run_query;
}
@ -730,7 +730,7 @@ __end_show_commands:
__run_query:
if (run_query) {
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
if (strncasecmp("SELECT",query_no_space,6)==0) {
#ifdef TEST_AURORA
if (strstr(query_no_space,(char *)"REPLICA_HOST_STATUS")) {
@ -802,6 +802,24 @@ __run_query:
}
}
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
if (strncasecmp("SELECT SLAVE STATUS ", query_no_space, strlen("SELECT SLAVE STATUS ")) == 0) {
if (strlen(query_no_space) > strlen("SELECT SLAVE STATUS ") + 5) {
pthread_mutex_lock(&GloSQLite3Server->test_replicationlag_mutex);
// the current test doesn't try to simulate failures, therefore it will return immediately
if (GloSQLite3Server->replicationlag_map_size() == 0) {
// probably never initialized
GloSQLite3Server->load_replicationlag_table(sess);
}
const int rc = GloSQLite3Server->replicationlag_test_value(query_no_space + strlen("SELECT SLAVE STATUS "));
free(query);
char* a = (char*)"SELECT %d as Seconds_Behind_Master";
query = (char*)malloc(strlen(a) + 2);
sprintf(query, a, rc);
pthread_mutex_unlock(&GloSQLite3Server->test_replicationlag_mutex);
}
}
#endif // TEST_REPLICATIONLAG
if (strstr(query_no_space,(char *)"Seconds_Behind_Master")) {
free(query);
char *a = (char *)"SELECT %d as Seconds_Behind_Master";
@ -809,7 +827,7 @@ __run_query:
sprintf(query,a,rand()%30+10);
}
}
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY || TEST_REPLICATIONLAG
SQLite3_Session *sqlite_sess = (SQLite3_Session *)sess->thread->gen_args;
if (sess->autocommit==false) {
sqlite3 *db = sqlite_sess->sessdb->get_db();
@ -878,6 +896,16 @@ __run_query:
}
}
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
if (strncasecmp("SELECT", query_no_space, 6)) {
if (strstr(query_no_space, (char*)"REPLICATIONLAG_HOST_STATUS")) {
// the table is writable
pthread_mutex_lock(&GloSQLite3Server->test_replicationlag_mutex);
GloSQLite3Server->load_replicationlag_table(sess);
pthread_mutex_unlock(&GloSQLite3Server->test_replicationlag_mutex);
}
}
#endif // TEST_REPLICATIONLAG
}
l_free(pkt->size-sizeof(mysql_hdr),query_no_space); // it is always freed here
l_free(query_length,query);
@ -1247,7 +1275,7 @@ SQLite3_Server::SQLite3_Server() {
variables.read_only=false;
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
string s = "";
#ifdef TEST_AURORA
@ -1268,11 +1296,19 @@ SQLite3_Server::SQLite3_Server() {
s += "0.0.0.0:3306";
pthread_mutex_init(&test_readonly_mutex, NULL);
#endif //TEST_READONLY
#ifdef TEST_REPLICATIONLAG
// for replication test we listen on all IPs
if (!s.empty())
s += ";";
s += "0.0.0.0:3306";
pthread_mutex_init(&test_replicationlag_mutex, NULL);
#endif //TEST_REPLICATIONLAG
variables.mysql_ifaces=strdup(s.c_str());
#else
variables.mysql_ifaces=strdup("127.0.0.1:6030");
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY || TEST_REPLICATIONLAG
};
@ -1483,7 +1519,7 @@ void SQLite3_Server::populate_grouprep_table(MySQL_Session *sess, int txs_behind
#endif // TEST_GALERA
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#if defined(TEST_AURORA) || defined(TEST_GALERA) || defined(TEST_GROUPREP) || defined(TEST_READONLY) || defined(TEST_REPLICATIONLAG)
void SQLite3_Server::insert_into_tables_defs(std::vector<table_def_t *> *tables_defs, const char *table_name, const char *table_def) {
table_def_t *td = new table_def_t;
td->table_name=strdup(table_name);
@ -1513,7 +1549,7 @@ void SQLite3_Server::drop_tables_defs(std::vector<table_def_t *> *tables_defs) {
delete td;
}
};
#endif // TEST_AURORA || TEST_GALERA || defined(TEST_GROUPREP) || defined(TEST_READONLY)
#endif // TEST_AURORA || TEST_GALERA || TEST_GROUPREP || TEST_READONLY || TEST_REPLICATIONLAG
void SQLite3_Server::wrlock() {
pthread_rwlock_wrlock(&rwlock);
@ -1567,7 +1603,18 @@ bool SQLite3_Server::init() {
check_and_build_standard_tables(sessdb, tables_defs_readonly);
GloAdmin->enable_readonly_testing();
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
tables_defs_replicationlag = new std::vector<table_def_t*>;
insert_into_tables_defs(tables_defs_replicationlag,
(const char*)"REPLICATIONLAG_HOST_STATUS",
(const char*)"CREATE TABLE REPLICATIONLAG_HOST_STATUS ("
"hostname VARCHAR NOT NULL, port INT NOT NULL, seconds_behind_master INT NOT NULL, PRIMARY KEY (hostname, port)"
")"
);
check_and_build_standard_tables(sessdb, tables_defs_replicationlag);
GloAdmin->enable_replicationlag_testing();
#endif // TEST_REPLICATIONLAG
child_func[0]=child_mysql;
main_shutdown=0;
main_poll_nfds=0;
@ -1718,3 +1765,42 @@ int SQLite3_Server::readonly_test_value(char *p) {
return rc;
}
#endif // TEST_READONLY
#ifdef TEST_REPLICATIONLAG
void SQLite3_Server::load_replicationlag_table(MySQL_Session* sess) {
GloAdmin->mysql_servers_wrlock();
replicationlag_map.clear();
char* error = NULL;
int cols = 0;
int affected_rows = 0;
SQLite3_result* resultset = NULL;
sessdb->execute_statement((char*)"SELECT * FROM REPLICATIONLAG_HOST_STATUS", &error, &cols, &affected_rows, &resultset);
if (resultset) {
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = std::string(r->fields[0]) + ":" + std::string(r->fields[1]);
replicationlag_map[s] = atoi(r->fields[2]);
}
}
delete resultset;
if (replicationlag_map.size() == 0) {
GloAdmin->admindb->execute_statement((char*)"SELECT DISTINCT hostname, port FROM mysql_servers WHERE hostgroup_id BETWEEN 5202 AND 5700", &error, &cols, &affected_rows, &resultset);
for (std::vector<SQLite3_row*>::iterator it = resultset->rows.begin(); it != resultset->rows.end(); ++it) {
SQLite3_row* r = *it;
const std::string& s = "INSERT INTO REPLICATIONLAG_HOST_STATUS VALUES ('" + std::string(r->fields[0]) + "'," + std::string(r->fields[1]) + ",0)";
sessdb->execute(s.c_str());
}
delete resultset;
}
GloAdmin->mysql_servers_wrunlock();
}
int SQLite3_Server::replicationlag_test_value(const char* p) {
int rc = 0; // default
std::unordered_map<std::string, int>::iterator it = replicationlag_map.find(std::string(p));
if (it != replicationlag_map.end()) {
rc = it->second;
}
return rc;
}
#endif // TEST_REPLICATIONLAG
Loading…
Cancel
Save