用Delphi建立通讯与数据交换服务器—Transceiver技术剖析(下)
作者:火鸟 redbirdli@hotmail.com
二、 Transceiver Service详解
1.Transceiver Service分析概要
Transceiver Service是Transceiver系统的核心构成,Transceiver Kernel负责从系统配置库读取Transceiver Console设定的Port、Channel定义与参数,运行时动态创建和管控通讯Port及其关联关系,对数据的收、发、缓冲进行调度、对日志、队列进行管理等。Transceiver Shell则是所支持全部类型的用于数据收发的Port的实现。
2.Transceiver Service设计概要
Transceiver Service是由Delphi中Service Application开发而成,Service Application可运行于系统态而非用户态,由操作系统Service Control Manager (SCM)负责程序的运行管理,Service没有用户界面,属于系统的后台程序。Transceiver Kernel是Transceiver类的一系列对Transceiver Shell建立和控管的方法,而Transceiver Shell则是一系列负责通讯的对象集合。
注:由于性能和负载的考虑,Transceiver Kernel只是从逻辑上实现上架构图中的功能划分,构成模块并未以完全对象化的方式实现。
3.Transceiver Service实现概要
i. 建立一个Service Application
从Delphi主菜单File中选择NEW|Other…在弹出的New Items对话框中选择NEW|Service Application ,可以看到生成的程序框架如下:
program Project1;
uses
SvcMgr,
Unit1 in 'Unit1.pas' {Service1: TService};
{$R *.RES}
begin
Application.Initialize;
Application.CreateForm(TService1, Service1);
Application.Run;
end.
unit Unit1;
interface
uses
Windows, Messages, SysUtils, Classes, Graphics, Controls, SvcMgr, Dialogs;
type
TService1 = class(TService)
private
{ Private declarations }
public
function GetServiceController: TServiceController; override;
{ Public declarations }
end;
var
Service1: TService1;
implementation
{$R *.DFM}
procedure ServiceController(CtrlCode: DWord); stdcall;
begin
Service1.Controller(CtrlCode);
end;
function TService1.GetServiceController: TServiceController;
begin
Result := ServiceController;
end;
end.
可以看到除了在uses单元引用了用于服务管理的SvcMgr、TService1继承自TServiced而非TForm及一个重载的GetServiceController函数和以stdcall方式调用的ServiceController过程之外,用Delphi建立一个服务程序并没有太多特别之处,Delphi Fans也许又要欢呼了,这就是Delphi RAD的强大迷人之处。另外,Service Application由于无法直接在运行时调试,也没有用户界面,开发时应考虑调试信息的无界面输出以利于调试排错。
ii. 创始满足特定需求的Port类
要使用运行处理机制统一的Transceiver Kernel,就要求Transceiver Shell中的Port有统一的处理规则,Shell中有些Port是Delphi开发环境中已有的组件类(如TCP、FTP等),而有些则不是(如MSMQ、File等)这时就需要自己动手建立一个可以满足需要的类。如:
type//由于没有用户界面,所以继承自TComponent而非TControl
TFilePort=class(TComponent)
private
FilePath:string;//获取或保存文件的文件夹位置
Prefix:string;//文件前缀
suffix:string;//文件后缀
end;
建立TFilePort类以后,Transceiver Kernel就可以使用统一的类处理方式引用和管理对象,达到从FilePath指定的文件夹下存取特定文件的目的。如果用于信源(Source),将从特定文件夹下获取满足条件的文件,如果用于信宿(Target),将把从相应信源(Source)得到的数据写入到指定文件中(事实上每一个Port对象的实际参数都来源于系统配置库中Port表的定义)。
另一个例子:
type
TCOMPort=class(TComponent)
private
ComFace:string;//获取或提交数据的COM接口
end;
TCOMPort将用于从指定COM组件接口中获取数据或将数据提交到指定的COM组件接口上进行后续处理。在Delphi中OleVariant类是实现COM组件调用的途径之一,使用TCOMPort类的必要性在于,Transceiver在必要的数据存取时才会将TCOMPort定义的COM接口实例化为OleVariant对象,使用结束即释放对象,这样能减少Transceiver和COM服务器的负载压力。其它类似组件也有相同考虑。作者此处的类举例只是一种模型,必要时应加入适当的方法与事件。在开发中作者实现的类有:TCOMPort、TMSMQPort、TDBPort、TFilePort等
iii. 多Channel的支持—声明Port的对象数组
Transceiver把一个通讯过程看作是源(Source)到目标(Target)的数据流过程,这样一个过程是Transceiver中的一个Channel,而这个Channel又是由至少两个Port构成的(一个用于Source,一个用于Target),所以要定义不定数量并且Source、Target自由组合的多个Channel,必须分别声明用于Source 和Target 的多种Port类的对象数组(并为他们建立对应的关联关系,稍后您将看到)。如:
private
{ Private declarations }
TCPSource:array of TServerSocket;// 用于TCP Source的对象数组
TCPTarget:array of TClientSocket;//用于TCP Target的对象数组
MailSource:array of TIdPOP3; //用于Mail Source的对象数组
MailTarget:array of TIdSMTP; //用于Mail Target的对象数组
fileSource:array of TFilePort; //用于File Source的对象数组
fileTarget:array of TFilePort; //用于File Target的对象数组
comSource:array of TCOMPort;//用于COM Source的对象数组
comTarget:array of TCOMPort; // 用于COM Target的对象数组
注:由于同一类型的用于Source和Target的Port运行规则的也完全不同,在Transceiver概念中被视为是完全不同并且无直接关系的对象。所以同一类型的Port,对象数组也按Source和Target分别建立。
iv. 运行时实例化对象数组
每一个对象数组的元素个数由Port Builder在运行时管理,如果用户通过Transceiver Console定义了一些某种类型的Port,Port Builder将按照其个数和各自参数实例化该对象数组。否则,该对象数组将不会被实例化。在Source类型的Port对象中,Name属性被设置为'Receive'+Port ID 的形式,在之后的数据接收触发中,这将有助于Data Dispatcher定位对象和对不同类型的Port对象进行统一调度。Tag属性被用来向Channel Controller提供其所在Channel的target ID信息。
以下是Port Builder中对comSource对象数组的实例化部分
begin //Create COM/ Receive Port
itmp:=high(comSource)+1;
// 获取comSource的当前最大个数,itmp为integer变量
SetLength(comSource,itmp+1); // 添加一个comSource数组成员
comSource [itmp]:=TCOMPort.Create(self);// 实例化成员
comSource[itmp].Name:= 'Receive'+inttostr(isource);
//设置Name属性为'Receive'+Port ID,isource为整型的当前PortID
comSource [itmp].Tag:= itarget;//设置为其所在Channel的target ID
NullTest:=rece.Fields['Address'].value;
//得到系统配置COMFace的值,NullTest为Variant变量
if (NullTest <>null) and (trim(NullTest)<>'') then
begin
comSource [itmp].ComFace:=NullTest; //将有效值赋与ComFace
NullTest:=rece.Fields['interval'].value;
//得到系统配置中COM对象获取数据的触发时间间隔
SetTimer(application.handle,isource,NullTest*60000,nil);
//为当前Port建立用于定时收取数据的触发时钟, isource为Port ID
end
else
comSource [itmp].Tag:=-1;//初始化失败,标识为无效Port
end;
comSource是用于在一定的时间间隔后对ComFace中定义的接口进行调用并获取数据的Source类Port,相应comTarget的实现与其类似,只是由于向comTarget的ComFace提交数据是一个实时过程,所以不需要用到触发间隔,省略建立时钟的两条语句即可。其它类型的Port对象创建和初始化大同小异。如,另一个MailTarget实现片段:
begin //Create SMTP/Send Port
itmp:=high(MailTarget)+1;
SetLength(MailTarget,itmp+1);
MailTarget[itmp]:=TIdSMTP.Create(self);
MailTarget[itmp].Name:=’send’+ inttostr(itarget);
MailTarget[itmp].Tag:=3;// 设置为Target Port类型标识
NullTest:=rece.Fields['Address'].value; //邮件服务器地址
if (NullTest <>null) and (trim(NullTest)<>'') then
MailTarget[itmp].Host :=NullTest
else bValid:=false;
NullTest:=rece.Fields['Port'].value; //邮件服务器端口
if NullTest <>null then
(if NullTest<>0 then MailTarget[itmp].Port :=NullTest)
else bValid:=false;
NullTest:=rece.Fields['user'].value;//登录用户名
if NullTest <>null then
MailTarget[itmp].UserId :=NullTest
else bValid:=false;
NullTest:=rece.Fields['password'].value;//登录口令
……………
……………
end;
或许你会有这样的疑惑,大量的Transceiver Shell通讯组件在运行时被Port Builder创建,Transceiver Service的性能会高吗?事实上,Port Builder的使命是在ServiceCreate事件发生时一次性完成的,Shell Port的数目只会影响Transceiver Service的初始化速度,Shell Port的通讯速度和Transceiver Servicer的整体性能将不受影响,当然系统资源可能会占用更多一些。
v. 事件的动态分配和处理
在Transceiver Shell所支持的若干种通讯Port当中,使用TServerSocket(可能您更倾向于使用Indy的通讯组件,但这并不违背Transceiver Service的设计思想,只是Shell层面的修改或增加而已)实现的TCPSource是比较有特点的一种,因为TServerSocket作为一种Source Port,不同于COM或POP3之类需要定时触发的对象,它是在Transceiver Service启动后时刻处于监听状态,当有ClientSocket连接并发送数据时产生相应事件的组件。以下是TCPSource的实例化片段:
begin //Create TCP/Receive Port
itmp:=high(TCPSource)+1;
SetLength(TCPSource,itmp+1);
TCPSource [itmp]:=TServerSocket.Create(self);
TCPSource [itmp].OnClientRead:=TCPServersClientRead;
//分配OnClientRead事件的处理过程为TCPServersClientRead
TCPSource [itmp].OnClientError:=TCPServerClientError;
//分配OnClientError事件的处理过程为TCPServerClientError
TCPSource [itmp].Name:= 'Receive'+inttostr(isource);
//设置Name属性为'Receive'+Port ID
TCPSource [itmp].Tag:=itarget; //设置为其所在Channel的target ID
TCPSource [itmp].Socket.Data:=@ TCPSource [itmp].Tag;
//将此Port对象的target ID作为指针数据附于Socket对象上
……………
……………
end;
回来接着看我们的comSource的处理,在实例化时我们为其建立了触发时钟,但如何来处理时钟触发时的事件呢?同理,也是事件处理的动态分配。
comSource的时钟的处理定义可在ServiceCreate事件处理中加入: application.OnMessage:=Timer;
实现对消息处理的重载,当有Application的消息产生时,Timer就将被触发,在Timer事件中我们过滤处理时钟触发的WM_TIMER消息,就可以按Port ID和类型实现对特定Source Port的数据获取方法的调用:
Procedure TCarrier.Timer(var Msg: TMsg; var Handled: Boolean);
var stmp:string;
Obj:TComponent;
begin
if Msg.message =WM_TIMER then//处理时钟消息
begin//根据触发消息的Port ID找到定义此消息的对象
Obj:=FindComponent('Receive'+inttostr(Msg.WParam));
if obj=nil then exit;//没有找到就退出处理
stmp:=obj.ClassName;//反射获得此Port对象的类型信息
if stmp='TIdPOP3' then GetPOP3(TIdPOP3(Obj));
if stmp='TIdFTP' then GetFTP(TIdFTP(obj));
if stmp='TFilePort' then GetFile(TFilePort(Obj));
if stmp='TCOMPort' then GetCOM(TCOMPort(Obj));
//调用COMSource的数据获取过程
……………
……………
end;
end;
vi. 获取数据
以下是COMSource的数据获取处理
procedure TCarrier.GetCOM(COMObj: TCOMPort);
var stmp:string;
COMInterface:OleVariant;
begin
try//根据ComFace的值建立COM组件对象
COMInterface:=CreateOleObject(COMObj.ComFace);
stmp:=COMInterface.GetData; //调用约定的接口方法,获取数据
while stmp<>#0 do // #0为约定的数据提取结束标志
begin
DataArrive(stmp,COMObj.Tag);
//交由data Dispatcher统一处理, COMObj.Tag为对象所在Channel的Target Port ID
stmp:=COMInterface.GetData;
end;
COMInterface:= Unassigned;
except
COMInterface:= Unassigned;
end;
end;// 完成数据提取操作,释放组件对象,直至下一次触发调用
以下是TCPSource的数据获取处理:
procedure TCarrier.TCPServersClientRead(Sender: TObject; Socket:TCustomWinSocket);
begin
DataArrive(socket.ReceiveText,integer(TServerWinSocket(sender).data^));
//交由data Dispatcher统一处理, 第二个参数为附于Socket对象sender上的Target Port ID指针值,
end;
不同类型的Source Port对象其接收数据的方式也不尽相同,但最终都将所接收到的数据交由data Dispatcher做统一处理。从实现层面讲,每加入一种数据接收对象并实现其数据接收,就为Transceiver Shell实现了一种新的Source Port。注:此处作者只是实现了接收文本数据,可能用户需要接收的是内存对象、数据流或二进制数据,对接收代码稍做更改即可。
vii. 数据调度
Transceiver Service的数据调度是由data Dispatcher逻辑单元完成的,Data Dispatcher的主要任务是对从不同的Source Port接收到的数据进行统一的管理与控制、与Channel Controller协同工作,按Channel的定义向不同的Target Port进行数据分发、监视其发送结果成功与否,并根据发送结果和系统配置库的设置决定数据是否需要提交到Queue Manager和Log Recorder进行缓冲和日志处理等等。接下来看看Source Port提交数据的DataArrive方法:
procedure TCarrier.DataArrive(sData:String;PortID:Integer);
var dTime:Datetime;
iLogID:integer;
bSendSeccess:Boolean;
begin
if sData='' then exit;//如数据为空则跳出
iLogID:=-1;
dTime:= now; //接收时间
if sData[length(sdata)]=#0 then sdata:=copy(sdata,1,length(sdata)-1);
//用于兼容C语言的的字符串格式
bSendSeccess:=DataSend(sdata,PortID) ;
//调用 Data Dispatcher发送调度方法,PortID为Target Port ID
if (TSCfg.LogOnlyError=false) or (bSendSeccess=false) then
iLogID:=writeLog(dTime, now,sData, PortID, bSendSeccess);
//根据系统配置信息中的日志处理规则和发送结果记录日志
if (TSCfg.Queueing=True) and (bSendSeccess=false) then
PutQueue(dTime, now,sData, PortID, bSendSeccess, iLogID);
//根据封装系统配置信息中Queue配置定义决定Queue处理
end;
以上是Data Dispatcher的DataArrive方法,其中Queue的处理是按照系统配置信息和发送状态决定的,也可以调整为强制性的队列化处理。下面是Data Dispatcher的DataSend方法,用于将数据按Target Port类型分发处理:
Function TCarrier.DataSend(sData:String;PortID:Integer):boolean;
var Obj:TComponent;
begin
DataSend:=false;
Obj:=FindComponent('Send'+inttostr(PortID)); //根据Port ID找到对象
if (obj=nil) or (obj.Tag =-1) then exit;
//对象不存在或因初始化失败已被标识为无效Port
case obj.Tag of
1:DataSend:=PutTCP(TClientSocket(obj),sdata);
3:DataSend:=PutSMTP(TIdSMTP(obj),sdata);
5:DataSend:=PutFTP(TIdFTP(obj),sdata);
7:DataSend:=PutHTTP(TIdHTTP(obj),sdata);
9:DataSend:=PutFile(TFilePort(obj),sdata);
11:DataSend:=PutMSMQ(TMSMQPort (obj),sdata);
13:DataSend:=PutDB(TDBPort(obj),sdata);
15:DataSend:=PutCOM(TCOMPort (obj),sdata);
……………
……………
end;
end;
值得注意的是,如果没有使用对象数组,而是每种类型的Port只有一个实例的话,处理数据分发处理的更佳办法应该是使用回调(Callback)函数,但在现在的情况下,那将导致不知应该由对象数组中哪一个成员处理数据。另外,现在的处理方法使Transceiver Kernel与Transceiver Shell没有彻底剥离,应该寻求更加抽象、独立性好的处理方法。
viii. 数据发送
以下是TCP的发送
Function TCarrier.PutTCP(TCPOBJ:TClientSocket;sdata:string):Boolean;
var itime:integer;
begin
PutTCP:=false;
try
TCPOBJ.Close;
TCPOBJ.Open;
itime:=gettickcount;//起始时间
repeat
application.ProcessMessages;
until (TCPOBJ.Active=true) or (gettickcount-itime>5000);
//连接成功或5秒超时就跳出循环
if TCPOBJ.Active then
begin
TCPOBJ.Socket.SendText(sdata);
PutTCP:=true;//发送数据成功时,返回值才为True
end;
TCPOBJ.Close;
Except
TCPOBJ.Close;
end;
end;
以下是COM的发送
Function TCarrier.PutCOM(COMOBJ:TCOMPort;sdata:string):Boolean;
var Com:OleVariant;
begin
PutCOM:=false;
try
Com:=CreateOleObject(COMOBJ.ComFace);//建立预定义的接口
PutCOM:=Com.PutData(sdata);//调用预定义的方法
Com:= Unassigned;
except
Com:= Unassigned;
end;
end;
其它类型的Port发送大同小异,在此不再赘述。到此为止,Source和Target的基本处理已经完成。一个基本的通讯功能已经建立,经过不同类型的Source和Target的自由匹配,就可以实现完全不同的通讯功能。建立多个Channel,就可以集中实现多个不同功用的通讯处理。
ix. 队列处理
在上文的DataArrive方法中当数据被发送之后,Data Dispatcher会调用数据日志记录的writeLog和队列化处理的PutQueue方法,二者的功能类似,都是根据系统参数对数据信息进行数据库的存储,不是本文的重点。而队列的Retry处理与Timer事件中按Port类型分发处理的原理类似,是依赖于Queue Timer的触发,将缓冲的数据从数据库中读出,并依照Target Port ID再次调用DataSend进行数据的发送重试,如发送成功,则本次数据传输的事务完成,否则重新进入队列等待下一次触发时间进行重试,直到发送成功或达到设置的最大重试数为止。
三、 开发经验总结
由于本文的侧重点在于说明Transceiver的核心思想与设计理念,简化和削弱了Transceiver作为后台服务应当考虑的多线程处理、对象池化以及事务支持、更为复杂强大的Source和Target的Group管理和Channel集成、收发内存对象、数据流、二进制数据的能力、系统配置信息的读取和其封装类的实现、系统及数据的安全性等等,希望读者朋友们能够抛砖引玉,理解Transceiver的设计思想,启发实际开发工作中的灵感火花,做出更加出色强大的软件。
作者:火鸟 redbirdli@hotmail.com
用Delphi建立通讯与数据交换服务器—Transceiver技术剖析(上)
用Delphi建立通讯与数据交换服务器—Transceiver技术剖析(下)
通过C#实现集合类纵览.NET Collections及相关技术
老东西:儿时的编程算法心得笔记