diff --git a/src/zm_remote_camera_rtsp.cpp b/src/zm_remote_camera_rtsp.cpp index 44d5046af..7cf59b64d 100644 --- a/src/zm_remote_camera_rtsp.cpp +++ b/src/zm_remote_camera_rtsp.cpp @@ -48,7 +48,6 @@ RemoteCameraRtsp::RemoteCameraRtsp( p_brightness, p_contrast, p_hue, p_colour, p_capture, p_record_audio), rtsp_describe(p_rtsp_describe), - rtspThread(0), frameCount(0) { if ( p_method == "rtpUni" ) @@ -114,19 +113,15 @@ void RemoteCameraRtsp::Terminate() { } int RemoteCameraRtsp::Connect() { - rtspThread = new RtspThread(monitor->Id(), method, protocol, host, port, path, auth, rtsp_describe); - - rtspThread->start(); + rtspThread = ZM::make_unique(monitor->Id(), method, protocol, host, port, path, auth, rtsp_describe); return 0; } int RemoteCameraRtsp::Disconnect() { - if ( rtspThread ) { - rtspThread->stop(); - rtspThread->join(); - delete rtspThread; - rtspThread = nullptr; + if (rtspThread) { + rtspThread->Stop(); + rtspThread.reset(); } return 0; } @@ -214,7 +209,7 @@ int RemoteCameraRtsp::PrimeCapture() { } // end PrimeCapture int RemoteCameraRtsp::PreCapture() { - if ( !rtspThread->isRunning() ) + if (!rtspThread || rtspThread->IsStopped()) return -1; if ( !rtspThread->hasSources() ) { Error("Cannot precapture, no RTP sources"); @@ -234,7 +229,7 @@ int RemoteCameraRtsp::Capture(ZMPacket &zm_packet) { while ( !frameComplete ) { buffer.clear(); - if ( !rtspThread->isRunning() ) + if (!rtspThread || rtspThread->IsStopped()) return -1; if ( rtspThread->getFrame(buffer) ) { diff --git a/src/zm_remote_camera_rtsp.h b/src/zm_remote_camera_rtsp.h index bd8b30a7c..e9014e66b 100644 --- a/src/zm_remote_camera_rtsp.h +++ b/src/zm_remote_camera_rtsp.h @@ -44,7 +44,7 @@ protected: RtspThread::RtspMethod method; - RtspThread *rtspThread; + std::unique_ptr rtspThread; int frameCount; diff --git a/src/zm_rtp_ctrl.cpp b/src/zm_rtp_ctrl.cpp index a978d2f2d..8c0875c19 100644 --- a/src/zm_rtp_ctrl.cpp +++ b/src/zm_rtp_ctrl.cpp @@ -327,7 +327,7 @@ int RtpCtrlThread::run() { } // end foeach comms iterator } rtpCtrlServer.close(); - mRtspThread.stop(); + mRtspThread.Stop(); return 0; } diff --git a/src/zm_rtp_data.cpp b/src/zm_rtp_data.cpp index 1d8443a37..4409d39ff 100644 --- a/src/zm_rtp_data.cpp +++ b/src/zm_rtp_data.cpp @@ -98,7 +98,7 @@ int RtpDataThread::run() { } // end foreach commsList } rtpDataSocket.close(); - mRtspThread.stop(); + mRtspThread.Stop(); return 0; } diff --git a/src/zm_rtsp.cpp b/src/zm_rtsp.cpp index 67373813e..92e14fccc 100644 --- a/src/zm_rtsp.cpp +++ b/src/zm_rtsp.cpp @@ -161,7 +161,7 @@ RtspThread::RtspThread( mSsrc(0), mDist(UNDEFINED), mRtpTime(0), - mStop(false) + mTerminate(false) { mUrl = mProtocol+"://"+mHost+":"+mPort; if ( !mPath.empty() ) { @@ -185,9 +185,15 @@ RtspThread::RtspThread( mAuthenticator = new zm::Authenticator(parts[0], parts[1]); else mAuthenticator = new zm::Authenticator(parts[0], ""); + + mThread = std::thread(&RtspThread::Run, this); } RtspThread::~RtspThread() { + Stop(); + if (mThread.joinable()) + mThread.join(); + if ( mFormatContext ) { #if LIBAVFORMAT_VERSION_CHECK(52, 96, 0, 96, 0) avformat_free_context(mFormatContext); @@ -204,7 +210,7 @@ RtspThread::~RtspThread() { mAuthenticator = nullptr; } -int RtspThread::run() { +void RtspThread::Run() { std::string message; std::string response; @@ -246,11 +252,11 @@ int RtspThread::run() { Debug(2, "Sending HTTP message: %s", message.c_str()); if ( mRtspSocket.send(message.c_str(), message.size()) != (int)message.length() ) { Error("Unable to send message '%s': %s", message.c_str(), strerror(errno)); - return -1; + return; } if ( mRtspSocket.recv(response) < 0 ) { Error("Recv failed; %s", strerror(errno)); - return -1; + return; } Debug(2, "Received HTTP response: %s (%zd bytes)", response.c_str(), response.size()); @@ -264,7 +270,7 @@ int RtspThread::run() { if ( response.size() ) Hexdump(Logger::ERROR, response.data(), std::min(int(response.size()), 16)); } - return -1; + return; } // If Server requests authentication, check WWW-Authenticate header and fill required fields // for requested authentication method @@ -282,7 +288,7 @@ int RtspThread::run() { if ( respCode != 200 ) { Error("Unexpected response code %d, text is '%s'", respCode, respText); - return -1; + return; } message = "POST "+mPath+" HTTP/1.0\r\n"; @@ -295,7 +301,7 @@ int RtspThread::run() { Debug(2, "Sending HTTP message: %s", message.c_str()); if ( mRtspSocket2.send(message.c_str(), message.size()) != (int)message.length() ) { Error("Unable to send message '%s': %s", message.c_str(), strerror(errno)); - return -1; + return; } } // end if ( mMethod == RTP_RTSP_HTTP ) @@ -305,18 +311,18 @@ int RtspThread::run() { // Request supported RTSP commands by the server message = "OPTIONS "+mUrl+" RTSP/1.0\r\n"; if ( !sendCommand(message) ) - return -1; + return; // A negative return here may indicate auth failure, but we will have setup the auth mechanisms so we need to retry. if ( !recvResponse(response) ) { if ( mNeedAuth ) { Debug(2, "Resending OPTIONS due to possible auth requirement"); if ( !sendCommand(message) ) - return -1; + return; if ( !recvResponse(response) ) - return -1; + return; } else { - return -1; + return; } } // end if failed response maybe due to auth @@ -347,7 +353,7 @@ int RtspThread::run() { const std::string endOfHeaders = "\r\n\r\n"; size_t sdpStart = response.find(endOfHeaders); if ( sdpStart == std::string::npos ) - return -1; + return; if ( mRtspDescribe ) { std::string DescHeader = response.substr(0, sdpStart); @@ -374,7 +380,7 @@ int RtspThread::run() { mFormatContext = mSessDesc->generateFormatContext(); } catch ( const Exception &e ) { Error(e.getMessage().c_str()); - return -1; + return; } #if 0 @@ -450,9 +456,9 @@ int RtspThread::run() { } if ( !sendCommand(message) ) - return -1; + return; if ( !recvResponse(response) ) - return -1; + return; lines = split(response, "\r\n"); std::string session; @@ -525,9 +531,9 @@ int RtspThread::run() { message = "PLAY "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\nRange: npt=0.000-\r\n"; if ( !sendCommand(message) ) - return -1; + return; if ( !recvResponse(response) ) - return -1; + return; lines = split(response, "\r\n"); std::string rtpInfo; @@ -590,14 +596,14 @@ int RtspThread::run() { rtpDataThread.start(); rtpCtrlThread.start(); - while( !mStop ) { + while (!mTerminate) { now = time(nullptr); // Send a keepalive message if the server supports this feature and we are close to the timeout expiration Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepalive, timeout, now, lastKeepalive, (now-lastKeepalive) ); if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) { if ( !sendCommand( message ) ) - return( -1 ); + return; lastKeepalive = now; } usleep( 100000 ); @@ -612,9 +618,9 @@ int RtspThread::run() { message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n"; if ( !sendCommand( message ) ) - return( -1 ); + return; if ( !recvResponse( response ) ) - return( -1 ); + return; rtpDataThread.stop(); rtpCtrlThread.stop(); @@ -624,7 +630,7 @@ int RtspThread::run() { rtpDataThread.join(); rtpCtrlThread.join(); - + delete mSources[ssrc]; mSources.clear(); @@ -647,7 +653,7 @@ int RtspThread::run() { Buffer buffer( ZM_NETWORK_BUFSIZ ); std::string keepaliveMessage = "OPTIONS "+mUrl+" RTSP/1.0\r\n"; std::string keepaliveResponse = "RTSP/1.0 200 OK\r\n"; - while ( !mStop && select.wait() >= 0 ) { + while (!mTerminate && select.wait() >= 0) { ZM::Select::CommsList readable = select.getReadable(); if ( readable.size() == 0 ) { Error( "RTSP timed out" ); @@ -720,7 +726,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) { if ( !sendCommand( message ) ) - return( -1 ); + return; lastKeepalive = now; } buffer.tidy( 1 ); @@ -735,7 +741,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali // Send a teardown message but don't expect a response as this may not be implemented on the server when using TCP message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n"; if ( !sendCommand( message ) ) - return( -1 ); + return; delete mSources[ssrc]; mSources.clear(); @@ -752,11 +758,11 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali rtpDataThread.start(); rtpCtrlThread.start(); - while ( !mStop ) { + while (!mTerminate) { // Send a keepalive message if the server supports this feature and we are close to the timeout expiration if ( sendKeepalive && (timeout > 0) && ((time(nullptr)-lastKeepalive) > (timeout-5)) ) { if ( !sendCommand( message ) ) - return -1; + return; lastKeepalive = time(nullptr); } usleep(100000); @@ -770,9 +776,9 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali #endif message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n"; if ( !sendCommand(message) ) - return -1; + return; if ( !recvResponse(response) ) - return -1; + return; rtpDataThread.stop(); rtpCtrlThread.stop(); @@ -791,7 +797,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali break; } - return 0; + return; } #endif // HAVE_LIBAVFORMAT diff --git a/src/zm_rtsp.h b/src/zm_rtsp.h index 26189cd20..6cb88d091 100644 --- a/src/zm_rtsp.h +++ b/src/zm_rtsp.h @@ -24,10 +24,12 @@ #include "zm_rtp_source.h" #include "zm_rtsp_auth.h" #include "zm_sdp.h" +#include #include #include +#include -class RtspThread : public Thread { +class RtspThread { public: typedef enum { RTP_UNICAST, RTP_MULTICAST, RTP_RTSP, RTP_RTSP_HTTP } RtspMethod; typedef enum { UNDEFINED, UNICAST, MULTICAST } RtspDist; @@ -84,12 +86,14 @@ private: unsigned long mRtpTime; - bool mStop; + std::thread mThread; + std::atomic mTerminate; private: bool sendCommand( std::string message ); bool recvResponse( std::string &response ); - void checkAuthResponse(std::string &response); + void checkAuthResponse(std::string &response); + void Run(); public: RtspThread( int id, RtspMethod method, const std::string &protocol, const std::string &host, const std::string &port, const std::string &path, const std::string &auth, bool rtsp_describe ); @@ -124,15 +128,10 @@ public: return( false ); return( iter->second->getFrame( frame ) ); } - int run(); - void stop() - { - mStop = true; - } - bool stopped() const - { - return( mStop ); - } + + void Stop() { mTerminate = true; } + bool IsStopped() const { return mTerminate; } + int getAddressFamily () { return mRtspSocket.getDomain();