diff --git a/src/zm_packetqueue.cpp b/src/zm_packetqueue.cpp index 394713267..9662e4958 100644 --- a/src/zm_packetqueue.cpp +++ b/src/zm_packetqueue.cpp @@ -131,9 +131,13 @@ bool PacketQueue::queuePacket(std::shared_ptr add_packet) { } // end while } } // end if too many video packets - if ((max_video_packet_count > 0) and (packet_counts[video_stream_id] > max_video_packet_count)) { - Error("Unable to free up older packets. Not queueing this video packet."); - return false; + if (max_video_packet_count > 0) { + while (packet_counts[video_stream_id] > max_video_packet_count) { + Error("Unable to free up older packets. Waiting."); + condition.wait(lck); + if (deleting or zm_terminate) + return false; + } } } // end if this packet is a video packet @@ -385,7 +389,7 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) { std::unique_lock lck(mutex); Debug(4, "Have Lock in get_packet"); while (!lp) { - while (*it == pktQueue.end() and !(deleting or zm_terminate)) { + while ((*it == pktQueue.end()) and !(deleting or zm_terminate)) { Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end())); condition.wait(lck); } @@ -418,6 +422,53 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) { return lp; } // end ZMLockedPacket *PacketQueue::get_packet(it) +// Returns a packet. Packet will be locked +ZMLockedPacket *PacketQueue::get_packet_and_increment_it(packetqueue_iterator *it) { + if (deleting or zm_terminate) + return nullptr; + + Debug(4, "Locking in get_packet using it %p queue end? %d", + std::addressof(*it), (*it == pktQueue.end())); + + ZMLockedPacket *lp = nullptr; + { // scope for lock + std::unique_lock lck(mutex); + Debug(4, "Have Lock in get_packet"); + while (!lp) { + while ((*it == pktQueue.end()) and !(deleting or zm_terminate)) { + Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end())); + condition.wait(lck); + } + if (deleting or zm_terminate) break; + + std::shared_ptr p = *(*it); + if (!p) { + Error("Null p?!"); + return nullptr; + } + Debug(3, "get_packet using it %p locking index %d", + std::addressof(*it), p->image_index); + + lp = new ZMLockedPacket(p); + if (lp->trylock()) { + Debug(2, "Locked packet %d, unlocking packetqueue mutex, incrementing it", p->image_index); + ++(*it); + return lp; + } + delete lp; + lp = nullptr; + Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end())); + condition.wait(lck); + } // end while !lp + } // end scope for lock + + if (!lp) { + Debug(1, "terminated, leaving"); + condition.notify_all(); + } + return lp; +} // end ZMLockedPacket *PacketQueue::get_packet_and_increment_it(it) + void PacketQueue::unlock(ZMLockedPacket *lp) { delete lp; condition.notify_all(); @@ -425,7 +476,7 @@ void PacketQueue::unlock(ZMLockedPacket *lp) { bool PacketQueue::increment_it(packetqueue_iterator *it) { Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end())); - if ((*it) == pktQueue.end() or deleting) { + if (((*it) == pktQueue.end()) or deleting) { return false; } std::unique_lock lck(mutex); @@ -441,7 +492,7 @@ bool PacketQueue::increment_it(packetqueue_iterator *it) { // Increment it only considering packets for a given stream bool PacketQueue::increment_it(packetqueue_iterator *it, int stream_id) { Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), (*it == pktQueue.end())); - if ( *it == pktQueue.end() ) { + if (*it == pktQueue.end()) { return false; } diff --git a/src/zm_packetqueue.h b/src/zm_packetqueue.h index a2f57dcfe..7e2f367fc 100644 --- a/src/zm_packetqueue.h +++ b/src/zm_packetqueue.h @@ -70,6 +70,7 @@ class PacketQueue { bool increment_it(packetqueue_iterator *it); bool increment_it(packetqueue_iterator *it, int stream_id); ZMLockedPacket *get_packet(packetqueue_iterator *); + ZMLockedPacket *get_packet_and_increment_it(packetqueue_iterator *); packetqueue_iterator *get_video_it(bool wait); packetqueue_iterator *get_stream_it(int stream_id); void free_it(packetqueue_iterator *);