use the timeout version of read_into so that we don't stay blocked while we have been told to exit. If getNextFrame returns -1 sleep for a second.

This commit is contained in:
Isaac Connor 2021-03-05 14:18:12 -05:00
parent 07339e443b
commit ebd29a3cb9
1 changed files with 21 additions and 15 deletions

View File

@ -46,6 +46,7 @@ ZoneMinderFifoSource::~ZoneMinderFifoSource() {
m_captureQueue.pop_front();
delete f;
}
Debug(1, "Deleting Fifo Source done");
pthread_mutex_destroy(&m_mutex);
}
@ -54,7 +55,8 @@ void* ZoneMinderFifoSource::thread() {
stop = 0;
while (!stop) {
getNextFrame();
if ( getNextFrame() < 0 )
sleep(1);
}
return nullptr;
}
@ -119,6 +121,7 @@ void ZoneMinderFifoSource::deliverFrame() {
// FrameSource callback on read event
void ZoneMinderFifoSource::incomingPacketHandler() {
Debug(1, "incomingPacketHandler");
if (this->getNextFrame() <= 0) {
handleClosure(this);
}
@ -126,7 +129,10 @@ void ZoneMinderFifoSource::incomingPacketHandler() {
// read from monitor
int ZoneMinderFifoSource::getNextFrame() {
if (zm_terminate) return -1;
if (zm_terminate or stop) {
Debug(1, "Terminating %d %d", zm_terminate, stop);
return -1;
}
if (m_fd == -1) {
Debug(1, "Opening fifo %s", m_fifo.c_str());
@ -137,7 +143,7 @@ int ZoneMinderFifoSource::getNextFrame() {
}
}
int bytes_read = m_buffer.read_into(m_fd, 4096);
int bytes_read = m_buffer.read_into(m_fd, 4096, {1,0});
if (bytes_read == 0) {
Debug(3, "No bytes read");
sleep(1);
@ -152,7 +158,6 @@ int ZoneMinderFifoSource::getNextFrame() {
Debug(4, "%s bytes read %d bytes, buffer size %u", m_fifo.c_str(), bytes_read, m_buffer.size());
while (m_buffer.size()) {
unsigned int data_size = 0;
int64_t pts;
unsigned char *header_end = nullptr;
@ -164,7 +169,7 @@ int ZoneMinderFifoSource::getNextFrame() {
if (!header_end) {
// Must not have enough data. So... keep all.
Debug(1, "Didn't find newline");
return -1;
return 0;
}
unsigned int header_size = header_end-header_start;
@ -176,8 +181,8 @@ int ZoneMinderFifoSource::getNextFrame() {
if (!content_length_ptr) {
Debug(1, "Didn't find space delineating size in %s", header);
m_buffer.consume(header_start-m_buffer.head() + 2);
delete header;
return -1;
delete[] header;
return 0;
}
*content_length_ptr = '\0';
content_length_ptr ++;
@ -185,8 +190,8 @@ int ZoneMinderFifoSource::getNextFrame() {
if (!pts_ptr) {
m_buffer.consume(header_start-m_buffer.head() + 2);
Debug(1, "Didn't find space delineating pts in %s", header);
delete header;
return -1;
delete[] header;
return 0;
}
*pts_ptr = '\0';
pts_ptr ++;
@ -194,8 +199,8 @@ int ZoneMinderFifoSource::getNextFrame() {
pts = strtoll(pts_ptr, nullptr, 10);
delete[] header;
} else {
Debug(1, "ZM header not found.");
return -1;
Debug(1, "ZM header not found %s.",m_buffer.head());
return 0;
}
Debug(4, "ZM Packet size %u pts %" PRId64, data_size, pts);
if (header_start != m_buffer) {
@ -209,9 +214,11 @@ int ZoneMinderFifoSource::getNextFrame() {
int bytes_needed = data_size - (m_buffer.size() - header_size);
if (bytes_needed > 0) {
Debug(4, "Need another %d bytes. Trying to read them", bytes_needed);
int bytes_read = m_buffer.read_into(m_fd, bytes_needed);
if ( bytes_read != bytes_needed )
int bytes_read = m_buffer.read_into(m_fd, bytes_needed, {1,0});
if ( bytes_read != bytes_needed ) {
Debug(4, "Failed to read another %d bytes.", bytes_needed);
return -1;
}
}
unsigned char *packet_start = m_buffer.head() + header_size;
@ -236,12 +243,11 @@ int ZoneMinderFifoSource::getNextFrame() {
if (m_captureQueue.size() > 25) { // 1 sec at 25 fps
NAL_Frame * f = m_captureQueue.front();
while (m_captureQueue.size() and ((tv.tv_sec - f->m_timestamp.tv_sec) > 2)) {
Debug(1, "Deleting Front NAL is %d seconds old", (tv.tv_sec - f->m_timestamp.tv_sec));
m_captureQueue.pop_front();
delete f;
f = m_captureQueue.front();
}
Debug(1, "Done clearing");
Debug(3, "Done clearing. Queue size is now %d", m_captureQueue.size());
}
m_captureQueue.push_back(frame);
pthread_mutex_unlock(&m_mutex);