Switch from live555 to PHZ76/RtspServer

This commit is contained in:
Isaac Connor 2021-03-10 11:01:04 -05:00
parent f4cb4ec5b3
commit b1f6eb127b
14 changed files with 301 additions and 474 deletions

View File

@ -52,17 +52,11 @@ set(ZM_BIN_SRC_FILES
zm_rtp_source.cpp
zm_rtsp.cpp
zm_rtsp_auth.cpp
zm_rtsp_server_thread.cpp
zm_rtsp_server_adts_source.cpp
zm_rtsp_server_adts_fifo_source.cpp
zm_rtsp_server_h264_device_source.cpp
zm_rtsp_server_h264_fifo_source.cpp
zm_rtsp_server_device_source.cpp
zm_rtsp_server_fifo_source.cpp
zm_rtsp_server_fifo_adts_source.cpp
zm_rtsp_server_fifo_h264_source.cpp
zm_rtsp_server_fifo_audio_source.cpp
zm_rtsp_server_fifo_video_source.cpp
zm_rtsp_server_server_media_subsession.cpp
zm_rtsp_server_unicast_server_media_subsession.cpp
zm_sdp.cpp
zm_signal.cpp
zm_stream.cpp
@ -85,6 +79,7 @@ target_link_libraries(zm
PUBLIC
libbcrypt::bcrypt
jwt-cpp::jwt-cpp
RtspServer::RtspServer
PRIVATE
zm-core-interface)

View File

@ -51,15 +51,16 @@ and provide that stream over rtsp
#include "zm_db.h"
#include "zm_define.h"
#include "zm_monitor.h"
#include "zm_rtsp_server_thread.h"
#include "zm_rtsp_server_fifo_audio_source.h"
#include "zm_rtsp_server_fifo_video_source.h"
#include "zm_rtsp_server_fifo_h264_source.h"
#include "zm_rtsp_server_fifo_adts_source.h"
#include "zm_signal.h"
#include "zm_time.h"
#include "zm_utils.h"
#include <getopt.h>
#include <iostream>
#include <StreamReplicator.hh>
#include "xop/RtspServer.h"
void Usage() {
fprintf(stderr, "zm_rtsp_server -m <monitor_id>\n");
@ -157,17 +158,23 @@ int main(int argc, char *argv[]) {
sigaddset(&block_set, SIGUSR1);
sigaddset(&block_set, SIGUSR2);
std::unique_ptr<RTSPServerThread> rtsp_server_thread;
if (config.min_rtsp_port) {
rtsp_server_thread = ZM::make_unique<RTSPServerThread>(config.min_rtsp_port);
Debug(1, "Starting RTSP server because min_rtsp_port is set");
} else {
if (!config.min_rtsp_port) {
Debug(1, "Not starting RTSP server because min_rtsp_port not set");
exit(-1);
}
ServerMediaSession **sessions = new ServerMediaSession *[monitors.size()];
std::shared_ptr<xop::EventLoop> eventLoop(new xop::EventLoop());
std::shared_ptr<xop::RtspServer> rtspServer = xop::RtspServer::Create(eventLoop.get());
if ( !rtspServer->Start("0.0.0.0", config.min_rtsp_port) ) {
Debug(1, "Failed starting RTSP server on port %d", config.min_rtsp_port);
exit(-1);
}
xop::MediaSession **sessions = new xop::MediaSession *[monitors.size()];
for (size_t i = 0; i < monitors.size(); i++) sessions[i] = nullptr;
std::list<ZoneMinderFifoSource *> sources;
while (!zm_terminate) {
for (size_t i = 0; i < monitors.size(); i++) {
@ -176,13 +183,12 @@ int main(int argc, char *argv[]) {
if (!(monitor->ShmValid() or monitor->connect())) {
Warning("Couldn't connect to monitor %d", monitor->Id());
if (sessions[i]) {
rtsp_server_thread->removeSession(sessions[i]);
rtspServer->RemoveSession(sessions[i]->GetMediaSessionId());
sessions[i] = nullptr;
}
monitor->disconnect();
continue;
}
Debug(1, "monitor %d is connected", monitor->Id());
if (!sessions[i]) {
std::string videoFifoPath = monitor->GetVideoFifoPath();
@ -191,14 +197,49 @@ int main(int argc, char *argv[]) {
continue;
}
std::string streamname = monitor->GetRTSPStreamName();
Debug(1, "Adding session for %s", streamname.c_str());
ServerMediaSession *sms = sessions[i] = rtsp_server_thread->addSession(streamname);
xop::MediaSession *session = sessions[i] = xop::MediaSession::CreateNew(streamname);
if (session) {
session->AddNotifyConnectedCallback([] (xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port){
Debug(1, "RTSP client connect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port);
});
session->AddNotifyDisconnectedCallback([](xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port) {
Debug(1, "RTSP client disconnect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port);
});
rtspServer->AddSession(session);
//char *url = rtspServer->rtspURL(session);
//Debug(1, "url is %s for stream %s", url, streamname.c_str());
//delete[] url;
}
Debug(1, "Adding video fifo %s", videoFifoPath.c_str());
ZoneMinderFifoVideoSource *video_source = static_cast<ZoneMinderFifoVideoSource *>(rtsp_server_thread->addFifo(sms, videoFifoPath));
ZoneMinderFifoVideoSource *videoSource = nullptr;
if (std::string::npos != videoFifoPath.find("h264")) {
session->AddSource(xop::channel_0, xop::H264Source::CreateNew());
videoSource = new H264_ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, videoFifoPath);
} else if (
std::string::npos != videoFifoPath.find("hevc")
or
std::string::npos != videoFifoPath.find("h265")) {
session->AddSource(xop::channel_0, xop::H265Source::CreateNew());
videoSource = new H265_ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, videoFifoPath);
} else {
Warning("Unknown format in %s", videoFifoPath.c_str());
}
if (videoSource == nullptr) {
Error("Unable to create source");
}
sources.push_back(videoSource);
#if 0
if (video_source) {
video_source->setWidth(monitor->Width());
video_source->setHeight(monitor->Height());
}
#endif
std::string audioFifoPath = monitor->GetAudioFifoPath();
if (audioFifoPath.empty()) {
@ -206,13 +247,28 @@ int main(int argc, char *argv[]) {
continue;
}
Debug(1, "Adding audio fifo %s", audioFifoPath.c_str());
ZoneMinderFifoAudioSource *audio_source = static_cast<ZoneMinderFifoAudioSource *>(rtsp_server_thread->addFifo(sms, audioFifoPath));
if (audio_source) {
audio_source->setFrequency(monitor->GetAudioFrequency());
audio_source->setChannels(monitor->GetAudioChannels());
ZoneMinderFifoAudioSource *audioSource = nullptr;
if (std::string::npos != audioFifoPath.find("aac")) {
Debug(1, "Adding aac source at %dHz %d channels", monitor->GetAudioFrequency(), monitor->GetAudioChannels());
session->AddSource(xop::channel_1, xop::AACSource::CreateNew(
monitor->GetAudioFrequency(),
monitor->GetAudioChannels(),
false /* has_adts */));
audioSource = new ADTS_ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_1, audioFifoPath);
audioSource->setFrequency(monitor->GetAudioFrequency());
audioSource->setChannels(monitor->GetAudioChannels());
} else {
Warning("Unknown format in %s", audioFifoPath.c_str());
}
if (audioSource == nullptr) {
Error("Unable to create source");
}
sources.push_back(audioSource);
} // end if ! sessions[i]
} // end foreach monitor
sleep(1);
if (zm_reload) {
@ -229,11 +285,23 @@ int main(int argc, char *argv[]) {
for (size_t i = 0; i < monitors.size(); i++) {
if (sessions[i]) {
Debug(1, "Removing session for %s", monitors[i]->Name());
rtsp_server_thread->removeSession(sessions[i]);
rtspServer->RemoveSession(sessions[i]->GetMediaSessionId());
sessions[i] = nullptr;
}
} // end foreach monitor
rtsp_server_thread->Stop();
for ( std::list<ZoneMinderFifoSource *>::iterator it = sources.begin(); it != sources.end(); ++it ) {
Debug(1, "RTSPServerThread::stopping source");
(*it)->Stop();
}
while (sources.size()) {
Debug(1, "RTSPServerThread::stop closing source");
ZoneMinderFifoSource *source = sources.front();
sources.pop_front();
delete source;
}
rtspServer->Stop();
delete[] sessions;
sessions = nullptr;

View File

@ -7,32 +7,27 @@
** -------------------------------------------------------------------------*/
#include "zm_logger.h"
#include "zm_rtsp_server_adts_fifo_source.h"
#include "zm_rtsp_server_fifo_adts_source.h"
#include <iomanip>
#include <sstream>
#if HAVE_RTSP_SERVER
static unsigned const samplingFrequencyTable[16] = {
96000, 88200, 64000, 48000,
44100, 32000, 24000, 22050,
16000, 12000, 11025, 8000,
7350, 0, 0, 0
};
// ---------------------------------
// ADTS ZoneMinder FramedSource
// ---------------------------------
//
ADTS_ZoneMinderFifoSource::ADTS_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
)
:
ZoneMinderFifoAudioSource(env, fifo, queueSize)
ZoneMinderFifoAudioSource(rtspServer, sessionId, channelId, fifo)
{
#if 1
#if 0
int profile = 0;
unsigned char audioSpecificConfig[2];

View File

@ -21,19 +21,12 @@
// ---------------------------------
class ADTS_ZoneMinderFifoSource : public ZoneMinderFifoAudioSource {
public:
static ADTS_ZoneMinderFifoSource* createNew(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
) {
return new ADTS_ZoneMinderFifoSource(env, fifo, queueSize);
};
protected:
ADTS_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
public:
ADTS_ZoneMinderFifoSource(
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
);
virtual ~ADTS_ZoneMinderFifoSource() {}

View File

@ -10,7 +10,7 @@
#if HAVE_RTSP_SERVER
static unsigned const samplingFrequencyTable[16] = {
static const int samplingFrequencyTable[16] = {
96000, 88200, 64000, 48000,
44100, 32000, 24000, 22050,
16000, 12000, 11025, 8000,
@ -21,12 +21,13 @@ static unsigned const samplingFrequencyTable[16] = {
// ---------------------------------
//
ZoneMinderFifoAudioSource::ZoneMinderFifoAudioSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
)
:
ZoneMinderFifoSource(env, fifo, queueSize),
ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo),
samplingFrequencyIndex(-1),
frequency(-1),
channels(1)
@ -37,4 +38,17 @@ int ZoneMinderFifoAudioSource::getFrequencyIndex() {
if (samplingFrequencyTable[i] == frequency) return i;
return -1;
}
void ZoneMinderFifoAudioSource::PushFrame(const uint8_t *data, size_t size, int64_t pts) {
Debug(1, "Pushing audio frame to session %d channel %d pts %" PRId64, m_sessionId, m_channelId, pts);
xop::AVFrame frame = {0};
frame.type = xop::AUDIO_FRAME;
frame.size = size;
frame.timestamp = av_rescale_q(pts, AV_TIME_BASE_Q, m_timeBase);
frame.buffer.reset(new uint8_t[size]);
memcpy(frame.buffer.get(), data, size);
m_rtspServer->PushFrame(m_sessionId, m_channelId, frame);
}
#endif // HAVE_RTSP_SERVER

View File

@ -22,26 +22,19 @@
class ZoneMinderFifoAudioSource : public ZoneMinderFifoSource {
public:
static ZoneMinderFifoAudioSource* createNew(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
) {
return new ZoneMinderFifoAudioSource(env, fifo, queueSize);
};
protected:
ZoneMinderFifoAudioSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
);
virtual ~ZoneMinderFifoAudioSource() {}
public:
void setFrequency(int p_frequency) {
frequency = p_frequency;
samplingFrequencyIndex = getFrequencyIndex();
m_timeBase = {1, frequency};
};
int getFrequency() { return frequency; };
int getFrequencyIndex();
@ -49,10 +42,13 @@ class ZoneMinderFifoAudioSource : public ZoneMinderFifoSource {
void setChannels(int p_channels) { channels = p_channels; };
int getChannels() const { return channels; };
protected:
void PushFrame(const uint8_t *data, size_t size, int64_t pts);
protected:
std::string config;
int samplingFrequencyIndex;
unsigned int frequency;
int frequency;
int channels;
};
#endif // HAVE_RTSP_SERVER

View File

@ -6,11 +6,10 @@
**
** -------------------------------------------------------------------------*/
#include "zm_rtsp_server_h264_fifo_source.h"
#include "zm_rtsp_server_fifo_h264_source.h"
#include "zm_config.h"
#include "zm_logger.h"
#include "zm_rtsp_server_frame.h"
#include <iomanip>
#include <sstream>
@ -23,26 +22,27 @@
// ---------------------------------
//
H264_ZoneMinderFifoSource::H264_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker)
: H26X_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker)
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
)
: H26X_ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo)
{
// extradata appears to simply be the SPS and PPS NAL's
//this->splitFrames(m_stream->codecpar->extradata, m_stream->codecpar->extradata_size);
}
// split packet into frames
std::list< std::pair<unsigned char*, size_t> > H264_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) {
std::list< std::pair<unsigned char*, size_t> > H264_ZoneMinderFifoSource::splitFrames(unsigned char* frame, size_t &frameSize) {
std::list< std::pair<unsigned char*, size_t> > frameList;
size_t bufSize = frameSize;
size_t size = 0;
unsigned char* buffer = this->extractFrame(frame, bufSize, size);
bool updateAux = false;
while ( buffer != nullptr ) {
#if 0
bool updateAux = false;
switch ( m_frameType & 0x1F ) {
case 7:
Debug(4, "SPS_Size: %d bufSize %d", size, bufSize);
@ -83,6 +83,7 @@ std::list< std::pair<unsigned char*, size_t> > H264_ZoneMinderFifoSource::splitF
delete [] sps_base64;
delete [] pps_base64;
}
#endif
frameList.push_back(std::pair<unsigned char*,size_t>(buffer, size));
buffer = this->extractFrame(&buffer[size], bufSize, size);
@ -92,12 +93,12 @@ std::list< std::pair<unsigned char*, size_t> > H264_ZoneMinderFifoSource::splitF
}
H265_ZoneMinderFifoSource::H265_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker)
: H26X_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker)
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
)
: H26X_ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo)
{
// extradata appears to simply be the SPS and PPS NAL's
// this->splitFrames(m_stream->codecpar->extradata, m_stream->codecpar->extradata_size);
@ -105,14 +106,15 @@ H265_ZoneMinderFifoSource::H265_ZoneMinderFifoSource(
// split packet in frames
std::list< std::pair<unsigned char*,size_t> >
H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) {
std::list< std::pair<unsigned char*,size_t> > frameList;
H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, size_t &frameSize) {
std::list< std::pair<unsigned char*, size_t> > frameList;
size_t bufSize = frameSize;
size_t size = 0;
unsigned char* buffer = this->extractFrame(frame, bufSize, size);
bool updateAux = false;
while ( buffer != nullptr ) {
#if 0
bool updateAux = false;
switch ((m_frameType&0x7E)>>1) {
case 32:
Debug(4, "VPS_Size: %d bufSize %d", size, bufSize);
@ -160,13 +162,11 @@ H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize
delete [] sps_base64;
delete [] pps_base64;
}
#endif
frameList.push_back(std::pair<unsigned char*,size_t>(buffer, size));
buffer = this->extractFrame(&buffer[size], bufSize, size);
} // end while buffer
if ( bufSize ) {
Debug(1, "%d bytes remaining", bufSize);
}
frameSize = bufSize;
return frameList;
} // end H265_ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned frameSize)

View File

@ -19,18 +19,19 @@
// H264 ZoneMinder FramedSource
// ---------------------------------
#if HAVE_RTSP_SERVER
const char H264marker[] = {0,0,0,1};
const char H264shortmarker[] = {0,0,1};
class H26X_ZoneMinderFifoSource : public ZoneMinderFifoVideoSource {
protected:
public:
H26X_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker)
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
)
:
ZoneMinderFifoVideoSource(env, fifo, queueSize),
m_repeatConfig(repeatConfig),
m_keepMarker(keepMarker),
ZoneMinderFifoVideoSource(rtspServer, sessionId, channelId, fifo),
m_keepMarker(false),
m_frameType(0) { }
virtual ~H26X_ZoneMinderFifoSource() {}
@ -41,55 +42,34 @@ class H26X_ZoneMinderFifoSource : public ZoneMinderFifoVideoSource {
protected:
std::string m_sps;
std::string m_pps;
bool m_repeatConfig;
bool m_keepMarker;
int m_frameType;
};
class H264_ZoneMinderFifoSource : public H26X_ZoneMinderFifoSource {
public:
static H264_ZoneMinderFifoSource* createNew(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker) {
return new H264_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker);
}
protected:
H264_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker);
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
);
// overide ZoneMinderFifoSource
virtual std::list< std::pair<unsigned char*,size_t> > splitFrames(unsigned char* frame, unsigned &frameSize);
virtual std::list< std::pair<unsigned char*,size_t> > splitFrames(unsigned char* frame, size_t &frameSize);
};
class H265_ZoneMinderFifoSource : public H26X_ZoneMinderFifoSource {
public:
static H265_ZoneMinderFifoSource* createNew(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker) {
return new H265_ZoneMinderFifoSource(env, fifo, queueSize, repeatConfig, keepMarker);
}
protected:
H265_ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize,
bool repeatConfig,
bool keepMarker);
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
);
// overide ZoneMinderFifoSource
virtual std::list< std::pair<unsigned char*,size_t> > splitFrames(unsigned char* frame, unsigned &frameSize);
virtual std::list< std::pair<unsigned char*,size_t> > splitFrames(unsigned char* frame, size_t &frameSize);
protected:
std::string m_vps;

View File

@ -11,8 +11,8 @@
#include "zm_rtsp_server_fifo_source.h"
#include "zm_config.h"
#include "zm_ffmpeg.h"
#include "zm_logger.h"
#include "zm_rtsp_server_frame.h"
#include "zm_signal.h"
#include <fcntl.h>
@ -20,16 +20,17 @@
#if HAVE_RTSP_SERVER
ZoneMinderFifoSource::ZoneMinderFifoSource(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
) :
FramedSource(env),
m_rtspServer(rtspServer),
m_sessionId(sessionId),
m_channelId(channelId),
m_fifo(fifo),
m_queueSize(queueSize),
m_fd(-1)
{
m_eventTriggerId = envir().taskScheduler().createEventTrigger(ZoneMinderFifoSource::deliverFrameStub);
memset(&m_thid, 0, sizeof(m_thid));
memset(&m_mutex, 0, sizeof(m_mutex));
pthread_mutex_init(&m_mutex, nullptr);
@ -39,13 +40,7 @@ ZoneMinderFifoSource::ZoneMinderFifoSource(
ZoneMinderFifoSource::~ZoneMinderFifoSource() {
Debug(1, "Deleting Fifo Source");
stop = 1;
envir().taskScheduler().deleteEventTrigger(m_eventTriggerId);
pthread_join(m_thid, nullptr);
while (m_captureQueue.size()) {
NAL_Frame * f = m_captureQueue.front();
m_captureQueue.pop_front();
delete f;
}
Debug(1, "Deleting Fifo Source done");
pthread_mutex_destroy(&m_mutex);
}
@ -55,78 +50,11 @@ void* ZoneMinderFifoSource::thread() {
stop = 0;
while (!stop) {
if ( getNextFrame() < 0 )
sleep(1);
if (getNextFrame() < 0) sleep(1);
}
return nullptr;
}
// getting FrameSource callback
void ZoneMinderFifoSource::doGetNextFrame() {
deliverFrame();
}
// stopping FrameSource callback
void ZoneMinderFifoSource::doStopGettingFrames() {
//stop = 1;
Debug(1, "ZoneMinderFifoSource::doStopGettingFrames");
FramedSource::doStopGettingFrames();
}
// deliver frame to the sink
void ZoneMinderFifoSource::deliverFrame() {
if (!isCurrentlyAwaitingData()) {
Debug(5, "not awaiting data");
return;
}
pthread_mutex_lock(&m_mutex);
if (m_captureQueue.empty()) {
Debug(5, "Queue is empty");
pthread_mutex_unlock(&m_mutex);
return;
}
NAL_Frame *frame = m_captureQueue.front();
m_captureQueue.pop_front();
pthread_mutex_unlock(&m_mutex);
fDurationInMicroseconds = 0;
fFrameSize = 0;
unsigned int nal_size = frame->size();
if (nal_size > fMaxSize) {
fFrameSize = fMaxSize;
fNumTruncatedBytes = nal_size - fMaxSize;
} else {
fFrameSize = nal_size;
}
Debug(4, "deliverFrame timestamp: %ld.%06ld size: %d queuesize: %d",
frame->m_timestamp.tv_sec, frame->m_timestamp.tv_usec,
fFrameSize,
m_captureQueue.size()
);
fPresentationTime = frame->m_timestamp;
memcpy(fTo, frame->buffer(), fFrameSize);
if (fFrameSize > 0) {
// send Frame to the consumer
FramedSource::afterGetting(this);
}
delete frame;
} // end void ZoneMinderFifoSource::deliverFrame()
// FrameSource callback on read event
void ZoneMinderFifoSource::incomingPacketHandler() {
Debug(1, "incomingPacketHandler");
if (this->getNextFrame() <= 0) {
handleClosure(this);
}
}
// read from monitor
int ZoneMinderFifoSource::getNextFrame() {
if (zm_terminate or stop) {
@ -221,53 +149,28 @@ int ZoneMinderFifoSource::getNextFrame() {
}
}
unsigned char *packet_start = m_buffer.head() + header_size;
// splitFrames modifies so make a copy
unsigned int bytes_remaining = data_size;
size_t bytes_remaining = data_size;
std::list< std::pair<unsigned char*, size_t> > framesList = this->splitFrames(packet_start, bytes_remaining);
Debug(3, "Got %d frames, consuming %d bytes", framesList.size(), header_size + data_size);
Debug(3, "Got %d frames, consuming %d bytes, remaining %d", framesList.size(), header_size + data_size, bytes_remaining);
m_buffer.consume(header_size + data_size);
timeval tv;
tv.tv_sec = pts / 1000000;
tv.tv_usec = pts % 1000000;
while (framesList.size()) {
std::pair<unsigned char*, size_t> nal = framesList.front();
framesList.pop_front();
NAL_Frame *frame = new NAL_Frame(nal.first, nal.second, tv);
Debug(3, "Got frame, size %d, queue_size %d", frame->size(), m_captureQueue.size());
pthread_mutex_lock(&m_mutex);
if (m_captureQueue.size() > 25) { // 1 sec at 25 fps
NAL_Frame * f = m_captureQueue.front();
while (m_captureQueue.size() and ((tv.tv_sec - f->m_timestamp.tv_sec) > 2)) {
m_captureQueue.pop_front();
delete f;
f = m_captureQueue.front();
}
Debug(3, "Done clearing. Queue size is now %d", m_captureQueue.size());
}
m_captureQueue.push_back(frame);
pthread_mutex_unlock(&m_mutex);
// post an event to ask to deliver the frame
envir().taskScheduler().triggerEvent(m_eventTriggerId, this);
} // end while we get frame from data
PushFrame(nal.first, nal.second, pts);
}
} // end while m_buffer.size()
return 1;
}
// split packet in frames
std::list< std::pair<unsigned char*,size_t> > ZoneMinderFifoSource::splitFrames(unsigned char* frame, unsigned &frameSize) {
std::list< std::pair<unsigned char*,size_t> > frameList;
if (frame != nullptr) {
frameList.push_back(std::pair<unsigned char*,size_t>(frame, frameSize));
}
// We consume it all
std::list< std::pair<unsigned char*,size_t> > ZoneMinderFifoSource::splitFrames(unsigned char* frame, size_t &frameSize) {
std::list< std::pair<unsigned char*,size_t> > frameList;
if ( frame != nullptr ) {
frameList.push_back(std::pair<unsigned char*,size_t>(frame, frameSize));
}
frameSize = 0;
return frameList;
return frameList;
}
// extract a frame

View File

@ -11,70 +11,53 @@
#include "zm_buffer.h"
#include "zm_config.h"
#include "zm_ffmpeg.h"
#include "zm_define.h"
#include <list>
#include <string>
#include <utility>
#if HAVE_RTSP_SERVER
#include <liveMedia.hh>
#include "xop/RtspServer.h"
class NAL_Frame;
class ZoneMinderFifoSource: public FramedSource {
class ZoneMinderFifoSource {
public:
static ZoneMinderFifoSource* createNew(
UsageEnvironment& env,
std::string fifo,
unsigned int queueSize
) {
return new ZoneMinderFifoSource(env, fifo, queueSize);
};
std::string getAuxLine() { return m_auxLine; };
int getWidth() { return m_width; };
int getHeight() { return m_height; };
int setWidth(int width) { return m_width=width; };
int setHeight(int height) { return m_height=height; };
protected:
ZoneMinderFifoSource(UsageEnvironment& env, std::string fifo, unsigned int queueSize);
void Stop() { stop=1; };
ZoneMinderFifoSource(
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
);
virtual ~ZoneMinderFifoSource();
protected:
static void* threadStub(void* clientData) { return ((ZoneMinderFifoSource*) clientData)->thread();};
void* thread();
static void deliverFrameStub(void* clientData) {((ZoneMinderFifoSource*) clientData)->deliverFrame();};
void deliverFrame();
static void incomingPacketHandlerStub(void* clientData, int mask) { ((ZoneMinderFifoSource*) clientData)->incomingPacketHandler(); };
void incomingPacketHandler();
int getNextFrame();
void processFrame(char * frame, int frameSize, const timeval &ref);
void queueFrame(char * frame, int frameSize, const timeval &tv);
// split packet in frames
virtual std::list< std::pair<unsigned char*, size_t> > splitFrames(unsigned char* frame, unsigned &frameSize);
// overide FramedSource
virtual void doGetNextFrame();
virtual void doStopGettingFrames();
virtual void PushFrame(const uint8_t *data, size_t size, int64_t pts) = 0;
// split packet in frames
virtual std::list< std::pair<unsigned char*, size_t> > splitFrames(unsigned char* frame, size_t &frameSize);
virtual unsigned char *extractFrame(unsigned char *data, size_t& size, size_t& outsize);
protected:
std::list<NAL_Frame*> m_captureQueue;
EventTriggerId m_eventTriggerId;
std::string m_fifo;
int m_width;
int m_height;
unsigned int m_queueSize;
pthread_t m_thid;
pthread_mutex_t m_mutex;
std::string m_auxLine;
int stop;
std::shared_ptr<xop::RtspServer>& m_rtspServer;
xop::MediaSessionId m_sessionId;
xop::MediaChannelId m_channelId;
std::string m_fifo;
int m_fd;
Buffer m_buffer;
AVRational m_timeBase;
};
#endif // HAVE_RTSP_SERVER

View File

@ -12,9 +12,26 @@
#if HAVE_RTSP_SERVER
ZoneMinderFifoVideoSource::ZoneMinderFifoVideoSource(
UsageEnvironment& env, std::string fifo, unsigned int queueSize) :
ZoneMinderFifoSource(env,fifo,queueSize)
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
) :
ZoneMinderFifoSource(rtspServer, sessionId, channelId, fifo)
{
m_timeBase = {1, 90000};
}
void ZoneMinderFifoVideoSource::PushFrame(const uint8_t *data, size_t size, int64_t pts) {
xop::AVFrame frame = {0};
frame.type = 0;
frame.size = size;
frame.timestamp = av_rescale_q(pts, AV_TIME_BASE_Q, m_timeBase);
frame.buffer.reset(new uint8_t[size]);
memcpy(frame.buffer.get(), data, size);
m_rtspServer->PushFrame(m_sessionId, m_channelId, frame);
}
#endif // HAVE_RTSP_SERVER

View File

@ -21,9 +21,16 @@ class ZoneMinderFifoVideoSource: public ZoneMinderFifoSource {
int setWidth(int width) { return m_width=width; };
int setHeight(int height) { return m_height=height; };
protected:
ZoneMinderFifoVideoSource(UsageEnvironment& env, std::string fifo, unsigned int queueSize);
ZoneMinderFifoVideoSource(
std::shared_ptr<xop::RtspServer>& rtspServer,
xop::MediaSessionId sessionId,
xop::MediaChannelId channelId,
std::string fifo
);
protected:
void PushFrame(const uint8_t *data, size_t size, int64_t pts);
protected:
int m_width;
int m_height;
};

View File

@ -2,38 +2,22 @@
#include "zm_config.h"
#include "zm_logger.h"
#include "zm_rtsp_server_adts_fifo_source.h"
#include "zm_rtsp_server_adts_source.h"
#include "zm_rtsp_server_h264_fifo_source.h"
#include "zm_rtsp_server_h264_device_source.h"
#include "zm_rtsp_server_unicast_server_media_subsession.h"
#if HAVE_RTSP_SERVER
#include <StreamReplicator.hh>
RTSPServerThread::RTSPServerThread(int port) :
terminate_(false), scheduler_watch_var_(0)
RTSPServerThread::RTSPServerThread(int p_port) :
terminate_(false), scheduler_watch_var_(0), port(p_port)
{
//unsigned short rtsp_over_http_port = 0;
//const char *realm = "ZoneMinder";
//unsigned int timeout = 65;
OutPacketBuffer::maxSize = 2000000;
scheduler = BasicTaskScheduler::createNew();
env = BasicUsageEnvironment::createNew(*scheduler);
authDB = nullptr;
//authDB = new UserAuthenticationDatabase("ZoneMinder");
//authDB->addUserRecord("username1", "password1"); // replace these with real strings
portNumBits rtspServerPortNum = port;
rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, authDB);
//
eventLoop = std::make_shared<xop::EventLoop>();
rtspServer = xop::RtspServer::Create(eventLoop.get());
if ( rtspServer == nullptr ) {
Fatal("Failed to create rtspServer at port %d", rtspServerPortNum);
Fatal("Failed to create rtspServer");
return;
}
const char *prefix = rtspServer->rtspURLPrefix();
delete[] prefix;
thread_ = std::thread(&RTSPServerThread::Run, this);
}
@ -43,25 +27,24 @@ RTSPServerThread::~RTSPServerThread() {
if (thread_.joinable())
thread_.join();
if (rtspServer) {
Medium::close(rtspServer);
} // end if rtsp_server
while ( sources.size() ) {
FramedSource *source = sources.front();
sources.pop_front();
Medium::close(source);
}
env->reclaim();
delete scheduler;
}
void RTSPServerThread::Run() {
Debug(1, "RTSPServerThread::Run()");
if (rtspServer)
env->taskScheduler().doEventLoop(&scheduler_watch_var_); // does not return
if (rtspServer) {
while (!scheduler_watch_var_) {
//if (clients > 0) {
sleep(1);
//}
}
}
Debug(1, "RTSPServerThread::done()");
}
int RTSPServerThread::Start() {
return rtspServer->Start(std::string("0.0.0.0"), port);
}
void RTSPServerThread::Stop() {
Debug(1, "RTSPServerThread::stop()");
terminate_ = true;
@ -71,174 +54,75 @@ void RTSPServerThread::Stop() {
scheduler_watch_var_ = 1;
}
for ( std::list<FramedSource *>::iterator it = sources.begin(); it != sources.end(); ++it ) {
for ( std::list<ZoneMinderFifoSource *>::iterator it = sources.begin(); it != sources.end(); ++it ) {
Debug(1, "RTSPServerThread::stopping source");
(*it)->stopGettingFrames();
(*it)->Stop();
}
while ( sources.size() ) {
Debug(1, "RTSPServerThread::stop closing source");
FramedSource *source = sources.front();
ZoneMinderFifoSource *source = sources.front();
sources.pop_front();
Medium::close(source);
delete source;
}
}
ServerMediaSession *RTSPServerThread::addSession(std::string &streamname) {
ServerMediaSession *sms = ServerMediaSession::createNew(*env, streamname.c_str());
if (sms) {
rtspServer->addServerMediaSession(sms);
char *url = rtspServer->rtspURL(sms);
Debug(1, "url is %s for stream %s", url, streamname.c_str());
delete[] url;
xop::MediaSession *RTSPServerThread::addSession(std::string &streamname) {
xop::MediaSession *session = xop::MediaSession::CreateNew(streamname);
if (session) {
session->AddNotifyConnectedCallback([] (xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port){
Debug(1, "RTSP client connect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port);
});
session->AddNotifyDisconnectedCallback([](xop::MediaSessionId sessionId, std::string peer_ip, uint16_t peer_port) {
Debug(1, "RTSP client disconnect, ip=%s, port=%hu \n", peer_ip.c_str(), peer_port);
});
rtspServer->AddSession(session);
//char *url = rtspServer->rtspURL(session);
//Debug(1, "url is %s for stream %s", url, streamname.c_str());
//delete[] url;
}
return sms;
return session;
}
void RTSPServerThread::removeSession(ServerMediaSession *sms) {
rtspServer->removeServerMediaSession(sms);
void RTSPServerThread::removeSession(xop::MediaSession *session) {
//rtspServer->removeServerMediaSession(session);
}
FramedSource *RTSPServerThread::addFifo(
ServerMediaSession *sms,
ZoneMinderFifoSource *RTSPServerThread::addFifo(
xop::MediaSession *session,
std::string fifo) {
if (!rtspServer) return nullptr;
int queueSize = 60;
bool repeatConfig = false;
bool muxTS = false;
FramedSource *source = nullptr;
ZoneMinderFifoSource *source = nullptr;
if (!fifo.empty()) {
StreamReplicator* replicator = nullptr;
std::string rtpFormat;
if (std::string::npos != fifo.find("h264")) {
rtpFormat = "video/H264";
source = H264_ZoneMinderFifoSource::createNew(*env, fifo, queueSize, repeatConfig, muxTS);
session->AddSource(xop::channel_0, xop::H264Source::CreateNew());
source = new ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, fifo);
} else if (
std::string::npos != fifo.find("hevc")
or
std::string::npos != fifo.find("h265")) {
rtpFormat = "video/H265";
source = H265_ZoneMinderFifoSource::createNew(*env, fifo, queueSize, repeatConfig, muxTS);
session->AddSource(xop::channel_0, xop::H265Source::CreateNew());
source = new ZoneMinderFifoSource(rtspServer, session->GetMediaSessionId(), xop::channel_0, fifo);
} else if (std::string::npos != fifo.find("aac")) {
rtpFormat = "audio/AAC";
source = ADTS_ZoneMinderFifoSource::createNew(*env, fifo, queueSize);
Debug(1, "ADTS source %p", source);
} else {
Warning("Unknown format in %s", fifo.c_str());
}
if (source == nullptr) {
Error("Unable to create source");
} else {
replicator = StreamReplicator::createNew(*env, source, false);
}
sources.push_back(source);
if (replicator) {
sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, replicator, rtpFormat));
}
} else {
Debug(1, "Not Adding stream as fifo was empty");
}
return source;
} // end void addFifo
void RTSPServerThread::addStream(std::string &streamname, AVStream *video_stream, AVStream *audio_stream) {
if ( !rtspServer )
return;
int queueSize = 30;
bool repeatConfig = true;
bool muxTS = false;
ServerMediaSession *sms = nullptr;
if ( video_stream ) {
StreamReplicator* videoReplicator = nullptr;
FramedSource *source = nullptr;
std::string rtpFormat = getRtpFormat(
#if LIBAVCODEC_VERSION_CHECK(57, 64, 0, 64, 0)
video_stream->codecpar->codec_id
#else
video_stream->codec->codec_id
#endif
, false);
if ( rtpFormat.empty() ) {
Error("No streaming format");
return;
}
Debug(1, "RTSP: format %s", rtpFormat.c_str());
if ( rtpFormat == "video/H264" ) {
source = H264_ZoneMinderDeviceSource::createNew(*env, monitor_, video_stream, queueSize, repeatConfig, muxTS);
} else if ( rtpFormat == "video/H265" ) {
source = H265_ZoneMinderDeviceSource::createNew(*env, monitor_, video_stream, queueSize, repeatConfig, muxTS);
}
if ( source == nullptr ) {
Error("Unable to create source");
} else {
videoReplicator = StreamReplicator::createNew(*env, source, false);
}
sources.push_back(source);
// Create Unicast Session
if ( videoReplicator ) {
if ( !sms )
sms = ServerMediaSession::createNew(*env, streamname.c_str());
sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, videoReplicator, rtpFormat));
}
}
if ( audio_stream ) {
StreamReplicator* replicator = nullptr;
FramedSource *source = nullptr;
std::string rtpFormat = getRtpFormat(
#if LIBAVCODEC_VERSION_CHECK(57, 64, 0, 64, 0)
audio_stream->codecpar->codec_id
#else
audio_stream->codec->codec_id
#endif
, false);
if ( rtpFormat == "audio/AAC" ) {
source = ADTS_ZoneMinderDeviceSource::createNew(*env, monitor_, audio_stream, queueSize);
Debug(1, "ADTS source %p", source);
}
if ( source ) {
replicator = StreamReplicator::createNew(*env, source, false /* deleteWhenLastReplicaDies */);
sources.push_back(source);
}
if ( replicator ) {
if ( !sms )
sms = ServerMediaSession::createNew(*env, streamname.c_str());
sms->addSubsession(UnicastServerMediaSubsession::createNew(*env, replicator, rtpFormat));
}
} else {
Debug(1, "Not Adding audio stream");
}
if ( sms ) {
rtspServer->addServerMediaSession(sms);
char *url = rtspServer->rtspURL(sms);
Debug(1, "url is %s", url);
delete[] url;
}
} // end void addStream
// -----------------------------------------
// convert V4L2 pix format to RTP mime
// -----------------------------------------
const std::string RTSPServerThread::getRtpFormat(AVCodecID codec_id, bool muxTS) {
if ( muxTS ) {
return "video/MP2T";
} else {
switch ( codec_id ) {
case AV_CODEC_ID_H265 : return "video/H265";
case AV_CODEC_ID_H264 : return "video/H264";
//case PIX_FMT_MJPEG: rtpFormat = "video/JPEG"; break;
//case PIX_FMT_JPEG : rtpFormat = "video/JPEG"; break;
//case AV_PIX_FMT_VP8 : rtpFormat = "video/VP8" ; break;
//case AV_PIX_FMT_VP9 : rtpFormat = "video/VP9" ; break;
case AV_CODEC_ID_AAC : return "audio/AAC";
default: break;
}
}
return "";
}
#endif // HAVE_RTSP_SERVER

View File

@ -3,15 +3,14 @@
#include "zm_config.h"
#include "zm_ffmpeg.h"
#include "zm_rtsp_server_server_media_subsession.h"
#include "xop/RtspServer.h"
#include "zm_rtsp_server_fifo_source.h"
#include <atomic>
#include <list>
#include <memory>
#if HAVE_RTSP_SERVER
#include <BasicUsageEnvironment.hh>
#include <RTSPServer.hh>
class Monitor;
@ -24,29 +23,22 @@ class RTSPServerThread {
std::mutex scheduler_watch_var_mutex_;
char scheduler_watch_var_;
TaskScheduler* scheduler;
UsageEnvironment* env;
UserAuthenticationDatabase* authDB;
std::shared_ptr<xop::EventLoop> eventLoop;
std::shared_ptr<xop::RtspServer> rtspServer;
RTSPServer* rtspServer;
std::list<FramedSource *> sources;
std::list<ZoneMinderFifoSource *> sources;
int port;
public:
explicit RTSPServerThread(int port);
~RTSPServerThread();
ServerMediaSession *addSession(std::string &streamname);
void removeSession(ServerMediaSession *sms);
void addStream(std::string &streamname, AVStream *, AVStream *);
FramedSource *addFifo(ServerMediaSession *sms, std::string fifo);
xop::MediaSession *addSession(std::string &streamname);
void removeSession(xop::MediaSession *sms);
ZoneMinderFifoSource *addFifo(xop::MediaSession *sms, std::string fifo);
void Run();
void Stop();
int Start();
bool IsStopped() const { return terminate_; };
private:
const std::string getRtpFormat(AVCodecID codec, bool muxTS);
int addSession(
const std::string & sessionName,
const std::list<ServerMediaSubsession*> & subSession
);
};
#endif // HAVE_RTSP_SERVER