diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index eb65ac6f0..38160e354 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -1735,10 +1735,6 @@ bool Monitor::Analyse() { return false; } - // Store the it that points to our snap we will need it later - packetqueue_iterator snap_it = *analysis_it; - packetqueue.increment_it(analysis_it); - // signal is set by capture bool signal = shared_data->signal; bool signal_change = (signal != last_signal); @@ -1848,8 +1844,15 @@ bool Monitor::Analyse() { while (!snap->decoded and !zm_terminate and !analysis_thread->Stopped()) { // Need to wait for the decoder thread. Debug(1, "Waiting for decode"); + packetqueue.unlock(packet_lock); packetqueue.notify_all(); // decode might be waiting - packet_lock->wait(); + packetqueue.wait(); + + // Another thread may have moved our it. Unlikely but possible + packet_lock = packetqueue.get_packet(analysis_it); + if (!packet_lock) return false; + snap = packet_lock->packet_; + if (!snap->image and snap->decoded) { Debug(1, "No image but was decoded, giving up"); delete packet_lock; @@ -1947,14 +1950,14 @@ bool Monitor::Analyse() { // Must start on a keyframe so rewind. Only for passthrough though I guess. // FIXME this iterator is not protected from invalidation packetqueue_iterator *start_it = packetqueue.get_event_start_packet_it( - snap_it, 0 /* pre_event_count */ + *analysis_it, 0 /* pre_event_count */ ); // This gets a lock on the starting packet ZMLockedPacket *starting_packet_lock = nullptr; std::shared_ptr starting_packet = nullptr; - if (*start_it != snap_it) { + if (*start_it != *analysis_it) { starting_packet_lock = packetqueue.get_packet(start_it); if (!starting_packet_lock) { Warning("Unable to get starting packet lock"); @@ -1968,12 +1971,12 @@ bool Monitor::Analyse() { event = new Event(this, starting_packet->timestamp, "Continuous", noteSetMap); // Write out starting packets, do not modify packetqueue it will garbage collect itself - while (starting_packet and ((*start_it) != snap_it)) { + while (starting_packet and ((*start_it) != *analysis_it)) { event->AddPacket(starting_packet); // Have added the packet, don't want to unlock it until we have locked the next packetqueue.increment_it(start_it); - if ((*start_it) == snap_it) { + if ((*start_it) == *analysis_it) { if (starting_packet_lock) delete starting_packet_lock; break; } @@ -2051,12 +2054,12 @@ bool Monitor::Analyse() { if (!event) { packetqueue_iterator *start_it = packetqueue.get_event_start_packet_it( - snap_it, + *analysis_it, (pre_event_count > alarm_frame_count ? pre_event_count : alarm_frame_count) ); ZMLockedPacket *starting_packet_lock = nullptr; std::shared_ptr starting_packet = nullptr; - if (*start_it != snap_it) { + if (*start_it != *analysis_it) { starting_packet_lock = packetqueue.get_packet(start_it); if (!starting_packet_lock) return false; starting_packet = starting_packet_lock->packet_; @@ -2071,11 +2074,11 @@ bool Monitor::Analyse() { shared_data->state = state = ALARM; // Write out starting packets, do not modify packetqueue it will garbage collect itself - while (*start_it != snap_it) { + while (*start_it != *analysis_it) { event->AddPacket(starting_packet); packetqueue.increment_it(start_it); - if ( (*start_it) == snap_it ) { + if ( (*start_it) == (*analysis_it) ) { if (starting_packet_lock) delete starting_packet_lock; break; } @@ -2262,6 +2265,7 @@ bool Monitor::Analyse() { if (function == MODECT or function == MOCORD) UpdateAnalysisFPS(); } + packetqueue.increment_it(analysis_it); packetqueue.unlock(packet_lock); shared_data->last_read_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index 67ba35c0f..43ab20d1e 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -663,3 +663,12 @@ void PacketQueue::setPreEventVideoPackets(int p) { pre_event_video_packet_count = 1; // We can simplify a lot of logic in queuePacket if we can assume at least 1 packet in queue } + +void PacketQueue::notify_all() { + condition.notify_all(); +}; + +void PacketQueue::wait() { + std::unique_lock lck(mutex); + condition.wait(lck); +} diff --git a/src/zm_packetqueue.h b/src/zm_packetqueue.h index 2fc29ea33..20cfdfe85 100644 --- a/src/zm_packetqueue.h +++ b/src/zm_packetqueue.h @@ -81,7 +81,8 @@ class PacketQueue { ); bool is_there_an_iterator_pointing_to_packet(const std::shared_ptr &zm_packet); void unlock(ZMLockedPacket *lp); - void notify_all() { condition.notify_all(); }; + void notify_all(); + void wait(); }; #endif /* ZM_PACKETQUEUE_H */