From fdf1fbd4972b112af130e39a959789a71b88d40e Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Sat, 27 Feb 2021 12:26:19 -0500 Subject: [PATCH] Add a fifo version of the rtsp server --- src/zm_rtsp_server.cpp | 226 +++++++++++++++++ src/zm_rtsp_server_adts_fifo_source.cpp | 43 ++++ src/zm_rtsp_server_adts_fifo_source.h | 66 +++++ src/zm_rtsp_server_fifo_source.cpp | 215 ++++++++++++++++ src/zm_rtsp_server_fifo_source.h | 82 ++++++ src/zm_rtsp_server_fifo_video_source.cpp | 20 ++ src/zm_rtsp_server_fifo_video_source.h | 32 +++ src/zm_rtsp_server_h264_fifo_source.cpp | 234 ++++++++++++++++++ src/zm_rtsp_server_h264_fifo_source.h | 99 ++++++++ ...zm_rtsp_server_server_media_subsession.cpp | 37 ++- src/zm_rtsp_server_server_media_subsession.h | 3 +- src/zm_rtsp_server_thread.cpp | 62 ++++- src/zm_rtsp_server_thread.h | 6 +- ...server_unicast_server_media_subsession.cpp | 5 +- 14 files changed, 1104 insertions(+), 26 deletions(-) create mode 100644 src/zm_rtsp_server.cpp create mode 100644 src/zm_rtsp_server_adts_fifo_source.cpp create mode 100644 src/zm_rtsp_server_adts_fifo_source.h create mode 100644 src/zm_rtsp_server_fifo_source.cpp create mode 100644 src/zm_rtsp_server_fifo_source.h create mode 100644 src/zm_rtsp_server_fifo_video_source.cpp create mode 100644 src/zm_rtsp_server_fifo_video_source.h create mode 100644 src/zm_rtsp_server_h264_fifo_source.cpp create mode 100644 src/zm_rtsp_server_h264_fifo_source.h diff --git a/src/zm_rtsp_server.cpp b/src/zm_rtsp_server.cpp new file mode 100644 index 000000000..d31ec50fe --- /dev/null +++ b/src/zm_rtsp_server.cpp @@ -0,0 +1,226 @@ +// +// ZoneMinder RTSP Daemon +// Copyright (C) 2021 Isaac Connor +// +// This program is free software; you can redistribute it and/or +// modify it under the terms of the GNU General Public License +// as published by the Free Software Foundation; either version 2 +// of the License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +// + +/* + +=head1 NAME + +zm_rtsp_server - The ZoneMinder Server + +=head1 SYNOPSIS + + zmc -m + zmc --monitor + zmc -h + zmc --help + zmc -v + zmc --version + +=head1 DESCRIPTION + +This binary's job is to connect to fifo's provided by local zmc processes +and provide that stream over rtsp + +=head1 OPTIONS + + -m, --monitor_id - ID of a monitor to stream + -h, --help - Display usage information + -v, --version - Print the installed version of ZoneMinder + +=cut + +*/ + +#include "zm.h" +#include "zm_db.h" +#include "zm_define.h" +#include "zm_monitor.h" +#include "zm_rtsp_server_thread.h" +#include "zm_rtsp_server_fifo_video_source.h" +#include "zm_signal.h" +#include "zm_time.h" +#include "zm_utils.h" +#include +#include + +void Usage() { + fprintf(stderr, "zm_rtsp_server -m \n"); + + fprintf(stderr, "Options:\n"); + fprintf(stderr, " -m, --monitor : We default to all monitors use this to specify just one\n"); + fprintf(stderr, " -h, --help : This screen\n"); + fprintf(stderr, " -v, --version : Report the installed version of ZoneMinder\n"); + exit(0); +} + +int main(int argc, char *argv[]) { + self = argv[0]; + + srand(getpid() * time(nullptr)); + + int monitor_id = -1; + + static struct option long_options[] = { + {"monitor", 1, nullptr, 'm'}, + {"help", 0, nullptr, 'h'}, + {"version", 0, nullptr, 'v'}, + {nullptr, 0, nullptr, 0} + }; + + while (1) { + int option_index = 0; + + int c = getopt_long(argc, argv, "m:h:v", long_options, &option_index); + if ( c == -1 ) { + break; + } + + switch (c) { + case 'm': + monitor_id = atoi(optarg); + break; + case 'h': + case '?': + Usage(); + break; + case 'v': + std::cout << ZM_VERSION << "\n"; + exit(0); + default: + // fprintf(stderr, "?? getopt returned character code 0%o ??\n", c); + break; + } + } + + if (optind < argc) { + fprintf(stderr, "Extraneous options, "); + while (optind < argc) + printf("%s ", argv[optind++]); + printf("\n"); + Usage(); + } + + const char *log_id_string = "zm_rtsp_server"; + ///std::string log_id_string = std::string("zm_rtsp_server"); + ///if ( monitor_id > 0 ) log_id_string += stringtf("_m%d", monitor_id); + + logInit(log_id_string); + zmLoadStaticConfig(); + zmDbConnect(); + zmLoadDBConfig(); + logInit(log_id_string); + + hwcaps_detect(); + + std::string where = "`Function` != 'None' AND `RTSPServer` != false"; + if (staticConfig.SERVER_ID) + where += stringtf(" AND `ServerId`=%d", staticConfig.SERVER_ID); + if (monitor_id > 0) + where += stringtf(" AND `Id`=%d", monitor_id); + + std::vector> monitors = Monitor::LoadMonitors(where, Monitor::QUERY); + + if (monitors.empty()) { + Error("No monitors found"); + exit(-1); + } else { + Debug(2, "%d monitors loaded", monitors.size()); + } + + Info("Starting RTSP Server version %s", ZM_VERSION); + zmSetDefaultHupHandler(); + zmSetDefaultTermHandler(); + zmSetDefaultDieHandler(); + + sigset_t block_set; + sigemptyset(&block_set); + + sigaddset(&block_set, SIGHUP); + sigaddset(&block_set, SIGUSR1); + sigaddset(&block_set, SIGUSR2); + + int result = 0; + + while (!zm_terminate) { + result = 0; + + for (const std::shared_ptr &monitor : monitors) { + monitor->LoadCamera(); + + while (!monitor->connect()) { + Warning("Couldn't connect to monitor %d", monitor->Id()); + sleep(1); + } + } // end foreach monitor + + RTSPServerThread ** rtsp_server_threads = nullptr; + if (config.min_rtsp_port) { + rtsp_server_threads = new RTSPServerThread *[monitors.size()]; + Debug(1, "Starting RTSP server because min_rtsp_port is set"); + } else { + Debug(1, "Not starting RTSP server because min_rtsp_port not set"); + exit(-1); + } + + for (size_t i = 0; i < monitors.size(); i++) { + rtsp_server_threads[i] = new RTSPServerThread(monitors[i]); + std::string streamname = monitors[i]->GetRTSPStreamName(); + ServerMediaSession *sms = rtsp_server_threads[i]->addSession(streamname); + ZoneMinderFifoVideoSource *video_source = static_cast(rtsp_server_threads[i]->addFifo(sms, monitors[i]->GetVideoFifo())); + video_source->setWidth(monitors[i]->Width()); + video_source->setHeight(monitors[i]->Height()); + FramedSource *audio_source = rtsp_server_threads[i]->addFifo(sms, monitors[i]->GetAudioFifo()); + rtsp_server_threads[i]->start(); + } + + while (!zm_terminate) { + sleep(1); + // What to do in here? Sleep mostly. Wait for reload commands maybe watch for dead monitors. + if ((result < 0) or zm_reload) { + // Failure, try reconnecting + break; + } + } // end while ! zm_terminate and connected + + for (size_t i = 0; i < monitors.size(); i++) { + rtsp_server_threads[i]->stop(); + rtsp_server_threads[i]->join(); + delete rtsp_server_threads[i]; + rtsp_server_threads[i] = nullptr; + } + + delete[] rtsp_server_threads; + rtsp_server_threads = nullptr; + + if (zm_reload) { + for (std::shared_ptr &monitor : monitors) { + monitor->Reload(); + } + logTerm(); + logInit(log_id_string); + zm_reload = false; + } // end if zm_reload + } // end while ! zm_terminate outer connection loop + + Image::Deinitialise(); + logTerm(); + zmDbClose(); + + return zm_terminate ? 0 : result; +} diff --git a/src/zm_rtsp_server_adts_fifo_source.cpp b/src/zm_rtsp_server_adts_fifo_source.cpp new file mode 100644 index 000000000..50677b519 --- /dev/null +++ b/src/zm_rtsp_server_adts_fifo_source.cpp @@ -0,0 +1,43 @@ +/* --------------------------------------------------------------------------- +** +** ADTS_FifoSource.cpp +** +** ADTS Live555 source +** +** -------------------------------------------------------------------------*/ + +#include "zm_rtsp_server_adts_fifo_source.h" + +#include + +#if HAVE_RTSP_SERVER + +static unsigned const samplingFrequencyTable[16] = { + 96000, 88200, 64000, 48000, + 44100, 32000, 24000, 22050, + 16000, 12000, 11025, 8000, + 7350, 0, 0, 0 +}; +// --------------------------------- +// ADTS ZoneMinder FramedSource +// --------------------------------- +// +ADTS_ZoneMinderFifoSource::ADTS_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize + ) + : + ZoneMinderFifoSource(env, fifo, queueSize), + samplingFrequencyIndex(0), + channels(1) +{ + std::ostringstream os; + os << + "profile-level-id=1;" + "mode=AAC-hbr;sizelength=13;indexlength=3;" + "indexdeltalength=3" + << "\r\n"; + m_auxLine.assign(os.str()); +} +#endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_adts_fifo_source.h b/src/zm_rtsp_server_adts_fifo_source.h new file mode 100644 index 000000000..7278f1be1 --- /dev/null +++ b/src/zm_rtsp_server_adts_fifo_source.h @@ -0,0 +1,66 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** ADTS_ZoneMinderFifoSource.h +** +** ADTS ZoneMinder live555 source +** +** -------------------------------------------------------------------------*/ + +#ifndef ZM_RTSP_SERVER_ADTS_FIFO_SOURCE_H +#define ZM_RTSP_SERVER_ADTS_FIFO_SOURCE_H + +#include "zm_config.h" +#include "zm_rtsp_server_fifo_source.h" + +#if HAVE_RTSP_SERVER +// --------------------------------- +// ADTS(AAC) ZoneMinder FramedSource +// --------------------------------- + +class ADTS_ZoneMinderFifoSource : public ZoneMinderFifoSource { + public: + static ADTS_ZoneMinderFifoSource* createNew( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize + ) { + return new ADTS_ZoneMinderFifoSource(env, fifo, queueSize); + }; + protected: + ADTS_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize + ); + + virtual ~ADTS_ZoneMinderFifoSource() {} + + /* + virtual unsigned char* extractFrame(unsigned char* frame, size_t& size, size_t& outsize); + virtual unsigned char* findMarker(unsigned char *frame, size_t size, size_t &length); + */ + public: + int samplingFrequency() { return 8000; //m_stream->codecpar->sample_rate; + }; + const char *configStr() { return config.c_str(); }; + int numChannels() { + //Debug(1, "this %p m_stream %p channels %d", + //this, m_stream, channels); + //Debug(1, "m_stream %p codecpar %p channels %d => %d", + //m_stream, m_stream->codecpar, m_stream->codecpar->channels, channels); + return 1; + //return channels; + //return m_stream->codecpar->channels; + } + + protected: + std::string config; + int samplingFrequencyIndex; + int channels; +}; +#endif // HAVE_RTSP_SERVER + +#endif // ZM_RTSP_SERVER_ADTS_FIFO_SOURCE_H diff --git a/src/zm_rtsp_server_fifo_source.cpp b/src/zm_rtsp_server_fifo_source.cpp new file mode 100644 index 000000000..7b25058ab --- /dev/null +++ b/src/zm_rtsp_server_fifo_source.cpp @@ -0,0 +1,215 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** +** ZoneMinder Live555 source +** +** -------------------------------------------------------------------------*/ + +#include "zm_rtsp_server_fifo_source.h" + +#include "zm_config.h" +#include "zm_logger.h" +#include "zm_rtsp_server_frame.h" +#include "zm_signal.h" + +#include +#include + +#if HAVE_RTSP_SERVER +ZoneMinderFifoSource::ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize + ) : + FramedSource(env), + m_fifo(fifo), + m_queueSize(queueSize), + m_fd(-1) +{ + m_eventTriggerId = envir().taskScheduler().createEventTrigger(ZoneMinderFifoSource::deliverFrameStub); + memset(&m_thid, 0, sizeof(m_thid)); + memset(&m_mutex, 0, sizeof(m_mutex)); + pthread_mutex_init(&m_mutex, nullptr); + pthread_create(&m_thid, nullptr, threadStub, this); + m_buffer_ptr = &m_buffer[0]; +} + +ZoneMinderFifoSource::~ZoneMinderFifoSource() { + stop = 1; + envir().taskScheduler().deleteEventTrigger(m_eventTriggerId); + pthread_join(m_thid, nullptr); + while ( m_captureQueue.size() ) { + NAL_Frame * f = m_captureQueue.front(); + m_captureQueue.pop_front(); + delete f; + } + + pthread_mutex_destroy(&m_mutex); +} + +// thread mainloop +void* ZoneMinderFifoSource::thread() { + stop = 0; + + while (!stop) { + getNextFrame(); + } + return nullptr; +} + +// getting FrameSource callback +void ZoneMinderFifoSource::doGetNextFrame() { + deliverFrame(); +} + +// stopping FrameSource callback +void ZoneMinderFifoSource::doStopGettingFrames() { + //stop = 1; + Debug(1, "ZoneMinderFifoSource::doStopGettingFrames"); + FramedSource::doStopGettingFrames(); +} + +// deliver frame to the sink +void ZoneMinderFifoSource::deliverFrame() { + if (!isCurrentlyAwaitingData()) { + Debug(4, "not awaiting data"); + return; + } + + pthread_mutex_lock(&m_mutex); + if (m_captureQueue.empty()) { + Debug(4, "Queue is empty"); + pthread_mutex_unlock(&m_mutex); + return; + } + + NAL_Frame *frame = m_captureQueue.front(); + m_captureQueue.pop_front(); + pthread_mutex_unlock(&m_mutex); + + fDurationInMicroseconds = 0; + fFrameSize = 0; + + unsigned int nal_size = frame->size(); + + if (nal_size > fMaxSize) { + fFrameSize = fMaxSize; + fNumTruncatedBytes = nal_size - fMaxSize; + } else { + fFrameSize = nal_size; + } + Debug(2, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d", + frame->m_timestamp.tv_sec, frame->m_timestamp.tv_usec, + fFrameSize, + m_captureQueue.size() + ); + + fPresentationTime = frame->m_timestamp; + memcpy(fTo, frame->buffer(), fFrameSize); + + if (fFrameSize > 0) { + // send Frame to the consumer + FramedSource::afterGetting(this); + } + delete frame; +} // end void ZoneMinderFifoSource::deliverFrame() + +// FrameSource callback on read event +void ZoneMinderFifoSource::incomingPacketHandler() { + if (this->getNextFrame() <= 0) { + handleClosure(this); + } +} + +// read from monitor +int ZoneMinderFifoSource::getNextFrame() { + if (zm_terminate) return -1; + + if (m_fd == -1) { + Debug(1, "Opening fifo %s", m_fifo.c_str()); + m_fd = open(m_fifo.c_str(), O_RDONLY); + if (m_fd < 0) { + Error("Can't open %s: %s", m_fifo.c_str(), strerror(errno)); + return -1; + } + } + + int bytes_in_buffer = m_buffer_ptr - &m_buffer[0]; + int bytes_read = read(m_fd, m_buffer_ptr, BUFFER_SIZE-bytes_in_buffer); + if (bytes_read == 0) + return -1; + if (bytes_read < 0) { + Error("Problem during reading: %s", strerror(errno)); + ::close(m_fd); + m_fd = -1; + return -1; + } + bytes_in_buffer += bytes_read; + unsigned int bytes_remaining = bytes_in_buffer; + + timeval tv; + gettimeofday(&tv, nullptr); + + std::list< std::pair > framesList = this->splitFrames(m_buffer, bytes_remaining); + Debug(1, "Got %d frames, bytes remaining %d", framesList.size(), bytes_remaining); + + if ( bytes_remaining > 0 ) { + memmove(&m_buffer[0], &m_buffer[0] + ( bytes_in_buffer - bytes_remaining ), bytes_remaining); + m_buffer_ptr = &m_buffer[0] + bytes_remaining; + } else { + m_buffer_ptr = &m_buffer[0]; + } + + while (framesList.size()) { + std::pair nal = framesList.front(); + framesList.pop_front(); + + NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv); + + pthread_mutex_lock(&m_mutex); + if (m_captureQueue.size() > 10) { + NAL_Frame * f = m_captureQueue.front(); + while (m_captureQueue.size() and ((f->m_timestamp.tv_sec - tv.tv_sec) > 2)) { + m_captureQueue.pop_front(); + delete f; + f = m_captureQueue.front(); + } + } +#if 0 + while ( m_captureQueue.size() >= m_queueSize ) { + Debug(2, "Queue full dropping frame %d", m_captureQueue.size()); + NAL_Frame * f = m_captureQueue.front(); + m_captureQueue.pop_front(); + delete f; + } +#endif + m_captureQueue.push_back(frame); + pthread_mutex_unlock(&m_mutex); + + // post an event to ask to deliver the frame + envir().taskScheduler().triggerEvent(m_eventTriggerId, this); + } // end while we get frame from data + return 1; +} + +// split packet in frames +std::list< std::pair > ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) { + std::list< std::pair > frameList; + if ( frame != nullptr ) { + frameList.push_back(std::pair(frame, frameSize)); + } + // We consume it all + frameSize = 0; + return frameList; +} + +// extract a frame +unsigned char* ZoneMinderFifoSource::extractFrame(unsigned char* frame, size_t& size, size_t& outsize) { + outsize = size; + size = 0; + return frame; +} +#endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_fifo_source.h b/src/zm_rtsp_server_fifo_source.h new file mode 100644 index 000000000..142946de2 --- /dev/null +++ b/src/zm_rtsp_server_fifo_source.h @@ -0,0 +1,82 @@ +/* --------------------------------------------------------------------------- +** +** FifoSource.h +** +** live555 source +** +** -------------------------------------------------------------------------*/ + +#ifndef ZM_RTSP_SERVER_FIFO_SOURCE_H +#define ZM_RTSP_SERVER_FIFO_SOURCE_H + +#include "zm_config.h" +#include "zm_define.h" +#include +#include +#include + +#if HAVE_RTSP_SERVER +#include + +class NAL_Frame; + +class ZoneMinderFifoSource: public FramedSource { + + public: + static ZoneMinderFifoSource* createNew( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize + ) { + return new ZoneMinderFifoSource(env, fifo, queueSize); + }; + std::string getAuxLine() { return m_auxLine; }; + int getWidth() { return m_width; }; + int getHeight() { return m_height; }; + int setWidth(int width) { return m_width=width; }; + int setHeight(int height) { return m_height=height; }; + + protected: + ZoneMinderFifoSource(UsageEnvironment& env, std::string fifo, unsigned int queueSize); + virtual ~ZoneMinderFifoSource(); + + protected: + static void* threadStub(void* clientData) { return ((ZoneMinderFifoSource*) clientData)->thread();}; + void* thread(); + static void deliverFrameStub(void* clientData) {((ZoneMinderFifoSource*) clientData)->deliverFrame();}; + void deliverFrame(); + static void incomingPacketHandlerStub(void* clientData, int mask) { ((ZoneMinderFifoSource*) clientData)->incomingPacketHandler(); }; + void incomingPacketHandler(); + int getNextFrame(); + void processFrame(char * frame, int frameSize, const timeval &ref); + void queueFrame(char * frame, int frameSize, const timeval &tv); + + // split packet in frames + virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned &frameSize); + + // overide FramedSource + virtual void doGetNextFrame(); + virtual void doStopGettingFrames(); + virtual unsigned char *extractFrame(unsigned char *data, size_t& size, size_t& outsize); + + protected: + std::list m_captureQueue; + EventTriggerId m_eventTriggerId; + std::string m_fifo; + + int m_width; + int m_height; + unsigned int m_queueSize; + pthread_t m_thid; + pthread_mutex_t m_mutex; + std::string m_auxLine; + int stop; + + int m_fd; + #define BUFFER_SIZE 65536 + unsigned char m_buffer[BUFFER_SIZE]; + unsigned char *m_buffer_ptr; +}; +#endif // HAVE_RTSP_SERVER + +#endif // ZM_RTSP_SERVER_FIFO_SOURCE_H diff --git a/src/zm_rtsp_server_fifo_video_source.cpp b/src/zm_rtsp_server_fifo_video_source.cpp new file mode 100644 index 000000000..bcf1f8841 --- /dev/null +++ b/src/zm_rtsp_server_fifo_video_source.cpp @@ -0,0 +1,20 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** +** ZoneMinder Live555 source +** +** -------------------------------------------------------------------------*/ + +#include "zm_rtsp_server_fifo_video_source.h" + +#if HAVE_RTSP_SERVER +ZoneMinderFifoVideoSource::ZoneMinderFifoVideoSource( + UsageEnvironment& env, std::string fifo, unsigned int queueSize) : + ZoneMinderFifoSource(env,fifo,queueSize) +{ +} + +#endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_fifo_video_source.h b/src/zm_rtsp_server_fifo_video_source.h new file mode 100644 index 000000000..444d0ca04 --- /dev/null +++ b/src/zm_rtsp_server_fifo_video_source.h @@ -0,0 +1,32 @@ +/* --------------------------------------------------------------------------- +** +** FifoSource.h +** +** live555 source +** +** -------------------------------------------------------------------------*/ + +#ifndef ZM_RTSP_SERVER_FIFO_VIDEO_SOURCE_H +#define ZM_RTSP_SERVER_FIFO_VIDEO_SOURCE_H + +#include "zm_rtsp_server_fifo_source.h" + +#if HAVE_RTSP_SERVER + +class ZoneMinderFifoVideoSource: public ZoneMinderFifoSource { + + public: + int getWidth() { return m_width; }; + int getHeight() { return m_height; }; + int setWidth(int width) { return m_width=width; }; + int setHeight(int height) { return m_height=height; }; + + protected: + ZoneMinderFifoVideoSource(UsageEnvironment& env, std::string fifo, unsigned int queueSize); + + int m_width; + int m_height; +}; +#endif // HAVE_RTSP_SERVER + +#endif // ZM_RTSP_SERVER_FIFO_VIDEO_SOURCE_H diff --git a/src/zm_rtsp_server_h264_fifo_source.cpp b/src/zm_rtsp_server_h264_fifo_source.cpp new file mode 100644 index 000000000..cc8391f7c --- /dev/null +++ b/src/zm_rtsp_server_h264_fifo_source.cpp @@ -0,0 +1,234 @@ +/* --------------------------------------------------------------------------- +** +** H264_FifoSource.cpp +** +** H264 Live555 source +** +** -------------------------------------------------------------------------*/ + +#include "zm_rtsp_server_h264_fifo_source.h" + +#include "zm_config.h" +#include "zm_logger.h" +#include "zm_rtsp_server_frame.h" +#include +#include + +#if HAVE_RTSP_SERVER +// live555 +#include + +// --------------------------------- +// H264 ZoneMinder FramedSource +// --------------------------------- +// +H264_ZoneMinderFifoSource::H264_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker) + : H26X_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker) +{ + // extradata appears to simply be the SPS and PPS NAL's + //this->splitFrames(m_stream->codecpar->extradata, m_stream->codecpar->extradata_size); +} + +// split packet into frames +std::list< std::pair > H264_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) { + std::list< std::pair > frameList; + + size_t bufSize = frameSize; + size_t size = 0; + unsigned char* buffer = this->extractFrame(frame, bufSize, size); + bool updateAux = false; + while ( buffer != nullptr ) { + switch ( m_frameType & 0x1F ) { + case 7: + Debug(4, "SPS_Size: %d bufSize %d", size, bufSize); + m_sps.assign((char*)buffer, size); + updateAux = true; + break; + case 8: + Debug(4, "PPS_Size: %d bufSize %d", size, bufSize); + m_pps.assign((char*)buffer, size); + updateAux = true; + break; + case 5: + Debug(4, "IDR_Size: %d bufSize %d", size, bufSize); + if ( m_repeatConfig && !m_sps.empty() && !m_pps.empty() ) { + frameList.push_back(std::pair((unsigned char*)m_sps.c_str(), m_sps.size())); + frameList.push_back(std::pair((unsigned char*)m_pps.c_str(), m_pps.size())); + } + break; + default: + Debug(4, "Unknown frametype!? %d %d", m_frameType, m_frameType & 0x1F); + break; + } + + if ( updateAux and !m_sps.empty() and !m_pps.empty() ) { + u_int32_t profile_level_id = 0; + if ( m_sps.size() >= 4 ) profile_level_id = (m_sps[1]<<16)|(m_sps[2]<<8)|m_sps[3]; + + char* sps_base64 = base64Encode(m_sps.c_str(), m_sps.size()); + char* pps_base64 = base64Encode(m_pps.c_str(), m_pps.size()); + + std::ostringstream os; + os << "profile-level-id=" << std::hex << std::setw(6) << std::setfill('0') << profile_level_id; + os << ";sprop-parameter-sets=" << sps_base64 << "," << pps_base64; + os << "a=x-dimensions:" << m_width << "," << m_height << "\r\n"; + m_auxLine.assign(os.str()); + Debug(1, "auxLine: %s", m_auxLine.c_str()); + + delete [] sps_base64; + delete [] pps_base64; + } + frameList.push_back(std::pair(buffer, size)); + + buffer = this->extractFrame(&buffer[size], bufSize, size); + } // end while buffer + frameSize = bufSize; + return frameList; +} + +H265_ZoneMinderFifoSource::H265_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker) + : H26X_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker) +{ + // extradata appears to simply be the SPS and PPS NAL's + // this->splitFrames(m_stream->codecpar->extradata, m_stream->codecpar->extradata_size); +} + +// split packet in frames +std::list< std::pair > +H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned frameSize) { + std::list< std::pair > frameList; + + size_t bufSize = frameSize; + size_t size = 0; + unsigned char* buffer = this->extractFrame(frame, bufSize, size); + while ( buffer != nullptr ) { + switch ((m_frameType&0x7E)>>1) { + case 32: + Debug(4, "VPS_Size: %d bufSize %d", size, bufSize); + m_vps.assign((char*)buffer,size); + break; + case 33: + Debug(4, "SPS_Size: %d bufSize %d", size, bufSize); + m_sps.assign((char*)buffer,size); + break; + case 34: + Debug(4, "PPS_Size: %d bufSize %d", size, bufSize); + m_pps.assign((char*)buffer,size); + break; + case 19: + case 20: + Debug(4, "IDR_Size: %d bufSize %d", size, bufSize); + if ( m_repeatConfig && !m_vps.empty() && !m_sps.empty() && !m_pps.empty() ) { + frameList.push_back(std::pair((unsigned char*)m_vps.c_str(), m_vps.size())); + frameList.push_back(std::pair((unsigned char*)m_sps.c_str(), m_sps.size())); + frameList.push_back(std::pair((unsigned char*)m_pps.c_str(), m_pps.size())); + } + break; + default: + Debug(4, "Unknown frametype!? %d %d", m_frameType, ((m_frameType & 0x7E) >> 1)); + break; + } + + if ( !m_vps.empty() && !m_sps.empty() && !m_pps.empty() ) { + char* vps_base64 = base64Encode(m_vps.c_str(), m_vps.size()); + char* sps_base64 = base64Encode(m_sps.c_str(), m_sps.size()); + char* pps_base64 = base64Encode(m_pps.c_str(), m_pps.size()); + + std::ostringstream os; + os << "sprop-vps=" << vps_base64; + os << ";sprop-sps=" << sps_base64; + os << ";sprop-pps=" << pps_base64; + os << "a=x-dimensions:" << m_width << "," << m_height << "\r\n"; + m_auxLine.assign(os.str()); + Debug(1, "Assigned auxLine to %s", m_auxLine.c_str()); + + delete [] vps_base64; + delete [] sps_base64; + delete [] pps_base64; + } + frameList.push_back(std::pair(buffer, size)); + + buffer = this->extractFrame(&buffer[size], bufSize, size); + } // end while buffer + if ( bufSize ) { + Debug(1, "%d bytes remaining", bufSize); + } + frameSize = bufSize; + return frameList; +} // end H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned frameSize) + +unsigned char * H26X_ZoneMinderFifoSource::findMarker( + unsigned char *frame, size_t size, size_t &length + ) { + //Debug(1, "findMarker %p %d", frame, size); + unsigned char *start = nullptr; + for ( size_t i = 0; i < size-2; i += 1 ) { + //Debug(1, "%d: %d %d %d", i, frame[i], frame[i+1], frame[i+2]); + if ( (frame[i] == 0) and (frame[i+1]) == 0 and (frame[i+2] == 1) ) { + if ( i and (frame[i-1] == 0) ) { + start = frame + i - 1; + length = sizeof(H264marker); + } else { + start = frame + i; + length = sizeof(H264shortmarker); + } + break; + } + } + return start; +} + +// extract a frame +unsigned char* H26X_ZoneMinderFifoSource::extractFrame(unsigned char* frame, size_t& size, size_t& outsize) { + unsigned char *outFrame = nullptr; + Debug(4, "ExtractFrame: %p %d", frame, size); + outsize = 0; + size_t markerLength = 0; + size_t endMarkerLength = 0; + m_frameType = 0; + unsigned char *startFrame = nullptr; + if ( size >= 3 ) + startFrame = this->findMarker(frame, size, markerLength); + if ( startFrame != nullptr ) { + Debug(4, "startFrame: %p marker Length %d", startFrame, markerLength); + m_frameType = startFrame[markerLength]; + + int remainingSize = size-(startFrame-frame+markerLength); + unsigned char *endFrame = nullptr; + if ( remainingSize > 3 ) { + endFrame = this->findMarker(startFrame+markerLength, remainingSize, endMarkerLength); + } + Debug(4, "endFrame: %p marker Length %d, remaining size %d", endFrame, endMarkerLength, remainingSize); + + if ( m_keepMarker ) { + size -= startFrame-frame; + outFrame = startFrame; + } else { + size -= startFrame-frame+markerLength; + outFrame = &startFrame[markerLength]; + } + + if ( endFrame != nullptr ) { + outsize = endFrame - outFrame; + } else { + outsize = size; + } + size -= outsize; + Debug(4, "Have frame type: %d size %d, keepmarker %d", m_frameType, outsize, m_keepMarker); + } else if ( size >= sizeof(H264shortmarker) ) { + Info("No marker found size %d", size); + } + + return outFrame; +} +#endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_h264_fifo_source.h b/src/zm_rtsp_server_h264_fifo_source.h new file mode 100644 index 000000000..9896c6122 --- /dev/null +++ b/src/zm_rtsp_server_h264_fifo_source.h @@ -0,0 +1,99 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** H264_ZoneMinderFifoSource.h +** +** H264 ZoneMinder live555 source +** +** -------------------------------------------------------------------------*/ + +#ifndef ZM_RTSP_H264_FIFO_SOURCE_H +#define ZM_RTSP_H264_FIFO_SOURCE_H + +#include "zm_config.h" +#include "zm_rtsp_server_fifo_video_source.h" + +// --------------------------------- +// H264 ZoneMinder FramedSource +// --------------------------------- +#if HAVE_RTSP_SERVER +class H26X_ZoneMinderFifoSource : public ZoneMinderFifoVideoSource { + protected: + H26X_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker) + : + ZoneMinderFifoVideoSource(env, fifo, queueSize), + m_repeatConfig(repeatConfig), + m_keepMarker(keepMarker), + m_frameType(0) { } + + virtual ~H26X_ZoneMinderFifoSource() {} + + virtual unsigned char* extractFrame(unsigned char* frame, size_t& size, size_t& outsize); + virtual unsigned char* findMarker(unsigned char *frame, size_t size, size_t &length); + + protected: + std::string m_sps; + std::string m_pps; + bool m_repeatConfig; + bool m_keepMarker; + int m_frameType; +}; + +class H264_ZoneMinderFifoSource : public H26X_ZoneMinderFifoSource { + public: + static H264_ZoneMinderFifoSource* createNew( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker) { + return new H264_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker); + } + + protected: + H264_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker); + + // overide ZoneMinderFifoSource + virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned &frameSize); +}; + +class H265_ZoneMinderFifoSource : public H26X_ZoneMinderFifoSource { + public: + static H265_ZoneMinderFifoSource* createNew( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker) { + return new H265_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker); + } + + protected: + H265_ZoneMinderFifoSource( + UsageEnvironment& env, + std::string fifo, + unsigned int queueSize, + bool repeatConfig, + bool keepMarker); + + // overide ZoneMinderFifoSource + virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned frameSize); + + protected: + std::string m_vps; +}; +#endif // HAVE_RTSP_SERVER + +#endif // ZM_RTSP_H264_FIFO_SOURCE_H diff --git a/src/zm_rtsp_server_server_media_subsession.cpp b/src/zm_rtsp_server_server_media_subsession.cpp index 72702c02d..87f3fdfd7 100644 --- a/src/zm_rtsp_server_server_media_subsession.cpp +++ b/src/zm_rtsp_server_server_media_subsession.cpp @@ -8,6 +8,7 @@ #include "zm_config.h" #include "zm_rtsp_server_adts_source.h" +#include "zm_rtsp_server_adts_fifo_source.h" #include #if HAVE_RTSP_SERVER @@ -15,16 +16,18 @@ // BaseServerMediaSubsession // --------------------------------- FramedSource* BaseServerMediaSubsession::createSource( - UsageEnvironment& env, FramedSource* inputSource, const std::string& format) + UsageEnvironment& env, + FramedSource* inputSource, + const std::string& format) { FramedSource* source = nullptr; - if ( format == "video/MP2T" ) { + if (format == "video/MP2T") { source = MPEG2TransportStreamFramer::createNew(env, inputSource); - } else if ( format == "video/H264" ) { + } else if (format == "video/H264") { source = H264VideoStreamDiscreteFramer::createNew(env, inputSource); } #if LIVEMEDIA_LIBRARY_VERSION_INT > 1414454400 - else if ( format == "video/H265" ) { + else if (format == "video/H265") { source = H265VideoStreamDiscreteFramer::createNew(env, inputSource); } #endif @@ -49,21 +52,21 @@ RTPSink* BaseServerMediaSubsession::createSink( ) { RTPSink* sink = nullptr; - if ( format == "video/MP2T" ) { + if (format == "video/MP2T") { sink = SimpleRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic, 90000, "video", "MP2T", 1, true, false); - } else if ( format == "video/H264" ) { + } else if (format == "video/H264") { sink = H264VideoRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic); - } else if ( format == "video/VP8" ) { + } else if (format == "video/VP8") { sink = VP8VideoRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic); } #if LIVEMEDIA_LIBRARY_VERSION_INT > 1414454400 - else if ( format == "video/VP9" ) { + else if (format == "video/VP9") { sink = VP9VideoRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic); - } else if ( format == "video/H265" ) { + } else if (format == "video/H265") { sink = H265VideoRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic); #endif - } else if ( format == "audio/AAC" ) { - ADTS_ZoneMinderDeviceSource *adts_source = (ADTS_ZoneMinderDeviceSource *)(m_replicator->inputSource()); + } else if (format == "audio/AAC") { + ADTS_ZoneMinderFifoSource *adts_source = (ADTS_ZoneMinderFifoSource *)(m_replicator->inputSource()); sink = MPEG4GenericRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic, adts_source->samplingFrequency(), @@ -83,24 +86,20 @@ RTPSink* BaseServerMediaSubsession::createSink( } char const* BaseServerMediaSubsession::getAuxLine( - ZoneMinderDeviceSource* source, + ZoneMinderFifoSource* source, unsigned char rtpPayloadType ) { const char* auxLine = nullptr; - if ( source ) { + if (source) { std::ostringstream os; os << "a=fmtp:" << int(rtpPayloadType) << " "; os << source->getAuxLine(); os << "\r\n"; - int width = source->getWidth(); - int height = source->getHeight(); - if ( (width > 0) && (height>0) ) { - os << "a=x-dimensions:" << width << "," << height << "\r\n"; - } auxLine = strdup(os.str().c_str()); Debug(1, "auxLine: %s", auxLine); } else { - Error("No source auxLine: "); + Error("No source auxLine:"); + return ""; } return auxLine; } diff --git a/src/zm_rtsp_server_server_media_subsession.h b/src/zm_rtsp_server_server_media_subsession.h index 4b6166b44..d3780cd6e 100644 --- a/src/zm_rtsp_server_server_media_subsession.h +++ b/src/zm_rtsp_server_server_media_subsession.h @@ -11,6 +11,7 @@ #define ZM_RTSP_SERVER_SERVER_MEDIA_SUBSESSION_H #include "zm_config.h" +#include "zm_rtsp_server_fifo_source.h" #include #if HAVE_RTSP_SERVER @@ -36,7 +37,7 @@ class BaseServerMediaSubsession { FramedSource *source); char const* getAuxLine( - ZoneMinderDeviceSource* source, + ZoneMinderFifoSource* source, unsigned char rtpPayloadType); protected: diff --git a/src/zm_rtsp_server_thread.cpp b/src/zm_rtsp_server_thread.cpp index 1b5c2f61a..de6c9addb 100644 --- a/src/zm_rtsp_server_thread.cpp +++ b/src/zm_rtsp_server_thread.cpp @@ -2,7 +2,9 @@ #include "zm_config.h" #include "zm_rtsp_server_adts_source.h" +#include "zm_rtsp_server_adts_fifo_source.h" #include "zm_rtsp_server_h264_device_source.h" +#include "zm_rtsp_server_h264_fifo_source.h" #include "zm_rtsp_server_unicast_server_media_subsession.h" #if HAVE_RTSP_SERVER @@ -31,7 +33,6 @@ RTSPServerThread::RTSPServerThread(std::shared_ptr monitor) : return; } const char *prefix = rtspServer->rtspURLPrefix(); - Debug(1, "RTSP prefix is %s", prefix); delete[] prefix; } // end RTSPServerThread::RTSPServerThread @@ -68,7 +69,60 @@ bool RTSPServerThread::stopped() const { return terminate ? true : false; } // end RTSPServerThread::stopped() -void RTSPServerThread::addStream(AVStream *video_stream, AVStream *audio_stream) { +ServerMediaSession *RTSPServerThread::addSession(std::string &streamname) { + ServerMediaSession *sms = ServerMediaSession::createNew(*env, streamname.c_str()); + if (sms) { + rtspServer->addServerMediaSession(sms); + char *url = rtspServer->rtspURL(sms); + Debug(1, "url is %s for stream %s", url, streamname.c_str()); + delete[] url; + } + return sms; +} + +FramedSource *RTSPServerThread::addFifo( + ServerMediaSession *sms, + std::string fifo) { + if (!rtspServer) return nullptr; + + int queueSize = 30; + bool repeatConfig = true; + bool muxTS = false; + FramedSource *source = nullptr; + + if (!fifo.empty()) { + StreamReplicator* replicator = nullptr; + std::string rtpFormat; + if (fifo.find("h264")) { + rtpFormat = "video/H264"; + source = H264_ZoneMinderFifoSource::createNew(*env, fifo, queueSize, repeatConfig, muxTS); + } else if (fifo.find("h265")) { + rtpFormat = "video/H265"; + source = H265_ZoneMinderFifoSource::createNew(*env, fifo, queueSize, repeatConfig, muxTS); + } else if (fifo.find("aac")) { + rtpFormat = "audio/AAC"; + source = ADTS_ZoneMinderFifoSource::createNew(*env, fifo, queueSize); + Debug(1, "ADTS source %p", source); + } else { + Warning("Unknown format in %s", fifo.c_str()); + } + if (source == nullptr) { + Error("Unable to create source"); + } else { + replicator = StreamReplicator::createNew(*env, source, false); + } + sources.push_back(source); + + if (replicator) { + sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, replicator, rtpFormat)); + } + } else { + Debug(1, "Not Adding stream"); + } + return source; +} // end void addFifo + +void RTSPServerThread::addStream(std::string &streamname, AVStream *video_stream, AVStream *audio_stream) { if ( !rtspServer ) return; @@ -101,7 +155,7 @@ void RTSPServerThread::addStream(AVStream *video_stream, AVStream *audio_stream) // Create Unicast Session if ( videoReplicator ) { if ( !sms ) - sms = ServerMediaSession::createNew(*env, "streamname"); + sms = ServerMediaSession::createNew(*env, streamname.c_str()); sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, videoReplicator, rtpFormat)); } } @@ -119,7 +173,7 @@ void RTSPServerThread::addStream(AVStream *video_stream, AVStream *audio_stream) } if ( replicator ) { if ( !sms ) - sms = ServerMediaSession::createNew(*env, "streamname"); + sms = ServerMediaSession::createNew(*env, streamname.c_str()); sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, replicator, rtpFormat)); } } else { diff --git a/src/zm_rtsp_server_thread.h b/src/zm_rtsp_server_thread.h index 57fc33bc9..97edb05de 100644 --- a/src/zm_rtsp_server_thread.h +++ b/src/zm_rtsp_server_thread.h @@ -4,6 +4,8 @@ #include "zm_config.h" #include "zm_ffmpeg.h" #include "zm_thread.h" +#include "zm_rtsp_server_server_media_subsession.h" +#include "zm_rtsp_server_fifo_source.h" #include #include @@ -28,7 +30,9 @@ class RTSPServerThread : public Thread { public: explicit RTSPServerThread(std::shared_ptr monitor); ~RTSPServerThread(); - void addStream(AVStream *, AVStream *); + ServerMediaSession *addSession(std::string &streamname); + void addStream(std::string &streamname, AVStream *, AVStream *); + FramedSource *addFifo(ServerMediaSession *sms, std::string fifo); int run(); void stop(); bool stopped() const; diff --git a/src/zm_rtsp_server_unicast_server_media_subsession.cpp b/src/zm_rtsp_server_unicast_server_media_subsession.cpp index 0b5566860..ede9ce527 100644 --- a/src/zm_rtsp_server_unicast_server_media_subsession.cpp +++ b/src/zm_rtsp_server_unicast_server_media_subsession.cpp @@ -11,6 +11,7 @@ #include "zm_config.h" #include "zm_rtsp_server_device_source.h" +#include "zm_rtsp_server_fifo_source.h" #if HAVE_RTSP_SERVER // ----------------------------------------- @@ -45,6 +46,8 @@ RTPSink* UnicastServerMediaSubsession::createNewRTPSink( char const* UnicastServerMediaSubsession::getAuxSDPLine( RTPSink* rtpSink, FramedSource* inputSource ) { - return this->getAuxLine(dynamic_cast(m_replicator->inputSource()), rtpSink->rtpPayloadType()); + return this->getAuxLine( + dynamic_cast(m_replicator->inputSource()), + rtpSink->rtpPayloadType()); } #endif // HAVE_RTSP_SERVER