From a9f7b257ea0444f1d5c685bca3188cf3a73d5bd2 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Wed, 12 Jan 2022 17:16:49 -0500 Subject: [PATCH] Rough in a queue and a thread into the event to process packets. We do this so that the event creator can get back to analysis as fast as possible so as to avoid the packetqueue filling up. --- src/zm_event.cpp | 205 +++++++++++++++++++++++++++-------------------- src/zm_event.h | 20 +++++ 2 files changed, 138 insertions(+), 87 deletions(-) diff --git a/src/zm_event.cpp b/src/zm_event.cpp index 7db8f84f3..f1dce36d3 100644 --- a/src/zm_event.cpp +++ b/src/zm_event.cpp @@ -65,7 +65,8 @@ Event::Event( last_db_frame(0), have_video_keyframe(false), //scheme - save_jpegs(0) + save_jpegs(0), + terminate_(false) { std::string notes; createNotes(notes); @@ -133,98 +134,17 @@ Event::Event( ); id = zmDbDoInsert(sql); - if (!SetPath(storage)) { - // Try another - Warning("Failed creating event dir at %s", storage->Path()); - sql = stringtf("SELECT `Id` FROM `Storage` WHERE `Id` != %u", storage->Id()); - if (monitor->ServerId()) - sql += stringtf(" AND ServerId=%u", monitor->ServerId()); - - storage = nullptr; - - MYSQL_RES *result = zmDbFetch(sql); - if (result) { - for (int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++) { - storage = new Storage(atoi(dbrow[0])); - if (SetPath(storage)) - break; - delete storage; - storage = nullptr; - } // end foreach row of Storage - mysql_free_result(result); - result = nullptr; - } - if (!storage) { - Info("No valid local storage area found. Trying all other areas."); - // Try remote - sql = "SELECT `Id` FROM `Storage` WHERE ServerId IS NULL"; - if (monitor->ServerId()) - sql += stringtf(" OR ServerId != %u", monitor->ServerId()); - - result = zmDbFetch(sql); - if (result) { - for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) { - storage = new Storage(atoi(dbrow[0])); - if (SetPath(storage)) - break; - delete storage; - storage = nullptr; - } // end foreach row of Storage - mysql_free_result(result); - result = nullptr; - } - } - if (!storage) { - storage = new Storage(); - Warning("Failed to find a storage area to save events."); - } - sql = stringtf("UPDATE Events SET StorageId = '%d' WHERE Id=%" PRIu64, storage->Id(), id); - zmDbDo(sql); - } // end if ! setPath(Storage) - Debug(1, "Using storage area at %s", path.c_str()); - - snapshot_file = path + "/snapshot.jpg"; - alarm_file = path + "/alarm.jpg"; - - video_incomplete_path = path + "/" + video_incomplete_file; - - if (monitor->GetOptVideoWriter() != 0) { - /* Save as video */ - - videoStore = new VideoStore( - video_incomplete_path.c_str(), - container.c_str(), - monitor->GetVideoStream(), - monitor->GetVideoCodecContext(), - ( monitor->RecordAudio() ? monitor->GetAudioStream() : nullptr ), - ( monitor->RecordAudio() ? monitor->GetAudioCodecContext() : nullptr ), - monitor ); - - if ( !videoStore->open() ) { - Warning("Failed to open videostore, turning on jpegs"); - delete videoStore; - videoStore = nullptr; - if ( ! ( save_jpegs & 1 ) ) { - save_jpegs |= 1; // Turn on jpeg storage - sql = stringtf("UPDATE Events SET SaveJpegs=%d WHERE Id=%" PRIu64, save_jpegs, id); - zmDbDo(sql); - } - } else { - std::string codec = videoStore->get_codec(); - video_file = stringtf("%" PRIu64 "-%s.%s.%s", id, "video", codec.c_str(), container.c_str()); - video_path = path + "/" + video_file; - Debug(1, "Video file is %s", video_file.c_str()); - } - } // end if GetOptVideoWriter - if (storage != monitor->getStorage()) - delete storage; + thread_ = std::thread(&Event::Run, this); } Event::~Event() { - // We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet. + + Stop(); + if (thread_.joinable()) thread_.join(); /* Close the video file */ + // We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet. if (videoStore != nullptr) { Debug(4, "Deleting video store"); delete videoStore; @@ -377,6 +297,12 @@ void Event::updateNotes(const StringSetMap &newNoteSetMap) { } // void Event::updateNotes(const StringSetMap &newNoteSetMap) void Event::AddPacket(const std::shared_ptr&packet) { + std::unique_lock lck(packet_queue_mutex); + packet_queue.push(packet); + packet_queue_condition.notify_one(); +} + +void Event::AddPacket_(const std::shared_ptr&packet) { have_video_keyframe = have_video_keyframe || ( ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) && ( packet->keyframe || monitor->GetOptVideoWriter() == Monitor::ENCODE) ); @@ -655,3 +581,108 @@ bool Event::SetPath(Storage *storage) { } // deep storage or not return true; } // end bool Event::SetPath + +void Event::Run() { + Storage *storage = monitor->getStorage(); + if (!SetPath(storage)) { + // Try another + Warning("Failed creating event dir at %s", storage->Path()); + + std::string sql = stringtf("SELECT `Id` FROM `Storage` WHERE `Id` != %u", storage->Id()); + if (monitor->ServerId()) + sql += stringtf(" AND ServerId=%u", monitor->ServerId()); + + storage = nullptr; + + MYSQL_RES *result = zmDbFetch(sql); + if (result) { + for (int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++) { + storage = new Storage(atoi(dbrow[0])); + if (SetPath(storage)) + break; + delete storage; + storage = nullptr; + } // end foreach row of Storage + mysql_free_result(result); + result = nullptr; + } + if (!storage) { + Info("No valid local storage area found. Trying all other areas."); + // Try remote + sql = "SELECT `Id` FROM `Storage` WHERE ServerId IS NULL"; + if (monitor->ServerId()) + sql += stringtf(" OR ServerId != %u", monitor->ServerId()); + + result = zmDbFetch(sql); + if (result) { + for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) { + storage = new Storage(atoi(dbrow[0])); + if (SetPath(storage)) + break; + delete storage; + storage = nullptr; + } // end foreach row of Storage + mysql_free_result(result); + result = nullptr; + } + } + if (!storage) { + storage = new Storage(); + Warning("Failed to find a storage area to save events."); + } + sql = stringtf("UPDATE Events SET StorageId = '%d' WHERE Id=%" PRIu64, storage->Id(), id); + zmDbDo(sql); + } // end if ! setPath(Storage) + Debug(1, "Using storage area at %s", path.c_str()); + + snapshot_file = path + "/snapshot.jpg"; + alarm_file = path + "/alarm.jpg"; + + video_incomplete_path = path + "/" + video_incomplete_file; + + if (monitor->GetOptVideoWriter() != 0) { + /* Save as video */ + + videoStore = new VideoStore( + video_incomplete_path.c_str(), + container.c_str(), + monitor->GetVideoStream(), + monitor->GetVideoCodecContext(), + ( monitor->RecordAudio() ? monitor->GetAudioStream() : nullptr ), + ( monitor->RecordAudio() ? monitor->GetAudioCodecContext() : nullptr ), + monitor ); + + if ( !videoStore->open() ) { + Warning("Failed to open videostore, turning on jpegs"); + delete videoStore; + videoStore = nullptr; + if ( ! ( save_jpegs & 1 ) ) { + save_jpegs |= 1; // Turn on jpeg storage + zmDbDo(stringtf("UPDATE Events SET SaveJpegs=%d WHERE Id=%" PRIu64, save_jpegs, id)); + } + } else { + std::string codec = videoStore->get_codec(); + video_file = stringtf("%" PRIu64 "-%s.%s.%s", id, "video", codec.c_str(), container.c_str()); + video_path = path + "/" + video_file; + Debug(1, "Video file is %s", video_file.c_str()); + } + } // end if GetOptVideoWriter + if (storage != monitor->getStorage()) + delete storage; + + + std::unique_lock lck(packet_queue_mutex); + + // The idea is to process the queue no matter what so that all packets get processed. + // We only break if the queue is empty + while (true) { + if (!packet_queue.empty()) { + this->AddPacket_(packet_queue.front()); + packet_queue.pop(); + } else { + if (terminate_ or zm_terminate) + break; + packet_queue_condition.wait(lck); + } + } +} diff --git a/src/zm_event.h b/src/zm_event.h index 4fb5c7469..447bdeafc 100644 --- a/src/zm_event.h +++ b/src/zm_event.h @@ -22,14 +22,21 @@ #include "zm_config.h" #include "zm_define.h" +#include "zm_packet.h" #include "zm_storage.h" #include "zm_time.h" #include "zm_utils.h" #include "zm_zone.h" +#include +#include #include +#include +#include #include #include +#include + class EventStream; class Frame; @@ -98,6 +105,15 @@ class Event { void createNotes(std::string ¬es); + std::queue> packet_queue; + std::mutex packet_queue_mutex; + std::condition_variable packet_queue_condition; + + void Run(); + + std::atomic terminate_; + std::thread thread_; + public: static bool OpenFrameSocket(int); static bool ValidateFrameSocket(int); @@ -118,6 +134,7 @@ class Event { SystemTimePoint EndTime() const { return end_time; } void AddPacket(const std::shared_ptr &p); + void AddPacket_(const std::shared_ptr &p); bool WritePacket(const std::shared_ptr &p); bool SendFrameImage(const Image *image, bool alarm_frame=false); bool WriteFrameImage(Image *image, SystemTimePoint timestamp, const char *event_file, bool alarm_frame = false) const; @@ -130,6 +147,9 @@ class Event { int score = 0, Image *alarm_image = nullptr); + void Stop() { terminate_ = true; } + bool Stopped() const { return terminate_; } + private: void WriteDbFrames(); bool SetPath(Storage *storage);