Add early return if pktQueue is empty so we can assume that it isn't below. Add notifications if we wake up and find that we have terminated so as to wake up any other waiters. Fixes failure to terminate when deinterlacing because both decoder and analysis are waiting

This commit is contained in:
Isaac Connor 2021-05-07 14:04:51 -04:00
parent 9c6d3989d3
commit 4685c63fab
1 changed files with 20 additions and 5 deletions

View File

@ -193,10 +193,11 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
return; return;
} }
std::unique_lock<std::mutex> lck(mutex); std::unique_lock<std::mutex> lck(mutex);
if (!pktQueue.size()) return;
// If analysis_it isn't at the end, we need to keep that many additional packets // If analysis_it isn't at the end, we need to keep that many additional packets
int tail_count = 0; int tail_count = 0;
if (pktQueue.size() and (pktQueue.back() != add_packet)) { if (pktQueue.back() != add_packet) {
packetqueue_iterator it = pktQueue.end(); packetqueue_iterator it = pktQueue.end();
--it; --it;
while (*it != add_packet) { while (*it != add_packet) {
@ -205,7 +206,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
--it; --it;
} }
} }
Debug(1, "Tail count is %d", tail_count); Debug(1, "Tail count is %d, queue size is %lu", tail_count, pktQueue.size());
if (!keep_keyframes) { if (!keep_keyframes) {
// If not doing passthrough, we don't care about starting with a keyframe so logic is simpler // If not doing passthrough, we don't care about starting with a keyframe so logic is simpler
@ -241,11 +242,17 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
// First packet is special because we know it is a video keyframe and only need to check for lock // First packet is special because we know it is a video keyframe and only need to check for lock
ZMPacket *zm_packet = *it; ZMPacket *zm_packet = *it;
Debug(1, "trying lock on first packet");
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet); ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if (lp->trylock()) { if (lp->trylock()) {
Debug(1, "Have lock on first packet");
++it; ++it;
delete lp; delete lp;
if (it == pktQueue.end()) {
Debug(1, "Hit end already");
it = pktQueue.begin();
} else {
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that // Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
while (*it != add_packet) { while (*it != add_packet) {
zm_packet = *it; zm_packet = *it;
@ -275,6 +282,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
} }
it++; it++;
} // end while } // end while
}
} // end if first packet not locked } // end if first packet not locked
Debug(1, "Resulting pointing at latest packet? %d, next front points to begin? %d", Debug(1, "Resulting pointing at latest packet? %d, next front points to begin? %d",
( *it == add_packet ), ( *it == add_packet ),
@ -419,7 +427,7 @@ unsigned int PacketQueue::clear(unsigned int frames_to_keep, int stream_id) {
void PacketQueue::clear() { void PacketQueue::clear() {
deleting = true; deleting = true;
condition.notify_all(); condition.notify_all();
Debug(1, "Clearing packetqueue");
std::unique_lock<std::mutex> lck(mutex); std::unique_lock<std::mutex> lck(mutex);
while (!pktQueue.empty()) { while (!pktQueue.empty()) {
@ -560,13 +568,19 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
ZMLockedPacket *lp = nullptr; ZMLockedPacket *lp = nullptr;
while (!lp) { while (!lp) {
while (*it == pktQueue.end()) { while (*it == pktQueue.end()) {
if (deleting or zm_terminate) if (deleting or zm_terminate) {
Debug(1, "terminated, leaving");
condition.notify_all();
return nullptr; return nullptr;
}
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end())); Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck); condition.wait(lck);
} }
if (deleting or zm_terminate) if (deleting or zm_terminate) {
Debug(1, "terminated, leaving");
condition.notify_all();
return nullptr; return nullptr;
}
ZMPacket *p = *(*it); ZMPacket *p = *(*it);
if (!p) { if (!p) {
@ -586,6 +600,7 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
} }
delete lp; delete lp;
lp = nullptr; lp = nullptr;
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck); condition.wait(lck);
} // end while !lp } // end while !lp
return nullptr; return nullptr;