From b1f6eb127b957df2dd544310e38c3ee73ea3352d Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Wed, 10 Mar 2021 11:01:04 -0500 Subject: [PATCH] Switch from live555 to PHZ76/RtspServer --- src/CMakeLists.txt | 11 +- src/zm_rtsp_server.cpp | 110 +++++++-- ...pp => zm_rtsp_server_fifo_adts_source.cpp} | 19 +- ...ce.h => zm_rtsp_server_fifo_adts_source.h} | 19 +- src/zm_rtsp_server_fifo_audio_source.cpp | 24 +- src/zm_rtsp_server_fifo_audio_source.h | 22 +- ...pp => zm_rtsp_server_fifo_h264_source.cpp} | 44 ++-- ...ce.h => zm_rtsp_server_fifo_h264_source.h} | 64 ++---- src/zm_rtsp_server_fifo_source.cpp | 135 ++--------- src/zm_rtsp_server_fifo_source.h | 57 ++--- src/zm_rtsp_server_fifo_video_source.cpp | 21 +- src/zm_rtsp_server_fifo_video_source.h | 11 +- src/zm_rtsp_server_thread.cpp | 210 ++++-------------- src/zm_rtsp_server_thread.h | 28 +-- 14 files changed, 301 insertions(+), 474 deletions(-) rename src/{zm_rtsp_server_adts_fifo_source.cpp => zm_rtsp_server_fifo_adts_source.cpp} (78%) rename src/{zm_rtsp_server_adts_fifo_source.h => zm_rtsp_server_fifo_adts_source.h} (71%) rename src/{zm_rtsp_server_h264_fifo_source.cpp => zm_rtsp_server_fifo_h264_source.cpp} (90%) rename src/{zm_rtsp_server_h264_fifo_source.h => zm_rtsp_server_fifo_h264_source.h} (56%) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ea5ab4d30..c6a9c04c7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -52,17 +52,11 @@ set(ZM_BIN_SRC_FILES zm_rtp_source.cpp zm_rtsp.cpp zm_rtsp_auth.cpp - zm_rtsp_server_thread.cpp - zm_rtsp_server_adts_source.cpp - zm_rtsp_server_adts_fifo_source.cpp - zm_rtsp_server_h264_device_source.cpp - zm_rtsp_server_h264_fifo_source.cpp - zm_rtsp_server_device_source.cpp zm_rtsp_server_fifo_source.cpp + zm_rtsp_server_fifo_adts_source.cpp + zm_rtsp_server_fifo_h264_source.cpp zm_rtsp_server_fifo_audio_source.cpp zm_rtsp_server_fifo_video_source.cpp - zm_rtsp_server_server_media_subsession.cpp - zm_rtsp_server_unicast_server_media_subsession.cpp zm_sdp.cpp zm_signal.cpp zm_stream.cpp @@ -85,6 +79,7 @@ target_link_libraries(zm PUBLIC libbcrypt::bcrypt jwt-cpp::jwt-cpp + RtspServer::RtspServer PRIVATE zm-core-interface) diff --git a/src/zm_rtsp_server.cpp b/src/zm_rtsp_server.cpp index f8c0e9426..e97ea5677 100644 --- a/src/zm_rtsp_server.cpp +++ b/src/zm_rtsp_server.cpp @@ -51,15 +51,16 @@ and provide that stream over rtsp #include "zm_db.h" #include "zm_define.h" #include "zm_monitor.h" -#include "zm_rtsp_server_thread.h" -#include "zm_rtsp_server_fifo_audio_source.h" -#include "zm_rtsp_server_fifo_video_source.h" +#include "zm_rtsp_server_fifo_h264_source.h" +#include "zm_rtsp_server_fifo_adts_source.h" #include "zm_signal.h" #include "zm_time.h" #include "zm_utils.h" #include #include -#include + + +#include "xop/RtspServer.h" void Usage() { fprintf(stderr, "zm_rtsp_server -m \n"); @@ -157,17 +158,23 @@ int main(int argc, char *argv[]) { sigaddset(&block_set, SIGUSR1); sigaddset(&block_set, SIGUSR2); - std::unique_ptr rtsp_server_thread; - if (config.min_rtsp_port) { - rtsp_server_thread = ZM::make_unique(config.min_rtsp_port); - Debug(1, "Starting RTSP server because min_rtsp_port is set"); - } else { + if (!config.min_rtsp_port) { Debug(1, "Not starting RTSP server because min_rtsp_port not set"); exit(-1); } - ServerMediaSession **sessions = new ServerMediaSession *[monitors.size()]; + + std::shared_ptr eventLoop(new xop::EventLoop()); + std::shared_ptr rtspServer = xop::RtspServer::Create(eventLoop.get()); + if ( !rtspServer->Start("0.0.0.0", config.min_rtsp_port) ) { + Debug(1, "Failed starting RTSP server on port %d", config.min_rtsp_port); + exit(-1); + } + + xop::MediaSession **sessions = new xop::MediaSession *[monitors.size()]; for (size_t i = 0; i < monitors.size(); i++) sessions[i] = nullptr; + std::list sources; + while (!zm_terminate) { for (size_t i = 0; i < monitors.size(); i++) { @@ -176,13 +183,12 @@ int main(int argc, char *argv[]) { if (!(monitor->ShmValid() or monitor->connect())) { Warning("Couldn't connect to monitor %d", monitor->Id()); if (sessions[i]) { - rtsp_server_thread->removeSession(sessions[i]); + rtspServer->RemoveSession(sessions[i]->GetMediaSessionId()); sessions[i] = nullptr; } monitor->disconnect(); continue; } - Debug(1, "monitor %d is connected", monitor->Id()); if (!sessions[i]) { std::string videoFifoPath = monitor->GetVideoFifoPath(); @@ -191,14 +197,49 @@ int main(int argc, char *argv[]) { continue; } std::string streamname = monitor->GetRTSPStreamName(); - Debug(1, "Adding session for %s", streamname.c_str()); - ServerMediaSession *sms = sessions[i] = rtsp_server_thread->addSession(streamname); + + xop::MediaSession *session = sessions[i] = xop::MediaSession::CreateNew(streamname); + if (session) { + session->AddNotifyConnectedCallback([] (xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port){ + Debug(1, "RTSP client connect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port); + }); + + session->AddNotifyDisconnectedCallback([](xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port) { + Debug(1, "RTSP client disconnect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port); + }); + + rtspServer->AddSession(session); + //char *url = rtspServer->rtspURL(session); + //Debug(1, "url is %s for stream %s", url, streamname.c_str()); + //delete[] url; + } + Debug(1, "Adding video fifo %s", videoFifoPath.c_str()); - ZoneMinderFifoVideoSource *video_source = static_cast(rtsp_server_thread->addFifo(sms, videoFifoPath)); + ZoneMinderFifoVideoSource *videoSource = nullptr; + + if (std::string::npos != videoFifoPath.find("h264")) { + session->AddSource(xop::channel_0, xop::H264Source::CreateNew()); + videoSource = new H264_ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, videoFifoPath); + } else if ( + std::string::npos != videoFifoPath.find("hevc") + or + std::string::npos != videoFifoPath.find("h265")) { + session->AddSource(xop::channel_0, xop::H265Source::CreateNew()); + videoSource = new H265_ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, videoFifoPath); + } else { + Warning("Unknown format in %s", videoFifoPath.c_str()); + } + if (videoSource == nullptr) { + Error("Unable to create source"); + } + sources.push_back(videoSource); + +#if 0 if (video_source) { video_source->setWidth(monitor->Width()); video_source->setHeight(monitor->Height()); } +#endif std::string audioFifoPath = monitor->GetAudioFifoPath(); if (audioFifoPath.empty()) { @@ -206,13 +247,28 @@ int main(int argc, char *argv[]) { continue; } Debug(1, "Adding audio fifo %s", audioFifoPath.c_str()); - ZoneMinderFifoAudioSource *audio_source = static_cast(rtsp_server_thread->addFifo(sms, audioFifoPath)); - if (audio_source) { - audio_source->setFrequency(monitor->GetAudioFrequency()); - audio_source->setChannels(monitor->GetAudioChannels()); + + ZoneMinderFifoAudioSource *audioSource = nullptr; + + if (std::string::npos != audioFifoPath.find("aac")) { + Debug(1, "Adding aac source at %dHz %d channels", monitor->GetAudioFrequency(), monitor->GetAudioChannels()); + session->AddSource(xop::channel_1, xop::AACSource::CreateNew( + monitor->GetAudioFrequency(), + monitor->GetAudioChannels(), + false /* has_adts */)); + audioSource = new ADTS_ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_1, audioFifoPath); + audioSource->setFrequency(monitor->GetAudioFrequency()); + audioSource->setChannels(monitor->GetAudioChannels()); + } else { + Warning("Unknown format in %s", audioFifoPath.c_str()); } + if (audioSource == nullptr) { + Error("Unable to create source"); + } + sources.push_back(audioSource); } // end if ! sessions[i] } // end foreach monitor + sleep(1); if (zm_reload) { @@ -229,11 +285,23 @@ int main(int argc, char *argv[]) { for (size_t i = 0; i < monitors.size(); i++) { if (sessions[i]) { Debug(1, "Removing session for %s", monitors[i]->Name()); - rtsp_server_thread->removeSession(sessions[i]); + rtspServer->RemoveSession(sessions[i]->GetMediaSessionId()); sessions[i] = nullptr; } } // end foreach monitor - rtsp_server_thread->Stop(); + + for ( std::list::iterator it = sources.begin(); it != sources.end(); ++it ) { + Debug(1, "RTSPServerThread::stopping source"); + (*it)->Stop(); + } + while (sources.size()) { + Debug(1, "RTSPServerThread::stop closing source"); + ZoneMinderFifoSource *source = sources.front(); + sources.pop_front(); + delete source; + } + + rtspServer->Stop(); delete[] sessions; sessions = nullptr; diff --git a/src/zm_rtsp_server_adts_fifo_source.cpp b/src/zm_rtsp_server_fifo_adts_source.cpp similarity index 78% rename from src/zm_rtsp_server_adts_fifo_source.cpp rename to src/zm_rtsp_server_fifo_adts_source.cpp index d62b04df1..6c01371e5 100644 --- a/src/zm_rtsp_server_adts_fifo_source.cpp +++ b/src/zm_rtsp_server_fifo_adts_source.cpp @@ -7,32 +7,27 @@ ** -------------------------------------------------------------------------*/ #include "zm_logger.h" -#include "zm_rtsp_server_adts_fifo_source.h" +#include "zm_rtsp_server_fifo_adts_source.h" #include #include #if HAVE_RTSP_SERVER -static unsigned const samplingFrequencyTable[16] = { - 96000, 88200, 64000, 48000, - 44100, 32000, 24000, 22050, - 16000, 12000, 11025, 8000, - 7350, 0, 0, 0 -}; // --------------------------------- // ADTS ZoneMinder FramedSource // --------------------------------- // ADTS_ZoneMinderFifoSource::ADTS_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo ) : - ZoneMinderFifoAudioSource(env, fifo, queueSize) + ZoneMinderFifoAudioSource(rtspServer, sessionId, channelId, fifo) { -#if 1 +#if 0 int profile = 0; unsigned char audioSpecificConfig[2]; diff --git a/src/zm_rtsp_server_adts_fifo_source.h b/src/zm_rtsp_server_fifo_adts_source.h similarity index 71% rename from src/zm_rtsp_server_adts_fifo_source.h rename to src/zm_rtsp_server_fifo_adts_source.h index df4abd417..0bbe0890c 100644 --- a/src/zm_rtsp_server_adts_fifo_source.h +++ b/src/zm_rtsp_server_fifo_adts_source.h @@ -21,19 +21,12 @@ // --------------------------------- class ADTS_ZoneMinderFifoSource : public ZoneMinderFifoAudioSource { - public: - static ADTS_ZoneMinderFifoSource* createNew( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize - ) { - return new ADTS_ZoneMinderFifoSource(env, fifo, queueSize); - }; - protected: - ADTS_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize + public: + ADTS_ZoneMinderFifoSource( + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo ); virtual ~ADTS_ZoneMinderFifoSource() {} diff --git a/src/zm_rtsp_server_fifo_audio_source.cpp b/src/zm_rtsp_server_fifo_audio_source.cpp index 25e10f7f0..f28c5951f 100644 --- a/src/zm_rtsp_server_fifo_audio_source.cpp +++ b/src/zm_rtsp_server_fifo_audio_source.cpp @@ -10,7 +10,7 @@ #if HAVE_RTSP_SERVER -static unsigned const samplingFrequencyTable[16] = { +static const int samplingFrequencyTable[16] = { 96000, 88200, 64000, 48000, 44100, 32000, 24000, 22050, 16000, 12000, 11025, 8000, @@ -21,12 +21,13 @@ static unsigned const samplingFrequencyTable[16] = { // --------------------------------- // ZoneMinderFifoAudioSource::ZoneMinderFifoAudioSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo ) : - ZoneMinderFifoSource(env, fifo, queueSize), + ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo), samplingFrequencyIndex(-1), frequency(-1), channels(1) @@ -37,4 +38,17 @@ int ZoneMinderFifoAudioSource::getFrequencyIndex() { if (samplingFrequencyTable[i] == frequency) return i; return -1; } + +void ZoneMinderFifoAudioSource::PushFrame(const uint8_t *data, size_t size, int64_t pts) { + + Debug(1, "Pushing audio frame to session %d channel %d pts %" PRId64, m_sessionId, m_channelId, pts); + xop::AVFrame frame = {0}; + frame.type = xop::AUDIO_FRAME; + frame.size = size; + frame.timestamp = av_rescale_q(pts, AV_TIME_BASE_Q, m_timeBase); + frame.buffer.reset(new uint8_t[size]); + memcpy(frame.buffer.get(), data, size); + m_rtspServer->PushFrame(m_sessionId, m_channelId, frame); + +} #endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_fifo_audio_source.h b/src/zm_rtsp_server_fifo_audio_source.h index 691652f8f..83cb8c709 100644 --- a/src/zm_rtsp_server_fifo_audio_source.h +++ b/src/zm_rtsp_server_fifo_audio_source.h @@ -22,26 +22,19 @@ class ZoneMinderFifoAudioSource : public ZoneMinderFifoSource { public: - static ZoneMinderFifoAudioSource* createNew( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize - ) { - return new ZoneMinderFifoAudioSource(env, fifo, queueSize); - }; - protected: ZoneMinderFifoAudioSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo ); virtual ~ZoneMinderFifoAudioSource() {} - public: void setFrequency(int p_frequency) { frequency = p_frequency; samplingFrequencyIndex = getFrequencyIndex(); + m_timeBase = {1, frequency}; }; int getFrequency() { return frequency; }; int getFrequencyIndex(); @@ -49,10 +42,13 @@ class ZoneMinderFifoAudioSource : public ZoneMinderFifoSource { void setChannels(int p_channels) { channels = p_channels; }; int getChannels() const { return channels; }; + protected: + void PushFrame(const uint8_t *data, size_t size, int64_t pts); + protected: std::string config; int samplingFrequencyIndex; - unsigned int frequency; + int frequency; int channels; }; #endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_h264_fifo_source.cpp b/src/zm_rtsp_server_fifo_h264_source.cpp similarity index 90% rename from src/zm_rtsp_server_h264_fifo_source.cpp rename to src/zm_rtsp_server_fifo_h264_source.cpp index 9f72eea02..6d8a5393e 100644 --- a/src/zm_rtsp_server_h264_fifo_source.cpp +++ b/src/zm_rtsp_server_fifo_h264_source.cpp @@ -6,11 +6,10 @@ ** ** -------------------------------------------------------------------------*/ -#include "zm_rtsp_server_h264_fifo_source.h" +#include "zm_rtsp_server_fifo_h264_source.h" #include "zm_config.h" #include "zm_logger.h" -#include "zm_rtsp_server_frame.h" #include #include @@ -23,26 +22,27 @@ // --------------------------------- // H264_ZoneMinderFifoSource::H264_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker) - : H26X_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker) + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ) + : H26X_ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo) { // extradata appears to simply be the SPS and PPS NAL's //this->splitFrames(m_stream->codecpar->extradata, m_stream->codecpar->extradata_size); } // split packet into frames -std::list< std::pair > H264_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) { +std::list< std::pair > H264_ZoneMinderFifoSource::splitFrames(unsigned char* frame, size_t &frameSize) { std::list< std::pair > frameList; size_t bufSize = frameSize; size_t size = 0; unsigned char* buffer = this->extractFrame(frame, bufSize, size); - bool updateAux = false; while ( buffer != nullptr ) { +#if 0 + bool updateAux = false; switch ( m_frameType & 0x1F ) { case 7: Debug(4, "SPS_Size: %d bufSize %d", size, bufSize); @@ -83,6 +83,7 @@ std::list< std::pair > H264_ZoneMinderFifoSource::splitF delete [] sps_base64; delete [] pps_base64; } +#endif frameList.push_back(std::pair(buffer, size)); buffer = this->extractFrame(&buffer[size], bufSize, size); @@ -92,12 +93,12 @@ std::list< std::pair > H264_ZoneMinderFifoSource::splitF } H265_ZoneMinderFifoSource::H265_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker) - : H26X_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker) + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ) + : H26X_ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo) { // extradata appears to simply be the SPS and PPS NAL's // this->splitFrames(m_stream->codecpar->extradata, m_stream->codecpar->extradata_size); @@ -105,14 +106,15 @@ H265_ZoneMinderFifoSource::H265_ZoneMinderFifoSource( // split packet in frames std::list< std::pair > -H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) { - std::list< std::pair > frameList; +H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, size_t &frameSize) { + std::list< std::pair > frameList; size_t bufSize = frameSize; size_t size = 0; unsigned char* buffer = this->extractFrame(frame, bufSize, size); - bool updateAux = false; while ( buffer != nullptr ) { +#if 0 + bool updateAux = false; switch ((m_frameType&0x7E)>>1) { case 32: Debug(4, "VPS_Size: %d bufSize %d", size, bufSize); @@ -160,13 +162,11 @@ H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize delete [] sps_base64; delete [] pps_base64; } +#endif frameList.push_back(std::pair(buffer, size)); buffer = this->extractFrame(&buffer[size], bufSize, size); } // end while buffer - if ( bufSize ) { - Debug(1, "%d bytes remaining", bufSize); - } frameSize = bufSize; return frameList; } // end H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned frameSize) diff --git a/src/zm_rtsp_server_h264_fifo_source.h b/src/zm_rtsp_server_fifo_h264_source.h similarity index 56% rename from src/zm_rtsp_server_h264_fifo_source.h rename to src/zm_rtsp_server_fifo_h264_source.h index 27ca2ef71..d73aa1730 100644 --- a/src/zm_rtsp_server_h264_fifo_source.h +++ b/src/zm_rtsp_server_fifo_h264_source.h @@ -19,18 +19,19 @@ // H264 ZoneMinder FramedSource // --------------------------------- #if HAVE_RTSP_SERVER +const char H264marker[] = {0,0,0,1}; +const char H264shortmarker[] = {0,0,1}; class H26X_ZoneMinderFifoSource : public ZoneMinderFifoVideoSource { - protected: + public: H26X_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker) + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ) : - ZoneMinderFifoVideoSource(env, fifo, queueSize), - m_repeatConfig(repeatConfig), - m_keepMarker(keepMarker), + ZoneMinderFifoVideoSource(rtspServer, sessionId, channelId, fifo), + m_keepMarker(false), m_frameType(0) { } virtual ~H26X_ZoneMinderFifoSource() {} @@ -41,55 +42,34 @@ class H26X_ZoneMinderFifoSource : public ZoneMinderFifoVideoSource { protected: std::string m_sps; std::string m_pps; - bool m_repeatConfig; bool m_keepMarker; int m_frameType; }; class H264_ZoneMinderFifoSource : public H26X_ZoneMinderFifoSource { public: - static H264_ZoneMinderFifoSource* createNew( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker) { - return new H264_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker); - } - - protected: H264_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker); + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ); // overide ZoneMinderFifoSource - virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned &frameSize); + virtual std::list< std::pair > splitFrames(unsigned char* frame, size_t &frameSize); }; class H265_ZoneMinderFifoSource : public H26X_ZoneMinderFifoSource { public: - static H265_ZoneMinderFifoSource* createNew( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker) { - return new H265_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker); - } - - protected: H265_ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize, - bool repeatConfig, - bool keepMarker); + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ); // overide ZoneMinderFifoSource - virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned &frameSize); + virtual std::list< std::pair > splitFrames(unsigned char* frame, size_t &frameSize); protected: std::string m_vps; diff --git a/src/zm_rtsp_server_fifo_source.cpp b/src/zm_rtsp_server_fifo_source.cpp index 81bd42f30..cc3d443b1 100644 --- a/src/zm_rtsp_server_fifo_source.cpp +++ b/src/zm_rtsp_server_fifo_source.cpp @@ -11,8 +11,8 @@ #include "zm_rtsp_server_fifo_source.h" #include "zm_config.h" +#include "zm_ffmpeg.h" #include "zm_logger.h" -#include "zm_rtsp_server_frame.h" #include "zm_signal.h" #include @@ -20,16 +20,17 @@ #if HAVE_RTSP_SERVER ZoneMinderFifoSource::ZoneMinderFifoSource( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo ) : - FramedSource(env), + m_rtspServer(rtspServer), + m_sessionId(sessionId), + m_channelId(channelId), m_fifo(fifo), - m_queueSize(queueSize), m_fd(-1) { - m_eventTriggerId = envir().taskScheduler().createEventTrigger(ZoneMinderFifoSource::deliverFrameStub); memset(&m_thid, 0, sizeof(m_thid)); memset(&m_mutex, 0, sizeof(m_mutex)); pthread_mutex_init(&m_mutex, nullptr); @@ -39,13 +40,7 @@ ZoneMinderFifoSource::ZoneMinderFifoSource( ZoneMinderFifoSource::~ZoneMinderFifoSource() { Debug(1, "Deleting Fifo Source"); stop = 1; - envir().taskScheduler().deleteEventTrigger(m_eventTriggerId); pthread_join(m_thid, nullptr); - while (m_captureQueue.size()) { - NAL_Frame * f = m_captureQueue.front(); - m_captureQueue.pop_front(); - delete f; - } Debug(1, "Deleting Fifo Source done"); pthread_mutex_destroy(&m_mutex); } @@ -55,78 +50,11 @@ void* ZoneMinderFifoSource::thread() { stop = 0; while (!stop) { - if ( getNextFrame() < 0 ) - sleep(1); + if (getNextFrame() < 0) sleep(1); } return nullptr; } -// getting FrameSource callback -void ZoneMinderFifoSource::doGetNextFrame() { - deliverFrame(); -} - -// stopping FrameSource callback -void ZoneMinderFifoSource::doStopGettingFrames() { - //stop = 1; - Debug(1, "ZoneMinderFifoSource::doStopGettingFrames"); - FramedSource::doStopGettingFrames(); -} - -// deliver frame to the sink -void ZoneMinderFifoSource::deliverFrame() { - if (!isCurrentlyAwaitingData()) { - Debug(5, "not awaiting data"); - return; - } - - pthread_mutex_lock(&m_mutex); - if (m_captureQueue.empty()) { - Debug(5, "Queue is empty"); - pthread_mutex_unlock(&m_mutex); - return; - } - - NAL_Frame *frame = m_captureQueue.front(); - m_captureQueue.pop_front(); - pthread_mutex_unlock(&m_mutex); - - fDurationInMicroseconds = 0; - fFrameSize = 0; - - unsigned int nal_size = frame->size(); - - if (nal_size > fMaxSize) { - fFrameSize = fMaxSize; - fNumTruncatedBytes = nal_size - fMaxSize; - } else { - fFrameSize = nal_size; - } - - Debug(4, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d", - frame->m_timestamp.tv_sec, frame->m_timestamp.tv_usec, - fFrameSize, - m_captureQueue.size() - ); - - fPresentationTime = frame->m_timestamp; - memcpy(fTo, frame->buffer(), fFrameSize); - - if (fFrameSize > 0) { - // send Frame to the consumer - FramedSource::afterGetting(this); - } - delete frame; -} // end void ZoneMinderFifoSource::deliverFrame() - -// FrameSource callback on read event -void ZoneMinderFifoSource::incomingPacketHandler() { - Debug(1, "incomingPacketHandler"); - if (this->getNextFrame() <= 0) { - handleClosure(this); - } -} - // read from monitor int ZoneMinderFifoSource::getNextFrame() { if (zm_terminate or stop) { @@ -221,53 +149,28 @@ int ZoneMinderFifoSource::getNextFrame() { } } unsigned char *packet_start = m_buffer.head() + header_size; - - // splitFrames modifies so make a copy - unsigned int bytes_remaining = data_size; + size_t bytes_remaining = data_size; std::list< std::pair > framesList = this->splitFrames(packet_start, bytes_remaining); - Debug(3, "Got %d frames, consuming %d bytes", framesList.size(), header_size + data_size); + Debug(3, "Got %d frames, consuming %d bytes, remaining %d", framesList.size(), header_size + data_size, bytes_remaining); m_buffer.consume(header_size + data_size); - - timeval tv; - tv.tv_sec = pts / 1000000; - tv.tv_usec = pts % 1000000; - while (framesList.size()) { std::pair nal = framesList.front(); framesList.pop_front(); - NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv); - Debug(3, "Got frame, size %d, queue_size %d", frame->size(), m_captureQueue.size()); - - pthread_mutex_lock(&m_mutex); - if (m_captureQueue.size() > 25) { // 1 sec at 25 fps - NAL_Frame * f = m_captureQueue.front(); - while (m_captureQueue.size() and ((tv.tv_sec - f->m_timestamp.tv_sec) > 2)) { - m_captureQueue.pop_front(); - delete f; - f = m_captureQueue.front(); - } - Debug(3, "Done clearing. Queue size is now %d", m_captureQueue.size()); - } - m_captureQueue.push_back(frame); - pthread_mutex_unlock(&m_mutex); - - // post an event to ask to deliver the frame - envir().taskScheduler().triggerEvent(m_eventTriggerId, this); - } // end while we get frame from data + PushFrame(nal.first, nal.second, pts); + } } // end while m_buffer.size() return 1; } // split packet in frames -std::list< std::pair > ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) { - std::list< std::pair > frameList; - if (frame != nullptr) { - frameList.push_back(std::pair(frame, frameSize)); - } - // We consume it all +std::list< std::pair > ZoneMinderFifoSource::splitFrames(unsigned char* frame, size_t &frameSize) { + std::list< std::pair > frameList; + if ( frame != nullptr ) { + frameList.push_back(std::pair(frame, frameSize)); + } frameSize = 0; - return frameList; + return frameList; } // extract a frame diff --git a/src/zm_rtsp_server_fifo_source.h b/src/zm_rtsp_server_fifo_source.h index 990d32984..e1c60cfdf 100644 --- a/src/zm_rtsp_server_fifo_source.h +++ b/src/zm_rtsp_server_fifo_source.h @@ -11,70 +11,53 @@ #include "zm_buffer.h" #include "zm_config.h" +#include "zm_ffmpeg.h" #include "zm_define.h" #include #include #include #if HAVE_RTSP_SERVER -#include +#include "xop/RtspServer.h" -class NAL_Frame; - -class ZoneMinderFifoSource: public FramedSource { +class ZoneMinderFifoSource { public: - static ZoneMinderFifoSource* createNew( - UsageEnvironment& env, - std::string fifo, - unsigned int queueSize - ) { - return new ZoneMinderFifoSource(env, fifo, queueSize); - }; - std::string getAuxLine() { return m_auxLine; }; - int getWidth() { return m_width; }; - int getHeight() { return m_height; }; - int setWidth(int width) { return m_width=width; }; - int setHeight(int height) { return m_height=height; }; - protected: - ZoneMinderFifoSource(UsageEnvironment& env, std::string fifo, unsigned int queueSize); + void Stop() { stop=1; }; + + ZoneMinderFifoSource( + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ); virtual ~ZoneMinderFifoSource(); protected: static void* threadStub(void* clientData) { return ((ZoneMinderFifoSource*) clientData)->thread();}; void* thread(); - static void deliverFrameStub(void* clientData) {((ZoneMinderFifoSource*) clientData)->deliverFrame();}; - void deliverFrame(); - static void incomingPacketHandlerStub(void* clientData, int mask) { ((ZoneMinderFifoSource*) clientData)->incomingPacketHandler(); }; - void incomingPacketHandler(); int getNextFrame(); - void processFrame(char * frame, int frameSize, const timeval &ref); - void queueFrame(char * frame, int frameSize, const timeval &tv); - - // split packet in frames - virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned &frameSize); - - // overide FramedSource - virtual void doGetNextFrame(); - virtual void doStopGettingFrames(); + virtual void PushFrame(const uint8_t *data, size_t size, int64_t pts) = 0; + // split packet in frames + virtual std::list< std::pair > splitFrames(unsigned char* frame, size_t &frameSize); virtual unsigned char *extractFrame(unsigned char *data, size_t& size, size_t& outsize); - + protected: - std::list m_captureQueue; - EventTriggerId m_eventTriggerId; - std::string m_fifo; int m_width; int m_height; - unsigned int m_queueSize; pthread_t m_thid; pthread_mutex_t m_mutex; - std::string m_auxLine; int stop; + std::shared_ptr& m_rtspServer; + xop::MediaSessionId m_sessionId; + xop::MediaChannelId m_channelId; + std::string m_fifo; int m_fd; Buffer m_buffer; + AVRational m_timeBase; }; #endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_fifo_video_source.cpp b/src/zm_rtsp_server_fifo_video_source.cpp index bcf1f8841..3fb21f78d 100644 --- a/src/zm_rtsp_server_fifo_video_source.cpp +++ b/src/zm_rtsp_server_fifo_video_source.cpp @@ -12,9 +12,26 @@ #if HAVE_RTSP_SERVER ZoneMinderFifoVideoSource::ZoneMinderFifoVideoSource( - UsageEnvironment& env, std::string fifo, unsigned int queueSize) : - ZoneMinderFifoSource(env,fifo,queueSize) + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ) : + ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo) { + m_timeBase = {1, 90000}; +} + +void ZoneMinderFifoVideoSource::PushFrame(const uint8_t *data, size_t size, int64_t pts) { + + xop::AVFrame frame = {0}; + frame.type = 0; + frame.size = size; + frame.timestamp = av_rescale_q(pts, AV_TIME_BASE_Q, m_timeBase); + frame.buffer.reset(new uint8_t[size]); + memcpy(frame.buffer.get(), data, size); + m_rtspServer->PushFrame(m_sessionId, m_channelId, frame); + } #endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_fifo_video_source.h b/src/zm_rtsp_server_fifo_video_source.h index 444d0ca04..b06dbe5a7 100644 --- a/src/zm_rtsp_server_fifo_video_source.h +++ b/src/zm_rtsp_server_fifo_video_source.h @@ -21,9 +21,16 @@ class ZoneMinderFifoVideoSource: public ZoneMinderFifoSource { int setWidth(int width) { return m_width=width; }; int setHeight(int height) { return m_height=height; }; - protected: - ZoneMinderFifoVideoSource(UsageEnvironment& env, std::string fifo, unsigned int queueSize); + ZoneMinderFifoVideoSource( + std::shared_ptr& rtspServer, + xop::MediaSessionId sessionId, + xop::MediaChannelId channelId, + std::string fifo + ); + protected: + void PushFrame(const uint8_t *data, size_t size, int64_t pts); + protected: int m_width; int m_height; }; diff --git a/src/zm_rtsp_server_thread.cpp b/src/zm_rtsp_server_thread.cpp index 7f989f987..cbe7b905b 100644 --- a/src/zm_rtsp_server_thread.cpp +++ b/src/zm_rtsp_server_thread.cpp @@ -2,38 +2,22 @@ #include "zm_config.h" #include "zm_logger.h" -#include "zm_rtsp_server_adts_fifo_source.h" -#include "zm_rtsp_server_adts_source.h" -#include "zm_rtsp_server_h264_fifo_source.h" -#include "zm_rtsp_server_h264_device_source.h" -#include "zm_rtsp_server_unicast_server_media_subsession.h" #if HAVE_RTSP_SERVER -#include -RTSPServerThread::RTSPServerThread(int port) : - terminate_(false), scheduler_watch_var_(0) +RTSPServerThread::RTSPServerThread(int p_port) : + terminate_(false), scheduler_watch_var_(0), port(p_port) { //unsigned short rtsp_over_http_port = 0; //const char *realm = "ZoneMinder"; - //unsigned int timeout = 65; - OutPacketBuffer::maxSize = 2000000; - - scheduler = BasicTaskScheduler::createNew(); - env = BasicUsageEnvironment::createNew(*scheduler); - authDB = nullptr; - //authDB = new UserAuthenticationDatabase("ZoneMinder"); - //authDB->addUserRecord("username1", "password1"); // replace these with real strings - - portNumBits rtspServerPortNum = port; - rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, authDB); + // + eventLoop = std::make_shared(); + rtspServer = xop::RtspServer::Create(eventLoop.get()); if ( rtspServer == nullptr ) { - Fatal("Failed to create rtspServer at port %d", rtspServerPortNum); + Fatal("Failed to create rtspServer"); return; } - const char *prefix = rtspServer->rtspURLPrefix(); - delete[] prefix; thread_ = std::thread(&RTSPServerThread::Run, this); } @@ -43,25 +27,24 @@ RTSPServerThread::~RTSPServerThread() { if (thread_.joinable()) thread_.join(); - if (rtspServer) { - Medium::close(rtspServer); - } // end if rtsp_server - while ( sources.size() ) { - FramedSource *source = sources.front(); - sources.pop_front(); - Medium::close(source); - } - env->reclaim(); - delete scheduler; } void RTSPServerThread::Run() { Debug(1, "RTSPServerThread::Run()"); - if (rtspServer) - env->taskScheduler().doEventLoop(&scheduler_watch_var_); // does not return + if (rtspServer) { + while (!scheduler_watch_var_) { + //if (clients > 0) { + sleep(1); + //} + } + } Debug(1, "RTSPServerThread::done()"); } +int RTSPServerThread::Start() { + return rtspServer->Start(std::string("0.0.0.0"), port); +} + void RTSPServerThread::Stop() { Debug(1, "RTSPServerThread::stop()"); terminate_ = true; @@ -71,174 +54,75 @@ void RTSPServerThread::Stop() { scheduler_watch_var_ = 1; } - for ( std::list::iterator it = sources.begin(); it != sources.end(); ++it ) { + for ( std::list::iterator it = sources.begin(); it != sources.end(); ++it ) { Debug(1, "RTSPServerThread::stopping source"); - (*it)->stopGettingFrames(); + (*it)->Stop(); } while ( sources.size() ) { Debug(1, "RTSPServerThread::stop closing source"); - FramedSource *source = sources.front(); + ZoneMinderFifoSource *source = sources.front(); sources.pop_front(); - Medium::close(source); + delete source; } } -ServerMediaSession *RTSPServerThread::addSession(std::string &streamname) { - ServerMediaSession *sms = ServerMediaSession::createNew(*env, streamname.c_str()); - if (sms) { - rtspServer->addServerMediaSession(sms); - char *url = rtspServer->rtspURL(sms); - Debug(1, "url is %s for stream %s", url, streamname.c_str()); - delete[] url; +xop::MediaSession *RTSPServerThread::addSession(std::string &streamname) { + + xop::MediaSession *session = xop::MediaSession::CreateNew(streamname); + if (session) { + session->AddNotifyConnectedCallback([] (xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port){ + Debug(1, "RTSP client connect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port); +}); + + session->AddNotifyDisconnectedCallback([](xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port) { + Debug(1, "RTSP client disconnect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port); +}); + + rtspServer->AddSession(session); + //char *url = rtspServer->rtspURL(session); + //Debug(1, "url is %s for stream %s", url, streamname.c_str()); + //delete[] url; } - return sms; + return session; } -void RTSPServerThread::removeSession(ServerMediaSession *sms) { - rtspServer->removeServerMediaSession(sms); +void RTSPServerThread::removeSession(xop::MediaSession *session) { + //rtspServer->removeServerMediaSession(session); } -FramedSource *RTSPServerThread::addFifo( - ServerMediaSession *sms, +ZoneMinderFifoSource *RTSPServerThread::addFifo( + xop::MediaSession *session, std::string fifo) { if (!rtspServer) return nullptr; - int queueSize = 60; - bool repeatConfig = false; - bool muxTS = false; - FramedSource *source = nullptr; + ZoneMinderFifoSource *source = nullptr; if (!fifo.empty()) { - StreamReplicator* replicator = nullptr; std::string rtpFormat; if (std::string::npos != fifo.find("h264")) { rtpFormat = "video/H264"; - source = H264_ZoneMinderFifoSource::createNew(*env, fifo, queueSize, repeatConfig, muxTS); + session->AddSource(xop::channel_0, xop::H264Source::CreateNew()); + source = new ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, fifo); } else if ( std::string::npos != fifo.find("hevc") or std::string::npos != fifo.find("h265")) { rtpFormat = "video/H265"; - source = H265_ZoneMinderFifoSource::createNew(*env, fifo, queueSize, repeatConfig, muxTS); + session->AddSource(xop::channel_0, xop::H265Source::CreateNew()); + source = new ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, fifo); } else if (std::string::npos != fifo.find("aac")) { - rtpFormat = "audio/AAC"; - source = ADTS_ZoneMinderFifoSource::createNew(*env, fifo, queueSize); Debug(1, "ADTS source %p", source); } else { Warning("Unknown format in %s", fifo.c_str()); } if (source == nullptr) { Error("Unable to create source"); - } else { - replicator = StreamReplicator::createNew(*env, source, false); } sources.push_back(source); - - if (replicator) { - sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, replicator, rtpFormat)); - } } else { Debug(1, "Not Adding stream as fifo was empty"); } return source; } // end void addFifo -void RTSPServerThread::addStream(std::string &streamname, AVStream *video_stream, AVStream *audio_stream) { - if ( !rtspServer ) - return; - - int queueSize = 30; - bool repeatConfig = true; - bool muxTS = false; - ServerMediaSession *sms = nullptr; - - if ( video_stream ) { - StreamReplicator* videoReplicator = nullptr; - FramedSource *source = nullptr; - std::string rtpFormat = getRtpFormat( -#if LIBAVCODEC_VERSION_CHECK(57, 64, 0, 64, 0) - video_stream->codecpar->codec_id -#else - video_stream->codec->codec_id -#endif - , false); - if ( rtpFormat.empty() ) { - Error("No streaming format"); - return; - } - Debug(1, "RTSP: format %s", rtpFormat.c_str()); - if ( rtpFormat == "video/H264" ) { - source = H264_ZoneMinderDeviceSource::createNew(*env, monitor_, video_stream, queueSize, repeatConfig, muxTS); - } else if ( rtpFormat == "video/H265" ) { - source = H265_ZoneMinderDeviceSource::createNew(*env, monitor_, video_stream, queueSize, repeatConfig, muxTS); - } - if ( source == nullptr ) { - Error("Unable to create source"); - } else { - videoReplicator = StreamReplicator::createNew(*env, source, false); - } - sources.push_back(source); - - // Create Unicast Session - if ( videoReplicator ) { - if ( !sms ) - sms = ServerMediaSession::createNew(*env, streamname.c_str()); - sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, videoReplicator, rtpFormat)); - } - } - if ( audio_stream ) { - StreamReplicator* replicator = nullptr; - FramedSource *source = nullptr; - std::string rtpFormat = getRtpFormat( -#if LIBAVCODEC_VERSION_CHECK(57, 64, 0, 64, 0) - audio_stream->codecpar->codec_id -#else - audio_stream->codec->codec_id -#endif - , false); - if ( rtpFormat == "audio/AAC" ) { - source = ADTS_ZoneMinderDeviceSource::createNew(*env, monitor_, audio_stream, queueSize); - Debug(1, "ADTS source %p", source); - } - if ( source ) { - replicator = StreamReplicator::createNew(*env, source, false /* deleteWhenLastReplicaDies */); - sources.push_back(source); - } - if ( replicator ) { - if ( !sms ) - sms = ServerMediaSession::createNew(*env, streamname.c_str()); - sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, replicator, rtpFormat)); - } - } else { - Debug(1, "Not Adding audio stream"); - } - if ( sms ) { - rtspServer->addServerMediaSession(sms); - char *url = rtspServer->rtspURL(sms); - Debug(1, "url is %s", url); - delete[] url; - } -} // end void addStream - -// ----------------------------------------- -// convert V4L2 pix format to RTP mime -// ----------------------------------------- -const std::string RTSPServerThread::getRtpFormat(AVCodecID codec_id, bool muxTS) { - if ( muxTS ) { - return "video/MP2T"; - } else { - switch ( codec_id ) { - case AV_CODEC_ID_H265 : return "video/H265"; - case AV_CODEC_ID_H264 : return "video/H264"; - //case PIX_FMT_MJPEG: rtpFormat = "video/JPEG"; break; - //case PIX_FMT_JPEG : rtpFormat = "video/JPEG"; break; - //case AV_PIX_FMT_VP8 : rtpFormat = "video/VP8" ; break; - //case AV_PIX_FMT_VP9 : rtpFormat = "video/VP9" ; break; - case AV_CODEC_ID_AAC : return "audio/AAC"; - default: break; - } - } - - return ""; -} #endif // HAVE_RTSP_SERVER diff --git a/src/zm_rtsp_server_thread.h b/src/zm_rtsp_server_thread.h index 9d02ec2a7..393203831 100644 --- a/src/zm_rtsp_server_thread.h +++ b/src/zm_rtsp_server_thread.h @@ -3,15 +3,14 @@ #include "zm_config.h" #include "zm_ffmpeg.h" -#include "zm_rtsp_server_server_media_subsession.h" +#include "xop/RtspServer.h" + #include "zm_rtsp_server_fifo_source.h" #include #include #include #if HAVE_RTSP_SERVER -#include -#include class Monitor; @@ -24,29 +23,22 @@ class RTSPServerThread { std::mutex scheduler_watch_var_mutex_; char scheduler_watch_var_; - TaskScheduler* scheduler; - UsageEnvironment* env; - UserAuthenticationDatabase* authDB; + std::shared_ptr eventLoop; + std::shared_ptr rtspServer; - RTSPServer* rtspServer; - std::list sources; + std::list sources; + int port; public: explicit RTSPServerThread(int port); ~RTSPServerThread(); - ServerMediaSession *addSession(std::string &streamname); - void removeSession(ServerMediaSession *sms); - void addStream(std::string &streamname, AVStream *, AVStream *); - FramedSource *addFifo(ServerMediaSession *sms, std::string fifo); + xop::MediaSession *addSession(std::string &streamname); + void removeSession(xop::MediaSession *sms); + ZoneMinderFifoSource *addFifo(xop::MediaSession *sms, std::string fifo); void Run(); void Stop(); + int Start(); bool IsStopped() const { return terminate_; }; - private: - const std::string getRtpFormat(AVCodecID codec, bool muxTS); - int addSession( - const std::string & sessionName, - const std::list & subSession - ); }; #endif // HAVE_RTSP_SERVER