Merge branch 'v2.x_240429-2' of https://github.com/sysown/proxysql into v2.x_240429-2

pull/4535/head
René Cannaò 2 years ago
commit 994952d320

@ -0,0 +1,70 @@
### Flowchart of `MySQL_Connection::async_query()`
This function asynchronously executes a query on the MySQL connection.
It handles various states of the asynchronous query execution process and returns appropriate status codes indicating the result of the execution.
Returns an integer status code indicating the result of the query execution:
- 0: Query execution completed successfully.
- -1: Query execution failed.
- 1: Query execution in progress.
- 2: Processing a multi-statement query, control needs to be transferred to MySQL_Session.
- 3: In the middle of processing a multi-statement query.
```mermaid
---
title: MySQL_Connection::async_query()
---
flowchart TD
Assert["assert()"]
ValidConnection{Valid Connection}
ValidConnection -- no --> Assert
IsServerOffline{"IsServerOffline()"}
ValidConnection -- yes --> IsServerOffline
IsServerOffline -- yes --> ReturnMinus1
asyncStateMachine1{async_state_machine}
asyncStateMachine2{async_state_machine}
IsServerOffline -- no --> asyncStateMachine1
asyncStateMachine1 -- ASYNC_QUERY_END --> Return0
handler["handler()"]
asyncStateMachine1 --> handler
handler --> asyncStateMachine2
asyncStateMachine2 -- ASYNC_QUERY_END --> mysql_error{"mysql_error"}
asyncStateMachine2 -- ASYNC_STMT_EXECUTE_END --> mysql_error
asyncStateMachine2 -- ASYNC_STMT_PREPARE_FAILED --> ReturnMinus1
asyncStateMachine2 -- ASYNC_STMT_PREPARE_SUCCESSFUL --> Return0
mysql_error -- yes --> ReturnMinus1
mysql_error -- no --> Return0
asyncStateMachine2 -- ASYNC_NEXT_RESULT_START --> Return2
processing_multi_statement{"processing_multi_statement"}
asyncStateMachine2 --> processing_multi_statement
processing_multi_statement -- yes --> Return3
processing_multi_statement -- no --> Return1
ReturnMinus1["return -1"]
Return0["return 0"]
Return1["return 1"]
Return2["return 2"]
Return3["return 3"]
```
### Flowchart of `MySQL_Connection::IsServerOffline()`
```mermaid
---
title: MySQL_Connection::IsServerOffline()
---
flowchart TD
True[true]
False[false]
SS1{"server_status"}
SA{"shunned_automatic"}
SB{"shunned_and_kill_all_connections"}
SS1 -- OFFLINE_HARD --> True
SS1 -- REPLICATION_LAG --> True
SS1 -- SHUNNED --> SA
SA -- yes --> SB
SB -- yes --> True
SA -- no --> False
SB -- no --> False
SS1 --> False
```

@ -0,0 +1,68 @@
### Flowchart of `MySQL_Session::RunQuery()`
This function mostly calls `MySQL_Connection::async_query()` with the right arguments.
Returns an integer status code indicating the result of the query execution:
- 0: Query execution completed successfully.
- -1: Query execution failed.
- 1: Query execution in progress.
- 2: Processing a multi-statement query, control needs to be transferred to MySQL_Session.
- 3: In the middle of processing a multi-statement query.
```mermaid
---
title: MySQL_Session::RunQuery()
---
flowchart TD
RQ["MySQL_Connection::async_query()"]
BEGIN --> RQ
RQ --> END
```
### Flowchart of `MySQL_Session::handler()`
WORK IN PROGRESS
```mermaid
---
title: MySQL_Session::handler()
---
flowchart TD
RQ["rc = RunQuery()"]
RC{rc}
CBCS["rc1 = handler_ProcessingQueryError_CheckBackendConnectionStatus()"]
RC1{rc1}
RQ --> RC
RC -- 0 --> OK
RC -- -1 --> CBCS
CBCS --> RC1
CS["CONNECTING_SERVER"]
ReturnMinus1["return -1"]
RC1 -- -1 --> ReturnMinus1
RC1 -- 1 --> CS
HM1CLE1["handler_minus1_ClientLibraryError()"]
HM1CLE2["handler_minus1_ClientLibraryError()"]
myerr1{"myerr >= 2000
&&
myerr < 3000"}
RC1 --> myerr1
myerr1 -- yes --> HM1CLE1
HM1CLE1 -- true --> CS
HM1CLE1 -- false --> ReturnMinus1
HM1LEDQ1["handler_minus1_LogErrorDuringQuery()"]
myerr1 -- no --> HM1LEDQ1
HM1HEC1["handler_minus1_HandleErrorCodes()"]
HM1LEDQ1 --> HM1HEC1
HM1HEC1 -- true --> HR1{"handler_ret"}
HR1 -- 0 --> CS
HR1 --> RHR1["return handler_ret"]
HM1GEM1["handler_minus1_GenerateErrorMessage()"]
HM1HEC1 -- false --> HM1GEM1
RE["RequestEnd()"]
HM1HBC1["handler_minus1_HandleBackendConnection()"]
HM1GEM1 --> RE
RE --> HM1HBC1
```
### Flowchart of `MySQL_Session::handler_ProcessingQueryError_CheckBackendConnectionStatus()`
TODO

@ -191,7 +191,7 @@ int main(int argc, char** argv) {
rc = run_q(proxysql_admin, s.c_str());
ok(rc == 0 , "%s" , s.c_str());
}
sleep(1);
sleep(3);
for (int i = 0; i < NUM_CONNS ; i++) {
MYSQL * mysql = conns[i];
int rc = run_q(mysql, "DO 1");

@ -42,9 +42,12 @@ inline unsigned long long monotonic_time() {
return (((unsigned long long) ts.tv_sec) * 1000000) + (ts.tv_nsec / 1000);
}
#define NTHREADS 5
#define NCONNS 6
#define NPREP 15000
#define PROGRESS 2000
pthread_mutex_t mtx[NCONNS];
MYSQL* conns[NCONNS];
int ids[NCONNS*NPREP];
MYSQL_STMT * stmts[NCONNS*NPREP];
@ -125,6 +128,163 @@ int execute_stmt(int idx) {
return 0;
}
void * prepare_thread(void *arg) {
int thread_id = *(int *)arg;
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (idx%PROGRESS==(PROGRESS-1)) diag("Preparing statements. Progress: %d", idx+1);
pthread_mutex_lock(&mtx[i]);
if (prepare_stmt(idx,i)) exit(EXIT_FAILURE);
// excute every 7 stmt
if (idx%7==0) {
if (execute_stmt(idx)) exit(EXIT_FAILURE);
}
pthread_mutex_unlock(&mtx[i]);
}
}
}
return NULL;
}
void * execute1_thread(void *arg) {
int thread_id = *(int *)arg;
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (idx%PROGRESS==(PROGRESS-1)) diag("Executing statements in order 1. Progress: %d", idx+1);
pthread_mutex_lock(&mtx[i]);
if (execute_stmt(idx)) exit(EXIT_FAILURE);
pthread_mutex_unlock(&mtx[i]);
}
}
}
return NULL;
}
void * execute2_thread(void *arg) {
int thread_id = *(int *)arg;
int p=0; // we need a new counter because of the out of order
for (int j=0; j<NPREP; j++) {
for (int i=0; i<NCONNS; i++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (p%PROGRESS==(PROGRESS-1)) diag("Executing statements in order 2. Progress: %d", p+1);
pthread_mutex_lock(&mtx[i]);
if (execute_stmt(idx)) exit(EXIT_FAILURE);
pthread_mutex_unlock(&mtx[i]);
}
p++;
}
}
return NULL;
}
void * execute3_thread(void *arg) {
int thread_id = *(int *)arg;
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (idx%PROGRESS==(PROGRESS-1)) diag("Closing or executing statements, loop 1. Progress: %d", idx+1);
pthread_mutex_lock(&mtx[i]);
if (idx%4==3) {
int rc = mysql_stmt_close(stmts[idx]);
if (rc) {
diag("Failed to close stmt %d", idx);
exit(EXIT_FAILURE);
}
stmts[idx] = NULL;
} else {
if (execute_stmt(idx)) exit(EXIT_FAILURE);
}
pthread_mutex_unlock(&mtx[i]);
}
}
}
return NULL;
}
void * execute4_thread(void *arg) {
int thread_id = *(int *)arg;
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (idx%PROGRESS==(PROGRESS-1)) diag("Closing or executing statements, loop 2. Progress: %d", idx+1);
pthread_mutex_lock(&mtx[i]);
if (idx%4==3) {
// skip, already closed
} else if (idx%4==2) {
int rc = mysql_stmt_close(stmts[idx]);
if (rc) {
diag("Failed to close stmt %d", idx);
exit(EXIT_FAILURE);
}
stmts[idx] = NULL;
} else {
if (execute_stmt(idx)) exit(EXIT_FAILURE);
}
pthread_mutex_unlock(&mtx[i]);
}
}
}
return NULL;
}
void *execute5_thread(void *arg) {
int thread_id = *(int *)arg;
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (idx%PROGRESS==(PROGRESS-1)) diag("Execute and close the prepared statements left, loop 1. Progress: %d", idx+1);
pthread_mutex_lock(&mtx[i]);
if (stmts[idx] != NULL) {
if (idx%2==1) {
if (execute_stmt(idx)) exit(EXIT_FAILURE);
int rc = mysql_stmt_close(stmts[idx]);
if (rc) {
diag("Failed to close stmt %d", idx);
exit(EXIT_FAILURE);
}
stmts[idx] = NULL;
}
}
pthread_mutex_unlock(&mtx[i]);
}
}
}
return NULL;
}
void *execute6_thread(void *arg) {
int thread_id = *(int *)arg;
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%NTHREADS == thread_id) {
if (idx%PROGRESS==(PROGRESS-1)) diag("Execute and close the prepared statements left, loop 2. Progress: %d", idx+1);
pthread_mutex_lock(&mtx[i]);
if (stmts[idx] != NULL) {
if (idx%2==0) {
if (execute_stmt(idx)) exit(EXIT_FAILURE);
int rc = mysql_stmt_close(stmts[idx]);
if (rc) {
diag("Failed to close stmt %d", idx);
exit(EXIT_FAILURE);
}
stmts[idx] = NULL;
}
}
pthread_mutex_unlock(&mtx[i]);
}
}
}
return NULL;
}
int main(int argc, char** argv) {
CommandLine cl;
@ -147,6 +307,10 @@ int main(int argc, char** argv) {
}
}
for (int i=0; i<NCONNS; i++) {
pthread_mutex_init(&mtx[i], NULL);
}
// ceating "random" ids in within a range 0..NPREP
diag("Creating IDs");
for (int i=0; i<NCONNS*NPREP; i++) {
@ -158,17 +322,18 @@ int main(int argc, char** argv) {
}
diag("Preparing statements");
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%PROGRESS==(PROGRESS-1)) diag("Preparing statements. Progress: %d", idx+1);
if (prepare_stmt(idx,i)) return EXIT_FAILURE;
// excute every 7 stmt
if (idx%7==0) {
if (execute_stmt(idx)) return EXIT_FAILURE;
}
pthread_t thi[NTHREADS];
int tid[NTHREADS];
for (unsigned int i=0; i<NTHREADS; i++) {
tid[i] = i;
if ( pthread_create(&thi[i], NULL, prepare_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
{
int Stmt_Cached = get_Stmt_Cached(proxysql_admin);
@ -177,57 +342,51 @@ int main(int argc, char** argv) {
// excute statements in order
diag("Executing statements in order");
for (int i=0; i<NCONNS; i++) {
for (int j=0; j<NPREP; j++) {
int idx=i*NPREP+j;
if (idx%PROGRESS==(PROGRESS-1)) diag("Executing statements in order 1. Progress: %d", idx+1);
if (execute_stmt(idx)) return EXIT_FAILURE;
for (unsigned int i=0; i<NTHREADS; i++) {
if ( pthread_create(&thi[i], NULL, execute1_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
// excute statements in different order
diag("Executing statements in different order");
int p=0; // we need a new counter because of the out of order
for (int j=0; j<NPREP; j++) {
for (int i=0; i<NCONNS; i++) {
int idx=i*NPREP+j;
if (p%PROGRESS==(PROGRESS-1)) diag("Executing statements in order 2. Progress: %d", p+1);
if (execute_stmt(idx)) return EXIT_FAILURE;
p++;
for (unsigned int i=0; i<NTHREADS; i++) {
if ( pthread_create(&thi[i], NULL, execute2_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
// close 1 of 4 prepared statements, execute the rest
for (int i=0; i<NCONNS*NPREP; i++) {
if (i%PROGRESS==(PROGRESS-1)) diag("Closing or executing statements, loop 1. Progress: %d", i+1);
if (i%4==3) {
int rc = mysql_stmt_close(stmts[i]);
if (rc) {
diag("Failed to close stmt %d", i);
return EXIT_FAILURE;
}
stmts[i] = NULL;
} else {
if (execute_stmt(i)) return EXIT_FAILURE;
diag("Executing statements in different order");
for (unsigned int i=0; i<NTHREADS; i++) {
if ( pthread_create(&thi[i], NULL, execute3_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
// close 1 of 4 prepared statements, skip 1 in 4, execute the rest
for (int i=0; i<NCONNS*NPREP; i++) {
if (i%PROGRESS==(PROGRESS-1)) diag("Closing or executing statements, loop 2. Progress: %d", i+1);
if (i%4==3) {
// skip, already closed
} else if (i%4==2) {
int rc = mysql_stmt_close(stmts[i]);
if (rc) {
diag("Failed to close stmt %d", i);
return EXIT_FAILURE;
}
stmts[i] = NULL;
} else {
if (execute_stmt(i)) return EXIT_FAILURE;
diag("Executing statements in different order");
for (unsigned int i=0; i<NTHREADS; i++) {
if ( pthread_create(&thi[i], NULL, execute4_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
// close half the connections without closing the prepared statements
for (int i=0; i<NCONNS; i+=2) {
@ -240,34 +399,28 @@ int main(int argc, char** argv) {
}
// execute and close the prepared statements left, loop 1
for (int i=0; i<NCONNS*NPREP; i++) {
if (stmts[i] != NULL) {
if (i%2==1) {
if (execute_stmt(i)) return EXIT_FAILURE;
int rc = mysql_stmt_close(stmts[i]);
if (rc) {
diag("Failed to close stmt %d", i);
return EXIT_FAILURE;
}
stmts[i] = NULL;
}
for (unsigned int i=0; i<NTHREADS; i++) {
if ( pthread_create(&thi[i], NULL, execute5_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
// execute and close the prepared statements left, loop 2
for (int i=0; i<NCONNS*NPREP; i++) {
if (stmts[i] != NULL) {
if (i%2==0) {
if (execute_stmt(i)) return EXIT_FAILURE;
int rc = mysql_stmt_close(stmts[i]);
if (rc) {
diag("Failed to close stmt %d", i);
return EXIT_FAILURE;
}
stmts[i] = NULL;
}
for (unsigned int i=0; i<NTHREADS; i++) {
if ( pthread_create(&thi[i], NULL, execute6_thread , &tid[i]) != 0 ) {
perror("Thread creation");
return EXIT_FAILURE;
}
}
for (unsigned int i=0; i<NTHREADS; i++) {
pthread_join(thi[i], NULL);
}
// Half of the connections were freed earlier. We only iterate the other
// half that has not been freed.

@ -116,7 +116,8 @@ std::unordered_map<std::string,var_counter> vars_counters;
*/
void * my_conn_thread(void *arg) {
g_seed = time(NULL) ^ getpid() ^ pthread_self();
g_seed = monotonic_time() * pthread_self() + monotonic_time();
srand(g_seed);
unsigned int select_OK=0;
unsigned int select_ERR=0;
int i, j;
@ -168,7 +169,7 @@ void * my_conn_thread(void *arg) {
int fr = rand();
int r1=fr%count;
//int r2=fastrand()%testCases.size();
int r2=rand()%testCases.size();
int r2=(fastrand() + (RAND_MAX * fastrand())) %testCases.size();
if (j%queries_per_connections==0) {
mysql_idx=r1;

@ -146,7 +146,7 @@ struct cpu_timer
inline int fastrand() {
g_seed = (214014*g_seed+2531011);
g_seed = (214013*g_seed+2531011);
return (g_seed>>16)&0x7FFF;
}

Loading…
Cancel
Save