Introduce get_packet_and_increase_it just so we can lose an extra grab lock and function calls, etc. In queuePacket, if the queue is full, WAIT instead of failing to queue.

This commit is contained in:
Isaac Connor 2021-06-03 18:21:46 -04:00
parent 61de5eaae5
commit a903ab5d09
2 changed files with 58 additions and 6 deletions

View File

@ -131,9 +131,13 @@ bool PacketQueue::queuePacket(std::shared_ptr<ZMPacket> add_packet) {
} // end while
}
} // end if too many video packets
if ((max_video_packet_count > 0) and (packet_counts[video_stream_id] > max_video_packet_count)) {
Error("Unable to free up older packets. Not queueing this video packet.");
return false;
if (max_video_packet_count > 0) {
while (packet_counts[video_stream_id] > max_video_packet_count) {
Error("Unable to free up older packets. Waiting.");
condition.wait(lck);
if (deleting or zm_terminate)
return false;
}
}
} // end if this packet is a video packet
@ -385,7 +389,7 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
std::unique_lock<std::mutex> lck(mutex);
Debug(4, "Have Lock in get_packet");
while (!lp) {
while (*it == pktQueue.end() and !(deleting or zm_terminate)) {
while ((*it == pktQueue.end()) and !(deleting or zm_terminate)) {
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck);
}
@ -418,6 +422,53 @@ ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
return lp;
} // end ZMLockedPacket *PacketQueue::get_packet(it)
// Returns a packet. Packet will be locked
ZMLockedPacket *PacketQueue::get_packet_and_increment_it(packetqueue_iterator *it) {
if (deleting or zm_terminate)
return nullptr;
Debug(4, "Locking in get_packet using it %p queue end? %d",
std::addressof(*it), (*it == pktQueue.end()));
ZMLockedPacket *lp = nullptr;
{ // scope for lock
std::unique_lock<std::mutex> lck(mutex);
Debug(4, "Have Lock in get_packet");
while (!lp) {
while ((*it == pktQueue.end()) and !(deleting or zm_terminate)) {
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck);
}
if (deleting or zm_terminate) break;
std::shared_ptr<ZMPacket> p = *(*it);
if (!p) {
Error("Null p?!");
return nullptr;
}
Debug(3, "get_packet using it %p locking index %d",
std::addressof(*it), p->image_index);
lp = new ZMLockedPacket(p);
if (lp->trylock()) {
Debug(2, "Locked packet %d, unlocking packetqueue mutex, incrementing it", p->image_index);
++(*it);
return lp;
}
delete lp;
lp = nullptr;
Debug(2, "waiting. Queue size %zu it == end? %d", pktQueue.size(), (*it == pktQueue.end()));
condition.wait(lck);
} // end while !lp
} // end scope for lock
if (!lp) {
Debug(1, "terminated, leaving");
condition.notify_all();
}
return lp;
} // end ZMLockedPacket *PacketQueue::get_packet_and_increment_it(it)
void PacketQueue::unlock(ZMLockedPacket *lp) {
delete lp;
condition.notify_all();
@ -425,7 +476,7 @@ void PacketQueue::unlock(ZMLockedPacket *lp) {
bool PacketQueue::increment_it(packetqueue_iterator *it) {
Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end()));
if ((*it) == pktQueue.end() or deleting) {
if (((*it) == pktQueue.end()) or deleting) {
return false;
}
std::unique_lock<std::mutex> lck(mutex);
@ -441,7 +492,7 @@ bool PacketQueue::increment_it(packetqueue_iterator *it) {
// Increment it only considering packets for a given stream
bool PacketQueue::increment_it(packetqueue_iterator *it, int stream_id) {
Debug(2, "Incrementing %p, queue size %zu, end? %d", it, pktQueue.size(), (*it == pktQueue.end()));
if ( *it == pktQueue.end() ) {
if (*it == pktQueue.end()) {
return false;
}

View File

@ -70,6 +70,7 @@ class PacketQueue {
bool increment_it(packetqueue_iterator *it);
bool increment_it(packetqueue_iterator *it, int stream_id);
ZMLockedPacket *get_packet(packetqueue_iterator *);
ZMLockedPacket *get_packet_and_increment_it(packetqueue_iterator *);
packetqueue_iterator *get_video_it(bool wait);
packetqueue_iterator *get_stream_it(int stream_id);
void free_it(packetqueue_iterator *);