Added compression field in mysql_servers table

Issue #220
pull/248/head
René Cannaò 11 years ago
parent 568ef1b5d1
commit 6ba6c52826

@ -3,8 +3,8 @@
#include "proxysql.h"
#include "cpp.h"
#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, mem_pointer INT NOT NULL DEFAULT 0, PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, PRIMARY KEY (hostgroup_id, hostname, port))"
#define MYHGM_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, compression INT CHECK (compression IN (0,1)) NOT NULL DEFAULT 0 , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_MYSQL_SERVERS_INCOMING "CREATE TABLE mysql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3)) NOT NULL DEFAULT 0, compression INT CHECK (compression IN (0,1)) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port))"
class MySrvConnList;
@ -43,9 +43,11 @@ class MySrvC { // MySQL Server Container
uint16_t flags;
unsigned int weight;
enum MySerStatus status;
bool compression;
//uint8_t charset;
MySrvConnList *ConnectionsUsed;
MySrvConnList *ConnectionsFree;
MySrvC(char *, uint16_t, unsigned int, enum MySerStatus);
MySrvC(char *, uint16_t, unsigned int, enum MySerStatus, unsigned int);
~MySrvC();
};
@ -92,7 +94,7 @@ class MySQL_HostGroups_Manager {
void rdunlock();
void wrlock();
void wrunlock();
bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE);
bool server_add(unsigned int hid, char *add, uint16_t p=3306, unsigned int _weight=1, enum MySerStatus status=MYSQL_SERVER_STATUS_ONLINE, bool _comp=0);
bool commit();
SQLite3_result *dump_table_mysql_servers();

@ -102,11 +102,13 @@ void MySrvConnList::drop_all_connections() {
}
MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status) {
MySrvC::MySrvC(char *add, uint16_t p, unsigned int _weight, enum MySerStatus _status, unsigned int _compression /*, uint8_t _charset */) {
address=strdup(add);
port=p;
weight=_weight;
status=_status;
compression=(bool)_compression;
//charset=_charset;
myhgc=NULL;
ConnectionsUsed=new MySrvConnList(this);
ConnectionsFree=new MySrvConnList(this);
@ -218,12 +220,12 @@ void MySQL_HostGroups_Manager::wrunlock() {
// add a new row in mysql_servers_incoming
// we always assume that the calling thread has acquired a rdlock()
bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status) {
bool MySQL_HostGroups_Manager::server_add(unsigned int hid, char *add, uint16_t p, unsigned int _weight, enum MySerStatus status, bool _comp /*, uint8_t _charset */) {
bool ret;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u and status %u\n", add,p,hid,_weight,status);
char *q=(char *)"INSERT INTO mysql_servers_incoming VALUES (%u, \"%s\", %u, %u, %u)";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Adding in mysql_servers_incoming server %s:%d in hostgroup %u with weight %u , status %u, %s compression and charset %d\n", add,p,hid,_weight,status, (_comp ? "with" : "without") /*, _charset */);
char *q=(char *)"INSERT INTO mysql_servers_incoming VALUES (%u, \"%s\", %u, %u, %u, %u)";
char *query=(char *)malloc(strlen(q)+strlen(add)+100);
sprintf(query,q,hid,add,p,_weight,status);
sprintf(query,q,hid,add,p,_weight,status,_comp /*,_charset */);
ret=mydb->execute(query);
free(query);
return ret;
@ -253,12 +255,12 @@ bool MySQL_HostGroups_Manager::commit() {
if (resultset) { delete resultset; resultset=NULL; }
// INSERT OR IGNORE INTO mysql_servers SELECT ... FROM mysql_servers_incoming
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status) SELECT hostgroup_id, hostname, port, weight, status FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status) SELECT hostgroup_id, hostname, port, weight, status FROM mysql_servers_incoming");
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression) SELECT hostgroup_id, hostname, port, weight, status, compression FROM mysql_servers_incoming\n");
mydb->execute("INSERT OR IGNORE INTO mysql_servers(hostgroup_id, hostname, port, weight, status, compression) SELECT hostgroup_id, hostname, port, weight, status, compression FROM mysql_servers_incoming");
// SELECT FROM mysql_servers whatever is not identical in mysql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
query=(char *)"SELECT t1.*, t2.weight, t2.status FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status";
query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression FROM mysql_servers t1 JOIN mysql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
@ -266,23 +268,27 @@ bool MySQL_HostGroups_Manager::commit() {
} else {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
long long ptr=atoll(r->fields[5]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), ptr, atoi(r->fields[0]));
long long ptr=atoll(r->fields[6]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), ptr, atoi(r->fields[0]), atoi(r->fields[5]));
fprintf(stderr,"%lld\n", ptr);
if (ptr==0) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Creating new server %s:%d , weight=%d, status=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]));
MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]));
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Creating new server %s:%d , weight=%d, status=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]) );
MySrvC *mysrvc=new MySrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]));
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), mysrvc, atoi(r->fields[0]));
add(mysrvc,atoi(r->fields[0]));
} else {
MySrvC *mysrvc=(MySrvC *)ptr;
if (atoi(r->fields[3])!=atoi(r->fields[6])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[6]));
mysrvc->weight=atoi(r->fields[6]);
if (atoi(r->fields[3])!=atoi(r->fields[7])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[3] , mysrvc->weight , atoi(r->fields[7]));
mysrvc->weight=atoi(r->fields[7]);
}
if (atoi(r->fields[4])!=atoi(r->fields[7])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[7]));
mysrvc->status=(MySerStatus)atoi(r->fields[7]);
if (atoi(r->fields[4])!=atoi(r->fields[8])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing status for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->status , atoi(r->fields[8]));
mysrvc->status=(MySerStatus)atoi(r->fields[8]);
}
if (atoi(r->fields[5])!=atoi(r->fields[9])) {
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing compression for server %s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[4] , mysrvc->compression , atoi(r->fields[9]));
mysrvc->compression=(bool)atoi(r->fields[9]);
}
}
}
@ -308,9 +314,9 @@ void MySQL_HostGroups_Manager::generate_mysql_servers_table() {
for (unsigned int j=0; j<myhgc->mysrvs->servers->len; j++) {
mysrvc=myhgc->mysrvs->idx(j);
uintptr_t ptr=(uintptr_t)mysrvc;
char *q=(char *)"INSERT INTO mysql_servers VALUES(%d,\"%s\",%d,%d,%d,%llu)";
char *q=(char *)"INSERT INTO mysql_servers VALUES(%d,\"%s\",%d,%d,%d,%u,%llu)";
char *query=(char *)malloc(strlen(q)+8+strlen(mysrvc->address)+8+8+8+16+32);
sprintf(query, q, mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, mysrvc->status, ptr);
sprintf(query, q, mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, mysrvc->status, mysrvc->compression, ptr);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
fprintf(stderr,"%s\n",query);
mydb->execute(query);
@ -325,7 +331,7 @@ SQLite3_result * MySQL_HostGroups_Manager::dump_table_mysql_servers() {
int cols=0;
int affected_rows=0;
SQLite3_result *resultset=NULL;
char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" END FROM mysql_servers";
char *query=(char *)"SELECT hostgroup_id, hostname, port, weight, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" END, compression FROM mysql_servers";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
wrunlock();
@ -453,6 +459,10 @@ MySQL_Connection * MySrvConnList::get_random_MyConn() {
} else {
conn = new MySQL_Connection();
conn->parent=mysrvc;
//conn->options.charset=mysrvc->charset;
conn->options.server_capabilities=0;
conn->options.server_capabilities|=CLIENT_COMPRESS;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 7, "Returning MySQL Connection %p, server %s:%d\n", conn, conn->parent->address, conn->parent->port);
return conn;
}

@ -43,7 +43,7 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
//#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status INT NOT NULL DEFAULT 0 REFERENCES server_status(status) , PRIMARY KEY(hostname, port) )"
//#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (status IN ('OFFLINE_HARD', 'OFFLINE_SOFT', 'SHUNNED', 'ONLINE')) NOT NULL DEFAULT 'OFFLINE_HARD', PRIMARY KEY(hostname, port) )"
//#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, status VARCHAR CHECK (status IN ('OFFLINE_HARD', 'OFFLINE_SOFT', 'SHUNNED', 'ONLINE')) NOT NULL DEFAULT 'OFFLINE_HARD', weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, status VARCHAR CHECK (status IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE', weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_MYSQL_SERVERS "CREATE TABLE mysql_servers ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, status VARCHAR CHECK (status IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE', weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression IN (0,1)) NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
//#define ADMIN_SQLITE_TABLE_MYSQL_HOSTGROUPS "CREATE TABLE mysql_hostgroups ( hostgroup_id INT NOT NULL , description VARCHAR, PRIMARY KEY(hostgroup_id) )"
//#define ADMIN_SQLITE_TABLE_MYSQL_HOSTGROUP_ENTRIES "CREATE TABLE mysql_hostgroup_entries ( hostgroup_id INT NOT NULL DEFAULT 0, hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306, weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , FOREIGN KEY (hostname, port) REFERENCES mysql_servers (hostname, port) , FOREIGN KEY (hostgroup_id) REFERENCES mysql_hostgroups (hostgroup_id) , PRIMARY KEY (hostgroup_id, hostname, port) )"
//#define ADMIN_SQLITE_TABLE_MYSQL_USERS "CREATE TABLE mysql_users ( username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0, default_hostgroup INT NOT NULL DEFAULT 0, transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 0, backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1, frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1, PRIMARY KEY (username, backend), UNIQUE (username, frontend) , FOREIGN KEY (default_hostgroup) REFERENCES mysql_hostgroups (hostgroup_id))"
@ -2471,11 +2471,11 @@ void Standard_ProxySQL_Admin::save_mysql_servers_runtime_to_database() {
admindb->execute(query);
SQLite3_result *resultset=MyHGM->dump_table_mysql_servers();
if (!resultset) return;
char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s)";
char *q=(char *)"INSERT INTO mysql_servers VALUES(%s,\"%s\",%s,\"%s\",%s,%s,%s)";
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+16);
sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3]);
char *query=(char *)malloc(strlen(q)+strlen(r->fields[0])+strlen(r->fields[1])+strlen(r->fields[2])+strlen(r->fields[3])+strlen(r->fields[4])+strlen(r->fields[5])+strlen(r->fields[6])+16);
sprintf(query, q, r->fields[0], r->fields[1], r->fields[2], r->fields[4], r->fields[3], r->fields[5], r->fields[6]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
fprintf(stderr,"%s\n",query);
admindb->execute(query);
@ -2491,7 +2491,7 @@ void Standard_ProxySQL_Admin::load_mysql_servers_to_runtime() {
int affected_rows=0;
SQLite3_result *resultset=NULL;
//char *query=(char *)"SELECT hostgroup_id,hostname,port,weight FROM main.mysql_hostgroup_entries";
char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight FROM main.mysql_servers";
char *query=(char *)"SELECT hostgroup_id,hostname,port,status,weight,compression FROM main.mysql_servers";
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
//MyHGH->wrlock();
@ -2515,8 +2515,11 @@ void Standard_ProxySQL_Admin::load_mysql_servers_to_runtime() {
}
}
//MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), MYSQL_SERVER_STATUS_ONLINE);
proxy_debug(PROXY_DEBUG_ADMIN, 4, "hid=%d , hostname=%s , port=%d , status=%s , weight=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), r->fields[3], atoi(r->fields[4]));
MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), status);
bool comp=false;
if (atoi(r->fields[5])==1) comp=true;
const char *comps= (comp ? "YES" : "NO" );
proxy_debug(PROXY_DEBUG_ADMIN, 4, "hid=%d , hostname=%s , port=%d , status=%s , weight=%d , compression=%s\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), r->fields[3], atoi(r->fields[4]), comps);
MyHGM->server_add(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), status, comp);
//MyHGH->server_add_hg(atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]));
}
}

@ -504,7 +504,8 @@ int MySQL_Data_Stream::array2buffer_full() {
int MySQL_Data_Stream::myds_connect(char *address, int connect_port, int *pending_connect) {
//assert(myconn==NULL);
if (myconn==NULL) myconn= new MySQL_Connection(); // FIXME: why here? // 20141011
assert(myconn); // this assert() is to remove the next condition
if (myconn==NULL) myconn= new MySQL_Connection(); // FIXME: why here? // 20141011 /// should be removed
myconn->last_time_used=sess->thread->curtime;
struct sockaddr_un u;

Loading…
Cancel
Save