RtspThread: Convert to std::thread

This commit is contained in:
Peter Keresztes Schmidt 2021-03-02 19:56:35 +01:00 committed by Peter Keresztes Schmidt
parent e67626b4e2
commit 8f941c75cd
6 changed files with 56 additions and 56 deletions

View File

@ -48,7 +48,6 @@ RemoteCameraRtsp::RemoteCameraRtsp(
p_brightness, p_contrast, p_hue, p_colour, p_brightness, p_contrast, p_hue, p_colour,
p_capture, p_record_audio), p_capture, p_record_audio),
rtsp_describe(p_rtsp_describe), rtsp_describe(p_rtsp_describe),
rtspThread(0),
frameCount(0) frameCount(0)
{ {
if ( p_method == "rtpUni" ) if ( p_method == "rtpUni" )
@ -114,19 +113,15 @@ void RemoteCameraRtsp::Terminate() {
} }
int RemoteCameraRtsp::Connect() { int RemoteCameraRtsp::Connect() {
rtspThread = new RtspThread(monitor->Id(), method, protocol, host, port, path, auth, rtsp_describe); rtspThread = ZM::make_unique<RtspThread>(monitor->Id(), method, protocol, host, port, path, auth, rtsp_describe);
rtspThread->start();
return 0; return 0;
} }
int RemoteCameraRtsp::Disconnect() { int RemoteCameraRtsp::Disconnect() {
if ( rtspThread ) { if (rtspThread) {
rtspThread->stop(); rtspThread->Stop();
rtspThread->join(); rtspThread.reset();
delete rtspThread;
rtspThread = nullptr;
} }
return 0; return 0;
} }
@ -214,7 +209,7 @@ int RemoteCameraRtsp::PrimeCapture() {
} // end PrimeCapture } // end PrimeCapture
int RemoteCameraRtsp::PreCapture() { int RemoteCameraRtsp::PreCapture() {
if ( !rtspThread->isRunning() ) if (!rtspThread || rtspThread->IsStopped())
return -1; return -1;
if ( !rtspThread->hasSources() ) { if ( !rtspThread->hasSources() ) {
Error("Cannot precapture, no RTP sources"); Error("Cannot precapture, no RTP sources");
@ -234,7 +229,7 @@ int RemoteCameraRtsp::Capture(ZMPacket &zm_packet) {
while ( !frameComplete ) { while ( !frameComplete ) {
buffer.clear(); buffer.clear();
if ( !rtspThread->isRunning() ) if (!rtspThread || rtspThread->IsStopped())
return -1; return -1;
if ( rtspThread->getFrame(buffer) ) { if ( rtspThread->getFrame(buffer) ) {

View File

@ -44,7 +44,7 @@ protected:
RtspThread::RtspMethod method; RtspThread::RtspMethod method;
RtspThread *rtspThread; std::unique_ptr<RtspThread> rtspThread;
int frameCount; int frameCount;

View File

@ -327,7 +327,7 @@ int RtpCtrlThread::run() {
} // end foeach comms iterator } // end foeach comms iterator
} }
rtpCtrlServer.close(); rtpCtrlServer.close();
mRtspThread.stop(); mRtspThread.Stop();
return 0; return 0;
} }

View File

@ -98,7 +98,7 @@ int RtpDataThread::run() {
} // end foreach commsList } // end foreach commsList
} }
rtpDataSocket.close(); rtpDataSocket.close();
mRtspThread.stop(); mRtspThread.Stop();
return 0; return 0;
} }

View File

@ -161,7 +161,7 @@ RtspThread::RtspThread(
mSsrc(0), mSsrc(0),
mDist(UNDEFINED), mDist(UNDEFINED),
mRtpTime(0), mRtpTime(0),
mStop(false) mTerminate(false)
{ {
mUrl = mProtocol+"://"+mHost+":"+mPort; mUrl = mProtocol+"://"+mHost+":"+mPort;
if ( !mPath.empty() ) { if ( !mPath.empty() ) {
@ -185,9 +185,15 @@ RtspThread::RtspThread(
mAuthenticator = new zm::Authenticator(parts[0], parts[1]); mAuthenticator = new zm::Authenticator(parts[0], parts[1]);
else else
mAuthenticator = new zm::Authenticator(parts[0], ""); mAuthenticator = new zm::Authenticator(parts[0], "");
mThread = std::thread(&RtspThread::Run, this);
} }
RtspThread::~RtspThread() { RtspThread::~RtspThread() {
Stop();
if (mThread.joinable())
mThread.join();
if ( mFormatContext ) { if ( mFormatContext ) {
#if LIBAVFORMAT_VERSION_CHECK(52, 96, 0, 96, 0) #if LIBAVFORMAT_VERSION_CHECK(52, 96, 0, 96, 0)
avformat_free_context(mFormatContext); avformat_free_context(mFormatContext);
@ -204,7 +210,7 @@ RtspThread::~RtspThread() {
mAuthenticator = nullptr; mAuthenticator = nullptr;
} }
int RtspThread::run() { void RtspThread::Run() {
std::string message; std::string message;
std::string response; std::string response;
@ -246,11 +252,11 @@ int RtspThread::run() {
Debug(2, "Sending HTTP message: %s", message.c_str()); Debug(2, "Sending HTTP message: %s", message.c_str());
if ( mRtspSocket.send(message.c_str(), message.size()) != (int)message.length() ) { if ( mRtspSocket.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno)); Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
return -1; return;
} }
if ( mRtspSocket.recv(response) < 0 ) { if ( mRtspSocket.recv(response) < 0 ) {
Error("Recv failed; %s", strerror(errno)); Error("Recv failed; %s", strerror(errno));
return -1; return;
} }
Debug(2, "Received HTTP response: %s (%zd bytes)", response.c_str(), response.size()); Debug(2, "Received HTTP response: %s (%zd bytes)", response.c_str(), response.size());
@ -264,7 +270,7 @@ int RtspThread::run() {
if ( response.size() ) if ( response.size() )
Hexdump(Logger::ERROR, response.data(), std::min(int(response.size()), 16)); Hexdump(Logger::ERROR, response.data(), std::min(int(response.size()), 16));
} }
return -1; return;
} }
// If Server requests authentication, check WWW-Authenticate header and fill required fields // If Server requests authentication, check WWW-Authenticate header and fill required fields
// for requested authentication method // for requested authentication method
@ -282,7 +288,7 @@ int RtspThread::run() {
if ( respCode != 200 ) { if ( respCode != 200 ) {
Error("Unexpected response code %d, text is '%s'", respCode, respText); Error("Unexpected response code %d, text is '%s'", respCode, respText);
return -1; return;
} }
message = "POST "+mPath+" HTTP/1.0\r\n"; message = "POST "+mPath+" HTTP/1.0\r\n";
@ -295,7 +301,7 @@ int RtspThread::run() {
Debug(2, "Sending HTTP message: %s", message.c_str()); Debug(2, "Sending HTTP message: %s", message.c_str());
if ( mRtspSocket2.send(message.c_str(), message.size()) != (int)message.length() ) { if ( mRtspSocket2.send(message.c_str(), message.size()) != (int)message.length() ) {
Error("Unable to send message '%s': %s", message.c_str(), strerror(errno)); Error("Unable to send message '%s': %s", message.c_str(), strerror(errno));
return -1; return;
} }
} // end if ( mMethod == RTP_RTSP_HTTP ) } // end if ( mMethod == RTP_RTSP_HTTP )
@ -305,18 +311,18 @@ int RtspThread::run() {
// Request supported RTSP commands by the server // Request supported RTSP commands by the server
message = "OPTIONS "+mUrl+" RTSP/1.0\r\n"; message = "OPTIONS "+mUrl+" RTSP/1.0\r\n";
if ( !sendCommand(message) ) if ( !sendCommand(message) )
return -1; return;
// A negative return here may indicate auth failure, but we will have setup the auth mechanisms so we need to retry. // A negative return here may indicate auth failure, but we will have setup the auth mechanisms so we need to retry.
if ( !recvResponse(response) ) { if ( !recvResponse(response) ) {
if ( mNeedAuth ) { if ( mNeedAuth ) {
Debug(2, "Resending OPTIONS due to possible auth requirement"); Debug(2, "Resending OPTIONS due to possible auth requirement");
if ( !sendCommand(message) ) if ( !sendCommand(message) )
return -1; return;
if ( !recvResponse(response) ) if ( !recvResponse(response) )
return -1; return;
} else { } else {
return -1; return;
} }
} // end if failed response maybe due to auth } // end if failed response maybe due to auth
@ -347,7 +353,7 @@ int RtspThread::run() {
const std::string endOfHeaders = "\r\n\r\n"; const std::string endOfHeaders = "\r\n\r\n";
size_t sdpStart = response.find(endOfHeaders); size_t sdpStart = response.find(endOfHeaders);
if ( sdpStart == std::string::npos ) if ( sdpStart == std::string::npos )
return -1; return;
if ( mRtspDescribe ) { if ( mRtspDescribe ) {
std::string DescHeader = response.substr(0, sdpStart); std::string DescHeader = response.substr(0, sdpStart);
@ -374,7 +380,7 @@ int RtspThread::run() {
mFormatContext = mSessDesc->generateFormatContext(); mFormatContext = mSessDesc->generateFormatContext();
} catch ( const Exception &e ) { } catch ( const Exception &e ) {
Error(e.getMessage().c_str()); Error(e.getMessage().c_str());
return -1; return;
} }
#if 0 #if 0
@ -450,9 +456,9 @@ int RtspThread::run() {
} }
if ( !sendCommand(message) ) if ( !sendCommand(message) )
return -1; return;
if ( !recvResponse(response) ) if ( !recvResponse(response) )
return -1; return;
lines = split(response, "\r\n"); lines = split(response, "\r\n");
std::string session; std::string session;
@ -525,9 +531,9 @@ int RtspThread::run() {
message = "PLAY "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\nRange: npt=0.000-\r\n"; message = "PLAY "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\nRange: npt=0.000-\r\n";
if ( !sendCommand(message) ) if ( !sendCommand(message) )
return -1; return;
if ( !recvResponse(response) ) if ( !recvResponse(response) )
return -1; return;
lines = split(response, "\r\n"); lines = split(response, "\r\n");
std::string rtpInfo; std::string rtpInfo;
@ -590,14 +596,14 @@ int RtspThread::run() {
rtpDataThread.start(); rtpDataThread.start();
rtpCtrlThread.start(); rtpCtrlThread.start();
while( !mStop ) { while (!mTerminate) {
now = time(nullptr); now = time(nullptr);
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration // Send a keepalive message if the server supports this feature and we are close to the timeout expiration
Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d",
sendKeepalive, timeout, now, lastKeepalive, (now-lastKeepalive) ); sendKeepalive, timeout, now, lastKeepalive, (now-lastKeepalive) );
if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) { if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) {
if ( !sendCommand( message ) ) if ( !sendCommand( message ) )
return( -1 ); return;
lastKeepalive = now; lastKeepalive = now;
} }
usleep( 100000 ); usleep( 100000 );
@ -612,9 +618,9 @@ int RtspThread::run() {
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n"; message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) ) if ( !sendCommand( message ) )
return( -1 ); return;
if ( !recvResponse( response ) ) if ( !recvResponse( response ) )
return( -1 ); return;
rtpDataThread.stop(); rtpDataThread.stop();
rtpCtrlThread.stop(); rtpCtrlThread.stop();
@ -624,7 +630,7 @@ int RtspThread::run() {
rtpDataThread.join(); rtpDataThread.join();
rtpCtrlThread.join(); rtpCtrlThread.join();
delete mSources[ssrc]; delete mSources[ssrc];
mSources.clear(); mSources.clear();
@ -647,7 +653,7 @@ int RtspThread::run() {
Buffer buffer( ZM_NETWORK_BUFSIZ ); Buffer buffer( ZM_NETWORK_BUFSIZ );
std::string keepaliveMessage = "OPTIONS "+mUrl+" RTSP/1.0\r\n"; std::string keepaliveMessage = "OPTIONS "+mUrl+" RTSP/1.0\r\n";
std::string keepaliveResponse = "RTSP/1.0 200 OK\r\n"; std::string keepaliveResponse = "RTSP/1.0 200 OK\r\n";
while ( !mStop && select.wait() >= 0 ) { while (!mTerminate && select.wait() >= 0) {
ZM::Select::CommsList readable = select.getReadable(); ZM::Select::CommsList readable = select.getReadable();
if ( readable.size() == 0 ) { if ( readable.size() == 0 ) {
Error( "RTSP timed out" ); Error( "RTSP timed out" );
@ -720,7 +726,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) ) if ( sendKeepalive && (timeout > 0) && ((now-lastKeepalive) > (timeout-5)) )
{ {
if ( !sendCommand( message ) ) if ( !sendCommand( message ) )
return( -1 ); return;
lastKeepalive = now; lastKeepalive = now;
} }
buffer.tidy( 1 ); buffer.tidy( 1 );
@ -735,7 +741,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
// Send a teardown message but don't expect a response as this may not be implemented on the server when using TCP // Send a teardown message but don't expect a response as this may not be implemented on the server when using TCP
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n"; message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand( message ) ) if ( !sendCommand( message ) )
return( -1 ); return;
delete mSources[ssrc]; delete mSources[ssrc];
mSources.clear(); mSources.clear();
@ -752,11 +758,11 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
rtpDataThread.start(); rtpDataThread.start();
rtpCtrlThread.start(); rtpCtrlThread.start();
while ( !mStop ) { while (!mTerminate) {
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration // Send a keepalive message if the server supports this feature and we are close to the timeout expiration
if ( sendKeepalive && (timeout > 0) && ((time(nullptr)-lastKeepalive) > (timeout-5)) ) { if ( sendKeepalive && (timeout > 0) && ((time(nullptr)-lastKeepalive) > (timeout-5)) ) {
if ( !sendCommand( message ) ) if ( !sendCommand( message ) )
return -1; return;
lastKeepalive = time(nullptr); lastKeepalive = time(nullptr);
} }
usleep(100000); usleep(100000);
@ -770,9 +776,9 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
#endif #endif
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n"; message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
if ( !sendCommand(message) ) if ( !sendCommand(message) )
return -1; return;
if ( !recvResponse(response) ) if ( !recvResponse(response) )
return -1; return;
rtpDataThread.stop(); rtpDataThread.stop();
rtpCtrlThread.stop(); rtpCtrlThread.stop();
@ -791,7 +797,7 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
break; break;
} }
return 0; return;
} }
#endif // HAVE_LIBAVFORMAT #endif // HAVE_LIBAVFORMAT

View File

@ -24,10 +24,12 @@
#include "zm_rtp_source.h" #include "zm_rtp_source.h"
#include "zm_rtsp_auth.h" #include "zm_rtsp_auth.h"
#include "zm_sdp.h" #include "zm_sdp.h"
#include <atomic>
#include <map> #include <map>
#include <set> #include <set>
#include <thread>
class RtspThread : public Thread { class RtspThread {
public: public:
typedef enum { RTP_UNICAST, RTP_MULTICAST, RTP_RTSP, RTP_RTSP_HTTP } RtspMethod; typedef enum { RTP_UNICAST, RTP_MULTICAST, RTP_RTSP, RTP_RTSP_HTTP } RtspMethod;
typedef enum { UNDEFINED, UNICAST, MULTICAST } RtspDist; typedef enum { UNDEFINED, UNICAST, MULTICAST } RtspDist;
@ -84,12 +86,14 @@ private:
unsigned long mRtpTime; unsigned long mRtpTime;
bool mStop; std::thread mThread;
std::atomic<bool> mTerminate;
private: private:
bool sendCommand( std::string message ); bool sendCommand( std::string message );
bool recvResponse( std::string &response ); bool recvResponse( std::string &response );
void checkAuthResponse(std::string &response); void checkAuthResponse(std::string &response);
void Run();
public: public:
RtspThread( int id, RtspMethod method, const std::string &protocol, const std::string &host, const std::string &port, const std::string &path, const std::string &auth, bool rtsp_describe ); RtspThread( int id, RtspMethod method, const std::string &protocol, const std::string &host, const std::string &port, const std::string &path, const std::string &auth, bool rtsp_describe );
@ -124,15 +128,10 @@ public:
return( false ); return( false );
return( iter->second->getFrame( frame ) ); return( iter->second->getFrame( frame ) );
} }
int run();
void stop() void Stop() { mTerminate = true; }
{ bool IsStopped() const { return mTerminate; }
mStop = true;
}
bool stopped() const
{
return( mStop );
}
int getAddressFamily () int getAddressFamily ()
{ {
return mRtspSocket.getDomain(); return mRtspSocket.getDomain();