Move clear packetqueue logic to it's own function and call it from the analysis thread.
This commit is contained in:
parent
5ad9244a73
commit
045cd219f8
|
@ -1950,7 +1950,7 @@ bool Monitor::Analyse() {
|
||||||
// If doing record, check to see if we need to close the event or not.
|
// If doing record, check to see if we need to close the event or not.
|
||||||
|
|
||||||
if ( event ) {
|
if ( event ) {
|
||||||
Debug(2, "Have event in mocord");
|
Debug(2, "Have event %" PRIu64 " in mocord", event->Id());
|
||||||
if ( section_length
|
if ( section_length
|
||||||
&& ( ( timestamp->tv_sec - video_store_data->recording.tv_sec ) >= section_length )
|
&& ( ( timestamp->tv_sec - video_store_data->recording.tv_sec ) >= section_length )
|
||||||
&& ( (function == MOCORD && (event_close_mode != CLOSE_TIME)) || ! ( timestamp->tv_sec % section_length ) )
|
&& ( (function == MOCORD && (event_close_mode != CLOSE_TIME)) || ! ( timestamp->tv_sec % section_length ) )
|
||||||
|
@ -2229,6 +2229,7 @@ bool Monitor::Analyse() {
|
||||||
shared_data->last_read_time = time(nullptr);
|
shared_data->last_read_time = time(nullptr);
|
||||||
analysis_image_count++;
|
analysis_image_count++;
|
||||||
UpdateAnalysisFPS();
|
UpdateAnalysisFPS();
|
||||||
|
packetqueue.clearPackets(snap);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} // end Monitor::Analyse
|
} // end Monitor::Analyse
|
||||||
|
|
|
@ -99,7 +99,15 @@ bool PacketQueue::queuePacket(ZMPacket* add_packet) {
|
||||||
--(*iterator_it);
|
--(*iterator_it);
|
||||||
}
|
}
|
||||||
} // end foreach iterator
|
} // end foreach iterator
|
||||||
|
mutex.unlock();
|
||||||
|
// We signal on every packet because someday we may analyze sound
|
||||||
|
Debug(4, "packetqueue queuepacket, unlocked signalling");
|
||||||
|
condition.notify_all();
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} // end bool PacketQueue::queuePacket(ZMPacket* zm_packet)
|
||||||
|
|
||||||
|
void PacketQueue::clearPackets(ZMPacket *add_packet) {
|
||||||
// Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one.
|
// Only do queueCleaning if we are adding a video keyframe, so that we guarantee that there is one.
|
||||||
// No good. Have to satisfy two conditions:
|
// No good. Have to satisfy two conditions:
|
||||||
// 1. packetqueue starts with a video keyframe
|
// 1. packetqueue starts with a video keyframe
|
||||||
|
@ -109,77 +117,78 @@ bool PacketQueue::queuePacket(ZMPacket* add_packet) {
|
||||||
//
|
//
|
||||||
// So start at the beginning, counting video packets until the next keyframe.
|
// So start at the beginning, counting video packets until the next keyframe.
|
||||||
// Then if deleting those packets doesn't break 1 and 2, then go ahead and delete them.
|
// Then if deleting those packets doesn't break 1 and 2, then go ahead and delete them.
|
||||||
if ( add_packet->packet.stream_index == video_stream_id
|
if ( ! (
|
||||||
|
add_packet->packet.stream_index == video_stream_id
|
||||||
and
|
and
|
||||||
add_packet->keyframe
|
add_packet->keyframe
|
||||||
and
|
and
|
||||||
(packet_counts[video_stream_id] > max_video_packet_count)
|
(packet_counts[video_stream_id] > max_video_packet_count)
|
||||||
and
|
and
|
||||||
*(pktQueue.begin()) != add_packet
|
*(pktQueue.begin()) != add_packet
|
||||||
|
)
|
||||||
) {
|
) {
|
||||||
packetqueue_iterator it = pktQueue.begin();
|
return;
|
||||||
packetqueue_iterator next_front = pktQueue.begin();
|
}
|
||||||
|
std::unique_lock<std::mutex> lck(mutex);
|
||||||
|
|
||||||
// First packet is special because we know it is a video keyframe and only need to check for lock
|
packetqueue_iterator it = pktQueue.begin();
|
||||||
ZMPacket *zm_packet = *it;
|
packetqueue_iterator next_front = pktQueue.begin();
|
||||||
if ( zm_packet->trylock() ) {
|
|
||||||
++it;
|
// First packet is special because we know it is a video keyframe and only need to check for lock
|
||||||
|
ZMPacket *zm_packet = *it;
|
||||||
|
if ( zm_packet->trylock() ) {
|
||||||
|
++it;
|
||||||
|
zm_packet->unlock();
|
||||||
|
|
||||||
|
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
|
||||||
|
while ( *it != add_packet ) {
|
||||||
|
zm_packet = *it;
|
||||||
|
Debug(1, "Checking packet to see if we can delete them");
|
||||||
|
if ( !zm_packet->trylock() ) {
|
||||||
|
Debug(1, "Have locked packet %d", zm_packet->image_index);
|
||||||
|
break;
|
||||||
|
}
|
||||||
zm_packet->unlock();
|
zm_packet->unlock();
|
||||||
|
|
||||||
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
|
if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) {
|
||||||
while ( *it != add_packet ) {
|
Debug(4, "Found IT at beginning of queue. Threads not keeping up");
|
||||||
zm_packet = *it;
|
break;
|
||||||
Debug(1, "Checking packet to see if we can delete them");
|
|
||||||
if ( !zm_packet->trylock() ) {
|
|
||||||
Debug(1, "Have locked packet %d", zm_packet->image_index);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
zm_packet->unlock();
|
|
||||||
|
|
||||||
if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) {
|
|
||||||
Debug(4, "Found IT at beginning of queue. Threads not keeping up");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( zm_packet->packet.stream_index == video_stream_id ) {
|
|
||||||
if ( zm_packet->keyframe ) {
|
|
||||||
Debug(1, "Have a video keyframe so breaking out");
|
|
||||||
next_front = it;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
it++;
|
|
||||||
} // end while
|
|
||||||
} // end if first packet not locked
|
|
||||||
Debug(1, "Resulting pointing at latest packet? %d, have next front? %d",
|
|
||||||
( *it == add_packet ),
|
|
||||||
( next_front == pktQueue.begin() )
|
|
||||||
);
|
|
||||||
if ( next_front != pktQueue.begin() ) {
|
|
||||||
Debug(1, "Deleting packets");
|
|
||||||
// It is enough to delete the packets tested above. A subsequent queuePacket can clear a second set
|
|
||||||
while ( pktQueue.begin() != next_front ) {
|
|
||||||
ZMPacket *zm_packet = *pktQueue.begin();
|
|
||||||
if ( !zm_packet ) {
|
|
||||||
Error("NULL zm_packet in queue");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d",
|
|
||||||
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size());
|
|
||||||
pktQueue.pop_front();
|
|
||||||
packet_counts[zm_packet->packet.stream_index] -= 1;
|
|
||||||
delete zm_packet;
|
|
||||||
}
|
}
|
||||||
} // end if have at least max_video_packet_count video packets remaining
|
|
||||||
} // end if this is a video keyframe
|
|
||||||
|
|
||||||
mutex.unlock();
|
if ( zm_packet->packet.stream_index == video_stream_id ) {
|
||||||
|
if ( zm_packet->keyframe ) {
|
||||||
|
Debug(1, "Have a video keyframe so breaking out");
|
||||||
|
next_front = it;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
it++;
|
||||||
|
} // end while
|
||||||
|
} // end if first packet not locked
|
||||||
|
Debug(1, "Resulting pointing at latest packet? %d, have next front? %d",
|
||||||
|
( *it == add_packet ),
|
||||||
|
( next_front == pktQueue.begin() )
|
||||||
|
);
|
||||||
|
if ( next_front != pktQueue.begin() ) {
|
||||||
|
Debug(1, "Deleting packets");
|
||||||
|
// It is enough to delete the packets tested above. A subsequent queuePacket can clear a second set
|
||||||
|
while ( pktQueue.begin() != next_front ) {
|
||||||
|
ZMPacket *zm_packet = *pktQueue.begin();
|
||||||
|
if ( !zm_packet ) {
|
||||||
|
Error("NULL zm_packet in queue");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d",
|
||||||
|
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size());
|
||||||
|
pktQueue.pop_front();
|
||||||
|
packet_counts[zm_packet->packet.stream_index] -= 1;
|
||||||
|
delete zm_packet;
|
||||||
|
}
|
||||||
|
} // end if have at least max_video_packet_count video packets remaining
|
||||||
// We signal on every packet because someday we may analyze sound
|
// We signal on every packet because someday we may analyze sound
|
||||||
Debug(4, "packetqueue queuepacket, unlocked signalling");
|
|
||||||
condition.notify_all();
|
|
||||||
|
|
||||||
return true;
|
return;
|
||||||
} // end bool PacketQueue::queuePacket(ZMPacket* zm_packet)
|
} // end voidPacketQueue::clearPackets(ZMPacket* zm_packet)
|
||||||
|
|
||||||
ZMPacket* PacketQueue::popPacket( ) {
|
ZMPacket* PacketQueue::popPacket( ) {
|
||||||
Debug(4, "pktQueue size %d", pktQueue.size());
|
Debug(4, "pktQueue size %d", pktQueue.size());
|
||||||
|
|
|
@ -63,6 +63,7 @@ class PacketQueue {
|
||||||
unsigned int get_packet_count(int stream_id) const { return packet_counts[stream_id]; };
|
unsigned int get_packet_count(int stream_id) const { return packet_counts[stream_id]; };
|
||||||
|
|
||||||
void clear_unwanted_packets(timeval *recording, int pre_event_count, int mVideoStreamId);
|
void clear_unwanted_packets(timeval *recording, int pre_event_count, int mVideoStreamId);
|
||||||
|
void clearPackets(ZMPacket *);
|
||||||
int packet_count(int stream_id);
|
int packet_count(int stream_id);
|
||||||
|
|
||||||
bool increment_it(packetqueue_iterator *it);
|
bool increment_it(packetqueue_iterator *it);
|
||||||
|
|
Loading…
Reference in New Issue