From 045cd219f865b6ec9fa966c494ddb64e617c5472 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Thu, 18 Feb 2021 19:25:40 -0500 Subject: [PATCH] Move clear packetqueue logic to it's own function and call it from the analysis thread. --- src/zm_monitor.cpp | 3 +- src/zm_packetqueue.cpp | 123 ++++++++++++++++++++++------------------- src/zm_packetqueue.h | 1 + 3 files changed, 69 insertions(+), 58 deletions(-) diff --git a/src/zm_monitor.cpp b/src/zm_monitor.cpp index c38737617..75e121104 100644 --- a/src/zm_monitor.cpp +++ b/src/zm_monitor.cpp @@ -1950,7 +1950,7 @@ bool Monitor::Analyse() { // If doing record, check to see if we need to close the event or not. if ( event ) { - Debug(2, "Have event in mocord"); + Debug(2, "Have event %" PRIu64 " in mocord", event->Id()); if ( section_length && ( ( timestamp->tv_sec - video_store_data->recording.tv_sec ) >= section_length ) && ( (function == MOCORD && (event_close_mode != CLOSE_TIME)) || ! ( timestamp->tv_sec % section_length ) ) @@ -2229,6 +2229,7 @@ bool Monitor::Analyse() { shared_data->last_read_time = time(nullptr); analysis_image_count++; UpdateAnalysisFPS(); + packetqueue.clearPackets(snap); return true; } // end Monitor::Analyse diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index cd60b5c40..95ba99aff 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -99,7 +99,15 @@ bool PacketQueue::queuePacket(ZMPacket* add_packet) { --(*iterator_it); } } // end foreach iterator + mutex.unlock(); + // We signal on every packet because someday we may analyze sound + Debug(4, "packetqueue queuepacket, unlocked signalling"); + condition.notify_all(); + return true; +} // end bool PacketQueue::queuePacket(ZMPacket* zm_packet) + +void PacketQueue::clearPackets(ZMPacket *add_packet) { // Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one. // No good. Have to satisfy two conditions: // 1. packetqueue starts with a video keyframe @@ -109,77 +117,78 @@ bool PacketQueue::queuePacket(ZMPacket* add_packet) { // // So start at the beginning, counting video packets until the next keyframe. // Then if deleting those packets doesn't break 1 and 2, then go ahead and delete them. - if ( add_packet->packet.stream_index == video_stream_id + if ( ! ( + add_packet->packet.stream_index == video_stream_id and add_packet->keyframe and (packet_counts[video_stream_id] > max_video_packet_count) and *(pktQueue.begin()) != add_packet + ) ) { - packetqueue_iterator it = pktQueue.begin(); - packetqueue_iterator next_front = pktQueue.begin(); + return; + } + std::unique_lock lck(mutex); - // First packet is special because we know it is a video keyframe and only need to check for lock - ZMPacket *zm_packet = *it; - if ( zm_packet->trylock() ) { - ++it; + packetqueue_iterator it = pktQueue.begin(); + packetqueue_iterator next_front = pktQueue.begin(); + + // First packet is special because we know it is a video keyframe and only need to check for lock + ZMPacket *zm_packet = *it; + if ( zm_packet->trylock() ) { + ++it; + zm_packet->unlock(); + + // Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that + while ( *it != add_packet ) { + zm_packet = *it; + Debug(1, "Checking packet to see if we can delete them"); + if ( !zm_packet->trylock() ) { + Debug(1, "Have locked packet %d", zm_packet->image_index); + break; + } zm_packet->unlock(); - // Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that - while ( *it != add_packet ) { - zm_packet = *it; - Debug(1, "Checking packet to see if we can delete them"); - if ( !zm_packet->trylock() ) { - Debug(1, "Have locked packet %d", zm_packet->image_index); - break; - } - zm_packet->unlock(); - - if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) { - Debug(4, "Found IT at beginning of queue. Threads not keeping up"); - break; - } - - if ( zm_packet->packet.stream_index == video_stream_id ) { - if ( zm_packet->keyframe ) { - Debug(1, "Have a video keyframe so breaking out"); - next_front = it; - } - } - it++; - } // end while - } // end if first packet not locked - Debug(1, "Resulting pointing at latest packet? %d, have next front? %d", - ( *it == add_packet ), - ( next_front == pktQueue.begin() ) - ); - if ( next_front != pktQueue.begin() ) { - Debug(1, "Deleting packets"); - // It is enough to delete the packets tested above. A subsequent queuePacket can clear a second set - while ( pktQueue.begin() != next_front ) { - ZMPacket *zm_packet = *pktQueue.begin(); - if ( !zm_packet ) { - Error("NULL zm_packet in queue"); - continue; - } - - Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d", - zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size()); - pktQueue.pop_front(); - packet_counts[zm_packet->packet.stream_index] -= 1; - delete zm_packet; + if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) { + Debug(4, "Found IT at beginning of queue. Threads not keeping up"); + break; } - } // end if have at least max_video_packet_count video packets remaining - } // end if this is a video keyframe - mutex.unlock(); + if ( zm_packet->packet.stream_index == video_stream_id ) { + if ( zm_packet->keyframe ) { + Debug(1, "Have a video keyframe so breaking out"); + next_front = it; + } + } + it++; + } // end while + } // end if first packet not locked + Debug(1, "Resulting pointing at latest packet? %d, have next front? %d", + ( *it == add_packet ), + ( next_front == pktQueue.begin() ) + ); + if ( next_front != pktQueue.begin() ) { + Debug(1, "Deleting packets"); + // It is enough to delete the packets tested above. A subsequent queuePacket can clear a second set + while ( pktQueue.begin() != next_front ) { + ZMPacket *zm_packet = *pktQueue.begin(); + if ( !zm_packet ) { + Error("NULL zm_packet in queue"); + continue; + } + + Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d", + zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size()); + pktQueue.pop_front(); + packet_counts[zm_packet->packet.stream_index] -= 1; + delete zm_packet; + } + } // end if have at least max_video_packet_count video packets remaining // We signal on every packet because someday we may analyze sound - Debug(4, "packetqueue queuepacket, unlocked signalling"); - condition.notify_all(); - return true; -} // end bool PacketQueue::queuePacket(ZMPacket* zm_packet) + return; +} // end voidPacketQueue::clearPackets(ZMPacket* zm_packet) ZMPacket* PacketQueue::popPacket( ) { Debug(4, "pktQueue size %d", pktQueue.size()); diff --git a/src/zm_packetqueue.h b/src/zm_packetqueue.h index 4cbd0739d..cd88648b9 100644 --- a/src/zm_packetqueue.h +++ b/src/zm_packetqueue.h @@ -63,6 +63,7 @@ class PacketQueue { unsigned int get_packet_count(int stream_id) const { return packet_counts[stream_id]; }; void clear_unwanted_packets(timeval *recording, int pre_event_count, int mVideoStreamId); + void clearPackets(ZMPacket *); int packet_count(int stream_id); bool increment_it(packetqueue_iterator *it);