windows下的开发者都知道,异步i/o是创建高性能和可伸缩性的应用程序的秘诀。因为它允许单个线程处理来自不同客户机的请求。该线程不必顺序处理这些请求,也不必在等待i/o请求运行结束时中断运行。比如写网络通讯的程序。使用底层socket函数select可以实现多路复用。服务器端一个线程可以监听一个端口,响应多个客户进程的请求,如果是响应少数客户的请求还应付过来如果是成百上千个用户的请求呢!比如网络游戏的服务器程序。微软提供了i/o complete port解决这样类似的问题。那么他们是怎么做的呢?他们实现了一个线程池来处理那些客户的请求。每当用户对服务器进行请求,服务器从线程池中释放一个线程响应。服务结束线程回到线程池并挂起。等待下一次客户的请求。这样做的好处很多,不必每次提出一个请求就创建一个线程并在请求结束后释放线程。节省了大量的资源并且降低了CPU的context switch所带来的低效。让我们的程序更具有所谓的扩展性即程序的性能会随着服务器的硬件配置的提高而提高。(多么诱人的承诺啊!我都流口水了。)同时微软也提供了许多线程池函数供我们使用。但是微软的一贯作风是开发者只需要用好了。不需要关心内部的实现。这种把开发者当成傻瓜的行为即使是WIN32程序员也无法忍受。正如本文的题目一样,于是我想手工打造一个线程池。上个星期天我完成了这项工作发烧39度的我比起体温正常的我好像更有创造力。
其实在windows下写程序尤其是多线程的程序是一件很幸福的事情。微软提供了强大的调试工具让我们找bug。最为windows下的开发人员怎能辜负人家的好意。
有很多种线程池的实现方法:我的想法是。创建一个循环队列其中的每个节点包含使用AfxBeginThread创建线程返回的CWinThread指针typedef struct THREADNOD
{
CWinThread *pThread;//pointer to thread
bool bIdle; //if thread is idle
THREADNOD *pNext; //pointer to next node
}THREADNOD;
CWinThread是微软封装的一个线程类我们的CWinApp就是派生于此类。其实AfxBeginThread还可以创建GUI线程并且享受与生俱来的消息队列。这是另一个有趣的应用。因为线程池最高效时候是同时有cpu两倍的线程同时在运行,所以我创建了3个线程。我的cpu是1个。创建3个线程也许很让人难以理解。我只是为了照顾我的循环队列,如果队列中只有两个节点(两个线程)就太没意思了。如果我的机器是2个cup我会毫不犹豫的创建4个线程的!当每次有请求的时候我会从线程池中拿出一个线程响应请求。在处理结束后线程被挂起等待下一次请求。呵呵,感觉自己就像是包工头子!。
现在还有最后一个问题就是我如何得知线程什么时候结束并挂起。我又启动了一个线程使用WaitForMultipleObjects()监视线程内核对象的状态当线程结束变成一通知状态。我就将线程挂起.等待下一次运行。
UINT WaitThreadState(LPVOID pParam)
{
CThread *pThis=(CThread*)pParam;
DWORD dwWait=0;
while(g_bFlag)
{
dwWait=WaitForMultipleObjects(3,
pThis->m_hEvent,
false,
INFINITE);
switch(dwWait)
{
case WAIT_OBJECT_0+0://if thread1 finish suspend it
pThis->m_pThread1->SuspendThread();
pThis->Node1->bIdle=true;
pThis->m_pThreadNodeHead=pThis->Node1;
case WAIT_OBJECT_0+1: //if thread2 finish suspend it
pThis->m_pThread2->SuspendThread();
pThis->Node2->bIdle=true;
pThis->m_pThreadNodeHead=pThis->Node2;
case WAIT_OBJECT_0+2://if thread3 finish suspend it
pThis->m_pThread3->SuspendThread();
pThis->Node3->bIdle=true;
pThis->m_pThreadNodeHead=pThis->Node3;
}
}
return 0;
}
函数CreateThreadPool是创建线程池和链表。
bool CThread::CreateThreadPool()
{
m_pThread1=::AfxBeginThread(Thread1Function,
(LPVOID)this,
THREAD_PRIORITY_NORMAL,
0,
CREATE_SUSPENDED,
NULL);
m_pThread1->m_bAutoDelete=false;
m_pThread2=::AfxBeginThread(Thread2Function,
(LPVOID)this,
THREAD_PRIORITY_NORMAL,
0,
CREATE_SUSPENDED,
NULL);
m_pThread2->m_bAutoDelete=false;
m_pThread3=::AfxBeginThread(Thread3Function,
(LPVOID)this,
THREAD_PRIORITY_NORMAL,
0,
CREATE_SUSPENDED,
NULL);
m_pThread3->m_bAutoDelete=false;
m_pThread=::AfxBeginThread(WaitThreadState,
(LPVOID)this,
THREAD_PRIORITY_NORMAL,
0,
0,
NULL);
if(!m_pThread || !m_pThread3 || !m_pThread2 || !m_pThread1 )
return false;
m_hEvent[0]=m_pThread1->m_hThread;
m_hEvent[1]=m_pThread2->m_hThread;
m_hEvent[2]=m_pThread3->m_hThread;
CThread::Node1=new THREADNOD;
Node1->pThread=m_pThread1;
Node1->bIdle=true;
Node1->pNext=m_pThreadNodeHead;
m_pThreadNodeHead=Node1;
CThread::Node2=new THREADNOD;
Node2->pThread=m_pThread2;
Node2->bIdle=true;
Node2->pNext=m_pThreadNodeHead;
m_pThreadNodeHead=Node2;
CThread::Node3=new THREADNOD;
Node3->pThread=m_pThread3;
Node3->bIdle=true;
Node3->pNext=m_pThreadNodeHead;
m_pThreadNodeHead=Node3;
Node1->pNext=m_pThreadNodeHead;
return true;
}
GetThreadFromPool函数从pool中选出线程返回相应的线程指针
CWinThread * CThread::GetThreadFromPool()
{
int nThreadCounter=0;
if(m_pThreadNodeHead!=NULL)
{
EnterCriticalSection(&m_cs);
while(nThreadCounter!=3)
{
if(m_pThreadNodeHead->bIdle==true)
{
m_pThreadNodeHead->bIdle=false;
LeaveCriticalSection(&m_cs);
return m_pThreadNodeHead->pThread;
}
m_pThreadNodeHead=m_pThreadNodeHead->pNext;
nThreadCounter++;
}
LeaveCriticalSection(&m_cs);
}
return NULL;
}
拿到线程指针并调用之前被挂起的线程
void CThread::RunThread(CWinThread *pWinThread)
{
if(pWinThread!=NULL)
{
pWinThread->ResumeThread();
}
}