RtpCtrlThread: Convert to std::thread

This commit is contained in:
Peter Keresztes Schmidt 2021-03-03 00:35:34 +01:00
parent ff2bfb58da
commit 71edb9d830
3 changed files with 24 additions and 19 deletions

View File

@ -25,9 +25,16 @@
#if HAVE_LIBAVFORMAT #if HAVE_LIBAVFORMAT
RtpCtrlThread::RtpCtrlThread( RtspThread &rtspThread, RtpSource &rtpSource ) RtpCtrlThread::RtpCtrlThread(RtspThread &rtspThread, RtpSource &rtpSource)
: mRtspThread( rtspThread ), mRtpSource( rtpSource ), mStop( false ) : mRtspThread(rtspThread), mRtpSource(rtpSource), mTerminate(false)
{ {
mThread = std::thread(&RtpCtrlThread::Run, this);
}
RtpCtrlThread::~RtpCtrlThread() {
Stop();
if (mThread.joinable())
mThread.join();
} }
int RtpCtrlThread::recvPacket( const unsigned char *packet, ssize_t packetLen ) { int RtpCtrlThread::recvPacket( const unsigned char *packet, ssize_t packetLen ) {
@ -121,7 +128,7 @@ int RtpCtrlThread::recvPacket( const unsigned char *packet, ssize_t packetLen )
} }
case RTCP_BYE : case RTCP_BYE :
Debug(5, "RTCP Got BYE"); Debug(5, "RTCP Got BYE");
mStop = true; Stop();
break; break;
case RTCP_APP : case RTCP_APP :
// Ignoring as per RFC 3550 // Ignoring as per RFC 3550
@ -241,7 +248,7 @@ int RtpCtrlThread::recvPackets( unsigned char *buffer, ssize_t nBytes ) {
return nBytes; return nBytes;
} }
int RtpCtrlThread::run() { void RtpCtrlThread::Run() {
Debug( 2, "Starting control thread %x on port %d", mRtpSource.getSsrc(), mRtpSource.getLocalCtrlPort() ); Debug( 2, "Starting control thread %x on port %d", mRtpSource.getSsrc(), mRtpSource.getLocalCtrlPort() );
ZM::SockAddrInet localAddr, remoteAddr; ZM::SockAddrInet localAddr, remoteAddr;
@ -272,8 +279,7 @@ int RtpCtrlThread::run() {
time_t last_receive = time(nullptr); time_t last_receive = time(nullptr);
bool timeout = false; // used as a flag that we had a timeout, and then sent an RR to see if we wake back up. Real timeout will happen when this is true. bool timeout = false; // used as a flag that we had a timeout, and then sent an RR to see if we wake back up. Real timeout will happen when this is true.
while ( !mStop && select.wait() >= 0 ) { while (!mTerminate && select.wait() >= 0) {
time_t now = time(nullptr); time_t now = time(nullptr);
ZM::Select::CommsList readable = select.getReadable(); ZM::Select::CommsList readable = select.getReadable();
if ( readable.size() == 0 ) { if ( readable.size() == 0 ) {
@ -318,7 +324,7 @@ int RtpCtrlThread::run() {
} }
} else { } else {
// Here is another case of not receiving some data causing us to terminate... why? Sometimes there are pauses in the interwebs. // Here is another case of not receiving some data causing us to terminate... why? Sometimes there are pauses in the interwebs.
mStop = true; Stop();
break; break;
} }
} else { } else {
@ -328,7 +334,6 @@ int RtpCtrlThread::run() {
} }
rtpCtrlServer.close(); rtpCtrlServer.close();
mRtspThread.Stop(); mRtspThread.Stop();
return 0;
} }
#endif // HAVE_LIBAVFORMAT #endif // HAVE_LIBAVFORMAT

View File

@ -21,6 +21,8 @@
#define ZM_RTP_CTRL_H #define ZM_RTP_CTRL_H
#include "zm_thread.h" #include "zm_thread.h"
#include <atomic>
#include <thread>
// Defined in ffmpeg rtp.h // Defined in ffmpeg rtp.h
//#define RTP_MAX_SDES 255 // maximum text length for SDES //#define RTP_MAX_SDES 255 // maximum text length for SDES
@ -32,7 +34,7 @@
class RtspThread; class RtspThread;
class RtpSource; class RtpSource;
class RtpCtrlThread : public Thread { class RtpCtrlThread {
friend class RtspThread; friend class RtspThread;
private: private:
@ -121,7 +123,9 @@ private:
RtspThread &mRtspThread; RtspThread &mRtspThread;
RtpSource &mRtpSource; RtpSource &mRtpSource;
int mPort; int mPort;
bool mStop;
std::atomic<bool> mTerminate;
std::thread mThread;
private: private:
int recvPacket( const unsigned char *packet, ssize_t packetLen ); int recvPacket( const unsigned char *packet, ssize_t packetLen );
@ -129,14 +133,13 @@ private:
int generateSdes( const unsigned char *packet, ssize_t packetLen ); int generateSdes( const unsigned char *packet, ssize_t packetLen );
int generateBye( const unsigned char *packet, ssize_t packetLen ); int generateBye( const unsigned char *packet, ssize_t packetLen );
int recvPackets( unsigned char *buffer, ssize_t nBytes ); int recvPackets( unsigned char *buffer, ssize_t nBytes );
int run(); void Run();
public: public:
RtpCtrlThread( RtspThread &rtspThread, RtpSource &rtpSource ); RtpCtrlThread( RtspThread &rtspThread, RtpSource &rtpSource );
~RtpCtrlThread();
void stop() { void Stop() { mTerminate = true; }
mStop = true;
}
}; };
#endif // ZM_RTP_CTRL_H #endif // ZM_RTP_CTRL_H

View File

@ -587,7 +587,6 @@ void RtspThread::Run() {
RtpCtrlThread rtpCtrlThread( *this, *source ); RtpCtrlThread rtpCtrlThread( *this, *source );
rtpDataThread.start(); rtpDataThread.start();
rtpCtrlThread.start();
while (!mTerminate) { while (!mTerminate) {
now = time(nullptr); now = time(nullptr);
@ -616,7 +615,7 @@ void RtspThread::Run() {
return; return;
rtpDataThread.stop(); rtpDataThread.stop();
rtpCtrlThread.stop(); rtpCtrlThread.Stop();
//rtpDataThread.kill( SIGTERM ); //rtpDataThread.kill( SIGTERM );
//rtpCtrlThread.kill( SIGTERM ); //rtpCtrlThread.kill( SIGTERM );
@ -749,7 +748,6 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
RtpCtrlThread rtpCtrlThread( *this, *source ); RtpCtrlThread rtpCtrlThread( *this, *source );
rtpDataThread.start(); rtpDataThread.start();
rtpCtrlThread.start();
while (!mTerminate) { while (!mTerminate) {
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration // Send a keepalive message if the server supports this feature and we are close to the timeout expiration
@ -774,10 +772,9 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
return; return;
rtpDataThread.stop(); rtpDataThread.stop();
rtpCtrlThread.stop(); rtpCtrlThread.Stop();
rtpDataThread.join(); rtpDataThread.join();
rtpCtrlThread.join();
delete mSources[ssrc]; delete mSources[ssrc];
mSources.clear(); mSources.clear();