More properly fix the threading lock. Instead of waiting on a packet, release it and wait on the packetqueue.
This commit is contained in:
parent
167dece604
commit
bf5c0a8617
|
@ -1735,10 +1735,6 @@ bool Monitor::Analyse() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the it that points to our snap we will need it later
|
|
||||||
packetqueue_iterator snap_it = *analysis_it;
|
|
||||||
packetqueue.increment_it(analysis_it);
|
|
||||||
|
|
||||||
// signal is set by capture
|
// signal is set by capture
|
||||||
bool signal = shared_data->signal;
|
bool signal = shared_data->signal;
|
||||||
bool signal_change = (signal != last_signal);
|
bool signal_change = (signal != last_signal);
|
||||||
|
@ -1848,8 +1844,15 @@ bool Monitor::Analyse() {
|
||||||
while (!snap->decoded and !zm_terminate and !analysis_thread->Stopped()) {
|
while (!snap->decoded and !zm_terminate and !analysis_thread->Stopped()) {
|
||||||
// Need to wait for the decoder thread.
|
// Need to wait for the decoder thread.
|
||||||
Debug(1, "Waiting for decode");
|
Debug(1, "Waiting for decode");
|
||||||
|
packetqueue.unlock(packet_lock);
|
||||||
packetqueue.notify_all(); // decode might be waiting
|
packetqueue.notify_all(); // decode might be waiting
|
||||||
packet_lock->wait();
|
packetqueue.wait();
|
||||||
|
|
||||||
|
// Another thread may have moved our it. Unlikely but possible
|
||||||
|
packet_lock = packetqueue.get_packet(analysis_it);
|
||||||
|
if (!packet_lock) return false;
|
||||||
|
snap = packet_lock->packet_;
|
||||||
|
|
||||||
if (!snap->image and snap->decoded) {
|
if (!snap->image and snap->decoded) {
|
||||||
Debug(1, "No image but was decoded, giving up");
|
Debug(1, "No image but was decoded, giving up");
|
||||||
delete packet_lock;
|
delete packet_lock;
|
||||||
|
@ -1947,14 +1950,14 @@ bool Monitor::Analyse() {
|
||||||
// Must start on a keyframe so rewind. Only for passthrough though I guess.
|
// Must start on a keyframe so rewind. Only for passthrough though I guess.
|
||||||
// FIXME this iterator is not protected from invalidation
|
// FIXME this iterator is not protected from invalidation
|
||||||
packetqueue_iterator *start_it = packetqueue.get_event_start_packet_it(
|
packetqueue_iterator *start_it = packetqueue.get_event_start_packet_it(
|
||||||
snap_it, 0 /* pre_event_count */
|
*analysis_it, 0 /* pre_event_count */
|
||||||
);
|
);
|
||||||
|
|
||||||
// This gets a lock on the starting packet
|
// This gets a lock on the starting packet
|
||||||
|
|
||||||
ZMLockedPacket *starting_packet_lock = nullptr;
|
ZMLockedPacket *starting_packet_lock = nullptr;
|
||||||
std::shared_ptr<ZMPacket> starting_packet = nullptr;
|
std::shared_ptr<ZMPacket> starting_packet = nullptr;
|
||||||
if (*start_it != snap_it) {
|
if (*start_it != *analysis_it) {
|
||||||
starting_packet_lock = packetqueue.get_packet(start_it);
|
starting_packet_lock = packetqueue.get_packet(start_it);
|
||||||
if (!starting_packet_lock) {
|
if (!starting_packet_lock) {
|
||||||
Warning("Unable to get starting packet lock");
|
Warning("Unable to get starting packet lock");
|
||||||
|
@ -1968,12 +1971,12 @@ bool Monitor::Analyse() {
|
||||||
|
|
||||||
event = new Event(this, starting_packet->timestamp, "Continuous", noteSetMap);
|
event = new Event(this, starting_packet->timestamp, "Continuous", noteSetMap);
|
||||||
// Write out starting packets, do not modify packetqueue it will garbage collect itself
|
// Write out starting packets, do not modify packetqueue it will garbage collect itself
|
||||||
while (starting_packet and ((*start_it) != snap_it)) {
|
while (starting_packet and ((*start_it) != *analysis_it)) {
|
||||||
event->AddPacket(starting_packet);
|
event->AddPacket(starting_packet);
|
||||||
// Have added the packet, don't want to unlock it until we have locked the next
|
// Have added the packet, don't want to unlock it until we have locked the next
|
||||||
|
|
||||||
packetqueue.increment_it(start_it);
|
packetqueue.increment_it(start_it);
|
||||||
if ((*start_it) == snap_it) {
|
if ((*start_it) == *analysis_it) {
|
||||||
if (starting_packet_lock) delete starting_packet_lock;
|
if (starting_packet_lock) delete starting_packet_lock;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2051,12 +2054,12 @@ bool Monitor::Analyse() {
|
||||||
|
|
||||||
if (!event) {
|
if (!event) {
|
||||||
packetqueue_iterator *start_it = packetqueue.get_event_start_packet_it(
|
packetqueue_iterator *start_it = packetqueue.get_event_start_packet_it(
|
||||||
snap_it,
|
*analysis_it,
|
||||||
(pre_event_count > alarm_frame_count ? pre_event_count : alarm_frame_count)
|
(pre_event_count > alarm_frame_count ? pre_event_count : alarm_frame_count)
|
||||||
);
|
);
|
||||||
ZMLockedPacket *starting_packet_lock = nullptr;
|
ZMLockedPacket *starting_packet_lock = nullptr;
|
||||||
std::shared_ptr<ZMPacket> starting_packet = nullptr;
|
std::shared_ptr<ZMPacket> starting_packet = nullptr;
|
||||||
if (*start_it != snap_it) {
|
if (*start_it != *analysis_it) {
|
||||||
starting_packet_lock = packetqueue.get_packet(start_it);
|
starting_packet_lock = packetqueue.get_packet(start_it);
|
||||||
if (!starting_packet_lock) return false;
|
if (!starting_packet_lock) return false;
|
||||||
starting_packet = starting_packet_lock->packet_;
|
starting_packet = starting_packet_lock->packet_;
|
||||||
|
@ -2071,11 +2074,11 @@ bool Monitor::Analyse() {
|
||||||
shared_data->state = state = ALARM;
|
shared_data->state = state = ALARM;
|
||||||
|
|
||||||
// Write out starting packets, do not modify packetqueue it will garbage collect itself
|
// Write out starting packets, do not modify packetqueue it will garbage collect itself
|
||||||
while (*start_it != snap_it) {
|
while (*start_it != *analysis_it) {
|
||||||
event->AddPacket(starting_packet);
|
event->AddPacket(starting_packet);
|
||||||
|
|
||||||
packetqueue.increment_it(start_it);
|
packetqueue.increment_it(start_it);
|
||||||
if ( (*start_it) == snap_it ) {
|
if ( (*start_it) == (*analysis_it) ) {
|
||||||
if (starting_packet_lock) delete starting_packet_lock;
|
if (starting_packet_lock) delete starting_packet_lock;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2262,6 +2265,7 @@ bool Monitor::Analyse() {
|
||||||
if (function == MODECT or function == MOCORD)
|
if (function == MODECT or function == MOCORD)
|
||||||
UpdateAnalysisFPS();
|
UpdateAnalysisFPS();
|
||||||
}
|
}
|
||||||
|
packetqueue.increment_it(analysis_it);
|
||||||
packetqueue.unlock(packet_lock);
|
packetqueue.unlock(packet_lock);
|
||||||
shared_data->last_read_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
shared_data->last_read_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||||
|
|
||||||
|
|
|
@ -663,3 +663,12 @@ void PacketQueue::setPreEventVideoPackets(int p) {
|
||||||
pre_event_video_packet_count = 1;
|
pre_event_video_packet_count = 1;
|
||||||
// We can simplify a lot of logic in queuePacket if we can assume at least 1 packet in queue
|
// We can simplify a lot of logic in queuePacket if we can assume at least 1 packet in queue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void PacketQueue::notify_all() {
|
||||||
|
condition.notify_all();
|
||||||
|
};
|
||||||
|
|
||||||
|
void PacketQueue::wait() {
|
||||||
|
std::unique_lock<std::mutex> lck(mutex);
|
||||||
|
condition.wait(lck);
|
||||||
|
}
|
||||||
|
|
|
@ -81,7 +81,8 @@ class PacketQueue {
|
||||||
);
|
);
|
||||||
bool is_there_an_iterator_pointing_to_packet(const std::shared_ptr<ZMPacket> &zm_packet);
|
bool is_there_an_iterator_pointing_to_packet(const std::shared_ptr<ZMPacket> &zm_packet);
|
||||||
void unlock(ZMLockedPacket *lp);
|
void unlock(ZMLockedPacket *lp);
|
||||||
void notify_all() { condition.notify_all(); };
|
void notify_all();
|
||||||
|
void wait();
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* ZM_PACKETQUEUE_H */
|
#endif /* ZM_PACKETQUEUE_H */
|
||||||
|
|
Loading…
Reference in New Issue