在开发网络应用程序的时候,处理业务和通讯流程之间经常会出现矛盾。这种矛盾主要是由于两者之间的不同步造成的。比如,网络的延迟较大,而实际业务处理的速度则相对比较快,那么如果处理完某一事务然后等待发送成功再处理下一个事务则会大大降低效率。当然,如果必须等待则另当别论。但是,如果处理不需要依赖发送的成功,比如流媒体或者某些网络游戏等等,则要考虑一种更好的方法来解决这个问题了。
解决方法之一是建立一个发送消息的队列,然后处理程序将要发送的数据放入队列,然后通讯程序从队列的另一端取出数据发送。这个也是一个标准的生产者消费者模型:业务处理程序是生产者,通讯程序是消费者,两者在不同的线程中运行,而产品则是待发送的数据。这样业务处理程序可以不间断地处理自己的逻辑,只需要把每次处理的结果放入这个发送队列即可。
既然是一个多线程的应用,难免涉及到线程同步的问题。而这个消费者生产者模型在任何操作系统教材中都可以找到,下面是我在VC中实现的这个消息队列。
typedef deque<MyMsg> MsgQueue;
MsgQueue msgQueue; //消息队列
SOCKET connectionSocket;
HANDLE hSThread;
DWORD dwSThreadId;
HANDLE sSemaphore;
HANDLE sMutex;
//socket建立等代码略去
//创建线程
hSThread = CreateThread(
NULL,
0,
sendProc,
0,
0,
& dwSThreadId);
//互斥对象
sMutex=CreateMutex(NULL, FALSE, NULL);
//信号量
sSemaphore=CreateSemaphore(NULL, 0, 128, NULL);
//发送消息的线程
DWORD WINAPI sendProc(PVOID lpParam)
{
while(true){
//等待队列非空
WaitForSingleObject(sSemaphore, INFINITE);
//等待队列无操作
WaitForSingleObject(sMutex, INFINITE);
MyMsg msg;
if(msgQueue.empty()==false){
MyMsg *pmsg=&(msgQueue.front());
memcpy(&msg, pmsg, sizeof(msg));
}
ReleaseMutex(sMutex);
if(send(connectionSocket, (char *)(&msg), sizeof(msg), 0) ==SOCKET_ERROR){
int err = WSAGetLastError();
//处理错误
}
else{
//成功发送,将消息出列
WaitForSingleObject(sMutex, INFINITE);
msgQueue.pop_front();
ReleaseMutex(sMutex);
}
}
}
//发送数据函数,即将一个消息放入队列尾部
void sendMsg(MyMsg msg)
{
//等待队列无操作
WaitForSingleObject(sMutex, INFINITE);
//入队
msgQueue.push_back(msg);
//释放互斥对象
ReleaseMutex(sMutex);
//通知有新的消息如对,信号量增加1
ReleaseSemaphore(sSemaphore, 1, NULL);
}
这样,我们就完成了一个发送队列的基本模型,显然,这个模型提高了数据处理效率。上面的错误处理处根据情况不通可以有多种选择,比如丢弃该消息或者再次发送等等。在这个模型的基础上,我们可以为每个消息数据添加一些附加信息,比如优先级等等,以适应更复杂的应用需求。总之,发送队列为我们处理网络数据提供了一种思路。