From fdf515ca109b03b45f77d26f81f9dd2bd3310683 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Wed, 24 Feb 2021 19:59:55 -0500 Subject: [PATCH] rough in a db queue thread. Use it in zm_logger so that we don't have to aquire the db lock --- src/zm_db.cpp | 34 ++++++++++++++++++++++++++++++++++ src/zm_db.h | 25 +++++++++++++++++++++++-- src/zm_logger.cpp | 36 +++++++++++------------------------- 3 files changed, 68 insertions(+), 27 deletions(-) diff --git a/src/zm_db.cpp b/src/zm_db.cpp index 02265e680..073e41697 100644 --- a/src/zm_db.cpp +++ b/src/zm_db.cpp @@ -24,6 +24,7 @@ MYSQL dbconn; RecursiveMutex db_mutex; +zmDbQueue dbQueue; bool zmDbConnected = false; @@ -214,3 +215,36 @@ zmDbRow::~zmDbRow() { } row = nullptr; } + +zmDbQueue::zmDbQueue() : + mThread(&zmDbQueue::process, this), + mTerminate(false) +{ } + +zmDbQueue::~zmDbQueue() { + mTerminate = true; + mCondition.notify_all(); + mThread.join(); +} +void zmDbQueue::process() { + std::unique_lock lock(mMutex); + + while (!mTerminate and !zm_terminate) { + if (mQueue.empty()) { + mCondition.wait(lock); + } + if (!mQueue.empty()) { + std::string sql = mQueue.front(); + mQueue.pop(); + lock.unlock(); + zmDbDo(sql.c_str()); + lock.lock(); + } + } +} // end void zmDbQueue::process() + +void zmDbQueue::push(std::string sql) { + std::unique_lock lock(mMutex); + mQueue.push(sql); + mCondition.notify_all(); +} diff --git a/src/zm_db.h b/src/zm_db.h index 22e684c67..de4154ab2 100644 --- a/src/zm_db.h +++ b/src/zm_db.h @@ -23,6 +23,26 @@ #include "zm_thread.h" #include #include +#include +#include +#include +#include +#include + +class zmDbQueue { + private: + std::queue mQueue; + std::thread mThread; + std::mutex mMutex; + std::condition_variable mCondition; + bool mTerminate; + public: + zmDbQueue(); + ~zmDbQueue(); + void push(const char *sql) { return push(std::string(sql)); }; + void push(std::string); + void process(); +}; class zmDbRow { private: @@ -43,6 +63,7 @@ class zmDbRow { extern MYSQL dbconn; extern RecursiveMutex db_mutex; +extern zmDbQueue dbQueue; extern bool zmDbConnected; @@ -51,7 +72,7 @@ void zmDbClose(); int zmDbDo(const char *query); int zmDbDoInsert(const char *query); -MYSQL_RES * zmDbFetch( const char *query ); -zmDbRow *zmDbFetchOne( const char *query ); +MYSQL_RES * zmDbFetch(const char *query); +zmDbRow *zmDbFetchOne(const char *query); #endif // ZM_DB_H diff --git a/src/zm_logger.cpp b/src/zm_logger.cpp index 58d763bae..e264c3c3a 100644 --- a/src/zm_logger.cpp +++ b/src/zm_logger.cpp @@ -535,32 +535,18 @@ void Logger::logPrint(bool hex, const char * const filepath, const int line, con } // end if level <= mFileLevel if ( level <= mDatabaseLevel ) { - if (db_mutex.try_lock()) { - int syslogSize = syslogEnd-syslogStart; - char escapedString[(syslogSize*2)+1]; - mysql_real_escape_string(&dbconn, escapedString, syslogStart, syslogSize); + int syslogSize = syslogEnd-syslogStart; + char escapedString[(syslogSize*2)+1]; + mysql_real_escape_string(&dbconn, escapedString, syslogStart, syslogSize); - char sql[ZM_SQL_MED_BUFSIZ]; - snprintf(sql, sizeof(sql), - "INSERT INTO `Logs` " - "( `TimeKey`, `Component`, `ServerId`, `Pid`, `Level`, `Code`, `Message`, `File`, `Line` )" - " VALUES " - "( %ld.%06ld, '%s', %d, %d, %d, '%s', '%s', '%s', %d )", - timeVal.tv_sec, timeVal.tv_usec, mId.c_str(), staticConfig.SERVER_ID, tid, level, classString, escapedString, file, line - ); - if ( mysql_query(&dbconn, sql) ) { - Level tempDatabaseLevel = mDatabaseLevel; - databaseLevel(NOLOG); - Error("Can't insert log entry: sql(%s) error(%s)", sql, mysql_error(&dbconn)); - databaseLevel(tempDatabaseLevel); - } - db_mutex.unlock(); - } else { - Level tempDatabaseLevel = mDatabaseLevel; - databaseLevel(NOLOG); - Error("Can't insert log entry since the DB lock could not be obtained. Message: %s", syslogStart); - databaseLevel(tempDatabaseLevel); - } + std::string sql_string = stringtf( + "INSERT INTO `Logs` " + "( `TimeKey`, `Component`, `ServerId`, `Pid`, `Level`, `Code`, `Message`, `File`, `Line` )" + " VALUES " + "( %ld.%06ld, '%s', %d, %d, %d, '%s', '%s', '%s', %d )", + timeVal.tv_sec, timeVal.tv_usec, mId.c_str(), staticConfig.SERVER_ID, tid, level, classString, escapedString, file, line + ); + dbQueue.push(sql_string); } // end if level <= mDatabaseLevel if ( level <= mSyslogLevel ) {