BOOL CTreadSafeMsgQueue::PostMsg(const MsgItem Msg, int WaitTime)
{
if (!m_bInitedOK) return FALSE;
// 防止在reset期间导致Semaphore计数不正确
while (m_bStop)
{
TRACE("Thread %d Sleep\n", GetCurrentThreadId());
Sleep(SLEEP_TIME);
}
// 进入同步操作状态
m_WritingThreadNum++;
if (WaitForSingleObject(m_S_Producer, WaitTime) == WAIT_OBJECT_0)
{
if (WaitForSingleObject(m_E_Queue, WaitTime) == WAIT_OBJECT_0)
{
// OK now, post message then
m_Queue[m_HeaderToWrite++] = Msg;
if (m_HeaderToWrite >= MAX_QUE_SIZE) m_HeaderToWrite = 0;
TRACE("Post message *** %d\n", Msg.MsgID);
ReleaseSemaphore(m_S_Consumer, 1, NULL);
SetEvent(m_E_Queue);
m_WritingThreadNum--;
return TRUE;
}
else // wait event time out
{
// not post message so release producer
ReleaseSemaphore(m_S_Producer, 1, NULL);
SetEvent(m_E_Queue);
m_WritingThreadNum--;
return FALSE;
}
}
else // wait semaphore time out
{
m_WritingThreadNum--;
return FALSE;
}
}
BOOL CTreadSafeMsgQueue::Reset()
{
if (!m_bInitedOK) return FALSE;
// 防止重入Reset
while (m_bStop) Sleep(SLEEP_TIME);
// 防止新的线程进入读写操作状态
m_bStop = TRUE;
TRACE("--------------Begin to Reset-------------\n");
// 等待至少一类线程(读线程或写线程)退出同步状态
while ( (m_WritingThreadNum != 0) && (m_ReadingThreadNum != 0)) Sleep(SLEEP_TIME);
// 此时,可能(最坏可能)还有一类线程处于同步状态,并被阻塞。
// 它们必定都被阻塞于信号量(Semaphore)状态, 注意此时Event事件处于信号态!!
// 如果被阻塞的是写线程,它们将被释放,并写入数据,但它们写入的数据将被抛弃。
// 释放写线程将改写m_S_Consumer,所以必须先释放写线程;
// ReleaseSemaphore(m_S_Producer)令m_S_Producer=MAX_QUE_SIZE;
while (m_WritingThreadNum > 0)
{
// 必须Sleep以出让CPU控制权
ReleaseSemaphore(m_S_Producer, 1, NULL);
Sleep(5);
}
// 继续阻塞读线程,令m_S_Consumer=0;
while (WaitForSingleObject(m_S_Consumer, 10) == WAIT_OBJECT_0);
// 由于写操作会改变m_HeaderToWrite,必须在写操作之后对其重置
m_HeaderToWrite = m_TailToRead = 0;
// 注意
TRACE("--------------Finished Reseting-------------\n");
m_bStop = FALSE;
return TRUE;
}