RtpDataThread: Convert to std::thread

This commit is contained in:
Peter Keresztes Schmidt 2021-03-03 00:40:57 +01:00
parent 71edb9d830
commit 8f0431d85b
3 changed files with 23 additions and 24 deletions

View File

@ -26,8 +26,15 @@
#if HAVE_LIBAVFORMAT
RtpDataThread::RtpDataThread(RtspThread &rtspThread, RtpSource &rtpSource) :
mRtspThread(rtspThread), mRtpSource(rtpSource), mStop(false)
mRtspThread(rtspThread), mRtpSource(rtpSource), mTerminate(false)
{
mThread = std::thread(&RtpDataThread::Run, this);
}
RtpDataThread::~RtpDataThread() {
Stop();
if (mThread.joinable())
mThread.join();
}
bool RtpDataThread::recvPacket(const unsigned char *packet, size_t packetLen) {
@ -54,7 +61,7 @@ bool RtpDataThread::recvPacket(const unsigned char *packet, size_t packetLen) {
return mRtpSource.handlePacket(packet, packetLen);
}
int RtpDataThread::run() {
void RtpDataThread::Run() {
Debug(2, "Starting data thread %d on port %d",
mRtpSource.getSsrc(), mRtpSource.getLocalDataPort());
@ -75,11 +82,11 @@ int RtpDataThread::run() {
select.addReader(&rtpDataSocket);
unsigned char buffer[ZM_NETWORK_BUFSIZ];
while ( !zm_terminate && !mStop && (select.wait() >= 0) ) {
while ( !zm_terminate && !mTerminate && (select.wait() >= 0) ) {
ZM::Select::CommsList readable = select.getReadable();
if ( readable.size() == 0 ) {
Error("RTP timed out");
mStop = true;
Stop();
break;
}
for ( ZM::Select::CommsList::iterator iter = readable.begin(); iter != readable.end(); ++iter ) {
@ -89,7 +96,7 @@ int RtpDataThread::run() {
if ( nBytes ) {
recvPacket(buffer, nBytes);
} else {
mStop = true;
Stop();
break;
}
} else {
@ -99,7 +106,6 @@ int RtpDataThread::run() {
}
rtpDataSocket.close();
mRtspThread.Stop();
return 0;
}
#endif // HAVE_LIBAVFORMAT

View File

@ -21,7 +21,8 @@
#define ZM_RTP_DATA_H
#include "zm_define.h"
#include "zm_thread.h"
#include <atomic>
#include <thread>
class RtspThread;
class RtpSource;
@ -40,26 +41,26 @@ struct RtpDataHeader
uint32_t csrc[]; // optional CSRC list
};
class RtpDataThread : public Thread
class RtpDataThread
{
friend class RtspThread;
private:
RtspThread &mRtspThread;
RtpSource &mRtpSource;
bool mStop;
std::atomic<bool> mTerminate;
std::thread mThread;
private:
bool recvPacket( const unsigned char *packet, size_t packetLen );
int run();
void Run();
public:
RtpDataThread( RtspThread &rtspThread, RtpSource &rtpSource );
~RtpDataThread();
void stop()
{
mStop = true;
}
void Stop() { mTerminate = true; }
};
#endif // ZM_RTP_DATA_H

View File

@ -586,8 +586,6 @@ void RtspThread::Run() {
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
rtpDataThread.start();
while (!mTerminate) {
now = time(nullptr);
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
@ -614,15 +612,12 @@ void RtspThread::Run() {
if ( !recvResponse( response ) )
return;
rtpDataThread.stop();
rtpDataThread.Stop();
rtpCtrlThread.Stop();
//rtpDataThread.kill( SIGTERM );
//rtpCtrlThread.kill( SIGTERM );
rtpDataThread.join();
rtpCtrlThread.join();
delete mSources[ssrc];
mSources.clear();
@ -747,7 +742,6 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
rtpDataThread.start();
while (!mTerminate) {
// Send a keepalive message if the server supports this feature and we are close to the timeout expiration
@ -771,11 +765,9 @@ Debug(5, "sendkeepalive %d, timeout %d, now: %d last: %d since: %d", sendKeepali
if ( !recvResponse(response) )
return;
rtpDataThread.stop();
rtpDataThread.Stop();
rtpCtrlThread.Stop();
rtpDataThread.join();
delete mSources[ssrc];
mSources.clear();