分享
 
 
 

.NET下可复用的TCP通信层实现之TCP组件

王朝c#·作者佚名  2008-06-01
窄屏简体版  字體: |||超大  

2006年已经来临,回首刚走过的2005,心中感慨万千。在人生和生活的目标上,有了清楚明确的定位,终于知道了自己喜欢什么样的生活,喜欢什么样的生活方式;在技术上,成熟了不少,眼界也开阔的不少,从面向对象到组件、从.Net到J2EE、从微软到开源,颇有收获。

非凡值得一提的是,熟悉了Rod Johnson这个大牛人,也终于在自己的项目中正式使用SPRing.net框架来开发了,这确实是一个优秀的框架。而在已经到来的2006年,我有一个主要目标就是B/S应用开发,来填补自己在企业级开发上的另一半空白。

以前就很想将自己在Tcp通信层的开发心得、经验共享出来,但一直没有实现,究其原因,还是自己太懒了。今天终于找到一个时机,写下这篇文章,也算是对2005年的另一种形式的回忆吧。

绝大多数C/S(包括多层)结构的系统中,终端与服务器的通信都是通过Tcp进行的(使用Udp的也有一些,但是其相对于Tcp简单许多,所以不在这里的讨论之列)。通常,这样的C/S系统都需要处理极大的并发,也就是说随时都可能有成千上万个用户在线,并且每分钟都可能有数以百计的用户上线/下线。由于每个用户都与服务器存在着一个Tcp连接,如何治理所有这些连接,并使我们的Tcp通信层稳定高效地工作,是我开发的这个“TcpTcp通信层”设计实现的主要目标。

自从2004年9月开始至今,我就一直负责某C/S系统的服务器端的架构设计,并负责整个通信层的实现,在探索的过程中,逐渐形成了一套可复用的“Tcp通信层框架”(“框架”这个词真的蛮吓人,呵呵),其位于EnterpriseServerBase类库的EnterpriseServerBase.Network命名空间中。现将我在通信层这一块的设计/开发经验记录于此,以便日后回顾。也期大家多多赐教。

我期望的“Tcp通信层”并不只是能接受连接、治理连接、转发用户请求这么简单,为了构建一个高度可复用的、灵活的、可接插的Tcp通信层,需要定义很多的规则、接口、契约,这需要做很多的工作。“Tcp通信层”决不仅仅只是Tcp协议通信,由于通信与消息联系紧密,不可避免的需要将“通信的消息”纳入到我们的分析中来,比如,基于Tcp传输的特性,我们可能需要对接收到的消息进行分裂、重组等(后文中会解释为什么、以及如何做)。请答应我在这里澄清一下,假如只是解决“仅仅”的Tcp通信问题,我只需要介绍Tcp组件就可以了,但是假如要解决“整个Tcp通信层”的问题,并使之可高度复用,那就需要介绍很多额外的东西,比如,上面提到的“消息”,以及“消息”所涉及的通信协议。

在我们应用的通信层中,存在以Tcp组件为核心的多个组件,这些组件相互协作,以构建/实现高度可复用的Tcp通信层。这些组件之间的关系简单图示如下:

我先解释一下上图。当网络(Tcp)组件从某个Tcp连接上接收到一个请求时,会将请求转发给消息分派器,消息分派器通过IDataStreamHelper组件获取请求消息的类型,然后根据此类型要求处理器工厂创建对应类型的请求处理器,请求处理器处理请求并返回结果。接下来再由网络组件把结果返回给终端用户。在消息分派器进行请求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重组、消息验证等。而且,根据不同的应用,可能有其它的消息转换要求,而且这些操作可能是多样化的,为了满足这种多样性和可接插性,这就需要消息分派器提供一个插入点,让我们可以随心所欲地插入自定义的对请求/回复消息的预处理和后处理。

上图中消息分派器中可接插的操作除了消息分裂器(使用实线框)是必须的,消息加密器和消息验证器(使用虚线框)是可选的,应根据你应用的实际情况加以决定是否使用。关于这几个典型的可接插的组件的功能作用会在后文中介绍。在继续介绍Tcp组件的实现之前,有必要先提一下IDataStreamHelper接口的作用,IDataStreamHelper接口用于抽象我们实际的通信协议,并能从任何一请求/回复消息中提取关于本条消息的元数据,比如,消息的长度、类型等信息。具体的应用必须根据自己的消息协议来实现IDataStreamHelper接口。关于该接口的定义也在后文中给出。

关于上图,需要提醒的是,整个消息的流动是由Tcp组件驱动的!这篇文章以Tcp组件和消息分派器组件为索引来组织整个可复用的Tcp通信层的实现。首先,我们来深入到Tcp组件的具体实现中去。

一.Tcp组件

1.Tcp组件的主要职责

Tcp组件的主要职责并不是在一个很短的时间内总结出来的,它是逐步完善的(至今可能还不够全面)。为了使Tcp组件具有高度的可复用性,需要考虑很多的需求,而所有这些需求中具有共性的、占主导位置的需求就被纳入到Tcp组件的职责中来了。这个职责的集合如下:

(1) 治理所有的Tcp连接以及连接对应的上下文(Context)。

(2) 当某用户上线或下线时,能发出事件通知。

(3) 当在线用户(连接)的数量发生变化时,能发出事件通知。

(4) 当用户的请求得到回复时,发出事件通知。这一点对于记录用户请求和跟踪用户请求非常有用)

(5) 能及时主动关闭指定连接。比如,当某一非法用户登录后,用户验证组件通知Tcp组件强行关闭该用户对应的连接。

(6) 除了能转发用户请求及对请求的应答(通过消息分派器)外,还能直接对指定的用户发送数据。这也要求我们的Tcp连接是多线程安全的。

(7) 提供绕开Tcp组件直接从Tcp连接同步接收数据的功能。比如,客户端需要上传一个Blob,我们可能希望直接从Tcp连接进行接收数据,这是有好处的,后面可以看到。

这里列出的是Tcp组件的主要职责,还有很多细节性的没有罗列出来,假如一个Tcp组件解决了上述所有问题,对我来说,应该就是一个很好用、很适用的Tcp组件了。

2.Tcp组件接口定义

相信很多朋友和我一样,刚接触Tcp服务端开发的时候,通常是当一个Tcp连接建立的时候,就分配一个线程在该连接上监听请求消息,这种方式的缺点有很多,最主要的缺点是效率低、治理复杂。

我的最初的Tcp组件是C++版本的,那时很有幸接触到了windows平台上最高效的Tcp通信模型――完成端口模型,完全理解这个模型需要点时间,但是《Win32 多线程程序设计》(侯捷翻译)和《windows网络编程》这两本书可以给你不少帮助。异步机制是完成端口的基础,完成端口模型的本质思想是将"启动异步操作的线程"和"提供服务的线程"(即工作者线程)拆伙。理解这一点很重要。在.Net中没有对应的组件或类对应于完成端口模型,解决方案有两个:一是通过P/Invoke来实现自己的完成端口组件,另一种方式是通过.Net的现有通信设施来模拟完成端口实现。

本文给出第二种方案的实现说明,另外,我也给出通过“异步+线程池”的方式的Tcp组件实现,这种方式对于大并发量也可以很好的治理。也就是我,我的EnterpriseServerBase类库中,有两种不同方式的Tcp组件实现,一个是模拟完成端口模型,一个是“异步+线程池”方式。无论是哪种方式,它们都实现了相同的接口ITcp。ITcp这个接口涵盖了上述的Tcp组件的所有职责,这个接口并不复杂,假如理解了,使用起来也非常简单。我们来看看这个接口的定义:

public interface ITcp :INet ,ITcpEventList ,ITcpClientsController

{

int ConnectionCount{get ;} //当前连接的数量

}

这个接口继续了另外三个接口,INet ,ITcpEventList ,ITcpClientsController。INet接口是为了统一基于Tcp和Udp的通信组件而抽象出来的,它包含了以下内容:

public interface INet

{

void InitializeAll(IReqestStreamDispatcher i_dispatcher ,int port , bool userValidated) ;

void InitializeAll() ;

void UnitializeAll() ;

NetAddinType GetProtocalType() ; //Udp, Tcp

event CallBackDynamicMessage DynamicMsgArrived ;

//通常是通信插件中一些与服务和用户无关的动态信息,如监听线程重启等

void Start() ;

void Stop() ;

IReqestStreamDispatcher Dispatcher{set;} //支持依靠注入

int Port{get ;set ;}

bool UserValidated{set ;}

}

public enum NetAddinType

{

Tcp ,Udp

}

public delegate void CallBackDynamicMessage(string msg) ;

IReqestStreamDispatcher就是我们上述图中的消息分派器,这是Tcp通信层中的中心,它的重要性已从前面的关系图中可见一斑了。IReqestStreamDispatcher需要在初始化的时候提供,或者通过Dispatcher属性通过IOC容器进行设值法注入。UserValidated属性用于决定当用户的第一个请求不是登录请求时,是否立即关闭Tcp连接。其它的属性已经加上了注释,非常轻易理解。

ITcpEventList接口说明了Tcp组件应当发布的事件,主要对应于前述Tcp组件职责的(2)(3)(4)点。其定义如下:

public interface ITcpEventList

{

event CallBackForTcpUser2 SomeOneConnected ; //上线

event CallBackForTcpUser1 SomeOneDisConnected ; //掉线

event CallBackForTcpCount ConnectionCountChanged ;//在线人数变化

event CallBackForTcpMonitor ServiceCommitted ;//用户请求的服务的回复信息

event CallBackForTcpUser UserAction ;

}

每一个在线用户都对应着一个Tcp连接,我们使用tcp连接的Hashcode作为ConnectID来标志每一个连接。UserAction将用户与服务器的交互分为三类:登录、退出和标准功能访问,如以下枚举所示。

public enum TcpUserAction

{

Logon , Exit , Functionaccess , //标准的功能访问

}

最后一个接口ITcpClientsController,主要用来完成上述Tcp组件职责的(5)(6)(7)三点。定义如下:

/// <summary>

/// ITcpController 用于服务器主动控制TCP客户的连接

/// </summary>

public interface ITcpClientsController

{

//同步接收消息

bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount) ;

//主动给某个客户同步发信息

void SendData(int ConnectID ,byte[] data) ;

void SendData(int ConnectID, byte[] data ,int offset ,int size) ;

//主动关闭连接

void DispoSEOneConnection(int connectID ,DisconnectedCause cause) ;

}

这个接口中的方法的含义是一目了然的。

上述的几个接口已经完整的覆盖了前述的Tcp组件的所有职责,在了解了这些接口定义的基础上,大家已经能够使用EnterpriseServerBase类库中的Tcp组件了。假如想复用的不仅仅是Tcp组件,而是整个Tcp通信层,你就需要关注后面的内容。不管怎样,为了文章的完整性,我在这里先给出前面提到的Tcp组件的两种实现。

3.Tcp组件基本元素实现

在实现Tcp组件之前,有一些基本元素需要先建立起来,比如安全的网络流、Tcp监听器、用户连接上下文、上下文治理者等。(1)安全的网络流SafeNetworkStream

前面已经提到过,为了能在Tcp组件外部 对指定的连接发送数据,必须保证我们的Tcp连接是线程安全的,而System.Net.Sockets.NetworkStream是非线程安全的,我们必须自己对其进行封装,以保证这一点。System.Net.Sockets.NetworkStream的线程安全的封装就是EnterpriseServerBase.Network.SafeNetworkStream类,它继续了ISafeNetworkStream接口:

/// <summary>

/// ISafeNetworkStream 线程安全的网络流 。

/// 注重:假如调用的异步的begin方法,就一定要调用对应的End方法,否则锁将得不到释放。

/// 作者:朱伟 sky.zhuwei@163.com

/// </summary>

public interface ISafeNetworkStream :ITcpSender ,ITcpReciever

{

void Flush();

void Close() ;

}

//用于在TCP连接上发送数据,支持同步和异步

public interface ITcpSender

{

void Write(byte[] buffer ,int offset ,int size) ;

IAsyncResult BeginWrite(byte[] buffer, int offset, int size, AsyncCallback callback, object state );

void EndWrite(IAsyncResult asyncResult );

}

//用于在TCP连接上接收数据,支持同步和异步

public interface ITcpReciever

{

int Read (byte[] buffer ,int offset ,int size) ;

IAsyncResult BeginRead( byte[] buffer, int offset, int size, AsyncCallback callback, object state );

int EndRead(IAsyncResult asyncResult );

}

该接口几乎与System.Net.Sockets.NetworkStream提供的方法一样,只不过它们是线程安全的。这样,针对同一个SafeNetworkStream,我们就可以在不同的线程中同时在其上进行数据接收/发送(主要是发送)了。

(2)Tcp监听器EnterpriseServerBase.Network.XTcpListener

不可否认,System.Net.Sockets.TcpListener只是提供了一些最低阶的工作,为了将监听线程、端口、监听事件整合起来,我引入了EnterpriseServerBase.Network.XTcpListener类,它可以启动和停止,并且当有Tcp连接建立的时候,会触发事件。XTcpListener实现了IXTcpListener接口,其定义如下:

public interface IXTcpListener

{

void Start() ; //开始或启动监听线程

void Stop() ; //暂停,但不退出监听线程

void ExitListenThread() ;//退出监听线程

event CBackUserLogon TcpConnectionEstablished ; //新的Tcp连接成功建立

event CallBackDynamicMsg DynamicMsgArrived ;

}

XTcpListener可以在不同的Tcp组件中复用,这是一种更细粒度的复用。

(3)用户连接上下文ContextKey

ContextKey用于将所有的与一个用户Tcp连接相关的信息(比如接收缓冲区、连接的状态――空闲还是忙碌、等)封装起来,并且还能保存该用户的请求中上次未处理完的数据,将其放于接收缓冲区的头部,并与后面接收到的数据进行重组。说到这里,你可能不太明白,我需要解释一下。Tcp协议可以保证我们发出的消息完整的、有序的、正确的到达目的地,但是它不能保证,我们一次发送的数据对方也能一次接收完全。比如,我们发送了一个100Bytes的数据,对方可能要接收两次才能完全,先收到60Bytes,再收到40Bytes,这表明我们可能会收到“半条”消息。还有一种情况,你连续发了两条100Bytes的消息,而对方可能一次就接收了160Bytes,所以需要对消息进行分裂,从中分裂出完整的消息然后进行处理。这,就是前面所说的需要对消息进行分裂、重组的原因。知道这点后,IContextKey接口应该比较轻易理解了,因为该接口的很多元素的存在都是为了辅助解决这个问题。IContextKey的定义如下:

public interface IContextKey

{

NetStreamState StreamState{get ;set ;} //网络流的当前状态--空闲、忙碌

ISafeNetworkStream NetStream{get ;set ;}

byte[] Buffer{get ;set ;} //接收缓冲区

int BytesRead{get ;set ;} //本次接收的字节数

int PreLeftDataLen{get ;set ;}

bool IsFirstMsg{get ;set ;} //是否为建立连接后的第一条消息

int StartOffsetForRecieve{get ;}

int MaxRecieveCapacity{get ;} //本次可以接收的最大字节数

RequestData RequestData{get ;}

void ResetBuffer(byte[] leftData) ;

//leftData 表示上次没有处理完的数据,需要与后面来的数据进行重组,然后再次处理

}

对于消息的分裂和重组是由消息分裂器完成的,由于Tcp组件的实现不需要使用消息分裂器,所以消息分裂器的说明将在后面的消息分派器实现中讲解。

(4)上下文治理者ContextKeyManager

ContextKeyManager用于治理所有的ContextKey,其实现的接口IContextKeyManager很轻易理解:

public interface IContextKeyManager

{

void InsertContextKey(ContextKey context_key) ;

void DisposeAllContextKey() ;

bool IsAllStreamSafeToStop() ; //是否可以安全退出

void RemoveContextKey(int streamHashCode) ;

int ConnectionCount {get ;}

ISafeNetworkStream GetNetStream(int streamHashCode) ;

event CallBackCountChanged StreamCountChanged ;

}

在上述四个基本元素的支持下,再来实现Tcp组件就方便了许多,无论是以何种方式(如完成端口模型、异步方式)实现Tcp组件,这些基本元素都是可以通用的,所以假如你要实现自己的Tcp组件,也可以考虑复用上述的一些基本元素。复用可以在不同的粒度进行,复用真是无处不在,呵呵。

4.完成端口Tcp组件实现

前面已经提到,完成端口模型本质思想是将"启动异步操作的线程"和"提供服务的线程"(即工作者线程)拆伙。只要做到这一点,就模拟了完成端口。

分析一下我们需要几种类型的线程,首先我们需要一个线程来接收TCP连接请求,这就是所谓监听线程,当成功的接收到一个连接后,就向连接发送一个异步接收数据的请求,由于是异步操作,所以会立即返回,然后再去接收新的连接请求,如此监听线程就循环运作起来了(已经封装成前述的XTcpListener组件了)。值得提出的是,在异步接收的回调函数中,应该对接收到的数据进行处理,完成端口模型所做的就是将接收到的数据放在了完成端口队列中,注重,是一个队列。第二种线程类型,就是工作者线程。工作者线程的个数有个经验值是( Cpu个数×2 + 2),当然具体取多少,还要取决于你的应用的要求。工作者线程的任务就是不断地从完成端口队列中取出数据,并处理它,然后假如有回复,再将回复写入对应的连接。

让我们来定义接口IRequestQueueManager,用于模拟完成端口的队列,该队列是线程安全的,用于将所有的请求进行排队,然后由工作者线程来轮流处理这些请求。

public interface IRequestQueueManager :IRequestPusher

{

object Pop() ;//弹出队列中的下一个请求

void Clear() ;

int Length {get ;} //队列长度

}

public interface IRequestPusher

{

void Push(object package) ; //向队列中压入一个请求

}

在IRequestQueueManager的基础上,可以将工作者线程和启动异步操作的线程拆开了。由于工作者线程只与端口队列相关,所以我决定将它们一起封装起来--成为IIOCPManager,用于治理请求队列和工作者线程。

/// <summary>

/// IIOCPManager 完成端口治理者,主要治理工作者线程和完成端口队列。

/// </summary>

public interface IIOCPManager : IRequestPusher

{

void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ;

void Start() ; //启动工作者线程

void Stop() ; //退出工作者线程

int WorkThreadCount{get ;}

event CallBackPackageHandled PackageHandled ;

}

//IOCPPackageHandler 用于处理从完成端口队列中取出的package

public interface IOCPPackageHandler

{

void HandlerPackage(object package) ; //一般以同步实现

}

有了IRequestQueueManager和IIOCPManager的支持,实现基于完成端口模型的Tcp组件就非常简单了。当然,你也可以单独使用IIOCPManager。你只要提供一个监听者线程接收连接,并将从连接接收到的数据通过IRequestPusher接口放入端口队列就可以了。 当然,为了处理接收到的数据,我们需要提供一个实现了IOCPPackageHandler接口的对象给IOCPManager。值得提出的是,你可以在数据处理并发送了回复数据后,再次投递一个异步接收请求,以保证能源源不断的从对应的TCP连接接收数据。下面,我们来看基于完成端口模型的Tcp组件的完整实现。

完成端口Tcp组件

1/**//// <summary>

2 /// IocpTcp 完成端口Tcp组件。

3 /// </summary>

4 public class IocpTcp :ITcp ,IOCPPackageHandler

5 {

6 members#region members

7 private const int BufferSize = 1024 ;

8 private const int MaxWorkThreadNum = 50 ;

9

10 private IXTcpListener xtcpListener ;

11 private IIOCPManager iocpMgr = null ;

12 private ITcpReqStreamDispatcher messageDispatcher = null ;

13 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ;

14 private bool stateIsStop = true ;

15 private bool validateRequest = false ;

16 private int curPort = 8888 ;

17 #endregion

18

19 public IocpTcp()

20 {

21

22 }

23 ITcp 成员#region ITcp 成员

24 public int ConnectionCount

25 {

26 get

27 {

28 return this.contextKeyMgr.ConnectionCount ;

29 }

30 }

31

32 #endregion

33

34 INet 成员#region INet 成员

35

36 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll

37 public void InitializeAll(IReqestStreamDispatcher i_dispatcher ,int port , bool userValidated)

38 {

39 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher;

40 if(this.messageDispatcher == null)

41 {

42 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ;

43 }

44

45 this.validateRequest = userValidated ;

46 this.curPort = port ;

47

48 this.InitializeAll() ;

49 }

50

51 public void InitializeAll()

52 {

53 this.xtcpListener = new XTcpListener(this.curPort) ;

54 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished);

55 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ;

56 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged (this.OnStreamCountChanged) ;

57

58 this.iocpMgr = new IOCPManager() ;

59 this.iocpMgr.Initialize(this , IocpTcp.MaxWorkThreadNum) ;

60 }

61

62 public void UnitializeAll()

63 {

64 this.Stop() ;

65 this.xtcpListener.ExitListenThread() ;

66

67 //将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次

68 this.ConnectionCountChanged = null ;

69 this.DynamicMsgArrived = null ;

70 this.ServiceCommitted = null ;

71 this.SomeOneConnected = null ;

72 this.SomeOneDisConnected = null ;

73 this.UserAction = null ;

74 }

75 #endregion

76

77 Start ,Stop#region Start ,Stop

78 public void Start()

79 {

80 try

81 {

82 if(this.stateIsStop)

83 {

84 this.stateIsStop = false ;

85 this.xtcpListener.Start() ;

86 this.iocpMgr.Start() ;

87 }

88 }

89 catch(Exception ee)

90 {

91 throw ee ;

92 }

93 }

94

95 public void Stop()

96 {

97 if(this.stateIsStop)

98 {

99 return ;

100 }

101

102 this.stateIsStop = true ;

103 this.xtcpListener.Stop() ;

104 this.iocpMgr.Stop() ;

105

106 //关闭所有连接

107 int count = 0 ;

108 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到达停止安全点

109 {

110 Thread.Sleep(200) ;

111 if(10 == count++)

112 {

113 break ;

114 }

115 }

116 this.contextKeyMgr.DisposeAllContextKey() ;

117 }

118 #endregion

119

120 public event EnterpriseServerBase.Network.CallBackDynamicMessage DynamicMsgArrived;

121

122 public NetAddinType GetProtocalType()

123 {

124 return NetAddinType.Tcp ;

125 }

126

127 #endregion

128

129 ITcpEventList 成员#region ITcpEventList 成员

130 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected;

131

132 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted;

133

134 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged;

135

136 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected;

137

138 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction;

139

140 #endregion

141

142 ITcpClientsController 成员#region ITcpClientsController 成员

143

144 public void SendData(int ConnectID, byte[] data)

145 {

146 this.SendData(ConnectID ,data ,0 ,data.Length) ;

147 }

148

149 public void SendData(int ConnectID, byte[] data ,int offset ,int size)

150 {

151 if((data == null) (data.Length == 0) (offset <0) (size <0) (offset+size > data.Length))

152 {

153 return ;

154 }

155

156 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;

157 if(netStream == null)

158 {

159 return ;

160 }

161

162 netStream.Write(data ,offset ,size) ;

163 }

164

165 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount)

166 {

167 readCount = 0 ;

168 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;

169 if(netStream == null)

170 {

171 return false ;

172 }

173

174 readCount = netStream.Read(buffer ,offset ,size) ;

175

176 return true ;

177 }

178

179 public void DisposeOneConnection(int connectID, EnterpriseServerBase.Network.DisconnectedCause cause)

180 {

181 this.DisposeOneConnection(connectID) ;

182

183 if(this.SomeOneDisConnected != null)

184 {

185 this.SomeOneDisConnected(connectID ,cause) ;

186 }

187

188 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ;

189 }

190

191 /**//// <summary>

192 /// DisposeOneConnection 主要由用户治理模块调用--当无法检测到掉线情况时,该方法保证资源被释放

193 /// </summary>

194 private void DisposeOneConnection(int connectID)

195 {

196 this.contextKeyMgr.RemoveContextKey(connectID) ;

197 }

198

199 #endregion

200

201 private#region private

202 BindRequestToQueue#region BindRequestToQueue

203 private void BindRequestToQueue(IAsyncResult ar)

204 {

205 try

206 {

207 ContextKey key = (ContextKey)ar.AsyncState ;

208 key.BytesRead = key.NetStream.EndRead(ar) ;

209 if(! this.CheckData(key))

210 {

211 return ;

212 }

213

214 this.iocpMgr.Push(key) ;

215 }

216 catch(Exception ee)

217 {

218 ee = ee ;

219 }

220 }

221

222 CheckData#region CheckData

223 private bool CheckData(ContextKey key)

224 {

225 int streamHashcode = key.NetStream.GetHashCode() ;

226 if(this.stateIsStop)

227 {

228 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ;

229 return false;

230 }

231

232 if(key.BytesRead == 0) //表示客户端掉线或非正常关闭连接

233 {

234 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;

235 return false ;

236 }

237

238 if(key.BytesRead == 8)//表示客户端正常关闭连接

239 {

240 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ;

241 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;

242 return false;

243 }

244

245 return true ;

246 }

247 #endregion

248 #endregion

249

250 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished

251 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream)

252 {

253 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ;

254 ContextKey key = new ContextKey(safeStream ,IocpTcp.BufferSize) ;

255 key.ResetBuffer(null) ;

256 this.contextKeyMgr.InsertContextKey(key) ;

257 int connectID = key.NetStream.GetHashCode() ;

258 if(this.SomeOneConnected != null)

259 {

260 this.SomeOneConnected(connectID) ;

261 }

262

263 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ;

264

265 key.IsFirstMsg = true ;

266 this.RecieveDataFrom(key) ;

267 }

268 #endregion

269

270 ActivateUserActionEvent#region ActivateUserActionEvent

271 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action)

272 {

273 if(this.UserAction != null)

274 {

275 this.UserAction(ConnectID ,action) ;

276 }

277 }

278 #endregion

279

280 PutoutDynamicMsg#region PutoutDynamicMsg

281 private void PutoutDynamicMsg(string msg)

282 {

283 if(this.DynamicMsgArrived != null)

284 {

285 this.DynamicMsgArrived(msg) ;

286 }

287 }

288 #endregion

289

290 OnStreamCountChanged#region OnStreamCountChanged

291 private void OnStreamCountChanged(int count)

292 {

293 if(this.ConnectionCountChanged != null)

294 {

295 this.ConnectionCountChanged(count) ;

296 }

297 }

298 #endregion

299

300 RecieveDataFrom#region RecieveDataFrom

301 private void RecieveDataFrom(ContextKey key)

302 {

303 try

304 {

305 key.StreamState = NetStreamState.Reading ;

306 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.BindRequestToQueue) ,key) ;

307 }

308 catch(Exception ee)

309 {

310 ee = ee ;

311 }

312

313 }

314 #endregion

315 #endregion

316

317 IOCPPackageHandler 成员#region IOCPPackageHandler 成员

318

319 public void HandlerPackage(object package)

320 {

321 ContextKey key = package as ContextKey ;

322 if(key == null)

323 {

324 return ;

325 }

326

327 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode

328

329 //处理请求

330 try

331 {

332 byte[] leftData = null ;

333 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;

334

335 if(this.validateRequest)

336 {

337 if(key.Validation.gotoCloseConnection)

338 {

339 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;

340 return ;

341 }

342 }

343

344 key.StreamState = NetStreamState.Writing ;

345 if(repondList!= null && (repondList.Count != 0))

346 {

347 foreach(object obj in repondList)

348 {

349 byte[] respond_stream = (byte[])obj ;

350 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ;

351 if(this.ServiceCommitted != null)

352 {

353 RespondInformation info = new RespondInformation() ;

354 info.ConnectID = streamHashCode ;

355 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ;

356 info.repondData = respond_stream ;

357 this.ServiceCommitted(info) ;

358 }

359 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ;

360 }

361 }

362

363 if(key.IsFirstMsg)

364 {

365 if(repondList == null (repondList.Count == 0)) //表示第一条消息还未接收完全

366 {

367 key.IsFirstMsg = true ;

368 }

369 else

370 {

371 key.IsFirstMsg = false ;

372 }

373 }

374

375 key.StreamState = NetStreamState.Idle ;

376

377 key.ResetBuffer(leftData) ;

378

379 if(! this.stateIsStop)

380 {

381 //继续接收请求

382 this.RecieveDataFrom(key) ;

383 }

384 else //停止服务

385 {

386 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;

387 }

388 }

389 catch(Exception ee)

390 {

391 if(ee is System.IO.IOException) //正在读写流的时候,连接断开

392 {

393 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;

394 }

395

396 ee = ee ;

397 }

398 }

399

400 #endregion

401

402 INet 成员#region INet 成员

403

404 public IReqestStreamDispatcher Dispatcher

405 {

406 set

407 {

408 this.messageDispatcher = (ITcpReqStreamDispatcher)value ;

409 }

410 }

411

412 public int Port

413 {

414 set

415 {

416 this.curPort = value ;

417 }

418 get

419 {

420 return this.curPort ;

421 }

422 }

423

424 public bool UserValidated

425 {

426 set

427 {

428 this.validateRequest = value ;

429 }

430 }

431

432 #endregion

433 }

5.异步Tcp组件实现

这种方式的主要思想是:当一个新的Tcp连接建立时,就在该连接上发送一个异步接收的请求(BeginRead),并在异步回调中处理该请求,当请求处理完毕,再次发送异步接收请求,如此循环下去。异步接收启用的是系统默认线程池中的线程,所以,在异步Tcp组件中不用显式治理工作线程。异步Tcp组件的实现相对于完成端口模型而言简单许多,也单纯一些,不用治理请求队列,不需使用工作者线程等等。但是,相比于完成端口模型,其也有明显的缺陷:一个Tcp连接绑定到了一个线程,即使这个线程是后台线程池中的。假如用户数量巨大,这对性能是极其不利的;而完成端口模型,则可以限定工作者线程的个数,并且可以根据应用的类型进行灵活调节。

异步Tcp组件实现源码。

异步Tcp组件

1/**//// <summary>

2 /// AsynTcp 异步Tcp组件。

3 /// </summary>

4 public class AsynTcp :ITcp

5 {

6 members#region members

7 private const int BufferSize = 1024 ;

8

9 private IXTcpListener xtcpListener = null ;

10 private ITcpReqStreamDispatcher messageDispatcher = null ;

11 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ;

12 private bool stateIsStop = true ;

13 private bool validateRequest = false ;

14 private int curPort = 8888 ;

15 #endregion

16

17

18 public AsynTcp()

19 {

20

21 }

22

23 INet 成员#region INet 成员

24

25 public event CallBackDynamicMessage DynamicMsgArrived;

26

27 public NetAddinType GetProtocalType()

28 {

29

30 return NetAddinType.Tcp;

31 }

32

33 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll

34 public void InitializeAll(IReqestStreamDispatcher i_dispatcher, int port, bool userValidated)

35 {

36 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher;

37 if(this.messageDispatcher == null)

38 {

39 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ;

40 }

41

42 this.curPort = port ;

43 this.validateRequest = userValidated ;

44

45 this.InitializeAll() ;

46 }

47

48 public void InitializeAll()

49 {

50 this.xtcpListener = new XTcpListener(this.curPort) ;

51 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished);

52 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ;

53 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged(this.OnStreamCountChanged) ;

54 }

55

56 public void UnitializeAll()

57 {

58 this.Stop() ;

59 this.xtcpListener.ExitListenThread() ;

60

61 //将事件容器清空==》防止外部框架再多次初始化的过程中将一个事件预定多次

62 this.ConnectionCountChanged = null ;

63 this.DynamicMsgArrived = null ;

64 this.ServiceCommitted = null ;

65 this.SomeOneConnected = null ;

66 this.SomeOneDisConnected = null ;

67 this.UserAction = null ;

68 }

69

70 #endregion

71

72 Start ,Stop#region Start ,Stop

73 public void Start()

74 {

75 if(this.stateIsStop)

76 {

77 this.xtcpListener.Start() ;

78 this.stateIsStop = false ;

79 }

80 }

81

82 public void Stop()

83 {

84 if(this.stateIsStop)

85 {

86 return ;

87 }

88

89 this.stateIsStop = true ;

90 this.xtcpListener.Stop() ;

91

92 //关闭所有连接

93 int count = 0 ;

94 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到达停止安全点

95 {

96 Thread.Sleep(200) ;

97 if(10 == count++)

98 {

99 break ;

100 }

101 }

102 this.contextKeyMgr.DisposeAllContextKey() ;

103 }

104 #endregion

105

106 #endregion

107

108 ITcpEventList 成员#region ITcpEventList 成员

109

110 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected;

111

112 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted;

113

114 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged;

115

116 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected;

117

118 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction;

119

120 #endregion

121

122 ITcpClientsController 成员#region ITcpClientsController 成员

123

124 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount)

125 {

126 readCount = 0 ;

127 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;

128 if(netStream == null)

129 {

130 return false ;

131 }

132

133 readCount = netStream.Read(buffer ,offset ,size) ;

134

135 return true ;

136 }

137

138 public void SendData(int ConnectID, byte[] data)

139 {

140 this.SendData(ConnectID ,data ,0 ,data.Length) ;

141 }

142

143 public void SendData(int ConnectID, byte[] data ,int offset ,int size)

144 {

145 if((data == null) (data.Length == 0) (offset <0) (size <0) (offset+size > data.Length))

146 {

147 return ;

148 }

149

150 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;

151 if(netStream == null)

152 {

153 return ;

154 }

155

156 netStream.Write(data ,offset ,size) ;

157 }

158

159 public void DisposeOneConnection(int connectID, DisconnectedCause cause)

160 {

161 this.DisposeOneConnection(connectID) ;

162

163 if(this.SomeOneDisConnected != null)

164 {

165 this.SomeOneDisConnected(connectID , cause) ;

166 }

167

168 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ;

169 }

170

171 #endregion

172

173 ITcp 成员#region ITcp 成员

174 public int ConnectionCount

175 {

176 get

177 {

178 return this.contextKeyMgr.ConnectionCount ;

179 }

180 }

181

182 #endregion

183

184 private#region private

185

186 ActivateUserActionEvent#region ActivateUserActionEvent

187 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action)

188 {

189 if(this.UserAction != null)

190 {

191 this.UserAction(ConnectID ,action) ;

192 }

193 }

194 #endregion

195

196 DisposeOneConnection#region DisposeOneConnection

197 /**//// <summary>

198 /// DisposeOneConnection 主要由用户治理模块调用--当无法检测到掉线情况时,该方法保证资源被释放

199 /// </summary>

200 private void DisposeOneConnection(int connectID)

201 {

202 this.contextKeyMgr.RemoveContextKey(connectID) ;

203 }

204 #endregion

205

206 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished

207 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream)

208 {

209 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ;

210

211 ContextKey key = new ContextKey(safeStream ,AsynTcp.BufferSize) ;

212 key.ResetBuffer(null) ;

213 this.contextKeyMgr.InsertContextKey(key) ;

214 int connectID = key.NetStream.GetHashCode() ;

215

216 if(this.SomeOneConnected != null)

217 {

218 this.SomeOneConnected(connectID) ;

219 }

220 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ;

221

222 key.IsFirstMsg = true ;

223 this.RecieveDataFrom(key) ;

224 }

225 #endregion

226

227 PutoutDynamicMsg#region PutoutDynamicMsg

228 private void PutoutDynamicMsg(string msg)

229 {

230 if(this.DynamicMsgArrived != null)

231 {

232 this.DynamicMsgArrived(msg) ;

233 }

234 }

235 #endregion

236

237 OnStreamCountChanged#region OnStreamCountChanged

238 private void OnStreamCountChanged(int count)

239 {

240 if(this.ConnectionCountChanged != null)

241 {

242 this.ConnectionCountChanged(count) ;

243 }

244 }

245 #endregion

246

247 RecieveDataFrom#region RecieveDataFrom

248 private void RecieveDataFrom(ContextKey key)

249 {

250 key.StreamState = NetStreamState.Reading ;

251 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.ServeOverLap) ,key) ;

252

253 }

254 #endregion

255

256 ServeOverLap#region ServeOverLap

257 private void ServeOverLap(IAsyncResult ar)

258 {

259 ContextKey key = (ContextKey)ar.AsyncState ;

260 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode

261

262 try

263 {

264 key.BytesRead = key.NetStream.EndRead(ar) ;

265

266 if(! this.CheckData(key))

267 {

268 return ;

269 }

270

271 //处理请求

272 byte[] leftData = null ;

273 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;

274

275 if(this.validateRequest)

276 {

277 if(key.Validation.gotoCloseConnection)

278 {

279 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;

280 }

281 }

282

283 key.StreamState = NetStreamState.Writing ;

284 if(repondList!= null && (repondList.Count != 0))

285 {

286 foreach(object obj in repondList)

287 {

288 byte[] respond_stream = (byte[])obj ;

289 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ;

290 if(this.ServiceCommitted != null)

291 {

292 RespondInformation info = new RespondInformation() ;

293 info.ConnectID = streamHashCode ;

294 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ;

295 info.repondData = respond_stream ;

296 this.ServiceCommitted(info) ;

297 }

298

299 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ;

300 }

301 }

302

303 if(key.IsFirstMsg)

304 {

305 if(repondList == null (repondList.Count == 0)) //表示第一条消息还未接收完全

306 {

307 key.IsFirstMsg = true ;

308 }

309 else

310 {

311 key.IsFirstMsg = false ;

312 }

313 }

314

315 key.StreamState = NetStreamState.Idle ;

316

317 key.ResetBuffer(leftData) ;

318

319 if(! this.stateIsStop)

320 {

321 //继续接收请求

322 this.RecieveDataFrom(key) ;

323 }

324 else //停止服务

325 {

326 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;

327 }

328 }

329 catch(Exception ee)

330 {

331 if(ee is System.IO.IOException) //正在读写流的时候,连接断开

332 {

333 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;

334 }

335

336 ee = ee ;

337 }

338 }

339 #endregion

340

341 CheckData#region CheckData

342 private bool CheckData(ContextKey key)

343 {

344 int streamHashcode = key.NetStream.GetHashCode() ;

345 if(this.stateIsStop)

346 {

347 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ;

348 return false;

349 }

350

351 if(key.BytesRead == 0) //表示客户端掉线或非正常关闭连接

352 {

353 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;

354 return false ;

355 }

356

357 if(key.BytesRead == 8)//表示客户端正常关闭连接

358 {

359 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ;

360 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;

361 return false;

362 }

363

364 return true ;

365 }

366 #endregion

367 #endregion

368

369 INet 成员#region INet 成员

370

371 public IReqestStreamDispatcher Dispatcher

372 {

373 set

374 {

375 this.messageDispatcher = (ITcpReqStreamDispatcher)value ;

376 }

377 }

378

379 public int Port

380 {

381 set

382 {

383 this.curPort = value ;

384 }

385 get

386 {

387 return this.curPort ;

388 }

389 }

390

391 public bool UserValidated

392 {

393 set

394 {

395 this.validateRequest = value ;

396 }

397 }

398

399 #endregion

400 }

今天介绍了Tcp通信层中的核心――Tcp组件,仅仅复用Tcp组件已经能为我们省去很多麻烦了,假如想进行更高层次的复用――整个Tcp通信层的复用,请关注本篇的续文。

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有