diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index 7c6df3374..64679feef 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -17,6 +17,8 @@ //along with ZoneMinder. If not, see . +// PacketQueue must know about all iterators and manage them + #include "zm_packetqueue.h" #include "zm_ffmpeg.h" #include "zm_signal.h" @@ -32,7 +34,6 @@ zm_packetqueue::zm_packetqueue( max_video_packet_count(video_image_count), deleting(false) { - analysis_it = pktQueue.begin(); max_stream_id = p_video_stream_id > p_audio_stream_id ? p_video_stream_id : p_audio_stream_id; packet_counts = new int[max_stream_id+1]; @@ -43,64 +44,131 @@ zm_packetqueue::zm_packetqueue( zm_packetqueue::~zm_packetqueue() { deleting = true; /* zma might be waiting. Must have exclusive access */ - while ( ! mutex.try_lock() ) { + while ( !mutex.try_lock() ) { Debug(4, "Waiting for exclusive access"); condition.notify_all(); } while ( !pktQueue.empty() ) { - ZMPacket * packet = pktQueue.front(); + ZMPacket *packet = pktQueue.front(); pktQueue.pop_front(); - if ( packet->image_index == -1 ) { - delete packet; - } + delete packet; } delete[] packet_counts; - Debug(4, "Done in destrcutor"); + Debug(4, "Done in destructor"); packet_counts = nullptr; mutex.unlock(); condition.notify_all(); } -/* Enqueues the given packet. Will maintain the analysis_it pointer and image packet counts. +/* Enqueues the given packet. Will maintain the it pointer and image packet counts. * If we have reached our max image packet count, it will pop off as many packets as are needed. * Thus it will ensure that the same packet never gets queued twice. */ -bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) { +bool zm_packetqueue::queuePacket(ZMPacket* add_packet) { Debug(4, "packetqueue queuepacket"); mutex.lock(); - pktQueue.push_back(zm_packet); - packet_counts[zm_packet->packet.stream_index] += 1; + pktQueue.push_back(add_packet); + packet_counts[add_packet->packet.stream_index] += 1; Debug(1, "packet counts for %d is %d", - zm_packet->packet.stream_index, packet_counts[zm_packet->packet.stream_index]); - if ( analysis_it == pktQueue.end() ) { - // Analsys_it should only point to end when queue is empty - Debug(4, "pointing analysis_it to back"); - analysis_it --; - } + add_packet->packet.stream_index, + packet_counts[add_packet->packet.stream_index]); + + for ( + std::list::iterator iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + // Have to check each iterator and make sure it doesn't point to the packet we are about to delete + if ( *iterator_it == pktQueue.end() ) { + Debug(4, "pointing it to back"); + --(*iterator_it); + } + } // end foreach iterator + + // Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one. + // No good. Have to satisfy two conditions: + // 1. packetqueue starts with a video keyframe + // 2. Have minimum # of video packets + // 3. No packets can be locked + // 4. No iterator can point to one of the packets + // + // So start at the beginning, counting video packets until the next keyframe. + // Then if deleting those packets doesn't break 1 and 2, then go ahead and delete them. + if ( add_packet->packet.stream_index == video_stream_id + and + add_packet->keyframe + and + (packet_counts[video_stream_id] > max_video_packet_count) + ) { + packetqueue_iterator it = pktQueue.begin(); + int video_stream_packets = 0; + // Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that + do { + it++; + ZMPacket *zm_packet = *it; + Debug(1, "Checking packet to see if we can delete them"); + if ( zm_packet->packet.stream_index == video_stream_id ) { + if ( zm_packet->keyframe ) + break; + video_stream_packets ++; + } + + if ( !zm_packet->trylock() ) { + video_stream_packets = max_video_packet_count; + break; + } + zm_packet->unlock(); + + for ( + std::list::iterator iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + // Have to check each iterator and make sure it doesn't point to the packet we are about to delete + if ( *(*iterator_it) == zm_packet ) { + Debug(4, "bumping it. Threads not keeping up"); + video_stream_packets = max_video_packet_count; + } + } // end foreach iterator + + } while ( video_stream_packets < max_video_packet_count ); + + if ( + max_video_packet_count - video_stream_packets > max_video_packet_count + and + ( *it != add_packet ) + ) { + Debug(1, "Deleting packets"); + + // Now to clean ups mainting the queue size. + while ( packet_counts[video_stream_id] > max_video_packet_count ) { + ZMPacket *zm_packet = *pktQueue.begin(); + if ( !zm_packet ) { + Error("NULL zm_packet in queue"); + continue; + } + + Debug(1, "Deleting a packet with stream index (%d) image_index %d with keyframe(%d), video_frames_to_keep is (%d) max: %d, queuesuze:%d", + zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count,pktQueue.size()); + pktQueue.pop_front(); + packet_counts[zm_packet->packet.stream_index] -= 1; + zm_packet->unlock(); + delete zm_packet; + } + } // end if have at least max_video_packet_count video packets remaining + } // end if this is a video keyframe mutex.unlock(); // We signal on every packet because someday we may analyze sound Debug(4, "packetqueue queuepacket, unlocked signalling"); condition.notify_all(); - // We have added to the queue and signalled so other processes can now work on the new packet. - // Now to clean ups mainting the queue size. - while ( packet_counts[video_stream_id] > max_video_packet_count ) { - //clearQueue(max_video_packet_count, video_stream_id); - //clearQueue is rather heavy. Since this is the only packet injection spot, we can just start at the beginning of the queue and remove packets until we get to the next video keyframe - Debug(1, "Deleting a packet with stream index (%d) with keyframe(%d), video_frames_to_keep is (%d) max: %d", - zm_packet->packet.stream_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count); - ZMPacket *zm_packet = *pktQueue.begin(); - pktQueue.pop_front(); - packet_counts[zm_packet->packet.stream_index] -= 1; - if ( zm_packet->image_index == -1 ) - delete zm_packet; - } - return true; } // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) @@ -112,20 +180,28 @@ ZMPacket* zm_packetqueue::popPacket( ) { Debug(4, "poPacket Mutex locking"); mutex.lock(); - ZMPacket *packet = pktQueue.front(); - if ( *analysis_it == packet ) { - Debug(4, "not popping analysis_it index %d", packet->image_index); - mutex.unlock(); - return nullptr; - } - packet->lock(); + ZMPacket *zm_packet = pktQueue.front(); + for ( + std::list::iterator iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + // Have to check each iterator and make sure it doesn't point to the packet we are about to delete + if ( *(*iterator_it) == zm_packet ) { + Debug(4, "Bumping it because it is at the front that we are deleting"); + ++(*iterators_it); + } + } // end foreach iterator + + zm_packet->lock(); pktQueue.pop_front(); - packet_counts[packet->packet.stream_index] -= 1; + packet_counts[zm_packet->packet.stream_index] -= 1; mutex.unlock(); - return packet; + return zm_packet; } // popPacket @@ -147,10 +223,10 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_ if ( pktQueue.size() <= frames_to_keep ) { return 0; } - Debug(4, "Locking in clearQueue"); + Debug(5, "Locking in clearQueue"); mutex.lock(); - std::list::iterator it = pktQueue.end()--; // point to last element instead of end + packetqueue_iterator it = pktQueue.end()--; // point to last element instead of end ZMPacket *zm_packet = nullptr; while ( (it != pktQueue.begin()) and frames_to_keep ) { @@ -184,13 +260,21 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_ Debug(4, "Deleting a packet from the front, count is (%d), queue size is %d", delete_count, pktQueue.size()); zm_packet = pktQueue.front(); - if ( *analysis_it == zm_packet ) { - Debug(4, "Bumping analysis it because it is at the front that we are deleting"); - ++analysis_it; - } + for ( + std::list::iterator iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + // Have to check each iterator and make sure it doesn't point to the packet we are about to delete + if ( *(*iterator_it) == zm_packet ) { + Debug(4, "Bumping it because it is at the front that we are deleting"); + ++(*iterators_it); + } + } // end foreach iterator packet_counts[zm_packet->packet.stream_index] --; pktQueue.pop_front(); - if ( zm_packet->image_index == -1 ) + //if ( zm_packet->image_index == -1 ) delete zm_packet; delete_count += 1; @@ -200,100 +284,9 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_ mutex.unlock(); return delete_count; -# if 0 - // I forget why +1 - frames_to_keep += 1; - int packets_to_delete = pktQueue.size(); - - std::list::reverse_iterator it; - ZMPacket *zm_packet = nullptr; - - for ( it = pktQueue.rbegin(); frames_to_keep && (it != pktQueue.rend()); ++it ) { - zm_packet = *it; - AVPacket *av_packet = &(zm_packet->packet); - - Debug(3, "Looking at packet with stream index (%d) with keyframe(%d), Image_index(%d) frames_to_keep is (%d)", - av_packet->stream_index, zm_packet->keyframe, zm_packet->image_index, frames_to_keep ); - - // Want frames_to_keep video keyframes. Otherwise, we may not have enough - if ( av_packet->stream_index == stream_id ) { - frames_to_keep --; - packets_to_delete --; - } - } - - // Make sure we start on a keyframe - for ( ; it != pktQueue.rend(); ++it ) { - zm_packet = *it; - AVPacket *av_packet = &(zm_packet->packet); - - Debug(3, "Looking for keyframe at packet with stream index (%d) with keyframe (%d), image_index(%d) frames_to_keep is (%d)", - av_packet->stream_index, ( av_packet->flags & AV_PKT_FLAG_KEY ), zm_packet->image_index, frames_to_keep ); - - // Want frames_to_keep video keyframes. Otherwise, we may not have enough - if ( (av_packet->stream_index == stream_id) and (av_packet->flags & AV_PKT_FLAG_KEY) ) { - Debug(3, "Found keyframe at packet with stream index (%d) with keyframe (%d), frames_to_keep is (%d)", - av_packet->stream_index, ( av_packet->flags & AV_PKT_FLAG_KEY ), frames_to_keep); - break; - } - packets_to_delete--; - } - if ( frames_to_keep ) { - Debug(3, "Hit end of queue, still need (%d) video frames", frames_to_keep); - } - if ( it != pktQueue.rend() ) { - // We want to keep this packet, so advance to the next - ++it; - packets_to_delete--; - } - int delete_count = 0; - - if ( packets_to_delete > 0 ) { - Debug(4, "Deleting packets from the front, count is (%d)", packets_to_delete); - while ( --packets_to_delete ) { - Debug(4, "Deleting a packet from the front, count is (%d), queue size is %d", - delete_count, pktQueue.size()); - - zm_packet = pktQueue.front(); - if ( *analysis_it == zm_packet ) { - Debug(4, "Bumping analysis it because it is at the front that we are deleting"); - ++analysis_it; - } - if ( zm_packet->codec_type == AVMEDIA_TYPE_VIDEO ) { - video_packet_count -= 1; - if ( video_packet_count ) { - // There is another video packet, so it must be the next one - first_video_packet_index += 1; - first_video_packet_index %= max_video_packet_count; - } else { - // Re-init - first_video_packet_index = -1; - } - } - packet_counts[zm_packet->packet.stream_index] -= 1; - pktQueue.pop_front(); - if ( zm_packet->image_index == -1 ) - delete zm_packet; - - delete_count += 1; - } // while our iterator is not the first packet - } // end if have packet_delete_count - zm_packet = nullptr; // tidy up for valgrind - Debug(3, "Deleted %d packets, %d remaining", delete_count, pktQueue.size()); - -#if 0 - if ( pktQueue.size() ) { - packet = pktQueue.front(); - first_video_packet_index = packet->image_index; - } else { - first_video_packet_index = -1; - } -#endif - Debug(3, "Deleted packets, resulting size is %d", pktQueue.size()); mutex.unlock(); return delete_count; -# endif } // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id ) void zm_packetqueue::clearQueue() { @@ -305,12 +298,19 @@ void zm_packetqueue::clearQueue() { packet = pktQueue.front(); packet_counts[packet->packet.stream_index] -= 1; pktQueue.pop_front(); - if ( packet->image_index == -1 ) + //if ( packet->image_index == -1 ) delete packet; delete_count += 1; } Debug(3, "Deleted (%d) packets", delete_count ); - analysis_it = pktQueue.begin(); + for ( + std::list::iterator iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + *iterator_it = pktQueue.begin(); + } // end foreach iterator mutex.unlock(); } @@ -380,12 +380,21 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId) ZMPacket *zm_packet = nullptr; while ( distance(it, pktQueue.rend()) > 1 ) { zm_packet = pktQueue.front(); - if ( *analysis_it == zm_packet ) { - ++analysis_it; - } + for ( + std::list::iterator iterators_it = iterators.begin(); + iterators_it != iterators.end(); + ++iterators_it + ) { + packetqueue_iterator *iterator_it = *iterators_it; + // Have to check each iterator and make sure it doesn't point to the packet we are about to delete + if ( *(*iterator_it) == zm_packet ) { + Debug(4, "Bumping it because it is at the front that we are deleting"); + ++(*iterators_it); + } + } // end foreach iterator pktQueue.pop_front(); packet_counts[zm_packet->packet.stream_index] -= 1; - if ( zm_packet->image_index == -1 ) + //if ( zm_packet->image_index == -1 ) delete zm_packet; deleted_frames += 1; } @@ -400,66 +409,71 @@ unsigned int zm_packetqueue::size() { return pktQueue.size(); } -int zm_packetqueue::packet_count( int stream_id ) { +int zm_packetqueue::packet_count(int stream_id) { return packet_counts[stream_id]; -} // end int zm_packetqueue::packet_count( int stream_id ) +} // end int zm_packetqueue::packet_count(int stream_id) -// Returns a packet to analyse or NULL -ZMPacket *zm_packetqueue::get_analysis_packet() { +// Returns a packet. Packet will be locked +ZMPacket *zm_packetqueue::get_packet(packetqueue_iterator *it) { - Debug(4, "Locking in get_analysis_packet"); + Debug(4, "Locking in get_packet"); std::unique_lock lck(mutex); + Debug(4, "Have Lock in get_packet"); - while ( ((! pktQueue.size()) or ( analysis_it == pktQueue.end() )) and !zm_terminate and !deleting ) { - Debug(2, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) ); + while ( (! pktQueue.size()) or ( *it == pktQueue.end() ) ) { + if ( deleting or zm_terminate ) + return nullptr; + Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) ); condition.wait(lck); } - if ( deleting ) { + if ( deleting or zm_terminate ) + return nullptr; + + ZMPacket *p = *(*it); + if ( !p ) { + Error("Null p?!"); return nullptr; } - -//Debug(2, "Distance from head: (%d)", std::distance( pktQueue.begin(), analysis_it ) ); - //Debug(2, "Distance from end: (%d)", std::distance( analysis_it, pktQueue.end() ) ); - ZMPacket *p = *analysis_it; - Debug(3, "get_analysis_packet image_index: %d, about to lock packet", p->image_index); - while ( !p->trylock() and !zm_terminate ) { - Debug(3, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) ); + Debug(3, "get_packet image_index: %d, about to lock packet", p->image_index); + while ( !(zm_terminate or deleting) and !p->trylock() ) { + Debug(3, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) ); condition.wait(lck); - if ( deleting ) { - // packetqueue is being deleted, do not assume we have a lock on the packet - return nullptr; - } } Debug(2, "Locked packet, unlocking packetqueue mutex"); return p; -} // end ZMPacket *zm_packetqueue::get_analysis_packet() +} // end ZMPacket *zm_packetqueue::get_packet(it) -// The idea is that analsys_it will only be == end() if the queue is empty -// probvlem here is that we don't want to analyse a packet twice. Maybe we can flag the packet analysed -bool zm_packetqueue::increment_analysis_it( ) { - // We do this instead of distance becuase distance will traverse the entire list in the worst case - if ( analysis_it != pktQueue.end() ) { - ++analysis_it; - if ( (analysis_it == pktQueue.end()) ) { - Debug(3, "Incrementing analysis it %d", (analysis_it == pktQueue.end()) ); - } else { - Debug(3, "Incrementing analysis it %d %d", (analysis_it == pktQueue.end()), (*analysis_it)->image_index); - } - } else { - Debug(3, "Not Incrementing analysis it %d", (analysis_it == pktQueue.end())); - } - return true; - - std::list::iterator next_it = analysis_it; - ++ next_it; - if ( next_it == pktQueue.end() ) { +bool zm_packetqueue::increment_it(packetqueue_iterator *it) { + Debug(2, "Incrementing %p, queue size %d, end? ", it, pktQueue.size(), (*it == pktQueue.end())); + if ( *it == pktQueue.end() ) { return false; } - analysis_it = next_it; - return true; -} // end bool zm_packetqueue::increment_analysis_it( ) + ++(*it); + if ( *it != pktQueue.end() ) { + Debug(2, "Incrementing %p, still not at end, so returning true", it); + return true; + } + return false; +} // end bool zm_packetqueue::increment_it(packetqueue_iterator *it) +// Increment it only considering packets for a given stream +bool zm_packetqueue::increment_it(packetqueue_iterator *it, int stream_id) { + Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), (*it == pktQueue.end())); + if ( *it == pktQueue.end() ) { + return false; + } + + do { + ++(*it); + } while ( (*it != pktQueue.end()) and ( (*(*it))->packet.stream_index != stream_id) ); + + if ( *it != pktQueue.end() ) { + Debug(2, "Incrementing %p, still not at end, so incrementing", it); + return true; + } + return false; +} // end bool zm_packetqueue::increment_it(packetqueue_iterator *it) std::list::iterator zm_packetqueue::get_event_start_packet_it( std::list::iterator snapshot_it, @@ -485,8 +499,10 @@ std::list::iterator zm_packetqueue::get_event_start_packet_it( Debug(1, "Hit begin"); // hit end, the first packet in the queue should ALWAYS be a video keyframe. // So we should be able to return it. - if ( pre_event_count ) + if ( pre_event_count ) { Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count); + dumpPacket( &((*it)->packet ) ); + } return it; } Debug(1, "Checking for keyframe %p", *it); @@ -548,3 +564,43 @@ void zm_packetqueue::dumpQueue() { dumpPacket(av_packet); } } + +/* Returns an iterator to the first video keyframe in the queue. + * nullptr if no keyframe video packet exists. + */ +packetqueue_iterator * zm_packetqueue::get_video_it(bool wait) { + packetqueue_iterator *it = new packetqueue_iterator; + iterators.push_back(it); + + std::unique_lock lck(mutex); + *it = pktQueue.begin(); + + if ( wait ) { + while ( ((! pktQueue.size()) or (*it == pktQueue.end())) and !zm_terminate and !deleting ) { + Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) ); + condition.wait(lck); + *it = pktQueue.begin(); + } + if ( deleting or zm_terminate ) { + delete it; + return nullptr; + } + } + + while ( *it != pktQueue.end() ) { + ZMPacket *zm_packet = *(*it); + if ( !zm_packet ) { + Error("Null zmpacket in queue!?"); + return nullptr; + } + Debug(1, "Packet keyframe %d for stream %d, so returning the it to it", + zm_packet->keyframe, zm_packet->packet.stream_index); + if ( zm_packet->keyframe and ( zm_packet->packet.stream_index == video_stream_id ) ) { + Debug(1, "Found a keyframe for stream %d, so returning the it to it", video_stream_id); + return it; + } + ++(*it); + } + Debug(1, "DIdn't Found a keyframe for stream %d, so returning the it to it", video_stream_id); + return it; +} diff --git a/src/zm_packetqueue.h b/src/zm_packetqueue.h index 270498526..458c3076b 100644 --- a/src/zm_packetqueue.h +++ b/src/zm_packetqueue.h @@ -32,6 +32,8 @@ extern "C" { #include } +typedef std::list::iterator packetqueue_iterator; + class zm_packetqueue { public: // For now just to ease development std::list pktQueue; @@ -42,6 +44,7 @@ class zm_packetqueue { int max_stream_id; int *packet_counts; /* packet count for each stream_id, to keep track of how many video vs audio packets are in the queue */ bool deleting; + std::list iterators; std::mutex mutex; std::condition_variable condition; @@ -49,6 +52,8 @@ class zm_packetqueue { public: zm_packetqueue(int p_max_video_packet_count, int p_video_stream_id, int p_audio_stream_id); virtual ~zm_packetqueue(); + std::list::const_iterator end() const { return pktQueue.end(); } + std::list::const_iterator begin() const { return pktQueue.begin(); } bool queuePacket(ZMPacket* packet); ZMPacket * popPacket(); @@ -64,12 +69,14 @@ class zm_packetqueue { void clear_unwanted_packets(timeval *recording, int pre_event_count, int mVideoStreamId); int packet_count(int stream_id); - // Functions to manage the analysis frame logic - bool increment_analysis_it(); - ZMPacket *get_analysis_packet(); - std::list::iterator get_analysis_it() const { return analysis_it; } + bool increment_it(packetqueue_iterator *it); + bool increment_it(packetqueue_iterator *it, int stream_id); + ZMPacket *get_packet(packetqueue_iterator *); + packetqueue_iterator *get_video_it(bool wait); + packetqueue_iterator *get_stream_it(int stream_id); + std::list::iterator get_event_start_packet_it( - std::list::iterator snapshot_it, + packetqueue_iterator snapshot_it, unsigned int pre_event_count ); };