Fixed various issues with RTSP streaming.

git-svn-id: http://svn.zoneminder.com/svn/zm/trunk@2656 e3e1d417-86f3-4887-817a-d78f3d33393f
This commit is contained in:
stan 2008-10-09 08:29:03 +00:00
parent 779737fcde
commit 0afd77907e
2 changed files with 144 additions and 81 deletions

View File

@ -116,10 +116,13 @@ int RemoteCameraRtsp::Disconnect()
int RemoteCameraRtsp::PrimeCapture()
{
Debug( 2, "Waiting for sources" );
while( !rtspThread->hasSources() )
for ( int i = 0; i < 50 && !rtspThread->hasSources(); i++ )
{
usleep( 10000 );
usleep( 100000 );
}
if ( !rtspThread->hasSources() )
Fatal( "No RTSP sources" );
Debug( 2, "Got sources" );
formatContext = rtspThread->getFormatContext();
@ -163,6 +166,8 @@ int RemoteCameraRtsp::PreCapture()
}
int RemoteCameraRtsp::PostCapture( Image &image )
{
while ( true )
{
buffer.Empty();
if ( rtspThread->getFrame( buffer ) )
@ -193,8 +198,9 @@ int RemoteCameraRtsp::PostCapture( Image &image )
//}
}
if ( buffer.Size() )
{
if ( !buffer.Size() )
return( -1 );
int initialFrameCount = frameCount;
while ( buffer.Size() > 0 )
{
@ -234,6 +240,8 @@ int RemoteCameraRtsp::PostCapture( Image &image )
image.Assign( width, height, colours, tmp_picture->data[0] );
frameCount++;
return( 0 );
}
else
{
@ -242,11 +250,7 @@ int RemoteCameraRtsp::PostCapture( Image &image )
buffer -= len;
}
}
else
{
}
return( -1 );
}
}
return( 0 );
}

View File

@ -65,7 +65,7 @@ bool RtspThread::recvResponse( std::string &response )
Debug( 4, "Received RTSP response: %s (%d bytes)", response.c_str(), response.size() );
float respVer = 0;
int respCode = -1;
char respText[256];
char respText[BUFSIZ];
if ( sscanf( response.c_str(), "RTSP/%f %3d %[^\r\n]\r\n", &respVer, &respCode, respText ) != 3 )
{
Error( "Response parse failure in '%s'", response.c_str() );
@ -180,11 +180,25 @@ int RtspThread::run()
if ( !mRtspSocket.connect( mHost.c_str(), strtol( mPort.c_str(), NULL, 10 ) ) )
Fatal( "Unable to connect RTSP socket" );
//Select select( 0.25 );
//select.addReader( &mRtspSocket );
//while ( select.wait() )
//{
//mRtspSocket.recv( response );
//Debug( 4, "Drained %d bytes from RTSP socket", response.size() );
//}
if ( mMethod == RTP_RTSP_HTTP )
{
if ( !mRtspSocket2.connect( mHost.c_str(), strtol( mPort.c_str(), NULL, 10 ) ) )
Fatal( "Unable to connect auxiliary RTSP/HTTP socket" );
//Select select( 0.25 );
//select.addReader( &mRtspSocket2 );
//while ( select.wait() )
//{
//mRtspSocket2.recv( response );
//Debug( 4, "Drained %d bytes from HTTP socket", response.size() );
//}
message = "GET "+mPath+" HTTP/1.0\r\n";
message += "x-sessioncookie: "+mHttpSession+"\r\n";
@ -241,9 +255,11 @@ int RtspThread::run()
//recvResponse( response );
message = "DESCRIBE "+mUrl+" RTSP/1.0\r\n";
sendCommand( message );
if ( !sendCommand( message ) )
return( -1 );
sleep( 1 );
recvResponse( response );
if ( !recvResponse( response ) )
return( -1 );
RTSPState *rtsp_st = new RTSPState;
rtsp_st->nb_rtsp_streams = 0;
@ -297,8 +313,10 @@ int RtspThread::run()
}
}
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
StringVector lines = split( response, "\r\n" );
char *session = 0;
@ -376,8 +394,10 @@ int RtspThread::run()
Debug( 2, "RTSP Remote Channels are %d/%d", remoteChannels[0], remoteChannels[1] );
message = "PLAY "+trackUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
lines = split( response, "\r\n" );
char *rtpInfo = 0;
@ -429,12 +449,16 @@ int RtspThread::run()
}
message = "PAUSE "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
rtpDataThread.stop();
rtpCtrlThread.stop();
@ -458,10 +482,10 @@ int RtspThread::run()
RtpDataThread rtpDataThread( *this, *source );
RtpCtrlThread rtpCtrlThread( *this, *source );
Select select( config.http_timeout );
Select select( double(config.http_timeout)/1000.0 );
select.addReader( &mRtspSocket );
unsigned char buffer[10*BUFSIZ];
Buffer buffer( 10*BUFSIZ );
time_t lastKeepalive = time(NULL);
std::string keepaliveMessage = "OPTIONS * RTSP/1.0\r\n";
std::string keepaliveResponse = "RTSP/1.0 200 OK\r\n";
@ -474,69 +498,100 @@ int RtspThread::run()
break;
}
ssize_t nBytes = mRtspSocket.recv( buffer, sizeof(buffer) );
Debug( 4, "Read %d bytes on sd %d", nBytes, mRtspSocket.getReadDesc() );
static char tempBuffer[10*BUFSIZ];
ssize_t nBytes = mRtspSocket.recv( tempBuffer, sizeof(tempBuffer) );
buffer.Append( tempBuffer, nBytes );
Debug( 4, "Read %d bytes on sd %d, %d total", nBytes, mRtspSocket.getReadDesc(), buffer.Size() );
unsigned char *bufferPtr = buffer;
while( nBytes > 0 )
while( buffer.Size() > 0 )
{
if ( bufferPtr[0] == '$' )
if ( buffer[0] == '$' )
{
unsigned char channel = bufferPtr[1];
unsigned short len = ntohs( *((unsigned short *)(bufferPtr+2)) );
unsigned char channel = buffer[1];
unsigned short len = ntohs( *((unsigned short *)(buffer+2)) );
Debug( 4, "Got %d bytes left, expecting %d byte packet on channel %d", nBytes, len, channel );
while ( nBytes < (len+4) )
Debug( 4, "Got %d bytes left, expecting %d byte packet on channel %d", buffer.Size(), len, channel );
if ( buffer.Size() < (len+4) )
{
Debug( 4, "Missing %d bytes, rereading", (len+4)-nBytes );
ssize_t oldNbytes = nBytes;
nBytes += mRtspSocket.recv( bufferPtr+nBytes, sizeof(buffer)-((bufferPtr-buffer)+nBytes) );
Debug( 4, "Read additional %d bytes on sd %d, new total is %d, len is %d", nBytes-oldNbytes, mRtspSocket.getReadDesc(), nBytes, len );
break;
}
if ( channel == remoteChannels[0] )
{
Debug( 4, "Got %d bytes on data channel %d, packet length is %d", nBytes, channel, len );
Hexdump( 4, bufferPtr, 16 );
rtpDataThread.recvPacket( bufferPtr+4, len );
Debug( 4, "Got %d bytes on data channel %d, packet length is %d", buffer.Size(), channel, len );
Hexdump( 4, (char *)buffer, 16 );
rtpDataThread.recvPacket( buffer+4, len );
Debug( 4, "Received" );
}
else if ( channel == remoteChannels[1] )
{
len = ntohs( *((unsigned short *)(bufferPtr+2)) );
len = ntohs( *((unsigned short *)(buffer+2)) );
Debug( 4, "Got %d bytes on control channel %d", nBytes, channel );
rtpCtrlThread.recvPackets( bufferPtr+4, len );
rtpCtrlThread.recvPackets( buffer+4, len );
}
else
{
Error( "Unexpected channel selector %d in RTSP interleaved data", bufferPtr[1] );
Error( "Unexpected channel selector %d in RTSP interleaved data", buffer[1] );
buffer.Empty();
break;
}
bufferPtr += len+4;
buffer.Consume( len+4 );
nBytes -= len+4;
}
else
{
if ( keepaliveResponse.compare( 0, keepaliveResponse.size(), (char *)bufferPtr, keepaliveResponse.size() ) != 0 )
if ( keepaliveResponse.compare( 0, keepaliveResponse.size(), (char *)buffer, keepaliveResponse.size() ) == 0 )
{
Warning( "Unexpected format RTSP interleaved data" );
Hexdump( -1, bufferPtr, 32 );
Debug( 4, "Got keepalive response '%s'", (char *)buffer );
//buffer.Consume( keepaliveResponse.size() );
if ( char *charPtr = (char *)memchr( (char *)buffer, '$', buffer.Size() ) )
{
int discardBytes = charPtr-(char *)buffer;
buffer -= discardBytes;
}
else
{
buffer.Empty();
}
}
else
{
if ( char *charPtr = (char *)memchr( (char *)buffer, '$', buffer.Size() ) )
{
int discardBytes = charPtr-(char *)buffer;
Warning( "Unexpected format RTSP interleaved data, resyncing by %d bytes", discardBytes );
Hexdump( -1, (char *)buffer, discardBytes );
buffer -= discardBytes;
}
else
{
Warning( "Unexpected format RTSP interleaved data, dumping %d bytes", buffer.Size() );
Hexdump( -1, (char *)buffer, 32 );
buffer.Empty();
}
}
break;
}
}
if ( (timeout > 0) && ((time(NULL)-lastKeepalive) > (timeout-5)) )
{
sendCommand( keepaliveMessage );
if ( !sendCommand( message ) )
return( -1 );
lastKeepalive = time(NULL);
}
buffer.Tidy( 1 );
}
message = "PAUSE "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
delete mSources[ssrc];
mSources.clear();
@ -559,12 +614,16 @@ int RtspThread::run()
}
message = "PAUSE "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
message = "TEARDOWN "+mUrl+" RTSP/1.0\r\nSession: "+session+"\r\n";
sendCommand( message );
recvResponse( response );
if ( !sendCommand( message ) )
return( -1 );
if ( !recvResponse( response ) )
return( -1 );
rtpDataThread.stop();
rtpCtrlThread.stop();