// ---------------------------------实现文件---------------------------------//
#include "threadsafequeue.h"
/// Builds the queue: stores the configuration, resets all bookkeeping state,
/// then acquires the slot array and the three kernel objects in order.
///
/// @param QueSize         Number of slots to allocate; a value <= 0 allocates
///                        nothing and leaves the object marked as not initialised.
/// @param InvalidMsgType  Value stored in INVALID_MSG_TYPE.
///
/// On return m_bInitedOK is TRUE only if every acquisition step succeeded.
/// Construction stops at the first failed step; whatever was acquired before
/// that point stays in the members.
CTreadSafeMsgQueue::CTreadSafeMsgQueue(int QueSize, USHORT InvalidMsgType)
{
    // Configuration.
    INVALID_MSG_TYPE = InvalidMsgType;
    MAX_QUE_SIZE = QueSize;

    // Storage and cursors.
    m_Queue = NULL;
    m_HeaderToWrite = 0;
    m_TailToRead = 0;

    // Kernel objects, created further below.
    m_S_Producer = NULL;
    m_S_Consumer = NULL;
    m_E_Queue = NULL;

    // Thread counters and state flags.
    m_WritingThreadNum = 0;
    m_ReadingThreadNum = 0;
    m_bStop = FALSE;

    // Note: starts out TRUE and is cleared by the first step that fails.
    m_bInitedOK = TRUE;

    // Step 1: slot array (skipped entirely for a non-positive size).
    if (QueSize > 0)
    {
        m_Queue = new MsgItem [QueSize];
    }
    if (m_Queue == NULL)
    {
        m_bInitedOK = FALSE;
        return;
    }

    // Step 2: producer semaphore, initial count == maximum count == MAX_QUE_SIZE.
    m_S_Producer = CreateSemaphore(NULL, MAX_QUE_SIZE, MAX_QUE_SIZE, NULL);
    if (m_S_Producer == NULL)
    {
        m_bInitedOK = FALSE;
        return;
    }

    // Step 3: consumer semaphore, initial count 0, maximum count MAX_QUE_SIZE.
    m_S_Consumer = CreateSemaphore(NULL, 0, MAX_QUE_SIZE, NULL);
    if (m_S_Consumer == NULL)
    {
        m_bInitedOK = FALSE;
        return;
    }

    // Step 4: auto-reset event, created in the signalled state.
    m_E_Queue = CreateEvent(NULL, FALSE, TRUE, NULL);
    if (m_E_Queue == NULL)
    {
        m_bInitedOK = FALSE;
    }
}
/// Tears the queue down: blocks new entries, waits for any in-progress reset,
/// releases threads still counted in m_WritingThreadNum / m_ReadingThreadNum,
/// then closes the kernel objects and frees the slot array.
///
/// The destructor does not return until both thread counters have dropped
/// to zero.
CTreadSafeMsgQueue::~CTreadSafeMsgQueue()
{
    // Prevent new threads from entering.
    m_bInitedOK = FALSE;

    // If a reset is in progress, wait for it to finish.
    while (m_bStop) Sleep(SLEEP_TIME);

    // Wait until at least one kind of thread (readers or writers) has left
    // the synchronised section.
    while ((m_WritingThreadNum != 0) && (m_ReadingThreadNum != 0)) Sleep(SLEEP_TIME);

    // From here on the remaining threads are expected to be blocked on a
    // semaphore.

    // Release writer threads.
    while (m_WritingThreadNum > 0)
    {
        ReleaseSemaphore(m_S_Producer, 1, NULL);
        // Must sleep to yield the CPU so the released thread can run.
        Sleep(5);
    }

    // Release reader threads.
    if (m_ReadingThreadNum > 0)
    {
        // Mark EVERY slot as holding the invalid message type. The previous
        // code indexed with m_TailToRead on each iteration and therefore only
        // ever touched a single slot, leaving stale message IDs in the rest.
        // NOTE(review): presumably a woken reader checks MsgID against
        // INVALID_MSG_TYPE and bails out - confirm against the read path.
        if (m_Queue != NULL)
        {
            for (int i = 0; i < MAX_QUE_SIZE; i++)
            {
                m_Queue[i].MsgID = INVALID_MSG_TYPE;
            }
        }

        while (m_ReadingThreadNum > 0)
        {
            // Wake one reader per iteration, then yield so it can run.
            ReleaseSemaphore(m_S_Consumer, 1, NULL);
            Sleep(5);
        }
    }

    // Handles remain NULL when construction failed part-way; only close the
    // ones that were actually created.
    if (m_S_Consumer != NULL) CloseHandle(m_S_Consumer);
    if (m_S_Producer != NULL) CloseHandle(m_S_Producer);
    if (m_E_Queue != NULL) CloseHandle(m_E_Queue);

    delete [] m_Queue;
}