remove analysis_it and correctly manage external iterators. Fix testing for ability to remove packets from packetqueue. Fix some cases where the order of testing zm_terminate and packet locking is important

This commit is contained in:
Isaac Connor 2021-01-07 09:43:53 -05:00
parent 3b8266a6f0
commit abc402878b
2 changed files with 255 additions and 192 deletions

View File

@ -17,6 +17,8 @@
//along with ZoneMinder. If not, see <http://www.gnu.org/licenses/>. //along with ZoneMinder. If not, see <http://www.gnu.org/licenses/>.
// PacketQueue must know about all iterators and manage them
#include "zm_packetqueue.h" #include "zm_packetqueue.h"
#include "zm_ffmpeg.h" #include "zm_ffmpeg.h"
#include "zm_signal.h" #include "zm_signal.h"
@ -32,7 +34,6 @@ zm_packetqueue::zm_packetqueue(
max_video_packet_count(video_image_count), max_video_packet_count(video_image_count),
deleting(false) deleting(false)
{ {
analysis_it = pktQueue.begin();
max_stream_id = p_video_stream_id > p_audio_stream_id ? p_video_stream_id : p_audio_stream_id; max_stream_id = p_video_stream_id > p_audio_stream_id ? p_video_stream_id : p_audio_stream_id;
packet_counts = new int[max_stream_id+1]; packet_counts = new int[max_stream_id+1];
@ -43,64 +44,131 @@ zm_packetqueue::zm_packetqueue(
zm_packetqueue::~zm_packetqueue() { zm_packetqueue::~zm_packetqueue() {
deleting = true; deleting = true;
/* zma might be waiting. Must have exclusive access */ /* zma might be waiting. Must have exclusive access */
while ( ! mutex.try_lock() ) { while ( !mutex.try_lock() ) {
Debug(4, "Waiting for exclusive access"); Debug(4, "Waiting for exclusive access");
condition.notify_all(); condition.notify_all();
} }
while ( !pktQueue.empty() ) { while ( !pktQueue.empty() ) {
ZMPacket * packet = pktQueue.front(); ZMPacket *packet = pktQueue.front();
pktQueue.pop_front(); pktQueue.pop_front();
if ( packet->image_index == -1 ) { delete packet;
delete packet;
}
} }
delete[] packet_counts; delete[] packet_counts;
Debug(4, "Done in destrcutor"); Debug(4, "Done in destructor");
packet_counts = nullptr; packet_counts = nullptr;
mutex.unlock(); mutex.unlock();
condition.notify_all(); condition.notify_all();
} }
/* Enqueues the given packet. Will maintain the analysis_it pointer and image packet counts. /* Enqueues the given packet. Will maintain the it pointer and image packet counts.
* If we have reached our max image packet count, it will pop off as many packets as are needed. * If we have reached our max image packet count, it will pop off as many packets as are needed.
* Thus it will ensure that the same packet never gets queued twice. * Thus it will ensure that the same packet never gets queued twice.
*/ */
bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) { bool zm_packetqueue::queuePacket(ZMPacket* add_packet) {
Debug(4, "packetqueue queuepacket"); Debug(4, "packetqueue queuepacket");
mutex.lock(); mutex.lock();
pktQueue.push_back(zm_packet); pktQueue.push_back(add_packet);
packet_counts[zm_packet->packet.stream_index] += 1; packet_counts[add_packet->packet.stream_index] += 1;
Debug(1, "packet counts for %d is %d", Debug(1, "packet counts for %d is %d",
zm_packet->packet.stream_index, packet_counts[zm_packet->packet.stream_index]); add_packet->packet.stream_index,
if ( analysis_it == pktQueue.end() ) { packet_counts[add_packet->packet.stream_index]);
// Analsys_it should only point to end when queue is empty
Debug(4, "pointing analysis_it to back"); for (
analysis_it --; std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
} iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *iterator_it == pktQueue.end() ) {
Debug(4, "pointing it to back");
--(*iterator_it);
}
} // end foreach iterator
// Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one.
// No good. Have to satisfy two conditions:
// 1. packetqueue starts with a video keyframe
// 2. Have minimum # of video packets
// 3. No packets can be locked
// 4. No iterator can point to one of the packets
//
// So start at the beginning, counting video packets until the next keyframe.
// Then if deleting those packets doesn't break 1 and 2, then go ahead and delete them.
if ( add_packet->packet.stream_index == video_stream_id
and
add_packet->keyframe
and
(packet_counts[video_stream_id] > max_video_packet_count)
) {
packetqueue_iterator it = pktQueue.begin();
int video_stream_packets = 0;
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
do {
it++;
ZMPacket *zm_packet = *it;
Debug(1, "Checking packet to see if we can delete them");
if ( zm_packet->packet.stream_index == video_stream_id ) {
if ( zm_packet->keyframe )
break;
video_stream_packets ++;
}
if ( !zm_packet->trylock() ) {
video_stream_packets = max_video_packet_count;
break;
}
zm_packet->unlock();
for (
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
Debug(4, "bumping it. Threads not keeping up");
video_stream_packets = max_video_packet_count;
}
} // end foreach iterator
} while ( video_stream_packets < max_video_packet_count );
if (
max_video_packet_count - video_stream_packets > max_video_packet_count
and
( *it != add_packet )
) {
Debug(1, "Deleting packets");
// Now to clean ups mainting the queue size.
while ( packet_counts[video_stream_id] > max_video_packet_count ) {
ZMPacket *zm_packet = *pktQueue.begin();
if ( !zm_packet ) {
Error("NULL zm_packet in queue");
continue;
}
Debug(1, "Deleting a packet with stream index (%d) image_index %d with keyframe(%d), video_frames_to_keep is (%d) max: %d, queuesuze:%d",
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count,pktQueue.size());
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
zm_packet->unlock();
delete zm_packet;
}
} // end if have at least max_video_packet_count video packets remaining
} // end if this is a video keyframe
mutex.unlock(); mutex.unlock();
// We signal on every packet because someday we may analyze sound // We signal on every packet because someday we may analyze sound
Debug(4, "packetqueue queuepacket, unlocked signalling"); Debug(4, "packetqueue queuepacket, unlocked signalling");
condition.notify_all(); condition.notify_all();
// We have added to the queue and signalled so other processes can now work on the new packet.
// Now to clean ups mainting the queue size.
while ( packet_counts[video_stream_id] > max_video_packet_count ) {
//clearQueue(max_video_packet_count, video_stream_id);
//clearQueue is rather heavy. Since this is the only packet injection spot, we can just start at the beginning of the queue and remove packets until we get to the next video keyframe
Debug(1, "Deleting a packet with stream index (%d) with keyframe(%d), video_frames_to_keep is (%d) max: %d",
zm_packet->packet.stream_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count);
ZMPacket *zm_packet = *pktQueue.begin();
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
if ( zm_packet->image_index == -1 )
delete zm_packet;
}
return true; return true;
} // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) } // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet)
@ -112,20 +180,28 @@ ZMPacket* zm_packetqueue::popPacket( ) {
Debug(4, "poPacket Mutex locking"); Debug(4, "poPacket Mutex locking");
mutex.lock(); mutex.lock();
ZMPacket *packet = pktQueue.front(); ZMPacket *zm_packet = pktQueue.front();
if ( *analysis_it == packet ) { for (
Debug(4, "not popping analysis_it index %d", packet->image_index); std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
mutex.unlock(); iterators_it != iterators.end();
return nullptr; ++iterators_it
} ) {
packet->lock(); packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
Debug(4, "Bumping it because it is at the front that we are deleting");
++(*iterators_it);
}
} // end foreach iterator
zm_packet->lock();
pktQueue.pop_front(); pktQueue.pop_front();
packet_counts[packet->packet.stream_index] -= 1; packet_counts[zm_packet->packet.stream_index] -= 1;
mutex.unlock(); mutex.unlock();
return packet; return zm_packet;
} // popPacket } // popPacket
@ -147,10 +223,10 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
if ( pktQueue.size() <= frames_to_keep ) { if ( pktQueue.size() <= frames_to_keep ) {
return 0; return 0;
} }
Debug(4, "Locking in clearQueue"); Debug(5, "Locking in clearQueue");
mutex.lock(); mutex.lock();
std::list<ZMPacket *>::iterator it = pktQueue.end()--; // point to last element instead of end packetqueue_iterator it = pktQueue.end()--; // point to last element instead of end
ZMPacket *zm_packet = nullptr; ZMPacket *zm_packet = nullptr;
while ( (it != pktQueue.begin()) and frames_to_keep ) { while ( (it != pktQueue.begin()) and frames_to_keep ) {
@ -184,13 +260,21 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
Debug(4, "Deleting a packet from the front, count is (%d), queue size is %d", Debug(4, "Deleting a packet from the front, count is (%d), queue size is %d",
delete_count, pktQueue.size()); delete_count, pktQueue.size());
zm_packet = pktQueue.front(); zm_packet = pktQueue.front();
if ( *analysis_it == zm_packet ) { for (
Debug(4, "Bumping analysis it because it is at the front that we are deleting"); std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
++analysis_it; iterators_it != iterators.end();
} ++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
Debug(4, "Bumping it because it is at the front that we are deleting");
++(*iterators_it);
}
} // end foreach iterator
packet_counts[zm_packet->packet.stream_index] --; packet_counts[zm_packet->packet.stream_index] --;
pktQueue.pop_front(); pktQueue.pop_front();
if ( zm_packet->image_index == -1 ) //if ( zm_packet->image_index == -1 )
delete zm_packet; delete zm_packet;
delete_count += 1; delete_count += 1;
@ -200,100 +284,9 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
mutex.unlock(); mutex.unlock();
return delete_count; return delete_count;
# if 0
// I forget why +1
frames_to_keep += 1;
int packets_to_delete = pktQueue.size();
std::list<ZMPacket *>::reverse_iterator it;
ZMPacket *zm_packet = nullptr;
for ( it = pktQueue.rbegin(); frames_to_keep && (it != pktQueue.rend()); ++it ) {
zm_packet = *it;
AVPacket *av_packet = &(zm_packet->packet);
Debug(3, "Looking at packet with stream index (%d) with keyframe(%d), Image_index(%d) frames_to_keep is (%d)",
av_packet->stream_index, zm_packet->keyframe, zm_packet->image_index, frames_to_keep );
// Want frames_to_keep video keyframes. Otherwise, we may not have enough
if ( av_packet->stream_index == stream_id ) {
frames_to_keep --;
packets_to_delete --;
}
}
// Make sure we start on a keyframe
for ( ; it != pktQueue.rend(); ++it ) {
zm_packet = *it;
AVPacket *av_packet = &(zm_packet->packet);
Debug(3, "Looking for keyframe at packet with stream index (%d) with keyframe (%d), image_index(%d) frames_to_keep is (%d)",
av_packet->stream_index, ( av_packet->flags & AV_PKT_FLAG_KEY ), zm_packet->image_index, frames_to_keep );
// Want frames_to_keep video keyframes. Otherwise, we may not have enough
if ( (av_packet->stream_index == stream_id) and (av_packet->flags & AV_PKT_FLAG_KEY) ) {
Debug(3, "Found keyframe at packet with stream index (%d) with keyframe (%d), frames_to_keep is (%d)",
av_packet->stream_index, ( av_packet->flags & AV_PKT_FLAG_KEY ), frames_to_keep);
break;
}
packets_to_delete--;
}
if ( frames_to_keep ) {
Debug(3, "Hit end of queue, still need (%d) video frames", frames_to_keep);
}
if ( it != pktQueue.rend() ) {
// We want to keep this packet, so advance to the next
++it;
packets_to_delete--;
}
int delete_count = 0;
if ( packets_to_delete > 0 ) {
Debug(4, "Deleting packets from the front, count is (%d)", packets_to_delete);
while ( --packets_to_delete ) {
Debug(4, "Deleting a packet from the front, count is (%d), queue size is %d",
delete_count, pktQueue.size());
zm_packet = pktQueue.front();
if ( *analysis_it == zm_packet ) {
Debug(4, "Bumping analysis it because it is at the front that we are deleting");
++analysis_it;
}
if ( zm_packet->codec_type == AVMEDIA_TYPE_VIDEO ) {
video_packet_count -= 1;
if ( video_packet_count ) {
// There is another video packet, so it must be the next one
first_video_packet_index += 1;
first_video_packet_index %= max_video_packet_count;
} else {
// Re-init
first_video_packet_index = -1;
}
}
packet_counts[zm_packet->packet.stream_index] -= 1;
pktQueue.pop_front();
if ( zm_packet->image_index == -1 )
delete zm_packet;
delete_count += 1;
} // while our iterator is not the first packet
} // end if have packet_delete_count
zm_packet = nullptr; // tidy up for valgrind
Debug(3, "Deleted %d packets, %d remaining", delete_count, pktQueue.size());
#if 0
if ( pktQueue.size() ) {
packet = pktQueue.front();
first_video_packet_index = packet->image_index;
} else {
first_video_packet_index = -1;
}
#endif
Debug(3, "Deleted packets, resulting size is %d", pktQueue.size()); Debug(3, "Deleted packets, resulting size is %d", pktQueue.size());
mutex.unlock(); mutex.unlock();
return delete_count; return delete_count;
# endif
} // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id ) } // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id )
void zm_packetqueue::clearQueue() { void zm_packetqueue::clearQueue() {
@ -305,12 +298,19 @@ void zm_packetqueue::clearQueue() {
packet = pktQueue.front(); packet = pktQueue.front();
packet_counts[packet->packet.stream_index] -= 1; packet_counts[packet->packet.stream_index] -= 1;
pktQueue.pop_front(); pktQueue.pop_front();
if ( packet->image_index == -1 ) //if ( packet->image_index == -1 )
delete packet; delete packet;
delete_count += 1; delete_count += 1;
} }
Debug(3, "Deleted (%d) packets", delete_count ); Debug(3, "Deleted (%d) packets", delete_count );
analysis_it = pktQueue.begin(); for (
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
*iterator_it = pktQueue.begin();
} // end foreach iterator
mutex.unlock(); mutex.unlock();
} }
@ -380,12 +380,21 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
ZMPacket *zm_packet = nullptr; ZMPacket *zm_packet = nullptr;
while ( distance(it, pktQueue.rend()) > 1 ) { while ( distance(it, pktQueue.rend()) > 1 ) {
zm_packet = pktQueue.front(); zm_packet = pktQueue.front();
if ( *analysis_it == zm_packet ) { for (
++analysis_it; std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
} iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
Debug(4, "Bumping it because it is at the front that we are deleting");
++(*iterators_it);
}
} // end foreach iterator
pktQueue.pop_front(); pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1; packet_counts[zm_packet->packet.stream_index] -= 1;
if ( zm_packet->image_index == -1 ) //if ( zm_packet->image_index == -1 )
delete zm_packet; delete zm_packet;
deleted_frames += 1; deleted_frames += 1;
} }
@ -400,66 +409,71 @@ unsigned int zm_packetqueue::size() {
return pktQueue.size(); return pktQueue.size();
} }
int zm_packetqueue::packet_count( int stream_id ) { int zm_packetqueue::packet_count(int stream_id) {
return packet_counts[stream_id]; return packet_counts[stream_id];
} // end int zm_packetqueue::packet_count( int stream_id ) } // end int zm_packetqueue::packet_count(int stream_id)
// Returns a packet to analyse or NULL // Returns a packet. Packet will be locked
ZMPacket *zm_packetqueue::get_analysis_packet() { ZMPacket *zm_packetqueue::get_packet(packetqueue_iterator *it) {
Debug(4, "Locking in get_analysis_packet"); Debug(4, "Locking in get_packet");
std::unique_lock<std::mutex> lck(mutex); std::unique_lock<std::mutex> lck(mutex);
Debug(4, "Have Lock in get_packet");
while ( ((! pktQueue.size()) or ( analysis_it == pktQueue.end() )) and !zm_terminate and !deleting ) { while ( (! pktQueue.size()) or ( *it == pktQueue.end() ) ) {
Debug(2, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) ); if ( deleting or zm_terminate )
return nullptr;
Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) );
condition.wait(lck); condition.wait(lck);
} }
if ( deleting ) { if ( deleting or zm_terminate )
return nullptr;
ZMPacket *p = *(*it);
if ( !p ) {
Error("Null p?!");
return nullptr; return nullptr;
} }
Debug(3, "get_packet image_index: %d, about to lock packet", p->image_index);
//Debug(2, "Distance from head: (%d)", std::distance( pktQueue.begin(), analysis_it ) ); while ( !(zm_terminate or deleting) and !p->trylock() ) {
//Debug(2, "Distance from end: (%d)", std::distance( analysis_it, pktQueue.end() ) ); Debug(3, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) );
ZMPacket *p = *analysis_it;
Debug(3, "get_analysis_packet image_index: %d, about to lock packet", p->image_index);
while ( !p->trylock() and !zm_terminate ) {
Debug(3, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) );
condition.wait(lck); condition.wait(lck);
if ( deleting ) {
// packetqueue is being deleted, do not assume we have a lock on the packet
return nullptr;
}
} }
Debug(2, "Locked packet, unlocking packetqueue mutex"); Debug(2, "Locked packet, unlocking packetqueue mutex");
return p; return p;
} // end ZMPacket *zm_packetqueue::get_analysis_packet() } // end ZMPacket *zm_packetqueue::get_packet(it)
// The idea is that analsys_it will only be == end() if the queue is empty bool zm_packetqueue::increment_it(packetqueue_iterator *it) {
// probvlem here is that we don't want to analyse a packet twice. Maybe we can flag the packet analysed Debug(2, "Incrementing %p, queue size %d, end? ", it, pktQueue.size(), (*it == pktQueue.end()));
bool zm_packetqueue::increment_analysis_it( ) { if ( *it == pktQueue.end() ) {
// We do this instead of distance becuase distance will traverse the entire list in the worst case
if ( analysis_it != pktQueue.end() ) {
++analysis_it;
if ( (analysis_it == pktQueue.end()) ) {
Debug(3, "Incrementing analysis it %d", (analysis_it == pktQueue.end()) );
} else {
Debug(3, "Incrementing analysis it %d %d", (analysis_it == pktQueue.end()), (*analysis_it)->image_index);
}
} else {
Debug(3, "Not Incrementing analysis it %d", (analysis_it == pktQueue.end()));
}
return true;
std::list<ZMPacket *>::iterator next_it = analysis_it;
++ next_it;
if ( next_it == pktQueue.end() ) {
return false; return false;
} }
analysis_it = next_it; ++(*it);
return true; if ( *it != pktQueue.end() ) {
} // end bool zm_packetqueue::increment_analysis_it( ) Debug(2, "Incrementing %p, still not at end, so returning true", it);
return true;
}
return false;
} // end bool zm_packetqueue::increment_it(packetqueue_iterator *it)
// Increment it only considering packets for a given stream
bool zm_packetqueue::increment_it(packetqueue_iterator *it, int stream_id) {
Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), (*it == pktQueue.end()));
if ( *it == pktQueue.end() ) {
return false;
}
do {
++(*it);
} while ( (*it != pktQueue.end()) and ( (*(*it))->packet.stream_index != stream_id) );
if ( *it != pktQueue.end() ) {
Debug(2, "Incrementing %p, still not at end, so incrementing", it);
return true;
}
return false;
} // end bool zm_packetqueue::increment_it(packetqueue_iterator *it)
std::list<ZMPacket *>::iterator zm_packetqueue::get_event_start_packet_it( std::list<ZMPacket *>::iterator zm_packetqueue::get_event_start_packet_it(
std::list<ZMPacket *>::iterator snapshot_it, std::list<ZMPacket *>::iterator snapshot_it,
@ -485,8 +499,10 @@ std::list<ZMPacket *>::iterator zm_packetqueue::get_event_start_packet_it(
Debug(1, "Hit begin"); Debug(1, "Hit begin");
// hit end, the first packet in the queue should ALWAYS be a video keyframe. // hit end, the first packet in the queue should ALWAYS be a video keyframe.
// So we should be able to return it. // So we should be able to return it.
if ( pre_event_count ) if ( pre_event_count ) {
Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count); Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count);
dumpPacket( &((*it)->packet ) );
}
return it; return it;
} }
Debug(1, "Checking for keyframe %p", *it); Debug(1, "Checking for keyframe %p", *it);
@ -548,3 +564,43 @@ void zm_packetqueue::dumpQueue() {
dumpPacket(av_packet); dumpPacket(av_packet);
} }
} }
/* Returns an iterator to the first video keyframe in the queue.
* nullptr if no keyframe video packet exists.
*/
packetqueue_iterator * zm_packetqueue::get_video_it(bool wait) {
packetqueue_iterator *it = new packetqueue_iterator;
iterators.push_back(it);
std::unique_lock<std::mutex> lck(mutex);
*it = pktQueue.begin();
if ( wait ) {
while ( ((! pktQueue.size()) or (*it == pktQueue.end())) and !zm_terminate and !deleting ) {
Debug(2, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) );
condition.wait(lck);
*it = pktQueue.begin();
}
if ( deleting or zm_terminate ) {
delete it;
return nullptr;
}
}
while ( *it != pktQueue.end() ) {
ZMPacket *zm_packet = *(*it);
if ( !zm_packet ) {
Error("Null zmpacket in queue!?");
return nullptr;
}
Debug(1, "Packet keyframe %d for stream %d, so returning the it to it",
zm_packet->keyframe, zm_packet->packet.stream_index);
if ( zm_packet->keyframe and ( zm_packet->packet.stream_index == video_stream_id ) ) {
Debug(1, "Found a keyframe for stream %d, so returning the it to it", video_stream_id);
return it;
}
++(*it);
}
Debug(1, "DIdn't Found a keyframe for stream %d, so returning the it to it", video_stream_id);
return it;
}

View File

@ -32,6 +32,8 @@
extern "C" { extern "C" {
#include <libavformat/avformat.h> #include <libavformat/avformat.h>
} }
typedef std::list<ZMPacket *>::iterator packetqueue_iterator;
class zm_packetqueue { class zm_packetqueue {
public: // For now just to ease development public: // For now just to ease development
std::list<ZMPacket *> pktQueue; std::list<ZMPacket *> pktQueue;
@ -42,6 +44,7 @@ class zm_packetqueue {
int max_stream_id; int max_stream_id;
int *packet_counts; /* packet count for each stream_id, to keep track of how many video vs audio packets are in the queue */ int *packet_counts; /* packet count for each stream_id, to keep track of how many video vs audio packets are in the queue */
bool deleting; bool deleting;
std::list<packetqueue_iterator *> iterators;
std::mutex mutex; std::mutex mutex;
std::condition_variable condition; std::condition_variable condition;
@ -49,6 +52,8 @@ class zm_packetqueue {
public: public:
zm_packetqueue(int p_max_video_packet_count, int p_video_stream_id, int p_audio_stream_id); zm_packetqueue(int p_max_video_packet_count, int p_video_stream_id, int p_audio_stream_id);
virtual ~zm_packetqueue(); virtual ~zm_packetqueue();
std::list<ZMPacket *>::const_iterator end() const { return pktQueue.end(); }
std::list<ZMPacket *>::const_iterator begin() const { return pktQueue.begin(); }
bool queuePacket(ZMPacket* packet); bool queuePacket(ZMPacket* packet);
ZMPacket * popPacket(); ZMPacket * popPacket();
@ -64,12 +69,14 @@ class zm_packetqueue {
void clear_unwanted_packets(timeval *recording, int pre_event_count, int mVideoStreamId); void clear_unwanted_packets(timeval *recording, int pre_event_count, int mVideoStreamId);
int packet_count(int stream_id); int packet_count(int stream_id);
// Functions to manage the analysis frame logic bool increment_it(packetqueue_iterator *it);
bool increment_analysis_it(); bool increment_it(packetqueue_iterator *it, int stream_id);
ZMPacket *get_analysis_packet(); ZMPacket *get_packet(packetqueue_iterator *);
std::list<ZMPacket *>::iterator get_analysis_it() const { return analysis_it; } packetqueue_iterator *get_video_it(bool wait);
packetqueue_iterator *get_stream_it(int stream_id);
std::list<ZMPacket *>::iterator get_event_start_packet_it( std::list<ZMPacket *>::iterator get_event_start_packet_it(
std::list<ZMPacket *>::iterator snapshot_it, packetqueue_iterator snapshot_it,
unsigned int pre_event_count unsigned int pre_event_count
); );
}; };