Merge branch 'v1.2.2' into 1.3.0-port122

Conflicts:
	lib/Makefile
	lib/ProxySQL_Admin.cpp
	lib/Query_Cache.cpp
pull/739/head
René Cannaò 10 years ago
commit 5125067643

@ -99,7 +99,7 @@
#else
#define DEB ""
#endif /* DEBUG */
#define PROXYSQL_VERSION "1.2.1" DEB
#define PROXYSQL_VERSION "1.2.2-RC" DEB
#define PROXYSQL_CODENAME "Truls"
#ifndef PROXYSQL_FUNC_DEFS

@ -77,6 +77,7 @@ class Query_Cache {
bool set(uint64_t , const unsigned char *, uint32_t, unsigned char *, uint32_t, unsigned long long, unsigned long long);
unsigned char * get(uint64_t , const unsigned char *, const uint32_t, uint32_t *, unsigned long long);
uint64_t flush();
SQLite3_result * SQL3_getStats();
};
#endif /* __CLASS_QUERY_CACHE_H */

@ -156,6 +156,9 @@ class Query_Processor_Output {
if (comment) { // #643
free(comment);
}
if (comment) { // #643
free(comment);
}
}
};

@ -62,6 +62,7 @@ static int __admin_refresh_interval=0;
static bool proxysql_mysql_paused=false;
static int old_wait_timeout;
extern Query_Cache *GloQC;
extern MySQL_Authentication *GloMyAuth;
extern ProxySQL_Admin *GloAdmin;
extern Query_Processor *GloQPro;
@ -118,9 +119,9 @@ pthread_mutex_t admin_mutex = PTHREAD_MUTEX_INITIALIZER;
#define ADMIN_SQLITE_TABLE_SCHEDULER "CREATE TABLE scheduler (id INTEGER NOT NULL , interval_ms INTEGER CHECK (interval_ms>=100 AND interval_ms<=100000000) NOT NULL , filename VARCHAR NOT NULL , arg1 VARCHAR , arg2 VARCHAR , arg3 VARCHAR , arg4 VARCHAR , arg5 VARCHAR , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY(id))"
#define ADMIN_SQLITE_TABLE_SCHEDULER_V1_2_0 "CREATE TABLE scheduler (id INTEGER NOT NULL , interval_ms INTEGER CHECK (interval_ms>=100 AND interval_ms<=100000000) NOT NULL , filename VARCHAR NOT NULL , arg1 VARCHAR , arg2 VARCHAR , arg3 VARCHAR , arg4 VARCHAR , arg5 VARCHAR , PRIMARY KEY(id))"
#define ADMIN_SQLITE_TABLE_SCHEDULER_V1_2_0 "CREATE TABLE scheduler (id INTEGER NOT NULL , interval_ms INTEGER CHECK (interval_ms>=100 AND interval_ms<=100000000) NOT NULL , filename VARCHAR NOT NULL , arg1 VARCHAR , arg2 VARCHAR , arg3 VARCHAR , arg4 VARCHAR , arg5 VARCHAR , PRIMARY KEY(id))"
#define ADMIN_SQLITE_TABLE_SCHEDULER_V1_2_2 "CREATE TABLE scheduler (id INTEGER NOT NULL , interval_ms INTEGER CHECK (interval_ms>=100 AND interval_ms<=100000000) NOT NULL , filename VARCHAR NOT NULL , arg1 VARCHAR , arg2 VARCHAR , arg3 VARCHAR , arg4 VARCHAR , arg5 VARCHAR , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY(id))"
#define ADMIN_SQLITE_TABLE_SCHEDULER_V1_2_2 "CREATE TABLE scheduler (id INTEGER NOT NULL , interval_ms INTEGER CHECK (interval_ms>=100 AND interval_ms<=100000000) NOT NULL , filename VARCHAR NOT NULL , arg1 VARCHAR , arg2 VARCHAR , arg3 VARCHAR , arg4 VARCHAR , arg5 VARCHAR , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY(id))"
#define ADMIN_SQLITE_TABLE_RUNTIME_MYSQL_SERVERS "CREATE TABLE runtime_mysql_servers (hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 3306 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )"
@ -2456,6 +2457,10 @@ bool ProxySQL_Admin::init() {
admindb->open((char *)"file:mem_admindb?mode=memory&cache=shared", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
statsdb=new SQLite3DB();
statsdb->open((char *)"file:mem_statsdb?mode=memory&cache=shared", SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
// check if file exists , see #617
bool admindb_file_exists=Proxy_file_exists(GloVars.admindb);
configdb=new SQLite3DB();
configdb->open((char *)GloVars.admindb, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX);
@ -2547,7 +2552,7 @@ bool ProxySQL_Admin::init() {
}
#endif /* DEBUG */
if (GloVars.__cmd_proxysql_reload || GloVars.__cmd_proxysql_initial) {
if (GloVars.__cmd_proxysql_reload || GloVars.__cmd_proxysql_initial || admindb_file_exists==false) { // see #617
if (GloVars.configfile_open) {
if (GloVars.confFile->cfg) {
Read_MySQL_Servers_from_configfile();
@ -3134,6 +3139,8 @@ void ProxySQL_Admin::stats___mysql_global() {
statsdb->execute(query);
free(query);
}
delete resultset;
resultset=NULL;
int highwater;
int current;
sqlite3_status(SQLITE_STATUS_MEMORY_USED, &current, &highwater, 0);
@ -3155,8 +3162,23 @@ void ProxySQL_Admin::stats___mysql_global() {
statsdb->execute(query);
free(query);
resultset=GloQC->SQL3_getStats();
if (resultset) {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
int arg_len=0;
for (int i=0; i<2; i++) {
arg_len+=strlen(r->fields[i]);
}
char *query=(char *)malloc(strlen(a)+arg_len+32);
sprintf(query,a,r->fields[0],r->fields[1]);
statsdb->execute(query);
free(query);
}
delete resultset;
resultset=NULL;
}
statsdb->execute("COMMIT");
delete resultset;
}
void ProxySQL_Admin::stats___mysql_processlist() {
@ -4386,50 +4408,81 @@ int ProxySQL_Admin::Read_MySQL_Query_Rules_from_configfile() {
int ProxySQL_Admin::Read_MySQL_Servers_from_configfile() {
const Setting& root = GloVars.confFile->cfg->getRoot();
if (root.exists("mysql_servers")==false) return 0;
const Setting &mysql_servers = root["mysql_servers"];
int count = mysql_servers.getLength();
//fprintf(stderr, "Found %d servers\n",count);
int i;
int rows=0;
admindb->execute("PRAGMA foreign_keys = OFF");
char *q=(char *)"INSERT OR REPLACE INTO mysql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections, max_replication_lag, use_ssl, max_latency_ms) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d, %d, %d, %d)";
for (i=0; i< count; i++) {
const Setting &server = mysql_servers[i];
std::string address;
std::string status="ONLINE";
int port;
int hostgroup;
int weight=1;
int compression=0;
int max_connections=1000; // default
int max_replication_lag=0; // default
int use_ssl=0;
int max_latency_ms=0;
if (server.lookupValue("address", address)==false) continue;
if (server.lookupValue("port", port)==false) continue;
if (server.lookupValue("hostgroup", hostgroup)==false) continue;
server.lookupValue("status", status);
if (
(strcasecmp(status.c_str(),(char *)"ONLINE"))
&& (strcasecmp(status.c_str(),(char *)"SHUNNED"))
&& (strcasecmp(status.c_str(),(char *)"OFFLINE_SOFT"))
&& (strcasecmp(status.c_str(),(char *)"OFFLINE_HARD"))
) {
status="ONLINE";
}
server.lookupValue("compression", compression);
server.lookupValue("weight", weight);
server.lookupValue("max_connections", max_connections);
server.lookupValue("max_replication_lag", max_replication_lag);
server.lookupValue("use_ssl", use_ssl);
server.lookupValue("max_latency_ms", max_latency_ms);
char *query=(char *)malloc(strlen(q)+strlen(status.c_str())+strlen(address.c_str())+128);
sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag, use_ssl, max_latency_ms);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
free(query);
rows++;
if (root.exists("mysql_servers")==true) {
const Setting &mysql_servers = root["mysql_servers"];
int count = mysql_servers.getLength();
//fprintf(stderr, "Found %d servers\n",count);
char *q=(char *)"INSERT OR REPLACE INTO mysql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d, %d, %d, %d, '%s')";
for (i=0; i< count; i++) {
const Setting &server = mysql_servers[i];
std::string address;
std::string status="ONLINE";
int port;
int hostgroup;
int weight=1;
int compression=0;
int max_connections=1000; // default
int max_replication_lag=0; // default
int use_ssl=0;
int max_latency_ms=0;
std::string comment="";
if (server.lookupValue("address", address)==false) continue;
if (server.lookupValue("port", port)==false) continue;
if (server.lookupValue("hostgroup", hostgroup)==false) continue;
server.lookupValue("status", status);
if (
(strcasecmp(status.c_str(),(char *)"ONLINE"))
&& (strcasecmp(status.c_str(),(char *)"SHUNNED"))
&& (strcasecmp(status.c_str(),(char *)"OFFLINE_SOFT"))
&& (strcasecmp(status.c_str(),(char *)"OFFLINE_HARD"))
) {
status="ONLINE";
}
server.lookupValue("compression", compression);
server.lookupValue("weight", weight);
server.lookupValue("max_connections", max_connections);
server.lookupValue("max_replication_lag", max_replication_lag);
server.lookupValue("use_ssl", use_ssl);
server.lookupValue("max_latency_ms", max_latency_ms);
server.lookupValue("comment", comment);
char *o1=strdup(comment.c_str());
char *o=escape_string_single_quotes(o1, false);
char *query=(char *)malloc(strlen(q)+strlen(status.c_str())+strlen(address.c_str())+strlen(o)+128);
sprintf(query,q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag, use_ssl, max_latency_ms, o);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
if (o!=o1) free(o);
free(o1);
free(query);
rows++;
}
}
if (root.exists("mysql_replication_hostgroups")==true) {
const Setting &mysql_replication_hostgroups = root["mysql_replication_hostgroups"];
int count = mysql_replication_hostgroups.getLength();
char *q=(char *)"INSERT OR REPLACE INTO mysql_replication_hostgroups (writer_hostgroup, reader_hostgroup, comment) VALUES (%d, %d, '%s')";
for (i=0; i< count; i++) {
const Setting &line = mysql_replication_hostgroups[i];
int writer_hostgroup;
int reader_hostgroup;
std::string comment="";
if (line.lookupValue("writer_hostgroup", writer_hostgroup)==false) continue;
if (line.lookupValue("reader_hostgroup", reader_hostgroup)==false) continue;
line.lookupValue("comment", comment);
char *o1=strdup(comment.c_str());
char *o=escape_string_single_quotes(o1, false);
char *query=(char *)malloc(strlen(q)+strlen(o)+32);
sprintf(query,q, writer_hostgroup, reader_hostgroup, o);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
if (o!=o1) free(o);
free(o1);
free(query);
rows++;
}
}
admindb->execute("PRAGMA foreign_keys = ON");
return rows;

@ -178,9 +178,12 @@ int KV_BtreeArray::cnt() {
bool KV_BtreeArray::replace(uint64_t key, QC_entry_t *entry) {
spin_wrlock(&lock);
THR_UPDATE_CNT(__thr_cntSet,Glo_cntSet,1,100);
THR_UPDATE_CNT(__thr_size_values,Glo_size_values,entry->length,100);
THR_UPDATE_CNT(__thr_dataIN,Glo_dataIN,entry->length,100);
//THR_UPDATE_CNT(__thr_cntSet,Glo_cntSet,1,100);
//THR_UPDATE_CNT(__thr_size_values,Glo_size_values,entry->length,100);
//THR_UPDATE_CNT(__thr_dataIN,Glo_dataIN,entry->length,100);
THR_UPDATE_CNT(__thr_cntSet,Glo_cntSet,1,1);
THR_UPDATE_CNT(__thr_size_values,Glo_size_values,entry->length,1);
THR_UPDATE_CNT(__thr_dataIN,Glo_dataIN,entry->length,1);
THR_UPDATE_CNT(__thr_num_entries,Glo_num_entries,1,1);
entry->ref_count=1;
@ -200,14 +203,15 @@ bool KV_BtreeArray::replace(uint64_t key, QC_entry_t *entry) {
QC_entry_t * KV_BtreeArray::lookup(uint64_t key) {
QC_entry_t *entry=NULL;
spin_rdlock(&lock);
THR_UPDATE_CNT(__thr_cntGet,Glo_cntGet,1,100);
//THR_UPDATE_CNT(__thr_cntGet,Glo_cntGet,1,100);
THR_UPDATE_CNT(__thr_cntGet,Glo_cntGet,1,1);
btree::btree_map<uint64_t, QC_entry_t *>::iterator lookup;
lookup = bt_map.find(key);
if (lookup != bt_map.end()) {
entry=lookup->second;
__sync_fetch_and_add(&entry->ref_count,1);
THR_UPDATE_CNT(__thr_cntGetOK,Glo_cntGetOK,1,100);
THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT,entry->length,10000);
//THR_UPDATE_CNT(__thr_cntGetOK,Glo_cntGetOK,1,100);
//THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT,entry->length,10000);
}
spin_rdunlock(&lock);
return entry;
@ -299,6 +303,8 @@ unsigned char * Query_Cache::get(uint64_t user_hash, const unsigned char *kp, co
if (entry!=NULL) {
unsigned long long t=curtime_ms;
if (entry->expire_ms > t) {
THR_UPDATE_CNT(__thr_cntGetOK,Glo_cntGetOK,1,1);
THR_UPDATE_CNT(__thr_dataOUT,Glo_dataOUT,entry->length,1);
result=(unsigned char *)malloc(entry->length);
memcpy(result,entry->value,entry->length);
*lv=entry->length;
@ -344,16 +350,20 @@ void * Query_Cache::purgeHash_thread(void *) {
unsigned int MySQL_Monitor__thread_MySQL_Thread_Variables_version;
MySQL_Thread * mysql_thr = new MySQL_Thread();
MySQL_Monitor__thread_MySQL_Thread_Variables_version=GloMTH->get_global_version();
mysql_thr->refresh_variables();
max_memory_size=mysql_thread___query_cache_size_MB*1024*1024;
while (shutdown==0) {
usleep(purge_loop_time);
unsigned long long t=monotonic_time()/1000;
QCnow_ms=t;
unsigned int glover=GloMTH->get_global_version();
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover;
mysql_thr->refresh_variables();
max_memory_size=mysql_thread___query_cache_size_MB*1024*1024;
}
if (GloMTH) {
if (MySQL_Monitor__thread_MySQL_Thread_Variables_version < glover ) {
MySQL_Monitor__thread_MySQL_Thread_Variables_version=glover;
mysql_thr->refresh_variables();
max_memory_size=mysql_thread___query_cache_size_MB*1024*1024;
}
}
if (current_used_memory_pct() < purge_threshold_pct_min ) continue;
for (i=0; i<SHARED_QUERY_CACHE_HASH_TABLES; i++) {
KVs[i]->purge_some(QCnow_ms);
@ -363,4 +373,63 @@ void * Query_Cache::purgeHash_thread(void *) {
return NULL;
};
SQLite3_result * Query_Cache::SQL3_getStats() {
const int colnum=2;
char buf[256];
char **pta=(char **)malloc(sizeof(char *)*colnum);
//Get_Memory_Stats();
SQLite3_result *result=new SQLite3_result(colnum);
result->add_column_definition(SQLITE_TEXT,"Variable_Name");
result->add_column_definition(SQLITE_TEXT,"Variable_Value");
// NOTE: as there is no string copy, we do NOT free pta[0] and pta[1]
{ // Used Memoery
pta[0]=(char *)"Query_Cache_Memory_bytes";
sprintf(buf,"%lu", get_data_size_total());
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_cntGet
pta[0]=(char *)"Query_Cache_count_GET";
sprintf(buf,"%lu", Glo_cntGet);
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_cntGetOK
pta[0]=(char *)"Query_Cache_count_GET_OK";
sprintf(buf,"%lu", Glo_cntGetOK);
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_cntSet
pta[0]=(char *)"Query_Cache_count_SET";
sprintf(buf,"%lu", Glo_cntSet);
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_dataIN
pta[0]=(char *)"Query_Cache_bytes_IN";
sprintf(buf,"%lu", Glo_dataIN);
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_dataOUT
pta[0]=(char *)"Query_Cache_bytes_OUT";
sprintf(buf,"%lu", Glo_dataOUT);
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_cntPurge
pta[0]=(char *)"Query_Cache_Purged";
sprintf(buf,"%lu", Glo_cntPurge);
pta[1]=buf;
result->add_row(pta);
}
{ // Glo_num_entries
pta[0]=(char *)"Query_Cache_Entries";
sprintf(buf,"%lu", Glo_num_entries);
pta[1]=buf;
result->add_row(pta);
}
free(pta);
return result;
}

@ -1404,6 +1404,11 @@ void MySQL_Connection::ProcessQueryAndSetStatusFlags(char *query_digest_text) {
set_status_lock_tables(true);
}
}
if (get_status_lock_tables()==false) { // we search for lock tables only if not already set
if (!strncasecmp(query_digest_text,"FLUSH TABLES WITH READ LOCK", strlen("FLUSH TABLES WITH READ LOCK"))) { // issue 613
set_status_lock_tables(true);
}
}
if (get_status_lock_tables()==true) {
if (!strncasecmp(query_digest_text,"UNLOCK TABLES", strlen("UNLOCK TABLES"))) {
set_status_lock_tables(false);

@ -46,9 +46,24 @@ mysql_servers =
port=3306
hostgroup=0
max_connections=200
comment="test server"
}
)
mysql_replication_hostgroups=
(
{
writer_hostgroup=30
reader_hostgroup=40
comment="test repl 1"
},
{
writer_hostgroup=50
reader_hostgroup=60
comment="test repl 2"
}
)
mysql_users:
(
{

Loading…
Cancel
Save