improve the logic of clearing packets from queue. make get_event_start_it return a pointer to an it tracked by the packetqueue.

This commit is contained in:
Isaac Connor 2021-02-02 14:22:38 -05:00
parent cb0008fb8c
commit 8de260472d
2 changed files with 74 additions and 102 deletions

View File

@ -38,6 +38,7 @@ PacketQueue::PacketQueue():
* Assumes first stream added will be the video stream * Assumes first stream added will be the video stream
*/ */
void PacketQueue::addStreamId(int p_stream_id) { void PacketQueue::addStreamId(int p_stream_id) {
deleting = false;
if ( video_stream_id == -1 ) if ( video_stream_id == -1 )
video_stream_id = p_stream_id; video_stream_id = p_stream_id;
if ( max_stream_id < p_stream_id ) { if ( max_stream_id < p_stream_id ) {
@ -108,72 +109,46 @@ bool PacketQueue::queuePacket(ZMPacket* add_packet) {
*(pktQueue.begin()) != add_packet *(pktQueue.begin()) != add_packet
) { ) {
packetqueue_iterator it = pktQueue.begin(); packetqueue_iterator it = pktQueue.begin();
int video_stream_packets = 0; packetqueue_iterator next_front = pktQueue.begin();
// First packet is special because we know it is a video keyframe and only need to check for lock
ZMPacket *zm_packet = *it;
if ( zm_packet->trylock() ) {
++it;
zm_packet->unlock();
// Checkk for locks on first packet
ZMPacket *zm_packet = *(pktQueue.begin());
if ( !zm_packet->trylock() ) {
Debug(1, "Have locked packet %d", zm_packet->image_index);
video_stream_packets = max_video_packet_count;
}
zm_packet->unlock();
if ( ! video_stream_packets ) {
it++;
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that // Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
while ( *it != add_packet and it != pktQueue.end() ) { while ( *it != add_packet ) {
zm_packet = *it; zm_packet = *it;
Debug(1, "Checking packet to see if we can delete them"); Debug(1, "Checking packet to see if we can delete them");
if ( zm_packet->packet.stream_index == video_stream_id ) {
if ( zm_packet->keyframe ) {
Debug(1, "Have a video keyframe so breaking out");
if ( !zm_packet->trylock() ) {
Debug(1, "Have locked packet %d", zm_packet->image_index);
video_stream_packets = max_video_packet_count;
}
zm_packet->unlock();
break;
}
video_stream_packets ++;
}
if ( !zm_packet->trylock() ) { if ( !zm_packet->trylock() ) {
Debug(1, "Have locked packet %d", zm_packet->image_index); Debug(1, "Have locked packet %d", zm_packet->image_index);
video_stream_packets = max_video_packet_count;
break; break;
} }
zm_packet->unlock(); zm_packet->unlock();
for ( if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) {
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin(); Debug(4, "Found IT at beginning of queue. Threads not keeping up");
iterators_it != iterators.end(); break;
++iterators_it }
) {
packetqueue_iterator *iterator_it = *iterators_it; if ( zm_packet->packet.stream_index == video_stream_id ) {
if ( *iterator_it == pktQueue.end() ) { if ( zm_packet->keyframe ) {
continue; Debug(1, "Have a video keyframe so breaking out");
next_front = it;
} }
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete }
if ( *(*iterator_it) == zm_packet ) {
Debug(4, "Found IT at beginning of queue. Threads not keeping up");
video_stream_packets = max_video_packet_count;
}
} // end foreach iterator
it++; it++;
} // end while } // end while
} // end if first packet not locked } // end if first packet not locked
Debug(1, "Resulting video_stream_packets count %d, %d > max:%d, pointing at latest packet? %d", Debug(1, "Resulting pointing at latest packet? %d, have next front? %d",
video_stream_packets, ( *it == add_packet ),
packet_counts[video_stream_id] - video_stream_packets, max_video_packet_count, ( next_front == pktQueue.begin() )
( *it == add_packet )
); );
if ( if ( next_front != pktQueue.begin() ) {
packet_counts[video_stream_id] - video_stream_packets > max_video_packet_count
and
( *it != add_packet )
) {
Debug(1, "Deleting packets"); Debug(1, "Deleting packets");
// It is enough to delete the packets tested above. A subsequent queuePacket can clear a second set // It is enough to delete the packets tested above. A subsequent queuePacket can clear a second set
while ( pktQueue.begin() != it ) { while ( pktQueue.begin() != next_front ) {
ZMPacket *zm_packet = *pktQueue.begin(); ZMPacket *zm_packet = *pktQueue.begin();
if ( !zm_packet ) { if ( !zm_packet ) {
Error("NULL zm_packet in queue"); Error("NULL zm_packet in queue");
@ -507,92 +482,64 @@ bool PacketQueue::increment_it(packetqueue_iterator *it, int stream_id) {
return false; return false;
} // end bool PacketQueue::increment_it(packetqueue_iterator *it) } // end bool PacketQueue::increment_it(packetqueue_iterator *it)
std::list<ZMPacket *>::iterator PacketQueue::get_event_start_packet_it( packetqueue_iterator *PacketQueue::get_event_start_packet_it(
std::list<ZMPacket *>::iterator snapshot_it, packetqueue_iterator snapshot_it,
unsigned int pre_event_count unsigned int pre_event_count
) { ) {
std::list<ZMPacket *>::iterator it = snapshot_it; packetqueue_iterator *it = new packetqueue_iterator;
dumpPacket(&((*it)->packet)); iterators.push_back(it);
*it = snapshot_it;
dumpPacket(&((*(*it))->packet));
// Step one count back pre_event_count frames as the minimum // Step one count back pre_event_count frames as the minimum
// Do not assume that snapshot_it is video // Do not assume that snapshot_it is video
// snapshot it might already point to the beginning // snapshot it might already point to the beginning
while ( ( it != pktQueue.begin() ) and pre_event_count ) { while ( ( *it != pktQueue.begin() ) and pre_event_count ) {
Debug(1, "Previous packet pre_event_count %d stream_index %d keyframe %d", pre_event_count, (*it)->packet.stream_index, (*it)->keyframe); Debug(1, "Previous packet pre_event_count %d stream_index %d keyframe %d",
dumpPacket(&((*it)->packet)); pre_event_count, (*(*it))->packet.stream_index, (*(*it))->keyframe);
if ( (*it)->packet.stream_index == video_stream_id ) { dumpPacket(&((*(*it))->packet));
if ( (*(*it))->packet.stream_index == video_stream_id ) {
pre_event_count --; pre_event_count --;
if ( ! pre_event_count ) if ( ! pre_event_count )
break; break;
} }
it--; (*it)--;
} }
// it either points to beginning or we have seen pre_event_count video packets. // it either points to beginning or we have seen pre_event_count video packets.
if ( it == pktQueue.begin() ) { if ( (*it) == pktQueue.begin() ) {
Debug(1, "Hit begin"); Debug(1, "Hit begin");
// hit end, the first packet in the queue should ALWAYS be a video keyframe. // hit end, the first packet in the queue should ALWAYS be a video keyframe.
// So we should be able to return it. // So we should be able to return it.
if ( pre_event_count ) { if ( pre_event_count ) {
if ( (*it)->image_index < (int)pre_event_count ) { if ( (*(*it))->image_index < (int)pre_event_count ) {
// probably just starting up // probably just starting up
Debug(1, "Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count); Debug(1, "Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count);
} else { } else {
Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count); Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count);
} }
dumpPacket(&((*it)->packet)); dumpPacket(&((*(*it))->packet));
} }
return it; return it;
} }
// Not at beginning, so must be pointing at a video keyframe or maybe pre_event_count == 0 // Not at beginning, so must be pointing at a video keyframe or maybe pre_event_count == 0
if ( (*it)->keyframe ) { if ( (*(*it))->keyframe ) {
dumpPacket(&((*it)->packet), "Found video keyframe, Returning"); dumpPacket(&((*(*it))->packet), "Found video keyframe, Returning");
return it; return it;
} }
while ( it-- != pktQueue.begin() ) { while ( (*it)-- != pktQueue.begin() ) {
dumpPacket(&((*it)->packet), "No keyframe"); dumpPacket(&((*(*it))->packet), "No keyframe");
if ( (*it)->packet.stream_index == video_stream_id and (*it)->keyframe ) if ( (*(*it))->packet.stream_index == video_stream_id and (*(*it))->keyframe )
return it; // Success return it; // Success
} }
if ( !(*it)->keyframe ) { if ( !(*(*it))->keyframe ) {
Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count); Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count);
} }
return it; return it;
} // end packetqueue_iterator *PacketQueue::get_event_start_packet_it
#if 0
std::list<ZMPacket *>::iterator it = snapshot_it.base();
// Step one count back pre_event_count frames as the minimum
// Do not assume that snapshot_it is video
while ( ( it++ != pktQueue.rend() ) and pre_event_count ) {
// Is video, maybe should compare stream_id instead
if ( *it->image_index != -1 ) {
pre_event_count --;
}
}
if ( it == pktQueue.rend() ) {
// hit end, the first packet in the queue should ALWAYS be a video keyframe.
// So we should be able to return it.
if ( pre_event_count )
Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count);
return it.base();
}
if ( *it->keyframe ) {
return (it++).base();
}
while ( ( it++ != pktQueue.rend() ) and ! (*it)->keyframe ) { }
if ( it == pktQueue.rend() ) {
// hit end, the first packet in the queue should ALWAYS be a video keyframe.
// So we should be able to return it.
if ( pre_event_count )
Warning("Hit end of packetqueue before satisfying pre_event_count. Needed %d more video frames", pre_event_count);
return it.base();
}
return (it++).base();
#endif
}
void PacketQueue::dumpQueue() { void PacketQueue::dumpQueue() {
std::list<ZMPacket *>::reverse_iterator it; std::list<ZMPacket *>::reverse_iterator it;
@ -657,3 +604,22 @@ void PacketQueue::free_it(packetqueue_iterator *it) {
} }
} }
} }
bool PacketQueue::is_there_an_iterator_pointing_to_packet(ZMPacket *zm_packet) {
for (
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
if ( *iterator_it == pktQueue.end() ) {
continue;
}
Debug(4, "Checking iterator %p == packet ? %d", (*iterator_it), ( *(*iterator_it) == zm_packet ));
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
return true;
}
} // end foreach iterator
return false;
}

View File

@ -53,7 +53,12 @@ class PacketQueue {
std::list<ZMPacket *>::const_iterator begin() const { return pktQueue.begin(); } std::list<ZMPacket *>::const_iterator begin() const { return pktQueue.begin(); }
void addStreamId(int p_stream_id); void addStreamId(int p_stream_id);
void setMaxVideoPackets(int p) { max_video_packet_count = p; } void setMaxVideoPackets(int p) {
max_video_packet_count = p;
if ( max_video_packet_count < 1 )
max_video_packet_count = 1 ;
// We can simplify a lot of logic in queuePacket if we can assume at least 1 packet in queue
}
bool queuePacket(ZMPacket* packet); bool queuePacket(ZMPacket* packet);
ZMPacket * popPacket(); ZMPacket * popPacket();
@ -76,10 +81,11 @@ class PacketQueue {
packetqueue_iterator *get_stream_it(int stream_id); packetqueue_iterator *get_stream_it(int stream_id);
void free_it(packetqueue_iterator *); void free_it(packetqueue_iterator *);
std::list<ZMPacket *>::iterator get_event_start_packet_it( packetqueue_iterator *get_event_start_packet_it(
packetqueue_iterator snapshot_it, packetqueue_iterator snapshot_it,
unsigned int pre_event_count unsigned int pre_event_count
); );
bool is_there_an_iterator_pointing_to_packet(ZMPacket *zm_packet);
}; };
#endif /* ZM_PACKETQUEUE_H */ #endif /* ZM_PACKETQUEUE_H */