Merge new logic from master. We now delete a non-keyframe from head instead of waiting in capture.

This commit is contained in:
Isaac Connor 2021-10-25 17:03:36 -04:00
parent 22f398dd6f
commit 4b5bc09c41
1 changed files with 68 additions and 67 deletions

View File

@ -24,7 +24,6 @@
#include "zm_ffmpeg.h" #include "zm_ffmpeg.h"
#include "zm_packet.h" #include "zm_packet.h"
#include "zm_signal.h" #include "zm_signal.h"
#include <sys/time.h>
PacketQueue::PacketQueue(): PacketQueue::PacketQueue():
video_stream_id(-1), video_stream_id(-1),
@ -87,61 +86,6 @@ bool PacketQueue::queuePacket(std::shared_ptr<ZMPacket> add_packet) {
{ {
std::unique_lock<std::mutex> lck(mutex); std::unique_lock<std::mutex> lck(mutex);
if (add_packet->packet.stream_index == video_stream_id) {
if ((max_video_packet_count > 0) and (packet_counts[video_stream_id] > max_video_packet_count)) {
Warning("You have set the max video packets in the queue to %u."
" The queue is full. Either Analysis is not keeping up or"
" your camera's keyframe interval is larger than this setting."
" We are dropping packets.", max_video_packet_count);
if (add_packet->keyframe) {
// Have a new keyframe, so delete everything
while ((*pktQueue.begin() != add_packet) and (packet_counts[video_stream_id] > max_video_packet_count)) {
std::shared_ptr <ZMPacket>zm_packet = *pktQueue.begin();
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if (!lp->trylock()) {
Debug(1, "Found locked packet when trying to free up video packets. Can't continue");
delete lp;
break;
}
delete lp;
for (
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
Debug(1, "Bumping IT because it is at the front that we are deleting");
++(*iterators_it);
}
} // end foreach iterator
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
Debug(1,
"Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%zu",
zm_packet->packet.stream_index,
zm_packet->image_index,
zm_packet->keyframe,
packet_counts[video_stream_id],
max_video_packet_count,
pktQueue.size());
} // end while
}
} // end if too many video packets
if (max_video_packet_count > 0) {
while (packet_counts[video_stream_id] > max_video_packet_count) {
Error("Unable to free up older packets. Waiting.");
condition.notify_all();
condition.wait(lck);
if (deleting or zm_terminate)
return false;
}
}
} // end if this packet is a video packet
pktQueue.push_back(add_packet); pktQueue.push_back(add_packet);
packet_counts[add_packet->packet.stream_index] += 1; packet_counts[add_packet->packet.stream_index] += 1;
Debug(2, "packet counts for %d is %d", Debug(2, "packet counts for %d is %d",
@ -149,25 +93,79 @@ bool PacketQueue::queuePacket(std::shared_ptr<ZMPacket> add_packet) {
packet_counts[add_packet->packet.stream_index]); packet_counts[add_packet->packet.stream_index]);
for ( for (
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin(); auto iterators_it = iterators.begin();
iterators_it != iterators.end(); iterators_it != iterators.end();
++iterators_it ++iterators_it
) { ) {
packetqueue_iterator *iterator_it = *iterators_it; packetqueue_iterator *iterator_it = *iterators_it;
if (*iterator_it == pktQueue.end()) { if (*iterator_it == pktQueue.end()) {
Debug(4, "pointing it %p to back", iterator_it);
--(*iterator_it); --(*iterator_it);
} else {
Debug(4, "it %p not at end", iterator_it);
} }
} // end foreach iterator } // end foreach iterator
if (
(add_packet->packet.stream_index == video_stream_id)
and
(max_video_packet_count > 0)
and
(packet_counts[video_stream_id] > max_video_packet_count)
) {
Warning("You have set the max video packets in the queue to %u."
" The queue is full. Either Analysis is not keeping up or"
" your camera's keyframe interval is larger than this setting."
, max_video_packet_count);
for (
auto it = ++pktQueue.begin();
it != pktQueue.end() and *it != add_packet;
) {
std::shared_ptr <ZMPacket>zm_packet = *it;
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if (!lp->trylock()) {
Debug(1, "Found locked packet when trying to free up video packets. Skipping to next one");
delete lp;
++it;
continue;
}
for (
auto iterators_it = iterators.begin();
iterators_it != iterators.end();
++iterators_it
) {
auto iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ((*iterator_it!=pktQueue.end()) and (*(*iterator_it) == zm_packet)) {
Debug(1, "Bumping IT because it is at the front that we are deleting");
++(*iterator_it);
}
} // end foreach iterator
it = pktQueue.erase(it);
packet_counts[zm_packet->packet.stream_index] -= 1;
Debug(1,
"Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%zu",
zm_packet->packet.stream_index,
zm_packet->image_index,
zm_packet->keyframe,
packet_counts[video_stream_id],
max_video_packet_count,
pktQueue.size());
delete lp;
if (zm_packet->packet.stream_index == video_stream_id)
break;
} // end while
} // end if not able catch up
} // end lock scope } // end lock scope
// We signal on every packet because someday we may analyze sound // We signal on every packet because someday we may analyze sound
Debug(4, "packetqueue queuepacket, unlocked signalling"); Debug(4, "packetqueue queuepacket, unlocked signalling");
condition.notify_all(); condition.notify_all();
return true; return true;
} // end bool PacketQueue::queuePacket(ZMPacket* zm_packet) } // end bool PacketQueue::queuePacket(ZMPacket* zm_packet)
void PacketQueue::clearPackets(const std::shared_ptr<ZMPacket> &add_packet) { void PacketQueue::clearPackets(const std::shared_ptr<ZMPacket> &add_packet) {
// Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one. // Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one.
@ -241,8 +239,8 @@ void PacketQueue::clearPackets(const std::shared_ptr<ZMPacket> &add_packet) {
return; return;
} }
packetqueue_iterator it = pktQueue.begin(); auto it = pktQueue.begin();
packetqueue_iterator next_front = pktQueue.begin(); auto next_front = pktQueue.begin();
// First packet is special because we know it is a video keyframe and only need to check for lock // First packet is special because we know it is a video keyframe and only need to check for lock
std::shared_ptr<ZMPacket> zm_packet = *it; std::shared_ptr<ZMPacket> zm_packet = *it;
@ -250,11 +248,10 @@ void PacketQueue::clearPackets(const std::shared_ptr<ZMPacket> &add_packet) {
return; return;
} }
Debug(1, "trying lock on first packet");
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet); ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if (lp->trylock()) { if (lp->trylock()) {
int video_packets_to_delete = 0; // This is a count of how many packets we will delete so we know when to stop looking int video_packets_to_delete = 0; // This is a count of how many packets we will delete so we know when to stop looking
Debug(1, "Have lock on first packet"); Debug(4, "Have lock on first packet");
++it; ++it;
delete lp; delete lp;
@ -269,10 +266,14 @@ void PacketQueue::clearPackets(const std::shared_ptr<ZMPacket> &add_packet) {
} }
delete lp; delete lp;
if (is_there_an_iterator_pointing_to_packet(zm_packet) and (pktQueue.begin() == next_front)) { #if 0
Warning("Found iterator at beginning of queue. Some thread isn't keeping up"); // There are no threads that follow analysis thread. So there cannot be an it pointing here
if (is_there_an_iterator_pointing_to_packet(zm_packet)) {
if (pktQueue.begin() == next_front)
Warning("Found iterator at beginning of queue. Some thread isn't keeping up");
break; break;
} }
#endif
if (zm_packet->packet.stream_index == video_stream_id) { if (zm_packet->packet.stream_index == video_stream_id) {
if (zm_packet->keyframe) { if (zm_packet->keyframe) {