分享
 
 
 

[C#]I/O完成端口的类定义和测试实例

王朝c#·作者佚名  2006-01-09
窄屏简体版  字體: |||超大  

从William Kennedy那里整理过来的,不同之处在于他自己定义了一个Overlapped,而我们这里直接使用

System.Threading.NativeOverlapped.

附一段我以前的Win32下的IOCP文档,如果您了解IOCP也可以直接跳过看后面的C#测试示范:

整理者:郑昀@UltraPower

我们采用的是I/O Complete Port(以下简称IOCP)处理机制。

简单的讲,当服务应用程序初始化时,它应该先创建一个I/O CP。我们在请求到来后,将得到的数据打包用PostQueuedCompletionStatus发送到IOCP中。这时需要创建一些个线程(7个线程/每CPU,再多就没有意义了)来处理发送到IOCP端口的消息。实现步骤大致如下:

1 先在主线程中调用CreateIoCompletionPort创建IOCP。

CreateIoCompletionPort的前三个参数只在把设备同Complete Port相关联时才有用。

此时我们只需传递INVALID_HANDLE_VALUE,NULL和0即可。

第四个参数告诉端口同时能运行的最多线程数,这里设置为0,表示默认为当前计算机的CPU数目。

2 我们的ThreadFun线程函数执行一些初始化之后,将进入一个循环,该循环会在服务进程终止时才结束。

在循环中,调用GetQueuedCompletionStatus,这样就把当前线程的ID放入一个等待线程队列中,I/O CP内核对象就总能知道哪个线程在等待处理完成的I/O请求。

如果在IDLE_THREAD_TIMEOUT规定的时间内I/O CP上还没有出现一个Completion Packet,则转入下一次循环。在这里我们设置的IDLE_THREAD_TIMEOUT为1秒。

当端口的I/O完成队列中出现一项时,完成端口就唤醒等待线程队列中的这个线程,该线程将得到完成的I/O项中的信息: 传输的字节数、完成键和OVERLAPPED结构的地址。

在我们的程序中可以用智能指针或者BSTR或者int来接受这个OVERLAPPED结构的地址的值,从而得到消息;然后在这个线程中处理消息。

GetQueuedCompletionStatus的第一个参数hCompletionPort指出了要监视哪一个端口,这里我们传送先前从CreateIoCompletionPort返回的端口句柄。

需要注意的是:

第一, 线程池的数目是有限制的,和CPU数目有关系。

第二, IOCP是一种较为完美的睡眠/唤醒 线程机制;线程当前没有任务要处理时,就进入睡眠状态,从而不占用CPU资源,直到被内核唤醒;

第三, 最近一次刚执行完的线程,下次任务来的时候还会唤醒它;所以有可能比较少被调用的线程以后被调用的几率也少。

测试代码:

using System;

using System.Threading; // Included for the Thread.Sleep call

using Continuum.Threading;

using System.Runtime.InteropServices;

namespace IOCPDemo

{

//=============================================================================

/**//// <summary> Sample class for the threading class </summary>

public class UtilThreadingSample

{

//*****************************************************************************

/**//// <summary> Test Method </summary>

static void Main()

{

// Create the MSSQL IOCP Thread Pool

IOCPThreadPool pThreadPool = new IOCPThreadPool(0, 10, 20, new IOCPThreadPool.USER_FUNCTION(IOCPThreadFunction));

//for(int i =1;i<10000;i++)

{

pThreadPool.PostEvent(1234);

}

Thread.Sleep(100);

pThreadPool.Dispose();

}

//********************************************************************

/**//// <summary> Function to be called by the IOCP thread pool. Called when

/// a command is posted for processing by the SocketManager </summary>

/// <param name="iValue"> The value provided by the thread posting the event </param>

static public void IOCPThreadFunction(int iValue)

{

try

{

Console.WriteLine("Value: {0}", iValue.ToString());

Thread.Sleep(3000);

}

catch (Exception pException)

{

Console.WriteLine(pException.Message);

}

}

}

}

类代码:

using System;

using System.Threading;

using System.Runtime.InteropServices;

namespace IOCPThreading

{

[StructLayout(LayoutKind.Sequential, CharSet=CharSet.Auto)]

public sealed class IOCPThreadPool

{

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern UInt32 CreateIoCompletionPort(UInt32 hFile, UInt32 hExistingCompletionPort, UInt32* puiCompletionKey, UInt32 uiNumberOfConcurrentThreads);

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean CloseHandle(UInt32 hObject);

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean PostQueuedCompletionStatus(UInt32 hCompletionPort, UInt32 uiSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped* pOverlapped);

[DllImport("Kernel32", CharSet=CharSet.Auto)]

private unsafe static extern Boolean GetQueuedCompletionStatus(UInt32 hCompletionPort, UInt32* pSizeOfArgument, UInt32* puiUserArg, System.Threading.NativeOverlapped** ppOverlapped, UInt32 uiMilliseconds);

private const UInt32 INVALID_HANDLE_VALUE = 0xffffffff;

private const UInt32 INIFINITE = 0xffffffff;

private const Int32 SHUTDOWN_IOCPTHREAD = 0x7fffffff;

public delegate void USER_FUNCTION(int iValue);

private UInt32 m_hHandle;

private UInt32 GetHandle
{ get
{ return m_hHandle; } set
{ m_hHandle = value; } }

private Int32 m_uiMaxConcurrency;

private Int32 GetMaxConcurrency
{ get
{ return m_uiMaxConcurrency; } set
{ m_uiMaxConcurrency = value; } }

private Int32 m_iMinThreadsInPool;

private Int32 GetMinThreadsInPool
{ get
{ return m_iMinThreadsInPool; } set
{ m_iMinThreadsInPool = value; } }

private Int32 m_iMaxThreadsInPool;

private Int32 GetMaxThreadsInPool
{ get
{ return m_iMaxThreadsInPool; } set
{ m_iMaxThreadsInPool = value; } }

private Object m_pCriticalSection;

private Object GetCriticalSection
{ get
{ return m_pCriticalSection; } set
{ m_pCriticalSection = value; } }

private USER_FUNCTION m_pfnUserFunction;

private USER_FUNCTION GetUserFunction
{ get
{ return m_pfnUserFunction; } set
{ m_pfnUserFunction = value; } }

private Boolean m_bDisposeFlag;

/**//// <summary> SimType: Flag to indicate if the class is disposing </summary>

private Boolean IsDisposed
{ get
{ return m_bDisposeFlag; } set
{ m_bDisposeFlag = value; } }

private Int32 m_iCurThreadsInPool;

/**//// <summary> SimType: The current number of threads in the thread pool </summary>

public Int32 GetCurThreadsInPool
{ get
{ return m_iCurThreadsInPool; } set
{ m_iCurThreadsInPool = value; } }

/**//// <summary> SimType: Increment current number of threads in the thread pool </summary>

private Int32 IncCurThreadsInPool()
{ return Interlocked.Increment(ref m_iCurThreadsInPool); }

/**//// <summary> SimType: Decrement current number of threads in the thread pool </summary>

private Int32 DecCurThreadsInPool()
{ return Interlocked.Decrement(ref m_iCurThreadsInPool); }

private Int32 m_iActThreadsInPool;

/**//// <summary> SimType: The current number of active threads in the thread pool </summary>

public Int32 GetActThreadsInPool
{ get
{ return m_iActThreadsInPool; } set
{ m_iActThreadsInPool = value; } }

/**//// <summary> SimType: Increment current number of active threads in the thread pool </summary>

private Int32 IncActThreadsInPool()
{ return Interlocked.Increment(ref m_iActThreadsInPool); }

/**//// <summary> SimType: Decrement current number of active threads in the thread pool </summary>

private Int32 DecActThreadsInPool()
{ return Interlocked.Decrement(ref m_iActThreadsInPool); }

private Int32 m_iCurWorkInPool;

/**//// <summary> SimType: The current number of Work posted in the thread pool </summary>

public Int32 GetCurWorkInPool
{ get
{ return m_iCurWorkInPool; } set
{ m_iCurWorkInPool = value; } }

/**//// <summary> SimType: Increment current number of Work posted in the thread pool </summary>

private Int32 IncCurWorkInPool()
{ return Interlocked.Increment(ref m_iCurWorkInPool); }

/**//// <summary> SimType: Decrement current number of Work posted in the thread pool </summary>

private Int32 DecCurWorkInPool()
{ return Interlocked.Decrement(ref m_iCurWorkInPool); }

public IOCPThreadPool(Int32 iMaxConcurrency, Int32 iMinThreadsInPool, Int32 iMaxThreadsInPool, USER_FUNCTION pfnUserFunction)

{

try

{

// Set initial class state

GetMaxConcurrency = iMaxConcurrency;

GetMinThreadsInPool = iMinThreadsInPool;

GetMaxThreadsInPool = iMaxThreadsInPool;

GetUserFunction = pfnUserFunction;

// Init the thread counters

GetCurThreadsInPool = 0;

GetActThreadsInPool = 0;

GetCurWorkInPool = 0;

// Initialize the Monitor Object

GetCriticalSection = new Object();

// Set the disposing flag to false

IsDisposed = false;

unsafe

{

// Create an IO Completion Port for Thread Pool use

GetHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, null, (UInt32) GetMaxConcurrency);

}

// Test to make sure the IO Completion Port was created

if (GetHandle == 0)

throw new Exception("Unable To Create IO Completion Port");

// Allocate and start the Minimum number of threads specified

Int32 iStartingCount = GetCurThreadsInPool;

ThreadStart tsThread = new ThreadStart(IOCPFunction);

for (Int32 iThread = 0; iThread < GetMinThreadsInPool; ++iThread)

{

// Create a thread and start it

Thread thThread = new Thread(tsThread);

thThread.Name = "IOCP " + thThread.GetHashCode();

thThread.Start();

// Increment the thread pool count

IncCurThreadsInPool();

}

}

catch

{

throw new Exception("Unhandled Exception");

}

}

~IOCPThreadPool()

{

if (!IsDisposed)

Dispose();

}

public void Dispose()

{

try

{

// Flag that we are disposing this object

IsDisposed = true;

// Get the current number of threads in the pool

Int32 iCurThreadsInPool = GetCurThreadsInPool;

// Shutdown all thread in the pool

for (Int32 iThread = 0; iThread < iCurThreadsInPool; ++iThread)

{

unsafe

{

bool bret = PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) SHUTDOWN_IOCPTHREAD, null);

}

}

// Wait here until all the threads are gone

while (GetCurThreadsInPool != 0) Thread.Sleep(100);

unsafe

{

// Close the IOCP Handle

CloseHandle(GetHandle);

}

}

catch

{

}

}

private void IOCPFunction()

{

UInt32 uiNumberOfBytes;

Int32 iValue;

try

{

while (true)

{

unsafe

{

System.Threading.NativeOverlapped* pOv;

// Wait for an event

GetQueuedCompletionStatus(GetHandle, &uiNumberOfBytes, (UInt32*) &iValue, &pOv, INIFINITE);

}

// Decrement the number of events in queue

DecCurWorkInPool();

// Was this thread told to shutdown

if (iValue == SHUTDOWN_IOCPTHREAD)

break;

// Increment the number of active threads

IncActThreadsInPool();

try

{

// Call the user function

GetUserFunction(iValue);

}

catch(Exception ex)

{

throw ex;

}

// Get a lock

Monitor.Enter(GetCriticalSection);

try

{

// If we have less than max threads currently in the pool

if (GetCurThreadsInPool < GetMaxThreadsInPool)

{

// Should we add a new thread to the pool

if (GetActThreadsInPool == GetCurThreadsInPool)

{

if (IsDisposed == false)

{

// Create a thread and start it

ThreadStart tsThread = new ThreadStart(IOCPFunction);

Thread thThread = new Thread(tsThread);

thThread.Name = "IOCP " + thThread.GetHashCode();

thThread.Start();

// Increment the thread pool count

IncCurThreadsInPool();

}

}

}

}

catch

{

}

// Relase the lock

Monitor.Exit(GetCriticalSection);

// Increment the number of active threads

DecActThreadsInPool();

}

}

catch(Exception ex)

{

string str=ex.Message;

}

// Decrement the thread pool count

DecCurThreadsInPool();

}

//public void PostEvent(Int32 iValue

public void PostEvent(int iValue)

{

try

{

// Only add work if we are not disposing

if (IsDisposed == false)

{

unsafe

{

// Post an event into the IOCP Thread Pool

PostQueuedCompletionStatus(GetHandle, 4, (UInt32*) iValue, null);

}

// Increment the number of item of work

IncCurWorkInPool();

// Get a lock

Monitor.Enter(GetCriticalSection);

try

{

// If we have less than max threads currently in the pool

if (GetCurThreadsInPool < GetMaxThreadsInPool)

{

// Should we add a new thread to the pool

if (GetActThreadsInPool == GetCurThreadsInPool)

{

if (IsDisposed == false)

{

// Create a thread and start it

ThreadStart tsThread = new ThreadStart(IOCPFunction);

Thread thThread = new Thread(tsThread);

thThread.Name = "IOCP " + thThread.GetHashCode();

thThread.Start();

// Increment the thread pool count

IncCurThreadsInPool();

}

}

}

}

catch

{

}

// Release the lock

Monitor.Exit(GetCriticalSection);

}

}

catch (Exception e)

{

throw e;

}

catch

{

throw new Exception("Unhandled Exception");

}

}

public void PostEvent()

{

try

{

// Only add work if we are not disposing

if (IsDisposed == false)

{

unsafe

{

// Post an event into the IOCP Thread Pool

PostQueuedCompletionStatus(GetHandle, 0, null, null);

}

// Increment the number of item of work

IncCurWorkInPool();

// Get a lock

Monitor.Enter(GetCriticalSection);

try

{

// If we have less than max threads currently in the pool

if (GetCurThreadsInPool < GetMaxThreadsInPool)

{

// Should we add a new thread to the pool

if (GetActThreadsInPool == GetCurThreadsInPool)

{

if (IsDisposed == false)

{

// Create a thread and start it

ThreadStart tsThread = new ThreadStart(IOCPFunction);

Thread thThread = new Thread(tsThread);

thThread.Name = "IOCP " + thThread.GetHashCode();

thThread.Start();

// Increment the thread pool count

IncCurThreadsInPool();

}

}

}

}

catch

{

}

// Release the lock

Monitor.Exit(GetCriticalSection);

}

}

catch

{

throw new Exception("Unhandled Exception");

}

}

}

}

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有