Instead of pointing analsysis_it to begin, point it to the end, which is the newly pushed packet. Add back queue clearing

This commit is contained in:
Isaac Connor 2020-12-15 16:01:18 -05:00
parent da18305729
commit a502a86b00
1 changed files with 38 additions and 27 deletions

View File

@ -23,7 +23,11 @@
#include <sys/time.h>
#include "zm_time.h"
zm_packetqueue::zm_packetqueue( int video_image_count, int p_video_stream_id, int p_audio_stream_id ) {
zm_packetqueue::zm_packetqueue(
int video_image_count,
int p_video_stream_id,
int p_audio_stream_id
) {
deleting = false;
video_stream_id = p_video_stream_id;
max_video_packet_count = video_image_count-1;
@ -40,26 +44,26 @@ zm_packetqueue::zm_packetqueue( int video_image_count, int p_video_stream_id, in
zm_packetqueue::~zm_packetqueue() {
deleting = true;
Debug(1, "In destructor");
Debug(4, "In destructor");
/* zma might be waiting. Must have exclusive access */
while ( ! mutex.try_lock() ) {
Debug(1, "Waiting for exclusive access");
Debug(4, "Waiting for exclusive access");
condition.notify_all();
}
while ( !pktQueue.empty() ) {
Debug(1, "Fronting packet %d", pktQueue.empty());
Debug(4, "Fronting packet %d", pktQueue.empty());
ZMPacket * packet = pktQueue.front();
Debug(1, "poppng packet %d", packet->image_index);
Debug(4, "poppng packet %d", packet->image_index);
pktQueue.pop_front();
if ( packet->image_index == -1 ) {
Debug(1, "Deletng packet");
Debug(4, "Deletng packet");
delete packet;
}
}
delete[] packet_counts;
Debug(1, "Done in destrcutor");
Debug(4, "Done in destrcutor");
packet_counts = nullptr;
mutex.unlock();
condition.notify_all();
@ -82,36 +86,43 @@ bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) {
packet_counts[zm_packet->packet.stream_index] += 1;
if ( analysis_it == pktQueue.end() ) {
// Analsys_it should only point to end when queue is empty
Debug(2, "pointing analysis_it to begining");
analysis_it = pktQueue.begin();
Debug(4, "pointing analysis_it to back");
analysis_it --;
//analysis_it = pktQueue.back();
}
#if 0
// This code should not be neccessary. Taken care of by the above code that ensure that no packet appears twice
if ( zm_packet->codec_type == AVMEDIA_TYPE_VIDEO ) {
video_packet_count += 1;
if ( video_packet_count >= max_video_packet_count )
clearQueue(max_video_packet_count, video_stream_id);
}
#endif
mutex.unlock();
// We signal on every packet because someday we may analyze sound
Debug(4, "packetqueue queuepacket, unlocked signalling");
condition.notify_all();
// We have added to the queue and signalled so other processes can now work on the new packet.
// Now to clean ups mainting the queue size.
if ( video_packet_count >= max_video_packet_count )
clearQueue(max_video_packet_count, video_stream_id);
return true;
} // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet)
ZMPacket* zm_packetqueue::popPacket( ) {
Debug(4, "pktQueue size %d", pktQueue.size());
if ( pktQueue.empty() ) {
return nullptr;
}
Debug(2, "poPacket Mutex locking");
Debug(4, "poPacket Mutex locking");
mutex.lock();
ZMPacket *packet = pktQueue.front();
if ( *analysis_it == packet ) {
Debug(2, "not popping analysis_it index %d", packet->image_index);
Debug(4, "not popping analysis_it index %d", packet->image_index);
mutex.unlock();
return nullptr;
}
@ -122,7 +133,7 @@ ZMPacket* zm_packetqueue::popPacket( ) {
video_packet_count -= 1;
if ( video_packet_count ) {
// There is another video packet, so it must be the next one
Debug(2, "Incrementing first video packet index was (%d)", first_video_packet_index);
Debug(4, "Incrementing first video packet index was (%d)", first_video_packet_index);
first_video_packet_index += 1;
first_video_packet_index %= max_video_packet_count;
} else {
@ -145,7 +156,7 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
if ( pktQueue.size() <= frames_to_keep ) {
return 0;
}
Debug(1, "Locking in clearQueue");
Debug(4, "Locking in clearQueue");
mutex.lock();
int packets_to_delete = pktQueue.size();
@ -216,7 +227,7 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
}
packet_counts[packet->packet.stream_index] -= 1;
pktQueue.pop_front();
//if ( packet->image_index == -1 )
if ( packet->image_index == -1 )
delete packet;
delete_count += 1;
@ -240,7 +251,7 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
} // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id )
void zm_packetqueue::clearQueue() {
Debug(1, "Clocking in clearQueue");
Debug(4, "Clocking in clearQueue");
mutex.lock();
ZMPacket *packet = nullptr;
int delete_count = 0;
@ -248,7 +259,7 @@ void zm_packetqueue::clearQueue() {
packet = pktQueue.front();
packet_counts[packet->packet.stream_index] -= 1;
pktQueue.pop_front();
//if ( packet->image_index == -1 )
if ( packet->image_index == -1 )
delete packet;
delete_count += 1;
}
@ -265,7 +276,7 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
if ( pktQueue.empty() ) {
return 0;
}
Debug(1, "Locking in clearQueue");
Debug(4, "Locking in clearQueue");
mutex.lock();
struct timeval keep_from;
@ -330,7 +341,8 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
}
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
delete zm_packet;
if ( zm_packet->image_index == -1 )
delete zm_packet;
deleted_frames += 1;
}
zm_packet = nullptr;
@ -355,7 +367,7 @@ int zm_packetqueue::packet_count( int stream_id ) {
// Returns a packet to analyse or NULL
ZMPacket *zm_packetqueue::get_analysis_packet() {
Debug(1, "Locking in get_analysis_packet");
Debug(4, "Locking in get_analysis_packet");
std::unique_lock<std::mutex> lck(mutex);
while ( ((! pktQueue.size()) or ( analysis_it == pktQueue.end() )) and !zm_terminate and !deleting ) {
@ -369,12 +381,11 @@ ZMPacket *zm_packetqueue::get_analysis_packet() {
//Debug(2, "Distance from head: (%d)", std::distance( pktQueue.begin(), analysis_it ) );
//Debug(2, "Distance from end: (%d)", std::distance( analysis_it, pktQueue.end() ) );
ZMPacket *p = *analysis_it;
Debug(2, "get_analysis_packet image_index: %d, about to lock packet", p->image_index);
Debug(3, "get_analysis_packet image_index: %d, about to lock packet", p->image_index);
while ( !p->trylock() and !zm_terminate ) {
Debug(2,"waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) );
Debug(3,"waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) );
condition.wait(lck);
if ( deleting ) {
Debug(1, "deleting");
// packetqueue is being deleted, do not assume we have a lock on the packet
return nullptr;
}
@ -390,12 +401,12 @@ bool zm_packetqueue::increment_analysis_it( ) {
if ( analysis_it != pktQueue.end() ) {
++analysis_it;
if ( (analysis_it == pktQueue.end()) ) {
Debug(1, "Incrementing analysis it %d", (analysis_it == pktQueue.end()) );
Debug(3, "Incrementing analysis it %d", (analysis_it == pktQueue.end()) );
} else {
Debug(1, "Incrementing analysis it %d %d", (analysis_it == pktQueue.end()), (*analysis_it)->image_index);
Debug(3, "Incrementing analysis it %d %d", (analysis_it == pktQueue.end()), (*analysis_it)->image_index);
}
} else {
Debug(1, "Not Incrementing analysis it %d", (analysis_it == pktQueue.end()));
Debug(3, "Not Incrementing analysis it %d", (analysis_it == pktQueue.end()));
}
return true;