Load max_image_buffer_count in monitor and set it in packetqueue. rename max_video_packet_count to pre_event_video_count in packetqueue and adjust logic. When queuing packets check to ensure that we aren't going over. Delete packets from front if possible, do not queue packet if not possible

This commit is contained in:
Isaac Connor 2021-03-26 14:26:37 -04:00
parent 3655b25b7c
commit 52cb182ae3
4 changed files with 71 additions and 37 deletions

View File

@ -85,7 +85,7 @@ std::string load_monitor_sql =
"`RecordAudio`, "
"`Brightness`, `Contrast`, `Hue`, `Colour`, "
"`EventPrefix`, `LabelFormat`, `LabelX`, `LabelY`, `LabelSize`,"
"`ImageBufferCount`, `WarmupCount`, `PreEventCount`, `PostEventCount`, `StreamReplayBuffer`, `AlarmFrameCount`, "
"`ImageBufferCount`, `MaxImageBufferCount`, `WarmupCount`, `PreEventCount`, `PostEventCount`, `StreamReplayBuffer`, `AlarmFrameCount`, "
"`SectionLength`, `MinSectionLength`, `FrameSkip`, `MotionFrameSkip`, "
"`FPSReportInterval`, `RefBlendPerc`, `AlarmRefBlendPerc`, `TrackMotion`, `Exif`,"
"`RTSPServer`, `RTSPStreamName`,"
@ -327,6 +327,7 @@ Monitor::Monitor()
label_coord(Coord(0,0)),
label_size(0),
image_buffer_count(0),
max_image_buffer_count(0),
warmup_count(0),
pre_event_count(0),
post_event_count(0),
@ -431,7 +432,7 @@ Monitor::Monitor()
"RecordAudio, "
"Brightness, Contrast, Hue, Colour, "
"EventPrefix, LabelFormat, LabelX, LabelY, LabelSize,"
"ImageBufferCount, WarmupCount, PreEventCount, PostEventCount, StreamReplayBuffer, AlarmFrameCount, "
"ImageBufferCount, `MaxImageBufferCount`, WarmupCount, PreEventCount, PostEventCount, StreamReplayBuffer, AlarmFrameCount, "
"SectionLength, MinSectionLength, FrameSkip, MotionFrameSkip, "
"FPSReportInterval, RefBlendPerc, AlarmRefBlendPerc, TrackMotion, Exif,"
"`RTSPServer`,`RTSPStreamName`,
@ -579,10 +580,14 @@ void Monitor::Load(MYSQL_ROW dbrow, bool load_zones=true, Purpose p = QUERY) {
label_size = atoi(dbrow[col]); col++;
image_buffer_count = atoi(dbrow[col]); col++;
max_image_buffer_count = atoi(dbrow[col]); col++;
warmup_count = atoi(dbrow[col]); col++;
pre_event_count = atoi(dbrow[col]); col++;
packetqueue.setMaxVideoPackets(pre_event_count);
packetqueue.setPreEventVideoPackets(pre_event_count);
packetqueue.setMaxVideoPackets(max_image_buffer_count);
packetqueue.setKeepKeyframes(videowriter == PASSTHROUGH);
post_event_count = atoi(dbrow[col]); col++;
stream_replay_buffer = atoi(dbrow[col]); col++;
alarm_frame_count = atoi(dbrow[col]); col++;
@ -590,7 +595,6 @@ void Monitor::Load(MYSQL_ROW dbrow, bool load_zones=true, Purpose p = QUERY) {
alarm_frame_count = 1;
else if ( alarm_frame_count > MAX_PRE_ALARM_FRAMES )
alarm_frame_count = MAX_PRE_ALARM_FRAMES;
pre_event_buffer_count = pre_event_count + alarm_frame_count + warmup_count - 1;
section_length = atoi(dbrow[col]); col++;
min_section_length = atoi(dbrow[col]); col++;
@ -639,9 +643,9 @@ void Monitor::Load(MYSQL_ROW dbrow, bool load_zones=true, Purpose p = QUERY) {
// Should maybe store this for later use
std::string monitor_dir = stringtf("%s/%d", storage->Path(), id);
LoadCamera();
if ( purpose != QUERY ) {
LoadCamera();
Zone **zones = 0;
int n_zones = Zone::Load(this, zones);
this->AddZones(n_zones, zones);
@ -676,7 +680,6 @@ void Monitor::Load(MYSQL_ROW dbrow, bool load_zones=true, Purpose p = QUERY) {
}
} // end if purpose
Debug(1, "Loaded monitor %d(%s), %d zones", id, name, n_zones);
} // Monitor::Load
@ -3125,7 +3128,7 @@ int Monitor::PrimeCapture() {
}
Debug(2, "Video stream id is %d, audio is %d, minimum_packets to keep in buffer %d",
video_stream_id, audio_stream_id, pre_event_buffer_count);
video_stream_id, audio_stream_id, pre_event_count);
if (rtsp_server) {
if (video_stream_id >= 0) {

View File

@ -292,9 +292,8 @@ protected:
char label_format[64]; // The format of the timestamp on the images
Coord label_coord; // The coordinates of the timestamp on the images
int label_size; // Size of the timestamp on the images
int32_t image_buffer_count; // Size of circular image buffer, at least twice the size of the pre_event_count
int pre_event_buffer_count; // Size of dedicated circular pre event buffer used when analysis is not performed at capturing framerate,
// value is pre_event_count + alarm_frame_count - 1
int32_t image_buffer_count; // Size of circular image buffer, kept in /dev/shm
int32_t max_image_buffer_count; // Max # of video packets to keep in packet queue
int warmup_count; // How many images to process before looking for events
int pre_event_count; // How many images to hold and prepend to an alarm event
int post_event_count; // How many unalarmed images must occur before the alarm state is reset

View File

@ -29,6 +29,7 @@
PacketQueue::PacketQueue():
video_stream_id(-1),
max_video_packet_count(-1),
pre_event_video_packet_count(-1),
max_stream_id(-1),
packet_counts(nullptr),
deleting(false),
@ -86,6 +87,41 @@ bool PacketQueue::queuePacket(ZMPacket* add_packet) {
}
mutex.lock();
if (add_packet->packet.stream_index == video_stream_id) {
if ((max_video_packet_count > 0) and (packet_counts[video_stream_id] > max_video_packet_count)) {
Warning("You have set the video packets in the queue to %d. The queue is full. Either Analysis is not keeping up or your camera's keyframe interval is larger than this setting. We are dropping packets.");
while ((*pktQueue.begin() != add_packet) and (packet_counts[video_stream_id] > max_video_packet_count)) {
ZMPacket *zm_packet = *pktQueue.begin();
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if (!lp->trylock()) break;
delete lp;
for (
std::list<packetqueue_iterator *>::iterator iterators_it = iterators.begin();
iterators_it != iterators.end();
++iterators_it
) {
packetqueue_iterator *iterator_it = *iterators_it;
// Have to check each iterator and make sure it doesn't point to the packet we are about to delete
if ( *(*iterator_it) == zm_packet ) {
Debug(4, "Bumping IT because it is at the front that we are deleting");
++(*iterators_it);
}
} // end foreach iterator
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d",
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size());
delete zm_packet;
} // end while
} // end if too many video packets
if ((max_video_packet_count > 0) and (packet_counts[video_stream_id] > max_video_packet_count)) {
Error("Unable to free up older packets. Not queueing this video packet.");
return false;
}
} // end if this packet is a video packet
pktQueue.push_back(add_packet);
packet_counts[add_packet->packet.stream_index] += 1;
Debug(2, "packet counts for %d is %d",
@ -126,13 +162,13 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
and
add_packet->keyframe
and
(packet_counts[video_stream_id] > max_video_packet_count)
(packet_counts[video_stream_id] > pre_event_video_packet_count)
and
*(pktQueue.begin()) != add_packet
)
) {
Debug(3, "stream index %d ?= video_stream_id %d, keyframe %d, keep_keyframes %d, counts %d > max %d at begin %d",
add_packet->packet.stream_index, video_stream_id, add_packet->keyframe, keep_keyframes, packet_counts[video_stream_id], max_video_packet_count,
Debug(3, "stream index %d ?= video_stream_id %d, keyframe %d, keep_keyframes %d, counts %d > pre_event_count %d at begin %d",
add_packet->packet.stream_index, video_stream_id, add_packet->keyframe, keep_keyframes, packet_counts[video_stream_id], pre_event_video_packet_count,
( *(pktQueue.begin()) != add_packet )
);
return;
@ -155,7 +191,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
if (!keep_keyframes) {
// If not doing passthrough, we don't care about starting with a keyframe so logic is simpler
while ((*pktQueue.begin() != add_packet) and (packet_counts[video_stream_id] > max_video_packet_count + tail_count)) {
while ((*pktQueue.begin() != add_packet) and (packet_counts[video_stream_id] > pre_event_video_packet_count + tail_count)) {
ZMPacket *zm_packet = *pktQueue.begin();
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if (!lp->trylock()) break;
@ -164,7 +200,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d",
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size());
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], pre_event_video_packet_count, pktQueue.size());
delete zm_packet;
} // end while
return;
@ -201,7 +237,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
++video_packets_to_delete;
Debug(4, "Counted %d video packets. Which would leave %d in packetqueue tail count is %d",
video_packets_to_delete, packet_counts[video_stream_id]-video_packets_to_delete, tail_count);
if (packet_counts[video_stream_id] - video_packets_to_delete <= max_video_packet_count + tail_count) {
if (packet_counts[video_stream_id] - video_packets_to_delete <= pre_event_video_packet_count + tail_count) {
break;
}
}
@ -221,7 +257,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
}
Debug(1, "Deleting a packet with stream index:%d image_index:%d with keyframe:%d, video frames in queue:%d max: %d, queuesize:%d",
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], max_video_packet_count, pktQueue.size());
zm_packet->packet.stream_index, zm_packet->image_index, zm_packet->keyframe, packet_counts[video_stream_id], pre_event_video_packet_count, pktQueue.size());
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
delete zm_packet;
@ -238,7 +274,7 @@ ZMLockedPacket* PacketQueue::popPacket( ) {
return nullptr;
}
Debug(4, "poPacket Mutex locking");
mutex.lock();
std::unique_lock<std::mutex> lck(mutex);
ZMPacket *zm_packet = pktQueue.front();
for (
@ -260,8 +296,6 @@ ZMLockedPacket* PacketQueue::popPacket( ) {
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
mutex.unlock();
return lp;
} // popPacket
@ -285,7 +319,7 @@ unsigned int PacketQueue::clear(unsigned int frames_to_keep, int stream_id) {
return 0;
}
Debug(5, "Locking in clear");
mutex.lock();
std::unique_lock<std::mutex> lck(mutex);
packetqueue_iterator it = pktQueue.end()--; // point to last element instead of end
ZMPacket *zm_packet = nullptr;
@ -340,13 +374,7 @@ unsigned int PacketQueue::clear(unsigned int frames_to_keep, int stream_id) {
delete_count += 1;
} // while our iterator is not the first packet
zm_packet = nullptr; // tidy up for valgrind
Debug(3, "Deleted %d packets, %d remaining", delete_count, pktQueue.size());
mutex.unlock();
return delete_count;
Debug(3, "Deleted packets, resulting size is %d", pktQueue.size());
mutex.unlock();
return delete_count;
} // end unsigned int PacketQueue::clear( unsigned int frames_to_keep, int stream_id )
@ -389,7 +417,7 @@ unsigned int PacketQueue::clear(struct timeval *duration, int streamId) {
return 0;
}
Debug(4, "Locking in clear");
mutex.lock();
std::unique_lock<std::mutex> lck(mutex);
struct timeval keep_from;
std::list<ZMPacket *>::reverse_iterator it = pktQueue.rbegin();
@ -440,7 +468,6 @@ unsigned int PacketQueue::clear(struct timeval *duration, int streamId) {
}
if ( it == pktQueue.rend() ) {
Debug(1, "Didn't find a keyframe before event starttime. keeping all" );
mutex.unlock();
return 0;
}
@ -462,14 +489,10 @@ unsigned int PacketQueue::clear(struct timeval *duration, int streamId) {
} // end foreach iterator
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
//if ( zm_packet->image_index == -1 )
delete zm_packet;
deleted_frames += 1;
}
zm_packet = nullptr;
Debug(3, "Deleted %d frames", deleted_frames);
mutex.unlock();
return deleted_frames;
}
@ -706,7 +729,13 @@ bool PacketQueue::is_there_an_iterator_pointing_to_packet(ZMPacket *zm_packet) {
void PacketQueue::setMaxVideoPackets(int p) {
max_video_packet_count = p;
Debug(1, "Setting max_video_packet_count to %d", p);
if ( max_video_packet_count < 1 )
max_video_packet_count = 1 ;
if ( max_video_packet_count < 0 )
max_video_packet_count = 0 ;
}
void PacketQueue::setPreEventVideoPackets(int p) {
pre_event_video_packet_count = p;
Debug(1, "Setting pre_event_video_packet_count to %d", p);
if ( pre_event_video_packet_count < 1 )
pre_event_video_packet_count = 1;
// We can simplify a lot of logic in queuePacket if we can assume at least 1 packet in queue
}

View File

@ -35,6 +35,8 @@ class PacketQueue {
int video_stream_id;
int max_video_packet_count; // allow a negative value to someday mean unlimited
// This is now a hard limit on the # of video packets to keep in the queue so that we can limit ram
int pre_event_video_packet_count; // Was max_video_packet_count
int max_stream_id;
int *packet_counts; /* packet count for each stream_id, to keep track of how many video vs audio packets are in the queue */
bool deleting;
@ -52,6 +54,7 @@ class PacketQueue {
int addStream();
void setMaxVideoPackets(int p);
void setPreEventVideoPackets(int p);
void setKeepKeyframes(bool k) { keep_keyframes = k; };
bool queuePacket(ZMPacket* packet);