You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
proxysql/lib/MySQL_HostGroups_Manager.cpp

1500 lines
65 KiB

#include "proxysql.h"
#include "cpp.h"
#define char_malloc (char *)malloc
#define itostr(__s, __i) { __s=char_malloc(32); sprintf(__s, "%lld", __i); }
#include "thread.h"
#include "wqueue.h"
#define SAFE_SQLITE3_STEP(_stmt) do {\
do {\
rc=sqlite3_step(_stmt);\
if (rc!=SQLITE_DONE) {\
assert(rc==SQLITE_LOCKED);\
usleep(100);\
}\
} while (rc!=SQLITE_DONE);\
} while (0)
extern ProxySQL_Admin *GloAdmin;
extern MySQL_Threads_Handler *GloMTH;
class MySrvConnList;
class MySrvC;
class MySrvList;
class MyHGC;
static void * HGCU_thread_run() {
PtrArray *conn_array=new PtrArray();
while(1) {
MySQL_Connection *myconn=(MySQL_Connection *)MyHGM->queue.remove();
if (myconn==NULL) {
// intentionally exit immediately
return NULL;
}
conn_array->add(myconn);
while (MyHGM->queue.size()) {
myconn=(MySQL_Connection *)MyHGM->queue.remove();
if (myconn==NULL) return NULL;
conn_array->add(myconn);
}
unsigned int l=conn_array->len;
int *errs=(int *)malloc(sizeof(int)*l);
int *statuses=(int *)malloc(sizeof(int)*l);
my_bool *ret=(my_bool *)malloc(sizeof(my_bool)*l);
int i;
for (i=0;i<(int)l;i++) {
myconn=(MySQL_Connection *)conn_array->index(i);
if (myconn->mysql->net.vio) {
statuses[i]=mysql_change_user_start(&ret[i], myconn->mysql, myconn->userinfo->password, myconn->userinfo->password, myconn->userinfo->schemaname);
} else {
statuses[i]=0;
ret[i]=1;
}
}
for (i=0;i<(int)conn_array->len;i++) {
if (statuses[i]==0) {
myconn=(MySQL_Connection *)conn_array->remove_index_fast(i);
if (!ret[i]) {
myconn->reset();
MyHGM->push_MyConn_to_pool(myconn);
} else {
myconn->send_quit=false;
MyHGM->destroy_MyConn_from_pool(myconn);
}
i--;
}
}
unsigned long long now=monotonic_time();
while (conn_array->len && ((monotonic_time() - now) < 1000000)) {
usleep(50);
for (i=0;i<(int)conn_array->len;i++) {
myconn=(MySQL_Connection *)conn_array->index(i);
statuses[i]=mysql_change_user_cont(&ret[i], myconn->mysql, statuses[i]);
}
for (i=0;i<(int)conn_array->len;i++) {
if (statuses[i]==0) {
myconn=(MySQL_Connection *)conn_array->remove_index_fast(i);
if (!ret[i]) {
myconn->reset();
MyHGM->push_MyConn_to_pool(myconn);
} else {
myconn->send_quit=false;
MyHGM->destroy_MyConn_from_pool(myconn);
}
i--;
}
}
}
while (conn_array->len) {
// we reached here, and there are still connections
myconn=(MySQL_Connection *)conn_array->remove_index_fast(0);
myconn->send_quit=false;
MyHGM->destroy_MyConn_from_pool(myconn);
}
free(statuses);
free(errs);
}
}
MySQL_Connection *MySrvConnList::index(unsigned int _k) {
return (MySQL_Connection *)conns->index(_k);
}
MySQL_Connection * MySrvConnList::remove(int _k) {
return (MySQL_Connection *)conns->remove_index_fast(_k);
}
unsigned int MySrvConnList::conns_length() {
return conns->len;
}
MySrvConnList::MySrvConnList(MySrvC *_mysrvc) {
mysrvc=_mysrvc;
conns=new PtrArray();
}
void MySrvConnList::add(MySQL_Connection *c) {
conns->add(c);
}
MySrvConnList::~MySrvConnList() {
mysrvc=NULL;
while (conns_length()) {
MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0);
delete conn;
}
delete conns;
}
MySrvList::MySrvList(MyHGC *_myhgc) {
myhgc=_myhgc;
servers=new PtrArray();
}
void MySrvList::add(MySrvC *s) {
if (s->myhgc==NULL) {
s->myhgc=myhgc;
}
servers->add(s);
}
int MySrvList::find_idx(MySrvC *s) {
for (unsigned int i=0; i<servers->len; i++) {
MySrvC *mysrv=(MySrvC *)servers->index(i);
if (mysrv==s) {
return (unsigned int)i;
}
}
return -1;
}
void MySrvList::remove(MySrvC *s) {
int i=find_idx(s);
assert(i>=0);
servers->remove_index_fast((unsigned int)i);
}
void MySrvConnList::drop_all_connections() {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Dropping all connections (%lu total) on MySrvConnList %p for server %s:%d , hostgroup=%d , status=%d\n", conns_length(), this, mysrvc->address, mysrvc->port, mysrvc->myhgc->hid, mysrvc->status);
while (conns_length()) {
MySQL_Connection *conn=(MySQL_Connection *)conns->remove_index_fast(0);
delete conn;
}
}
MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag, unsigned int _use_ssl, unsigned int _max_latency_ms, char *_comment) {
address=strdup(add);
port=p;
weight=_weight;
status=_status;
compression=_compression;
max_connections=_max_connections;
max_replication_lag=_max_replication_lag;
use_ssl=_use_ssl;
max_latency_us=_max_latency_ms*1000;
current_latency_us=0;
connect_OK=0;
connect_ERR=0;
queries_sent=0;
bytes_sent=0;
bytes_recv=0;
time_last_detected_error=0;
connect_ERR_at_time_last_detected_error=0;
shunned_automatic=false;
shunned_and_kill_all_connections=false; // false to default
//charset=_charset;
myhgc=NULL;
comment=strdup(_comment);
ConnectionsUsed=new MySrvConnList(this);
ConnectionsFree=new MySrvConnList(this);
}
void MySrvC::connect_error(int err_num) {
// NOTE: this function operates without any mutex
// although, it is not extremely important if any counter is lost
// as a single connection failure won't make a significant difference
__sync_fetch_and_add(&connect_ERR,1);
__sync_fetch_and_add(&MyHGM->status.server_connections_aborted,1);
switch (err_num) {
case 1044: // access denied
case 1045: // access denied
case 1049: //Unknown databas
case 1226: // do not increase counter to avoid Shunning if user has max_user_connections restriction
return;
break;
default:
break;
}
time_t t=time(NULL);
if (t!=time_last_detected_error) {
time_last_detected_error=t;
connect_ERR_at_time_last_detected_error=1;
} else {
int max_failures = ( mysql_thread___shun_on_failures > mysql_thread___connect_retries_on_failure ? mysql_thread___connect_retries_on_failure : mysql_thread___shun_on_failures) ;
if (__sync_add_and_fetch(&connect_ERR_at_time_last_detected_error,1) >= (unsigned int)max_failures) {
bool _shu=false;
MyHGM->wrlock(); // to prevent race conditions, lock here. See #627
if (status==MYSQL_SERVER_STATUS_ONLINE) {
status=MYSQL_SERVER_STATUS_SHUNNED;
shunned_automatic=true;
_shu=true;
} else {
_shu=false;
}
MyHGM->wrunlock();
if (_shu) {
proxy_error("Shunning server %s:%d with %u errors/sec. Shunning for %u seconds\n", address, port, connect_ERR_at_time_last_detected_error , mysql_thread___shun_recovery_time_sec);
}
}
}
}
void MySrvC::shun_and_killall() {
status=MYSQL_SERVER_STATUS_SHUNNED;
shunned_automatic=true;
shunned_and_kill_all_connections=true;
}
MySrvC::~MySrvC() {
if (address) free(address);
if (comment) free(comment);
delete ConnectionsUsed;
delete ConnectionsFree;
}
MySrvList::~MySrvList() {
myhgc=NULL;
while (servers->len) {
MySrvC *mysrvc=(MySrvC *)servers->remove_index_fast(0);
delete mysrvc;
}
delete servers;
}
MyHGC::MyHGC(int _hid) {
hid=_hid;
mysrvs=new MySrvList(this);
}
MyHGC::~MyHGC() {
delete mysrvs;
}
MySQL_HostGroups_Manager::MySQL_HostGroups_Manager() {
status.client_connections=0;
status.client_connections_aborted=0;
status.client_connections_created=0;
status.server_connections_connected=0;
status.server_connections_aborted=0;
status.server_connections_created=0;
status.servers_table_version=0;
status.myconnpoll_get=0;
status.myconnpoll_get_ok=0;
status.myconnpoll_get_ping=0;
status.myconnpoll_push=0;
status.myconnpoll_destroy=0;
status.autocommit_cnt=0;
status.commit_cnt=0;
status.rollback_cnt=0;
status.autocommit_cnt_filtered=0;
status.commit_cnt_filtered=0;
status.rollback_cnt_filtered=0;
status.backend_change_user=0;
status.backend_init_db=0;
status.backend_set_names=0;
status.frontend_init_db=0;
status.frontend_set_names=0;
status.frontend_use_db=0;
#ifdef MHM_PTHREAD_MUTEX
pthread_mutex_init(&lock, NULL);
#else
spinlock_rwlock_init(&rwlock);
#endif
admindb=NULL; // initialized only if needed
mydb=new SQLite3DB();
#ifdef DEBUG
mydb->open((char *)"file:mem_mydb?mode=memory&cache=shared", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
#else
mydb->open((char *)"file:mem_mydb?mode=memory", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
#endif /* DEBUG */
mydb->execute(MYHGM_MYSQL_SERVERS);
mydb->execute(MYHGM_MYSQL_SERVERS_INCOMING);
mydb->execute(MYHGM_MYSQL_REPLICATION_HOSTGROUPS);
MyHostGroups=new PtrArray();
incoming_replication_hostgroups=NULL;
HGCU_thread = new std::thread(&HGCU_thread_run);
}
MySQL_HostGroups_Manager::~MySQL_HostGroups_Manager() {
while (MyHostGroups->len) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->remove_index_fast(0);
delete myhgc;
}
delete MyHostGroups;
delete mydb;
if (admindb) {
delete admindb;
}
queue.add(NULL);
HGCU_thread->join();
#ifdef MHM_PTHREAD_MUTEX
pthread_mutex_destroy(&lock);
#endif
}
// wrlock() is only required during commit()
void MySQL_HostGroups_Manager::wrlock() {
#ifdef MHM_PTHREAD_MUTEX
pthread_mutex_lock(&lock);
#else
spin_wrlock(&rwlock);
#endif
}
void MySQL_HostGroups_Manager::wrunlock() {
#ifdef MHM_PTHREAD_MUTEX
pthread_mutex_unlock(&lock);
#else
spin_wrunlock(&rwlock);
#endif
}
unsigned int MySQL_HostGroups_Manager::get_servers_table_version() {
return __sync_fetch_and_add(&status.servers_table_version,0);
}
// add a new row in mysql_servers_incoming
// we always assume that the calling thread has acquired a rdlock()
bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status, unsigned int _comp /*, uint8_t _charset */, unsigned int _max_connections, unsigned int _max_replication_lag, unsigned int _use_ssl, unsigned int _max_latency_ms , char *comment) {
bool ret=true;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u , status %u, %s compression, max_connections %d, max_replication_lag %u, use_ssl=%u, max_latency_ms=%u\n", add,p,hid,_weight,status, (_comp ? "with" : "without") /*, _charset */ , _max_connections, _max_replication_lag, _use_ssl, _max_latency_ms);
int rc;
sqlite3_stmt *statement=NULL;
sqlite3 *mydb3=mydb->get_db();
char *query=(char *)"INSERT INTO mysql_servers_incoming VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)";
rc=sqlite3_prepare_v2(mydb3, query, -1, &statement, 0);
assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 1, hid); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 2, add, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 3, p); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 4, _weight); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 5, status); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 6, _comp); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 7, _max_connections); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 8, _max_replication_lag); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 9, _use_ssl); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement, 10, _max_latency_ms); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement, 11, comment, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement);
rc=sqlite3_clear_bindings(statement); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement); assert(rc==SQLITE_OK);
sqlite3_finalize(statement);
return ret;
}
int MySQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) {
if (resultset==NULL) {
return 0;
}
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
sqlite3 *mydb3=mydb->get_db();
char *query1=(char *)"INSERT INTO mysql_servers_incoming VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)";
char *query32=(char *)"INSERT INTO mysql_servers_incoming VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11), (?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22), (?23, ?24, ?25, ?26, ?27, ?28, ?29, ?30, ?31, ?32, ?33), (?34, ?35, ?36, ?37, ?38, ?39, ?40, ?41, ?42, ?43, ?44), (?45, ?46, ?47, ?48, ?49, ?50, ?51, ?52, ?53, ?54, ?55),(?56, ?57, ?58, ?59, ?60, ?61, ?62, ?63, ?64, ?65, ?66),(?67, ?68, ?69, ?70, ?71, ?72, ?73, ?74, ?75, ?76, ?77),(?78, ?79, ?80, ?81, ?82, ?83, ?84, ?85, ?86, ?87, ?88),(?89, ?90, ?91, ?92, ?93, ?94, ?95, ?96, ?97, ?98, ?99), (?100, ?101, ?102, ?103, ?104, ?105, ?106, ?107, ?108, ?109, ?110), (?111, ?112, ?113, ?114, ?115, ?116, ?117, ?118, ?119, ?120, ?121), (?122, ?123, ?124, ?125, ?126, ?127, ?128, ?129, ?130, ?131, ?132), (?133, ?134, ?135, ?136, ?137, ?138, ?139, ?140, ?141, ?142, ?143), (?144, ?145, ?146, ?147, ?148, ?149, ?150, ?151, ?152, ?153, ?154), (?155, ?156, ?157, ?158, ?159, ?160, ?161, ?162, ?163, ?164, ?165), (?166, ?167, ?168, ?169, ?170, ?171, ?172, ?173, ?174, ?175, ?176), (?177, ?178, ?179, ?180, ?181, ?182, ?183, ?184, ?185, ?186, ?187), (?188, ?189, ?190, ?191, ?192, ?193, ?194, ?195, ?196, ?197, ?198), (?199, ?200, ?201, ?202, ?203, ?204, ?205, ?206, ?207, ?208, ?209), (?210, ?211, ?212, ?213, ?214, ?215, ?216, ?217, ?218, ?219, ?220), (?221, ?222, ?223, ?224, ?225, ?226, ?227, ?228, ?229, ?230, ?231), (?232, ?233, ?234, ?235, ?236, ?237, ?238, ?239, ?240, ?241, ?242), (?243, ?244, ?245, ?246, ?247, ?248, ?249, ?250, ?251, ?252, ?253), (?254, ?255, ?256, ?257, ?258, ?259, ?260, ?261, ?262, ?263, ?264), (?265, ?266, ?267, ?268, ?269, ?270, ?271, ?272, ?273, ?274, ?275), (?276, ?277, ?278, ?279, ?280, ?281, ?282, ?283, ?284, ?285, ?286), (?287, ?288, ?289, ?290, ?291, ?292, ?293, ?294, ?295, ?296, ?297), (?298, ?299, ?300, ?301, ?302, ?303, ?304, ?305, ?306, ?307, ?308), (?309, ?310, ?311, ?312, ?313, ?314, ?315, ?316, ?317, ?318, ?319), (?320, ?321, ?322, ?323, ?324, ?325, ?326, ?327, ?328, ?329, ?330), (?331, ?332, ?333, ?334, ?335, ?336, ?337, ?338, ?339, ?340, ?341), (?342, ?343, ?344, ?345, ?346, ?347, ?348, ?349, ?350, ?351, ?352)";
rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0);
assert(rc==SQLITE_OK);
rc=sqlite3_prepare_v2(mydb3, query32, -1, &statement32, 0);
assert(rc==SQLITE_OK);
MySerStatus status1=MYSQL_SERVER_STATUS_ONLINE;
int row_idx=0;
int max_bulk_row_idx=resultset->rows_count/32;
max_bulk_row_idx=max_bulk_row_idx*32;
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r1=*it;
status1=MYSQL_SERVER_STATUS_ONLINE;
if (strcasecmp(r1->fields[3],"ONLINE")) {
if (!strcasecmp(r1->fields[3],"SHUNNED")) {
status1=MYSQL_SERVER_STATUS_SHUNNED;
} else {
if (!strcasecmp(r1->fields[3],"OFFLINE_SOFT")) {
status1=MYSQL_SERVER_STATUS_OFFLINE_SOFT;
} else {
if (!strcasecmp(r1->fields[3],"OFFLINE_HARD")) {
status1=MYSQL_SERVER_STATUS_OFFLINE_HARD;
}
}
}
}
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
rc=sqlite3_bind_int64(statement32, (idx*11)+1, atoi(r1->fields[0])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement32, (idx*11)+2, r1->fields[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+3, atoi(r1->fields[2])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+4, atoi(r1->fields[4])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+5, status1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+6, atoi(r1->fields[5])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+7, atoi(r1->fields[6])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+8, atoi(r1->fields[7])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+9, atoi(r1->fields[8])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (idx*11)+10, atoi(r1->fields[9])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement32, (idx*11)+11, r1->fields[10], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
if (idx==31) {
SAFE_SQLITE3_STEP(statement32);
rc=sqlite3_clear_bindings(statement32); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement32); assert(rc==SQLITE_OK);
}
} else { // single row
rc=sqlite3_bind_int64(statement1, 1, atoi(r1->fields[0])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, atoi(r1->fields[2])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 4, atoi(r1->fields[4])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 5, status1); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 6, atoi(r1->fields[5])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 7, atoi(r1->fields[6])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 8, atoi(r1->fields[7])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 9, atoi(r1->fields[8])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 10, atoi(r1->fields[9])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 11, r1->fields[10], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
row_idx++;
}
sqlite3_finalize(statement1);
sqlite3_finalize(statement32);
return 0;
}
SQLite3_result * MySQL_HostGroups_Manager::execute_query(char *query, char **error) {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
wrlock();
mydb->execute_statement(query, error , &cols , &affected_rows , &resultset);
wrunlock();
return resultset;
}
bool MySQL_HostGroups_Manager::commit() {
// purge table
purge_mysql_servers_table();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=NULL;
wrlock();
query=(char *)"SELECT mem_pointer, t1.hostgroup_id, t1.hostname, t1.port FROM mysql_servers t1 LEFT OUTER JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE t2.hostgroup_id IS NULL";
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
long long ptr=atoll(r->fields[0]);
proxy_warning("Removed server at address %lld, hostgroup %s, address %s port %s. Setting status OFFLINE HARD and immediately dropping all free connections. Used connections will be dropped when trying to use them\n", ptr, r->fields[1], r->fields[2], r->fields[3]);
MySrvC *mysrvc=(MySrvC *)ptr;
mysrvc->status=MYSQL_SERVER_STATUS_OFFLINE_HARD;
mysrvc->ConnectionsFree->drop_all_connections();
}
}
if (resultset) { delete resultset; resultset=NULL; }
// This seems unnecessary. Removed as part of issue #829
//mydb->execute("DELETE FROM mysql_servers");
//generate_mysql_servers_table();
// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming
// proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers_incoming");
// SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
proxy_error("Error on %s : %s\n", query, error);
} else {
// optimization #829
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement2=NULL;
sqlite3 *mydb3=mydb->get_db();
char *query1=(char *)"UPDATE mysql_servers SET mem_pointer = ?1 WHERE hostgroup_id = ?2 AND hostname = ?3 AND port = ?4";
rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0);
assert(rc==SQLITE_OK);
char *query2=(char *)"UPDATE mysql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 WHERE hostgroup_id = ?9 AND hostname = ?10 AND port = ?11";
rc=sqlite3_prepare_v2(mydb3, query2, -1, &statement2, 0);
assert(rc==SQLITE_OK);
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
long long ptr=atoll(r->fields[11]); // increase this index every time a new column is added
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), ptr, atoi(r->fields[0]), atoi(r->fields[5]));
//fprintf(stderr,"%lld\n", ptr);
if (ptr==0) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Creating new server %s:%d , weight=%d, status=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]) );
MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), r->fields[10]); // add new fields here if adding more columns in mysql_servers
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), mysrvc, atoi(r->fields[0]));
add(mysrvc,atoi(r->fields[0]));
ptr=(uintptr_t)mysrvc;
rc=sqlite3_bind_int64(statement1, 1, ptr); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 2, atoi(r->fields[0])); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 3, r->fields[1], -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 4, atoi(r->fields[2])); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
} else {
bool run_update=false;
MySrvC *mysrvc=(MySrvC *)ptr;
// carefully increase the 2nd index by 1 for every new column added
if (atoi(r->fields[3])!=atoi(r->fields[12])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[12]));
mysrvc->weight=atoi(r->fields[12]);
}
if (atoi(r->fields[4])!=atoi(r->fields[13])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[13]));
mysrvc->status=(MySerStatus)atoi(r->fields[13]);
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->shunned_automatic=false;
}
}
if (atoi(r->fields[5])!=atoi(r->fields[14])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[5] , mysrvc->compression , atoi(r->fields[14]));
mysrvc->compression=atoi(r->fields[14]);
}
if (atoi(r->fields[6])!=atoi(r->fields[15])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_connections for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[6] , mysrvc->max_connections , atoi(r->fields[15]));
mysrvc->max_connections=atoi(r->fields[15]);
}
if (atoi(r->fields[7])!=atoi(r->fields[16])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_replication_lag for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[7] , mysrvc->max_replication_lag , atoi(r->fields[16]));
mysrvc->max_replication_lag=atoi(r->fields[16]);
}
if (atoi(r->fields[8])!=atoi(r->fields[17])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing use_ssl for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[8] , mysrvc->use_ssl , atoi(r->fields[17]));
mysrvc->use_ssl=atoi(r->fields[17]);
}
if (atoi(r->fields[9])!=atoi(r->fields[18])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing max_latency_ms for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[9] , mysrvc->max_latency_us , atoi(r->fields[18]));
mysrvc->max_latency_us=1000*atoi(r->fields[18]);
}
if (strcmp(r->fields[10],r->fields[19])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing comment for server %s:%d (%s:%d) from '%s' to '%s'\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[10], r->fields[19]);
free(mysrvc->comment);
mysrvc->comment=strdup(r->fields[19]);
}
if (run_update) {
rc=sqlite3_bind_int64(statement2, 1, mysrvc->weight); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 2, mysrvc->status); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 3, mysrvc->compression); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 4, mysrvc->max_connections); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 5, mysrvc->max_replication_lag); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 6, mysrvc->use_ssl); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 7, mysrvc->max_latency_us/1000); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 9, mysrvc->myhgc->hid); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement2, 10, mysrvc->address, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement2, 11, mysrvc->port); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement2);
rc=sqlite3_clear_bindings(statement2); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement2); assert(rc==SQLITE_OK);
}
}
}
sqlite3_finalize(statement1);
sqlite3_finalize(statement2);
}
if (resultset) { delete resultset; resultset=NULL; }
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers_incoming\n");
mydb->execute("DELETE FROM mysql_servers_incoming");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_replication_hostgroups\n");
mydb->execute("DELETE FROM mysql_replication_hostgroups");
generate_mysql_replication_hostgroups_table();
__sync_fetch_and_add(&status.servers_table_version,1);
wrunlock();
if (GloMTH) {
GloMTH->signal_all_threads(1);
}
return true;
}
void MySQL_HostGroups_Manager::purge_mysql_servers_table() {
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->status==MYSQL_SERVER_STATUS_OFFLINE_HARD) {
if (mysrvc->ConnectionsUsed->conns_length()==0 && mysrvc->ConnectionsFree->conns_length()==0) {
// no more connections for OFFLINE_HARD server, removing it
mysrvc=(MySrvC *)myhgc->mysrvs->servers->remove_index_fast(j);
delete mysrvc;
}
}
}
}
}
void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
int rc;
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
PtrArray *lst=new PtrArray();
sqlite3 *mydb3=mydb->get_db();
char *query1=(char *)"INSERT INTO mysql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
rc=sqlite3_prepare_v2(mydb3, query1, -1, &statement1, 0);
assert(rc==SQLITE_OK);
char *query32=(char *)"INSERT INTO mysql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12), (?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24), (?25, ?26, ?27, ?28, ?29, ?30, ?31, ?32, ?33, ?34, ?35, ?36), (?37, ?38, ?39, ?40, ?41, ?42, ?43, ?44, ?45, ?46, ?47, ?48), (?49, ?50, ?51, ?52, ?53, ?54, ?55, ?56, ?57, ?58, ?59, ?60), (?61, ?62, ?63, ?64, ?65, ?66, ?67, ?68, ?69, ?70, ?71, ?72), (?73, ?74, ?75, ?76, ?77, ?78, ?79, ?80, ?81, ?82, ?83, ?84), (?85, ?86, ?87, ?88, ?89, ?90, ?91, ?92, ?93, ?94, ?95, ?96), (?97, ?98, ?99, ?100, ?101, ?102, ?103, ?104, ?105, ?106, ?107, ?108), (?109, ?110, ?111, ?112, ?113, ?114, ?115, ?116, ?117, ?118, ?119, ?120), (?121, ?122, ?123, ?124, ?125, ?126, ?127, ?128, ?129, ?130, ?131, ?132), (?133, ?134, ?135, ?136, ?137, ?138, ?139, ?140, ?141, ?142, ?143, ?144), (?145, ?146, ?147, ?148, ?149, ?150, ?151, ?152, ?153, ?154, ?155, ?156), (?157, ?158, ?159, ?160, ?161, ?162, ?163, ?164, ?165, ?166, ?167, ?168), (?169, ?170, ?171, ?172, ?173, ?174, ?175, ?176, ?177, ?178, ?179, ?180), (?181, ?182, ?183, ?184, ?185, ?186, ?187, ?188, ?189, ?190, ?191, ?192), (?193, ?194, ?195, ?196, ?197, ?198, ?199, ?200, ?201, ?202, ?203, ?204), (?205, ?206, ?207, ?208, ?209, ?210, ?211, ?212, ?213, ?214, ?215, ?216), (?217, ?218, ?219, ?220, ?221, ?222, ?223, ?224, ?225, ?226, ?227, ?228), (?229, ?230, ?231, ?232, ?233, ?234, ?235, ?236, ?237, ?238, ?239, ?240), (?241, ?242, ?243, ?244, ?245, ?246, ?247, ?248, ?249, ?250, ?251, ?252), (?253, ?254, ?255, ?256, ?257, ?258, ?259, ?260, ?261, ?262, ?263, ?264), (?265, ?266, ?267, ?268, ?269, ?270, ?271, ?272, ?273, ?274, ?275, ?276), (?277, ?278, ?279, ?280, ?281, ?282, ?283, ?284, ?285, ?286, ?287, ?288), (?289, ?290, ?291, ?292, ?293, ?294, ?295, ?296, ?297, ?298, ?299, ?300), (?301, ?302, ?303, ?304, ?305, ?306, ?307, ?308, ?309, ?310, ?311, ?312), (?313, ?314, ?315, ?316, ?317, ?318, ?319, ?320, ?321, ?322, ?323, ?324), (?325, ?326, ?327, ?328, ?329, ?330, ?331, ?332, ?333, ?334, ?335, ?336), (?337, ?338, ?339, ?340, ?341, ?342, ?343, ?344, ?345, ?346, ?347, ?348), (?349, ?350, ?351, ?352, ?353, ?354, ?355, ?356, ?357, ?358, ?359, ?360), (?361, ?362, ?363, ?364, ?365, ?366, ?367, ?368, ?369, ?370, ?371, ?372), (?373, ?374, ?375, ?376, ?377, ?378, ?379, ?380, ?381, ?382, ?383, ?384)";
rc=sqlite3_prepare_v2(mydb3, query32, -1, &statement32, 0);
assert(rc==SQLITE_OK);
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
MySrvC *mysrvc=NULL;
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (GloMTH->variables.hostgroup_manager_verbose) {
char *st;
switch (mysrvc->status) {
case 0:
st=(char *)"ONLINE";
break;
case 2:
st=(char *)"OFFLINE_SOFT";
break;
case 3:
st=(char *)"OFFLINE_HARD";
break;
default:
case 1:
case 4:
st=(char *)"SHUNNED";
break;
}
fprintf(stderr,"HID: %d , address: %s , port: %d , weight: %d , status: %s , max_connections: %u , max_replication_lag: %u , use_ssl: %u , max_latency_ms: %u , comment: %s\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, st, mysrvc->max_connections, mysrvc->max_replication_lag, mysrvc->use_ssl, mysrvc->max_latency_us*1000, mysrvc->comment);
}
lst->add(mysrvc);
if (lst->len==32) {
while (lst->len) {
int i=lst->len;
i--;
MySrvC *mysrvc=(MySrvC *)lst->remove_index_fast(0);
uintptr_t ptr=(uintptr_t)mysrvc;
rc=sqlite3_bind_int64(statement32, (i*12)+1, mysrvc->myhgc->hid); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement32, (i*12)+2, mysrvc->address, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+3, mysrvc->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+4, mysrvc->weight); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+5, mysrvc->status); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+6, mysrvc->compression); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+7, mysrvc->max_connections); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+8, mysrvc->max_replication_lag); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+9, mysrvc->use_ssl); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+10, mysrvc->max_latency_us/1000); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement32, (i*12)+11, mysrvc->comment, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement32, (i*12)+12, ptr); assert(rc==SQLITE_OK);
}
SAFE_SQLITE3_STEP(statement32);
rc=sqlite3_clear_bindings(statement32); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement32); assert(rc==SQLITE_OK);
}
}
}
while (lst->len) {
MySrvC *mysrvc=(MySrvC *)lst->remove_index_fast(0);
uintptr_t ptr=(uintptr_t)mysrvc;
rc=sqlite3_bind_int64(statement1, 1, mysrvc->myhgc->hid); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 2, mysrvc->address, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 3, mysrvc->port); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 4, mysrvc->weight); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 5, mysrvc->status); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 6, mysrvc->compression); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 7, mysrvc->max_connections); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 8, mysrvc->max_replication_lag); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 9, mysrvc->use_ssl); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 10, mysrvc->max_latency_us/1000); assert(rc==SQLITE_OK);
rc=sqlite3_bind_text(statement1, 11, mysrvc->comment, -1, SQLITE_TRANSIENT); assert(rc==SQLITE_OK);
rc=sqlite3_bind_int64(statement1, 12, ptr); assert(rc==SQLITE_OK);
SAFE_SQLITE3_STEP(statement1);
rc=sqlite3_clear_bindings(statement1); assert(rc==SQLITE_OK);
rc=sqlite3_reset(statement1); assert(rc==SQLITE_OK);
}
sqlite3_finalize(statement1);
sqlite3_finalize(statement32);
}
void MySQL_HostGroups_Manager::generate_mysql_replication_hostgroups_table() {
if (incoming_replication_hostgroups==NULL)
return;
proxy_info("New mysql_replication_hostgroups table\n");
for (std::vector<SQLite3_row *>::iterator it = incoming_replication_hostgroups->rows.begin() ; it != incoming_replication_hostgroups->rows.end(); ++it) {
SQLite3_row *r=*it;
char *o=NULL;
int comment_length=0; // #issue #643
if (r->fields[2]) { // comment is not null
o=escape_string_single_quotes(r->fields[2],false);
comment_length=strlen(o);
}
char *query=(char *)malloc(256+comment_length);
if (r->fields[2]) { // comment is not null
sprintf(query,"INSERT INTO mysql_replication_hostgroups VALUES(%s,%s,'%s')",r->fields[0], r->fields[1], o);
if (o!=r->fields[2]) { // there was a copy
free(o);
}
} else {
sprintf(query,"INSERT INTO mysql_replication_hostgroups VALUES(%s,%s,NULL)",r->fields[0],r->fields[1]);
}
mydb->execute(query);
fprintf(stderr,"writer_hostgroup: %s , reader_hostgroup: %s, %s\n", r->fields[0],r->fields[1], r->fields[2]);
free(query);
}
incoming_replication_hostgroups=NULL;
}
SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() {
wrlock();
// purge table
purge_mysql_servers_table();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM mysql_servers\n");
mydb->execute("DELETE FROM mysql_servers");
generate_mysql_servers_table();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" WHEN 4 THEN \"SHUNNED\" END, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM mysql_servers";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
return resultset;
}
SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_replication_hostgroups() {
wrlock();
char *error=NULL;
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT writer_hostgroup, reader_hostgroup, comment FROM mysql_replication_hostgroups";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
return resultset;
}
MyHGC * MySQL_HostGroups_Manager::MyHGC_create(unsigned int _hid) {
MyHGC *myhgc=new MyHGC(_hid);
return myhgc;
}
MyHGC * MySQL_HostGroups_Manager::MyHGC_find(unsigned int _hid) {
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
if (myhgc->hid==_hid) {
return myhgc;
}
}
return NULL;
}
MyHGC * MySQL_HostGroups_Manager::MyHGC_lookup(unsigned int _hid) {
MyHGC *myhgc=NULL;
myhgc=MyHGC_find(_hid);
if (myhgc==NULL) {
myhgc=MyHGC_create(_hid);
} else {
return myhgc;
}
assert(myhgc);
MyHostGroups->add(myhgc);
return myhgc;
}
void MySQL_HostGroups_Manager::push_MyConn_to_pool(MySQL_Connection *c, bool _lock) {
assert(c->parent);
MySrvC *mysrvc=NULL;
if (_lock)
wrlock();
status.myconnpoll_push++;
mysrvc=(MySrvC *)c->parent;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
mysrvc->ConnectionsUsed->remove(c);
if (c->largest_query_length > (unsigned int)GloMTH->variables.threshold_query_length) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d . largest_query_length = %lu\n", c, mysrvc->address, mysrvc->port, mysrvc->status, c->largest_query_length);
delete c;
goto __exit_push_MyConn_to_pool;
}
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) {
if (c->async_state_machine==ASYNC_IDLE) {
if (c->local_stmts->get_num_entries() > (unsigned int)GloMTH->variables.max_stmts_per_connection) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d because has too many prepared statements\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
delete c;
} else {
c->optimize();
mysrvc->ConnectionsFree->add(c);
}
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
delete c;
}
} else {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d with status %d\n", c, mysrvc->address, mysrvc->port, mysrvc->status);
delete c;
}
__exit_push_MyConn_to_pool:
if (_lock)
wrunlock();
}
void MySQL_HostGroups_Manager::push_MyConn_to_pool_array(MySQL_Connection **ca) {
unsigned int i=0;
MySQL_Connection *c=NULL;
c=ca[i];
wrlock();
while (c) {
push_MyConn_to_pool(c,false);
i++;
c=ca[i];
}
wrunlock();
}
MySrvC *MyHGC::get_random_MySrvC() {
MySrvC *mysrvc=NULL;
unsigned int j;
unsigned int sum=0;
unsigned int TotalUsedConn=0;
unsigned int l=mysrvs->cnt();
if (l) {
//int j=0;
for (j=0; j<l; j++) {
mysrvc=mysrvs->idx(j);
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { // consider this server only if ONLINE
if (mysrvc->ConnectionsUsed->conns_length() < mysrvc->max_connections) { // consider this server only if didn't reach max_connections
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
sum+=mysrvc->weight;
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
}
}
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
// try to recover shunned servers
if (mysrvc->shunned_automatic && mysql_thread___shun_recovery_time_sec) {
time_t t;
t=time(NULL);
// we do all these changes without locking . We assume the server is not used from long
// even if the server is still in used and any of the follow command fails it is not critical
// because this is only an attempt to recover a server that is probably dead anyway
// the next few lines of code try to solve issue #530
int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? mysql_thread___connect_timeout_server_max/1000 - 1 : mysql_thread___shun_recovery_time_sec );
if (max_wait_sec < 1) { // min wait time should be at least 1 second
max_wait_sec = 1;
}
if ((t - mysrvc->time_last_detected_error) > max_wait_sec) {
if (
(mysrvc->shunned_and_kill_all_connections==false) // it is safe to bring it back online
||
(mysrvc->shunned_and_kill_all_connections==true && mysrvc->ConnectionsUsed->conns_length()==0 && mysrvc->ConnectionsFree->conns_length()==0) // if shunned_and_kill_all_connections is set, ensure all connections are already dropped
) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
mysrvc->shunned_automatic=false;
mysrvc->shunned_and_kill_all_connections=false;
mysrvc->connect_ERR_at_time_last_detected_error=0;
mysrvc->time_last_detected_error=0;
// if a server is taken back online, consider it immediately
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
sum+=mysrvc->weight;
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
}
}
}
}
}
}
}
if (sum==0) {
// per issue #531 , we try a desperate attempt to bring back online any shunned server
// we do this lowering the maximum wait time to 10%
// most of the follow code is copied from few lines above
time_t t;
t=time(NULL);
int max_wait_sec = ( mysql_thread___shun_recovery_time_sec * 1000 >= mysql_thread___connect_timeout_server_max ? mysql_thread___connect_timeout_server_max/100 - 1 : mysql_thread___shun_recovery_time_sec/10 );
if (max_wait_sec < 1) { // min wait time should be at least 1 second
max_wait_sec = 1;
}
for (j=0; j<l; j++) {
mysrvc=mysrvs->idx(j);
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED && mysrvc->shunned_automatic==true) {
if ((t - mysrvc->time_last_detected_error) > max_wait_sec) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
mysrvc->shunned_automatic=false;
mysrvc->connect_ERR_at_time_last_detected_error=0;
mysrvc->time_last_detected_error=0;
// if a server is taken back online, consider it immediately
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
sum+=mysrvc->weight;
TotalUsedConn+=mysrvc->ConnectionsUsed->conns_length();
}
}
}
}
}
if (sum==0) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n");
return NULL; // if we reach here, we couldn't find any target
}
unsigned int New_sum=0;
unsigned int New_TotalUsedConn=0;
// we will now scan again to ignore overloaded server
for (j=0; j<l; j++) {
mysrvc=mysrvs->idx(j);
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { // consider this server only if ONLINE
unsigned int len=mysrvc->ConnectionsUsed->conns_length();
if (len < mysrvc->max_connections) { // consider this server only if didn't reach max_connections
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
if ((len * sum) <= (TotalUsedConn * mysrvc->weight * 1.5 + 1)) {
New_sum+=mysrvc->weight;
New_TotalUsedConn+=len;
}
}
}
}
}
if (New_sum==0) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL because no backend ONLINE or with weight\n");
return NULL; // if we reach here, we couldn't find any target
}
unsigned int k=fastrand()%New_sum;
k++;
New_sum=0;
for (j=0; j<l; j++) {
mysrvc=mysrvs->idx(j);
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) { // consider this server only if ONLINE
unsigned int len=mysrvc->ConnectionsUsed->conns_length();
if (len < mysrvc->max_connections) { // consider this server only if didn't reach max_connections
if ( mysrvc->current_latency_us < ( mysrvc->max_latency_us ? mysrvc->max_latency_us : mysql_thread___default_max_latency_ms*1000 ) ) { // consider the host only if not too far
if ((len * sum) <= (TotalUsedConn * mysrvc->weight * 1.5 + 1)) {
New_sum+=mysrvc->weight;
if (k<=New_sum) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC %p, server %s:%d\n", mysrvc, mysrvc->address, mysrvc->port);
return mysrvc;
}
}
}
}
}
}
}
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySrvC NULL\n");
return NULL; // if we reach here, we couldn't find any target
}
unsigned int MySrvList::cnt() {
return servers->len;
}
MySrvC * MySrvList::idx(unsigned int i) { return (MySrvC *)servers->index(i); }
MySQL_Connection * MySrvConnList::get_random_MyConn() {
MySQL_Connection * conn=NULL;
unsigned int i;
unsigned int l=conns_length();
if (l) {
i=fastrand()%l;
conn=(MySQL_Connection *)conns->remove_index_fast(i);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
return conn;
} else {
conn = new MySQL_Connection();
conn->parent=mysrvc;
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
return conn;
}
return NULL; // never reach here
}
MySQL_Connection * MySQL_HostGroups_Manager::get_MyConn_from_pool(unsigned int _hid) {
MySQL_Connection * conn=NULL;
wrlock();
status.myconnpoll_get++;
MyHGC *myhgc=MyHGC_lookup(_hid);
MySrvC *mysrvc=myhgc->get_random_MySrvC();
if (mysrvc) { // a MySrvC exists. If not, we return NULL = no targets
conn=mysrvc->ConnectionsFree->get_random_MyConn();
mysrvc->ConnectionsUsed->add(conn);
status.myconnpoll_get_ok++;
}
wrunlock();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, (conn ? conn->parent->address : "") , (conn ? conn->parent->port : 0 ));
return conn;
}
void MySQL_HostGroups_Manager::destroy_MyConn_from_pool(MySQL_Connection *c) {
bool to_del=true; // the default, legacy behavior
MySrvC *mysrvc=(MySrvC *)c->parent;
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE && c->send_quit && queue.size() < 100) {
// overall, the backend seems healthy and so it is the connection. Try to reset it
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Trying to reset MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port);
to_del=false;
c->userinfo->set(mysql_thread___monitor_username,mysql_thread___monitor_password,mysql_thread___default_schema,NULL);
queue.add(c);
} else {
// we lock only this part of the code because we need to remove the connection from ConnectionsUsed
wrlock();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Destroying MySQL_Connection %p, server %s:%d\n", c, mysrvc->address, mysrvc->port);
mysrvc->ConnectionsUsed->remove(c);
status.myconnpoll_destroy++;
wrunlock();
}
if (to_del) {
delete c;
}
}
void MySQL_HostGroups_Manager::add(MySrvC *mysrvc, unsigned int _hid) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding MySrvC %p (%s:%d) for hostgroup %d\n", mysrvc, mysrvc->address, mysrvc->port, _hid);
MyHGC *myhgc=MyHGC_lookup(_hid);
myhgc->mysrvs->add(mysrvc);
}
void MySQL_HostGroups_Manager::replication_lag_action(int _hid, char *address, unsigned int port, int current_replication_lag) {
wrlock();
int i,j;
for (i=0; i<(int)MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
if (_hid >= 0 && _hid!=(int)myhgc->hid) continue;
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
if (strcmp(mysrvc->address,address)==0 && mysrvc->port==port) {
if (mysrvc->status==MYSQL_SERVER_STATUS_ONLINE) {
if (
// (current_replication_lag==-1 )
// ||
(current_replication_lag>=0 && ((unsigned int)current_replication_lag > mysrvc->max_replication_lag))
) {
proxy_warning("Shunning server %s:%d with replication lag of %d second\n", address, port, current_replication_lag);
mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG;
}
} else {
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
if (current_replication_lag>=0 && ((unsigned int)current_replication_lag <= mysrvc->max_replication_lag)) {
mysrvc->status=MYSQL_SERVER_STATUS_ONLINE;
}
}
}
goto __exit_replication_lag_action;
}
}
}
__exit_replication_lag_action:
wrunlock();
}
void MySQL_HostGroups_Manager::drop_all_idle_connections() {
// NOTE: the caller should hold wrlock
int i, j;
for (i=0; i<(int)MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
if (mysrvc->status!=MYSQL_SERVER_STATUS_ONLINE) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d is not online\n", mysrvc->address, mysrvc->port);
//__sync_fetch_and_sub(&status.server_connections_connected, mysrvc->ConnectionsFree->conns->len);
mysrvc->ConnectionsFree->drop_all_connections();
}
// Drop idle connections if beyond max_connection
while (mysrvc->ConnectionsFree->conns_length() && mysrvc->ConnectionsUsed->conns_length()+mysrvc->ConnectionsFree->conns_length() > mysrvc->max_connections) {
MySQL_Connection *conn=mysrvc->ConnectionsFree->remove(0);
delete conn;
}
//PtrArray *pa=mysrvc->ConnectionsFree->conns;
MySrvConnList *mscl=mysrvc->ConnectionsFree;
while (mscl->conns_length() > mysql_thread___free_connections_pct*mysrvc->max_connections/100) {
MySQL_Connection *mc=mscl->remove(0);
delete mc;
}
// drop all connections with life exceeding mysql-connection_max_age
if (mysql_thread___connection_max_age_ms) {
unsigned long long curtime=monotonic_time();
int i=0;
for (i=0; i<(int)mscl->conns_length() ; i++) {
MySQL_Connection *mc=mscl->index(i);
if (curtime > mc->creation_time + mysql_thread___connection_max_age_ms * 1000) {
mc=mscl->remove(0);
delete mc;
i--;
}
}
}
}
}
}
/*
* Prepares at most num_conn idle connections in the given hostgroup for
* pinging. When -1 is passed as a hostgroup, all hostgroups are examined.
*
* The resulting idle connections are returned in conn_list. Note that not all
* currently idle connections will be returned (some might be purged).
*
* Connections are purged according to 2 criteria:
* - whenever the maximal number of connections for a server is hit, free
* connections will be purged
* - also, idle connections that cause the number of free connections to rise
* above a certain percentage of the maximal number of connections will be
* dropped as well
*/
int MySQL_HostGroups_Manager::get_multiple_idle_connections(int _hid, unsigned long long _max_last_time_used, MySQL_Connection **conn_list, int num_conn) {
wrlock();
drop_all_idle_connections();
int num_conn_current=0;
int i,j, k;
for (i=0; i<(int)MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
if (_hid >= 0 && _hid!=(int)myhgc->hid) continue;
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
//PtrArray *pa=mysrvc->ConnectionsFree->conns;
MySrvConnList *mscl=mysrvc->ConnectionsFree;
for (k=0; k<(int)mscl->conns_length(); k++) {
MySQL_Connection *mc=mscl->index(k);
// If the connection is idle ...
if (mc->last_time_used < _max_last_time_used) {
//mc=(MySQL_Connection *)pa->remove_index_fast(k);
mc=mscl->remove(k);
mysrvc->ConnectionsUsed->add(mc);
k--;
conn_list[num_conn_current]=mc;
num_conn_current++;
if (num_conn_current>=num_conn) goto __exit_get_multiple_idle_connections;
}
}
}
}
__exit_get_multiple_idle_connections:
status.myconnpoll_get_ping+=num_conn_current;
wrunlock();
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning %d idle connections\n", num_conn_current);
return num_conn_current;
}
void MySQL_HostGroups_Manager::set_incoming_replication_hostgroups(SQLite3_result *s) {
incoming_replication_hostgroups=s;
}
SQLite3_result * MySQL_HostGroups_Manager::SQL3_Connection_Pool() {
const int colnum=12;
proxy_debug(PROXY_DEBUG_MYSQL_CONNECTION, 4, "Dumping Connection Pool\n");
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"hostgroup");
result->add_column_definition(SQLITE_TEXT,"srv_host");
result->add_column_definition(SQLITE_TEXT,"srv_port");
result->add_column_definition(SQLITE_TEXT,"status");
result->add_column_definition(SQLITE_TEXT,"ConnUsed");
result->add_column_definition(SQLITE_TEXT,"ConnFree");
result->add_column_definition(SQLITE_TEXT,"ConnOK");
result->add_column_definition(SQLITE_TEXT,"ConnERR");
result->add_column_definition(SQLITE_TEXT,"Queries");
result->add_column_definition(SQLITE_TEXT,"Bytes_sent");
result->add_column_definition(SQLITE_TEXT,"Bytes_recv");
result->add_column_definition(SQLITE_TEXT,"Latency_us");
wrlock();
int i,j, k;
for (i=0; i<(int)MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
for (j=0; j<(int)myhgc->mysrvs->cnt(); j++) {
MySrvC *mysrvc=(MySrvC *)myhgc->mysrvs->servers->index(j);
if (mysrvc->status!=MYSQL_SERVER_STATUS_ONLINE) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d is not online\n", mysrvc->address, mysrvc->port);
//__sync_fetch_and_sub(&status.server_connections_connected, mysrvc->ConnectionsFree->conns->len);
mysrvc->ConnectionsFree->drop_all_connections();
}
// drop idle connections if beyond max_connection
while (mysrvc->ConnectionsFree->conns_length() && mysrvc->ConnectionsUsed->conns_length()+mysrvc->ConnectionsFree->conns_length() > mysrvc->max_connections) {
//MySQL_Connection *conn=(MySQL_Connection *)mysrvc->ConnectionsFree->conns->remove_index_fast(0);
MySQL_Connection *conn=mysrvc->ConnectionsFree->remove(0);
delete conn;
//__sync_fetch_and_sub(&status.server_connections_connected, 1);
}
char buf[1024];
char **pta=(char **)malloc(sizeof(char *)*colnum);
sprintf(buf,"%d", (int)myhgc->hid);
pta[0]=strdup(buf);
pta[1]=strdup(mysrvc->address);
sprintf(buf,"%d", mysrvc->port);
pta[2]=strdup(buf);
switch (mysrvc->status) {
case 0:
pta[3]=strdup("ONLINE");
break;
case 1:
pta[3]=strdup("SHUNNED");
break;
case 2:
pta[3]=strdup("OFFLINE_SOFT");
break;
case 3:
pta[3]=strdup("OFFLINE_HARD");
break;
case 4:
pta[3]=strdup("SHUNNED_REPLICATION_LAG");
break;
default:
assert(0);
break;
}
sprintf(buf,"%u", mysrvc->ConnectionsUsed->conns_length());
pta[4]=strdup(buf);
sprintf(buf,"%u", mysrvc->ConnectionsFree->conns_length());
pta[5]=strdup(buf);
sprintf(buf,"%u", mysrvc->connect_OK);
pta[6]=strdup(buf);
sprintf(buf,"%u", mysrvc->connect_ERR);
pta[7]=strdup(buf);
sprintf(buf,"%llu", mysrvc->queries_sent);
pta[8]=strdup(buf);
sprintf(buf,"%llu", mysrvc->bytes_sent);
pta[9]=strdup(buf);
sprintf(buf,"%llu", mysrvc->bytes_recv);
pta[10]=strdup(buf);
sprintf(buf,"%u", mysrvc->current_latency_us);
pta[11]=strdup(buf);
result->add_row(pta);
for (k=0; k<colnum; k++) {
if (pta[k])
free(pta[k]);
}
free(pta);
}
}
wrunlock();
return result;
}
void MySQL_HostGroups_Manager::read_only_action(char *hostname, int port, int read_only) {
// define queries
const char *Q1=(char *)"SELECT hostgroup_id,status FROM mysql_replication_hostgroups JOIN mysql_servers ON hostgroup_id=writer_hostgroup AND hostname='%s' AND port=%d";
const char *Q1B=(char *)"SELECT hostgroup_id,status FROM ( SELECT DISTINCT writer_hostgroup FROM mysql_replication_hostgroups JOIN mysql_servers WHERE (hostgroup_id=writer_hostgroup OR reader_hostgroup=hostgroup_id) AND hostname='%s' AND port=%d ) LEFT JOIN mysql_servers ON hostgroup_id=writer_hostgroup AND hostname='%s' AND port=%d";
const char *Q2=(char *)"UPDATE OR IGNORE mysql_servers SET hostgroup_id=(SELECT writer_hostgroup FROM mysql_replication_hostgroups WHERE reader_hostgroup=mysql_servers.hostgroup_id) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT reader_hostgroup FROM mysql_replication_hostgroups WHERE reader_hostgroup=mysql_servers.hostgroup_id)";
const char *Q3A=(char *)"INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, status, weight, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT reader_hostgroup, hostname, port, status, weight, max_connections, max_replication_lag, use_ssl, max_latency_ms, mysql_servers.comment FROM mysql_servers JOIN mysql_replication_hostgroups ON mysql_servers.hostgroup_id=mysql_replication_hostgroups.writer_hostgroup WHERE hostname='%s' AND port=%d";
const char *Q3B=(char *)"DELETE FROM mysql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT reader_hostgroup FROM mysql_replication_hostgroups WHERE reader_hostgroup=mysql_servers.hostgroup_id)";
const char *Q4=(char *)"UPDATE OR IGNORE mysql_servers SET hostgroup_id=(SELECT reader_hostgroup FROM mysql_replication_hostgroups WHERE writer_hostgroup=mysql_servers.hostgroup_id) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM mysql_replication_hostgroups WHERE writer_hostgroup=mysql_servers.hostgroup_id)";
const char *Q5=(char *)"DELETE FROM mysql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM mysql_replication_hostgroups WHERE writer_hostgroup=mysql_servers.hostgroup_id)";
if (GloAdmin==NULL) {
return;
}
// define a buffer that will be used for all queries
char *query=(char *)malloc(strlen(hostname)*2+strlen(Q3A)+64);
sprintf(query,Q1,hostname,port);
int cols=0;
char *error=NULL;
int affected_rows=0;
SQLite3_result *resultset=NULL;
wrlock();
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
int num_rows=0;
if (resultset==NULL) {
goto __exit_read_only_action;
}
num_rows=resultset->rows_count;
delete resultset;
resultset=NULL;
if (admindb==NULL) { // we initialize admindb only if needed
admindb=new SQLite3DB();
admindb->open((char *)"file:mem_admindb?mode=memory&cache=shared", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
}
switch (read_only) {
case 0:
if (num_rows==0) {
// the server has read_only=0 , but we can't find any writer, so we perform a swap
GloAdmin->mysql_servers_wrlock();
GloAdmin->save_mysql_servers_runtime_to_database(false); // SAVE MYSQL SERVERS FROM RUNTIME
sprintf(query,Q2,hostname,port);
admindb->execute(query);
if (mysql_thread___monitor_writer_is_also_reader) {
sprintf(query,Q3A,hostname,port);
} else {
sprintf(query,Q3B,hostname,port);
}
admindb->execute(query);
GloAdmin->load_mysql_servers_to_runtime(); // LOAD MYSQL SERVERS TO RUNTIME
GloAdmin->mysql_servers_wrunlock();
} else {
// there is a server in writer hostgroup, let check the status of present and not present hosts
// this is the same query as Q1, but with a LEFT JOIN
sprintf(query,Q1B,hostname,port,hostname,port);
wrlock();
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
bool act=false;
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int status=MYSQL_SERVER_STATUS_OFFLINE_HARD; // default status, even for missing
if (r->fields[1]) { // has status
status=atoi(r->fields[1]);
}
if (status==MYSQL_SERVER_STATUS_OFFLINE_HARD) {
act=true;
}
}
if (act==true) { // there are servers either missing, or with stats=OFFLINE_HARD
GloAdmin->mysql_servers_wrlock();
GloAdmin->save_mysql_servers_runtime_to_database(false); // SAVE MYSQL SERVERS FROM RUNTIME
sprintf(query,Q2,hostname,port);
admindb->execute(query);
if (mysql_thread___monitor_writer_is_also_reader) {
sprintf(query,Q3A,hostname,port);
} else {
sprintf(query,Q3B,hostname,port);
}
admindb->execute(query);
GloAdmin->load_mysql_servers_to_runtime(); // LOAD MYSQL SERVERS TO RUNTIME
GloAdmin->mysql_servers_wrunlock();
}
}
break;
case 1:
if (num_rows) {
// the server has read_only=1 , but we find it as writer, so we perform a swap
GloAdmin->mysql_servers_wrlock();
GloAdmin->save_mysql_servers_runtime_to_database(false); // SAVE MYSQL SERVERS FROM RUNTIME
sprintf(query,Q4,hostname,port);
admindb->execute(query);
sprintf(query,Q5,hostname,port);
admindb->execute(query);
GloAdmin->load_mysql_servers_to_runtime(); // LOAD MYSQL SERVERS TO RUNTIME
GloAdmin->mysql_servers_wrunlock();
}
break;
default:
assert(0);
break;
}
__exit_read_only_action:
if (resultset) {
delete resultset;
}
free(query);
}
// shun_and_killall
// this function is called only from MySQL_Monitor::monitor_ping()
// it temporary disables a host that is not responding to pings, and mark the host in a way that when used the connection will be dropped
void MySQL_HostGroups_Manager::shun_and_killall(char *hostname, int port) {
wrlock();
MySrvC *mysrvc=NULL;
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
unsigned int j;
unsigned int l=myhgc->mysrvs->cnt();
if (l) {
for (j=0; j<l; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->port==port && strcmp(mysrvc->address,hostname)==0) {
switch (mysrvc->status) {
case MYSQL_SERVER_STATUS_SHUNNED:
if (mysrvc->shunned_automatic==false) {
break;
}
case MYSQL_SERVER_STATUS_ONLINE:
mysrvc->status=MYSQL_SERVER_STATUS_SHUNNED;
case MYSQL_SERVER_STATUS_OFFLINE_SOFT:
mysrvc->shunned_automatic=true;
mysrvc->shunned_and_kill_all_connections=true;
mysrvc->ConnectionsFree->drop_all_connections();
break;
default:
break;
}
}
}
}
}
wrunlock();
}
// set_server_current_latency_us
// this function is called only from MySQL_Monitor::monitor_ping()
// it set the average latency for a host in the last 3 pings
// the connection pool will use this information to evaluate or exclude a specific hosts
// note that this variable is in microsecond, while user defines it in millisecond
void MySQL_HostGroups_Manager::set_server_current_latency_us(char *hostname, int port, unsigned int _current_latency_us) {
wrlock();
MySrvC *mysrvc=NULL;
for (unsigned int i=0; i<MyHostGroups->len; i++) {
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
unsigned int j;
unsigned int l=myhgc->mysrvs->cnt();
if (l) {
for (j=0; j<l; j++) {
mysrvc=myhgc->mysrvs->idx(j);
if (mysrvc->port==port && strcmp(mysrvc->address,hostname)==0) {
mysrvc->current_latency_us=_current_latency_us;
}
}
}
}
wrunlock();
}
unsigned long long MySQL_HostGroups_Manager::Get_Memory_Stats() {
unsigned long long intsize=0;
wrlock();
MySrvC *mysrvc=NULL;
for (unsigned int i=0; i<MyHostGroups->len; i++) {
intsize+=sizeof(MyHGC);
MyHGC *myhgc=(MyHGC *)MyHostGroups->index(i);
unsigned int j,k;
unsigned int l=myhgc->mysrvs->cnt();
if (l) {
for (j=0; j<l; j++) {
intsize+=sizeof(MySrvC);
mysrvc=myhgc->mysrvs->idx(j);
intsize+=((mysrvc->ConnectionsUsed->conns_length())*sizeof(MySQL_Connection *));
for (k=0; k<mysrvc->ConnectionsFree->conns_length(); k++) {
//MySQL_Connection *myconn=(MySQL_Connection *)mysrvc->ConnectionsFree->conns->index(k);
MySQL_Connection *myconn=mysrvc->ConnectionsFree->index(k);
intsize+=sizeof(MySQL_Connection)+sizeof(MYSQL);
intsize+=myconn->mysql->net.max_packet;
intsize+=(4096*15); // ASYNC_CONTEXT_DEFAULT_STACK_SIZE
if (myconn->MyRS) {
intsize+=myconn->MyRS->current_size();
}
}
intsize+=((mysrvc->ConnectionsUsed->conns_length())*sizeof(MySQL_Connection *));
}
}
}
wrunlock();
return intsize;
}