从William Kennedy那里整理过来的,不同之处在于他自己定义了一个Overlapped,而我们这里直接使用
System.Threading.NativeOverlapped.
附一段我以前的Win32下的IOCP文档,如果您了解IOCP也可以直接跳过看后面的C#测试示范:
整理者:郑昀@UltraPower
我们采用的是I/O Completion Port(以下简称IOCP)处理机制。
简单地讲,当服务应用程序初始化时,它应该先创建一个IOCP。请求到来后,我们将得到的数据打包,用PostQueuedCompletionStatus发送到IOCP中。这时需要创建若干个线程(7个线程/每CPU,再多就没有意义了)来处理发送到IOCP的消息。实现步骤大致如下:
1 先在主线程中调用CreateIoCompletionPort创建IOCP。
CreateIoCompletionPort的前三个参数只在把设备同Completion Port相关联时才有用。
此时我们只需传递INVALID_HANDLE_VALUE,NULL和0即可。
第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。
2 我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。
在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。
如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT为1秒。
当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息: 传输的字节数、完成键和OVERLAPPED结构的地址。
在我们的程序中可以用智能指针或者BSTR或者int来接收这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。
GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传入先前从CreateIoCompletionPort返回的端口句柄。
需要注意的是:
第一, 线程池的数目是有限制的,和CPU数目有关系。
第二, IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;
第三, 最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。
测试代码:
using System;
using System.Threading; // Included for the Thread.Sleep call
using System.Runtime.InteropServices;
using IOCPThreading; // Namespace in which IOCPThreadPool is declared

namespace IOCPDemo
{
    //=============================================================================
    /// <summary>
    /// Console sample that drives <see cref="IOCPThreadPool"/>: it creates a pool,
    /// posts one work item and then shuts the pool down.
    /// </summary>
    public class UtilThreadingSample
    {
        //*************************************************************************
        /// <summary>
        /// Entry point. Creates a pool with default concurrency (0), 10 initial
        /// worker threads and at most 20, posts the value 1234 and disposes the pool.
        /// </summary>
        static void Main()
        {
            // Create the IOCP thread pool; IOCPThreadFunction runs once per posted event.
            IOCPThreadPool pThreadPool = new IOCPThreadPool(
                0, 10, 20, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

            try
            {
                // To stress the pool, wrap this call in a loop,
                // e.g. for (int i = 1; i < 10000; i++).
                pThreadPool.PostEvent(1234);

                Thread.Sleep(100);
            }
            finally
            {
                // Dispose() waits for the workers to leave. It must run even when
                // posting fails, otherwise the worker threads keep the process alive.
                pThreadPool.Dispose();
            }
        }

        //*************************************************************************
        /// <summary>
        /// Callback executed on a pool thread for every posted event. Prints the
        /// value and sleeps three seconds to simulate work.
        /// </summary>
        /// <param name="iValue"> The value provided by the thread posting the event </param>
        public static void IOCPThreadFunction(int iValue)
        {
            try
            {
                Console.WriteLine("Value: {0}", iValue.ToString());
                Thread.Sleep(3000);
            }
            catch (Exception pException)
            {
                // Report the failure here instead of letting it escape into the pool.
                Console.WriteLine(pException.Message);
            }
        }
    }
}
类代码:
using System;
using System.Threading;
using System.Runtime.InteropServices;

namespace IOCPThreading
{
    /// <summary>
    /// A thread pool whose work queue is a Win32 I/O completion port.
    /// Each posted event is delivered to the user callback on one of the pool's
    /// worker threads. The pool starts with a minimum number of threads and adds
    /// one more (up to the maximum) whenever every existing worker is busy.
    /// </summary>
    /// <remarks>
    /// Worker threads are ordinary (foreground) threads, so the process does not
    /// exit until <see cref="Dispose()"/> has been called. Dispose blocks until all
    /// workers have left; do not call it from inside the user callback, because
    /// it would then wait for the calling worker itself.
    /// </remarks>
    public sealed class IOCPThreadPool : IDisposable
    {
        // Handles and completion keys are pointer sized, so they are declared as
        // IntPtr/UIntPtr; 32-bit integers would truncate them in a 64-bit process.
        [DllImport("Kernel32", SetLastError = true)]
        private static extern IntPtr CreateIoCompletionPort(IntPtr hFile, IntPtr hExistingCompletionPort, UIntPtr uiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

        [DllImport("Kernel32", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        private static extern Boolean CloseHandle(IntPtr hObject);

        [DllImport("Kernel32", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        private static unsafe extern Boolean PostQueuedCompletionStatus(IntPtr hCompletionPort, UInt32 uiSizeOfArgument, UIntPtr uiUserArg, NativeOverlapped* pOverlapped);

        [DllImport("Kernel32", SetLastError = true)]
        [return: MarshalAs(UnmanagedType.Bool)]
        private static unsafe extern Boolean GetQueuedCompletionStatus(IntPtr hCompletionPort, out UInt32 uiSizeOfArgument, out UIntPtr uiUserArg, NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

        /// <summary> Win32 INVALID_HANDLE_VALUE: create a port not tied to any file </summary>
        private static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1);

        /// <summary> Win32 INFINITE: wait without timeout </summary>
        private const UInt32 INFINITE = 0xffffffff;

        /// <summary> Byte count posted with PostEvent(int) packets </summary>
        private const UInt32 VALUE_PACKET_SIZE = 4;

        /// <summary>
        /// Byte count that marks a shutdown packet. The marker travels in the
        /// byte-count field rather than in the completion key so that every
        /// Int32, including Int32.MaxValue, remains a valid user value.
        /// </summary>
        private const UInt32 SHUTDOWN_IOCPTHREAD = 0xffffffff;

        /// <summary> Signature of the callback invoked for every posted event </summary>
        /// <param name="iValue"> The value passed to PostEvent (0 for the parameterless overload) </param>
        public delegate void USER_FUNCTION(int iValue);

        /// <summary> Handle of the completion port; IntPtr.Zero once closed </summary>
        private IntPtr m_hHandle;

        /// <summary> Concurrency value handed to CreateIoCompletionPort </summary>
        private readonly Int32 m_uiMaxConcurrency;

        /// <summary> Number of worker threads started by the constructor </summary>
        private readonly Int32 m_iMinThreadsInPool;

        /// <summary> Upper bound for the number of worker threads </summary>
        private readonly Int32 m_iMaxThreadsInPool;

        /// <summary> Guards the grow decision and the start of Dispose </summary>
        private readonly Object m_pCriticalSection = new Object();

        /// <summary> Callback run for every posted event </summary>
        private readonly USER_FUNCTION m_pfnUserFunction;

        /// <summary> Set once Dispose has started; read by several threads </summary>
        private volatile Boolean m_bDisposeFlag;

        private Int32 m_iCurThreadsInPool;
        private Int32 m_iActThreadsInPool;
        private Int32 m_iCurWorkInPool;

        /// <summary>
        /// The current number of threads in the thread pool. The setter is kept
        /// only for source compatibility; the pool maintains the value itself.
        /// </summary>
        public Int32 GetCurThreadsInPool
        {
            get { return Interlocked.CompareExchange(ref m_iCurThreadsInPool, 0, 0); }
            set { Interlocked.Exchange(ref m_iCurThreadsInPool, value); }
        }

        /// <summary>
        /// The number of threads currently running the user callback. The setter
        /// is kept only for source compatibility.
        /// </summary>
        public Int32 GetActThreadsInPool
        {
            get { return Interlocked.CompareExchange(ref m_iActThreadsInPool, 0, 0); }
            set { Interlocked.Exchange(ref m_iActThreadsInPool, value); }
        }

        /// <summary>
        /// The number of posted events not yet picked up by a worker. The setter
        /// is kept only for source compatibility.
        /// </summary>
        public Int32 GetCurWorkInPool
        {
            get { return Interlocked.CompareExchange(ref m_iCurWorkInPool, 0, 0); }
            set { Interlocked.Exchange(ref m_iCurWorkInPool, value); }
        }

        private Int32 IncCurThreadsInPool() { return Interlocked.Increment(ref m_iCurThreadsInPool); }
        private Int32 DecCurThreadsInPool() { return Interlocked.Decrement(ref m_iCurThreadsInPool); }
        private Int32 IncActThreadsInPool() { return Interlocked.Increment(ref m_iActThreadsInPool); }
        private Int32 DecActThreadsInPool() { return Interlocked.Decrement(ref m_iActThreadsInPool); }
        private Int32 IncCurWorkInPool() { return Interlocked.Increment(ref m_iCurWorkInPool); }
        private Int32 DecCurWorkInPool() { return Interlocked.Decrement(ref m_iCurWorkInPool); }

        /// <summary>
        /// Creates the completion port and starts the initial worker threads.
        /// </summary>
        /// <param name="iMaxConcurrency"> Concurrency value passed to CreateIoCompletionPort; 0 selects the system default </param>
        /// <param name="iMinThreadsInPool"> Number of worker threads started immediately </param>
        /// <param name="iMaxThreadsInPool"> Maximum number of worker threads the pool may grow to </param>
        /// <param name="pfnUserFunction"> Callback invoked on a worker thread for every posted event </param>
        /// <exception cref="ArgumentNullException"> pfnUserFunction is null </exception>
        /// <exception cref="InvalidOperationException"> The completion port could not be created </exception>
        public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)
        {
            if (pfnUserFunction == null)
                throw new ArgumentNullException("pfnUserFunction");

            m_uiMaxConcurrency = iMaxConcurrency;
            m_iMinThreadsInPool = iMinThreadsInPool;
            m_iMaxThreadsInPool = iMaxThreadsInPool;
            m_pfnUserFunction = pfnUserFunction;

            // Create an IO completion port that is not associated with any file handle
            m_hHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, IntPtr.Zero, UIntPtr.Zero, unchecked((UInt32) m_uiMaxConcurrency));
            if (m_hHandle == IntPtr.Zero)
            {
                Int32 iError = Marshal.GetLastWin32Error();
                throw new InvalidOperationException("Unable To Create IO Completion Port (Win32 error " + iError + ")");
            }

            try
            {
                // Allocate and start the minimum number of threads specified
                for (Int32 iThread = 0; iThread < m_iMinThreadsInPool; ++iThread)
                    StartWorkerThread();
            }
            catch
            {
                // Stop the workers that did start and close the port, then let the
                // original exception reach the caller unchanged.
                Dispose();
                throw;
            }
        }

        /// <summary>
        /// Finalizer: only releases the port handle. It never posts shutdown
        /// packets or waits, because blocking on the finalizer thread is unsafe.
        /// </summary>
        ~IOCPThreadPool()
        {
            Dispose(false);
        }

        /// <summary>
        /// Stops accepting work, asks every worker thread to exit, waits until
        /// they have all left and closes the completion port. Events already
        /// queued are still processed first. Calling Dispose again is a no-op.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary> Shared implementation of Dispose() and the finalizer </summary>
        /// <param name="bDisposing"> true when called from Dispose(), false from the finalizer </param>
        private unsafe void Dispose(Boolean bDisposing)
        {
            if (bDisposing)
            {
                Int32 iThreadsToStop;

                // Take the lock so no thread can be added between setting the flag
                // and reading the count; every worker then gets exactly one
                // shutdown packet.
                lock (m_pCriticalSection)
                {
                    if (m_bDisposeFlag)
                        return;

                    m_bDisposeFlag = true;
                    iThreadsToStop = m_iCurThreadsInPool;
                }

                Boolean bAllPosted = true;
                for (Int32 iThread = 0; iThread < iThreadsToStop; ++iThread)
                {
                    if (!PostQueuedCompletionStatus(m_hHandle, SHUTDOWN_IOCPTHREAD, UIntPtr.Zero, null))
                    {
                        bAllPosted = false;
                        break;
                    }
                }

                // Wait here until all the threads are gone. If a shutdown packet
                // could not be posted the wait would never end, so skip it and rely
                // on closing the port to make the remaining workers' waits fail.
                if (bAllPosted)
                {
                    while (Interlocked.CompareExchange(ref m_iCurThreadsInPool, 0, 0) > 0)
                        Thread.Sleep(100);
                }
            }

            ClosePort();
        }

        /// <summary> Closes the completion port handle at most once </summary>
        private void ClosePort()
        {
            IntPtr hPort = m_hHandle;
            m_hHandle = IntPtr.Zero;

            if (hPort != IntPtr.Zero)
                CloseHandle(hPort);
        }

        /// <summary>
        /// Creates, names and starts one worker thread and counts it in the pool.
        /// </summary>
        private void StartWorkerThread()
        {
            Thread thThread = new Thread(new ThreadStart(IOCPFunction));
            thThread.Name = "IOCP " + thThread.GetHashCode();

            // Count the thread before it runs so its own decrement on exit can
            // never be observed ahead of this increment.
            IncCurThreadsInPool();
            try
            {
                thThread.Start();
            }
            catch
            {
                DecCurThreadsInPool();
                throw;
            }
        }

        /// <summary>
        /// Adds one worker thread when the pool is below its maximum, every
        /// existing worker is busy and the pool is not being disposed. Growing is
        /// best effort: a failure to start the thread is traced, not thrown.
        /// </summary>
        private void AddThreadIfAllBusy()
        {
            lock (m_pCriticalSection)
            {
                if (m_bDisposeFlag)
                    return;

                if (m_iCurThreadsInPool >= m_iMaxThreadsInPool)
                    return;

                if (m_iActThreadsInPool != m_iCurThreadsInPool)
                    return;

                try
                {
                    StartWorkerThread();
                }
                catch (Exception pException)
                {
                    // The existing workers can still drain the queue.
                    System.Diagnostics.Trace.WriteLine("IOCPThreadPool: unable to add a worker thread: " + pException);
                }
            }
        }

        /// <summary>
        /// Worker thread body: dequeues packets from the completion port and runs
        /// the user callback for each one until a shutdown packet arrives or the
        /// port can no longer be read.
        /// </summary>
        private unsafe void IOCPFunction()
        {
            try
            {
                while (true)
                {
                    UInt32 uiNumberOfBytes;
                    UIntPtr uiKey;
                    NativeOverlapped* pOv = null;

                    // Wait for an event
                    Boolean bDequeued = GetQueuedCompletionStatus(m_hHandle, out uiNumberOfBytes, out uiKey, &pOv, INFINITE);

                    // No packet was delivered (for example the port was closed);
                    // the out values are meaningless, so leave.
                    if (!bDequeued)
                        break;

                    // Was this thread told to shutdown
                    if (uiNumberOfBytes == SHUTDOWN_IOCPTHREAD)
                        break;

                    // One posted event has left the queue
                    DecCurWorkInPool();

                    IncActThreadsInPool();
                    try
                    {
                        try
                        {
                            // The key carries the posted Int32 in its low 32 bits
                            m_pfnUserFunction(unchecked((Int32) uiKey.ToUInt64()));
                        }
                        catch (Exception pException)
                        {
                            // A failing work item must not take the worker down or
                            // leak the active count; the pool has no error channel,
                            // so the exception is only traced.
                            System.Diagnostics.Trace.WriteLine("IOCPThreadPool: user function threw: " + pException);
                        }

                        // Checked while this thread still counts as active
                        AddThreadIfAllBusy();
                    }
                    finally
                    {
                        DecActThreadsInPool();
                    }
                }
            }
            finally
            {
                // Decrement the thread pool count however the thread ends, so
                // Dispose never waits for a thread that is already gone.
                DecCurThreadsInPool();
            }
        }

        /// <summary>
        /// Queues a packet for the workers and grows the pool if all are busy.
        /// Does nothing once the pool is being disposed.
        /// </summary>
        /// <param name="uiSize"> Value for the packet's byte-count field </param>
        /// <param name="uiKey"> Value for the packet's completion key </param>
        /// <exception cref="InvalidOperationException"> The packet could not be posted </exception>
        private unsafe void PostPacket(UInt32 uiSize, UIntPtr uiKey)
        {
            // Only add work if we are not disposing
            if (m_bDisposeFlag)
                return;

            // Count the work before posting so a fast worker cannot decrement first
            IncCurWorkInPool();

            if (!PostQueuedCompletionStatus(m_hHandle, uiSize, uiKey, null))
            {
                Int32 iError = Marshal.GetLastWin32Error();
                DecCurWorkInPool();

                // Lost a race with Dispose: treat it like any other post made
                // after disposal and ignore it.
                if (m_bDisposeFlag)
                    return;

                throw new InvalidOperationException("Unable To Post To IO Completion Port (Win32 error " + iError + ")");
            }

            AddThreadIfAllBusy();
        }

        /// <summary>
        /// Posts an event carrying <paramref name="iValue"/>; the user callback
        /// receives the same value on a worker thread. Ignored after Dispose.
        /// </summary>
        /// <param name="iValue"> Any Int32 value to hand to the callback </param>
        /// <exception cref="InvalidOperationException"> The event could not be posted </exception>
        public void PostEvent(int iValue)
        {
            // Zero-extend so the value round-trips identically on 32- and 64-bit
            PostPacket(VALUE_PACKET_SIZE, new UIntPtr(unchecked((UInt32) iValue)));
        }

        /// <summary>
        /// Posts an event without a value; the user callback receives 0.
        /// Ignored after Dispose.
        /// </summary>
        /// <exception cref="InvalidOperationException"> The event could not be posted </exception>
        public void PostEvent()
        {
            PostPacket(0, UIntPtr.Zero);
        }
    }
}