//ZoneMinder Packet Queue Implementation Class //Copyright 2016 Steve Gilvarry // //This file is part of ZoneMinder. // //ZoneMinder is free software: you can redistribute it and/or modify //it under the terms of the GNU General Public License as published by //the Free Software Foundation, either version 3 of the License, or //(at your option) any later version. // //ZoneMinder is distributed in the hope that it will be useful, //but WITHOUT ANY WARRANTY; without even the implied warranty of //MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //GNU General Public License for more details. // //You should have received a copy of the GNU General Public License //along with ZoneMinder. If not, see . #include "zm_packetqueue.h" #include "zm_ffmpeg.h" #include "zm_signal.h" #include #include "zm_time.h" zm_packetqueue::zm_packetqueue( int video_image_count, int p_video_stream_id, int p_audio_stream_id ) { deleting = false; video_stream_id = p_video_stream_id; max_video_packet_count = video_image_count-1; video_packet_count = 0; analysis_it = pktQueue.begin(); first_video_packet_index = -1; Debug(4, "packetqueue init, first_video_packet_index is %d", first_video_packet_index); max_stream_id = p_video_stream_id > p_audio_stream_id ? p_video_stream_id : p_audio_stream_id; packet_counts = new int[max_stream_id+1]; for ( int i=0; i <= max_stream_id; ++i ) packet_counts[i] = 0; } zm_packetqueue::~zm_packetqueue() { deleting = true; Debug(1, "In destructor"); /* zma might be waiting. Must have exclusive access */ while ( ! mutex.try_lock() ) { Debug(1, "Waiting for exclusive access"); condition.notify_all(); } while ( !pktQueue.empty() ) { ZMPacket * packet = pktQueue.front(); pktQueue.pop_front(); delete packet; } delete[] packet_counts; packet_counts = nullptr; mutex.unlock(); } /* Enqueues the given packet. Will maintain the analysis_it pointer and image packet counts. * If we have reached our max image packet count, it will pop off as many packets as are needed. * Thus it will ensure that the same packet never gets queued twice. */ bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) { Debug(4, "packetqueue queuepacket, first_video_packet_index is %d", first_video_packet_index); mutex.lock(); Debug(4, "packetqueue queuepacket, have lock first_video_packet_index is %d", first_video_packet_index); if ( zm_packet->packet.stream_index == video_stream_id ) { video_packet_count += 1; } pktQueue.push_back(zm_packet); packet_counts[zm_packet->packet.stream_index] += 1; if ( analysis_it == pktQueue.end() ) { // Analsys_it should only point to end when queue is empty Debug(2, "pointing analysis_it to begining"); analysis_it = pktQueue.begin(); } #if 0 // This code should not be neccessary. Taken care of by the above code that ensure that no packet appears twice if ( zm_packet->codec_type == AVMEDIA_TYPE_VIDEO ) { video_packet_count += 1; if ( video_packet_count >= max_video_packet_count ) clearQueue(max_video_packet_count, video_stream_id); } #endif mutex.unlock(); // We signal on every packet because someday we may analyze sound condition.notify_all(); return true; } // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) ZMPacket* zm_packetqueue::popPacket( ) { if ( pktQueue.empty() ) { return nullptr; } Debug(2, "poPacket Mutex locking"); mutex.lock(); ZMPacket *packet = pktQueue.front(); if ( *analysis_it == packet ) { Debug(2, "not popping analysis_it index %d", packet->image_index); mutex.unlock(); return nullptr; } packet->lock(); pktQueue.pop_front(); if ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) { video_packet_count -= 1; if ( video_packet_count ) { // There is another video packet, so it must be the next one Debug(2, "Incrementing first video packet index was (%d)", first_video_packet_index); first_video_packet_index += 1; first_video_packet_index %= max_video_packet_count; } else { first_video_packet_index = -1; } } packet_counts[packet->packet.stream_index] -= 1; mutex.unlock(); return packet; } // popPacket unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_id) { Debug(3, "Clearing all but %d frames, queue has %d", frames_to_keep, pktQueue.size()); if ( pktQueue.empty() ) { return 0; } frames_to_keep += 1; if ( pktQueue.size() <= frames_to_keep ) { return 0; } Debug(1, "Locking in clearQueue"); mutex.lock(); int packets_to_delete = pktQueue.size(); std::list::reverse_iterator it; ZMPacket *packet = nullptr; for ( it = pktQueue.rbegin(); frames_to_keep && (it != pktQueue.rend()); ++it ) { ZMPacket *zm_packet = *it; AVPacket *av_packet = &(zm_packet->packet); Debug(4, "Looking at packet with stream index (%d) with keyframe(%d), Image_index(%d) frames_to_keep is (%d)", av_packet->stream_index, zm_packet->keyframe, zm_packet->image_index, frames_to_keep ); // Want frames_to_keep video keyframes. Otherwise, we may not have enough if ( av_packet->stream_index == stream_id ) { frames_to_keep --; packets_to_delete --; } } // Make sure we start on a keyframe for ( ; it != pktQueue.rend(); ++it ) { ZMPacket *zm_packet = *it; AVPacket *av_packet = &(zm_packet->packet); Debug(5, "Looking for keyframe at packet with stream index (%d) with keyframe (%d), image_index(%d) frames_to_keep is (%d)", av_packet->stream_index, ( av_packet->flags & AV_PKT_FLAG_KEY ), zm_packet->image_index, frames_to_keep ); // Want frames_to_keep video keyframes. Otherwise, we may not have enough if ( (av_packet->stream_index == stream_id) && (av_packet->flags & AV_PKT_FLAG_KEY) ) { Debug(4, "Found keyframe at packet with stream index (%d) with keyframe (%d), frames_to_keep is (%d)", av_packet->stream_index, ( av_packet->flags & AV_PKT_FLAG_KEY ), frames_to_keep); break; } packets_to_delete--; } if ( frames_to_keep ) { Debug(3, "Hit end of queue, still need (%d) video frames", frames_to_keep); } if ( it != pktQueue.rend() ) { // We want to keep this packet, so advance to the next ++it; packets_to_delete--; } int delete_count = 0; if ( packets_to_delete > 0 ) { Debug(4, "Deleting packets from the front, count is (%d)", packets_to_delete); while ( --packets_to_delete ) { Debug(4, "Deleting a packet from the front, count is (%d), queue size is %d", delete_count, pktQueue.size()); packet = pktQueue.front(); if ( *analysis_it == packet ) { Debug(4, "Bumping analysis it because it is at the front that we are deleting"); ++analysis_it; } if ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) { video_packet_count -= 1; if ( video_packet_count ) { // There is another video packet, so it must be the next one first_video_packet_index += 1; first_video_packet_index %= max_video_packet_count; } else { // Re-init first_video_packet_index = -1; } } packet_counts[packet->packet.stream_index] -= 1; pktQueue.pop_front(); //if ( packet->image_index == -1 ) delete packet; delete_count += 1; } // while our iterator is not the first packet } // end if have packet_delete_count packet = nullptr; // tidy up for valgrind Debug(3, "Deleted %d packets, %d remaining", delete_count, pktQueue.size()); #if 0 if ( pktQueue.size() ) { packet = pktQueue.front(); first_video_packet_index = packet->image_index; } else { first_video_packet_index = -1; } #endif Debug(3, "Deleted packets, resulting size is %d", pktQueue.size()); mutex.unlock(); return delete_count; } // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id ) void zm_packetqueue::clearQueue() { Debug(1, "Clocking in clearQueue"); mutex.lock(); ZMPacket *packet = nullptr; int delete_count = 0; while ( !pktQueue.empty() ) { packet = pktQueue.front(); packet_counts[packet->packet.stream_index] -= 1; pktQueue.pop_front(); //if ( packet->image_index == -1 ) delete packet; delete_count += 1; } Debug(3, "Deleted (%d) packets", delete_count ); video_packet_count = 0; first_video_packet_index = -1; analysis_it = pktQueue.begin(); mutex.unlock(); } // clear queue keeping only specified duration of video -- return number of pkts removed unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId) { if ( pktQueue.empty() ) { return 0; } Debug(1, "Locking in clearQueue"); mutex.lock(); struct timeval keep_from; std::list::reverse_iterator it = pktQueue.rbegin(); struct timeval *t = (*it)->timestamp; timersub(t, duration, &keep_from); ++it; Debug(3, "Looking for frame before queue keep time with stream id (%d), queue has %d packets", streamId, pktQueue.size()); for ( ; it != pktQueue.rend(); ++it) { ZMPacket *zm_packet = *it; AVPacket *av_packet = &(zm_packet->packet); if ( (av_packet->stream_index == streamId) and timercmp(zm_packet->timestamp, &keep_from, <=) ) { Debug(3, "Found frame before keep time with stream index %d at %d.%d", av_packet->stream_index, zm_packet->timestamp->tv_sec, zm_packet->timestamp->tv_usec); break; } } if ( it == pktQueue.rend() ) { Debug(1, "Didn't find a frame before queue preserve time. keeping all"); mutex.unlock(); return 0; } Debug(3, "Looking for keyframe"); for ( ; it != pktQueue.rend(); ++it) { ZMPacket *zm_packet = *it; AVPacket *av_packet = &(zm_packet->packet); if ( (av_packet->flags & AV_PKT_FLAG_KEY) and (av_packet->stream_index == streamId) ) { Debug(3, "Found keyframe before start with stream index %d at %d.%d", av_packet->stream_index, zm_packet->timestamp->tv_sec, zm_packet->timestamp->tv_usec ); break; } } if ( it == pktQueue.rend() ) { Debug(1, "Didn't find a keyframe before event starttime. keeping all" ); mutex.unlock(); return 0; } unsigned int deleted_frames = 0; ZMPacket *zm_packet = nullptr; while ( distance(it, pktQueue.rend()) > 1 ) { zm_packet = pktQueue.front(); if ( *analysis_it == zm_packet ) { ++analysis_it; } pktQueue.pop_front(); packet_counts[zm_packet->packet.stream_index] -= 1; delete zm_packet; deleted_frames += 1; } zm_packet = nullptr; Debug(3, "Deleted %d frames", deleted_frames); mutex.unlock(); return deleted_frames; } unsigned int zm_packetqueue::size() { return pktQueue.size(); } unsigned int zm_packetqueue::get_video_packet_count() { return video_packet_count; } int zm_packetqueue::packet_count( int stream_id ) { return packet_counts[stream_id]; } // end int zm_packetqueue::packet_count( int stream_id ) // Returns a packet to analyse or NULL ZMPacket *zm_packetqueue::get_analysis_packet() { Debug(1, "Locking in get_analysis_packet"); std::unique_lock lck(mutex); while ( ((! pktQueue.size()) || ( analysis_it == pktQueue.end() )) && !zm_terminate ) { Debug(2, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) ); condition.wait(lck); } //Debug(2, "Distance from head: (%d)", std::distance( pktQueue.begin(), analysis_it ) ); //Debug(2, "Distance from end: (%d)", std::distance( analysis_it, pktQueue.end() ) ); ZMPacket *p = *analysis_it; Debug(2, "get_analysis_packet image_index: %d, about to lock packet", p->image_index); while ( !p->trylock() and !zm_terminate ) { Debug(2,"waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) ); condition.wait(lck); if ( deleting ) { // packetqueue is being deleted, do not assume we have a lock on the packet return nullptr; } } Debug(2, "Locked packet, unlocking packetqueue mutex"); return p; } // end ZMPacket *zm_packetqueue::get_analysis_packet() // The idea is that analsys_it will only be == end() if the queue is empty // probvlem here is that we don't want to analyse a packet twice. Maybe we can flag the packet analysed bool zm_packetqueue::increment_analysis_it( ) { // We do this instead of distance becuase distance will traverse the entire list in the worst case if ( analysis_it != pktQueue.end() ) { ++analysis_it; Debug(1, "Incrementing analysis it %d %d", (analysis_it == pktQueue.end()), (*analysis_it)->image_index); } else { Debug(1, "Not Incrementing analysis it %d", (analysis_it == pktQueue.end())); } return true; std::list::iterator next_it = analysis_it; ++ next_it; if ( next_it == pktQueue.end() ) { return false; } analysis_it = next_it; return true; } // end bool zm_packetqueue::increment_analysis_it( ) void zm_packetqueue::dumpQueue() { std::list::reverse_iterator it; for ( it = pktQueue.rbegin(); it != pktQueue.rend(); ++ it ) { ZMPacket *zm_packet = *it; AVPacket *av_packet = &(zm_packet->packet); dumpPacket(av_packet); } }