From 7c049afc61f2b5e6b72df965e387ed20256457a7 Mon Sep 17 00:00:00 2001 From: Isaac Connor Date: Wed, 6 Jan 2021 11:26:08 -0500 Subject: [PATCH] Add AVPacket parsing queue and other files for rtsp_server --- src/zm_rtsp_server_device_source.cpp | 266 ++++++++++++++++++ src/zm_rtsp_server_device_source.h | 109 +++++++ src/zm_rtsp_server_frame.h | 55 ++++ src/zm_rtsp_server_h264_device_source.cpp | 170 +++++++++++ src/zm_rtsp_server_h264_device_source.h | 106 +++++++ ...zm_rtsp_server_server_media_subsession.cpp | 94 +++++++ src/zm_rtsp_server_server_media_subsession.h | 39 +++ ...server_unicast_server_media_subsession.cpp | 38 +++ ...p_server_unicast_server_media_subsession.h | 43 +++ 9 files changed, 920 insertions(+) create mode 100644 src/zm_rtsp_server_device_source.cpp create mode 100644 src/zm_rtsp_server_device_source.h create mode 100644 src/zm_rtsp_server_frame.h create mode 100644 src/zm_rtsp_server_h264_device_source.cpp create mode 100644 src/zm_rtsp_server_h264_device_source.h create mode 100644 src/zm_rtsp_server_server_media_subsession.cpp create mode 100644 src/zm_rtsp_server_server_media_subsession.h create mode 100644 src/zm_rtsp_server_unicast_server_media_subsession.cpp create mode 100644 src/zm_rtsp_server_unicast_server_media_subsession.h diff --git a/src/zm_rtsp_server_device_source.cpp b/src/zm_rtsp_server_device_source.cpp new file mode 100644 index 000000000..328c667ea --- /dev/null +++ b/src/zm_rtsp_server_device_source.cpp @@ -0,0 +1,266 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** v4l2DeviceSource.cpp +** +** ZoneMinder Live555 source +** +** -------------------------------------------------------------------------*/ + +#include +#include +#include + +#include "zm_rtsp_server_device_source.h" +#include "zm_rtsp_server_frame.h" +#include "zm_logger.h" + +// --------------------------------- +// ZoneMinder FramedSource Stats +// --------------------------------- +int ZoneMinderDeviceSource::Stats::notify(int tv_sec, int framesize) { + m_fps++; + m_size += framesize; + if ( tv_sec != m_fps_sec ) { + //LOG(INFO) << m_msg << "tv_sec:" << tv_sec << " fps:" << m_fps << " bandwidth:"<< (m_size/128) << "kbps"; + m_fps_sec = tv_sec; + m_fps = 0; + m_size = 0; + } + return m_fps; +} + +// Constructor +ZoneMinderDeviceSource::ZoneMinderDeviceSource( + UsageEnvironment& env, + Monitor* monitor, + int outputFd, + unsigned int queueSize, + bool useThread) : + FramedSource(env), + packetBufferSize(0), + packetBuffer(nullptr), + packetBufferPtr(nullptr), + m_in("in"), + m_out("out") , + m_outfd(outputFd), + m_monitor(monitor), + m_packetqueue(nullptr), + m_packetqueue_it(nullptr), + m_queueSize(queueSize) +{ + m_eventTriggerId = envir().taskScheduler().createEventTrigger(ZoneMinderDeviceSource::deliverFrameStub); + memset(&m_thid, 0, sizeof(m_thid)); + memset(&m_mutex, 0, sizeof(m_mutex)); + if ( m_monitor ) { + m_packetqueue = m_monitor->GetPacketQueue(); + if ( !m_packetqueue ) { + Fatal("No packetqueue"); + } + if ( useThread ) { + pthread_mutex_init(&m_mutex, nullptr); + pthread_create(&m_thid, nullptr, threadStub, this); + } else { + Debug(1, "Not using thread"); + //envir().monitorScheduler().turnOnBackgroundReadHandling( m_monitor->getFd(), ZoneMinderDeviceSource::incomingPacketHandlerStub, this); + } + } else { + Error("No monitor in ZoneMinderDeviceSource"); + } +} + +// Destructor +ZoneMinderDeviceSource::~ZoneMinderDeviceSource() { + envir().taskScheduler().deleteEventTrigger(m_eventTriggerId); + pthread_join(m_thid, nullptr); + pthread_mutex_destroy(&m_mutex); +} + +// thread mainloop +void* ZoneMinderDeviceSource::thread() { + int stop = 0; + fd_set fdset; + FD_ZERO(&fdset); + + while ( !stop ) { + getNextFrame(); + } + return nullptr; +} + +// getting FrameSource callback +void ZoneMinderDeviceSource::doGetNextFrame() { + deliverFrame(); +} + +// stopping FrameSource callback +void ZoneMinderDeviceSource::doStopGettingFrames() { + //LOG(INFO) << "ZoneMinderDeviceSource::doStopGettingFrames"; + Debug(1, "ZoneMinderDeviceSource::doStopGettingFrames"); + FramedSource::doStopGettingFrames(); +} + +// deliver frame to the sink +void ZoneMinderDeviceSource::deliverFrame() { + if ( isCurrentlyAwaitingData() ) { + fDurationInMicroseconds = 0; + fFrameSize = 0; + + pthread_mutex_lock(&m_mutex); + if ( m_captureQueue.empty() ) { + //LOG(INFO) << "Queue is empty"; + Debug(1, "Queue is empty"); + } else { + Debug(1, "Queue is not empty"); + timeval curTime; + gettimeofday(&curTime, nullptr); + NAL_Frame *frame = m_captureQueue.front(); + m_captureQueue.pop_front(); + + unsigned int nal_size = frame->size(); + m_out.notify(curTime.tv_sec, nal_size); + + if ( nal_size > fMaxSize ) { + fFrameSize = fMaxSize; + fNumTruncatedBytes = nal_size - fMaxSize; + } else { + fFrameSize = nal_size; + } + timeval diff; + timersub(&curTime, &(frame->m_timestamp), &diff); + + //LOG(INFO) << "deliverFrame\ttimestamp:" << curTime.tv_sec << "." << curTime.tv_usec << "\tsize:" << fFrameSize <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms\tqueue:" << m_captureQueue.size(); + + fPresentationTime = frame->m_timestamp; + memcpy(fTo, frame->buffer(), fFrameSize); + delete frame; + } + pthread_mutex_unlock(&m_mutex); + + if ( fFrameSize > 0 ) { + // send Frame to the consumer + FramedSource::afterGetting(this); + } + } +} + +// FrameSource callback on read event +void ZoneMinderDeviceSource::incomingPacketHandler() { + if ( this->getNextFrame() <= 0 ) { + handleClosure(this); + } +} + +// read from monitor +int ZoneMinderDeviceSource::getNextFrame() { + timeval ref; + gettimeofday(&ref, nullptr); + + if ( !m_packetqueue_it ) { + m_packetqueue_it = m_packetqueue->get_video_it(true); + return -1; + } + ZMPacket *zm_packet = m_packetqueue->get_packet(m_packetqueue_it); + if ( !zm_packet ) { + Debug(1, "null zm_packet %p", zm_packet); + return -1; + } + // packet is locked + AVPacket pkt = zm_packet->packet; + m_packetqueue->increment_it(m_packetqueue_it); + + if ( !packetBufferSize ) { + packetBufferSize = pkt.size * 2; + Debug(1, "Initializing buffer space to %dbytes", packetBufferSize); + packetBuffer = new unsigned char[packetBufferSize]; + packetBufferPtr = packetBuffer; + } else { + int bytesAvailable = packetBufferSize - (packetBufferPtr - packetBuffer); + if ( bytesAvailable < pkt.size ) { + // not enough space in buffer, so double it. + int newPacketBufferSize = packetBufferSize * 2; + if ( newPacketBufferSize < pkt.size ) + newPacketBufferSize = pkt.size * 2; + + Debug(1, "Doubling buffer space to %d . Available=%d, pkt.size=%d", newPacketBufferSize, + bytesAvailable, pkt.size); + unsigned char *newBuffer = new unsigned char[newPacketBufferSize]; + unsigned int bytesUsed = packetBufferPtr-packetBuffer; + Debug(1, "Copying %d bytes as %p-%p", bytesUsed, packetBufferPtr, packetBuffer); + memcpy(newBuffer, packetBuffer, bytesUsed); + delete[] packetBuffer; + packetBuffer = newBuffer; + packetBufferPtr = packetBuffer + bytesUsed; + packetBufferSize = newPacketBufferSize; + } else { + Debug(1, "Not Doubling buffer spaceCurrent size %d . Available=%d, pkt.size=%d", packetBufferSize, + bytesAvailable, pkt.size); + } + } + + Debug(1, "Copying pkt data to %p. buffer start is %p, remaining buffer size %d", + packetBufferPtr, packetBuffer, packetBufferPtr-packetBuffer); + memcpy(packetBufferPtr, pkt.data, pkt.size); + packetBufferPtr += pkt.size; + zm_packet->unlock(); + + size_t frame_size; + size_t pkt_size = packetBufferPtr-packetBuffer; + + Debug(1, "Calling extractFrame. pkt size %d", pkt_size); + unsigned char *data = this->extractFrame(packetBuffer, pkt_size, frame_size); + + if ( !data ) { + ///std::cerr << "No frame from get_h264_frame\n"; + Debug(1, "No frame from packet"); + return -1; + } + + timeval tv; + gettimeofday(&tv, nullptr); + Debug(1, "Have nal frame at %p size %d. Remaining pktsize %d", data, frame_size, pkt_size); + NAL_Frame *frame = new NAL_Frame(data, frame_size, tv); + //frame->check(); + zm_packet->unlock(); + + timeval diff; + timersub(&tv, &ref, &diff); + m_in.notify(tv.tv_sec, frame->size()); + //m_in.notify(tv.tv_sec, frame->nal_size()); + //LOG(INFO) << "getNextFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << frame->nal_size() <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms"; + + pthread_mutex_lock(&m_mutex); + while ( m_captureQueue.size() >= m_queueSize ) { + //LOG(DEBUG) << "Queue full size drop frame size:" << (int)m_captureQueue.size() ; + Debug(2, "Queue full dropping frame %d", m_captureQueue.size()); + NAL_Frame * f = m_captureQueue.front(); + m_captureQueue.pop_front(); + delete f; + } + m_captureQueue.push_back(frame); + pthread_mutex_unlock(&m_mutex); + + if ( pkt_size ) { + // Discard any bytes up to and including the frame. + + memmove(packetBuffer, data+frame_size, pkt_size); + packetBufferPtr = packetBuffer + pkt_size; + Debug(1, "Updated pkt data to %p. buffer start is %p, remaining buffer size %d bytesAfter%d", + packetBufferPtr, packetBuffer, packetBufferPtr-packetBuffer, pkt_size); + } + + // post an event to ask to deliver the frame + envir().taskScheduler().triggerEvent(m_eventTriggerId, this); + return 1; +} + +// split packet in frames +std::list< std::pair > ZoneMinderDeviceSource::splitFrames(unsigned char* frame, unsigned frameSize) { + std::list< std::pair > frameList; + if ( frame != nullptr ) { + frameList.push_back(std::pair(frame, frameSize)); + } + return frameList; +} diff --git a/src/zm_rtsp_server_device_source.h b/src/zm_rtsp_server_device_source.h new file mode 100644 index 000000000..595f7b1bb --- /dev/null +++ b/src/zm_rtsp_server_device_source.h @@ -0,0 +1,109 @@ +/* --------------------------------------------------------------------------- +** +** DeviceSource.h +** +** live555 source +** +** -------------------------------------------------------------------------*/ + + +#ifndef DEVICE_SOURCE +#define DEVICE_SOURCE + +#include +#include +#include +#include + +#include + +#include "zm_monitor.h" +#include "zm_rtsp_server_frame.h" +#include "zm_packetqueue.h" + +#include +/* Four-character-code (FOURCC) */ +#define fourcc(a, b, c, d)\ + ((__u32)(a) | ((__u32)(b) << 8) | ((__u32)(c) << 16) | ((__u32)(d) << 24)) + +#define PIX_FMT_H264 fourcc('H', '2', '6', '4') /* H264 with start codes */ +#define PIX_FMT_H264_NO_SC fourcc('A', 'V', 'C', '1') /* H264 without start codes */ +#define PIX_FMT_VP8 fourcc('V', 'P', '8', '0') +#define PIX_FMT_VP9 fourcc('V', 'P', '9', '0') +#define PIX_FMT_HEVC fourcc('H', 'E', 'V', 'C') + +class ZoneMinderDeviceSource: public FramedSource { + public: + + // --------------------------------- + // Compute simple stats + // --------------------------------- + class Stats { + public: + Stats(const std::string & msg) : m_fps(0), m_fps_sec(0), m_size(0), m_msg(msg) {}; + + public: + int notify(int tv_sec, int framesize); + + protected: + int m_fps; + int m_fps_sec; + int m_size; + const std::string m_msg; + }; + + public: + static ZoneMinderDeviceSource* createNew( + UsageEnvironment& env, + Monitor* monitor, + int outputFd, + unsigned int queueSize, + bool useThread); + std::string getAuxLine() { return m_auxLine; }; + int getWidth() { return m_monitor->Width(); }; + int getHeight() { return m_monitor->Height(); }; + + protected: + ZoneMinderDeviceSource(UsageEnvironment& env, Monitor* monitor, int outputFd, unsigned int queueSize, bool useThread); + virtual ~ZoneMinderDeviceSource(); + + protected: + static void* threadStub(void* clientData) { return ((ZoneMinderDeviceSource*) clientData)->thread();}; + void* thread(); + static void deliverFrameStub(void* clientData) {((ZoneMinderDeviceSource*) clientData)->deliverFrame();}; + void deliverFrame(); + static void incomingPacketHandlerStub(void* clientData, int mask) { ((ZoneMinderDeviceSource*) clientData)->incomingPacketHandler(); }; + void incomingPacketHandler(); + int getNextFrame(); + void processFrame(char * frame, int frameSize, const timeval &ref); + void queueFrame(char * frame, int frameSize, const timeval &tv); + + // split packet in frames + virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned frameSize); + + // overide FramedSource + virtual void doGetNextFrame(); + virtual void doStopGettingFrames(); + virtual unsigned char *extractFrame(unsigned char *data, size_t& size, size_t& outsize) = 0; + + protected: + unsigned int packetBufferSize; + unsigned char *packetBuffer; // buffer where we copy packet.data and looks for NALs + unsigned char *packetBufferPtr; // ptr into the buffer where we write new data + + std::list m_captureQueue; + Stats m_in; + Stats m_out; + EventTriggerId m_eventTriggerId; + int m_outfd; + Monitor* m_monitor; + zm_packetqueue *m_packetqueue; + std::list::iterator *m_packetqueue_it; + + unsigned int m_queueSize; + pthread_t m_thid; + pthread_mutex_t m_mutex; + std::string m_auxLine; +}; + +#endif diff --git a/src/zm_rtsp_server_frame.h b/src/zm_rtsp_server_frame.h new file mode 100644 index 000000000..760c4ffdc --- /dev/null +++ b/src/zm_rtsp_server_frame.h @@ -0,0 +1,55 @@ +#pragma once +#include "zm_logger.h" + +#include + // --------------------------------- + // Captured frame + // --------------------------------- +const char H264marker[] = {0,0,0,1}; +const char H264shortmarker[] = {0,0,1}; + +class NAL_Frame { + public: + NAL_Frame(unsigned char * buffer, size_t size, timeval timestamp) : + m_buffer(nullptr), m_size(size), m_timestamp(timestamp), m_ref_count(1) { + m_buffer = new unsigned char[m_size]; + memcpy(m_buffer, buffer, m_size); + }; + NAL_Frame(unsigned char* buffer, size_t size) : m_buffer(buffer), m_size(size) { + gettimeofday(&m_timestamp, NULL); + }; + NAL_Frame& operator=(const NAL_Frame&); + ~NAL_Frame() { + delete[] m_buffer; + m_buffer = nullptr; + }; + unsigned char *buffer() const { return m_buffer; }; + // The buffer has a 32bit nal size value at the front, so if we want the nal, it's + // the address of the buffer plus 4 bytes. + unsigned char *nal() const { return m_buffer+4; }; + size_t size() const { return m_size; }; + size_t nal_size() const { return m_size-4; }; + bool check() const { + // Look for marker at beginning + unsigned char *marker = (unsigned char*)memmem(m_buffer, sizeof(H264marker), H264marker, sizeof(H264marker)); + if ( marker ) { + Debug(1, "marker found at beginning"); + return true; + } else { + marker = (unsigned char*)memmem(m_buffer, m_size, H264marker, sizeof(H264marker)); + if ( marker ) { + Debug(1, "marker not found at beginning"); + return false; + } + } + return false; + } + + private: + unsigned char* m_buffer; + size_t m_size; + public: + timeval m_timestamp; + private: + int m_ref_count; +}; diff --git a/src/zm_rtsp_server_h264_device_source.cpp b/src/zm_rtsp_server_h264_device_source.cpp new file mode 100644 index 000000000..4dbeffbd6 --- /dev/null +++ b/src/zm_rtsp_server_h264_device_source.cpp @@ -0,0 +1,170 @@ +/* --------------------------------------------------------------------------- +** +** H264_DeviceSource.cpp +** +** H264 Live555 source +** +** -------------------------------------------------------------------------*/ + +#include + +// live555 +#include + +// project +#include "zm_rtsp_server_h264_device_source.h" + +// --------------------------------- +// H264 ZoneMinder FramedSource +// --------------------------------- + +// split packet into frames +std::list< std::pair > H264_ZoneMinderDeviceSource::splitFrames(unsigned char* frame, unsigned frameSize) { + std::list< std::pair > frameList; + + size_t bufSize = frameSize; + size_t size = 0; + unsigned char* buffer = this->extractFrame(frame, bufSize, size); + while ( buffer != nullptr ) { + switch ( m_frameType & 0x1F ) { + case 7: + //LOG(INFO) << "SPS size:" << size << " bufSize:" << bufSize; + m_sps.assign((char*)buffer,size); + break; + case 8: + //LOG(INFO) << "PPS size:" << size << " bufSize:" << bufSize; + m_pps.assign((char*)buffer,size); + break; + case 5: + //LOG(INFO) << "IDR size:" << size << " bufSize:" << bufSize; + if ( m_repeatConfig && !m_sps.empty() && !m_pps.empty() ) { + frameList.push_back(std::pair((unsigned char*)m_sps.c_str(), m_sps.size())); + frameList.push_back(std::pair((unsigned char*)m_pps.c_str(), m_pps.size())); + } + break; + default: + break; + } + + if ( !m_sps.empty() && !m_pps.empty() ) { + u_int32_t profile_level_id = 0; + if ( m_sps.size() >= 4 ) profile_level_id = (m_sps[1]<<16)|(m_sps[2]<<8)|m_sps[3]; + + char* sps_base64 = base64Encode(m_sps.c_str(), m_sps.size()); + char* pps_base64 = base64Encode(m_pps.c_str(), m_pps.size()); + + std::ostringstream os; + os << "profile-level-id=" << std::hex << std::setw(6) << std::setfill('0') << profile_level_id; + os << ";sprop-parameter-sets=" << sps_base64 <<"," << pps_base64; + m_auxLine.assign(os.str()); + + delete [] sps_base64; + delete [] pps_base64; + } + frameList.push_back(std::pair(buffer, size)); + + buffer = this->extractFrame(&buffer[size], bufSize, size); + } + return frameList; +} + +// split packet in frames +std::list< std::pair > H265_ZoneMinderDeviceSource::splitFrames(unsigned char* frame, unsigned frameSize) { + std::list< std::pair > frameList; + + size_t bufSize = frameSize; + size_t size = 0; + unsigned char* buffer = this->extractFrame(frame, bufSize, size); + while ( buffer != nullptr ) { + switch ((m_frameType&0x7E)>>1) { + case 32: + //Info( "VPS size:" << size << " bufSize:" << bufSize; + m_vps.assign((char*)buffer,size); + break; + case 33: + //LOG(INFO) << "SPS size:" << size << " bufSize:" << bufSize; + m_sps.assign((char*)buffer,size); + break; + case 34: + //LOG(INFO) << "PPS size:" << size << " bufSize:" << bufSize; + m_pps.assign((char*)buffer,size); + break; + case 19: + case 20: + //LOG(INFO) << "IDR size:" << size << " bufSize:" << bufSize; + if ( m_repeatConfig && !m_vps.empty() && !m_sps.empty() && !m_pps.empty() ) { + frameList.push_back(std::pair((unsigned char*)m_vps.c_str(), m_vps.size())); + frameList.push_back(std::pair((unsigned char*)m_sps.c_str(), m_sps.size())); + frameList.push_back(std::pair((unsigned char*)m_pps.c_str(), m_pps.size())); + } + break; + default: break; + } + + if (!m_vps.empty() && !m_sps.empty() && !m_pps.empty()) { + char* vps_base64 = base64Encode(m_vps.c_str(), m_vps.size()); + char* sps_base64 = base64Encode(m_sps.c_str(), m_sps.size()); + char* pps_base64 = base64Encode(m_pps.c_str(), m_pps.size()); + + std::ostringstream os; + os << "sprop-vps=" << vps_base64; + os << ";sprop-sps=" << sps_base64; + os << ";sprop-pps=" << pps_base64; + m_auxLine.assign(os.str()); + + delete [] vps_base64; + delete [] sps_base64; + delete [] pps_base64; + } + frameList.push_back(std::pair(buffer, size)); + + buffer = this->extractFrame(&buffer[size], bufSize, size); + } + return frameList; +} + +// extract a frame +unsigned char* H26X_ZoneMinderDeviceSource::extractFrame(unsigned char* frame, size_t& size, size_t& outsize) { + unsigned char * outFrame = nullptr; + outsize = 0; + unsigned int markerlength = 0; + m_frameType = 0; + + unsigned char *startFrame = (unsigned char*)memmem(frame, size, H264marker, sizeof(H264marker)); + if ( startFrame != nullptr ) { + markerlength = sizeof(H264marker); + } else { + startFrame = (unsigned char*)memmem(frame, size, H264shortmarker, sizeof(H264shortmarker)); + if ( startFrame != nullptr ) { + markerlength = sizeof(H264shortmarker); + } + } + if ( startFrame != nullptr ) { + m_frameType = startFrame[markerlength]; + + int remainingSize = size-(startFrame-frame+markerlength); + unsigned char *endFrame = (unsigned char*)memmem(&startFrame[markerlength], remainingSize, H264marker, sizeof(H264marker)); + if ( endFrame == nullptr ) { + endFrame = (unsigned char*)memmem(&startFrame[markerlength], remainingSize, H264shortmarker, sizeof(H264shortmarker)); + } + + if ( m_keepMarker ) { + size -= startFrame-frame; + outFrame = startFrame; + } else { + size -= startFrame-frame+markerlength; + outFrame = &startFrame[markerlength]; + } + + if ( endFrame != nullptr ) { + outsize = endFrame - outFrame; + } else { + outsize = size; + } + size -= outsize; + } else if ( size >= sizeof(H264shortmarker) ) { + Info("No marker found"); + } + + return outFrame; +} diff --git a/src/zm_rtsp_server_h264_device_source.h b/src/zm_rtsp_server_h264_device_source.h new file mode 100644 index 000000000..95dc33f94 --- /dev/null +++ b/src/zm_rtsp_server_h264_device_source.h @@ -0,0 +1,106 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** H264_ZoneMinderDeviceSource.h +** +** H264 ZoneMinder live555 source +** +** -------------------------------------------------------------------------*/ + + +#ifndef H264_ZoneMinder_DEVICE_SOURCE +#define H264_ZoneMinder_DEVICE_SOURCE + +// project +#include "zm_rtsp_server_device_source.h" +#include "zm_rtsp_server_frame.h" + + +// --------------------------------- +// H264 ZoneMinder FramedSource +// --------------------------------- + +class H26X_ZoneMinderDeviceSource : public ZoneMinderDeviceSource { + protected: + H26X_ZoneMinderDeviceSource( + UsageEnvironment& env, + Monitor *monitor, + int outputFd, + unsigned int queueSize, + bool useThread, + bool repeatConfig, + bool keepMarker) + : + ZoneMinderDeviceSource(env, monitor, outputFd, queueSize, useThread), + m_repeatConfig(repeatConfig), + m_keepMarker(keepMarker), + m_frameType(0) {} + + virtual ~H26X_ZoneMinderDeviceSource() {} + + virtual unsigned char* extractFrame(unsigned char* frame, size_t& size, size_t& outsize); + + protected: + std::string m_sps; + std::string m_pps; + bool m_repeatConfig; + bool m_keepMarker; + int m_frameType; +}; + +class H264_ZoneMinderDeviceSource : public H26X_ZoneMinderDeviceSource { + public: + static H264_ZoneMinderDeviceSource* createNew( + UsageEnvironment& env, + Monitor *monitor, + int outputFd, + unsigned int queueSize, + bool useThread, + bool repeatConfig, + bool keepMarker) { + return new H264_ZoneMinderDeviceSource(env, monitor, outputFd, queueSize, useThread, repeatConfig, keepMarker); + } + + protected: + H264_ZoneMinderDeviceSource( + UsageEnvironment& env, + Monitor *monitor, int outputFd, unsigned int queueSize, bool useThread, bool repeatConfig, bool keepMarker) + : H26X_ZoneMinderDeviceSource(env, monitor, outputFd, queueSize, useThread, repeatConfig, keepMarker) {} + + // overide ZoneMinderDeviceSource + virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned frameSize); +}; + +class H265_ZoneMinderDeviceSource : public H26X_ZoneMinderDeviceSource { + public: + static H265_ZoneMinderDeviceSource* createNew( + UsageEnvironment& env, + Monitor *monitor, + int outputFd, + unsigned int queueSize, + bool useThread, + bool repeatConfig, + bool keepMarker) { + return new H265_ZoneMinderDeviceSource(env, monitor, outputFd, queueSize, useThread, repeatConfig, keepMarker); + } + + protected: + H265_ZoneMinderDeviceSource( + UsageEnvironment& env, + Monitor *monitor, + int outputFd, + unsigned int queueSize, + bool useThread, + bool repeatConfig, + bool keepMarker) + : H26X_ZoneMinderDeviceSource(env, monitor, outputFd, queueSize, useThread, repeatConfig, keepMarker) {} + + // overide ZoneMinderDeviceSource + virtual std::list< std::pair > splitFrames(unsigned char* frame, unsigned frameSize); + + protected: + std::string m_vps; +}; +#endif diff --git a/src/zm_rtsp_server_server_media_subsession.cpp b/src/zm_rtsp_server_server_media_subsession.cpp new file mode 100644 index 000000000..5cb2eaef6 --- /dev/null +++ b/src/zm_rtsp_server_server_media_subsession.cpp @@ -0,0 +1,94 @@ +/* --------------------------------------------------------------------------- +** +** ServerMediaSubsession.cpp +** +** -------------------------------------------------------------------------*/ + +#include + +#include "zm_rtsp_server_server_media_subsession.h" +#include "zm_rtsp_server_device_source.h" + +// --------------------------------- +// BaseServerMediaSubsession +// --------------------------------- +FramedSource* BaseServerMediaSubsession::createSource( + UsageEnvironment& env, FramedSource* videoES, const std::string& format) +{ + FramedSource* source = nullptr; + if ( format == "video/MP2T" ) { + source = MPEG2TransportStreamFramer::createNew(env, videoES); + } else if ( format == "video/H264" ) { + source = H264VideoStreamDiscreteFramer::createNew(env, videoES); + } +#if LIVEMEDIA_LIBRARY_VERSION_INT > 1414454400 + else if ( format == "video/H265" ) { + source = H265VideoStreamDiscreteFramer::createNew(env, videoES); + } +#endif +#if 0 + else if (format == "video/JPEG") { + source = MJPEGVideoSource::createNew(env, videoES); + } +#endif + else { + source = videoES; + } + Error("Source %p %s", source, format.c_str()); + return source; +} + +RTPSink* BaseServerMediaSubsession::createSink( + UsageEnvironment& env, + Groupsock* rtpGroupsock, + unsigned char rtpPayloadTypeIfDynamic, + const std::string& format + ) { + RTPSink* videoSink = nullptr; + if ( format == "video/MP2T" ) { + videoSink = SimpleRTPSink::createNew(env, rtpGroupsock, rtpPayloadTypeIfDynamic, 90000, "video", "MP2T", 1, True, False); + } else if ( format == "video/H264" ) { + videoSink = H264VideoRTPSink::createNew(env, rtpGroupsock,rtpPayloadTypeIfDynamic); + } else if ( format == "video/VP8" ) { + videoSink = VP8VideoRTPSink::createNew (env, rtpGroupsock,rtpPayloadTypeIfDynamic); + } +#if LIVEMEDIA_LIBRARY_VERSION_INT > 1414454400 + else if ( format == "video/VP9" ) { + videoSink = VP9VideoRTPSink::createNew (env, rtpGroupsock,rtpPayloadTypeIfDynamic); + } else if ( format == "video/H265" ) { + videoSink = H265VideoRTPSink::createNew(env, rtpGroupsock,rtpPayloadTypeIfDynamic); +#endif + } else { + std::cerr << "unknown format\n"; + } +#if 0 + else if (format == "video/JPEG") { + videoSink = JPEGVideoRTPSink::createNew (env, rtpGroupsock); + } +#endif + Error("Sinkce %p %s", videoSink, format.c_str()); + return videoSink; +} + +char const* BaseServerMediaSubsession::getAuxLine( + ZoneMinderDeviceSource* source, + unsigned char rtpPayloadType + ) { + const char* auxLine = nullptr; + if ( source ) { + std::ostringstream os; + os << "a=fmtp:" << int(rtpPayloadType) << " "; + os << source->getAuxLine(); + os << "\r\n"; + int width = source->getWidth(); + int height = source->getHeight(); + if ( (width > 0) && (height>0) ) { + os << "a=x-dimensions:" << width << "," << height << "\r\n"; + } + auxLine = strdup(os.str().c_str()); + Error( "auxLine: %s", auxLine); + } else { + Error( "No source auxLine: "); + } + return auxLine; +} diff --git a/src/zm_rtsp_server_server_media_subsession.h b/src/zm_rtsp_server_server_media_subsession.h new file mode 100644 index 000000000..1a719c74c --- /dev/null +++ b/src/zm_rtsp_server_server_media_subsession.h @@ -0,0 +1,39 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** ServerMediaSubsession.h +** +** -------------------------------------------------------------------------*/ + +#pragma once + +#include + +#include +#include +#include +#include + +// live555 +#include + +// forward declaration +class ZoneMinderDeviceSource; + +// --------------------------------- +// BaseServerMediaSubsession +// --------------------------------- +class BaseServerMediaSubsession { + public: + BaseServerMediaSubsession(StreamReplicator* replicator): m_replicator(replicator) {}; + + public: + static FramedSource* createSource(UsageEnvironment& env, FramedSource * videoES, const std::string& format); + static RTPSink* createSink(UsageEnvironment& env, Groupsock * rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, const std::string& format); + char const* getAuxLine(ZoneMinderDeviceSource* source, unsigned char rtpPayloadType); + + protected: + StreamReplicator* m_replicator; +}; diff --git a/src/zm_rtsp_server_unicast_server_media_subsession.cpp b/src/zm_rtsp_server_unicast_server_media_subsession.cpp new file mode 100644 index 000000000..b823f6830 --- /dev/null +++ b/src/zm_rtsp_server_unicast_server_media_subsession.cpp @@ -0,0 +1,38 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** ServerMediaSubsession.cpp +** +** -------------------------------------------------------------------------*/ + + +#include "zm_rtsp_server_unicast_server_media_subsession.h" +#include "zm_rtsp_server_device_source.h" + +// ----------------------------------------- +// ServerMediaSubsession for Unicast +// ----------------------------------------- +UnicastServerMediaSubsession* UnicastServerMediaSubsession::createNew( + UsageEnvironment& env, StreamReplicator* replicator, const std::string& format) +{ + return new UnicastServerMediaSubsession(env, replicator, format); +} + +FramedSource* UnicastServerMediaSubsession::createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate) +{ + FramedSource* source = m_replicator->createStreamReplica(); + return createSource(envir(), source, m_format); +} + +RTPSink* UnicastServerMediaSubsession::createNewRTPSink( + Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource) +{ + return createSink(envir(), rtpGroupsock, rtpPayloadTypeIfDynamic, m_format); +} + +char const* UnicastServerMediaSubsession::getAuxSDPLine(RTPSink* rtpSink, FramedSource* inputSource) +{ + return this->getAuxLine(dynamic_cast(m_replicator->inputSource()), rtpSink->rtpPayloadType()); +} diff --git a/src/zm_rtsp_server_unicast_server_media_subsession.h b/src/zm_rtsp_server_unicast_server_media_subsession.h new file mode 100644 index 000000000..3531ff746 --- /dev/null +++ b/src/zm_rtsp_server_unicast_server_media_subsession.h @@ -0,0 +1,43 @@ +/* --------------------------------------------------------------------------- +** This software is in the public domain, furnished "as is", without technical +** support, and with no warranty, express or implied, as to its usefulness for +** any purpose. +** +** ServerMediaSubsession.h +** +** -------------------------------------------------------------------------*/ + +#pragma once + +#include "zm_rtsp_server_server_media_subsession.h" + +// ----------------------------------------- +// ServerMediaSubsession for Unicast +// ----------------------------------------- +class UnicastServerMediaSubsession : + public OnDemandServerMediaSubsession, + public BaseServerMediaSubsession +{ + public: + static UnicastServerMediaSubsession* createNew( + UsageEnvironment& env, + StreamReplicator* replicator, + const std::string& format); + + protected: + UnicastServerMediaSubsession( + UsageEnvironment& env, + StreamReplicator* replicator, + const std::string& format) + : + OnDemandServerMediaSubsession(env, False), + BaseServerMediaSubsession(replicator), + m_format(format) {}; + + virtual FramedSource* createNewStreamSource(unsigned clientSessionId, unsigned& estBitrate); + virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock, unsigned char rtpPayloadTypeIfDynamic, FramedSource* inputSource); + virtual char const* getAuxSDPLine(RTPSink* rtpSink,FramedSource* inputSource); + + protected: + const std::string m_format; +};