340 lines
9.2 KiB
C++
340 lines
9.2 KiB
C++
//
|
|
// ZoneMinder Thread Class Implementation, $Date$, $Revision$
|
|
// Copyright (C) 2001-2008 Philip Coombes
|
|
//
|
|
// This program is free software; you can redistribute it and/or
|
|
// modify it under the terms of the GNU General Public License
|
|
// as published by the Free Software Foundation; either version 2
|
|
// of the License, or (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with this program; if not, write to the Free Software
|
|
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
//
|
|
|
|
#include "zm_thread.h"
|
|
|
|
#include "zm_logger.h"
|
|
#include "zm_utils.h"
|
|
|
|
#include <string.h>
|
|
#include <signal.h>
|
|
#include <errno.h>
|
|
#include <sys/time.h>
|
|
|
|
struct timespec getTimeout( int secs )
|
|
{
|
|
struct timespec timeout;
|
|
struct timeval temp_timeout;
|
|
gettimeofday( &temp_timeout, 0 );
|
|
timeout.tv_sec = temp_timeout.tv_sec + secs;
|
|
timeout.tv_nsec = temp_timeout.tv_usec*1000;
|
|
return( timeout );
|
|
}
|
|
|
|
struct timespec getTimeout( double secs )
|
|
{
|
|
struct timespec timeout;
|
|
struct timeval temp_timeout;
|
|
gettimeofday( &temp_timeout, 0 );
|
|
timeout.tv_sec = temp_timeout.tv_sec + int(secs);
|
|
timeout.tv_nsec = temp_timeout.tv_usec += (long int)(1000000000.0*(secs-int(secs)));
|
|
if ( timeout.tv_nsec > 1000000000 )
|
|
{
|
|
timeout.tv_sec += 1;
|
|
timeout.tv_nsec -= 1000000000;
|
|
}
|
|
return( timeout );
|
|
}
|
|
|
|
Mutex::Mutex()
|
|
{
|
|
if ( pthread_mutex_init( &mMutex, NULL ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to create pthread mutex: %s", strerror(errno) ) );
|
|
}
|
|
|
|
Mutex::~Mutex()
|
|
{
|
|
if ( locked() )
|
|
Warning( "Destroying mutex when locked" );
|
|
if ( pthread_mutex_destroy( &mMutex ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to destroy pthread mutex: %s", strerror(errno) ) );
|
|
}
|
|
|
|
void Mutex::lock()
|
|
{
|
|
if ( pthread_mutex_lock( &mMutex ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to lock pthread mutex: %s", strerror(errno) ) );
|
|
}
|
|
|
|
void Mutex::lock( int secs )
|
|
{
|
|
struct timespec timeout = getTimeout( secs );
|
|
if ( pthread_mutex_timedlock( &mMutex, &timeout ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to timedlock pthread mutex: %s", strerror(errno) ) );
|
|
}
|
|
|
|
void Mutex::lock( double secs )
|
|
{
|
|
struct timespec timeout = getTimeout( secs );
|
|
if ( pthread_mutex_timedlock( &mMutex, &timeout ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to timedlock pthread mutex: %s", strerror(errno) ) );
|
|
}
|
|
|
|
void Mutex::unlock()
|
|
{
|
|
if ( pthread_mutex_unlock( &mMutex ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to unlock pthread mutex: %s", strerror(errno) ) );
|
|
}
|
|
|
|
bool Mutex::locked()
|
|
{
|
|
int state = pthread_mutex_trylock( &mMutex );
|
|
if ( state != 0 && state != EBUSY )
|
|
throw ThreadException( stringtf( "Unable to trylock pthread mutex: %s", strerror(errno) ) );
|
|
if ( state != EBUSY )
|
|
unlock();
|
|
return( state == EBUSY );
|
|
}
|
|
|
|
Condition::Condition( Mutex &mutex ) : mMutex( mutex )
|
|
{
|
|
if ( pthread_cond_init( &mCondition, NULL ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to create pthread condition: %s", strerror(errno) ) );
|
|
}
|
|
|
|
Condition::~Condition()
|
|
{
|
|
if ( pthread_cond_destroy( &mCondition ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to destroy pthread condition: %s", strerror(errno) ) );
|
|
}
|
|
|
|
void Condition::wait()
|
|
{
|
|
// Locking done outside of this function
|
|
if ( pthread_cond_wait( &mCondition, mMutex.getMutex() ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to wait pthread condition: %s", strerror(errno) ) );
|
|
}
|
|
|
|
bool Condition::wait( int secs )
|
|
{
|
|
// Locking done outside of this function
|
|
Debug( 8, "Waiting for %d seconds", secs );
|
|
struct timespec timeout = getTimeout( secs );
|
|
if ( pthread_cond_timedwait( &mCondition, mMutex.getMutex(), &timeout ) < 0 && errno != ETIMEDOUT )
|
|
throw ThreadException( stringtf( "Unable to timedwait pthread condition: %s", strerror(errno) ) );
|
|
return( errno != ETIMEDOUT );
|
|
}
|
|
|
|
bool Condition::wait( double secs )
|
|
{
|
|
// Locking done outside of this function
|
|
struct timespec timeout = getTimeout( secs );
|
|
if ( pthread_cond_timedwait( &mCondition, mMutex.getMutex(), &timeout ) < 0 && errno != ETIMEDOUT )
|
|
throw ThreadException( stringtf( "Unable to timedwait pthread condition: %s", strerror(errno) ) );
|
|
return( errno != ETIMEDOUT );
|
|
}
|
|
|
|
void Condition::signal()
|
|
{
|
|
if ( pthread_cond_signal( &mCondition ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to signal pthread condition: %s", strerror(errno) ) );
|
|
}
|
|
|
|
void Condition::broadcast()
|
|
{
|
|
if ( pthread_cond_broadcast( &mCondition ) < 0 )
|
|
throw ThreadException( stringtf( "Unable to broadcast pthread condition: %s", strerror(errno) ) );
|
|
}
|
|
|
|
template <class T> const T ThreadData<T>::getValue() const
|
|
{
|
|
mMutex.lock();
|
|
const T valueCopy = mValue;
|
|
mMutex.unlock();
|
|
return( valueCopy );
|
|
}
|
|
|
|
template <class T> T ThreadData<T>::setValue( const T value )
|
|
{
|
|
mMutex.lock();
|
|
const T valueCopy = mValue = value;
|
|
mMutex.unlock();
|
|
return( valueCopy );
|
|
}
|
|
|
|
template <class T> const T ThreadData<T>::getUpdatedValue() const
|
|
{
|
|
Debug( 8, "Waiting for value update, %p", this );
|
|
mMutex.lock();
|
|
mChanged = false;
|
|
//do {
|
|
mCondition.wait();
|
|
//} while ( !mChanged );
|
|
const T valueCopy = mValue;
|
|
mMutex.unlock();
|
|
Debug( 9, "Got value update, %p", this );
|
|
return( valueCopy );
|
|
}
|
|
|
|
template <class T> const T ThreadData<T>::getUpdatedValue( double secs ) const
|
|
{
|
|
Debug( 8, "Waiting for value update, %.2f secs, %p", secs, this );
|
|
mMutex.lock();
|
|
mChanged = false;
|
|
//do {
|
|
mCondition.wait( secs );
|
|
//} while ( !mChanged );
|
|
const T valueCopy = mValue;
|
|
mMutex.unlock();
|
|
Debug( 9, "Got value update, %p", this );
|
|
return( valueCopy );
|
|
}
|
|
|
|
template <class T> const T ThreadData<T>::getUpdatedValue( int secs ) const
|
|
{
|
|
Debug( 8, "Waiting for value update, %d secs, %p", secs, this );
|
|
mMutex.lock();
|
|
mChanged = false;
|
|
//do {
|
|
mCondition.wait( secs );
|
|
//} while ( !mChanged );
|
|
const T valueCopy = mValue;
|
|
mMutex.unlock();
|
|
Debug( 9, "Got value update, %p", this );
|
|
return( valueCopy );
|
|
}
|
|
|
|
template <class T> void ThreadData<T>::updateValueSignal( const T value )
|
|
{
|
|
Debug( 8, "Updating value with signal, %p", this );
|
|
mMutex.lock();
|
|
mValue = value;
|
|
mChanged = true;
|
|
mCondition.signal();
|
|
mMutex.unlock();
|
|
Debug( 9, "Updated value, %p", this );
|
|
}
|
|
|
|
template <class T> void ThreadData<T>::updateValueBroadcast( const T value )
|
|
{
|
|
Debug( 8, "Updating value with broadcast, %p", this );
|
|
mMutex.lock();
|
|
mValue = value;
|
|
mChanged = true;
|
|
mCondition.broadcast();
|
|
mMutex.unlock();
|
|
Debug( 9, "Updated value, %p", this );
|
|
}
|
|
|
|
Thread::Thread() :
|
|
mThreadCondition( mThreadMutex ),
|
|
mPid( -1 ),
|
|
mStarted( false ),
|
|
mRunning( false )
|
|
{
|
|
Debug( 1, "Creating thread" );
|
|
}
|
|
|
|
Thread::~Thread()
|
|
{
|
|
Debug( 1, "Destroying thread %d", mPid );
|
|
if ( mStarted )
|
|
join();
|
|
}
|
|
|
|
void *Thread::mThreadFunc( void *arg )
|
|
{
|
|
Debug( 2, "Invoking thread" );
|
|
|
|
Thread *thisPtr = (Thread *)arg;
|
|
thisPtr->status = 0;
|
|
try
|
|
{
|
|
thisPtr->mThreadMutex.lock();
|
|
thisPtr->mPid = thisPtr->id();
|
|
thisPtr->mThreadCondition.signal();
|
|
thisPtr->mThreadMutex.unlock();
|
|
thisPtr->mRunning = true;
|
|
thisPtr->status = thisPtr->run();
|
|
thisPtr->mRunning = false;
|
|
Debug( 2, "Exiting thread, status %p", (void *)&(thisPtr->status) );
|
|
return (void *)&(thisPtr->status);
|
|
}
|
|
catch ( const ThreadException &e )
|
|
{
|
|
Error( "%s", e.getMessage().c_str() );
|
|
thisPtr->mRunning = false;
|
|
Debug( 2, "Exiting thread after exception, status %p", (void *)-1 );
|
|
return (void *)-1;
|
|
}
|
|
}
|
|
|
|
void Thread::start()
|
|
{
|
|
Debug( 1, "Starting thread" );
|
|
if ( isThread() )
|
|
throw ThreadException( "Can't self start thread" );
|
|
mThreadMutex.lock();
|
|
if ( !mStarted )
|
|
{
|
|
pthread_attr_t threadAttrs;
|
|
pthread_attr_init( &threadAttrs );
|
|
pthread_attr_setscope( &threadAttrs, PTHREAD_SCOPE_SYSTEM );
|
|
|
|
mStarted = true;
|
|
if ( pthread_create( &mThread, &threadAttrs, mThreadFunc, this ) < 0 )
|
|
throw ThreadException( stringtf( "Can't create thread: %s", strerror(errno) ) );
|
|
pthread_attr_destroy( &threadAttrs );
|
|
}
|
|
else
|
|
{
|
|
Error( "Attempt to start already running thread %d", mPid );
|
|
}
|
|
mThreadCondition.wait();
|
|
mThreadMutex.unlock();
|
|
Debug( 1, "Started thread %d", mPid );
|
|
}
|
|
|
|
void Thread::join()
|
|
{
|
|
Debug( 1, "Joining thread %d", mPid );
|
|
if ( isThread() )
|
|
throw ThreadException( "Can't self join thread" );
|
|
mThreadMutex.lock();
|
|
if ( mPid >= 0 )
|
|
{
|
|
if ( mStarted )
|
|
{
|
|
void *threadStatus = 0;
|
|
if ( pthread_join( mThread, &threadStatus ) < 0 )
|
|
throw ThreadException( stringtf( "Can't join sender thread: %s", strerror(errno) ) );
|
|
mStarted = false;
|
|
Debug( 1, "Thread %d exited, status %p", mPid, threadStatus );
|
|
}
|
|
else
|
|
{
|
|
Warning( "Attempt to join already finished thread %d", mPid );
|
|
}
|
|
}
|
|
else
|
|
{
|
|
Warning( "Attempt to join non-started thread %d", mPid );
|
|
}
|
|
mThreadMutex.unlock();
|
|
Debug( 1, "Joined thread %d", mPid );
|
|
}
|
|
|
|
void Thread::kill( int signal )
|
|
{
|
|
pthread_kill( mThread, signal );
|
|
}
|
|
|
|
// Some explicit template instantiations
|
|
#include "zm_threaddata.cpp"
|