use mutex and condition_variable to manage signalling. Lots of debugging and fixes

This commit is contained in:
Isaac Connor 2020-12-09 15:01:24 -05:00
parent 02dd1d4cc5
commit f54b8fff44
2 changed files with 81 additions and 34 deletions

View File

@ -24,6 +24,7 @@
#include "zm_time.h" #include "zm_time.h"
zm_packetqueue::zm_packetqueue( int video_image_count, int p_video_stream_id, int p_audio_stream_id ) { zm_packetqueue::zm_packetqueue( int video_image_count, int p_video_stream_id, int p_audio_stream_id ) {
deleting = false;
video_stream_id = p_video_stream_id; video_stream_id = p_video_stream_id;
max_video_packet_count = video_image_count-1; max_video_packet_count = video_image_count-1;
video_packet_count = 0; video_packet_count = 0;
@ -35,15 +36,26 @@ zm_packetqueue::zm_packetqueue( int video_image_count, int p_video_stream_id, in
packet_counts = new int[max_stream_id+1]; packet_counts = new int[max_stream_id+1];
for ( int i=0; i <= max_stream_id; ++i ) for ( int i=0; i <= max_stream_id; ++i )
packet_counts[i] = 0; packet_counts[i] = 0;
condition = new Condition(mutex);
} }
zm_packetqueue::~zm_packetqueue() { zm_packetqueue::~zm_packetqueue() {
clearQueue(); deleting = true;
Debug(1, "In destructor");
/* zma might be waiting. Must have exclusive access */
while ( ! mutex.try_lock() ) {
Debug(1, "Waiting for exclusive access");
condition.notify_all();
}
while ( !pktQueue.empty() ) {
ZMPacket * packet = pktQueue.front();
pktQueue.pop_front();
delete packet;
}
delete[] packet_counts; delete[] packet_counts;
packet_counts = nullptr; packet_counts = nullptr;
delete condition; mutex.unlock();
condition = nullptr;
} }
/* Enqueues the given packet. Will maintain the analysis_it pointer and image packet counts. /* Enqueues the given packet. Will maintain the analysis_it pointer and image packet counts.
@ -54,6 +66,7 @@ zm_packetqueue::~zm_packetqueue() {
bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) { bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) {
Debug(4, "packetqueue queuepacket, first_video_packet_index is %d", first_video_packet_index); Debug(4, "packetqueue queuepacket, first_video_packet_index is %d", first_video_packet_index);
mutex.lock(); mutex.lock();
Debug(4, "packetqueue queuepacket, have lock first_video_packet_index is %d", first_video_packet_index);
if ( zm_packet->packet.stream_index == video_stream_id ) { if ( zm_packet->packet.stream_index == video_stream_id ) {
video_packet_count += 1; video_packet_count += 1;
@ -75,12 +88,9 @@ bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) {
} }
#endif #endif
// We signal on every packet because someday we may analyze sound
Debug(2,"Signalling");
condition->signal();
Debug(2," after Signalling");
mutex.unlock(); mutex.unlock();
Debug(2," after unlock"); // We signal on every packet because someday we may analyze sound
condition.notify_all();
return true; return true;
} // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet) } // end bool zm_packetqueue::queuePacket(ZMPacket* zm_packet)
@ -91,11 +101,14 @@ ZMPacket* zm_packetqueue::popPacket( ) {
} }
Debug(2, "poPacket Mutex locking"); Debug(2, "poPacket Mutex locking");
mutex.lock(); mutex.lock();
Debug(2, "Have Mutex lock");
ZMPacket *packet = pktQueue.front(); ZMPacket *packet = pktQueue.front();
if ( *analysis_it == packet ) if ( *analysis_it == packet ) {
++analysis_it; Debug(2, "not popping analysis_it index %d", packet->image_index);
mutex.unlock();
return nullptr;
}
packet->lock();
pktQueue.pop_front(); pktQueue.pop_front();
if ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) { if ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) {
@ -112,12 +125,10 @@ ZMPacket* zm_packetqueue::popPacket( ) {
packet_counts[packet->packet.stream_index] -= 1; packet_counts[packet->packet.stream_index] -= 1;
mutex.unlock(); mutex.unlock();
// Should we lock the packet?
return packet; return packet;
} // popPacket } // popPacket
unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_id) { unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_id) {
Debug(3, "Clearing all but %d frames, queue has %d", frames_to_keep, pktQueue.size()); Debug(3, "Clearing all but %d frames, queue has %d", frames_to_keep, pktQueue.size());
if ( pktQueue.empty() ) { if ( pktQueue.empty() ) {
@ -127,6 +138,7 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
if ( pktQueue.size() <= frames_to_keep ) { if ( pktQueue.size() <= frames_to_keep ) {
return 0; return 0;
} }
Debug(1, "Locking in clearQueue");
mutex.lock(); mutex.lock();
int packets_to_delete = pktQueue.size(); int packets_to_delete = pktQueue.size();
@ -180,8 +192,10 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
delete_count, pktQueue.size()); delete_count, pktQueue.size());
packet = pktQueue.front(); packet = pktQueue.front();
if ( *analysis_it == packet ) if ( *analysis_it == packet ) {
Debug(4, "Bumping analysis it because it is at the front that we are deleting");
++analysis_it; ++analysis_it;
}
if ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) { if ( packet->codec_type == AVMEDIA_TYPE_VIDEO ) {
video_packet_count -= 1; video_packet_count -= 1;
if ( video_packet_count ) { if ( video_packet_count ) {
@ -219,6 +233,7 @@ unsigned int zm_packetqueue::clearQueue(unsigned int frames_to_keep, int stream_
} // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id ) } // end unsigned int zm_packetqueue::clearQueue( unsigned int frames_to_keep, int stream_id )
void zm_packetqueue::clearQueue() { void zm_packetqueue::clearQueue() {
Debug(1, "Clocking in clearQueue");
mutex.lock(); mutex.lock();
ZMPacket *packet = nullptr; ZMPacket *packet = nullptr;
int delete_count = 0; int delete_count = 0;
@ -243,9 +258,11 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
if ( pktQueue.empty() ) { if ( pktQueue.empty() ) {
return 0; return 0;
} }
Debug(1, "Locking in clearQueue");
mutex.lock();
struct timeval keep_from; struct timeval keep_from;
std::list<ZMPacket *>::reverse_iterator it; std::list<ZMPacket *>::reverse_iterator it = pktQueue.rbegin();
it = pktQueue.rbegin();
struct timeval *t = (*it)->timestamp; struct timeval *t = (*it)->timestamp;
timersub(t, duration, &keep_from); timersub(t, duration, &keep_from);
@ -256,8 +273,11 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
for ( ; it != pktQueue.rend(); ++it) { for ( ; it != pktQueue.rend(); ++it) {
ZMPacket *zm_packet = *it; ZMPacket *zm_packet = *it;
AVPacket *av_packet = &(zm_packet->packet); AVPacket *av_packet = &(zm_packet->packet);
if (av_packet->stream_index == streamId if (
&& timercmp( zm_packet->timestamp, &keep_from, <= )) { (av_packet->stream_index == streamId)
and
timercmp(zm_packet->timestamp, &keep_from, <=)
) {
Debug(3, "Found frame before keep time with stream index %d at %d.%d", Debug(3, "Found frame before keep time with stream index %d at %d.%d",
av_packet->stream_index, av_packet->stream_index,
zm_packet->timestamp->tv_sec, zm_packet->timestamp->tv_sec,
@ -268,6 +288,7 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
if ( it == pktQueue.rend() ) { if ( it == pktQueue.rend() ) {
Debug(1, "Didn't find a frame before queue preserve time. keeping all"); Debug(1, "Didn't find a frame before queue preserve time. keeping all");
mutex.unlock();
return 0; return 0;
} }
@ -275,8 +296,11 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
for ( ; it != pktQueue.rend(); ++it) { for ( ; it != pktQueue.rend(); ++it) {
ZMPacket *zm_packet = *it; ZMPacket *zm_packet = *it;
AVPacket *av_packet = &(zm_packet->packet); AVPacket *av_packet = &(zm_packet->packet);
if (av_packet->flags & AV_PKT_FLAG_KEY if (
&& av_packet->stream_index == streamId) { (av_packet->flags & AV_PKT_FLAG_KEY)
and
(av_packet->stream_index == streamId)
) {
Debug(3, "Found keyframe before start with stream index %d at %d.%d", Debug(3, "Found keyframe before start with stream index %d at %d.%d",
av_packet->stream_index, av_packet->stream_index,
zm_packet->timestamp->tv_sec, zm_packet->timestamp->tv_sec,
@ -286,6 +310,7 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
} }
if ( it == pktQueue.rend() ) { if ( it == pktQueue.rend() ) {
Debug(1, "Didn't find a keyframe before event starttime. keeping all" ); Debug(1, "Didn't find a keyframe before event starttime. keeping all" );
mutex.unlock();
return 0; return 0;
} }
@ -293,6 +318,9 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
ZMPacket *zm_packet = nullptr; ZMPacket *zm_packet = nullptr;
while ( distance(it, pktQueue.rend()) > 1 ) { while ( distance(it, pktQueue.rend()) > 1 ) {
zm_packet = pktQueue.front(); zm_packet = pktQueue.front();
if ( *analysis_it == zm_packet ) {
++analysis_it;
}
pktQueue.pop_front(); pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1; packet_counts[zm_packet->packet.stream_index] -= 1;
delete zm_packet; delete zm_packet;
@ -300,6 +328,7 @@ unsigned int zm_packetqueue::clearQueue(struct timeval *duration, int streamId)
} }
zm_packet = nullptr; zm_packet = nullptr;
Debug(3, "Deleted %d frames", deleted_frames); Debug(3, "Deleted %d frames", deleted_frames);
mutex.unlock();
return deleted_frames; return deleted_frames;
} }
@ -319,20 +348,27 @@ int zm_packetqueue::packet_count( int stream_id ) {
// Returns a packet to analyse or NULL // Returns a packet to analyse or NULL
ZMPacket *zm_packetqueue::get_analysis_packet() { ZMPacket *zm_packetqueue::get_analysis_packet() {
mutex.lock(); Debug(1, "Locking in get_analysis_packet");
std::unique_lock<std::mutex> lck(mutex);
while ( ((! pktQueue.size()) || ( analysis_it == pktQueue.end() )) && !zm_terminate ) { while ( ((! pktQueue.size()) || ( analysis_it == pktQueue.end() )) && !zm_terminate ) {
Debug(2, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) ); Debug(2, "waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) );
condition->wait(); condition.wait(lck);
} }
//Debug(2, "Distance from head: (%d)", std::distance( pktQueue.begin(), analysis_it ) ); //Debug(2, "Distance from head: (%d)", std::distance( pktQueue.begin(), analysis_it ) );
//Debug(2, "Distance from end: (%d)", std::distance( analysis_it, pktQueue.end() ) ); //Debug(2, "Distance from end: (%d)", std::distance( analysis_it, pktQueue.end() ) );
ZMPacket *p = *analysis_it; ZMPacket *p = *analysis_it;
Debug(2, "get_analysis_packet image_index: %d, about to lock packet", p->image_index); Debug(2, "get_analysis_packet image_index: %d, about to lock packet", p->image_index);
p->lock(); while ( !p->trylock() and !zm_terminate ) {
Debug(2,"waiting. Queue size %d analysis_it == end? %d", pktQueue.size(), ( analysis_it == pktQueue.end() ) );
condition.wait(lck);
if ( deleting ) {
// packetqueue is being deleted, do not assume we have a lock on the packet
return nullptr;
}
}
Debug(2, "Locked packet, unlocking packetqueue mutex"); Debug(2, "Locked packet, unlocking packetqueue mutex");
mutex.unlock();
return p; return p;
} // end ZMPacket *zm_packetqueue::get_analysis_packet() } // end ZMPacket *zm_packetqueue::get_analysis_packet()
@ -340,6 +376,14 @@ ZMPacket *zm_packetqueue::get_analysis_packet() {
// probvlem here is that we don't want to analyse a packet twice. Maybe we can flag the packet analysed // probvlem here is that we don't want to analyse a packet twice. Maybe we can flag the packet analysed
bool zm_packetqueue::increment_analysis_it( ) { bool zm_packetqueue::increment_analysis_it( ) {
// We do this instead of distance becuase distance will traverse the entire list in the worst case // We do this instead of distance becuase distance will traverse the entire list in the worst case
if ( analysis_it != pktQueue.end() ) {
++analysis_it;
Debug(1, "Incrementing analysis it %d %d", (analysis_it == pktQueue.end()), (*analysis_it)->image_index);
} else {
Debug(1, "Not Incrementing analysis it %d", (analysis_it == pktQueue.end()));
}
return true;
std::list<ZMPacket *>::iterator next_it = analysis_it; std::list<ZMPacket *>::iterator next_it = analysis_it;
++ next_it; ++ next_it;
if ( next_it == pktQueue.end() ) { if ( next_it == pktQueue.end() ) {

View File

@ -26,6 +26,8 @@
#include <list> #include <list>
#include "zm_packet.h" #include "zm_packet.h"
#include "zm_thread.h" #include "zm_thread.h"
#include <mutex>
#include <condition_variable>
extern "C" { extern "C" {
#include <libavformat/avformat.h> #include <libavformat/avformat.h>
@ -41,9 +43,10 @@ class zm_packetqueue {
int max_video_packet_count; // allow a negative value to someday mean unlimited int max_video_packet_count; // allow a negative value to someday mean unlimited
int max_stream_id; int max_stream_id;
int *packet_counts; /* packet count for each stream_id, to keep track of how many video vs audio packets are in the queue */ int *packet_counts; /* packet count for each stream_id, to keep track of how many video vs audio packets are in the queue */
bool deleting;
RecursiveMutex mutex; std::mutex mutex;
Condition *condition; std::condition_variable condition;
public: public:
zm_packetqueue(int p_max_video_packet_count, int p_video_stream_id, int p_audio_stream_id); zm_packetqueue(int p_max_video_packet_count, int p_video_stream_id, int p_audio_stream_id);