Introduce get_packet_and_increase_it just so we can lose an extra grab lock and function calls, etc. In queuePacket, if the queue is full, WAIT instead of failing to queue.

This commit is contained in:
Isaac Connor 2021-06-03 18:21:46 -04:00
parent 1b67074d35
commit fc6202d349
2 changed files with 58 additions and 6 deletions

View File

@ -131,9 +131,13 @@ bool PacketQueue::queuePacket(std::shared_ptr<ZMPacket> add_packet) {
} // end while } // end while
} }
} // end if too many video packets } // end if too many video packets
if ((max_video_packet_count > 0) and (packet_counts[video_stream_id] > max_video_packet_count)) { if (max_video_packet_count > 0) {
Error("Unable to free up older packets. Not queueing this video packet."); while (packet_counts[video_stream_id] > max_video_packet_count) {
return false; Error("Unable to free up older packets. Waiting.");
condition.wait(lck);
if (deleting or zm_terminate)
return false;
}
} }
} // end if this packet is a video packet } // end if this packet is a video packet
@ -373,7 +377,7 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
std::unique_lock<std::mutex> lck(mutex); std::unique_lock<std::mutex> lck(mutex);
Debug(4, "Have Lock in get_packet"); Debug(4, "Have Lock in get_packet");
while (!lp) { while (!lp) {
while (*it == pktQueue.end() and !(deleting or zm_terminate)) { while ((*it == pktQueue.end()) and !(deleting or zm_terminate)) {
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end())); Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck); condition.wait(lck);
} }
@ -406,6 +410,53 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
return lp; return lp;
} // end ZMLockedPacket *PacketQueue::get_packet(it) } // end ZMLockedPacket *PacketQueue::get_packet(it)
// Returns a packet. Packet will be locked
ZMLockedPacket *PacketQueue::get_packet_and_increment_it(packetqueue_iterator *it) {
if (deleting or zm_terminate)
return nullptr;
Debug(4, "Locking in get_packet using it %p queue end? %d",
std::addressof(*it), (*it == pktQueue.end()));
ZMLockedPacket *lp = nullptr;
{ // scope for lock
std::unique_lock<std::mutex> lck(mutex);
Debug(4, "Have Lock in get_packet");
while (!lp) {
while ((*it == pktQueue.end()) and !(deleting or zm_terminate)) {
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck);
}
if (deleting or zm_terminate) break;
std::shared_ptr<ZMPacket> p = *(*it);
if (!p) {
Error("Null p?!");
return nullptr;
}
Debug(3, "get_packet using it %p locking index %d",
std::addressof(*it), p->image_index);
lp = new ZMLockedPacket(p);
if (lp->trylock()) {
Debug(2, "Locked packet %d, unlocking packetqueue mutex, incrementing it", p->image_index);
++(*it);
return lp;
}
delete lp;
lp = nullptr;
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck);
} // end while !lp
} // end scope for lock
if (!lp) {
Debug(1, "terminated, leaving");
condition.notify_all();
}
return lp;
} // end ZMLockedPacket *PacketQueue::get_packet_and_increment_it(it)
void PacketQueue::unlock(ZMLockedPacket *lp) { void PacketQueue::unlock(ZMLockedPacket *lp) {
delete lp; delete lp;
condition.notify_all(); condition.notify_all();
@ -413,7 +464,7 @@ void PacketQueue::unlock(ZMLockedPacket *lp) {
bool PacketQueue::increment_it(packetqueue_iterator *it) { bool PacketQueue::increment_it(packetqueue_iterator *it) {
Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end())); Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end()));
if ((*it) == pktQueue.end() or deleting) { if (((*it) == pktQueue.end()) or deleting) {
return false; return false;
} }
std::unique_lock<std::mutex> lck(mutex); std::unique_lock<std::mutex> lck(mutex);
@ -429,7 +480,7 @@ bool PacketQueue::increment_it(packetqueue_iterator *it) {
// Increment it only considering packets for a given stream // Increment it only considering packets for a given stream
bool PacketQueue::increment_it(packetqueue_iterator *it, int stream_id) { bool PacketQueue::increment_it(packetqueue_iterator *it, int stream_id) {
Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), (*it == pktQueue.end())); Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), (*it == pktQueue.end()));
if ( *it == pktQueue.end() ) { if (*it == pktQueue.end()) {
return false; return false;
} }

View File

@ -70,6 +70,7 @@ class PacketQueue {
bool increment_it(packetqueue_iterator *it); bool increment_it(packetqueue_iterator *it);
bool increment_it(packetqueue_iterator *it, int stream_id); bool increment_it(packetqueue_iterator *it, int stream_id);
ZMLockedPacket *get_packet(packetqueue_iterator *); ZMLockedPacket *get_packet(packetqueue_iterator *);
ZMLockedPacket *get_packet_and_increment_it(packetqueue_iterator *);
packetqueue_iterator *get_video_it(bool wait); packetqueue_iterator *get_video_it(bool wait);
packetqueue_iterator *get_stream_it(int stream_id); packetqueue_iterator *get_stream_it(int stream_id);
void free_it(packetqueue_iterator *); void free_it(packetqueue_iterator *);