Remove 'gtid_port' column from pgsql_servers table

v2.x_pg_PrepStmtBase_240714
Rahim Kanji 2 years ago
parent 0221cd9b12
commit 2730a41a38

@ -40,15 +40,15 @@ namespace nlohmann { class json; }
// we have 2 versions of the same tables: with (debug) and without (no debug) checks
#ifdef DEBUG
#define MYHGM_PgSQL_SERVERS "CREATE TABLE pgsql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , gtid_port INT NOT NULL DEFAULT 0 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_PgSQL_SERVERS_INCOMING "CREATE TABLE pgsql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , gtid_port INT NOT NULL DEFAULT 0 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port))"
#define MYHGM_PgSQL_SERVERS "CREATE TABLE pgsql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_PgSQL_SERVERS_INCOMING "CREATE TABLE pgsql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , weight INT CHECK (weight >= 0) NOT NULL DEFAULT 1 , status INT CHECK (status IN (0, 1, 2, 3, 4)) NOT NULL DEFAULT 0 , compression INT CHECK (compression >=0 AND compression <= 102400) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port))"
#else
#define MYHGM_PgSQL_SERVERS "CREATE TABLE pgsql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , gtid_port INT NOT NULL DEFAULT 0 , weight INT NOT NULL DEFAULT 1 , status INT NOT NULL DEFAULT 0 , compression INT NOT NULL DEFAULT 0 , max_connections INT NOT NULL DEFAULT 1000 , max_replication_lag INT NOT NULL DEFAULT 0 , use_ssl INT NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_PgSQL_SERVERS_INCOMING "CREATE TABLE pgsql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , gtid_port INT NOT NULL DEFAULT 0 , weight INT NOT NULL DEFAULT 1 , status INT NOT NULL DEFAULT 0 , compression INT NOT NULL DEFAULT 0 , max_connections INT NOT NULL DEFAULT 1000 , max_replication_lag INT NOT NULL DEFAULT 0 , use_ssl INT NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port))"
#define MYHGM_PgSQL_SERVERS "CREATE TABLE pgsql_servers ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , weight INT NOT NULL DEFAULT 1 , status INT NOT NULL DEFAULT 0 , compression INT NOT NULL DEFAULT 0 , max_connections INT NOT NULL DEFAULT 1000 , max_replication_lag INT NOT NULL DEFAULT 0 , use_ssl INT NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , mem_pointer INT NOT NULL DEFAULT 0 , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define MYHGM_PgSQL_SERVERS_INCOMING "CREATE TABLE pgsql_servers_incoming ( hostgroup_id INT NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT NOT NULL DEFAULT 5432 , weight INT NOT NULL DEFAULT 1 , status INT NOT NULL DEFAULT 0 , compression INT NOT NULL DEFAULT 0 , max_connections INT NOT NULL DEFAULT 1000 , max_replication_lag INT NOT NULL DEFAULT 0 , use_ssl INT NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port))"
#endif /* DEBUG */
#define MYHGM_PgSQL_REPLICATION_HOSTGROUPS "CREATE TABLE pgsql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>=0) , check_type VARCHAR CHECK (LOWER(check_type) IN ('read_only','innodb_read_only','super_read_only','read_only|innodb_read_only','read_only&innodb_read_only')) NOT NULL DEFAULT 'read_only' , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE (reader_hostgroup))"
#define PGHGM_GEN_ADMIN_RUNTIME_SERVERS "SELECT hostgroup_id, hostname, port, gtid_port, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" WHEN 4 THEN \"SHUNNED\" END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers ORDER BY hostgroup_id, hostname, port"
#define PGHGM_GEN_ADMIN_RUNTIME_SERVERS "SELECT hostgroup_id, hostname, port, CASE status WHEN 0 THEN \"ONLINE\" WHEN 1 THEN \"SHUNNED\" WHEN 2 THEN \"OFFLINE_SOFT\" WHEN 3 THEN \"OFFLINE_HARD\" WHEN 4 THEN \"SHUNNED\" END status, weight, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers ORDER BY hostgroup_id, hostname, port"
#define MYHGM_PgSQL_HOSTGROUP_ATTRIBUTES "CREATE TABLE pgsql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , hostgroup_settings VARCHAR CHECK (JSON_VALID(hostgroup_settings) OR hostgroup_settings = '') NOT NULL DEFAULT '' , servers_defaults VARCHAR CHECK (JSON_VALID(servers_defaults) OR servers_defaults = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')"
@ -69,7 +69,7 @@ namespace nlohmann { class json; }
*/
#define PGHGM_GEN_CLUSTER_ADMIN_RUNTIME_SERVERS \
"SELECT " \
"hostgroup_id, hostname, port, gtid_port," \
"hostgroup_id, hostname, port, " \
"CASE status" \
" WHEN 0 THEN \"ONLINE\"" \
" WHEN 1 THEN \"ONLINE\"" \
@ -92,7 +92,7 @@ namespace nlohmann { class json; }
*/
#define PGHGM_GEN_CLUSTER_ADMIN_PGSQL_SERVERS \
"SELECT " \
"hostgroup_id, hostname, port, gtid_port, " \
"hostgroup_id, hostname, port, " \
"CASE" \
" WHEN status=\"SHUNNED\" THEN \"ONLINE\"" \
" ELSE status " \
@ -174,7 +174,6 @@ class PgSQL_SrvC { // MySQL Server Container
PgSQL_HGC *myhgc;
char *address;
uint16_t port;
uint16_t gtid_port;
uint16_t flags;
int64_t weight;
enum MySerStatus status;
@ -220,7 +219,7 @@ class PgSQL_SrvC { // MySQL Server Container
* @param _comment User defined comment.
*/
PgSQL_SrvC(
char* addr, uint16_t port, uint16_t gitd_port, int64_t _weight, enum MySerStatus _status, unsigned int _compression,
char* addr, uint16_t port, int64_t _weight, enum MySerStatus _status, unsigned int _compression,
int64_t _max_connections, unsigned int _max_replication_lag, int32_t _use_ssl, unsigned int _max_latency_ms,
char* _comment
);
@ -682,12 +681,7 @@ class PgSQL_HostGroups_Manager : public Base_HostGroups_Manager<PgSQL_HGC> {
void p_update_pgsql_error_counter(p_pgsql_error_type err_type, unsigned int hid, char* address, uint16_t port, unsigned int code);
wqueue<PgSQL_Connection *> queue;
// has_gtid_port is set to true if *any* of the servers in pgsql_servers has gtid_port enabled
// it is configured during commit()
// NOTE: this variable is currently NOT used, but in future will be able
// to deprecate pgsql-default_session_track_gtids because proxysql will
// be automatically able to determine when to enable GTID tracking
std::atomic<bool> has_gtid_port;
PgSQL_HostGroups_Manager();
~PgSQL_HostGroups_Manager();
void init();

@ -120,7 +120,6 @@ private:
PgSQL_Connection** my_idle_conns;
bool processing_idles;
//bool maintenance_loop;
bool retrieve_gtids_required; // if any of the servers has gtid_port enabled, this needs to be turned on too
PtrArray* cached_connections;

@ -266,7 +266,7 @@
// PgSQL Admin tables
#define ADMIN_SQLITE_TABLE_PGSQL_SERVERS "CREATE TABLE pgsql_servers (hostgroup_id INT CHECK (hostgroup_id>=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT CHECK (port >= 0 AND port <= 65535) NOT NULL DEFAULT 5432 , gtid_port INT CHECK ((gtid_port <> port OR gtid_port=0) AND gtid_port >= 0 AND gtid_port <= 65535) NOT NULL DEFAULT 0 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0 AND weight <=10000000) NOT NULL DEFAULT 1 , compression INT CHECK (compression IN(0,1)) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_PGSQL_SERVERS "CREATE TABLE pgsql_servers (hostgroup_id INT CHECK (hostgroup_id>=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT CHECK (port >= 0 AND port <= 65535) NOT NULL DEFAULT 5432 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0 AND weight <=10000000) NOT NULL DEFAULT 1 , compression INT CHECK (compression IN(0,1)) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_PGSQL_USERS "CREATE TABLE pgsql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 1 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , attributes VARCHAR CHECK (JSON_VALID(attributes) OR attributes = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))"
#define ADMIN_SQLITE_TABLE_PGSQL_LDAP_MAPPING "CREATE TABLE pgsql_ldap_mapping (priority INTEGER CHECK (priority >= 1 AND priority <= 1000000) PRIMARY KEY , frontend_entity VARCHAR NOT NULL , backend_entity VARCHAR NOT NULL , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE (frontend_entity))"
#define ADMIN_SQLITE_TABLE_PGSQL_QUERY_RULES "CREATE TABLE pgsql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT CHECK (flagIN >= 0) NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT CHECK (proxy_port >= 0 AND proxy_port <= 65535) , digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , re_modifiers VARCHAR DEFAULT 'CASELESS' , flagOUT INT CHECK (flagOUT >= 0) , replace_pattern VARCHAR CHECK(CASE WHEN replace_pattern IS NULL THEN 1 WHEN replace_pattern IS NOT NULL AND match_pattern IS NOT NULL THEN 1 ELSE 0 END) , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , cache_empty_result INT CHECK (cache_empty_result IN (0,1)) DEFAULT NULL , cache_timeout INT CHECK(cache_timeout >= 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED CHECK (timeout >= 0) , retries INT CHECK (retries>=0 AND retries <=1000) , delay INT UNSIGNED CHECK (delay >=0) , next_query_flagIN INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , OK_msg VARCHAR , sticky_conn INT CHECK (sticky_conn IN (0,1)) , multiplex INT CHECK (multiplex IN (0,1,2)) , gtid_from_hostgroup INT UNSIGNED , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0 , attributes VARCHAR CHECK (JSON_VALID(attributes) OR attributes = '') NOT NULL DEFAULT '' , comment VARCHAR)"
@ -277,7 +277,7 @@
#define ADMIN_SQLITE_TABLE_PGSQL_HOSTGROUP_ATTRIBUTES "CREATE TABLE pgsql_hostgroup_attributes (hostgroup_id INT NOT NULL PRIMARY KEY , max_num_online_servers INT CHECK (max_num_online_servers>=0 AND max_num_online_servers <= 1000000) NOT NULL DEFAULT 1000000 , autocommit INT CHECK (autocommit IN (-1, 0, 1)) NOT NULL DEFAULT -1 , free_connections_pct INT CHECK (free_connections_pct >= 0 AND free_connections_pct <= 100) NOT NULL DEFAULT 10 , init_connect VARCHAR NOT NULL DEFAULT '' , multiplex INT CHECK (multiplex IN (0, 1)) NOT NULL DEFAULT 1 , connection_warming INT CHECK (connection_warming IN (0, 1)) NOT NULL DEFAULT 0 , throttle_connections_per_sec INT CHECK (throttle_connections_per_sec >= 1 AND throttle_connections_per_sec <= 1000000) NOT NULL DEFAULT 1000000 , ignore_session_variables VARCHAR CHECK (JSON_VALID(ignore_session_variables) OR ignore_session_variables = '') NOT NULL DEFAULT '' , hostgroup_settings VARCHAR CHECK (JSON_VALID(hostgroup_settings) OR hostgroup_settings = '') NOT NULL DEFAULT '' , servers_defaults VARCHAR CHECK (JSON_VALID(servers_defaults) OR servers_defaults = '') NOT NULL DEFAULT '' , comment VARCHAR NOT NULL DEFAULT '')"
#define ADMIN_SQLITE_TABLE_PGSQL_REPLICATION_HOSTGROUPS "CREATE TABLE pgsql_replication_hostgroups (writer_hostgroup INT CHECK (writer_hostgroup>=0) NOT NULL PRIMARY KEY , reader_hostgroup INT NOT NULL CHECK (reader_hostgroup<>writer_hostgroup AND reader_hostgroup>=0) , check_type VARCHAR CHECK (LOWER(check_type) IN ('read_only','innodb_read_only','super_read_only','read_only|innodb_read_only','read_only&innodb_read_only')) NOT NULL DEFAULT 'read_only' , comment VARCHAR NOT NULL DEFAULT '', UNIQUE (reader_hostgroup))"
#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_SERVERS "CREATE TABLE runtime_pgsql_servers (hostgroup_id INT CHECK (hostgroup_id>=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT CHECK (port >= 0 AND port <= 65535) NOT NULL DEFAULT 5432 , gtid_port INT CHECK ((gtid_port <> port OR gtid_port=0) AND gtid_port >= 0 AND gtid_port <= 65535) NOT NULL DEFAULT 0 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0 AND weight <=10000000) NOT NULL DEFAULT 1 , compression INT CHECK (compression IN(0,1)) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_SERVERS "CREATE TABLE runtime_pgsql_servers (hostgroup_id INT CHECK (hostgroup_id>=0) NOT NULL DEFAULT 0 , hostname VARCHAR NOT NULL , port INT CHECK (port >= 0 AND port <= 65535) NOT NULL DEFAULT 5432 , status VARCHAR CHECK (UPPER(status) IN ('ONLINE','SHUNNED','OFFLINE_SOFT', 'OFFLINE_HARD')) NOT NULL DEFAULT 'ONLINE' , weight INT CHECK (weight >= 0 AND weight <=10000000) NOT NULL DEFAULT 1 , compression INT CHECK (compression IN(0,1)) NOT NULL DEFAULT 0 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 1000 , max_replication_lag INT CHECK (max_replication_lag >= 0 AND max_replication_lag <= 126144000) NOT NULL DEFAULT 0 , use_ssl INT CHECK (use_ssl IN(0,1)) NOT NULL DEFAULT 0 , max_latency_ms INT UNSIGNED CHECK (max_latency_ms>=0) NOT NULL DEFAULT 0 , comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (hostgroup_id, hostname, port) )"
#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_USERS "CREATE TABLE runtime_pgsql_users (username VARCHAR NOT NULL , password VARCHAR , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 1 , use_ssl INT CHECK (use_ssl IN (0,1)) NOT NULL DEFAULT 0 , default_hostgroup INT NOT NULL DEFAULT 0 , transaction_persistent INT CHECK (transaction_persistent IN (0,1)) NOT NULL DEFAULT 1 , fast_forward INT CHECK (fast_forward IN (0,1)) NOT NULL DEFAULT 0 , backend INT CHECK (backend IN (0,1)) NOT NULL DEFAULT 1 , frontend INT CHECK (frontend IN (0,1)) NOT NULL DEFAULT 1 , max_connections INT CHECK (max_connections >=0) NOT NULL DEFAULT 10000 , attributes VARCHAR CHECK (JSON_VALID(attributes) OR attributes = '') NOT NULL DEFAULT '', comment VARCHAR NOT NULL DEFAULT '' , PRIMARY KEY (username, backend) , UNIQUE (username, frontend))"
#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_LDAP_MAPPING "CREATE TABLE runtime_pgsql_ldap_mapping (priority INTEGER PRIMARY KEY NOT NULL , frontend_entity VARCHAR NOT NULL , backend_entity VARCHAR NOT NULL , comment VARCHAR NOT NULL DEFAULT '' , UNIQUE (frontend_entity))"
#define ADMIN_SQLITE_TABLE_RUNTIME_PGSQL_QUERY_RULES "CREATE TABLE runtime_pgsql_query_rules (rule_id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL , active INT CHECK (active IN (0,1)) NOT NULL DEFAULT 0 , username VARCHAR , schemaname VARCHAR , flagIN INT CHECK (flagIN >= 0) NOT NULL DEFAULT 0 , client_addr VARCHAR , proxy_addr VARCHAR , proxy_port INT CHECK (proxy_port >= 0 AND proxy_port <= 65535), digest VARCHAR , match_digest VARCHAR , match_pattern VARCHAR , negate_match_pattern INT CHECK (negate_match_pattern IN (0,1)) NOT NULL DEFAULT 0 , re_modifiers VARCHAR DEFAULT 'CASELESS' , flagOUT INT CHECK (flagOUT >= 0), replace_pattern VARCHAR CHECK(CASE WHEN replace_pattern IS NULL THEN 1 WHEN replace_pattern IS NOT NULL AND match_pattern IS NOT NULL THEN 1 ELSE 0 END) , destination_hostgroup INT DEFAULT NULL , cache_ttl INT CHECK(cache_ttl > 0) , cache_empty_result INT CHECK (cache_empty_result IN (0,1)) DEFAULT NULL , cache_timeout INT CHECK(cache_timeout >= 0) , reconnect INT CHECK (reconnect IN (0,1)) DEFAULT NULL , timeout INT UNSIGNED CHECK (timeout >= 0) , retries INT CHECK (retries>=0 AND retries <=1000) , delay INT UNSIGNED CHECK (delay >=0) , next_query_flagIN INT UNSIGNED , mirror_flagOUT INT UNSIGNED , mirror_hostgroup INT UNSIGNED , error_msg VARCHAR , OK_msg VARCHAR , sticky_conn INT CHECK (sticky_conn IN (0,1)) , multiplex INT CHECK (multiplex IN (0,1,2)) , gtid_from_hostgroup INT UNSIGNED , log INT CHECK (log IN (0,1)) , apply INT CHECK(apply IN (0,1)) NOT NULL DEFAULT 0 , attributes VARCHAR CHECK (JSON_VALID(attributes) OR attributes = '') NOT NULL DEFAULT '' , comment VARCHAR)"

@ -291,13 +291,12 @@ void PgSQL_SrvConnList::drop_all_connections() {
PgSQL_SrvC::PgSQL_SrvC(
char* add, uint16_t p, uint16_t gp, int64_t _weight, enum MySerStatus _status, unsigned int _compression,
char* add, uint16_t p, int64_t _weight, enum MySerStatus _status, unsigned int _compression,
int64_t _max_connections, unsigned int _max_replication_lag, int32_t _use_ssl, unsigned int _max_latency_ms,
char* _comment
) {
address=strdup(add);
port=p;
gtid_port=gp;
weight=_weight;
status=_status;
compression=_compression;
@ -984,8 +983,8 @@ int PgSQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) {
sqlite3_stmt *statement1=NULL;
sqlite3_stmt *statement32=NULL;
//sqlite3 *mydb3=mydb->get_db();
char *query1=(char *)"INSERT INTO pgsql_servers_incoming VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
std::string query32s = "INSERT INTO pgsql_servers_incoming VALUES " + generate_multi_rows_query(32,12);
char *query1=(char *)"INSERT INTO pgsql_servers_incoming VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)";
std::string query32s = "INSERT INTO pgsql_servers_incoming VALUES " + generate_multi_rows_query(32,11);
char *query32 = (char *)query32s.c_str();
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0);
rc = mydb->prepare_v2(query1, &statement1);
@ -1000,14 +999,14 @@ int PgSQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) {
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r1=*it;
status1=MYSQL_SERVER_STATUS_ONLINE;
if (strcasecmp(r1->fields[4],"ONLINE")) {
if (!strcasecmp(r1->fields[4],"SHUNNED")) {
if (strcasecmp(r1->fields[3],"ONLINE")) {
if (!strcasecmp(r1->fields[3],"SHUNNED")) {
status1=MYSQL_SERVER_STATUS_SHUNNED;
} else {
if (!strcasecmp(r1->fields[4],"OFFLINE_SOFT")) {
if (!strcasecmp(r1->fields[3],"OFFLINE_SOFT")) {
status1=MYSQL_SERVER_STATUS_OFFLINE_SOFT;
} else {
if (!strcasecmp(r1->fields[4],"OFFLINE_HARD")) {
if (!strcasecmp(r1->fields[3],"OFFLINE_HARD")) {
status1=MYSQL_SERVER_STATUS_OFFLINE_HARD;
}
}
@ -1015,18 +1014,17 @@ int PgSQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) {
}
int idx=row_idx%32;
if (row_idx<max_bulk_row_idx) { // bulk
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+1, atoi(r1->fields[0])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*12)+2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+3, atoi(r1->fields[2])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+4, atoi(r1->fields[3])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+5, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+6, status1); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+7, atoi(r1->fields[6])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+8, atoi(r1->fields[7])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+9, atoi(r1->fields[8])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+10, atoi(r1->fields[9])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*12)+11, atoi(r1->fields[10])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*12)+12, r1->fields[11], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+1, atoi(r1->fields[0])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*11)+2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+3, atoi(r1->fields[2])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+4, atoi(r1->fields[4])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+5, status1); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+6, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+7, atoi(r1->fields[6])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+8, atoi(r1->fields[7])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+9, atoi(r1->fields[8])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (idx*11)+10, atoi(r1->fields[9])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (idx*11)+11, r1->fields[10], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
if (idx==31) {
SAFE_SQLITE3_STEP2(statement32);
rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, mydb);
@ -1034,17 +1032,16 @@ int PgSQL_HostGroups_Manager::servers_add(SQLite3_result *resultset) {
}
} else { // single row
rc=(*proxy_sqlite3_bind_int64)(statement1, 1, atoi(r1->fields[0])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 3, atoi(r1->fields[2])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r1->fields[3])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 5, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 6, status1); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r1->fields[4])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 5, status1); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 6, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 7, atoi(r1->fields[6])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 8, atoi(r1->fields[7])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 9, atoi(r1->fields[8])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 10, atoi(r1->fields[9])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 11, atoi(r1->fields[10])); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 12, r1->fields[11], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 11, r1->fields[10], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
SAFE_SQLITE3_STEP2(statement1);
rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, mydb);
@ -1366,9 +1363,7 @@ bool PgSQL_HostGroups_Manager::commit(
wrlock();
// purge table
purge_pgsql_servers_table();
// if any server has gtid_port enabled, use_gtid is set to true
// and then has_gtid_port is set too
bool use_gtid = false;
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM pgsql_servers\n");
mydb->execute("DELETE FROM pgsql_servers");
generate_pgsql_servers_table();
@ -1419,10 +1414,10 @@ bool PgSQL_HostGroups_Manager::commit(
//mydb->execute("DELETE FROM pgsql_servers");
//generate_pgsql_servers_table();
mydb->execute("INSERT OR IGNORE INTO pgsql_servers(hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, gtid_port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers_incoming");
mydb->execute("INSERT OR IGNORE INTO pgsql_servers(hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT hostgroup_id, hostname, port, weight, status, compression, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment FROM pgsql_servers_incoming");
// SELECT FROM pgsql_servers whatever is not identical in pgsql_servers_incoming, or where mem_pointer=0 (where there is no pointer yet)
query=(char *)"SELECT t1.*, t2.gtid_port, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM pgsql_servers t1 JOIN pgsql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.gtid_port<>t2.gtid_port OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment";
query=(char *)"SELECT t1.*, t2.weight, t2.status, t2.compression, t2.max_connections, t2.max_replication_lag, t2.use_ssl, t2.max_latency_ms, t2.comment FROM pgsql_servers t1 JOIN pgsql_servers_incoming t2 ON (t1.hostgroup_id=t2.hostgroup_id AND t1.hostname=t2.hostname AND t1.port=t2.port) WHERE mem_pointer=0 OR t1.weight<>t2.weight OR t1.status<>t2.status OR t1.compression<>t2.compression OR t1.max_connections<>t2.max_connections OR t1.max_replication_lag<>t2.max_replication_lag OR t1.use_ssl<>t2.use_ssl OR t1.max_latency_ms<>t2.max_latency_ms or t1.comment<>t2.comment";
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "%s\n", query);
mydb->execute_statement(query, &error , &cols , &affected_rows , &resultset);
if (error) {
@ -1442,22 +1437,22 @@ bool PgSQL_HostGroups_Manager::commit(
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0);
rc = mydb->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, mydb);
char *query2=(char *)"UPDATE pgsql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 , gtid_port = ?9 WHERE hostgroup_id = ?10 AND hostname = ?11 AND port = ?12";
char *query2=(char *)"UPDATE pgsql_servers SET weight = ?1 , status = ?2 , compression = ?3 , max_connections = ?4 , max_replication_lag = ?5 , use_ssl = ?6 , max_latency_ms = ?7 , comment = ?8 WHERE hostgroup_id = ?9 AND hostname = ?10 AND port = ?11";
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query2, -1, &statement2, 0);
rc = mydb->prepare_v2(query2, &statement2);
ASSERT_SQLITE_OK(rc, mydb);
for (std::vector<SQLite3_row *>::iterator it = resultset->rows.begin() ; it != resultset->rows.end(); ++it) {
SQLite3_row *r=*it;
long long ptr=atoll(r->fields[12]); // increase this index every time a new column is added
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus) atoi(r->fields[5]), ptr, atoi(r->fields[0]), atoi(r->fields[6]));
long long ptr=atoll(r->fields[11]); // increase this index every time a new column is added
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Server %s:%d , weight=%d, status=%d, mem_pointer=%llu, hostgroup=%d, compression=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), ptr, atoi(r->fields[0]), atoi(r->fields[5]));
//fprintf(stderr,"%lld\n", ptr);
if (ptr==0) {
if (GloMTH->variables.hostgroup_manager_verbose) {
proxy_info("Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%d, status=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus) atoi(r->fields[5]));
proxy_info("Creating new server in HG %d : %s:%d , weight=%d, status=%d\n", atoi(r->fields[0]), r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]));
}
PgSQL_SrvC *mysrvc=new PgSQL_SrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), atoi(r->fields[4]), (MySerStatus) atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), atoi(r->fields[10]), r->fields[11]); // add new fields here if adding more columns in pgsql_servers
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]), (MySerStatus) atoi(r->fields[5]), mysrvc, atoi(r->fields[0]));
PgSQL_SrvC *mysrvc=new PgSQL_SrvC(r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), atoi(r->fields[5]), atoi(r->fields[6]), atoi(r->fields[7]), atoi(r->fields[8]), atoi(r->fields[9]), r->fields[10]); // add new fields here if adding more columns in pgsql_servers
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%d, status=%d, mem_ptr=%p into hostgroup=%d\n", r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]), (MySerStatus) atoi(r->fields[4]), mysrvc, atoi(r->fields[0]));
add(mysrvc,atoi(r->fields[0]));
ptr=(uintptr_t)mysrvc;
rc=(*proxy_sqlite3_bind_int64)(statement1, 1, ptr); ASSERT_SQLITE_OK(rc, mydb);
@ -1467,48 +1462,38 @@ bool PgSQL_HostGroups_Manager::commit(
SAFE_SQLITE3_STEP2(statement1);
rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, mydb);
if (mysrvc->gtid_port) {
// this server has gtid_port configured, we set use_gtid
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid , mysrvc->address, mysrvc->port);
use_gtid = true;
}
} else {
bool run_update=false;
PgSQL_SrvC *mysrvc=(PgSQL_SrvC *)ptr;
// carefully increase the 2nd index by 1 for every new column added
if (atoi(r->fields[3])!=atoi(r->fields[13])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing gtid_port for server %u:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]) , mysrvc->gtid_port , atoi(r->fields[13]));
mysrvc->gtid_port=atoi(r->fields[13]);
}
if (atoi(r->fields[4])!=atoi(r->fields[14])) {
if (atoi(r->fields[3])!=atoi(r->fields[12])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]) , mysrvc->weight , atoi(r->fields[14]));
mysrvc->weight=atoi(r->fields[14]);
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Changing weight for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[3]) , mysrvc->weight , atoi(r->fields[12]));
mysrvc->weight=atoi(r->fields[12]);
}
if (atoi(r->fields[5])!=atoi(r->fields[15])) {
if (atoi(r->fields[4])!=atoi(r->fields[13])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing status for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[5]) , mysrvc->status , atoi(r->fields[15]));
mysrvc->status=(MySerStatus)atoi(r->fields[15]);
proxy_info("Changing status for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[4]) , mysrvc->status , atoi(r->fields[13]));
mysrvc->status=(MySerStatus)atoi(r->fields[13]);
if (mysrvc->status==MYSQL_SERVER_STATUS_SHUNNED) {
mysrvc->shunned_automatic=false;
}
}
if (atoi(r->fields[6])!=atoi(r->fields[16])) {
if (atoi(r->fields[5])!=atoi(r->fields[14])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing compression for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[6]) , mysrvc->compression , atoi(r->fields[16]));
mysrvc->compression=atoi(r->fields[16]);
proxy_info("Changing compression for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[5]) , mysrvc->compression , atoi(r->fields[14]));
mysrvc->compression=atoi(r->fields[14]);
}
if (atoi(r->fields[7])!=atoi(r->fields[17])) {
if (atoi(r->fields[6])!=atoi(r->fields[15])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing max_connections for server %d:%s:%d (%s:%d) from %d (%ld) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[7]) , mysrvc->max_connections , atoi(r->fields[17]));
mysrvc->max_connections=atoi(r->fields[17]);
proxy_info("Changing max_connections for server %d:%s:%d (%s:%d) from %d (%ld) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[6]) , mysrvc->max_connections , atoi(r->fields[15]));
mysrvc->max_connections=atoi(r->fields[15]);
}
if (atoi(r->fields[8])!=atoi(r->fields[18])) {
if (atoi(r->fields[7])!=atoi(r->fields[16])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing max_replication_lag for server %u:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[8]) , mysrvc->max_replication_lag , atoi(r->fields[18]));
mysrvc->max_replication_lag=atoi(r->fields[18]);
proxy_info("Changing max_replication_lag for server %u:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[7]) , mysrvc->max_replication_lag , atoi(r->fields[16]));
mysrvc->max_replication_lag=atoi(r->fields[16]);
if (mysrvc->max_replication_lag == 0) { // we just changed it to 0
if (mysrvc->status == MYSQL_SERVER_STATUS_SHUNNED_REPLICATION_LAG) {
// the server is currently shunned due to replication lag
@ -1518,21 +1503,21 @@ bool PgSQL_HostGroups_Manager::commit(
}
}
}
if (atoi(r->fields[9])!=atoi(r->fields[19])) {
if (atoi(r->fields[8])!=atoi(r->fields[17])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing use_ssl for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[9]) , mysrvc->use_ssl , atoi(r->fields[19]));
mysrvc->use_ssl=atoi(r->fields[19]);
proxy_info("Changing use_ssl for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[8]) , mysrvc->use_ssl , atoi(r->fields[17]));
mysrvc->use_ssl=atoi(r->fields[17]);
}
if (atoi(r->fields[10])!=atoi(r->fields[20])) {
if (atoi(r->fields[9])!=atoi(r->fields[18])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing max_latency_ms for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[10]) , mysrvc->max_latency_us/1000 , atoi(r->fields[20]));
mysrvc->max_latency_us=1000*atoi(r->fields[20]);
proxy_info("Changing max_latency_ms for server %d:%s:%d (%s:%d) from %d (%d) to %d\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), atoi(r->fields[9]) , mysrvc->max_latency_us/1000 , atoi(r->fields[18]));
mysrvc->max_latency_us=1000*atoi(r->fields[18]);
}
if (strcmp(r->fields[11],r->fields[21])) {
if (strcmp(r->fields[10],r->fields[19])) {
if (GloMTH->variables.hostgroup_manager_verbose)
proxy_info("Changing comment for server %d:%s:%d (%s:%d) from '%s' to '%s'\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[11], r->fields[21]);
proxy_info("Changing comment for server %d:%s:%d (%s:%d) from '%s' to '%s'\n" , mysrvc->myhgc->hid , mysrvc->address, mysrvc->port, r->fields[1], atoi(r->fields[2]), r->fields[10], r->fields[19]);
free(mysrvc->comment);
mysrvc->comment=strdup(r->fields[21]);
mysrvc->comment=strdup(r->fields[19]);
}
if (run_update) {
rc=(*proxy_sqlite3_bind_int64)(statement2, 1, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
@ -1542,30 +1527,19 @@ bool PgSQL_HostGroups_Manager::commit(
rc=(*proxy_sqlite3_bind_int64)(statement2, 5, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 6, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 7, mysrvc->max_latency_us/1000); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 9, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 10, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement2, 11, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 12, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement2, 8, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 9, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement2, 10, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement2, 11, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
SAFE_SQLITE3_STEP2(statement2);
rc=(*proxy_sqlite3_clear_bindings)(statement2); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_reset)(statement2); ASSERT_SQLITE_OK(rc, mydb);
}
if (mysrvc->gtid_port) {
// this server has gtid_port configured, we set use_gtid
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 6, "Server %u:%s:%d has gtid_port enabled, setting use_gitd=true if not already set\n", mysrvc->myhgc->hid , mysrvc->address, mysrvc->port);
use_gtid = true;
}
}
}
(*proxy_sqlite3_finalize)(statement1);
(*proxy_sqlite3_finalize)(statement2);
}
if (use_gtid) {
has_gtid_port = true;
} else {
has_gtid_port = false;
}
if (resultset) { delete resultset; resultset=NULL; }
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 4, "DELETE FROM pgsql_servers_incoming\n");
mydb->execute("DELETE FROM pgsql_servers_incoming");
@ -1711,11 +1685,11 @@ void PgSQL_HostGroups_Manager::generate_pgsql_servers_table(int *_onlyhg) {
PtrArray *lst=new PtrArray();
//sqlite3 *mydb3=mydb->get_db();
char *query1=(char *)"INSERT INTO pgsql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)";
char *query1=(char *)"INSERT INTO pgsql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0);
rc = mydb->prepare_v2(query1, &statement1);
ASSERT_SQLITE_OK(rc, mydb);
std::string query32s = "INSERT INTO pgsql_servers VALUES " + generate_multi_rows_query(32,13);
std::string query32s = "INSERT INTO pgsql_servers VALUES " + generate_multi_rows_query(32,12);
char *query32 = (char *)query32s.c_str();
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query32, -1, &statement32, 0);
rc = mydb->prepare_v2(query32, &statement32);
@ -1723,10 +1697,10 @@ void PgSQL_HostGroups_Manager::generate_pgsql_servers_table(int *_onlyhg) {
if (pgsql_thread___hostgroup_manager_verbose) {
if (_onlyhg==NULL) {
proxy_info("Dumping current MySQL Servers structures for hostgroup ALL\n");
proxy_info("Dumping current PgSQL Servers structures for hostgroup ALL\n");
} else {
int hidonly=*_onlyhg;
proxy_info("Dumping current MySQL Servers structures for hostgroup %d\n", hidonly);
proxy_info("Dumping current PgSQL Servers structures for hostgroup %d\n", hidonly);
}
}
for (unsigned int i=0; i<MyHostGroups->len; i++) {
@ -1759,7 +1733,7 @@ void PgSQL_HostGroups_Manager::generate_pgsql_servers_table(int *_onlyhg) {
st=(char *)"SHUNNED";
break;
}
fprintf(stderr,"HID: %d , address: %s , port: %d , gtid_port: %d , weight: %ld , status: %s , max_connections: %ld , max_replication_lag: %u , use_ssl: %u , max_latency_ms: %u , comment: %s\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->gtid_port, mysrvc->weight, st, mysrvc->max_connections, mysrvc->max_replication_lag, mysrvc->use_ssl, mysrvc->max_latency_us*1000, mysrvc->comment);
fprintf(stderr,"HID: %d , address: %s , port: %d , weight: %ld , status: %s , max_connections: %ld , max_replication_lag: %u , use_ssl: %u , max_latency_ms: %u , comment: %s\n", mysrvc->myhgc->hid, mysrvc->address, mysrvc->port, mysrvc->weight, st, mysrvc->max_connections, mysrvc->max_replication_lag, mysrvc->use_ssl, mysrvc->max_latency_us*1000, mysrvc->comment);
}
lst->add(mysrvc);
if (lst->len==32) {
@ -1768,19 +1742,18 @@ void PgSQL_HostGroups_Manager::generate_pgsql_servers_table(int *_onlyhg) {
i--;
PgSQL_SrvC *mysrvc=(PgSQL_SrvC *)lst->remove_index_fast(0);
uintptr_t ptr=(uintptr_t)mysrvc;
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+1, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (i*13)+2, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+3, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+4, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+5, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+6, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+7, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+8, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+9, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+10, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+11, mysrvc->max_latency_us/1000); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (i*13)+12, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*13)+13, ptr); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+1, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (i*12)+2, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+3, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+4, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+5, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+6, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+7, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+8, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+9, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+10, mysrvc->max_latency_us/1000); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement32, (i*12)+11, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement32, (i*12)+12, ptr); ASSERT_SQLITE_OK(rc, mydb);
}
SAFE_SQLITE3_STEP2(statement32);
rc=(*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, mydb);
@ -1792,18 +1765,17 @@ void PgSQL_HostGroups_Manager::generate_pgsql_servers_table(int *_onlyhg) {
PgSQL_SrvC *mysrvc=(PgSQL_SrvC *)lst->remove_index_fast(0);
uintptr_t ptr=(uintptr_t)mysrvc;
rc=(*proxy_sqlite3_bind_int64)(statement1, 1, mysrvc->myhgc->hid); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 2, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 2, mysrvc->address, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 3, mysrvc->port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 4, mysrvc->gtid_port); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 5, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 6, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 7, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 8, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 9, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 10, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 11, mysrvc->max_latency_us/1000); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 12, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 13, ptr); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 4, mysrvc->weight); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 5, mysrvc->status); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 6, mysrvc->compression); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 7, mysrvc->max_connections); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 8, mysrvc->max_replication_lag); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 9, mysrvc->use_ssl); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 10, mysrvc->max_latency_us/1000); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_text)(statement1, 11, mysrvc->comment, -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, mydb);
rc=(*proxy_sqlite3_bind_int64)(statement1, 12, ptr); ASSERT_SQLITE_OK(rc, mydb);
SAFE_SQLITE3_STEP2(statement1);
rc=(*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, mydb);
@ -1817,11 +1789,11 @@ void PgSQL_HostGroups_Manager::generate_pgsql_servers_table(int *_onlyhg) {
int affected_rows=0;
SQLite3_result *resultset=NULL;
if (_onlyhg==NULL) {
mydb->execute_statement((char *)"SELECT hostgroup_id hid, hostname, port, gtid_port gtid, weight, status, compression cmp, max_connections max_conns, max_replication_lag max_lag, use_ssl ssl, max_latency_ms max_lat, comment, mem_pointer FROM pgsql_servers", &error , &cols , &affected_rows , &resultset);
mydb->execute_statement((char *)"SELECT hostgroup_id hid, hostname, port, weight, status, compression cmp, max_connections max_conns, max_replication_lag max_lag, use_ssl ssl, max_latency_ms max_lat, comment, mem_pointer FROM pgsql_servers", &error , &cols , &affected_rows , &resultset);
} else {
int hidonly=*_onlyhg;
char *q1 = (char *)malloc(256);
sprintf(q1,"SELECT hostgroup_id hid, hostname, port, gtid_port gtid, weight, status, compression cmp, max_connections max_conns, max_replication_lag max_lag, use_ssl ssl, max_latency_ms max_lat, comment, mem_pointer FROM pgsql_servers WHERE hostgroup_id=%d" , hidonly);
sprintf(q1,"SELECT hostgroup_id hid, hostname, port, weight, status, compression cmp, max_connections max_conns, max_replication_lag max_lag, use_ssl ssl, max_latency_ms max_lat, comment, mem_pointer FROM pgsql_servers WHERE hostgroup_id=%d" , hidonly);
mydb->execute_statement(q1, &error , &cols , &affected_rows , &resultset);
free(q1);
}
@ -3457,7 +3429,7 @@ void PgSQL_HostGroups_Manager::read_only_action(char *hostname, int port, int re
const char *Q1B=(char *)"SELECT hostgroup_id,status FROM ( SELECT DISTINCT writer_hostgroup FROM pgsql_replication_hostgroups JOIN pgsql_servers WHERE (hostgroup_id=writer_hostgroup) AND hostname='%s' AND port=%d UNION SELECT DISTINCT writer_hostgroup FROM pgsql_replication_hostgroups JOIN pgsql_servers WHERE (hostgroup_id=reader_hostgroup) AND hostname='%s' AND port=%d) LEFT JOIN pgsql_servers ON hostgroup_id=writer_hostgroup AND hostname='%s' AND port=%d";
const char *Q2A=(char *)"DELETE FROM pgsql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM pgsql_replication_hostgroups WHERE writer_hostgroup=pgsql_servers.hostgroup_id) AND status='OFFLINE_HARD'";
const char *Q2B=(char *)"UPDATE OR IGNORE pgsql_servers SET hostgroup_id=(SELECT writer_hostgroup FROM pgsql_replication_hostgroups WHERE reader_hostgroup=pgsql_servers.hostgroup_id) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT reader_hostgroup FROM pgsql_replication_hostgroups WHERE reader_hostgroup=pgsql_servers.hostgroup_id)";
const char *Q3A=(char *)"INSERT OR IGNORE INTO pgsql_servers(hostgroup_id, hostname, port, gtid_port, status, weight, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT reader_hostgroup, hostname, port, gtid_port, status, weight, max_connections, max_replication_lag, use_ssl, max_latency_ms, pgsql_servers.comment FROM pgsql_servers JOIN pgsql_replication_hostgroups ON pgsql_servers.hostgroup_id=pgsql_replication_hostgroups.writer_hostgroup WHERE hostname='%s' AND port=%d";
const char *Q3A=(char *)"INSERT OR IGNORE INTO pgsql_servers(hostgroup_id, hostname, port, status, weight, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) SELECT reader_hostgroup, hostname, port, status, weight, max_connections, max_replication_lag, use_ssl, max_latency_ms, pgsql_servers.comment FROM pgsql_servers JOIN pgsql_replication_hostgroups ON pgsql_servers.hostgroup_id=pgsql_replication_hostgroups.writer_hostgroup WHERE hostname='%s' AND port=%d";
const char *Q3B=(char *)"DELETE FROM pgsql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT reader_hostgroup FROM pgsql_replication_hostgroups WHERE reader_hostgroup=pgsql_servers.hostgroup_id)";
const char *Q4=(char *)"UPDATE OR IGNORE pgsql_servers SET hostgroup_id=(SELECT reader_hostgroup FROM pgsql_replication_hostgroups WHERE writer_hostgroup=pgsql_servers.hostgroup_id) WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM pgsql_replication_hostgroups WHERE writer_hostgroup=pgsql_servers.hostgroup_id)";
const char *Q5=(char *)"DELETE FROM pgsql_servers WHERE hostname='%s' AND port=%d AND hostgroup_id IN (SELECT writer_hostgroup FROM pgsql_replication_hostgroups WHERE writer_hostgroup=pgsql_servers.hostgroup_id)";
@ -4566,7 +4538,7 @@ int PgSQL_HostGroups_Manager::create_new_server_in_hg(
if (mysrvc == nullptr) {
char* c_hostname { const_cast<char*>(srv_info.addr.c_str()) };
PgSQL_SrvC* mysrvc = new PgSQL_SrvC(
c_hostname, srv_info.port, 0, srv_opts.weigth, MYSQL_SERVER_STATUS_ONLINE, 0, srv_opts.max_conns, 0,
c_hostname, srv_info.port, srv_opts.weigth, MYSQL_SERVER_STATUS_ONLINE, 0, srv_opts.max_conns, 0,
srv_opts.use_ssl, 0, const_cast<char*>("")
);
add(mysrvc,hid);
@ -4764,7 +4736,6 @@ PgSQL_SrvC* PgSQL_HostGroups_Manager::HostGroup_Server_Mapping::insert_HGM(unsig
if (strcmp(mysrvc->address, srv->address) == 0 && mysrvc->port == srv->port) {
if (mysrvc->status == MYSQL_SERVER_STATUS_OFFLINE_HARD) {
mysrvc->gtid_port = srv->gtid_port;
mysrvc->weight = srv->weight;
mysrvc->compression = srv->compression;
mysrvc->max_connections = srv->max_connections;
@ -4777,9 +4748,9 @@ PgSQL_SrvC* PgSQL_HostGroups_Manager::HostGroup_Server_Mapping::insert_HGM(unsig
if (GloMTH->variables.hostgroup_manager_verbose) {
proxy_info(
"Found server node in Host Group Container %s:%d as 'OFFLINE_HARD', setting back as 'ONLINE' with:"
" hostgroup_id=%d, gtid_port=%d, weight=%ld, compression=%d, max_connections=%ld, use_ssl=%d,"
" hostgroup_id=%d, weight=%ld, compression=%d, max_connections=%ld, use_ssl=%d,"
" max_replication_lag=%d, max_latency_ms=%d, comment=%s\n",
mysrvc->address, mysrvc->port, hostgroup_id, mysrvc->gtid_port, mysrvc->weight, mysrvc->compression,
mysrvc->address, mysrvc->port, hostgroup_id, mysrvc->weight, mysrvc->compression,
mysrvc->max_connections, mysrvc->use_ssl, mysrvc->max_replication_lag, (mysrvc->max_latency_us / 1000),
mysrvc->comment
);
@ -4792,12 +4763,12 @@ PgSQL_SrvC* PgSQL_HostGroups_Manager::HostGroup_Server_Mapping::insert_HGM(unsig
if (!ret_srv) {
if (GloMTH->variables.hostgroup_manager_verbose) {
proxy_info("Creating new server in HG %d : %s:%d , gtid_port=%d, weight=%ld, status=%d\n", hostgroup_id, srv->address, srv->port, srv->gtid_port, srv->weight, srv->status);
proxy_info("Creating new server in HG %d : %s:%d , weight=%ld, status=%d\n", hostgroup_id, srv->address, srv->port, srv->weight, srv->status);
}
proxy_debug(PROXY_DEBUG_MYSQL_CONNPOOL, 5, "Adding new server %s:%d , weight=%ld, status=%d, mem_ptr=%p into hostgroup=%d\n", srv->address, srv->port, srv->weight, srv->status, srv, hostgroup_id);
ret_srv = new PgSQL_SrvC(srv->address, srv->port, srv->gtid_port, srv->weight, srv->status, srv->compression,
ret_srv = new PgSQL_SrvC(srv->address, srv->port, srv->weight, srv->status, srv->compression,
srv->max_connections, srv->max_replication_lag, srv->use_ssl, (srv->max_latency_us / 1000), srv->comment);
myhgc->mysrvs->add(ret_srv);

@ -987,7 +987,8 @@ void PgSQL_Protocol::welcome_client() {
pgpkt.set_multi_pkt_mode(true);
pgpkt.write_AuthenticationOk();
pgpkt.write_ParameterStatus("is_superuser", "on"); // only for admin
if (sess->session_type == PROXYSQL_SESSION_ADMIN)
pgpkt.write_ParameterStatus("is_superuser", "on"); // only for admin
const char* application_name = (*myds)->myconn->conn_params.get_value(PG_APPLICATION_NAME);
if (application_name)

@ -3052,8 +3052,6 @@ void PgSQL_Thread::run() {
maintenance_loop = true;
servers_table_version_previous = servers_table_version_current;
servers_table_version_current = PgHGM->get_servers_table_version();
// during a maintenance loop (every 1 second) we read has_gtid_port from PgHGM
retrieve_gtids_required = PgHGM->has_gtid_port;
}
else {
maintenance_loop = false;
@ -3979,7 +3977,6 @@ PgSQL_Thread::PgSQL_Thread() {
last_maintenance_time = 0;
last_move_to_idle_thread_time = 0;
maintenance_loop = true;
retrieve_gtids_required = false;
servers_table_version_previous = 0;
servers_table_version_current = 0;

@ -6843,13 +6843,13 @@ void ProxySQL_Admin::save_pgsql_servers_runtime_to_database(bool _runtime) {
char* query32 = NULL;
std::string query32s = "";
if (_runtime) {
query1 = (char*)"INSERT INTO runtime_pgsql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
query32s = "INSERT INTO runtime_pgsql_servers VALUES " + generate_multi_rows_query(32, 12);
query1 = (char*)"INSERT INTO runtime_pgsql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)";
query32s = "INSERT INTO runtime_pgsql_servers VALUES " + generate_multi_rows_query(32, 11);
query32 = (char*)query32s.c_str();
}
else {
query1 = (char*)"INSERT INTO pgsql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)";
query32s = "INSERT INTO pgsql_servers VALUES " + generate_multi_rows_query(32, 12);
query1 = (char*)"INSERT INTO pgsql_servers VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)";
query32s = "INSERT INTO pgsql_servers VALUES " + generate_multi_rows_query(32, 11);
query32 = (char*)query32s.c_str();
}
//rc=(*proxy_sqlite3_prepare_v2)(mydb3, query1, -1, &statement1, 0);
@ -6865,18 +6865,17 @@ void ProxySQL_Admin::save_pgsql_servers_runtime_to_database(bool _runtime) {
SQLite3_row* r1 = *it;
int idx = row_idx % 32;
if (row_idx < max_bulk_row_idx) { // bulk
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 1, atoi(r1->fields[0])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 12) + 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 3, atoi(r1->fields[2])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 4, atoi(r1->fields[3])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 12) + 5, (_runtime ? r1->fields[4] : (strcmp(r1->fields[4], "SHUNNED") == 0 ? "ONLINE" : r1->fields[4])), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 6, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 7, atoi(r1->fields[6])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 8, atoi(r1->fields[7])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 9, atoi(r1->fields[8])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 10, atoi(r1->fields[9])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 12) + 11, atoi(r1->fields[10])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 12) + 12, r1->fields[11], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 1, atoi(r1->fields[0])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 11) + 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 3, atoi(r1->fields[2])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 11) + 4, (_runtime ? r1->fields[3] : (strcmp(r1->fields[4], "SHUNNED") == 0 ? "ONLINE" : r1->fields[3])), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 5, atoi(r1->fields[4])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 6, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 7, atoi(r1->fields[6])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 8, atoi(r1->fields[7])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 9, atoi(r1->fields[8])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement32, (idx * 11) + 10, atoi(r1->fields[9])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement32, (idx * 11) + 11, r1->fields[10], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
if (idx == 31) {
SAFE_SQLITE3_STEP2(statement32);
rc = (*proxy_sqlite3_clear_bindings)(statement32); ASSERT_SQLITE_OK(rc, admindb);
@ -6885,17 +6884,16 @@ void ProxySQL_Admin::save_pgsql_servers_runtime_to_database(bool _runtime) {
}
else { // single row
rc = (*proxy_sqlite3_bind_int64)(statement1, 1, atoi(r1->fields[0])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement1, 2, r1->fields[1], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 3, atoi(r1->fields[2])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 4, atoi(r1->fields[3])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement1, 5, (_runtime ? r1->fields[4] : (strcmp(r1->fields[4], "SHUNNED") == 0 ? "ONLINE" : r1->fields[4])), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement1, 4, (_runtime ? r1->fields[3] : (strcmp(r1->fields[3], "SHUNNED") == 0 ? "ONLINE" : r1->fields[3])), -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 5, atoi(r1->fields[4])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 6, atoi(r1->fields[5])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 7, atoi(r1->fields[6])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 8, atoi(r1->fields[7])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 9, atoi(r1->fields[8])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 10, atoi(r1->fields[9])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_int64)(statement1, 11, atoi(r1->fields[10])); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement1, 12, r1->fields[11], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_bind_text)(statement1, 11, r1->fields[10], -1, SQLITE_TRANSIENT); ASSERT_SQLITE_OK(rc, admindb);
SAFE_SQLITE3_STEP2(statement1);
rc = (*proxy_sqlite3_clear_bindings)(statement1); ASSERT_SQLITE_OK(rc, admindb);
rc = (*proxy_sqlite3_reset)(statement1); ASSERT_SQLITE_OK(rc, admindb);
@ -7267,7 +7265,7 @@ void ProxySQL_Admin::load_pgsql_servers_to_runtime(const incoming_pgsql_servers_
SQLite3_result* incoming_hostgroup_attributes = incoming_pgsql_servers.incoming_hostgroup_attributes;
SQLite3_result* incoming_pgsql_servers_v2 = incoming_pgsql_servers.incoming_pgsql_servers_v2;
const char* query = (char*)"SELECT hostgroup_id,hostname,port,gtid_port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.pgsql_servers ORDER BY hostgroup_id, hostname, port";
const char* query = (char*)"SELECT hostgroup_id,hostname,port,status,weight,compression,max_connections,max_replication_lag,use_ssl,max_latency_ms,comment FROM main.pgsql_servers ORDER BY hostgroup_id, hostname, port";
if (runtime_pgsql_servers == nullptr) {
proxy_debug(PROXY_DEBUG_ADMIN, 4, "%s\n", query);
admindb->execute_statement(query, &error, &cols, &affected_rows, &resultset_servers);

@ -1443,15 +1443,14 @@ int ProxySQL_Config::Write_PgSQL_Servers_to_configfile(std::string& data) {
addField(data, "hostgroup_id", r->fields[0], "");
addField(data, "hostname", r->fields[1]);
addField(data, "port", r->fields[2], "");
addField(data, "gtid_port", r->fields[3], "");
addField(data, "status", r->fields[4]);
addField(data, "weight", r->fields[5], "");
addField(data, "compression", r->fields[6], "");
addField(data, "max_connections", r->fields[7], "");
addField(data, "max_replication_lag", r->fields[8], "");
addField(data, "use_ssl", r->fields[9], "");
addField(data, "max_latency_ms", r->fields[10], "");
addField(data, "comment", r->fields[11]);
addField(data, "status", r->fields[3]);
addField(data, "weight", r->fields[4], "");
addField(data, "compression", r->fields[5], "");
addField(data, "max_connections", r->fields[6], "");
addField(data, "max_replication_lag", r->fields[7], "");
addField(data, "use_ssl", r->fields[8], "");
addField(data, "max_latency_ms", r->fields[9], "");
addField(data, "comment", r->fields[10]);
data += "\t}";
isNext = true;
@ -1504,13 +1503,12 @@ int ProxySQL_Config::Read_PgSQL_Servers_from_configfile() {
const Setting& pgsql_servers = root["pgsql_servers"];
int count = pgsql_servers.getLength();
//fprintf(stderr, "Found %d servers\n",count);
char* q = (char*)"INSERT OR REPLACE INTO pgsql_servers (hostname, port, gtid_port, hostgroup_id, compression, weight, status, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (\"%s\", %d, %d, %d, %d, %d, \"%s\", %d, %d, %d, %d, '%s')";
char* q = (char*)"INSERT OR REPLACE INTO pgsql_servers (hostname, port, hostgroup_id, compression, weight, status, max_connections, max_replication_lag, use_ssl, max_latency_ms, comment) VALUES (\"%s\", %d, %d, %d, %d, \"%s\", %d, %d, %d, %d, '%s')";
for (i = 0; i < count; i++) {
const Setting& server = pgsql_servers[i];
std::string address;
std::string status = "ONLINE";
int port = 5432;
int gtid_port = 0;
int hostgroup;
int weight = 1;
int compression = 0;
@ -1526,7 +1524,6 @@ int ProxySQL_Config::Read_PgSQL_Servers_from_configfile() {
}
}
server.lookupValue("port", port);
server.lookupValue("gtid_port", gtid_port);
if (server.lookupValue("hostgroup", hostgroup) == false) {
if (server.lookupValue("hostgroup_id", hostgroup) == false) {
proxy_error("Admin: detected a pgsql_servers in config file without a mandatory hostgroup_id\n");
@ -1552,7 +1549,7 @@ int ProxySQL_Config::Read_PgSQL_Servers_from_configfile() {
char* o1 = strdup(comment.c_str());
char* o = escape_string_single_quotes(o1, false);
char* query = (char*)malloc(strlen(q) + strlen(status.c_str()) + strlen(address.c_str()) + strlen(o) + 128);
sprintf(query, q, address.c_str(), port, gtid_port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag, use_ssl, max_latency_ms, o);
sprintf(query, q, address.c_str(), port, hostgroup, compression, weight, status.c_str(), max_connections, max_replication_lag, use_ssl, max_latency_ms, o);
//fprintf(stderr, "%s\n", query);
admindb->execute(query);
if (o != o1) free(o);

Loading…
Cancel
Save