zoneminder/src/zm_rtsp.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

791 lines
28 KiB
C++
Raw Normal View History

2013-03-17 07:45:21 +08:00
//
// ZoneMinder RTSP Class Implementation, $Date$, $Revision$
// Copyright (C) 2001-2008 Philip Coombes
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
2013-03-17 07:45:21 +08:00
//
#include "zm_rtsp.h"
#include "zm_config.h"
2013-03-17 07:45:21 +08:00
#include "zm_rtp_data.h"
#include "zm_rtp_ctrl.h"
#include "zm_db.h"
#include <algorithm>
2013-03-17 07:45:21 +08:00
// First port of the RTP port range used by this process. Stays 0 until
// requestPorts() has computed the range on its first call.
int RtspThread::smMinDataPort = 0;
// Last port (inclusive) of the RTP port range computed by requestPorts().
int RtspThread::smMaxDataPort = 0;
// Ports already handed out by requestPorts() and not yet given back through releasePorts().
RtspThread::PortSet RtspThread::smAssignedPorts;
2013-03-17 07:45:21 +08:00
bool RtspThread::sendCommand(std::string message) {
if ( mNeedAuth ) {
2021-04-04 06:30:18 +08:00
StringVector parts = Split(message, " ");
if ( parts.size() > 1 )
message += mAuthenticator->getAuthHeader(parts[0], parts[1]);
}
message += stringtf("User-Agent: ZoneMinder/%s\r\n", ZM_VERSION);
message += stringtf("CSeq: %d\r\n\r\n", ++mSeq);
Debug(2, "Sending RTSP message: %s", message.c_str());
2017-11-17 20:52:26 +08:00
if ( mMethod == RTP_RTSP_HTTP ) {
2021-04-04 06:39:40 +08:00
message = Base64Encode(message);
2019-04-29 00:05:32 +08:00
Debug(2, "Sending encoded RTSP message: %s", message.c_str());
if ( mRtspSocket2.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
return false;
2013-03-17 07:45:21 +08:00
}
2017-11-17 20:52:26 +08:00
} else {
2019-04-29 00:05:32 +08:00
if ( mRtspSocket.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
return false;
2013-03-17 07:45:21 +08:00
}
}
2019-04-29 00:05:32 +08:00
return true;
2013-03-17 07:45:21 +08:00
}
bool RtspThread::recvResponse(std::string &response) {
if ( mRtspSocket.recv(response) < 0 )
2019-04-29 00:05:32 +08:00
Error("Recv failed; %s", strerror(errno));
Debug(2, "Received RTSP response: %s (%zd bytes)", response.c_str(), response.size());
float respVer = 0;
respCode = -1;
char respText[ZM_NETWORK_BUFSIZ];
2019-04-29 00:05:32 +08:00
if ( sscanf(response.c_str(), "RTSP/%f %3d %[^\r\n]\r\n", &respVer, &respCode, respText) != 3 ) {
2017-11-17 20:52:26 +08:00
if ( isalnum(response[0]) ) {
2019-04-29 00:05:32 +08:00
Error("Response parse failure in '%s'", response.c_str());
2017-11-17 20:52:26 +08:00
} else {
2019-04-29 00:05:32 +08:00
Error("Response parse failure, %zd bytes follow", response.size());
if ( response.size() )
Hexdump(Logger::ERROR, response.data(), std::min(int(response.size()), 16));
2013-03-17 07:45:21 +08:00
}
2019-04-29 00:05:32 +08:00
return false;
}
2019-04-29 00:05:32 +08:00
if ( respCode == 401 ) {
Debug(2, "Got 401 access denied response code, check WWW-Authenticate header and retry");
mAuthenticator->checkAuthResponse(response);
mNeedAuth = true;
2019-04-29 00:05:32 +08:00
return false;
2017-11-17 20:52:26 +08:00
} else if ( respCode != 200 ) {
2019-04-29 00:05:32 +08:00
Error("Unexpected response code %d, text is '%s'", respCode, respText);
return false;
}
2019-04-29 00:05:32 +08:00
return true;
} // end RtspThread::recvResponse
2013-03-17 07:45:21 +08:00
2017-11-17 20:52:26 +08:00
int RtspThread::requestPorts() {
if ( !smMinDataPort ) {
char sql[ZM_SQL_SML_BUFSIZ];
2019-04-29 00:05:32 +08:00
//FIXME Why not load specifically by Id? This will get ineffeicient with a lot of monitors
strncpy(sql, "SELECT `Id` FROM `Monitors` WHERE `Function` != 'None' AND `Type` = 'Remote' AND `Protocol` = 'rtsp' AND `Method` = 'rtpUni' ORDER BY `Id` ASC", sizeof(sql));
2013-03-17 07:45:21 +08:00
MYSQL_RES *result = zmDbFetch(sql);
int nMonitors = mysql_num_rows(result);
int position = 0;
2017-11-17 20:52:26 +08:00
if ( nMonitors ) {
2019-04-29 00:05:32 +08:00
for ( int i = 0; MYSQL_ROW dbrow = mysql_fetch_row(result); i++ ) {
int id = atoi(dbrow[0]);
2017-11-17 20:52:26 +08:00
if ( mId == id ) {
position = i;
break;
2013-03-17 07:45:21 +08:00
}
}
2017-11-17 20:52:26 +08:00
} else {
// Minor hack for testing when not strictly enabled
nMonitors = 1;
position = 0;
}
mysql_free_result(result);
int portRange = int(((config.max_rtp_port-config.min_rtp_port)+1)/nMonitors);
smMinDataPort = config.min_rtp_port + (position * portRange);
smMaxDataPort = smMinDataPort + portRange - 1;
Debug(2, "Assigned RTP port range is %d-%d", smMinDataPort, smMaxDataPort);
}
2017-11-17 20:52:26 +08:00
for ( int i = smMinDataPort; i <= smMaxDataPort; i++ ) {
2019-04-29 00:05:32 +08:00
PortSet::const_iterator iter = smAssignedPorts.find(i);
2017-11-17 20:52:26 +08:00
if ( iter == smAssignedPorts.end() ) {
2019-04-29 00:05:32 +08:00
smAssignedPorts.insert(i);
return i;
}
}
2019-04-29 00:05:32 +08:00
Panic("Can assign RTP port, no ports left in pool");
return -1;
2013-03-17 07:45:21 +08:00
}
void RtspThread::releasePorts(int port) {
if ( port > 0 )
2019-04-29 00:05:32 +08:00
smAssignedPorts.erase(port);
2013-03-17 07:45:21 +08:00
}
2019-04-29 00:05:32 +08:00
// Build the stream URL, set up authentication and start the worker thread.
//
//   id            monitor id, compared against the database in requestPorts()
//   method        RTP transport variant (unicast, multicast, RTSP, RTSP over HTTP)
//   protocol, host, port, path
//                 pieces of the URL "<protocol>://<host>:<port>/<path>"
//   auth          credentials as "user" or "user:password"; may be empty
//   rtsp_describe when true, a Content-Base header in the DESCRIBE response
//                 replaces the URL built here
//
// Run() starts on a new thread as the last step of construction, so
// everything Run() reads is assigned before that point.
RtspThread::RtspThread(
    int id,
    RtspMethod method,
    const std::string &protocol,
    const std::string &host,
    const std::string &port,
    const std::string &path,
    const std::string &auth,
    bool rtsp_describe) :
  mId(id),
  mMethod(method),
  mProtocol(protocol),
  mHost(host),
  mPort(port),
  mPath(path),
  mRtspDescribe(rtsp_describe),
  mSessDesc(nullptr),
  mFormatContext(nullptr),
  mSeq(0),
  mSession(0),
  mSsrc(0),
  mDist(UNDEFINED),
  mRtpTime(0),
  mTerminate(false)
{
  mUrl = mProtocol+"://"+mHost+":"+mPort;
  if (!mPath.empty()) {
    if (mPath[0] == '/')
      mUrl += mPath;
    else
      mUrl += '/'+mPath;
  }

  mSsrc = rand();

  Debug(2, "RTSP Local SSRC is %x, url is %s", mSsrc, mUrl.c_str());

  if (mMethod == RTP_RTSP_HTTP)
    mHttpSession = stringtf("%d", rand());

  mNeedAuth = false;
  const StringVector parts = Split(auth, ":");
  Debug(2, "# of auth parts %zu", parts.size());
  // Never index a token that is not there: an empty `auth` may yield no tokens at all.
  const std::string username = parts.empty() ? std::string() : parts[0];
  const std::string password = (parts.size() > 1) ? parts[1] : std::string();
  mAuthenticator = new zm::Authenticator(username, password);

  mThread = std::thread(&RtspThread::Run, this);
}
2017-11-17 20:52:26 +08:00
// Stop the worker thread, wait for it to finish, then free what Run() and
// the constructor allocated.
RtspThread::~RtspThread() {
  Stop();
  if (mThread.joinable()) {
    mThread.join();
  }

  if (mFormatContext != nullptr) {
#if LIBAVFORMAT_VERSION_CHECK(52, 96, 0, 96, 0)
    avformat_free_context(mFormatContext);
#else
    av_free_format_context(mFormatContext);
#endif
    mFormatContext = nullptr;
  }

  // delete on a null pointer is a no-op, so no explicit check is needed.
  delete mSessDesc;
  mSessDesc = nullptr;

  delete mAuthenticator;
  mAuthenticator = nullptr;
}
2021-03-03 02:56:35 +08:00
void RtspThread::Run() {
std::string message;
std::string response;
response.reserve(ZM_NETWORK_BUFSIZ);
if ( !mRtspSocket.connect(mHost.c_str(), mPort.c_str()) )
Fatal("Unable to connect RTSP socket");
//Select select( 0.25 );
//select.addReader( &mRtspSocket );
//while ( select.wait() )
//{
//mRtspSocket.recv( response );
//Debug( 4, "Drained %d bytes from RTSP socket", response.size() );
//}
bool authTried = false;
2017-11-17 20:52:26 +08:00
if ( mMethod == RTP_RTSP_HTTP ) {
2019-04-29 00:05:32 +08:00
if ( !mRtspSocket2.connect(mHost.c_str(), mPort.c_str()) )
Fatal("Unable to connect auxiliary RTSP/HTTP socket");
2013-03-17 07:45:21 +08:00
//Select select( 0.25 );
//select.addReader( &mRtspSocket2 );
2013-03-17 07:45:21 +08:00
//while ( select.wait() )
//{
//mRtspSocket2.recv( response );
//Debug( 4, "Drained %d bytes from HTTP socket", response.size() );
2013-03-17 07:45:21 +08:00
//}
2014-04-29 05:37:31 +08:00
//possibly retry sending the message for authentication
int respCode = -1;
char respText[256];
do {
message = "GET "+mPath+" HTTP/1.0\r\n";
message += "X-SessionCookie: "+mHttpSession+"\r\n";
if ( mNeedAuth ) {
message += mAuthenticator->getAuthHeader("GET", mPath);
authTried = true;
}
message += "Accept: application/x-rtsp-tunnelled\r\n\r\n";
Debug(2, "Sending HTTP message: %s", message.c_str());
if ( mRtspSocket.send(message.c_str(), message.size()) != (int)message.length() ) {
2019-04-29 00:05:32 +08:00
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
2021-03-03 02:56:35 +08:00
return;
}
if ( mRtspSocket.recv(response) < 0 ) {
2019-04-29 00:05:32 +08:00
Error("Recv failed; %s", strerror(errno));
2021-03-03 02:56:35 +08:00
return;
}
2019-04-29 00:05:32 +08:00
Debug(2, "Received HTTP response: %s (%zd bytes)", response.c_str(), response.size());
float respVer = 0;
respCode = -1;
if ( sscanf(response.c_str(), "HTTP/%f %3d %[^\r\n]\r\n", &respVer, &respCode, respText) != 3 ) {
2017-11-17 20:52:26 +08:00
if ( isalnum(response[0]) ) {
Error("Response parse failure in '%s'", response.c_str());
2017-11-17 20:52:26 +08:00
} else {
Error("Response parse failure, %zd bytes follow", response.size());
if ( response.size() )
Hexdump(Logger::ERROR, response.data(), std::min(int(response.size()), 16));
2013-03-17 07:45:21 +08:00
}
2021-03-03 02:56:35 +08:00
return;
}
// If Server requests authentication, check WWW-Authenticate header and fill required fields
// for requested authentication method
2019-04-29 00:05:32 +08:00
if ( respCode == 401 && !authTried ) {
mNeedAuth = true;
mAuthenticator->checkAuthResponse(response);
Debug(2, "Processed 401 response");
mRtspSocket.close();
2019-04-29 00:05:32 +08:00
if ( !mRtspSocket.connect(mHost.c_str(), mPort.c_str()) )
Fatal("Unable to reconnect RTSP socket");
Debug(2, "connection should be reopened now");
}
} while (respCode == 401 && !authTried);
2017-11-17 20:52:26 +08:00
if ( respCode != 200 ) {
2019-04-29 00:05:32 +08:00
Error("Unexpected response code %d, text is '%s'", respCode, respText);
2021-03-03 02:56:35 +08:00
return;
}
2015-01-31 22:31:27 +08:00
message = "POST "+mPath+" HTTP/1.0\r\n";
message += "X-SessionCookie: "+mHttpSession+"\r\n";
if ( mNeedAuth )
message += mAuthenticator->getAuthHeader("POST", mPath);
message += "Content-Length: 32767\r\n";
message += "Content-Type: application/x-rtsp-tunnelled\r\n";
message += "\r\n";
Debug(2, "Sending HTTP message: %s", message.c_str());
if ( mRtspSocket2.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
2021-03-03 02:56:35 +08:00
return;
2015-08-21 23:29:54 +08:00
}
} // end if ( mMethod == RTP_RTSP_HTTP )
2015-01-31 22:31:27 +08:00
std::string localHost = "";
int localPorts[2] = { 0, 0 };
2013-03-17 07:45:21 +08:00
// Request supported RTSP commands by the server
message = "OPTIONS "+mUrl+" RTSP/1.0\r\n";
if ( !sendCommand(message) )
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
// A negative return here may indicate auth failure, but we will have setup the auth mechanisms so we need to retry.
2019-04-29 00:05:32 +08:00
if ( !recvResponse(response) ) {
if ( mNeedAuth ) {
Debug(2, "Resending OPTIONS due to possible auth requirement");
2019-04-29 00:05:32 +08:00
if ( !sendCommand(message) )
2021-03-03 02:56:35 +08:00
return;
2019-04-29 00:05:32 +08:00
if ( !recvResponse(response) )
2021-03-03 02:56:35 +08:00
return;
} else {
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
}
} // end if failed response maybe due to auth
char publicLine[256] = "";
2021-04-04 06:30:18 +08:00
StringVector lines = Split(response, "\r\n");
for ( size_t i = 0; i < lines.size(); i++ )
2019-04-29 00:05:32 +08:00
sscanf(lines[i].c_str(), "Public: %[^\r\n]\r\n", publicLine);
// Check if the server supports the GET_PARAMETER command
// If yes, it is likely that the server will request this command as a keepalive message
bool sendKeepalive = false;
if ( publicLine[0] && strstr(publicLine, "GET_PARAMETER") )
sendKeepalive = true;
message = "DESCRIBE "+mUrl+" RTSP/1.0\r\n";
bool res;
do {
if ( mNeedAuth )
authTried = true;
2019-04-29 00:05:32 +08:00
sendCommand(message);
// FIXME Why sleep 1?
usleep(10000);
2019-04-29 00:05:32 +08:00
res = recvResponse(response);
if ( !res && respCode==401 )
mNeedAuth = true;
} while (!res && respCode==401 && !authTried);
const std::string endOfHeaders = "\r\n\r\n";
2019-04-29 00:05:32 +08:00
size_t sdpStart = response.find(endOfHeaders);
if ( sdpStart == std::string::npos )
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
2017-11-17 20:52:26 +08:00
if ( mRtspDescribe ) {
2019-04-29 00:05:32 +08:00
std::string DescHeader = response.substr(0, sdpStart);
Debug(1, "Processing DESCRIBE response header '%s'", DescHeader.c_str());
2013-03-17 07:45:21 +08:00
2021-04-04 06:30:18 +08:00
lines = Split(DescHeader, "\r\n");
2017-11-17 20:52:26 +08:00
for ( size_t i = 0; i < lines.size(); i++ ) {
2019-04-29 00:05:32 +08:00
// If the device sends us a url value for Content-Base in the response header, we should use that instead
if ( ( lines[i].size() > 13 ) && ( lines[i].substr( 0, 13 ) == "Content-Base:" ) ) {
mUrl = TrimSpaces(lines[i].substr(13));
2019-04-29 00:05:32 +08:00
Info("Received new Content-Base in DESCRIBE response header. Updated device Url to: '%s'", mUrl.c_str() );
break;
}
2019-04-29 00:05:32 +08:00
} // end foreach line
} // end if mRtspDescribe
sdpStart += endOfHeaders.length();
2019-04-29 00:05:32 +08:00
std::string sdp = response.substr(sdpStart);
Debug(1, "Processing SDP '%s'", sdp.c_str());
2017-11-17 20:52:26 +08:00
try {
mSessDesc = new SessionDescriptor( mUrl, sdp );
mFormatContext = mSessDesc->generateFormatContext();
2020-12-28 01:03:44 +08:00
} catch ( const Exception &e ) {
Error("%s", e.getMessage().c_str());
2021-03-03 02:56:35 +08:00
return;
}
#if 0
// New method using ffmpeg native functions
std::string authUrl = mUrl;
if ( !mAuth.empty() )
authUrl.insert( authUrl.find( "://" )+3, mAuth+"@" );
if ( av_open_input_file( &mFormatContext, authUrl.c_str(), nullptr, 0, nullptr ) != 0 )
{
Error( "Unable to open input '%s'", authUrl.c_str() );
return( -1 );
}
2013-03-17 07:45:21 +08:00
#endif
uint32_t rtpClock = 0;
std::string trackUrl = mUrl;
std::string controlUrl;
_AVCODECID codecId = AV_CODEC_ID_NONE;
2017-11-17 20:52:26 +08:00
if ( mFormatContext->nb_streams >= 1 ) {
for ( unsigned int i = 0; i < mFormatContext->nb_streams; i++ ) {
2021-01-29 22:53:48 +08:00
SessionDescriptor::MediaDescriptor *mediaDesc = mSessDesc->getStream(i);
#if LIBAVFORMAT_VERSION_CHECK(57, 33, 0, 33, 0)
if ( mFormatContext->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO )
#elif (LIBAVCODEC_VERSION_CHECK(52, 64, 0, 64, 0) || LIBAVUTIL_VERSION_CHECK(50, 14, 0, 14, 0))
if ( mFormatContext->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO )
2013-03-17 07:45:21 +08:00
#else
if ( mFormatContext->streams[i]->codec->codec_type == CODEC_TYPE_VIDEO )
2013-03-17 07:45:21 +08:00
#endif
{
// Check if control Url is absolute or relative
controlUrl = mediaDesc->getControlUrl();
RtspThread: Fix a stack-buffer-overflow reported by ASAN ==8109==ERROR: AddressSanitizer: stack-buffer-overflow on address 0x7fab9b156480 at pc 0x7fabaebef57b bp 0x7fab9b154640 sp 0x7fab9b153df0 READ of size 32 at 0x7fab9b156480 thread T2 #0 0x7fabaebef57a (/lib/x86_64-linux-gnu/libasan.so.5+0xb857a) #1 0x561c0a9e24eb in bool std::__equal<true>::equal<char>(char const*, char const*, char const*) /usr/include/c++/8/bits/stl_algobase.h:814 #2 0x561c0a9dfa8e in bool std::__equal_aux<char*, char*>(char*, char*, char*) /usr/include/c++/8/bits/stl_algobase.h:831 #3 0x561c0a9dd982 in bool std::equal<__gnu_cxx::__normal_iterator<char*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > , __gnu_cxx::__normal_iterator<char*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > > >(__gnu_cxx::__normal_iterator<char*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >, __gnu_cxx::__normal_iterator<char*, std::__cxx11::basic_string<char, std::c har_traits<char>, std::allocator<char> > >, __gnu_cxx::__normal_iterator<char*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >) /usr/include/c++/8/bits/stl_algobase.h:1049 #4 0x561c0a9cf75a in RtspThread::Run() /root/zoneminder/src/zm_rtsp.cpp:411 #5 0x561c0a9df6e9 in void std::__invoke_impl<void, void (RtspThread::*)(), RtspThread*>(std::__invoke_memfun_deref, void (RtspThread::*&&)(), RtspThread*& &) /usr/include/c++/8/bits/invoke.h:73 #6 0x561c0a9dd4ae in std::__invoke_result<void (RtspThread::*)(), RtspThread*>::type std::__invoke<void (RtspThread::*)(), RtspThread*>(void (RtspThread:: *&&)(), RtspThread*&&) (/root/zoneminder/cmake-build-debug-remote/src/zmc+0x1544ae) #7 0x561c0a9e6a1a in decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (RtspThread::*)(), RtspThread*> > ::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/include/c++/8/thread:244 #8 
0x561c0a9e698d in std::thread::_Invoker<std::tuple<void (RtspThread::*)(), RtspThread*> >::operator()() /usr/include/c++/8/thread:253 #9 0x561c0a9e68ff in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (RtspThread::*)(), RtspThread*> > >::_M_run() /usr/include/c++/8/threa d:196 #10 0x7fabaca57b2e (/lib/x86_64-linux-gnu/libstdc++.so.6+0xbbb2e) #11 0x7fabae50dfa2 in start_thread /build/glibc-vjB4T1/glibc-2.28/nptl/pthread_create.c:486 #12 0x7fabac7354ce in clone (/lib/x86_64-linux-gnu/libc.so.6+0xf94ce)
2021-04-11 17:37:03 +08:00
if (trackUrl == controlUrl) {
trackUrl = controlUrl;
2017-11-17 20:52:26 +08:00
} else {
2021-01-29 22:53:48 +08:00
if ( *trackUrl.rbegin() != '/' ) {
trackUrl += "/" + controlUrl;
} else {
trackUrl += controlUrl;
}
2013-03-17 07:45:21 +08:00
}
rtpClock = mediaDesc->getClock();
#if LIBAVFORMAT_VERSION_CHECK(57, 33, 0, 33, 0)
codecId = mFormatContext->streams[i]->codecpar->codec_id;
#else
codecId = mFormatContext->streams[i]->codec->codec_id;
#endif
break;
2021-01-28 05:14:17 +08:00
} // end if is video
} // end foreach stream
} // end if have stream
2013-03-17 07:45:21 +08:00
2020-12-28 01:03:44 +08:00
switch ( mMethod ) {
case RTP_UNICAST :
localPorts[0] = requestPorts();
localPorts[1] = localPorts[0]+1;
2013-03-17 07:45:21 +08:00
2021-01-29 22:53:48 +08:00
message = "SETUP "+trackUrl+" RTSP/1.0\r\nTransport: RTP/AVP;unicast;client_port="
+stringtf("%d", localPorts[0] )+"-"+stringtf( "%d", localPorts[1])+"\r\n";
break;
case RTP_MULTICAST :
message = "SETUP "+trackUrl+" RTSP/1.0\r\nTransport: RTP/AVP;multicast\r\n";
break;
case RTP_RTSP :
case RTP_RTSP_HTTP :
message = "SETUP "+trackUrl+" RTSP/1.0\r\nTransport: RTP/AVP/TCP;unicast\r\n";
break;
default:
2021-01-29 22:53:48 +08:00
Panic("Got unexpected method %d", mMethod);
break;
}
2013-03-17 07:45:21 +08:00
2020-12-28 01:03:44 +08:00
if ( !sendCommand(message) )
2021-03-03 02:56:35 +08:00
return;
2020-12-28 01:03:44 +08:00
if ( !recvResponse(response) )
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
2021-04-04 06:30:18 +08:00
lines = Split(response, "\r\n");
std::string session;
int timeout = 0;
char transport[256] = "";
2013-03-17 07:45:21 +08:00
2017-11-17 20:52:26 +08:00
for ( size_t i = 0; i < lines.size(); i++ ) {
2021-01-29 22:53:48 +08:00
if ( ( lines[i].size() > 8 ) && ( lines[i].substr(0, 8) == "Session:" ) ) {
2021-04-04 06:30:18 +08:00
StringVector sessionLine = Split(lines[i].substr(9), ";");
session = TrimSpaces(sessionLine[0]);
if ( sessionLine.size() == 2 )
sscanf(TrimSpaces(sessionLine[1]).c_str(), "timeout=%d", &timeout);
2013-03-17 07:45:21 +08:00
}
2020-12-28 01:03:44 +08:00
sscanf(lines[i].c_str(), "Transport: %s", transport);
}
if ( session.empty() )
2020-12-28 01:03:44 +08:00
Fatal("Unable to get session identifier from response '%s'", response.c_str());
2020-12-28 01:03:44 +08:00
Debug(2, "Got RTSP session %s, timeout %d secs", session.c_str(), timeout);
if ( !transport[0] )
2020-12-28 01:03:44 +08:00
Fatal("Unable to get transport details from response '%s'", response.c_str());
2020-12-28 01:03:44 +08:00
Debug(2, "Got RTSP transport %s", transport);
std::string method = "";
int remotePorts[2] = { 0, 0 };
int remoteChannels[2] = { 0, 0 };
std::string distribution = "";
unsigned long ssrc = 0;
2021-04-04 06:30:18 +08:00
StringVector parts = Split(transport, ";");
2017-11-17 20:52:26 +08:00
for ( size_t i = 0; i < parts.size(); i++ ) {
if ( parts[i] == "unicast" || parts[i] == "multicast" )
distribution = parts[i];
2021-04-04 05:51:12 +08:00
else if (StartsWith(parts[i], "server_port=") ) {
method = "RTP/UNICAST";
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[i], "=");
StringVector ports = Split(subparts[1], "-");
remotePorts[0] = strtol( ports[0].c_str(), nullptr, 10 );
remotePorts[1] = strtol( ports[1].c_str(), nullptr, 10 );
2021-04-04 05:51:12 +08:00
} else if (StartsWith(parts[i], "interleaved=") ) {
method = "RTP/RTSP";
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[i], "=");
StringVector channels = Split(subparts[1], "-");
remoteChannels[0] = strtol( channels[0].c_str(), nullptr, 10 );
remoteChannels[1] = strtol( channels[1].c_str(), nullptr, 10 );
2021-04-04 05:51:12 +08:00
} else if (StartsWith(parts[i], "port=") ) {
method = "RTP/MULTICAST";
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[i], "=");
StringVector ports = Split(subparts[1], "-");
localPorts[0] = strtol( ports[0].c_str(), nullptr, 10 );
localPorts[1] = strtol( ports[1].c_str(), nullptr, 10 );
2021-04-04 05:51:12 +08:00
} else if (StartsWith(parts[i], "destination=") ) {
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[i], "=");
localHost = subparts[1];
2021-04-04 05:51:12 +08:00
} else if (StartsWith(parts[i], "ssrc=") ) {
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[i], "=");
ssrc = strtoll( subparts[1].c_str(), nullptr, 16 );
}
}
Debug( 2, "RTSP Method is %s", method.c_str() );
Debug( 2, "RTSP Distribution is %s", distribution.c_str() );
Debug( 2, "RTSP SSRC is %lx", ssrc );
Debug( 2, "RTSP Local Host is %s", localHost.c_str() );
Debug( 2, "RTSP Local Ports are %d/%d", localPorts[0], localPorts[1] );
Debug( 2, "RTSP Remote Ports are %d/%d", remotePorts[0], remotePorts[1] );
Debug( 2, "RTSP Remote Channels are %d/%d", remoteChannels[0], remoteChannels[1] );
message = "PLAY "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\nRange: npt=0.000-\r\n";
2020-12-28 01:03:44 +08:00
if ( !sendCommand(message) )
2021-03-03 02:56:35 +08:00
return;
2020-12-28 01:03:44 +08:00
if ( !recvResponse(response) )
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
2021-04-04 06:30:18 +08:00
lines = Split(response, "\r\n");
std::string rtpInfo;
2017-11-17 20:52:26 +08:00
for ( size_t i = 0; i < lines.size(); i++ ) {
2020-12-28 01:03:44 +08:00
if ( ( lines[i].size() > 9 ) && ( lines[i].substr(0, 9) == "RTP-Info:" ) )
rtpInfo = TrimSpaces(lines[i].substr(9));
2020-12-28 01:03:44 +08:00
// Check for a timeout again. Some rtsp devices don't send a timeout until after the PLAY command is sent
if ( ( lines[i].size() > 8 ) && ( lines[i].substr(0, 8) == "Session:" ) && ( timeout == 0 ) ) {
2021-04-04 06:30:18 +08:00
StringVector sessionLine = Split(lines[i].substr(9), ";");
if ( sessionLine.size() == 2 )
sscanf(TrimSpaces(sessionLine[1]).c_str(), "timeout=%d", &timeout);
if ( timeout > 0 )
2020-12-28 01:03:44 +08:00
Debug(2, "Got timeout %d secs from PLAY command response", timeout);
2014-12-02 07:15:08 +08:00
}
}
int seq = 0;
unsigned long rtpTime = 0;
StringVector streams;
2017-11-17 20:52:26 +08:00
if ( rtpInfo.empty() ) {
Debug( 1, "RTP Info Empty. Starting values for Sequence and Rtptime shall be zero.");
2017-11-17 20:52:26 +08:00
} else {
Debug( 2, "Got RTP Info %s", rtpInfo.c_str() );
// More than one stream can be included in the RTP Info
2021-04-04 06:30:18 +08:00
streams = Split(rtpInfo.c_str(), ",");
2017-11-17 20:52:26 +08:00
for ( size_t i = 0; i < streams.size(); i++ ) {
// We want the stream that matches the trackUrl we are using
2017-11-17 20:52:26 +08:00
if ( streams[i].find(controlUrl.c_str()) != std::string::npos ) {
// Parse the sequence and rtptime values
2021-04-04 06:30:18 +08:00
parts = Split(streams[i].c_str(), ";");
2017-11-17 20:52:26 +08:00
for ( size_t j = 0; j < parts.size(); j++ ) {
2021-04-04 05:51:12 +08:00
if (StartsWith(parts[j], "seq=") ) {
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[j], "=");
seq = strtol( subparts[1].c_str(), nullptr, 10 );
2021-04-04 05:51:12 +08:00
} else if (StartsWith(parts[j], "rtptime=") ) {
2021-04-04 06:30:18 +08:00
StringVector subparts = Split(parts[j], "=");
rtpTime = strtol( subparts[1].c_str(), nullptr, 10 );
}
2013-03-17 07:45:21 +08:00
}
break;
}
2013-03-17 07:45:21 +08:00
}
}
2013-03-17 07:45:21 +08:00
Debug( 2, "RTSP Seq is %d", seq );
Debug( 2, "RTSP Rtptime is %ld", rtpTime );
2013-03-17 07:45:21 +08:00
time_t lastKeepalive = time(nullptr);
time_t now;
message = "GET_PARAMETER "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
2017-11-17 20:52:26 +08:00
switch( mMethod ) {
case RTP_UNICAST :
2013-03-17 07:45:21 +08:00
{
RtpSource *source = new RtpSource( mId, "", localPorts[0], mHost, remotePorts[0], ssrc, seq, rtpClock, rtpTime, codecId );
mSources[ssrc] = source;
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
2021-03-03 02:56:35 +08:00
while (!mTerminate) {
now = time(nullptr);
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
Debug(5, "sendkeepalive %d, timeout %d, now: %" PRIi64 " last: %" PRIi64 " since: %" PRIi64,
sendKeepalive,
timeout,
static_cast<int64>(now),
static_cast<int64>(lastKeepalive),
static_cast<int64>(now - lastKeepalive));
2017-11-17 20:52:26 +08:00
if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) {
if ( !sendCommand( message ) )
2021-03-03 02:56:35 +08:00
return;
lastKeepalive = now;
}
usleep( 100000 );
}
2013-03-17 07:45:21 +08:00
#if 0
message = "PAUSE "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
2013-03-17 07:45:21 +08:00
#endif
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
2021-03-03 02:56:35 +08:00
return;
if ( !recvResponse( response ) )
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
2021-03-03 07:40:57 +08:00
rtpDataThread.Stop();
2021-03-03 07:35:34 +08:00
rtpCtrlThread.Stop();
2013-03-17 07:45:21 +08:00
//rtpDataThread.kill( SIGTERM );
//rtpCtrlThread.kill( SIGTERM );
2013-03-17 07:45:21 +08:00
delete mSources[ssrc];
mSources.clear();
2013-03-17 07:45:21 +08:00
releasePorts( localPorts[0] );
2013-03-17 07:45:21 +08:00
break;
}
case RTP_RTSP :
case RTP_RTSP_HTTP :
{
RtpSource *source = new RtpSource( mId, "", remoteChannels[0], mHost, remoteChannels[0], ssrc, seq, rtpClock, rtpTime, codecId );
mSources[ssrc] = source;
// These never actually run
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
ZM::Select select( double(config.http_timeout)/1000.0 );
select.addReader( &mRtspSocket );
Buffer buffer( ZM_NETWORK_BUFSIZ );
std::string keepaliveMessage = "OPTIONS "+mUrl+" RTSP/1.0\r\n";
std::string keepaliveResponse = "RTSP/1.0 200 OK\r\n";
2021-03-03 02:56:35 +08:00
while (!mTerminate && select.wait() >= 0) {
ZM::Select::CommsList readable = select.getReadable();
2017-11-17 20:52:26 +08:00
if ( readable.size() == 0 ) {
Error( "RTSP timed out" );
break;
}
2013-03-17 07:45:21 +08:00
static char tempBuffer[ZM_NETWORK_BUFSIZ];
ssize_t nBytes = mRtspSocket.recv( tempBuffer, sizeof(tempBuffer) );
buffer.append( tempBuffer, nBytes );
Debug( 4, "Read %zd bytes on sd %d, %d total", nBytes, mRtspSocket.getReadDesc(), buffer.size() );
2013-03-17 07:45:21 +08:00
2017-11-17 20:52:26 +08:00
while( buffer.size() > 0 ) {
if ( buffer[0] == '$' ) {
if ( buffer.size() < 4 )
break;
unsigned char channel = buffer[1];
unsigned short len = ntohs( *((unsigned short *)(buffer+2)) );
2013-03-17 07:45:21 +08:00
Debug( 4, "Got %d bytes left, expecting %d byte packet on channel %d", buffer.size(), len, channel );
2017-11-17 20:52:26 +08:00
if ( (unsigned short)buffer.size() < (len+4) ) {
Debug( 4, "Missing %d bytes, rereading", (len+4)-buffer.size() );
break;
}
2017-11-17 20:52:26 +08:00
if ( channel == remoteChannels[0] ) {
Debug(4, "Got %d bytes on data channel %d, packet length is %d", buffer.size(), channel, len);
Hexdump(4, (char *)buffer, 16);
rtpDataThread.recvPacket(buffer+4, len);
2019-04-29 00:05:32 +08:00
} else if ( channel == remoteChannels[1] ) {
// len = ntohs( *((unsigned short *)(buffer+2)) );
// Debug( 4, "Got %d bytes on control channel %d", nBytes, channel );
Debug(4, "Got %d bytes on control channel %d, packet length is %d", buffer.size(), channel, len);
Hexdump(4, (char *)buffer, 16);
rtpCtrlThread.recvPackets(buffer+4, len);
2019-04-29 00:05:32 +08:00
} else {
Error("Unexpected channel selector %d in RTSP interleaved data", buffer[1]);
buffer.clear();
break;
}
buffer.consume(len+4);
nBytes -= len+4;
2019-04-29 00:05:32 +08:00
} else {
if ( keepaliveResponse.compare( 0, keepaliveResponse.size(), (char *)buffer, keepaliveResponse.size() ) == 0 ) {
Debug( 4, "Got keepalive response '%s'", (char *)buffer );
//buffer.consume( keepaliveResponse.size() );
2019-04-29 00:05:32 +08:00
if ( char *charPtr = (char *)memchr( (char *)buffer, '$', buffer.size() ) ) {
int discardBytes = charPtr-(char *)buffer;
buffer -= discardBytes;
2019-04-29 00:05:32 +08:00
} else {
buffer.clear();
}
2019-04-29 00:05:32 +08:00
} else {
if ( char *charPtr = (char *)memchr( (char *)buffer, '$', buffer.size() ) ) {
int discardBytes = charPtr-(char *)buffer;
Warning( "Unexpected format RTSP interleaved data, resyncing by %d bytes", discardBytes );
Hexdump( -1, (char *)buffer, discardBytes );
buffer -= discardBytes;
2019-04-29 00:05:32 +08:00
} else {
Warning( "Unexpected format RTSP interleaved data, dumping %d bytes", buffer.size() );
Hexdump( -1, (char *)buffer, 32 );
buffer.clear();
}
}
}
}
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
// FIXME: Is this really necessary when using tcp ?
now = time(nullptr);
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
Debug(5, "sendkeepalive %d, timeout %d, now: %" PRIi64 " last: %" PRIi64 " since: %" PRIi64,
sendKeepalive,
timeout,
static_cast<int64>(now),
static_cast<int64>(lastKeepalive),
static_cast<int64>(now - lastKeepalive));
if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) )
{
if ( !sendCommand( message ) )
2021-03-03 02:56:35 +08:00
return;
lastKeepalive = now;
}
buffer.tidy( 1 );
}
2013-03-17 07:45:21 +08:00
#if 0
message = "PAUSE "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
2013-03-17 07:45:21 +08:00
#endif
// Send a teardown message but don't expect a response as this may not be implemented on the server when using TCP
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
2021-03-03 02:56:35 +08:00
return;
delete mSources[ssrc];
mSources.clear();
break;
}
case RTP_MULTICAST :
{
RtpSource *source = new RtpSource( mId, localHost, localPorts[0], mHost, remotePorts[0], ssrc, seq, rtpClock, rtpTime, codecId );
mSources[ssrc] = source;
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
2021-03-03 02:56:35 +08:00
while (!mTerminate) {
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
if ( sendKeepalive && (timeout > 0) && ((time(nullptr)-lastKeepalive) > (timeout-5)) ) {
if ( !sendCommand( message ) )
2021-03-03 02:56:35 +08:00
return;
lastKeepalive = time(nullptr);
2013-03-17 07:45:21 +08:00
}
2019-04-29 00:05:32 +08:00
usleep(100000);
}
#if 0
message = "PAUSE "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
#endif
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
2019-04-29 00:05:32 +08:00
if ( !sendCommand(message) )
2021-03-03 02:56:35 +08:00
return;
2019-04-29 00:05:32 +08:00
if ( !recvResponse(response) )
2021-03-03 02:56:35 +08:00
return;
2021-03-03 07:40:57 +08:00
rtpDataThread.Stop();
2021-03-03 07:35:34 +08:00
rtpCtrlThread.Stop();
delete mSources[ssrc];
mSources.clear();
releasePorts( localPorts[0] );
break;
}
default:
2019-04-29 00:05:32 +08:00
Panic("Got unexpected method %d", mMethod);
break;
}
2013-03-17 07:45:21 +08:00
2021-03-03 02:56:35 +08:00
return;
2013-03-17 07:45:21 +08:00
}