diff --git a/src/zm_event.cpp b/src/zm_event.cpp index 7db8f84f3..f1dce36d3 100644 --- a/src/zm_event.cpp +++ b/src/zm_event.cpp @@ -65,7 +65,8 @@ Event::Event( last_db_frame(0), have_video_keyframe(false), //scheme - save_jpegs(0) + save_jpegs(0), + terminate_(false) { std::string notes; createNotes(notes); @@ -133,98 +134,17 @@ Event::Event( ); id = zmDbDoInsert(sql); - if (!SetPath(storage)) { - // Try another - Warning("Failed creating event dir at %s", storage->Path()); - sql = stringtf("SELECT `Id` FROM `Storage` WHERE `Id` != %u", storage->Id()); - if (monitor->ServerId()) - sql += stringtf(" AND ServerId=%u", monitor->ServerId()); - - storage = nullptr; - - MYSQL_RES *result = zmDbFetch(sql); - if (result) { - for (int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++) { - storage = new Storage(atoi(dbrow[0])); - if (SetPath(storage)) - break; - delete storage; - storage = nullptr; - } // end foreach row of Storage - mysql_free_result(result); - result = nullptr; - } - if (!storage) { - Info("No valid local storage area found. Trying all other areas."); - // Try remote - sql = "SELECT `Id` FROM `Storage` WHERE ServerId IS NULL"; - if (monitor->ServerId()) - sql += stringtf(" OR ServerId != %u", monitor->ServerId()); - - result = zmDbFetch(sql); - if (result) { - for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) { - storage = new Storage(atoi(dbrow[0])); - if (SetPath(storage)) - break; - delete storage; - storage = nullptr; - } // end foreach row of Storage - mysql_free_result(result); - result = nullptr; - } - } - if (!storage) { - storage = new Storage(); - Warning("Failed to find a storage area to save events."); - } - sql = stringtf("UPDATE Events SET StorageId = '%d' WHERE Id=%" PRIu64, storage->Id(), id); - zmDbDo(sql); - } // end if ! setPath(Storage) - Debug(1, "Using storage area at %s", path.c_str()); - - snapshot_file = path + "/snapshot.jpg"; - alarm_file = path + "/alarm.jpg"; - - video_incomplete_path = path + "/" + video_incomplete_file; - - if (monitor->GetOptVideoWriter() != 0) { - /* Save as video */ - - videoStore = new VideoStore( - video_incomplete_path.c_str(), - container.c_str(), - monitor->GetVideoStream(), - monitor->GetVideoCodecContext(), - ( monitor->RecordAudio() ? monitor->GetAudioStream() : nullptr ), - ( monitor->RecordAudio() ? monitor->GetAudioCodecContext() : nullptr ), - monitor ); - - if ( !videoStore->open() ) { - Warning("Failed to open videostore, turning on jpegs"); - delete videoStore; - videoStore = nullptr; - if ( ! ( save_jpegs & 1 ) ) { - save_jpegs |= 1; // Turn on jpeg storage - sql = stringtf("UPDATE Events SET SaveJpegs=%d WHERE Id=%" PRIu64, save_jpegs, id); - zmDbDo(sql); - } - } else { - std::string codec = videoStore->get_codec(); - video_file = stringtf("%" PRIu64 "-%s.%s.%s", id, "video", codec.c_str(), container.c_str()); - video_path = path + "/" + video_file; - Debug(1, "Video file is %s", video_file.c_str()); - } - } // end if GetOptVideoWriter - if (storage != monitor->getStorage()) - delete storage; + thread_ = std::thread(&Event::Run, this); } Event::~Event() { - // We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet. + + Stop(); + if (thread_.joinable()) thread_.join(); /* Close the video file */ + // We close the videowriter first, because if we finish the event, we might try to view the file, but we aren't done writing it yet. if (videoStore != nullptr) { Debug(4, "Deleting video store"); delete videoStore; @@ -377,6 +297,12 @@ void Event::updateNotes(const StringSetMap &newNoteSetMap) { } // void Event::updateNotes(const StringSetMap &newNoteSetMap) void Event::AddPacket(const std::shared_ptr&packet) { + std::unique_lock lck(packet_queue_mutex); + packet_queue.push(packet); + packet_queue_condition.notify_one(); +} + +void Event::AddPacket_(const std::shared_ptr&packet) { have_video_keyframe = have_video_keyframe || ( ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) && ( packet->keyframe || monitor->GetOptVideoWriter() == Monitor::ENCODE) ); @@ -655,3 +581,108 @@ bool Event::SetPath(Storage *storage) { } // deep storage or not return true; } // end bool Event::SetPath + +void Event::Run() { + Storage *storage = monitor->getStorage(); + if (!SetPath(storage)) { + // Try another + Warning("Failed creating event dir at %s", storage->Path()); + + std::string sql = stringtf("SELECT `Id` FROM `Storage` WHERE `Id` != %u", storage->Id()); + if (monitor->ServerId()) + sql += stringtf(" AND ServerId=%u", monitor->ServerId()); + + storage = nullptr; + + MYSQL_RES *result = zmDbFetch(sql); + if (result) { + for (int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++) { + storage = new Storage(atoi(dbrow[0])); + if (SetPath(storage)) + break; + delete storage; + storage = nullptr; + } // end foreach row of Storage + mysql_free_result(result); + result = nullptr; + } + if (!storage) { + Info("No valid local storage area found. Trying all other areas."); + // Try remote + sql = "SELECT `Id` FROM `Storage` WHERE ServerId IS NULL"; + if (monitor->ServerId()) + sql += stringtf(" OR ServerId != %u", monitor->ServerId()); + + result = zmDbFetch(sql); + if (result) { + for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) { + storage = new Storage(atoi(dbrow[0])); + if (SetPath(storage)) + break; + delete storage; + storage = nullptr; + } // end foreach row of Storage + mysql_free_result(result); + result = nullptr; + } + } + if (!storage) { + storage = new Storage(); + Warning("Failed to find a storage area to save events."); + } + sql = stringtf("UPDATE Events SET StorageId = '%d' WHERE Id=%" PRIu64, storage->Id(), id); + zmDbDo(sql); + } // end if ! setPath(Storage) + Debug(1, "Using storage area at %s", path.c_str()); + + snapshot_file = path + "/snapshot.jpg"; + alarm_file = path + "/alarm.jpg"; + + video_incomplete_path = path + "/" + video_incomplete_file; + + if (monitor->GetOptVideoWriter() != 0) { + /* Save as video */ + + videoStore = new VideoStore( + video_incomplete_path.c_str(), + container.c_str(), + monitor->GetVideoStream(), + monitor->GetVideoCodecContext(), + ( monitor->RecordAudio() ? monitor->GetAudioStream() : nullptr ), + ( monitor->RecordAudio() ? monitor->GetAudioCodecContext() : nullptr ), + monitor ); + + if ( !videoStore->open() ) { + Warning("Failed to open videostore, turning on jpegs"); + delete videoStore; + videoStore = nullptr; + if ( ! ( save_jpegs & 1 ) ) { + save_jpegs |= 1; // Turn on jpeg storage + zmDbDo(stringtf("UPDATE Events SET SaveJpegs=%d WHERE Id=%" PRIu64, save_jpegs, id)); + } + } else { + std::string codec = videoStore->get_codec(); + video_file = stringtf("%" PRIu64 "-%s.%s.%s", id, "video", codec.c_str(), container.c_str()); + video_path = path + "/" + video_file; + Debug(1, "Video file is %s", video_file.c_str()); + } + } // end if GetOptVideoWriter + if (storage != monitor->getStorage()) + delete storage; + + + std::unique_lock lck(packet_queue_mutex); + + // The idea is to process the queue no matter what so that all packets get processed. + // We only break if the queue is empty + while (true) { + if (!packet_queue.empty()) { + this->AddPacket_(packet_queue.front()); + packet_queue.pop(); + } else { + if (terminate_ or zm_terminate) + break; + packet_queue_condition.wait(lck); + } + } +} diff --git a/src/zm_event.h b/src/zm_event.h index 4fb5c7469..447bdeafc 100644 --- a/src/zm_event.h +++ b/src/zm_event.h @@ -22,14 +22,21 @@ #include "zm_config.h" #include "zm_define.h" +#include "zm_packet.h" #include "zm_storage.h" #include "zm_time.h" #include "zm_utils.h" #include "zm_zone.h" +#include +#include #include +#include +#include #include #include +#include + class EventStream; class Frame; @@ -98,6 +105,15 @@ class Event { void createNotes(std::string ¬es); + std::queue> packet_queue; + std::mutex packet_queue_mutex; + std::condition_variable packet_queue_condition; + + void Run(); + + std::atomic terminate_; + std::thread thread_; + public: static bool OpenFrameSocket(int); static bool ValidateFrameSocket(int); @@ -118,6 +134,7 @@ class Event { SystemTimePoint EndTime() const { return end_time; } void AddPacket(const std::shared_ptr &p); + void AddPacket_(const std::shared_ptr &p); bool WritePacket(const std::shared_ptr &p); bool SendFrameImage(const Image *image, bool alarm_frame=false); bool WriteFrameImage(Image *image, SystemTimePoint timestamp, const char *event_file, bool alarm_frame = false) const; @@ -130,6 +147,9 @@ class Event { int score = 0, Image *alarm_image = nullptr); + void Stop() { terminate_ = true; } + bool Stopped() const { return terminate_; } + private: void WriteDbFrames(); bool SetPath(Storage *storage);