2.5 HeartLessPRoxy Run方法的实现
HeartLessProxy::run()
{
myWorkerThread-run();
mySipThread-run();
}
通过上面可以看到有两个Run方法的调用,第一个是WorkThread的Run方法,它的主要作用是处理UaBuilder的Process方法,主要用来处理Sptr myFifo中的各种事件,前面已经具体的介绍了SipProxyEvent类的作用,这个类已经在前面介绍了,其实简单的说,它就是一个本地的各种事件的集合。
现在我们来看一下两个Run方法的实现:
2.5.1 WorkerThread的Run方法:
UaBuilder::process( const Sptr nextEvent )
{
//处理以下的四种事件
/// SipEvent
Sptr sipEvent;
sipEvent.dynamicCast( nextEvent );
if ( sipEvent != 0 )
{
//处理本地的SIP的事件,包括对状态机的设置和命令/状态队列返回的操作在下面将//对它做具体的介绍
if( processSipEvent( sipEvent ) )
{
return;
}
//向消息队列myCallContainer中插入相应的事件信息。
sendEvent( nextEvent );
return;
}
/// UaDeviceEvent
Sptr uaDeviceEvent;
uaDeviceEvent.dynamicCast( nextEvent );
if ( uaDeviceEvent != 0 )
{
//处理本地的设备事件,最主要的就是处理摘机信号;
if( processUaDeviceEvent( uaDeviceEvent ) )
{
return;
}
sendEvent( nextEvent );
return;
}
/// UaDigitEvent
Sptr uaDigitEvent;
uaDigitEvent.dynamicCast( nextEvent );
if ( uaDigitEvent != 0 )
{
//处理在规定的时间间隔(Kickstart)主动呼叫事件的触发。
if( processUaDigitEvent( uaDigitEvent ) )
{
return;
}
sendEvent( nextEvent );
return;
}
/// UaTimerEvent
Sptr uaTimerEvent;
uaTimerEvent.dynamicCast( nextEvent );
if ( uaTimerEvent != 0 )
{
//在各种SIP命令的回应产生了超时事件后,系统的事件触发。例如:
//在StateTrying()中addEntryOperator( new OpStartTimer )在myEntryOperators队列中加入
//该Operator(指一个操作,例如呼叫或者是进入等待),这里我们这个Operator在时间到达
//以后户会被OpTimeout::process的方法检测到(isTimeout(event)进行检测,对StateTrying
//整个状态进行检测,也就是Trying事件),最后假如UaTimerEvent事件被触发,那么,//就会调用:stateMachine-findState( "StateError" )这个状态,进入错误状态,实施错误的
//处理机制,同时向myEntryOperators队列中加入一个新的Operator--OpStartErrorTone,
//从而被processUaTimerEvent过程扑捉到,最后通过SendEvent发送到执行队列里去。
if( processUaTimerEvent( uaTimerEvent ) )
{
return;
}
sendEvent( nextEvent );
return;
}
assert( 0 );
}
2.5.1.1 processSipEvent
顾名思义,processSipEvent方法是对队列中的SIP消息进行处理,我们来看下面的程序:
bool UaBuilder::processSipEvent( const Sptr sipEvent )
{
Sptr statusMsg;
statusMsg.dynamicCast( sipEvent-getSipMsg() );
// 检验是否为返回的状态码(主要是对Notify,Subscribe,Register三种状态进行单独处理)
//下面做具体介绍
if ( statusMsg != 0 )
{
if( handleStatusMsg( sipEvent ) )
{
return true;
}
}
//在这里表示接收到一个SIP的消息,
//检验是否为一个SIP的消息而不是一个状态(例如是否为Invite命令)
/// Let's check the call info, now
callId = sipEvent-getSipCallLeg()-getCallId();
callInfo = calls-findCall( callId );
if ( callInfo == 0 )
{
//下面分成两种状况进行讨论,一种是接受到Invite的消息,一种是接收到一个普通的
//命令,例如
Sptr inviteMsg;
inviteMsg.dynamicCast( sipEvent-getSipMsg() );
if ( inviteMsg == 0 )
{
//假如大家在这里有什么希奇的话没有必要,为什么除了inviteMsg以外的所有的消
//息都不处理呢?其实这些消息都在SipThread这个程序中处理了,在Ua这个大状态
//机中所有的状态都是以Invite这个消息作为启动的。每一个INVITE启动一个系列的//消息和状态。
return true;
}
else
{
//收到一个Invite消息,这个时候我们就要进入相应的处理机制中了;
callInfo = calls-newCall
( sipEvent-getSipCallLeg()-getCallId() );
assert( callInfo != 0 );
callInfo-setFeature( stateMachine );
//假如进入的状态是自动呼叫(Auto Call)或者是自动应答(Auto Answer)状态(这
//两种状态的确定要在CFG文件中体现)
if ( UaConfiguration::instance()-getLoadGenOn() )
{
/// Assume this is a new call...
/// Also assume that we are not in use.
callInfo-setState( stateMachine-findState( "StateAutoIdle" ) );
//StateAutoIdle这个状态是一个自动应答和自动呼叫(按照呼叫列表)时候的状态,这里我
//们不做介绍,它本身和手动呼叫是非常相似的。
}
else // LoadGen is off
{
//下面这个程序会进入等待远端SIP事件和本地呼叫事件的状态StateIdle
if( handleCallWaiting( callInfo ) )
{
cpLog( LOG_ERR, "Returned from handleCallWaiting\n" );
return true;
}
}
} // lots of brackets!
}
return false;
} /// UaBuilder::processSipEvent
handleStatusMsg在做什么?
前面我们已经作了简单的介绍,这个函数的主要目的是在处理Rgister,Notify,和Subscribe等几个状态,并且分别调用他们的处理机;
Rgister调用它的处理机:
handleRegistrationResponse他的主要作用是处理返回的各种Rgister状态,例如200,4XX或者是100等状态,另外它还负责在作为Mashal Server的时候转发各种状态时候,重新设定EXPire的值;另外要注重的是在Register中增加了一个新的返回--Trying这个是非常合理的,非凡是大型网络中,对服务器端的性能判定很有效,所以使用协议栈的同志能好好利用这个机制;另外假如发挥的值是401/407状态(未授权),还需要调用authenticateMessage做相应的处理,以返回的(401/407)状态中所带的密钥加密新的Rgister消息,发送给Register服务器重新进行授权判定;有爱好的可以看看BaseAuthentication中的addAuthorization函数。在介绍UaMarshal和Redirect Server的时候会着重讨论这个问题。
注明:Subscribe的处理机在Feature Server章节里面在再具体介绍)。
2.5.1.2 processUaDeviceEvent
前面说了,processUaDeviceEvent主要是用来处理本地的设备事件,最主要就是处理摘机信号,在这里程序的流程我就不具体的列出,不过我们从主要的程序主体部分可以看出:
在uaDeviceEvent-type == DeviceEventHookUp也就是检测了摘机以后,程序会采取某些必要的方式取得CallID(主要是通过CFG文件),最后让程序进入状态机的StateIdle状态,这个状态是接收和发送消息的初始状态,我们可以在后面将会重点介绍这个状态;
2.5.1.3 processUaDigitEvent
也是主要通过判定CFG文件中的LoadGen_On的参数是On或者是Off来决定是否进入StateAutoIdle状态,或者是StateAutoRS状态(自动通过Marshal Server进行中转所有的SIP的消息和状态,在Marshal Server的时候会做具体的介绍)。
2.5.1.4 processUaTimerEvent
这个的流程也实在没有什么好说的,前面也有了一定的介绍,假如大家对这些还有不明白的话,可以看一下SIP协议中Trying过程的走势,主要是对超时处理部分的介绍,就会明白(按照前面所说的UaBuilder::Process中关于SIP命令消息超时的介绍部分)。
2.5.2 SipThread的Run方法:
Void SipThread::thread()
{
… …
while ( true )
{
try
{
//接收所发送的消息,并且预备置入相关的队列中;
Sptr sipRcv( mySipStack-receive(1000) );
if ( sipRcv != 0 )
{
Sptr sipMsg = sipRcv-back();
if ( sipMsg != 0 )
{
//根据本地的地址来检查是否发生了路由环路
if ( discardMessage(sipMsg) )
{
continue;
}
// 在这里的myOutputFifo就是 myCallProcessingQueue(异地输入消息的队
//列),在Workthread构建的时候会把这个队列带入作为处理参量
Sptr nextEvent = new SipEvent(myOutputFifo);
if ( nextEvent != 0 )
{
//以下就是把新收到的消息载入队列当中。
nextEvent-setSipReceive(sipRcv);
nextEvent-setSipStack(mySipStack);
if(myCallLegHistory) nextEvent-setCallLeg();
myOutputFifo-add(nextEvent);
}
}
}
else
{
… …
}
}
catch ( VException& v)
{
… …
}
catch ( ... )
{
… …
}
if ( isShutdown() == true )
{
return;
}
}
}
2.5.2.1 SIP消息的接收/发送缓冲技术
a. 负责接收的主要程序体:
Sptr sipRcv( mySipStack-receive(1000) );这个方法就是利用SipTransceiver的receive方法接收SIP的消息;
Sptr SipTransceiver::receive(int timeOut)
{
Sptr msgQPtr = 0;
//以下是设立超时参数,假如发生超时,那么就让该命令无效;
timeval start, now;
if ( timeOut = 0 )
{
gettimeofday(&start, 0);
}
while (msgQPtr == 0)
{
int timePassed = 0;
if ( timeOut = 0 )
{
gettimeofday(&now, 0);
timePassed = ( now.tv_sec - start.tv_sec ) * 1000
+ ( now.tv_usec - start.tv_usec ) / 1000;
if (timePassed = timeOut)
{
return 0;
}
}
recvdMsgsFifo.block(timeOut);
if ( !recvdMsgsFifo.messageAvailable() )
{
continue;
}
SipMsgContainer *msgPtr = recvdMsgsFifo.getNext();
if ( msgPtr == 0)
{
assert(0);
cpLog(LOG_CRIT, "received NULL");
continue;
}
#if 1
if ( natOn == true)
{
//这里是一个非常有意思的地方,虽然再程序主体中将它设定为False,也就是我们就
//不能采用NAT转换了,不过我还是想介绍一下,它主要是用在假如UA是一个标准
//的网关,或者是路由器设备的情况之下,在这个时候,它主要做各个消息包的转
//译工作,把路由(Via List)改成下一跳的IP地址和端口地址;
SipVia natVia = msgPtr-msg.in-getVia(0);
LocalScopeAllocator lo;
string addr1 = natVia.getHost().getData(lo);
string addr2 = msgPtr-msg.in-getReceivedIPName().getData(lo);
NetworkAddress netaddr1(addr1);
NetworkAddress netaddr2(addr2);
if ( netaddr1.getHostName() != netaddr2.getHostName())
{
natVia.setReceivedhost(msgPtr-msg.in-getReceivedIPName());
natVia.setReceivedport(msgPtr-msg.in-getReceivedIPPort());
//remove the first item from the via list
msgPtr-msg.in-removeVia(0);
//insert natvia in the vector via list
msgPtr-msg.in-setVia(natVia, 0);
}
}
#endif
//---NAT
/* *********************************************************************/
SipMsgQueue *msgQ = 0;
Sptr sipPtr = msgPtr-msg.in;
if(msgPtr-msg.in-getType() == SIP_STATUS)
//这两个是处理返回消息队列的函数,下面将重点介绍
msgQ = sentRequestDB.processRecv(msgPtr);
else
msgQ = sentResponseDB.processRecv(msgPtr);
//更新SNMP命令队列,并向SNMP网管中心发送接收的消息队列;
if(msgQ)
{
msgQPtr = msgQ;
//need to have snmpDetails for this.
if (sipAgent != 0)
{
updateSnmpData(sipPtr, INS);
}
}
else if(msgPtr-msg.in != 0)
{
send(msgPtr);
}
else if(msgPtr-msg.out.length())
{
send(msgPtr);
}
else
… …
}
}
b.描述接收/发送SIP消息队列的主要类:
SipSentRequestDB:: processRecv和SipSentRequestDB::processSend是一对相互的方法,
另外还有SipSentResponseDB:: processRecv和SipSentResponseDB::processSend是用来记忆状态/消息的发送和接受的,在这里和Request的结构基本相同,就不做累述了;前者处理发送的SIP消息队列,后者处理接收的SIP消息队列,为了实现高效率的处理SIP的队列,在程序中大量采用了HASH表的方法,由于这个部分的程序非常的多,我不想一一把他们罗列出来,在这里就做一下简单的一个浏览:
HASH队列的抽象:在这里有三个用于表示HASH表的类:
SipTransLevel1Node,SipTransLevel2Node,SipTransLevel3Node;
第一个是表的入口,它的组成由:目的地址 NameAddress源地址 From以及CallID三个部分叠加而成;
第二个是表的索引,包括CSeq和Via 路由表
第三个就是具体的消息对了,也就是一个呼叫命令组的列表;详见下图:
我们下面一一个简单的例子来描述一下一个INVITE消息的处理过程:
A. 接收到一个Invite Message/发送一个180状态的情况的情况:
1. 在UDP通道收到一个INVITE消息
2. 创建了一个InvMsg,同时发送到SipSentResponseDB中做备份,我们要检查在这里有没有重复的副本;
3.假如没有重复,那么InvMsg就放入RecvFifo中,预备让应用层进行处理;
4.应用层通过SipTransciever接收到了InvMsg并且做出相应的处理;
5.应用层产生了180回应到SipSentResponseDB中备份,
6.180在SndFifo中排队,并且调用SipTransceiver中的SendReply方法回送消息
B.从对方接收到一个100(Trying)状态的作为向对方发送Invite消息回应的情况:
1. 在UDP通道收到一个INVITE的状态;
2. 创建了一个StatusMsg,同时发送到SipSentResquestDB中做备份,我们要检查在这里有没有重复的副本;
3.假如没有重复,那么StatusMsg就放入RecvFifo中,预备让应用层进行处理;
4.应用层通过SipTransciever接收到了StatusMsg并且做出相应的处理;
5.应用层产生了ACK回应到SipSentResquestDB中备份,
6.180在SndFifo中排队,并且调用SipTransceiver中的SendAsync方法回送ACK消息,
c.在存在一个呼叫重新定向的情况:
*我们下面来看一个更加复杂一点的情况:
1SipSendtRequestDB::processSend方法:
我们可以做一个很简单的举例,大家就对这两个方法有比较深入的了解了,可以以上面的Diagram1来做一个很好的例子比如,Marshal Server开始发送一个Invite的消息,由SipSendtRequestDB::processSend来进行处理,同时并且把这个消息装入SipMsgContainer中,然后消息被插入到SipTransactionList队列中:
topNode-findOrInsert(id)-val-findOrInsert(id)
最后放在SipTransLevel1Node,SipTransLevel2Node,SipTransLevel3Node形成一个新的节点。
2SipSentRequestDB:: processRecv方法:
例如我们接收了一个回应100 Trying这个回应的处理自然落在下面的这个部分:
int statusCode = response-getStatusLine().getStatusCode();
if((statusCode
((SipTransceiver::myAppContext == APP_CONTEXT_PROXY) &&
(statusCode == 200) &&
(response-getCSeq().getMethod() == INVITE_METHOD) ) )
… …
retVal = new SipMsgQueue;
retVal-push_back(msgContainer-msg.in)
单纯的把消息队列返回上面的应用层;
后续的180(Ringing)也是如此直接返回应用层;
但是到了接受到200(OK),那么处理的方式就大不一样了因为OK以后命令交互阶段已经告一段落,那么我们通过SipTransactionGC::instance()- collect的后台方法处理(Thread线程),根据Delay的时间的变化:如invCleanupDelay等等,删除当前的一些队列中消息所占用的内存(垃圾处理),(具体处理机制可以参看SipTransactionGC::thread()这个后台处理掉一些孤独的消息,例如有Request没有Response的等等,并且根据各个消息所占用的Delay时间来释放他们);
但是假如没有收到200呢?假设我们收到了302(呼叫转移)呢?(例如在上面Diagram 1中所表现的那样)
答案在这里:
else if(response-getStatusLine().getStatusCode() = 200)
{
if(level3Node-val-msgs.response)//这里是检验在消息队列中是否有应答
//产生,也就是Diagram 1中的Second Phase的情况,(第二个Invite消息)
{
SipTransactionList::SipTransListNode *
curr = 0;
if(level3Node-val-myKey == INVITE_METHOD)
{
curr = level2Node-val-level3.getLast();
while(curr)
{
// look for the ACK message
if(curr-val-myKey == ACK_METHOD &&
curr-val-msgs.request)
{
cpLog(DEBUG_NEW_STACK,"duplicate message: %s",
msgContainer-msg.out.logData());
//通过第一个ACK来复制第二个ACK,使用上二者完全相同,
msgContainer-msg.in = 0;
msgContainer-msg.out =
curr-val-msgs.request-msg.out;
msgContainer-msg.type
= curr-val-msgs.request-msg.type;
msgContainer-msg.transport =
curr-val-msgs.request-msg.transport;
msgContainer-msg.netAddr =
curr-val-msgs.request-msg.netAddr;
msgContainer-retransCount = FILTER_RETRANS_COUNT;
break;
}
curr = level2Node-val-level3.getPrev(curr);
}
很明显复制一个ACK消息预备进行下一个新的Invite的发送,当然这个是要在有ACK发送以后才可以进行,假如没有那么我们可以假定ACK正处在Processing状态;
if(!curr)
{
msgContainer-msg.in = 0;
msgContainer-msg.out = "";
msgContainer-retransCount = 0;
}
}
else
… …
在这个else下面所表示的处理机制是在第一个Message发送出去以后回应大于200的情况,也就是在Diagram 1中First Phase的情况,也就是发出第一个302的情况,在下面有一行语句:
msgContainer-msg.out=msgContainer-msg.in-encode()
它的主要目的是用于形成ACK应答,
另外后面介绍Marshal Server的时候向异地发送Invite的时候返回4XX的回应,一般都是4XX等恶名招著的Response不会有其他的,本地一般采取的处理就是向应用层汇报,并且消除Hash队列里的所有驻留的消息。
大家可以根据上面介绍的方法实验一下其他的情况,基本上都是合适的.
目前来说这个处理机制并不使最优的,非凡是在服务器的状态,某些情况事实上并没有
一个具体的处理方法:例如4XX的回应,可能会造成超时等待过长。
2.6 在User Agent中的四个重要实例的Run方法:
HeartLessProxy的两个Run方法都介绍完毕了,现在我们来看下面将要启动的四个Run过程:
2.6.1 媒体设备启动
DeviceThread-run(); //调用SoundcardDevice::hardwareMain(0)
第一个是调用Sound Card的处理进程,它最主要的用处是返回各种按键的处理信息,他的具体的作用可以参看程序,和具体的操作手册,非常的简单易懂,不用具体介绍,不过要注重的一点是,在程序中,启动按键事件的检测是通过RTP/RTCP的事件触发的,(很明显,例如在通话的时候按下z表示挂机,必须是在有RTP/RTCP事件),说简单了,没有设备,键盘事件无法触发。
2.6.2 启动RTP线程,用于对RTP/RTCP包的接收和发送治理;
rtpThread-run //调用SoundCardDevice::processRTP()
参看RtpThread实例化的过程可以看出,实际上就是调用SoundCardDevice的processRTP过程。
SoundCardDevice::processRTP ()
{
… …
if (audioStack == 0)
{
… …
return;
}
bool bNothingDo = true;
RtpSessionState sessionState = audioStack-getsessionState();
if ( sessionState == rtp_session_undefined )
{
deviceMutex.unlock();
… …
return;
}
if( sessionState == rtp_session_recvonly
sessionState == rtp_session_sendrecv )
{
// audioStack就是RtpSession,在这里它是在构建这个声音设备的时候,就创建它了。
//这里表示从一个创建好的RTP会话中接收一帧数据,
inRtpPkt = audioStack-receive();
if( inRtpPkt )
{
//这里的声卡目前只能接受一种压缩方式PCM,所以只能解析这一种最常用的,
if( inRtpPkt-getPayloadType() != rtpPayloadPCMU
//RTP的采样频率是否为要求的频率,例如为20ms
inRtpPkt-getPayloadUsage() != NETWORK_RTP_RATE )
{
cpLog(LOG_ERR,"Received from RTP stack incorrect payload type");
}
//将数据输出到声卡,
writeToSoundCard( (unsigned char*) inRtpPkt-getPayloadLoc(),
inRtpPkt-getPayloadUsage() );
bNothingDo = false;
… …
}
}
// 这里是发送一帧数据;
if( sessionState == rtp_session_sendonly
sessionState == rtp_session_sendrecv )
{
int cc;
if( audioStack-getRtcpTran() )
{ //假如有发送零声的情况,例如零声回送被叫端,这里在OpRing里通过//sendRemoteRingback过程来实现向远端回送零声(sendRingback=True)
if( sendRingback )
{
cc = getRingbackTone( dataBuffer, RESID_RTP_RATE );
#ifdef WIN32
Sleep(15);
#endif
}
else
{//从声卡中读入一帧数据,按照cfg文件中规定的采样标准
cc = readFromSoundCard( dataBuffer, RESID_RTP_RATE );
}
if ((cc 0) && audioStack)
{//将这帧数据(毛数据,未压缩的作成RTP包发送出去);
audioStack-transmitRaw( (char*)dataBuffer, cc );
bNothingDo = false;
}
… …
}
}
… …
deviceMutex.unlock();
return;
}
2.6.3 合法用户列表的获取(Redirection Server专用)
第三个过程是featureThread-run,,这个过程主要是用在向重定向服务器(Redirection Server)和Provisioning Server中的Feature线程,它实质上是调用 subscribe -Manager-subscribeMain,主体程序部分是向Provisioning Server发送Subscribe消息,在这个循环中会反复的发送SubScribe消息到Provision Server中去,稍后我们要介绍的UaBuilder::handleStatusMsg(UaBuilder::processSipEvent中)过程会将会处理从Provision Server 返回的Notify消息,关于Subscribe/Notify消息对的介绍我们可以参看在Vocal中的相关介绍,它的作用范围是在一个普通的UA向Marshal Server进行注册或者是证实的时候,Marshal Server同时向Redirection Server发出Register消息,并且由Redirection Server向Provisioning Server发送Subscribe消息,对用户列表进行检测;我们可以举一个例子来说明这个过程:
我们来看Diagram.7
1 在A阶段当启动Redirection Server(RS)的时候,RS向Provisioning Server(PS)发送SubScribe消息,取得合法的用户列表;
2 在B阶段,UA端向Marshal Server发送Register消息,以确认自己是否在合法用户列表内;
3 在C阶段,RS将通过Subscribe/Notify命令对把该用户的呼叫特性列表(呼叫等待,呼叫转接,语音邮件,呼叫前转,禁止呼叫等信息)得到该用户的呼叫特性;
我们在Redirection Server这一章内将具体介绍Subscribe/Notify命令对。
2.6.4 监测线程:
一个调用的RUN方法loadGenThread-run是一个监测线程,检查各种回应和请求消息,并记录在LOG文件中。
2.6.5 自动呼叫
在loadGenThread-run后面的程序实现了一个自动在预定时间内发送INVITE消息的过程,大家有爱好可以参看OpAutoCall类,当在UserAgent::Run()中通过检测Cfg文件,通过setLoadGenSignalType(LoadGenStartCall)设定了一个公共变量以后,我们可以发现系统将自动进入OpAutoCall操作,并且启动INVITE开始呼叫。
好了,通过上面的介绍后我们需要知道如何让系统进入Idle状态,在这个状态中系统处于一种"等待"的状态,接收本地的命令输入,和远端的消息;这个状态是所有后续状态的一个初始阶段,在上述程序中我们可以在processSipEvent过程中找到handleCallWaiting子程序,就在该过程中让系统进入Idle状态;见下面的程序:
… …
if ( UaConfiguration::instance()-getLoadGenOn() )
{
callInfo-setState
( stateMachine-findState( "StateAutoIdle" ) );
}
else // LoadGen is off
{
if( handleCallWaiting( callInfo ) )
{
return true;
}
… …
(未完待续)