Port of #3001 to 2.1.0 , related to #3000

A client connection can use a backend connection with a session variable set by a different client
pull/3072/head
René Cannaò 6 years ago
parent c7edd93e66
commit aaddb4e568

@ -18,11 +18,12 @@
#include "ev.h"
/*
Enabling STRESSTEST_POOL ProxySQL will do a lot of loops in the connection pool
This is for internal testing ONLY!!!!
#define STRESSTEST_POOL
*/
#ifdef DEBUG
/* */
// Enabling STRESSTEST_POOL ProxySQL will do a lot of loops in the connection pool
// This is for internal testing ONLY!!!!
//#define STRESSTEST_POOL
#endif // DEBUG
#define MHM_PTHREAD_MUTEX
@ -119,6 +120,7 @@ class MySrvConnList {
}
MySQL_Connection *remove(int);
MySQL_Connection * get_random_MyConn(MySQL_Session *sess, bool ff);
void get_random_MyConn_inner_search(unsigned int start, unsigned int end, unsigned int& conn_found_idx, unsigned int& connection_quality_level, unsigned int& number_of_matching_session_variables, const MySQL_Connection * client_conn);
unsigned int conns_length() { return conns->len; }
void drop_all_connections();
MySQL_Connection *index(unsigned int);

@ -215,7 +215,9 @@ class MySQL_Connection {
bool get_gtid(char *buff, uint64_t *trx_id);
void reduce_auto_increment_delay_token() { if (auto_increment_delay_token) auto_increment_delay_token--; };
bool match_tracked_options(MySQL_Connection *c);
bool match_tracked_options(const MySQL_Connection *c);
bool requires_CHANGE_USER(const MySQL_Connection *client_conn);
unsigned int number_of_matching_session_variables(const MySQL_Connection *client_conn, unsigned int& not_matching);
unsigned long get_mysql_thread_id() { return mysql ? mysql->thread_id : 0; }
};
#endif /* __CLASS_MYSQL_CONNECTION_H */

@ -2986,11 +2986,67 @@ MySrvC *MyHGC::get_random_MySrvC(char * gtid_uuid, uint64_t gtid_trxid, int max_
//MySrvC * MySrvList::idx(unsigned int i) { return (MySrvC *)servers->index(i); }
void MySrvConnList::get_random_MyConn_inner_search(unsigned int start, unsigned int end, unsigned int& conn_found_idx, unsigned int& connection_quality_level, unsigned int& number_of_matching_session_variables, const MySQL_Connection * client_conn) {
char *schema = client_conn->userinfo->schemaname;
MySQL_Connection * conn=NULL;
unsigned int k;
for (k = start; k < end; k++) {
conn = (MySQL_Connection *)conns->index(k);
if (conn->match_tracked_options(client_conn)) {
if (connection_quality_level == 0) {
// this is our best candidate so far
connection_quality_level = 1;
conn_found_idx = k;
}
if (conn->requires_CHANGE_USER(client_conn)==false) {
if (connection_quality_level == 1) {
// this is our best candidate so far
connection_quality_level = 2;
conn_found_idx = k;
}
unsigned int cnt_match = 0; // number of matching session variables
unsigned int not_match = 0; // number of not matching session variables
cnt_match = conn->number_of_matching_session_variables(client_conn, not_match);
if (strcmp(conn->userinfo->schemaname,schema)==0) {
cnt_match++;
} else {
not_match++;
}
if (not_match==0) {
// it seems we found the perfect connection
number_of_matching_session_variables = cnt_match;
connection_quality_level = 3;
conn_found_idx = k;
return; // exit immediately, we found the perfect connection
} else {
// we didn't find the perfect connection
// but maybe is better than what we have so far?
if (cnt_match > number_of_matching_session_variables) {
// this is our best candidate so far
number_of_matching_session_variables = cnt_match;
conn_found_idx = k;
}
}
}
}
}
}
MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff) {
MySQL_Connection * conn=NULL;
unsigned int i;
unsigned int conn_found_idx;
unsigned int l=conns_length();
unsigned int connection_quality_level = 0;
bool needs_warming = false;
// connection_quality_level:
// 0 : not found any good connection, tracked options are not OK
// 1 : tracked options are OK , but CHANGE USER is required
// 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed
// 3 : tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required
unsigned int number_of_matching_session_variables = 0; // this includes session variables AND schema
if (mysql_thread___connection_warming) {
unsigned int total_connections = mysrvc->ConnectionsFree->conns_length()+mysrvc->ConnectionsUsed->conns_length();
unsigned int expected_warm_connections = mysql_thread___free_connections_pct*mysrvc->max_connections/100;
@ -3005,50 +3061,25 @@ MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff
i=fastrand()%l;
}
if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) {
// try to match schemaname AND username
char *schema = sess->client_myds->myconn->userinfo->schemaname;
char *username = sess->client_myds->myconn->userinfo->username;
MySQL_Connection * client_conn = sess->client_myds->myconn;
bool conn_found = false;
unsigned int k;
bool options_matching_found = false;
for (k = i; conn_found == false && k < l; k++) {
conn = (MySQL_Connection *)conns->index(k);
if (conn->match_tracked_options(client_conn)) {
if (options_matching_found == false) {
options_matching_found = true;
}
if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) {
conn_found = true;
i = k;
}
}
}
if (conn_found == false ) {
for (k = 0; conn_found == false && k < i; k++) {
conn = (MySQL_Connection *)conns->index(k);
if (conn->match_tracked_options(client_conn)) {
if (options_matching_found == false) {
options_matching_found = true;
}
if (strcmp(conn->userinfo->schemaname,schema)==0 && strcmp(conn->userinfo->username,username)==0) {
conn_found = true;
i = k;
}
}
}
get_random_MyConn_inner_search(i, l, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn);
if (connection_quality_level !=3 ) { // we didn't find the perfect connection
get_random_MyConn_inner_search(0, i, conn_found_idx, connection_quality_level, number_of_matching_session_variables, client_conn);
}
if (conn_found == true) {
conn=(MySQL_Connection *)conns->remove_index_fast(i);
} else {
if (options_matching_found == false) {
// connection_quality_level:
// 1 : tracked options are OK , but CHANGE USER is required
// 2 : tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed
switch (connection_quality_level) {
case 0: // not found any good connection, tracked options are not OK
// we must create a new connection
conn = new MySQL_Connection();
conn->parent=mysrvc;
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
} else {
break;
case 1: //tracked options are OK , but CHANGE USER is required
// we may consider creating a new connection
{
unsigned int conns_free = mysrvc->ConnectionsFree->conns_length();
unsigned int conns_used = mysrvc->ConnectionsUsed->conns_length();
if ((conns_used > conns_free) && (mysrvc->max_connections > (conns_free/2 + conns_used/2)) ) {
@ -3057,9 +3088,18 @@ MySQL_Connection * MySrvConnList::get_random_MyConn(MySQL_Session *sess, bool ff
__sync_fetch_and_add(&MyHGM->status.server_connections_created, 1);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
} else {
conn=(MySQL_Connection *)conns->remove_index_fast(i);
conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx);
}
}
}
break;
case 2: // tracked options are OK , CHANGE USER is not required, but some SET statement or INIT_DB needs to be executed
case 3: // tracked options are OK , CHANGE USER is not required, and it seems that SET statements or INIT_DB ARE not required
// here we return the best connection we have, no matter if connection_quality_level is 2 or 3
conn=(MySQL_Connection *)conns->remove_index_fast(conn_found_idx);
break;
default: // this should never happen
assert(0);
break;
}
} else {
conn=(MySQL_Connection *)conns->remove_index_fast(i);

@ -1894,6 +1894,25 @@ bool MySQL_Session::handler_again___verify_backend_user_schema() {
NEXT_IMMEDIATE_NEW(CHANGING_SCHEMA);
}
}
// if we reach here, the username is the same
if (myds->myconn->requires_CHANGE_USER(client_myds->myconn)) {
// if we reach here, even if the username is the same,
// the backend connection has some session variable set
// that the client never asked for
// because we can't unset variables, we will reset the connection
switch(status) {
case PROCESSING_QUERY:
case PROCESSING_STMT_PREPARE:
case PROCESSING_STMT_EXECUTE:
previous_status.push(status);
break;
default:
assert(0);
break;
}
mybe->server_myds->wait_until = thread->curtime + mysql_thread___connect_timeout_server*1000; // max_timeout
NEXT_IMMEDIATE_NEW(CHANGING_USER_SERVER);
}
return false;
}
@ -5837,20 +5856,58 @@ void MySQL_Session::handler___client_DSS_QUERY_SENT___server_DSS_NOT_INITIALIZED
}
}
uuid[n]='\0';
#ifndef STRESSTEST_POOL
mc=thread->get_MyConn_local(mybe->hostgroup_id, this, uuid, trxid, -1);
#endif // STRESSTEST_POOL
} else {
#ifndef STRESSTEST_POOL
mc=thread->get_MyConn_local(mybe->hostgroup_id, this, NULL, 0, (int)qpo->max_lag_ms);
#endif // STRESSTEST_POOL
}
}
#ifdef STRESSTEST_POOL
// Check STRESSTEST_POOL in MySQL_HostGroups_Manager.h
// Note: this works only if session_fast_forward==false and create_new_conn is false too
#define NUM_SLOW_LOOPS 1000
// if STRESSTESTPOOL_MEASURE is define, time is measured in Query_Processor_time_nsec
// even if not the right variable
//#define STRESSTESTPOOL_MEASURE
#ifdef STRESSTESTPOOL_MEASURE
timespec begint;
timespec endt;
clock_gettime(CLOCK_MONOTONIC,&begint);
#endif // STRESSTESTPOOL_MEASURE
for (unsigned int loops=0; loops < NUM_SLOW_LOOPS; loops++) {
#endif // STRESSTEST_POOL
if (mc==NULL) {
if (trxid) {
mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id, this, (session_fast_forward || qpo->create_new_conn), uuid, trxid, -1);
} else {
mc=MyHGM->get_MyConn_from_pool(mybe->hostgroup_id, this, (session_fast_forward || qpo->create_new_conn), NULL, 0, (int)qpo->max_lag_ms);
}
#ifdef STRESSTEST_POOL
if (mc && (loops < NUM_SLOW_LOOPS - 1)) {
if (mc->mysql) {
mybe->server_myds->attach_connection(mc);
mybe->server_myds->DSS=STATE_NOT_INITIALIZED;
mybe->server_myds->return_MySQL_Connection_To_Pool();
mc=NULL;
}
}
#endif // STRESSTEST_POOL
} else {
thread->status_variables.stvar[st_var_ConnPool_get_conn_immediate]++;
}
#ifdef STRESSTEST_POOL
#ifdef STRESSTESTPOOL_MEASURE
clock_gettime(CLOCK_MONOTONIC,&endt);
thread->status_variables.query_processor_time=thread->status_variables.query_processor_time +
(endt.tv_sec*1000000000+endt.tv_nsec) -
(begint.tv_sec*1000000000+begint.tv_nsec);
#endif // STRESSTESTPOOL_MEASURE
}
#endif // STRESSTEST_POOL
if (mc) {
mybe->server_myds->attach_connection(mc);
thread->status_variables.stvar[st_var_ConnPool_get_conn_success]++;

@ -6108,97 +6108,58 @@ void MySQL_Thread::Get_Memory_Stats() {
MySQL_Connection * MySQL_Thread::get_MyConn_local(unsigned int _hid, MySQL_Session *sess, char *gtid_uuid, uint64_t gtid_trxid, int max_lag_ms) {
// some sanity check
if (sess == NULL) return NULL;
if (sess->client_myds == NULL) return NULL;
if (sess->client_myds->myconn == NULL) return NULL;
if (sess->client_myds->myconn->userinfo == NULL) return NULL;
unsigned int i;
unsigned int bc = 0; // best candidate
bool pcf = false; // possible candidate found
unsigned int npc = 0; // number of possible candidates
std::vector<MySrvC *> parents;
std::vector<MySrvC *> parents; // this is a vector of srvers that needs to be excluded in case gtid_uuid is used
MySQL_Connection *c=NULL;
// MySQL_Connection *_candidate = NULL; // this will be used when we will pass optional parameters
for (i=0; i<cached_connections->len; i++) {
c=(MySQL_Connection *)cached_connections->index(i);
if (c->parent->myhgc->hid==_hid && sess->client_myds->myconn->match_tracked_options(c)) {
if (gtid_uuid) {
// we first check if we already excluded this parent (MySQL Server)
MySrvC *mysrvc = c->parent;
std::vector<MySrvC *>::iterator it;
it = find(parents.begin(), parents.end(), mysrvc);
if (it != parents.end()) {
// we didn't exclude this server (yet?)
bool gtid_found = false;
gtid_found = MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid);
if (gtid_found) {
//c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
//return c;
if (pcf == false) {
bc = i;
pcf = true;
}
//npc++;
if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) {
char *schema = sess->client_myds->myconn->userinfo->schemaname;
char *username = sess->client_myds->myconn->userinfo->username;
if (strcmp(c->userinfo->schemaname,schema)==0 && strcmp(c->userinfo->username,username)==0) {
if (c->parent->myhgc->hid==_hid && sess->client_myds->myconn->match_tracked_options(c)) { // options are all identical
if (
(gtid_uuid == NULL) || // gtid_uuid is not used
(gtid_uuid && find(parents.begin(), parents.end(), c->parent) == parents.end()) // the server is currently not excluded
) {
MySQL_Connection *client_conn = sess->client_myds->myconn;
if (c->requires_CHANGE_USER(client_conn)==false) { // CHANGE_USER is not required
char *schema = client_conn->userinfo->schemaname;
if (strcmp(c->userinfo->schemaname,schema)==0) { // same schema
unsigned int not_match = 0; // number of not matching session variables
c->number_of_matching_session_variables(client_conn, not_match);
if (not_match == 0) { // all session variables match
if (gtid_uuid) { // gtid_uuid is used
// we first check if we already excluded this parent (MySQL Server)
MySrvC *mysrvc = c->parent;
std::vector<MySrvC *>::iterator it;
it = find(parents.begin(), parents.end(), mysrvc);
if (it != parents.end()) {
// we didn't exclude this server (yet?)
bool gtid_found = false;
gtid_found = MyHGM->gtid_exists(mysrvc, gtid_uuid, gtid_trxid);
if (gtid_found) { // this server has the correct GTID
c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
return c;
} else {
parents.push_back(mysrvc); // stop evaluating this server
}
}
} else { // gtid_is not used
if (max_lag_ms >= 0) {
if ((unsigned int)max_lag_ms < (c->parent->aws_aurora_current_lag_us / 1000)) {
status_variables.stvar[st_var_aws_aurora_replicas_skipped_during_query]++;
continue;
}
}
// return the connection
c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
return c;
}
} else {
c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
return c;
}
} else {
parents.push_back(mysrvc); // stop evaluating this server
// if (_candidate == NULL) {
// _candidate = c; // this server is a potential candidate
// }
}
}
} else {
// c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
if (max_lag_ms >= 0) {
if ((unsigned int)max_lag_ms < (c->parent->aws_aurora_current_lag_us / 1000)) {
status_variables.stvar[st_var_aws_aurora_replicas_skipped_during_query]++;
continue;
}
}
if (pcf == false) {
bc = i;
pcf = true;
}
npc++;
if (sess && sess->client_myds && sess->client_myds->myconn && sess->client_myds->myconn->userinfo) {
char *schema = sess->client_myds->myconn->userinfo->schemaname;
char *username = sess->client_myds->myconn->userinfo->username;
if (strcmp(c->userinfo->schemaname,schema)==0 && strcmp(c->userinfo->username,username)==0) {
c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
return c;
}
} else {
c=(MySQL_Connection *)cached_connections->remove_index_fast(i);
return c;
}
//return c;
}
}
}
// if (_candidate) {
// return _candidate;
// }
if (pcf) { // there was a possible connection, but we skipped trying to find a better one
if (gtid_uuid) {
c=(MySQL_Connection *)cached_connections->remove_index_fast(bc);
return c;
} else {
if (npc > 5) { // more candidates were evaluated
c=(MySQL_Connection *)cached_connections->remove_index_fast(bc);
return c;
}
}
}

@ -480,7 +480,43 @@ bool MySQL_Connection::get_status_sql_log_bin0() {
return status_flags & STATUS_MYSQL_CONNECTION_SQL_LOG_BIN0;
}
bool MySQL_Connection::match_tracked_options(MySQL_Connection *c) {
bool MySQL_Connection::requires_CHANGE_USER(const MySQL_Connection *client_conn) {
char *username = client_conn->userinfo->username;
if (strcmp(userinfo->username,username)) {
// the two connections use different usernames
// The connection need to be reset with CHANGE_USER
return true;
}
for (auto i = 0; i < SQL_NAME_LAST; i++) {
if (client_conn->var_hash[i] == 0) {
if (var_hash[i]) {
// this connection has a variable set that the
// client connection doesn't have.
// Since connection cannot be unset , this connection
// needs to be reset with CHANGE_USER
return true;
}
}
}
return false;
}
unsigned int MySQL_Connection::number_of_matching_session_variables(const MySQL_Connection *client_conn, unsigned int& not_matching) {
unsigned int ret=0;
for (auto i = 0; i < SQL_NAME_LAST; i++) {
if (client_conn->var_hash[i] && i != SQL_CHARACTER_ACTION) { // client has a variable set
if (var_hash[i] == client_conn->var_hash[i]) { // server conection has the variable set to the same value
ret++;
} else {
not_matching++;
}
}
}
return ret;
}
bool MySQL_Connection::match_tracked_options(const MySQL_Connection *c) {
uint32_t cf1 = options.client_flag; // own client flags
uint32_t cf2 = c->options.client_flag; // other client flags
if ((cf1 & CLIENT_FOUND_ROWS) == (cf2 & CLIENT_FOUND_ROWS)) {
@ -532,6 +568,7 @@ void MySQL_Connection::connect_start() {
std::stringstream ss;
ss << c->nr;
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET_RESULTS, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET_CLIENT, ss.str().c_str());
mysql_variables.server_set_value(myds->sess, SQL_CHARACTER_SET_CONNECTION, ss.str().c_str());

Loading…
Cancel
Save