rough in a db queue thread. Use it in zm_logger so that we don't have to aquire the db lock
This commit is contained in:
parent
d106c2fcc3
commit
fdf515ca10
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
MYSQL dbconn;
|
MYSQL dbconn;
|
||||||
RecursiveMutex db_mutex;
|
RecursiveMutex db_mutex;
|
||||||
|
zmDbQueue dbQueue;
|
||||||
|
|
||||||
bool zmDbConnected = false;
|
bool zmDbConnected = false;
|
||||||
|
|
||||||
|
@ -214,3 +215,36 @@ zmDbRow::~zmDbRow() {
|
||||||
}
|
}
|
||||||
row = nullptr;
|
row = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zmDbQueue::zmDbQueue() :
|
||||||
|
mThread(&zmDbQueue::process, this),
|
||||||
|
mTerminate(false)
|
||||||
|
{ }
|
||||||
|
|
||||||
|
zmDbQueue::~zmDbQueue() {
|
||||||
|
mTerminate = true;
|
||||||
|
mCondition.notify_all();
|
||||||
|
mThread.join();
|
||||||
|
}
|
||||||
|
void zmDbQueue::process() {
|
||||||
|
std::unique_lock<std::mutex> lock(mMutex);
|
||||||
|
|
||||||
|
while (!mTerminate and !zm_terminate) {
|
||||||
|
if (mQueue.empty()) {
|
||||||
|
mCondition.wait(lock);
|
||||||
|
}
|
||||||
|
if (!mQueue.empty()) {
|
||||||
|
std::string sql = mQueue.front();
|
||||||
|
mQueue.pop();
|
||||||
|
lock.unlock();
|
||||||
|
zmDbDo(sql.c_str());
|
||||||
|
lock.lock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} // end void zmDbQueue::process()
|
||||||
|
|
||||||
|
void zmDbQueue::push(std::string sql) {
|
||||||
|
std::unique_lock<std::mutex> lock(mMutex);
|
||||||
|
mQueue.push(sql);
|
||||||
|
mCondition.notify_all();
|
||||||
|
}
|
||||||
|
|
25
src/zm_db.h
25
src/zm_db.h
|
@ -23,6 +23,26 @@
|
||||||
#include "zm_thread.h"
|
#include "zm_thread.h"
|
||||||
#include <mysql/mysql.h>
|
#include <mysql/mysql.h>
|
||||||
#include <mysql/mysqld_error.h>
|
#include <mysql/mysqld_error.h>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <queue>
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
class zmDbQueue {
|
||||||
|
private:
|
||||||
|
std::queue<std::string> mQueue;
|
||||||
|
std::thread mThread;
|
||||||
|
std::mutex mMutex;
|
||||||
|
std::condition_variable mCondition;
|
||||||
|
bool mTerminate;
|
||||||
|
public:
|
||||||
|
zmDbQueue();
|
||||||
|
~zmDbQueue();
|
||||||
|
void push(const char *sql) { return push(std::string(sql)); };
|
||||||
|
void push(std::string);
|
||||||
|
void process();
|
||||||
|
};
|
||||||
|
|
||||||
class zmDbRow {
|
class zmDbRow {
|
||||||
private:
|
private:
|
||||||
|
@ -43,6 +63,7 @@ class zmDbRow {
|
||||||
|
|
||||||
extern MYSQL dbconn;
|
extern MYSQL dbconn;
|
||||||
extern RecursiveMutex db_mutex;
|
extern RecursiveMutex db_mutex;
|
||||||
|
extern zmDbQueue dbQueue;
|
||||||
|
|
||||||
extern bool zmDbConnected;
|
extern bool zmDbConnected;
|
||||||
|
|
||||||
|
@ -51,7 +72,7 @@ void zmDbClose();
|
||||||
int zmDbDo(const char *query);
|
int zmDbDo(const char *query);
|
||||||
int zmDbDoInsert(const char *query);
|
int zmDbDoInsert(const char *query);
|
||||||
|
|
||||||
MYSQL_RES * zmDbFetch( const char *query );
|
MYSQL_RES * zmDbFetch(const char *query);
|
||||||
zmDbRow *zmDbFetchOne( const char *query );
|
zmDbRow *zmDbFetchOne(const char *query);
|
||||||
|
|
||||||
#endif // ZM_DB_H
|
#endif // ZM_DB_H
|
||||||
|
|
|
@ -535,32 +535,18 @@ void Logger::logPrint(bool hex, const char * const filepath, const int line, con
|
||||||
} // end if level <= mFileLevel
|
} // end if level <= mFileLevel
|
||||||
|
|
||||||
if ( level <= mDatabaseLevel ) {
|
if ( level <= mDatabaseLevel ) {
|
||||||
if (db_mutex.try_lock()) {
|
int syslogSize = syslogEnd-syslogStart;
|
||||||
int syslogSize = syslogEnd-syslogStart;
|
char escapedString[(syslogSize*2)+1];
|
||||||
char escapedString[(syslogSize*2)+1];
|
mysql_real_escape_string(&dbconn, escapedString, syslogStart, syslogSize);
|
||||||
mysql_real_escape_string(&dbconn, escapedString, syslogStart, syslogSize);
|
|
||||||
|
|
||||||
char sql[ZM_SQL_MED_BUFSIZ];
|
std::string sql_string = stringtf(
|
||||||
snprintf(sql, sizeof(sql),
|
"INSERT INTO `Logs` "
|
||||||
"INSERT INTO `Logs` "
|
"( `TimeKey`, `Component`, `ServerId`, `Pid`, `Level`, `Code`, `Message`, `File`, `Line` )"
|
||||||
"( `TimeKey`, `Component`, `ServerId`, `Pid`, `Level`, `Code`, `Message`, `File`, `Line` )"
|
" VALUES "
|
||||||
" VALUES "
|
"( %ld.%06ld, '%s', %d, %d, %d, '%s', '%s', '%s', %d )",
|
||||||
"( %ld.%06ld, '%s', %d, %d, %d, '%s', '%s', '%s', %d )",
|
timeVal.tv_sec, timeVal.tv_usec, mId.c_str(), staticConfig.SERVER_ID, tid, level, classString, escapedString, file, line
|
||||||
timeVal.tv_sec, timeVal.tv_usec, mId.c_str(), staticConfig.SERVER_ID, tid, level, classString, escapedString, file, line
|
);
|
||||||
);
|
dbQueue.push(sql_string);
|
||||||
if ( mysql_query(&dbconn, sql) ) {
|
|
||||||
Level tempDatabaseLevel = mDatabaseLevel;
|
|
||||||
databaseLevel(NOLOG);
|
|
||||||
Error("Can't insert log entry: sql(%s) error(%s)", sql, mysql_error(&dbconn));
|
|
||||||
databaseLevel(tempDatabaseLevel);
|
|
||||||
}
|
|
||||||
db_mutex.unlock();
|
|
||||||
} else {
|
|
||||||
Level tempDatabaseLevel = mDatabaseLevel;
|
|
||||||
databaseLevel(NOLOG);
|
|
||||||
Error("Can't insert log entry since the DB lock could not be obtained. Message: %s", syslogStart);
|
|
||||||
databaseLevel(tempDatabaseLevel);
|
|
||||||
}
|
|
||||||
} // end if level <= mDatabaseLevel
|
} // end if level <= mDatabaseLevel
|
||||||
|
|
||||||
if ( level <= mSyslogLevel ) {
|
if ( level <= mSyslogLevel ) {
|
||||||
|
|
Loading…
Reference in New Issue