Merge branch 'decoder_thread'

This commit is contained in:
Isaac Connor 2021-03-15 17:05:36 -04:00
commit 2b0e3d0d2c
10 changed files with 335 additions and 122 deletions

View File

@ -16,6 +16,7 @@ set(ZM_BIN_SRC_FILES
zm_crypt.cpp
zm.cpp
zm_db.cpp
zm_decoder_thread.cpp
zm_logger.cpp
zm_event.cpp
zm_eventstream.cpp

54
src/zm_decoder_thread.cpp Normal file
View File

@ -0,0 +1,54 @@
#include "zm_decoder_thread.h"
#include "zm_monitor.h"
#include "zm_signal.h"
#include "zm_utils.h"
//DecoderThread::DecoderThread(std::shared_ptr<Monitor> monitor) :
DecoderThread::DecoderThread(Monitor * monitor) :
monitor_(monitor), terminate_(false) {
//monitor_(std::move(monitor)), terminate_(false) {
thread_ = std::thread(&DecoderThread::Run, this);
}
DecoderThread::~DecoderThread() {
Stop();
if (thread_.joinable())
thread_.join();
}
void DecoderThread::Run() {
Debug(2, "DecoderThread::Run() for %d", monitor_->Id());
//Microseconds decoder_rate = Microseconds(monitor_->GetDecoderRate());
//Seconds decoder_update_delay = Seconds(monitor_->GetDecoderUpdateDelay());
//Debug(2, "DecoderThread::Run() have update delay %d", decoder_update_delay);
//TimePoint last_decoder_update_time = std::chrono::steady_clock::now();
//TimePoint cur_time;
while (!(terminate_ or zm_terminate)) {
// Some periodic updates are required for variable capturing framerate
//if (decoder_update_delay != Seconds::zero()) {
//cur_time = std::chrono::steady_clock::now();
//Debug(2, "Updating adaptive skip");
//if ((cur_time - last_decoder_update_time) > decoder_update_delay) {
//decoder_rate = Microseconds(monitor_->GetDecoderRate());
//last_decoder_update_time = cur_time;
//}
//}
if (!monitor_->Decode()) {
//if ( !(terminate_ or zm_terminate) ) {
//Microseconds sleep_for = monitor_->Active() ? Microseconds(ZM_SAMPLE_RATE) : Microseconds(ZM_SUSPENDED_RATE);
//Debug(2, "Sleeping for %" PRId64 "us", int64(sleep_for.count()));
//std::this_thread::sleep_for(sleep_for);
//}
//} else if (decoder_rate != Microseconds::zero()) {
//Debug(2, "Sleeping for %" PRId64 " us", int64(decoder_rate.count()));
//std::this_thread::sleep_for(decoder_rate);
//} else {
//Debug(2, "Not sleeping");
}
}
}

29
src/zm_decoder_thread.h Normal file
View File

@ -0,0 +1,29 @@
#ifndef ZM_DECODER_THREAD_H
#define ZM_DECODER_THREAD_H
#include <atomic>
#include <memory>
#include <thread>
class Monitor;
class DecoderThread {
public:
explicit DecoderThread(Monitor* monitor);
//explicit DecoderThread(std::shared_ptr<Monitor> monitor);
~DecoderThread();
DecoderThread(DecoderThread &rhs) = delete;
DecoderThread(DecoderThread &&rhs) = delete;
void Stop() { terminate_ = true; }
private:
void Run();
Monitor* monitor_;
//std::shared_ptr<Monitor> monitor_;
std::atomic<bool> terminate_;
std::thread thread_;
};
#endif

View File

@ -395,6 +395,8 @@ Monitor::Monitor()
storage(nullptr),
videoStore(nullptr),
analysis_it(nullptr),
decoder_it(nullptr),
decoder(nullptr),
n_zones(0),
zones(nullptr),
privacy_bitmask(nullptr),
@ -1802,22 +1804,22 @@ void Monitor::UpdateAnalysisFPS() {
// If there isn't then we keep pre-event + alarm frames. = pre_event_count
bool Monitor::Analyse() {
if ( !Enabled() ) {
if (!Enabled()) {
Warning("Shouldn't be doing Analyse when not Enabled");
return false;
}
if ( !analysis_it )
analysis_it = packetqueue.get_video_it(true);
if (!analysis_it) analysis_it = packetqueue.get_video_it(true);
// if have event, send frames until we find a video packet, at which point do analysis. Adaptive skip should only affect which frames we do analysis on.
// get_analysis_packet will lock the packet and may wait if analysis_it is at the end
ZMPacket *snap = packetqueue.get_packet(analysis_it);
if ( !snap ) return false;
ZMLockedPacket *packet_lock = packetqueue.get_packet(analysis_it);
if (!packet_lock) return false;
ZMPacket *snap = packet_lock->packet_;
// Is it possible for snap->score to be ! -1 ? Not if everything is working correctly
if ( snap->score != -1 ) {
snap->unlock();
if (snap->score != -1) {
delete packet_lock;
packetqueue.increment_it(analysis_it);
Error("skipping because score was %d", snap->score);
return false;
@ -1837,25 +1839,24 @@ bool Monitor::Analyse() {
std::lock_guard<std::mutex> lck(event_mutex);
// if we have been told to be OFF, then we are off and don't do any processing.
if ( trigger_data->trigger_state != TriggerState::TRIGGER_OFF ) {
if (trigger_data->trigger_state != TriggerState::TRIGGER_OFF) {
Debug(4, "Trigger not OFF state is (%d)", int(trigger_data->trigger_state));
int score = 0;
// Ready means that we have captured the warmup # of frames
if ( !Ready() ) {
if (!Ready()) {
Debug(3, "Not ready?");
snap->unlock();
delete packet_lock;
return false;
}
Debug(4, "Ready");
std::string cause;
Event::StringSetMap noteSetMap;
// Specifically told to be on. Setting the score here will trigger the alarm.
if ( trigger_data->trigger_state == TriggerState::TRIGGER_ON ) {
if (trigger_data->trigger_state == TriggerState::TRIGGER_ON) {
score += trigger_data->trigger_score;
Debug(1, "Triggered on score += %d => %d", trigger_data->trigger_score, score);
if ( !event ) {
if (!event) {
cause += trigger_data->trigger_cause;
}
Event::StringSet noteSet;
@ -1863,12 +1864,13 @@ bool Monitor::Analyse() {
noteSetMap[trigger_data->trigger_cause] = noteSet;
} // end if trigger_on
if ( signal_change ) {
// FIXME this snap might not be the one that caused the signal change. Need to store that in the packet.
if (signal_change) {
Debug(2, "Signal change, new signal is %d", signal);
const char *signalText = "Unknown";
if ( !signal ) {
if (!signal) {
signalText = "Lost";
if ( event ) {
if (event) {
Info("%s: %03d - Closing event %" PRIu64 ", signal loss", name, analysis_image_count, event->Id());
closeEvent();
last_section_mod = 0;
@ -1877,9 +1879,8 @@ bool Monitor::Analyse() {
signalText = "Reacquired";
score += 100;
}
if ( !event ) {
if ( cause.length() )
cause += ", ";
if (!event) {
if (cause.length()) cause += ", ";
cause += SIGNAL_CAUSE;
}
Event::StringSet noteSet;
@ -1887,15 +1888,26 @@ bool Monitor::Analyse() {
noteSetMap[SIGNAL_CAUSE] = noteSet;
shared_data->state = state = IDLE;
shared_data->active = signal;
if ( (function == MODECT or function == MOCORD) and snap->image )
if ((function == MODECT or function == MOCORD) and snap->image)
ref_image.Assign(*(snap->image));
}// else
if (signal) {
if (snap->image or (snap->codec_type == AVMEDIA_TYPE_VIDEO)) {
if (snap->codec_type == AVMEDIA_TYPE_VIDEO) {
while (!snap->image and !snap->decoded) {
// Need to wait for the decoder thread.
Debug(1, "Waiting for decode");
packet_lock->wait();
if (!snap->image and snap->decoded) {
Debug(1, "No image but was decoded, giving up");
delete packet_lock;
return false;
}
} // end while ! decoded
struct timeval *timestamp = snap->timestamp;
if ( Active() and (function == MODECT or function == MOCORD) and snap->image ) {
if (Active() and (function == MODECT or function == MOCORD)) {
Debug(3, "signal and active and modect");
Event::StringSet zoneSet;
@ -1999,22 +2011,32 @@ bool Monitor::Analyse() {
);
// This gets a lock on the starting packet
ZMPacket *starting_packet = packetqueue.get_packet(start_it);
ZMLockedPacket *starting_packet_lock = nullptr;
ZMPacket *starting_packet = nullptr;
if ( *start_it != snap_it ) {
starting_packet_lock = packetqueue.get_packet(start_it);
if (!starting_packet_lock) return false;
starting_packet = starting_packet_lock->packet_;
} else {
starting_packet = snap;
}
event = new Event(this, *(starting_packet->timestamp), "Continuous", noteSetMap);
// Write out starting packets, do not modify packetqueue it will garbage collect itself
while ( starting_packet and (*start_it) != snap_it ) {
while ( starting_packet and ((*start_it) != snap_it) ) {
event->AddPacket(starting_packet);
// Have added the packet, don't want to unlock it until we have locked the next
packetqueue.increment_it(start_it);
if ( (*start_it) == snap_it ) {
starting_packet->unlock();
if (starting_packet_lock) delete starting_packet_lock;
break;
}
ZMPacket *p = packetqueue.get_packet(start_it);
starting_packet->unlock();
starting_packet = p;
ZMLockedPacket *lp = packetqueue.get_packet(start_it);
delete starting_packet_lock;
starting_packet_lock = lp;
starting_packet = lp->packet_;
}
packetqueue.free_it(start_it);
delete start_it;
@ -2030,9 +2052,7 @@ bool Monitor::Analyse() {
for ( int i=0; i < n_zones; i++ ) {
if ( zones[i]->Alarmed() ) {
alarm_cause += std::string(zones[i]->Label());
if ( i < n_zones-1 ) {
alarm_cause += ",";
}
if (i < n_zones-1) alarm_cause += ",";
}
}
alarm_cause = cause+" "+alarm_cause;
@ -2085,7 +2105,16 @@ bool Monitor::Analyse() {
snap_it,
(pre_event_count > alarm_frame_count ? pre_event_count : alarm_frame_count)
);
ZMPacket *starting_packet = *(*start_it);
ZMLockedPacket *starting_packet_lock = nullptr;
ZMPacket *starting_packet = nullptr;
if ( *start_it != snap_it ) {
starting_packet_lock = packetqueue.get_packet(start_it);
if (!starting_packet_lock) return false;
starting_packet = starting_packet_lock->packet_;
} else {
starting_packet = snap;
}
event = new Event(this, *(starting_packet->timestamp), cause, noteSetMap);
shared_data->last_event_id = event->Id();
@ -2099,12 +2128,13 @@ bool Monitor::Analyse() {
packetqueue.increment_it(start_it);
if ( (*start_it) == snap_it ) {
starting_packet->unlock();
if (starting_packet_lock) delete starting_packet_lock;
break;
}
ZMPacket *p = packetqueue.get_packet(start_it);
starting_packet->unlock();
starting_packet = p;
ZMLockedPacket *lp = packetqueue.get_packet(start_it);
delete starting_packet_lock;
starting_packet_lock = lp;
starting_packet = lp->packet_;
}
packetqueue.free_it(start_it);
delete start_it;
@ -2255,27 +2285,9 @@ bool Monitor::Analyse() {
} // end if ( trigger_data->trigger_state != TRIGGER_OFF )
if (event) event->AddPacket(snap);
#if 0
if (snap->packet.stream_index == video_stream_id) {
if (video_fifo) {
if ( snap->keyframe ) {
// avcodec strips out important nals that describe the stream and
// stick them in extradata. Need to send them along with keyframes
AVStream *stream = camera->getVideoStream();
video_fifo->write(
static_cast<unsigned char *>(stream->codecpar->extradata),
stream->codecpar->extradata_size);
}
video_fifo->writePacket(*snap);
}
} else if (snap->packet.stream_index == audio_stream_id) {
if (audio_fifo)
audio_fifo->writePacket(*snap);
}
#endif
// popPacket will have placed a second lock on snap, so release it here.
snap->unlock();
delete packet_lock;
if ( snap->image_index > 0 ) {
// Only do these if it's a video packet.
@ -2507,7 +2519,6 @@ int Monitor::Capture() {
gettimeofday(packet->timestamp, nullptr);
shared_data->zmc_heartbeat_time = packet->timestamp->tv_sec;
Image* capture_image = image_buffer[index].image;
int captureResult = 0;
if ( deinterlacing_value == 4 ) {
@ -2536,7 +2547,7 @@ int Monitor::Capture() {
Rgb signalcolor;
/* HTML colour code is actually BGR in memory, we want RGB */
signalcolor = rgb_convert(signal_check_colour, ZM_SUBPIX_ORDER_BGR);
capture_image = new Image(width, height, camera->Colours(), camera->SubpixelOrder());
Image *capture_image = new Image(width, height, camera->Colours(), camera->SubpixelOrder());
capture_image->Fill(signalcolor);
shared_data->signal = false;
shared_data->last_write_index = index;
@ -2592,6 +2603,7 @@ int Monitor::Capture() {
return 1;
} // end if audio
#if 0
if ( !packet->image ) {
if ( packet->packet.size and !packet->in_frame ) {
if ( !decoding_enabled ) {
@ -2678,6 +2690,7 @@ int Monitor::Capture() {
shared_data->signal = ( capture_image and signal_check_points ) ? CheckSignal(capture_image) : true;
shared_data->last_write_index = index;
shared_data->last_write_time = packet->timestamp->tv_sec;
#endif
image_count++;
// Will only be queued if there are iterators allocated in the queue.
@ -2710,6 +2723,101 @@ int Monitor::Capture() {
return captureResult;
} // end Monitor::Capture
bool Monitor::Decode() {
if (!decoder_it) decoder_it = packetqueue.get_video_it(true);
ZMLockedPacket *packet_lock = packetqueue.get_packet(decoder_it);
if (!packet_lock) return false;
ZMPacket *packet = packet_lock->packet_;
packetqueue.increment_it(decoder_it);
if (packet->image or (packet->codec_type != AVMEDIA_TYPE_VIDEO)) {
delete packet_lock;
return true; // Don't need decode
}
int ret = 0;
if (packet->packet.size and !packet->in_frame) {
// Allocate the image first so that it can be used by hwaccel
// We don't actually care about camera colours, pixel order etc. We care about the desired settings
//
//capture_image = packet->image = new Image(width, height, camera->Colours(), camera->SubpixelOrder());
ret = packet->decode(camera->getVideoCodecContext());
} else {
Debug(1, "No packet.size(%d) or packet->in_frame(%p). Not decoding", packet->packet.size, packet->in_frame);
}
Image* capture_image = nullptr;
unsigned int index = image_count % image_buffer_count;
if (ret > 0) {
if (packet->in_frame and !packet->image) {
packet->image = new Image(camera_width, camera_height, camera->Colours(), camera->SubpixelOrder());
packet->get_image();
}
if (packet->image) {
capture_image = packet->image;
/* Deinterlacing */
if ( deinterlacing_value ) {
if ( deinterlacing_value == 1 ) {
capture_image->Deinterlace_Discard();
} else if ( deinterlacing_value == 2 ) {
capture_image->Deinterlace_Linear();
} else if ( deinterlacing_value == 3 ) {
capture_image->Deinterlace_Blend();
} else if ( deinterlacing_value == 4 ) {
capture_image->Deinterlace_4Field(next_buffer.image, (deinterlacing>>8)&0xff);
} else if ( deinterlacing_value == 5 ) {
capture_image->Deinterlace_Blend_CustomRatio((deinterlacing>>8)&0xff);
}
}
if ( orientation != ROTATE_0 ) {
Debug(2, "Doing rotation");
switch ( orientation ) {
case ROTATE_0 :
// No action required
break;
case ROTATE_90 :
case ROTATE_180 :
case ROTATE_270 :
capture_image->Rotate((orientation-1)*90);
break;
case FLIP_HORI :
case FLIP_VERT :
capture_image->Flip(orientation==FLIP_HORI);
break;
}
} // end if have rotation
if (privacy_bitmask) {
Debug(1, "Applying privacy");
capture_image->MaskPrivacy(privacy_bitmask);
}
if (config.timestamp_on_capture) {
Debug(1, "Timestampprivacy");
TimestampImage(packet->image, packet->timestamp);
}
if (!ref_image.Buffer()) {
// First image, so assign it to ref image
Debug(1, "Assigning ref image %dx%d size: %d", width, height, camera->ImageSize());
ref_image.Assign(width, height, camera->Colours(), camera->SubpixelOrder(),
packet->image->Buffer(), camera->ImageSize());
}
image_buffer[index].image->Assign(*(packet->image));
*(image_buffer[index].timestamp) = *(packet->timestamp);
} // end if have image
} // end if did decoding
packet->decoded = true;
delete packet_lock;
shared_data->signal = ( capture_image and signal_check_points ) ? CheckSignal(capture_image) : true;
shared_data->last_write_index = index;
shared_data->last_write_time = packet->timestamp->tv_sec;
return true;
} // end bool Monitor::Decode()
void Monitor::TimestampImage(Image *ts_image, const struct timeval *ts_time) const {
if ( !label_format[0] )
return;
@ -3026,6 +3134,7 @@ int Monitor::PrimeCapture() {
audio_fifo = new Fifo(shared_data->audio_fifo_path, true);
}
} // end if rtsp_server
if (decoding_enabled) decoder = new DecoderThread(this);
} else {
Debug(2, "Failed to prime %d", ret);
}
@ -3035,6 +3144,8 @@ int Monitor::PrimeCapture() {
int Monitor::PreCapture() const { return camera->PreCapture(); }
int Monitor::PostCapture() const { return camera->PostCapture(); }
int Monitor::Close() {
decoder->Stop();
delete decoder;
std::lock_guard<std::mutex> lck(event_mutex);
if (event) {
Info("%s: image_count:%d - Closing event %" PRIu64 ", shutting down", name, image_count, event->Id());
@ -3051,25 +3162,25 @@ Monitor::Orientation Monitor::getOrientation() const { return orientation; }
// So this should be done as the first task in the analysis thread startup.
// This function is deprecated.
void Monitor::get_ref_image() {
ZMPacket *snap = nullptr;
ZMLockedPacket *snap_lock = nullptr;
if ( !analysis_it )
analysis_it = packetqueue.get_video_it(true);
while (
(
!( snap = packetqueue.get_packet(analysis_it))
!( snap_lock = packetqueue.get_packet(analysis_it))
or
( snap->codec_type != AVMEDIA_TYPE_VIDEO )
( snap_lock->packet_->codec_type != AVMEDIA_TYPE_VIDEO )
or
! snap->image
! snap_lock->packet_->image
)
and !zm_terminate) {
Debug(1, "Waiting for capture daemon lastwriteindex(%d) lastwritetime(%d)",
shared_data->last_write_index, shared_data->last_write_time);
if ( snap and ! snap->image ) {
snap->unlock();
if ( snap_lock and ! snap_lock->packet_->image ) {
delete snap_lock;
// can't analyse it anyways, incremement
packetqueue.increment_it(analysis_it);
}
@ -3078,6 +3189,7 @@ void Monitor::get_ref_image() {
if ( zm_terminate )
return;
ZMPacket *snap = snap_lock->packet_;
Debug(1, "get_ref_image: packet.stream %d ?= video_stream %d, packet image id %d packet image %p",
snap->packet.stream_index, video_stream_id, snap->image_index, snap->image );
// Might not have been decoded yet FIXME
@ -3088,7 +3200,7 @@ void Monitor::get_ref_image() {
} else {
Debug(2, "Have no ref image about to unlock");
}
snap->unlock();
delete snap_lock;
}
std::vector<Group *> Monitor::Groups() {

View File

@ -22,6 +22,7 @@
#include "zm_define.h"
#include "zm_camera.h"
#include "zm_decoder_thread.h"
#include "zm_event.h"
#include "zm_fifo.h"
#include "zm_image.h"
@ -375,6 +376,8 @@ protected:
VideoStore *videoStore;
PacketQueue packetqueue;
packetqueue_iterator *analysis_it;
packetqueue_iterator *decoder_it;
DecoderThread *decoder;
int n_zones;
@ -549,6 +552,7 @@ public:
//unsigned int DetectBlack( const Image &comp_image, Event::StringSet &zoneSet );
bool CheckSignal( const Image *image );
bool Analyse();
bool Decode();
void DumpImage( Image *dump_image ) const;
void TimestampImage( Image *ts_image, const struct timeval *ts_time ) const;
bool closeEvent();

View File

@ -37,7 +37,8 @@ ZMPacket::ZMPacket() :
score(-1),
codec_type(AVMEDIA_TYPE_UNKNOWN),
image_index(-1),
codec_imgsize(0)
codec_imgsize(0),
decoded(0)
{
av_init_packet(&packet);
packet.size = 0; // So we can detect whether it has been filled.

View File

@ -21,6 +21,7 @@
#define ZM_PACKET_H
#include "zm_logger.h"
#include <condition_variable>
#include <mutex>
extern "C" {
@ -36,7 +37,9 @@ class Image;
class ZMPacket {
public:
std::recursive_mutex mutex;
std::mutex mutex_;
std::condition_variable condition_;
int keyframe;
AVStream *stream; // Input stream
AVPacket packet; // Input packet, undecoded
@ -51,6 +54,7 @@ class ZMPacket {
int image_index;
int codec_imgsize;
int64_t pts; // pts in the packet can be in another time base. This MUST be in AV_TIME_BASE_Q
bool decoded;
public:
AVPacket *av_packet() { return &packet; }
@ -65,21 +69,45 @@ class ZMPacket {
explicit ZMPacket(ZMPacket &packet);
ZMPacket();
~ZMPacket();
void lock() {
Debug(4,"Locking packet %d", this->image_index);
mutex.lock();
Debug(4,"packet %d locked", this->image_index);
};
bool trylock() {
Debug(4,"TryLocking packet %d", this->image_index);
return mutex.try_lock();
};
void unlock() {
Debug(4,"packet %d unlocked", this->image_index);
mutex.unlock();
};
AVFrame *get_out_frame( const AVCodecContext *ctx );
AVFrame *get_out_frame(const AVCodecContext *ctx);
int get_codec_imgsize() { return codec_imgsize; };
};
class ZMLockedPacket {
public:
ZMPacket *packet_;
std::unique_lock<std::mutex> lck_;
ZMLockedPacket(ZMPacket *p) :
packet_(p),
lck_(packet_->mutex_, std::defer_lock) {
}
~ZMLockedPacket() {
unlock();
}
void lock() {
Debug(4, "locking packet %d", packet_->image_index);
lck_.lock();
Debug(4, "packet %d locked", packet_->image_index);
};
bool trylock() {
Debug(4, "TryLocking packet %d", packet_->image_index);
return lck_.try_lock();
};
void unlock() {
Debug(4, "packet %d unlocked", packet_->image_index);
lck_.unlock();
packet_->condition_.notify_all();
};
void wait() {
Debug(4, "packet %d waiting", packet_->image_index);
// We already have a lock, but it's a recursive mutex.. so this may be ok
packet_->condition_.wait(lck_);
}
};
#endif /* ZM_PACKET_H */

View File

@ -144,17 +144,19 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
// First packet is special because we know it is a video keyframe and only need to check for lock
ZMPacket *zm_packet = *it;
if ( zm_packet->trylock() ) {
ZMLockedPacket *lp = new ZMLockedPacket(zm_packet);
if ( lp->trylock() ) {
++it;
zm_packet->unlock();
delete lp;
// Since we have many packets in the queue, we should NOT be pointing at end so don't need to test for that
while ( *it != add_packet ) {
zm_packet = *it;
if ( !zm_packet->trylock() ) {
lp = new ZMLockedPacket(zm_packet);
if ( !lp->trylock() ) {
break;
}
zm_packet->unlock();
delete lp;
if ( is_there_an_iterator_pointing_to_packet(zm_packet) ) {
Warning("Found iterator at beginning of queue. Some thread isn't keeping up");
@ -200,7 +202,7 @@ void PacketQueue::clearPackets(ZMPacket *add_packet) {
return;
} // end voidPacketQueue::clearPackets(ZMPacket* zm_packet)
ZMPacket* PacketQueue::popPacket( ) {
ZMLockedPacket* PacketQueue::popPacket( ) {
Debug(4, "pktQueue size %d", pktQueue.size());
if ( pktQueue.empty() ) {
return nullptr;
@ -222,14 +224,15 @@ ZMPacket* PacketQueue::popPacket( ) {
}
} // end foreach iterator
zm_packet->lock();
ZMLockedPacket *lp = new ZMLockedPacket (zm_packet);
lp->lock();
pktQueue.pop_front();
packet_counts[zm_packet->packet.stream_index] -= 1;
mutex.unlock();
return zm_packet;
return lp;
} // popPacket
@ -325,9 +328,9 @@ void PacketQueue::clear() {
while (!pktQueue.empty()) {
ZMPacket *packet = pktQueue.front();
// Someone might have this packet, but not for very long and since we have locked the queue they won't be able to get another one
packet->lock();
ZMLockedPacket lp(packet);
lp.lock();
pktQueue.pop_front();
packet->unlock();
delete packet;
}
@ -452,7 +455,7 @@ int PacketQueue::packet_count(int stream_id) {
// Returns a packet. Packet will be locked
ZMPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
ZMLockedPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
if ( deleting or zm_terminate )
return nullptr;
@ -477,14 +480,17 @@ ZMPacket *PacketQueue::get_packet(packetqueue_iterator *it) {
Error("Null p?!");
return nullptr;
}
ZMLockedPacket *lp = new ZMLockedPacket(p);
Debug(3, "get_packet %p image_index: %d, about to lock packet", p, p->image_index);
while ( !(zm_terminate or deleting) and !p->trylock() ) {
Debug(3, "waiting. Queue size %d it == end? %d", pktQueue.size(), ( *it == pktQueue.end() ) );
while (!(zm_terminate or deleting) and !lp->trylock()) {
Debug(3, "waiting on index %d. Queue size %d it == end? %d",
p->image_index, pktQueue.size(), ( *it == pktQueue.end() ) );
ZM_DUMP_PACKET(p->packet, "");
condition.wait(lck);
}
Debug(2, "Locked packet, unlocking packetqueue mutex");
return p;
} // end ZMPacket *PacketQueue::get_packet(it)
return lp;
} // end ZMLockedPacket *PacketQueue::get_packet(it)
bool PacketQueue::increment_it(packetqueue_iterator *it) {
Debug(2, "Incrementing %p, queue size %d, end? %d", it, pktQueue.size(), ((*it) == pktQueue.end()));
@ -612,14 +618,14 @@ packetqueue_iterator * PacketQueue::get_video_it(bool wait) {
while ( *it != pktQueue.end() ) {
ZMPacket *zm_packet = *(*it);
if ( !zm_packet ) {
if (!zm_packet) {
Error("Null zmpacket in queue!?");
free_it(it);
return nullptr;
}
Debug(1, "Packet keyframe %d for stream %d, so returning the it to it",
zm_packet->keyframe, zm_packet->packet.stream_index);
if ( zm_packet->keyframe and ( zm_packet->packet.stream_index == video_stream_id ) ) {
if (zm_packet->keyframe and ( zm_packet->packet.stream_index == video_stream_id )) {
Debug(1, "Found a keyframe for stream %d, so returning the it to it", video_stream_id);
return it;
}
@ -627,7 +633,7 @@ packetqueue_iterator * PacketQueue::get_video_it(bool wait) {
}
Debug(1, "DIdn't Found a keyframe for stream %d, so returning the it to it", video_stream_id);
return it;
}
} // get video_it
void PacketQueue::free_it(packetqueue_iterator *it) {
for (

View File

@ -24,6 +24,7 @@
#include <mutex>
class ZMPacket;
class ZMLockedPacket;
typedef std::list<ZMPacket *>::iterator packetqueue_iterator;
@ -52,7 +53,7 @@ class PacketQueue {
void setMaxVideoPackets(int p);
bool queuePacket(ZMPacket* packet);
ZMPacket * popPacket();
ZMLockedPacket * popPacket();
bool popVideoPacket(ZMPacket* packet);
bool popAudioPacket(ZMPacket* packet);
unsigned int clear(unsigned int video_frames_to_keep, int stream_id);
@ -68,7 +69,7 @@ class PacketQueue {
bool increment_it(packetqueue_iterator *it);
bool increment_it(packetqueue_iterator *it, int stream_id);
ZMPacket *get_packet(packetqueue_iterator *);
ZMLockedPacket *get_packet(packetqueue_iterator *);
packetqueue_iterator *get_video_it(bool wait);
packetqueue_iterator *get_stream_it(int stream_id);
void free_it(packetqueue_iterator *);

View File

@ -1211,29 +1211,6 @@ int VideoStore::writeAudioFramePacket(ZMPacket *zm_packet) {
return 0;
} // end int VideoStore::writeAudioFramePacket(AVPacket *ipkt)
int VideoStore::write_packets(PacketQueue &queue) {
// Need to write out all the frames from the last keyframe?
// No... need to write out all frames from when the event began. Due to PreEventFrames, this could be more than since the last keyframe.
unsigned int packet_count = 0;
ZMPacket *queued_packet;
while ( ( queued_packet = queue.popPacket() ) ) {
AVPacket *avp = queued_packet->av_packet();
packet_count += 1;
//Write the packet to our video store
Debug(2, "Writing queued packet stream: %d KEY %d, remaining (%d)",
avp->stream_index, avp->flags & AV_PKT_FLAG_KEY, queue.size() );
int ret = this->writePacket( queued_packet );
if ( ret < 0 ) {
//Less than zero and we skipped a frame
}
delete queued_packet;
} // end while packets in the packetqueue
Debug(2, "Wrote %d queued packets", packet_count );
return packet_count;
} // end int VideoStore::write_packets( PacketQueue &queue ) {
int VideoStore::write_packet(AVPacket *pkt, AVStream *stream) {
pkt->pos = -1;
pkt->stream_index = stream->index;