Rough in a queue and a thread into the event to process packets. We do this so that the event creator can get back to analysis as fast as possible so as to avoid the packetqueue filling up.

This commit is contained in:
Isaac Connor 2022-01-12 17:16:49 -05:00
parent 134d80a91d
commit a9f7b257ea
2 changed files with 138 additions and 87 deletions

View File

@ -65,7 +65,8 @@ Event::Event(
last_db_frame(0), last_db_frame(0),
have_video_keyframe(false), have_video_keyframe(false),
//scheme //scheme
save_jpegs(0) save_jpegs(0),
terminate_(false)
{ {
std::string notes; std::string notes;
createNotes(notes); createNotes(notes);
@ -133,98 +134,17 @@ Event::Event(
); );
id = zmDbDoInsert(sql); id = zmDbDoInsert(sql);
if (!SetPath(storage)) {
// Try another
Warning("Failed creating event dir at %s", storage->Path());
sql = stringtf("SELECT `Id` FROM `Storage` WHERE `Id` != %u", storage->Id()); thread_ = std::thread(&Event::Run, this);
if (monitor->ServerId())
sql += stringtf(" AND ServerId=%u", monitor->ServerId());
storage = nullptr;
MYSQL_RES *result = zmDbFetch(sql);
if (result) {
for (int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++) {
storage = new Storage(atoi(dbrow[0]));
if (SetPath(storage))
break;
delete storage;
storage = nullptr;
} // end foreach row of Storage
mysql_free_result(result);
result = nullptr;
}
if (!storage) {
Info("No valid local storage area found. Trying all other areas.");
// Try remote
sql = "SELECT `Id` FROM `Storage` WHERE ServerId IS NULL";
if (monitor->ServerId())
sql += stringtf(" OR ServerId != %u", monitor->ServerId());
result = zmDbFetch(sql);
if (result) {
for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) {
storage = new Storage(atoi(dbrow[0]));
if (SetPath(storage))
break;
delete storage;
storage = nullptr;
} // end foreach row of Storage
mysql_free_result(result);
result = nullptr;
}
}
if (!storage) {
storage = new Storage();
Warning("Failed to find a storage area to save events.");
}
sql = stringtf("UPDATE Events SET StorageId = '%d' WHERE Id=%" PRIu64, storage->Id(), id);
zmDbDo(sql);
} // end if ! setPath(Storage)
Debug(1, "Using storage area at %s", path.c_str());
snapshot_file = path + "/snapshot.jpg";
alarm_file = path + "/alarm.jpg";
video_incomplete_path = path + "/" + video_incomplete_file;
if (monitor->GetOptVideoWriter() != 0) {
/* Save as video */
videoStore = new VideoStore(
video_incomplete_path.c_str(),
container.c_str(),
monitor->GetVideoStream(),
monitor->GetVideoCodecContext(),
( monitor->RecordAudio() ? monitor->GetAudioStream() : nullptr ),
( monitor->RecordAudio() ? monitor->GetAudioCodecContext() : nullptr ),
monitor );
if ( !videoStore->open() ) {
Warning("Failed to open videostore, turning on jpegs");
delete videoStore;
videoStore = nullptr;
if ( ! ( save_jpegs & 1 ) ) {
save_jpegs |= 1; // Turn on jpeg storage
sql = stringtf("UPDATE Events SET SaveJpegs=%d WHERE Id=%" PRIu64, save_jpegs, id);
zmDbDo(sql);
}
} else {
std::string codec = videoStore->get_codec();
video_file = stringtf("%" PRIu64 "-%s.%s.%s", id, "video", codec.c_str(), container.c_str());
video_path = path + "/" + video_file;
Debug(1, "Video file is %s", video_file.c_str());
}
} // end if GetOptVideoWriter
if (storage != monitor->getStorage())
delete storage;
} }
Event::~Event() { Event::~Event() {
// We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet.
Stop();
if (thread_.joinable()) thread_.join();
/* Close the video file */ /* Close the video file */
// We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet.
if (videoStore != nullptr) { if (videoStore != nullptr) {
Debug(4, "Deleting video store"); Debug(4, "Deleting video store");
delete videoStore; delete videoStore;
@ -377,6 +297,12 @@ void Event::updateNotes(const StringSetMap &newNoteSetMap) {
} // void Event::updateNotes(const StringSetMap &newNoteSetMap) } // void Event::updateNotes(const StringSetMap &newNoteSetMap)
void Event::AddPacket(const std::shared_ptr<ZMPacket>&packet) { void Event::AddPacket(const std::shared_ptr<ZMPacket>&packet) {
std::unique_lock<std::mutex> lck(packet_queue_mutex);
packet_queue.push(packet);
packet_queue_condition.notify_one();
}
void Event::AddPacket_(const std::shared_ptr<ZMPacket>&packet) {
have_video_keyframe = have_video_keyframe || have_video_keyframe = have_video_keyframe ||
( ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) && ( ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) &&
( packet->keyframe || monitor->GetOptVideoWriter() == Monitor::ENCODE) ); ( packet->keyframe || monitor->GetOptVideoWriter() == Monitor::ENCODE) );
@ -655,3 +581,108 @@ bool Event::SetPath(Storage *storage) {
} // deep storage or not } // deep storage or not
return true; return true;
} // end bool Event::SetPath } // end bool Event::SetPath
void Event::Run() {
Storage *storage = monitor->getStorage();
if (!SetPath(storage)) {
// Try another
Warning("Failed creating event dir at %s", storage->Path());
std::string sql = stringtf("SELECT `Id` FROM `Storage` WHERE `Id` != %u", storage->Id());
if (monitor->ServerId())
sql += stringtf(" AND ServerId=%u", monitor->ServerId());
storage = nullptr;
MYSQL_RES *result = zmDbFetch(sql);
if (result) {
for (int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++) {
storage = new Storage(atoi(dbrow[0]));
if (SetPath(storage))
break;
delete storage;
storage = nullptr;
} // end foreach row of Storage
mysql_free_result(result);
result = nullptr;
}
if (!storage) {
Info("No valid local storage area found. Trying all other areas.");
// Try remote
sql = "SELECT `Id` FROM `Storage` WHERE ServerId IS NULL";
if (monitor->ServerId())
sql += stringtf(" OR ServerId != %u", monitor->ServerId());
result = zmDbFetch(sql);
if (result) {
for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) {
storage = new Storage(atoi(dbrow[0]));
if (SetPath(storage))
break;
delete storage;
storage = nullptr;
} // end foreach row of Storage
mysql_free_result(result);
result = nullptr;
}
}
if (!storage) {
storage = new Storage();
Warning("Failed to find a storage area to save events.");
}
sql = stringtf("UPDATE Events SET StorageId = '%d' WHERE Id=%" PRIu64, storage->Id(), id);
zmDbDo(sql);
} // end if ! setPath(Storage)
Debug(1, "Using storage area at %s", path.c_str());
snapshot_file = path + "/snapshot.jpg";
alarm_file = path + "/alarm.jpg";
video_incomplete_path = path + "/" + video_incomplete_file;
if (monitor->GetOptVideoWriter() != 0) {
/* Save as video */
videoStore = new VideoStore(
video_incomplete_path.c_str(),
container.c_str(),
monitor->GetVideoStream(),
monitor->GetVideoCodecContext(),
( monitor->RecordAudio() ? monitor->GetAudioStream() : nullptr ),
( monitor->RecordAudio() ? monitor->GetAudioCodecContext() : nullptr ),
monitor );
if ( !videoStore->open() ) {
Warning("Failed to open videostore, turning on jpegs");
delete videoStore;
videoStore = nullptr;
if ( ! ( save_jpegs & 1 ) ) {
save_jpegs |= 1; // Turn on jpeg storage
zmDbDo(stringtf("UPDATE Events SET SaveJpegs=%d WHERE Id=%" PRIu64, save_jpegs, id));
}
} else {
std::string codec = videoStore->get_codec();
video_file = stringtf("%" PRIu64 "-%s.%s.%s", id, "video", codec.c_str(), container.c_str());
video_path = path + "/" + video_file;
Debug(1, "Video file is %s", video_file.c_str());
}
} // end if GetOptVideoWriter
if (storage != monitor->getStorage())
delete storage;
std::unique_lock<std::mutex> lck(packet_queue_mutex);
// The idea is to process the queue no matter what so that all packets get processed.
// We only break if the queue is empty
while (true) {
if (!packet_queue.empty()) {
this->AddPacket_(packet_queue.front());
packet_queue.pop();
} else {
if (terminate_ or zm_terminate)
break;
packet_queue_condition.wait(lck);
}
}
}

View File

@ -22,14 +22,21 @@
#include "zm_config.h" #include "zm_config.h"
#include "zm_define.h" #include "zm_define.h"
#include "zm_packet.h"
#include "zm_storage.h" #include "zm_storage.h"
#include "zm_time.h" #include "zm_time.h"
#include "zm_utils.h" #include "zm_utils.h"
#include "zm_zone.h" #include "zm_zone.h"
#include <atomic>
#include <condition_variable>
#include <map> #include <map>
#include <memory>
#include <mutex>
#include <queue> #include <queue>
#include <set> #include <set>
#include <thread>
class EventStream; class EventStream;
class Frame; class Frame;
@ -98,6 +105,15 @@ class Event {
void createNotes(std::string &notes); void createNotes(std::string &notes);
std::queue<std::shared_ptr<ZMPacket>> packet_queue;
std::mutex packet_queue_mutex;
std::condition_variable packet_queue_condition;
void Run();
std::atomic<bool> terminate_;
std::thread thread_;
public: public:
static bool OpenFrameSocket(int); static bool OpenFrameSocket(int);
static bool ValidateFrameSocket(int); static bool ValidateFrameSocket(int);
@ -118,6 +134,7 @@ class Event {
SystemTimePoint EndTime() const { return end_time; } SystemTimePoint EndTime() const { return end_time; }
void AddPacket(const std::shared_ptr<ZMPacket> &p); void AddPacket(const std::shared_ptr<ZMPacket> &p);
void AddPacket_(const std::shared_ptr<ZMPacket> &p);
bool WritePacket(const std::shared_ptr<ZMPacket> &p); bool WritePacket(const std::shared_ptr<ZMPacket> &p);
bool SendFrameImage(const Image *image, bool alarm_frame=false); bool SendFrameImage(const Image *image, bool alarm_frame=false);
bool WriteFrameImage(Image *image, SystemTimePoint timestamp, const char *event_file, bool alarm_frame = false) const; bool WriteFrameImage(Image *image, SystemTimePoint timestamp, const char *event_file, bool alarm_frame = false) const;
@ -130,6 +147,9 @@ class Event {
int score = 0, int score = 0,
Image *alarm_image = nullptr); Image *alarm_image = nullptr);
void Stop() { terminate_ = true; }
bool Stopped() const { return terminate_; }
private: private:
void WriteDbFrames(); void WriteDbFrames();
bool SetPath(Storage *storage); bool SetPath(Storage *storage);