diff --git a/plugins/mysqlx/include/mysqlx_thread.h b/plugins/mysqlx/include/mysqlx_thread.h index 886b86e3b..9afd91a25 100644 --- a/plugins/mysqlx/include/mysqlx_thread.h +++ b/plugins/mysqlx/include/mysqlx_thread.h @@ -107,7 +107,7 @@ private: mutable std::mutex listener_mutex_; std::vector sessions_; - std::mutex sessions_mutex_; + mutable std::mutex sessions_mutex_; std::vector conn_cache_; std::mutex conn_cache_mutex_; diff --git a/plugins/mysqlx/src/mysqlx_thread.cpp b/plugins/mysqlx/src/mysqlx_thread.cpp index 512a6898c..6bce44e01 100644 --- a/plugins/mysqlx/src/mysqlx_thread.cpp +++ b/plugins/mysqlx/src/mysqlx_thread.cpp @@ -256,8 +256,20 @@ void Mysqlx_Thread::process_all_sessions() { auto it = sessions_.begin(); while (it != sessions_.end()) { MysqlxSession* sess = *it; - sess->to_process = true; - int rc = sess->handler(); + + // Only invoke handler() when there is real work: a poll event landed + // on either data stream, the session asked to be re-run, or there are + // already-buffered frames to dispatch. Forcing to_process=true on every + // tick burned the CPU at large session counts. + short c_rev = sess->client_ds().get_revents(); + short s_rev = sess->server_ds().get_revents(); + bool fd_ready = (c_rev != 0) || (s_rev != 0); + bool buffered = sess->client_ds().has_complete_frame() || sess->server_ds().has_complete_frame(); + int rc = 0; + if (fd_ready || buffered || sess->to_process) { + sess->to_process = true; + rc = sess->handler(); + } bool timeout = false; MysqlxSession::Status st = sess->get_status(); @@ -355,6 +367,7 @@ int Mysqlx_Thread::get_listener_count() const { } size_t Mysqlx_Thread::get_session_count() const { + std::lock_guard lock(sessions_mutex_); return sessions_.size(); }