前面用C++实现了windows平台上的网络完成端口组件,那么到C#中如何做了?起初我打算通过PInvoke来调用win底层API来仿照C++实现,但问题很快就出来了--C#中的Unsafe指针无法稳定的指向一块缓冲区的首地址,也就是说当垃圾回收进行的时候,我们的unsafe指针的值可能已经无效了。用pin?我也想过,可是锁住所有的TCP接收缓冲区,会极大的降低运行时的效率。难道没有办法了吗?想想完成端口模型的本质思想是将"启动异步操作的线程"和"提供服务的线程"(即工作者线程)拆伙。做到这一点不就够了吗,这是本质的东西。
好,看我是如何做的。
首先定义接口IRequestQueueManager,用于模拟完成端口的队列。
下面给出其定义和默认实现。
using System;
using System.Collections ;
namespace EnterpriseServerBase.Network
{
/// <summary>
/// IRequestQueueManager 用于模拟完成端口的队列。
/// </summary>
public interface IRequestQueueManager :IRequestPusher
{
//void Push(object package) ;
object Pop() ;
void Clear() ;
int Length {get ;}
}
//用于弹出队列中的下一个请求
public interface IRequestPusher
{
void Push(object package) ;
}
/// <summary>
/// IRequestQueueManager 的默认实现
/// </summary>
public class RequestQueueManager :IRequestQueueManager
{
private Queue queue = null ;
public RequestQueueManager()
{
this.queue = new Queue() ;
}
public void Push(object package)
{
lock(this.queue)
{
this.queue.Enqueue(package) ;
}
}
public object Pop()
{
object package = null ;
lock(this.queue)
{
if(this.queue.Count > 0)
{
package = this.queue.Dequeue() ;
}
}
return package ;
}
public void Clear()
{
lock(this.queue)
{
this.queue.Clear() ;
}
}
public int Length
{
get
{
return this.queue.Count ;
}
}
}
}
在IRequestQueueManager的基础上,可以将工作者线程和启动异步操作的线程拆开了。当tcp连接建立的时候,发起一个异步接收的操作,在异步回调方法中将接收到的数据放入IRequestQueueManager端口队列中,而众多的工作者线程就不断的从IRequestQueueManager中取出数据并处理之,处理结束后,在投递一个异步接收请求。如此就可以了。
来看看完成端口类如何实现:
using System;
using System.Threading ;
namespace EnterpriseServerBase.Network
{
/// <summary>
/// IIOCPManager 完成端口管理者,主要管理工作者线程和完成端口队列。
/// 2005.05.23
/// </summary>
public interface IIOCPManager : IRequestPusher
{
void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ;
void Start() ; //启动工作者线程
void Stop() ; //退出工作者线程
int WorkThreadCount{get ;}
event CallBackPackageHandled PackageHandled ;
}
//IOCPPackageHandler 用于处理从完成端口队列中取出的package
public interface IOCPPackageHandler
{
void HandlerPackage(object package) ; //一般以同步实现
}
public delegate void CallBackPackageHandled(object package) ;
/// <summary>
/// IOCPManager 是IIOCPManager的默认实现
/// </summary>
public class IOCPManager :IIOCPManager
{
private IRequestQueueManager requestQueueMgr = null ;
private IOCPPackageHandler packageHandler ;
private int workThreadCount = 0 ; //实际的工作者线程数
private int MaxThreadCount = 0 ;
private bool stateIsStop = true ;
public IOCPManager()
{
}
#region ICPWorkThreadManager 成员
public event CallBackPackageHandled PackageHandled ;
public void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount )
{
this.requestQueueMgr = new RequestQueueManager() ;
this.MaxThreadCount = threadCount ;
this.packageHandler = i_packageHandler ;
}
public void Push(object package)
{
this.requestQueueMgr.Push(package) ;
}
public void Start()
{
if(! this.stateIsStop)
{
return ;
}
this.stateIsStop = false ;
this.CreateWorkThreads() ;
}
public void Stop()
{
if(this.stateIsStop)
{
return ;
}
this.stateIsStop = true ;
//等待所有工作者线程结束
int count = 0 ;
while(this.workThreadCount != 0)
{
if(count < 10)
{
Thread.Sleep(200) ;
}
else
{
throw new Exception("WorkThread Not Terminated !") ;
}
++ count ;
}
this.requestQueueMgr.Clear() ;
}
public int WorkThreadCount
{
get
{
return this.workThreadCount ;
}
}
#endregion
#region CreateWorkThreads
private void CreateWorkThreads()
{
for(int i= 0 ;i< this.MaxThreadCount ;i++)
{
Thread t = new Thread(new ThreadStart(this.ServeOverLap)) ;
Interlocked.Increment(ref this.workThreadCount) ;
t.Start() ;
}
}
#endregion
#region ServeOverLap 工作者线程
private void ServeOverLap()
{
while(! this.stateIsStop)
{
object package = this.requestQueueMgr.Pop() ;
if(package == null)
{
Thread.Sleep(200) ;
continue ;
}
this.packageHandler.HandlerPackage(package) ;
if(! this.stateIsStop)
{
if(this.PackageHandled != null)
{
this.PackageHandled(package) ;
}
}
}
//工作者线程安全退出
Interlocked.Decrement(ref this.workThreadCount) ;
}
#endregion
}
}
/***********************
注意,在使用时需要预定PackageHandled事件,而事件处理函数则是再次投递异步接收包。如此才可保证持续从tcp连接接收数据。