Developing Your Own Streaming Media Client on Top of Livemedia, V 0.01
桂堂东
xiaoguizi@gmail.com
2004-10
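This document is a very early draft and will be improved in the near future; comments and suggestions are welcome.
If you repost it, please credit the source.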
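I. Background
Streaming media is everywhere today. The mainstream streaming servers are RealNetworks, Windows Media Server, and Apple Darwin server, while client programs, which cover session setup, reception, decoding, and playback, come in many varieties. How can we use open-source code to build our own streaming client and, at the same time, support a new media format? That is the focus of this document.
Our company took on a project that required implementing RTSP/RTP according to the 3GPP standard, parsing RTP packets (in a proprietary format), and decoding and playing them. Because the schedule was tight, we decided to build on a fairly stable and complete open-source code base. The work consisted mainly of supporting the new media type, fixing bugs, and restructuring the architecture.
II. Introduction to the Livemedia framework
For detailed documentation see www.live.comlivemedia
1. Overall framework
The Live website has Doxygen-generated documentation that also shows the relationships between the classes, so they are not repeated here. Note, however, that the live library code serves both servers and clients. If you are developing a single program, or need a clean separation between server and client code, it is best to split the code apart first; live's sample programs openRTSP and TestOnDemandServer are useful references.
2. Client framework
2.1 The openRTSP client flow
This section walks through the openRTSP flow, ends with the order of operations inside the packet-receiving loop, and then describes what has to be set up to build a client.
Note: a leading "||" marks an item that depends on the command-line parameters.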
1. Initialize the socket environment.
2. Parse the input parameters.
3. createClient: construct the RTSPClient object (this constructs its base class Medium and initializes some variables)
3.1 For the class Medium
3.1.1 // First generate a name for the new medium: and put into the result buffer
3.1.2 // Then add it to our table: (this is a hash table that stores the media objects; it should have a maximum size, in other words the client should handle only a limited number of media sessions)
3.2 Initialize the RTSPClient variables and construct the RTSP "User-Agent" header
4. Send an RTSP "OPTIONS" request and get the supported options from the server.
4.1 Create Socket connection
4.2 Send OPTIONS string
4.3 Get the response from the server (check that the response code is 200, then read the supported methods from the "Public:" header)
5. Get the SDP description for the server URL (return value: the SDP string)
5.1 Create Socket connection
5.2 Check if the URL has username and password
5.3 Send the DESCRIBE request
5.3.1 Construct the authenticator
5.3.2 Construct the DESCRIBE string and send it
5.4 Get response from the server
5.4.1 Check whether the response code is one we can handle
5.4.2 Find the SDP description and run some validity checks
6. Create the media session from the SDP description above.
6.1 session = MediaSession::createNew
6.1.1 For the class Medium (a different Medium instance from the one constructed for RTSPClient)
(1.1) // First generate a name for the new medium: and put into the result buffer
(1.2) // Then add it to our table: (this is a hash table that stores the media objects; it should have a maximum size, in other words the client should handle only a limited number of media sessions)
(2) Initialize some variables, such as the subsession list (each "m=" line represents a new subsession), the CNAME, etc.
6.1.2 Initialize the MediaSession from the SDP info
(1) Parse the SDP string and store each key and its value in the corresponding variable.
(2) For each "m=" line (if there is one), create a subsession
Determine whether to use raw UDP or RTP, the medium name, the protocol, the payload format, etc.
6.2 Initialize the MediaSubsessionIterator (using the session and its subsessions, one per "m=" line)
6.2.1 Check each subsession's properties and set some variables.
6.2.2 For the receiver [receives data but does not 'play' the stream(s)]
(1) subsession->initiate()
(1.1) Create RTP and RTCP 'Groupsocks' on which to receive incoming data.
(1.2) According to the protocol name, create our UDP source or a payload-specific 'RTP' source
(1.3) Create the RTCPInstance
(1.3.1) // Arrange to handle incoming reports from others:
(1.3.2)// fRTCPInterface.startNetworkReading(handler);
(1.3.3) // Send our first report, composed of an RR and an SDES (CNAME), to the server
(2) Set the packet-reordering threshold time, used to reorder incoming packets before they are stored; optionally set the receive buffer size (if it was given as an input parameter)
6.2.3 For the player (does not record the stream, but 'plays' the stream(s))
Do nothing here; wait for the following steps.
6.3 SetupStreams(RTSP “SETUP”)
Perform additional 'setup' on each subsession, before playing them:
For each subsession, RTSPClient->setupMediaSubsession(*)
6.3.1 // First, construct an authenticator string:
6.3.2 // When sending more than one "SETUP" request, include a "Session:" header in the 2nd and later "SETUP"s.
6.3.3 // Construct a standard "Transport:" header. [see the appendix (1)]
6.3.4 Send the request string and get the response
(1) Check validity (for example, the response code)
(2)// Look for a "Session:" header (to set our session id), and a "Transport: " header (to set the server address/port)
(3) If the subsession receives RTP (and sends/receives RTCP) over the RTSP stream, switch its sockets over to that connection
7. Create output files: only for the receiver (stores the stream but does not play it)
For different file formats, use a different *FileSink class
This walkthrough uses the QuickTime file format as the example, with output to '::stdout'
7.1 qtout = QuickTimeFileSink::createNew(***)
7.1.1 Construct the class Medium again; see step 3.1 for details.
7.1.2 Initialize some variables
7.1.3 // Set up I/O state for each input subsession:
(1) // Ignore subsessions without a data source:
(2) // If "subsession's" SDP description specified screen dimension or frame rate parameters, then use these. (Note that this must be done before the call to "setQTState()" below.)
(3) Optionally create a hint track, if the input parameters ask for one
(4) // Also set a 'BYE' handler for this subsession's RTCP instance:
(5) // Use the current time as the file's creation and modification time. Use Apple's time format: seconds since January 1, 1904
7.1.4 startPlaying (details in 7.2)
|| 7.2 Ordinary file
7.2.1 filesink = FileSink::createNew(***)
(1) First construct the MediaSink base class (which uses the Medium constructor again; see step 3.1)
(2) Initialize some variables.
7.2.2 filesink->startPlaying (actually the parent class function mediasink->st.)
(1) Checks, such as // Make sure we're not already being played; our source is compatible:
(2) continuePlaying()
(2.1) FramedSource::getNextFrame (the source type was fixed in startPlaying… as FramedSource)
Check and store the callback functions: // Make sure we're not already being read:
<the specific media source>->doGetNextFrame() // a virtual function, for example in Mp3FromADUSource
In this function // Before returning a frame, we must enqueue at least one ADU:
OR // Return a frame now:
8 startPlayingStreams
// Finally, start playing each subsession, to start the data flow:
8.1 rtspClient->playMediaSession(*)
8.1.1 Check validity
// First, make sure that we have a RTSP session in progress
8.1.2 Send the PLAY command:
(1) // First, construct an authenticator string:
(2) // And then a "Range:" string:
(3) Construct “PLAY” string
(4) Send to server
(5) Get the response and check the response code / CSeq /…
8.2 // Figure out how long to delay (if at all) before shutting down, or repeating the playing
|| 8.3 checkForPacketArrival // see whether any packets have arrived on the subsessions.
|| 8.4 checkInterPacketGaps // Check each subsession, counting up how many packets have been received:
9 env->taskScheduler().doEventLoop()
The main loop, which gets data from the server, parses it, and stores it or plays it directly.
9.1 BasicTaskScheduler0::doEventLoop
Loops, calling SingleStep on each iteration
9.2 BasicTaskScheduler::SingleStep
Check whether any socket in fReadSet (which stores the subsessions' socket descriptors) is readable, and if so handle it
(1) fDelayQueue.handleAlarm();
(2) (*handler->handlerProc)(handler->clientData, SOCKET_READABLE); // called in a loop to handle each subsession's task
[this is MultiFramedRTPSource:: networkReadHandler]
(3) MultiFramedRTPSource:: networkReadHandler
// Get a free BufferedPacket descriptor to hold the new network packet:
BufferedPacket* bPacket
= source->fReorderingBuffer->getFreePacket(source);
// Read the network packet, and perform sanity checks on the RTP header:
if (!bPacket->fillInData(source->fRTPInterface)) // the incoming packet does not belong to the current session
// Handle the RTP header part
// The rest of the packet is the usable data. Record and save it (to the reordering buffer)
Boolean usableInJitterCalculation // used for the RTCP jitter calculation
= source->packetIsUsableInJitterCalculation((bPacket->data()),bPacket->dataSize());
source->receptionStatsDB() // note that we have received an RTP packet
.noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,
source->timestampFrequency(),
usableInJitterCalculation, presentationTime,
hasBeenSyncedUsingRTCP, bPacket->dataSize());
// Fill in the rest of the packet descriptor, and store it:
bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,
hasBeenSyncedUsingRTCP, rtpMarkerBit,
timeNow);
//Store the packet.
source->fReorderingBuffer->storePacket(bPacket);
Then
source->doGetNextFrame1();// If we didn't get proper data this time, we'll get another chance
9.3 MultiFramedRTPSource::doGetNextFrame1()
In MultiFramedRTPSource or one of its subclasses
(1)// If we already have packet data available, then deliver it now.
BufferedPacket* nextPacket
= fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
(2)// Before using the packet, check whether it has a special header
// that needs to be processed:
if (!processSpecialHeader(nextPacket, specialHeaderSize))
Each subclass implements this for its own packet format…
(3) Handle the packet data; different RTP payload types are structured differently, so ***
(4) // The packet is usable. Deliver all or part of it to our caller:
nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
fCurPacketMarkerBit);
---------unsigned frameSize = nextEnclosedFrameSize(newFramePtr, fTail - fHead);
(5) If we have all the data that the client wants, then:
// Call our own 'after getting' function. Because we're preceded
// by a network read, we can call this directly, without risking
// infinite recursion.
afterGetting(this);
------------ void FramedSource::afterGetting(FramedSource* source)
--------- void FileSink::afterGettingFrame(
void FileSink::afterGettingFrame1
a. addData(fBuffer, frameSize, presentationTime)
b. continuePlaying();// Then try getting the next frame:
《==
9.4 Boolean FileSink::continuePlaying()
fSource->getNextFrame---------FramedSource->getNextFrame-------MultiFramedRTPSource->
9.5 void MultiFramedRTPSource::doGetNextFrame()
(1) TaskScheduler::BackgroundHandlerProc* handler
= (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;
fRTPInterface.startNetworkReading(handler);
doGetNextFrame1(); [back to step 9.3]
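Step 9.2 (3) reads the network packet and performs sanity checks on the RTP header. As a rough, stand-alone illustration of what such a check involves, the sketch below parses the 12-byte fixed RTP header defined in RFC 3550. It is not the live code: the names RtpHeader and parseRtpHeader are hypothetical, and the header extension (X bit) is reported but not skipped.

```cpp
#include <cstddef>
#include <cstdint>

// Fields of the fixed RTP header (RFC 3550, section 5.1).
struct RtpHeader {
    uint8_t  version;
    bool     padding;
    bool     extension;
    uint8_t  csrcCount;
    bool     marker;
    uint8_t  payloadType;
    uint16_t sequenceNumber;
    uint32_t timestamp;
    uint32_t ssrc;
    size_t   headerSize;  // fixed header plus CSRC list, in bytes
};

// Reads a 32-bit value stored in network byte order (most significant byte first).
static uint32_t readBigEndian32(const uint8_t* p) {
    return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
           (static_cast<uint32_t>(p[2]) << 8)  |  static_cast<uint32_t>(p[3]);
}

// Returns false if the packet is too short or is not RTP version 2.
bool parseRtpHeader(const uint8_t* data, size_t size, RtpHeader& out) {
    if (data == nullptr || size < 12) return false;
    out.version = static_cast<uint8_t>(data[0] >> 6);
    if (out.version != 2) return false;
    out.padding        = (data[0] & 0x20) != 0;
    out.extension      = (data[0] & 0x10) != 0;
    out.csrcCount      = static_cast<uint8_t>(data[0] & 0x0F);
    out.marker         = (data[1] & 0x80) != 0;
    out.payloadType    = static_cast<uint8_t>(data[1] & 0x7F);
    out.sequenceNumber = static_cast<uint16_t>((data[2] << 8) | data[3]);
    out.timestamp      = readBigEndian32(data + 4);
    out.ssrc           = readBigEndian32(data + 8);
    out.headerSize     = 12 + 4 * static_cast<size_t>(out.csrcCount);
    return size >= out.headerSize;  // the CSRC list must fit in the packet
}
```

Everything after headerSize bytes is the payload that a MultiFramedRTPSource subclass would then inspect for its own special header.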
Note:
(1) For RealNetworks streams, use a special "Transport:" header, and also add a 'challenge response'.
(2) The detailed relationships between these calls are not listed here, because they are fairly complex and documenting them would take more time.
(3) When we reach the end time obtained from the SDP description, or the server sends teardown information, the client stops.
The function "startPlayingStreams" adds "sessionTimerHandler" to the scheduler.
The running account above shows the flow that has to be set up in order to build the receiving part of a conventional streaming client with the live code.
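2.2 Adding a new media type
In general, a multi-frame digital media type can be implemented by subclassing MultiFramedRTPSource. You also need to subclass PacketBuffer to manage your own packet buffers; the processing there follows the RTP payload format of the new media type. The new media type we implemented is described in detail below.
2.2.1 Format of the new media type
The new media type is also frame based. Each frame is called a MAU (Media Access Unit), and MAUs can be organized within an RTP packet in several different ways.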
As shown in Figure , the RTP Payload Format header is divided into three sections. Each section starts with a one-byte bit field, and is followed by one or more optional fields. In some cases, up to two entire sections may be omitted from the RTP Payload Format header. This can result in an RTP Payload Format header as small as one byte.
All RTP Payload Format fields should be transmitted in network byte order, which means that the most significant byte of each field is transmitted first.
The RTP Payload Format header is followed by a payload. The payload can consist of a complete MAU or a MAU fragment. The payload can contain a partial MAU, allowing large MAUs to be fragmented across multiple payloads in multiple RTP packets.
The first payload can be followed by additional pairs of RTP Payload Format headers and payloads, as permitted by the size of the RTP packet.
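Because every field of the payload format header is sent in network byte order, a parser has to assemble multi-byte fields from the most significant byte down, and it should never read past the end of the packet. The sketch below shows one way to do that with a small bounds-checked reader. ByteReader is a hypothetical helper, not part of live, and the sketch makes no assumption about the actual field layout of the payload format header.

```cpp
#include <cstddef>
#include <cstdint>

// Reads big-endian (network byte order) fields from a buffer, with bounds checks.
class ByteReader {
public:
    ByteReader(const uint8_t* data, size_t size) : fData(data), fSize(size), fPos(0) {}

    size_t remaining() const { return fSize - fPos; }

    bool readU8(uint8_t& value) {
        if (remaining() < 1) return false;
        value = fData[fPos];
        fPos += 1;
        return true;
    }

    bool readU16(uint16_t& value) {
        if (remaining() < 2) return false;  // nothing is consumed on failure
        value = static_cast<uint16_t>((fData[fPos] << 8) | fData[fPos + 1]);
        fPos += 2;
        return true;
    }

    bool readU32(uint32_t& value) {
        if (remaining() < 4) return false;
        value = (static_cast<uint32_t>(fData[fPos]) << 24) |
                (static_cast<uint32_t>(fData[fPos + 1]) << 16) |
                (static_cast<uint32_t>(fData[fPos + 2]) << 8) |
                 static_cast<uint32_t>(fData[fPos + 3]);
        fPos += 4;
        return true;
    }

private:
    const uint8_t* fData;
    size_t fSize;
    size_t fPos;
};
```

A failed read leaves the position unchanged, so a caller can treat it as "header truncated" and reject the packet.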
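MAUs can be combined within a packet in the following ways:
2.2.2 Issues to consider for the new media type
A. As shown above, each RTP packet of the new media type may contain one or more MAUs or MAU fragments, and after each RTP packet is parsed, the information for every complete MAU (data, size, PT (Presentation Time), DTS, etc.) has to be passed to the decoder. However, the media formats that the Live code supports are mostly either one frame per packet, or several frames per packet with all of the extra information gathered at the start of the packet, that is, right after the standard RTP header :-). So after an RTP packet is received and its standard RTP header has been handled (in MultiFramedRTPSource::networkReadHandler(×××)), the packet is put into the reordering buffer. The next time a packet is taken out for special-header processing, it needs special treatment: the header information, sizes, and so on of all the MAUs or MAU fragments in that single packet are extracted, and everything is then handled together in MultiFramedRTPSource::doGetNextFrame1():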
void MultiFramedRTPSource::doGetNextFrame1()
{
while (fNeedDelivery) // loop while a delivery is pending (see step 9.3 above)
{
// If we already have packet data available, then deliver it now.
Boolean packetLossPrecededThis;
BufferedPacket* nextPacket // get the packet at the head of the queue, possibly the one we just handled
= fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
if(nextPacket == NULL)
break;
fNeedDelivery = False;
if (nextPacket->useCount() == 0)
{
// Before using the packet, check whether it has a special header
// that needs to be processed:
unsigned specialHeaderSize;
if (!processSpecialHeader(nextPacket, specialHeaderSize)) {
// Something's wrong with the header; reject the packet:
fReorderingBuffer->releaseUsedPacket(nextPacket);
fNeedDelivery = True;
break;
}
nextPacket->skip(specialHeaderSize);
}
// Check whether we're part of a multi-packet frame, and whether
// there was packet loss that would render this packet unusable:
if (fCurrentPacketBeginsFrame) // this flag is set in processSpecialHeader()
{
unsigned PT_tem = 0; // Alexis: per-MAU presentation time
FramePresentationTime(PT_tem);
nextPacket->setPresentTime(PT_tem); //Alexis 04-11-10
if (packetLossPrecededThis || fPacketLossInFragmentedFrame) // packet loss, or the previous frame has an unhandled fragment
{
// We didn't get all of the previous frame.
// Forget any data that we used from it:
fTo = fSavedTo;
fMaxSize = fSavedMaxSize;
fFrameSize = 0;
}
fPacketLossInFragmentedFrame = False; // this packet begins a frame, so reset the flag
}
else if (packetLossPrecededThis)
{
// We're in a multi-packet frame, with preceding packet loss
fPacketLossInFragmentedFrame = True;
}
if (fPacketLossInFragmentedFrame)
{
//---Alexis 10-28
unsigned MauFragLength;
doLossFrontPacket(MauFragLength);
// MauFragLength is the length from the current MAU fragment to the start of the next MAU
if(MauFragLength != 0)
{
nextPacket->skip(MauFragLength);
fNeedDelivery = True;
break;
}
else // the original live code path
{
// Normal case: this packet is unusable; reject it:
fReorderingBuffer->releaseUsedPacket(nextPacket);
fNeedDelivery = True;
break;
}
}
// The packet is usable. Deliver all or part of it to our caller:
unsigned frameSize;
nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
fCurPacketMarkerBit);
fFrameSize += frameSize;
if (!nextPacket->hasUsableData()) {
// We're completely done with this packet now
fReorderingBuffer->releaseUsedPacket(nextPacket);
}
if (fCurrentPacketCompletesFrame || fNumTruncatedBytes > 0)
{
// We have all the data that the client wants.
if (fNumTruncatedBytes > 0) {
envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
<< fSavedMaxSize << "). "
<< fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
}
// Call our own 'after getting' function. Because we're preceded
// by a network read, we can call this directly, without risking
// infinite recursion.
afterGetting(this);
// This stores the whole frame in the output buffer (here, the file)
}
else
{
// This packet contained fragmented data, and does not complete
// the data that the client wants. Keep getting data:
fTo += frameSize;
fMaxSize -= frameSize;
fNeedDelivery = True;
}
}
}
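B. In addition, because each MAU starts with its own timing information, using the Timestamp in the standard RTP header as the time base, as the Live code does, is no longer appropriate. We have to set the time ourselves when delivering each individual MAU; that is what nextPacket->setPresentTime(PT_tem); //Alexis 04-11-10 does in the code above.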
C.
III. Some conclusions
A. Buffer management
How to control bursts of incoming packets is a big topic. The leaky bucket model may be useful; however, if a long burst of higher-rate packets arrives (in our system), the bucket will overflow and our control function will take action against the packets in that burst.
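To make the leaky bucket behaviour concrete, here is a minimal sketch under stated assumptions: the bucket is measured in bytes, it drains at a constant rate, and a packet that would overflow it is simply reported as non-conforming. The class name LeakyBucket and its parameters are hypothetical; what the control function then does with such a packet (drop, delay, or mark it) is left open.

```cpp
#include <algorithm>
#include <cstdint>

// A byte-based leaky bucket: packets fill it, and it drains at a fixed rate.
class LeakyBucket {
public:
    LeakyBucket(uint64_t capacityBytes, uint64_t drainBytesPerSecond)
        : fCapacity(capacityBytes), fRate(drainBytesPerSecond), fLevel(0), fLastMs(0) {}

    // Returns true if the packet fits; false if it would overflow the bucket.
    bool offer(uint64_t nowMs, uint64_t packetBytes) {
        drain(nowMs);
        if (fLevel + packetBytes > fCapacity) return false;
        fLevel += packetBytes;
        return true;
    }

    uint64_t level() const { return fLevel; }

private:
    // Removes the bytes that leaked out since the last update.
    void drain(uint64_t nowMs) {
        if (nowMs <= fLastMs || fRate == 0) return;
        uint64_t leaked = (nowMs - fLastMs) * fRate / 1000;
        if (leaked == 0) return;  // too little time has passed to leak a whole byte
        fLevel -= std::min(fLevel, leaked);
        fLastMs = nowMs;
    }

    uint64_t fCapacity;
    uint64_t fRate;
    uint64_t fLevel;
    uint64_t fLastMs;
};
```

With a 3000-byte bucket draining at 1000 bytes per second, two 1500-byte packets arriving at the same instant are accepted and a third is rejected, which is the burst overflow described above.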
In our client system, in order to connect the library (which manages the session and the receiving thread) with the player (which displays the picture and sends the sound to the speakers, and which includes the decoder), we put a middle layer between the server and the player, which also makes porting easier.
The following gives a more detailed description.
BTW: the UPC (usage parameter control) and the handling of exceptions such as packet loss are complicated, and we will not describe them fully here.
1. Receiver Buffer
When the session has been set up, we are ready to receive streaming packets. Suppose, for example, that there are two media subsessions, one audio and one video, with one buffer for each. The receiver buffer management works as follows.
1) In the receiving part, we define a 'Packet' class, which is used to store and handle one RTP packet.
2) Each subsession has one buffer queue whose length is variable; we determine the queue length from the maximum delay time.
3) The buffer queue is responsible for packet reordering, among other things.
4) In the receiver buffer we handle each packet as soon as possible (unless a packet is delayed by the network, in which case we wait for it until the delay threshold is reached), and we leave buffer overflow and underflow management to the player.
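The four points above can be sketched as a small reordering queue. This is an illustration under assumptions, not our actual 'Packet' class or live's reordering buffer: ReorderQueue, its members, and the millisecond clock passed in by the caller are all hypothetical. Packets are released in sequence-number order, and a gap is skipped only after the closest later packet has waited for the delay threshold.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

struct Packet {
    uint16_t seq;                  // RTP sequence number
    uint64_t arrivalMs;            // local arrival time
    std::vector<uint8_t> payload;
};

// True if sequence number a comes before b, allowing for 16-bit wraparound.
inline bool seqBefore(uint16_t a, uint16_t b) {
    return a != b && static_cast<uint16_t>(b - a) < 0x8000;
}

class ReorderQueue {
public:
    explicit ReorderQueue(uint64_t maxDelayMs)
        : fMaxDelayMs(maxDelayMs), fHaveNext(false), fNextSeq(0) {}

    void push(const Packet& p) {
        if (!fHaveNext) { fNextSeq = p.seq; fHaveNext = true; }
        if (seqBefore(p.seq, fNextSeq)) return;  // too late: already delivered or skipped
        fPackets.insert(std::make_pair(p.seq, p));  // duplicates are ignored
    }

    // Delivers the next in-order packet, or skips a gap once the threshold has passed.
    bool pop(uint64_t nowMs, Packet& out) {
        if (fPackets.empty()) return false;
        std::map<uint16_t, Packet>::iterator it = fPackets.find(fNextSeq);
        if (it == fPackets.end()) {
            // The expected packet is missing; find the closest later one.
            std::map<uint16_t, Packet>::iterator best = fPackets.begin();
            for (std::map<uint16_t, Packet>::iterator i = fPackets.begin(); i != fPackets.end(); ++i) {
                if (static_cast<uint16_t>(i->first - fNextSeq) <
                    static_cast<uint16_t>(best->first - fNextSeq)) best = i;
            }
            if (nowMs < best->second.arrivalMs + fMaxDelayMs) return false;  // keep waiting
            it = best;  // give up on the missing packet(s)
        }
        out = it->second;
        fNextSeq = static_cast<uint16_t>(it->first + 1);
        fPackets.erase(it);
        return true;
    }

    size_t size() const { return fPackets.size(); }

private:
    uint64_t fMaxDelayMs;
    bool fHaveNext;
    uint16_t fNextSeq;
    std::map<uint16_t, Packet> fPackets;
};
```

The linear search for the closest packet is acceptable here because the queue length is bounded by the maximum delay time.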
Figure 1: packet receive flow
Figure 2: packet handle flow (with the decoder)
2. Player (Decoding) Buffer
The player stores media data from the RTSP client in a buffer for each stream. The player allocates memory for every stream according to the maximum preroll length. In the initial phase, the player waits, buffering, until every stream has received at least its Preroll time of content. Every buffer length is therefore Prerollmax + C (where C is a constant). When every buffer is ready, the player starts the playback thread and plays the content.
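The start-up rule can be written down in a few lines. The sketch below assumes that buffer occupancy is tracked as media time in milliseconds and that each stream must reach its own preroll before playback starts; StreamBuffer, bufferLengthMs, and readyToPlay are hypothetical names, and the margin parameter plays the role of the constant C.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct StreamBuffer {
    uint32_t prerollMs;   // preroll time required by this stream
    uint32_t bufferedMs;  // media time currently held in the buffer
};

// Buffer length for every stream: the largest preroll plus a constant margin C.
uint32_t bufferLengthMs(const std::vector<StreamBuffer>& streams, uint32_t marginMs) {
    uint32_t maxPreroll = 0;
    for (size_t i = 0; i < streams.size(); ++i) {
        maxPreroll = std::max(maxPreroll, streams[i].prerollMs);
    }
    return maxPreroll + marginMs;
}

// Playback may start once every stream holds at least its preroll time.
bool readyToPlay(const std::vector<StreamBuffer>& streams) {
    if (streams.empty()) return false;
    for (size_t i = 0; i < streams.size(); ++i) {
        if (streams[i].bufferedMs < streams[i].prerollMs) return false;
    }
    return true;
}
```

For a video stream with a 3000 ms preroll and an audio stream with a 1000 ms preroll, a margin of 500 ms gives a buffer length of 3500 ms for both streams.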
Figure 3: Playback with Stream Buffers
The playback thread tracks the time stamp of every stream. During playback, one of the streams may be delayed, and the corresponding buffer then underruns. If the video stream is delayed, the audio plays normally but the video stalls: the playback thread keeps advancing the audio time stamp, but the video time stamp does not increase. When new video data arrives, the playback thread decides whether to skip video frames up to the next key frame or to play faster in order to catch up with the audio time stamp. Usually the player chooses to play faster if the delay was short. If, on the other hand, it is the audio stream that stalls, the player buffers again until every buffer holds more than T time of data. Here T is a short time related to the audio stream's preroll time; it can be smaller than or equal to the preroll. This rebuffering reduces audio discontinuity when the network jitters. To make this case less likely, choose a higher T value or use a better network.
If one of the buffers overflows, this is treated as an error. For the video stream, the error handler drops data until the next key frame arrives. For the audio stream, the error handler simply drops some data.
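One possible reading of the overflow rule is sketched below. It assumes that frames are queued oldest first and that the oldest data is what gets dropped; the Frame struct and both function names are hypothetical, and a real player may well choose differently.

```cpp
#include <cstddef>
#include <deque>

struct Frame {
    bool isKeyFrame;
    unsigned sizeBytes;
};

// Video: drop the oldest frame, then keep dropping until a key frame is at the head.
size_t dropVideoToNextKeyFrame(std::deque<Frame>& queue) {
    size_t dropped = 0;
    if (!queue.empty()) {
        queue.pop_front();
        ++dropped;
    }
    while (!queue.empty() && !queue.front().isKeyFrame) {
        queue.pop_front();
        ++dropped;
    }
    return dropped;
}

// Audio: simply drop up to 'count' of the oldest frames.
size_t dropAudio(std::deque<Frame>& queue, size_t count) {
    size_t dropped = 0;
    while (dropped < count && !queue.empty()) {
        queue.pop_front();
        ++dropped;
    }
    return dropped;
}
```

Dropping video up to a key frame keeps the decoder from being fed frames whose reference frames are gone, whereas audio frames can usually be discarded individually.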
Figure 4: Process Buffer Overflow or Underflow
B. How to control the receive loop
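In the main loop of live's openRTSP code,
env->taskScheduler().doEventLoop()
the function doEventLoop has a parameter with a default value, and setting this parameter makes the loop exit. Alternatively, you can directly call the resource-release methods described in C and D below to pause reception or to exit the whole thread.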
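To our understanding, the parameter is a pointer to a "watch variable": the loop checks it before every step and returns once it becomes non-zero. The sketch below is a stand-alone imitation of that pattern, not the live scheduler itself; MiniScheduler and its members are hypothetical names used only for illustration.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// A toy scheduler that mimics the watch-variable pattern of an event loop.
class MiniScheduler {
public:
    MiniScheduler() : fSteps(0) {}

    void addTask(const std::function<void()>& task) { fTasks.push_back(task); }

    // Runs until *watchVariable becomes non-zero.
    // With a null pointer the loop never returns, so pass a variable if you need to stop.
    void doEventLoop(volatile char* watchVariable = nullptr) {
        for (;;) {
            if (watchVariable != nullptr && *watchVariable != 0) break;
            singleStep();
        }
    }

    unsigned steps() const { return fSteps; }

private:
    // One pass over all registered tasks (a stand-in for socket and timer handling).
    void singleStep() {
        ++fSteps;
        for (size_t i = 0; i < fTasks.size(); ++i) fTasks[i]();
    }

    std::vector<std::function<void()> > fTasks;
    unsigned fSteps;
};
```

In a player, a handler running inside the loop (for example, one that reacts to a stop request from the user interface) would set the variable, and the code after doEventLoop would then release the resources.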
C. PAUSE & SEEK
The openRTSP example gives no concrete implementation. The latest livemedia version now supports SEEK (including the server side).
//PAUSE:
playerIn.rtspClient->pauseMediaSession(*(playerIn.Session));
playerIn.rtspClient->playMediaSession(*(playerIn.Session), -1);
//will resume
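// SEEK
float SessionLength = Session->playEndTime();
// First get the playable time range, which comes from the SDP parsing
First PAUSE ***
Then rtspClient->playMediaSession(*Session, start);
// start must be less than SessionLength
D. Releasing resources
OpenRTSP's solution is the shutdown() function, but when we connected the library to the player we found that some threads never exited. We then looked at how Mplayer (whose RTSP support uses the live code) releases its resources and arrived at the code below, which has run without problems so far.
void OutRTSPClient() // rtpState is a struct we defined that holds some session information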
{
if (rtpState->Session == NULL)
return;
if (rtpState->rtspClient != NULL) {
MediaSubsessionIterator iter(*(rtpState->Session));
MediaSubsession* subsession;
while ((subsession = iter.next()) != NULL) {
Medium::close(subsession->sink);
subsession->sink = NULL;
rtpState->rtspClient->teardownMediaSubsession(*subsession);
}
}
UsageEnvironment* env = NULL;
TaskScheduler* scheduler = NULL;
if (rtpState->Session != NULL) {
env = &(rtpState->Session->envir());
scheduler = &(env->taskScheduler());
}
Medium::close(rtpState->Session);
Medium::close(rtpState->rtspClient);
env->reclaim();
delete scheduler;
}