Project JXTA 中对等管道的构建
潘大为 (ppalas@sina.com)
1 引言
管道的概念源于Unix,是不同线程之间直接传输数据的基本手段。JDK中Java.io包中就有管道类,同时,管道在JXTA中是最基本的概念,是对等点之间的数据传输的主要方式。对等管道协议(PBP)明确规范了对等管道的绑定,解析,响应。
本文依次剖析集中式(JDK)和对等环境下(JXTA)管道的实现方式,对比分析其异同,然后尝试在JXTA中建立一个虚拟的全双工的管道。
本文的目标是通过对不同环境下管道的实现方式对比分析,来理解为什么JXTA采用管道作为基本的数据传输手段。
2 管道的形象化描述
一个生活中的情景:现在有两个地区A,B。A是石油生产区,B是石油消费区,现在B地区需要消费A地区的石油,当然可以通过海运,空运获得,然而最通常的方式是架设输油管道。如图所示:
java中流的概念和管道的概念都可以通过此案例阐述,A与B之间连接的就是管道,负责将A的石油向B输出。A向管道输出数据(output),B从管道输入数据(input),可以这样理解,管道是A的输出对象,是B的数据源。这里就产生了三个类:输出流A,输入流B,管道。输入流B负责如何获取数据(read 操作),输出流A负责如何消费数据(write操作),管道负责连接它们(connect 操作)。其实,在实现时,管道类分解为管道口,管道出口,由入口出口负责连接。在复杂的网络环境中,这种连接方式可以有专门的网络协议负责(例如,JXTA中的PBP,全称Pipe Bind Protocol)。
由以上描述,我们可以清楚知道最原始的管道就是单向的,文章后面介绍的双向管道,是用两个单向管道虚拟的,而非真实的连接方式。不难发现管道最要害的问题是如何协调输出(A)与输入(B)。这在不同的网络环境会碰到不同的问题,最简单的是同一JVM下的不同过程(线程或任务)之间用同步方式传递数据。而对等环境下,如何去发现对方就是一个很现实的问题,这仅仅只是问题的其中之一,下面的章节会依次分析。
3 集中式环境下管道的实现
问题的描述:A与B是在同一JVM中,A,B有一方能够发现另一方的存在,A将数据发往B方,A发送数据与B接收数据是相互独立的。
现在回到问题的最初:为什么要使用管道?A只管发送,B只管接受,那么数据在哪儿呢?经过下面的分析,就会明白管道把治理数据缓冲区的重任交给了他自己,A,B均是围绕这个缓冲区来启停线程的,显然这才是问题的本质。
JDK中,类PipeInputStream(即前面所述的B)与PipeOutputStream(即前面所述的的A)可以很好的解决这一问题。首先给出类图如下。
下面是将类PipeOutputStream的connect方法代码简化后给予注释。
public synchronized void connect(PipedInputStream snk) throws IOException {
sink = snk; //将PipeInputStream的实例作为PipeOutputStream的一个属性,以便调用
snk.in = -1;//缓冲区的输入位置,<0表示缓冲区为空
snk.out = 0;//缓冲区的输出位置
snk.connected = true;
}
连接以后,PipeOutputStream的write操作直接调用sink.receive(b);这样,对缓冲区buffer的维护,就变成了read()和receive()操作之间的线程同步。JDK对缓冲区的处理非常巧妙,采用了循环列表,它用缓冲区的标志位的变化来代替数据的移动,类似于生活中的时钟把线性的时间规范为24小时来表示。这不属于本文的论述范围,就不继续分析了。
read操作,正常情况下,从out位置读取数据。缓冲区空时进入等待状态。以轮询的方式(1秒间隔)来自我释放。
receive操作,正常情况下,向in位置写入数据。缓冲区满时进入等待状态。同样,以轮询的方式(1秒间隔)来自我释放。
4 JXTA对等管道的实现
通过对JDK的分析,我们可以了解到在集中式环境下,管道的架设方案是比较简单的。在对等环境下(分布式环境下也类似),出于同样的目标,碰到的问题却在急剧的扩大。例如,管道入口和出口之间如何相互发现?数据如何保证在不同的环境下传送?甚至,对管道本身的概念发生质疑:一定是单入口,单出口吗?
JXTA规范中,管道是在端点之上的服务或应用之间发送和接收信息的虚拟连接通道,管道提供在对等端点传输之上的网络抽象。管道有点到点和广播两种通信模式。
JXTA是通过管道广告来唯一标示管道的,输出管道要找到与其广告相同的输入管道才能发送数据,广告内容如下
<!DOCTYPE jxta:PipeAdvertisement>
<jxta:PipeAdvertisement XMLns:jxta="http://jxta.org">
<Id>
urn:jxta:uuid-59616261646162614A787461503250335003093E73074218AE3ABBE08EF3CBE303
</Id>
<Type>
JxtaUnicast
</Type>
<Name>
PipeExample
</Name>
</jxta:PipeAdvertisement>
假如您需要对JXTA管道有实例化的概念,请参考Sing Li的使p2p能进行交互操作:Jxta命令shell ,这篇文章有部分内容专门介绍了如何在通过shell使用管道。本文主要是从编程的视角去看管道是如何实现的。
4.1 客户视角
Project JXTA : Java Programmer′s Guide Chapter7有个例子阐述如何去在对等点之间发送信息,读者可以到www.jxta.org下载源码。现在从客户视角简要的分析它的传送原理,要深入的了解可以看下一节的系统视角分析。
该例中,有两个对等点,并且构建了两个不同的类:一个负责接收(Pipelistener),一个负责发送(PipeExample)。具体的接收次序可以参考时序图:
类Pipelistener实现了接口PipeMsgListener,类PipeExample实现了接口OutputPipeListener。
由时序图(这是两个JVM中的类,所以时序符号是独立标示的)可以清楚的获知,各个对等点的前1,2步是相互独立的。各自的第3步,采用回调的方式建立输入和输出管道。一旦对等系统探测到对方的存在,就分别触发各自的事件发送或接收消息。显然JXTA中管道是异步的。
调试该例程时,注重先建立输入管道,然后建立输出管道。因为,输出管道在一定的时间和次数内探测不到输入管道的存在,就会主动放弃。否则,轻易让网络系统在这些无休止的探测中瘫痪。
4.2 系统视角
从上面的例程中,可以了解对等管道的创建方法,以及数据流程,但是不能明确对等系统是如何去实现的。JXTA中管道的实现比在JDK中实现要复杂得多,具体的技术标准可以参考对等管道绑定协议(PBP),此协议规范了JXTA中管道的概念,但并没有涉及到如何去实现,这同样是所有JXTA协议的特征。它们的目标是阐述what it is,而把how to do it留给开发者,这样有利于增强系统的开放性。其中Java参考实现,就是该协议实现的一个案例,以下将具体分析。
首先看管道实现的类图(以单播为例):
要害的类:
InputPipeImpl :输入管道的实现类
NonBlockingOutputPipe :输出管道的实现类
PipeServiceImpl :管道服务的实现类,负责创建输入输出管道
PipeResolver :提供管道绑定的解析服务
通过客户视角的分析,可以得知系统外部是通过PipeServiceImpl来获取输入输出管道。那么消息是如何在对等系统中通过管道过滤和传递的? 从程序实现的角度,涉及到太多的技术细节,JXTA的参考实现中有着庞杂的监听系统。本文尝试用一个案例从两个层次去解析这个问题,两个层次分别是消息的具体形式,服务和端点协议的具体分发策略。很显然,这里我们把注重力放在了管道的架构路径上,而把如何去架构放在了一边,我想它们是有先后关系的,并且距离并不遥远。
5 案例描述
现在假设有两个对等点alas 和sisal ,在一个局域网内,按照客户视角那一节的例程sisal先建立输入管道,alas建立输出管道。由于同一网内可以用广播的方式发送查询信息,可以不设rendevous,并且路由是两点间的,消息传递过程得到了一定的简化。
6 案例分析
以上案例中,从输入输出管道的建立到完成对接并传输数据总共有5个步骤:
sisal建立输入管道
alasl建立输出管道,需要查找输入管道,通过广播向网络发出管道查询消息
sisal获得alas的管道查询消息,通过单播向sisal发出响应表示
alas获得sisal的响应,通过单播向alas发出数据
sisal获得数据
6.1 输入管道的建立
sisal通过管道服务(pipeserviceImpl)创建输入管道InputPipeImpl,并将自己注册端点服务和管道解析服务中。
6.2 输出管道的建立
alas通过管道服务,分析管道广告,把自己注册于管道服务和管道解析服务。然后转交管道解析服务,查询是否存在本地或缓存中,不存在则通过集中服务向网络发送广播。rendevous首先将广播消息给端点服务,由端点服务决定用哪一个网络协议,本案例中,端点服务采用TCP协议发送广播。
广播消息的内容如下(消息是名称,类型,内容构成的三元组,类型可以是xml也可以是二进制,以下列出的均用xml格式,所以省略了type)
name=RendezVousPropagate
content=
<?xml version="1.0"?>
<!DOCTYPE jxta:RendezVousPropagateMessage>
<jxta:RendezVousPropagateMessage xmlns:jxta="http://jxta.org">
<TTL>
7
</TTL>
<DestSName>
urn:jxta:uuid-DEADBEEFDEAFBABAFEEDBABE0000000205
</DestSName>
<DestSParam>
jxta-NetGroupORes
</DestSParam>
<Path>
urn:jxta:uuid-59616261646162614A78746150325033BFED264F86E14966B71A855134C813A503
</Path>
<MessageId>
1023521770614
</MessageId>
</jxta:RendezVousPropagateMessage>
name=RendezVousPropagateurn:jxta:jxta-NetGroup
content=
<?xml version="1.0"?>
<!DOCTYPE jxta:RendezVousPropagateMessage>
<jxta:RendezVousPropagateMessage xmlns:jxta="http://jxta.org">
<TTL>
7
</TTL>
<DestSName>
urn:jxta:uuid-DEADBEEFDEAFBABAFEEDBABE0000000205
</DestSName>
<DestSParam>
jxta-NetGroupORes
</DestSParam>
<Path>
urn:jxta:uuid-59616261646162614A78746150325033BFED264F86E14966B71A855134C813A503
</Path>
<MessageId>
1023521770614
</MessageId>
</jxta:RendezVousPropagateMessage>
name=jxta-NetGroupORes
content=
<?xml version="1.0"?>
<!DOCTYPE jxta:ResolverQuery>
<jxta:ResolverQuery xmlns:jxta="http://jxta.org">
<HandlerName>
JxtaPipeResolver
</HandlerName>
<Credential>
</Credential>
<QueryID>
0
</QueryID>
<SrcPeerID>
urn:jxta:uuid-59616261646162614A78746150325033BFED264F86E14966B71A855134C813A503
</SrcPeerID>
<Query>
<?xml version="1.0"?>
<!DOCTYPE jxta:PipeResolver>
<jxta:PipeResolver xmlns:jxta="http://jxta.org">
<MsgType>
Query
</MsgType>
<PipeId>
urn:jxta:uuid-59616261646162614A757874614D504725184FBC4E5D498AA0919F662E40028B04
</PipeId>
<Type>
JxtaUnicast
</Type>
</jxta:PipeResolver>
</Query>
</jxta:ResolverQuery>
6.3 响应查询信息
sisal的网络端口监听到alas的广播消息,将消息转发给端点服务处理。端点服务查询了消息的内容,又转发给集中服务,集中服务又将查询的内容转交解析服务处理,然后解析服务转交给管道解析服务,此时发现本地注册的广告就是所要查询的广告,因此返回响应消息给了解析服务,然后有端点服务负责发送回应消息。
响应的内容:
<?xml version="1.0"?>
<!DOCTYPE jxta:PipeResolver>
<jxta:PipeResolver xmlns:jxta="http://jxta.org">
<MsgType>
Answer
</MsgType>
<PipeId> urn:jxta:uuid-59616261646162614A757874614D504725184FBC4E5D498AA0919F662E40028B04
</PipeId>
<Type>
JxtaUnicast
</Type>
<Peer> urn:jxta:uuid-59616261646162614A7874615032503386E8880590814538A64682785564B9A603
</Peer>
<PeerAdv>
对等点广告,内容较多,所以省略了
</PeerAdv>
</jxta:PipeResolver>
6.4 接收响应消息,发送数据
alas的网络端口收到sisal发送的响应消息,转交端点服务,对路由信息处理以后转交解析服务,然后到管道解析服务,发现注册表中含有客户端的事件(listener)。则触发该事件发送数据。
6.5 获得数据
sisal的网络端口收到alas发送的数据,转交端点服务,对路由信息处理以后,发现注册表中含有客户端的监听事件,则触发该事件分析数据。从下面的消息内容可以发现所得到的数据就是:Hello from peer alas
获得的消息如下
name= jxta:EndpointSourceAddress
content = tcp://192.168.0.58:9703/
name= jxta:EndpointDestinationAddress
content = tcp://192.168.0.58:9701/EndpointRouter/jxta-WorldGroup
name = Jxta:EndpointRouter
content = <?xml version="1.0"?>
<!DOCTYPE jxta:EndpointRouterMessage>
<jxta:EndpointRouterMessage xmlns:jxta="http://jxta.org">
<jxta:Src>
jxta://uuid-59616261646162614A78746150325033A85AC96048BF4C5A98438038268F5E7003
</jxta:Src>
<jxta:Dest>
jxta://uuid-59616261646162614A7874615032503386E8880590814538A64682785564B9A603/
PipeService/urn:jxta:uuid-59616261646162614A757874614D504725184FBC4E5D498AA0919F662E40028B04
</jxta:Dest>
<jxta:Last>
jxta://uuid-59616261646162614A78746150325033A85AC96048BF4C5A98438038268F5E7003
</jxta:Last>
<jxta:NBOH>
1
</jxta:NBOH>
</jxta:EndpointRouterMessage>
name = PipeListenerMsg
content = Hello from peer alas
6.6 小结
从消息的传递与处理可以看出JXTA系统中,各个服务之间有着清楚的职责划分。这样的体系结构非常有利于提供给客户透明的管道。相对于用户而言,在JXTA使用管道和在集中式的环境下(Jdk)下的区别仅仅在于增加了管道的广告来标示管道,而不需将输入管道和输出管道放在一起成对使用。
从上面的案例中,还会发现假如用的是同一个管道广告,可以有多个发送者一个接收者。这相比成对使用输入输出用途更广。当然,JXTA还支持多对多的广播管道,不过实现方式与单播不太一致,并且正在完善中。
7 如何去构建一个全双工的对等管道
上述的例程中,A是信息的发送者,B是信息的接收者,信息单向传输。能否A,B既是发送者又是接受者呢?解决办法很简单,再建立一条从B到A的管道,然后将这两条管道绑定在一起。JXTA的工具包net.jxta.impl.util含有BidirectionalPipeService类实现了这一目标。类图如下:
类图下面的三个类和一个接口均是内部类。Pipe是一个封装完好的双向管道,建立管道的过程如下时序图:(这一时序图的视角是客户对象A,B是如何外部连接的)
以上时序图中,A的accept()和B的connect()是一个异步操作,他们互相连通以后返回Pipe类的对象pipe。Pipe的inputPipe和outputPipe负责从对方获取信息和向对方发送信息。它们互不干扰,当然同步性也没有控制,很像实际生活中的IP电话。
A的accept()和B的connect(),完成了A与B的两次通信过程,建立了两个独立的管道其过程如下:(首先外部已建立了从B到A的临时管道)
A监听B的信息
B创建输入管道的广告,以此产生输入管道inputPipe,然后按消息name=inputPipeAdv将广告发送出去
A收到B的消息,以inputPipeAdv的内容作为管道广告,创建输出管道outputPipe。然后创建输入管道的广告,以此产生输入管道inputPipe,然后按消息name=inputPipeAdvAck将广告发送出去
B收到A的消息,以inputPipeAdvAck的内容作为管道广告,创建输出管道outputPipe。
A,B均返回 new Pipe(inputPipe,outputPipe)
8 总结
漫长的论述过程,完成了一个目标,弄明对等点A与B是如何通过管道通讯的。对于JXTA程序开发人员一个很重要的原则,就是明白自己的程序所在的协议层次,维护好JXTA协议的清楚的层次结构对于整个系统的可扩展和和可维护性均是至关重要的。管道在JXTA内部的构建过程,可以让我们感觉到JXTA协议的职责在Java参考实现中有着明确的划分。
在复杂的对等环境中,对等点之间的有价值的通信是非常重要的,而如何通信又是一个很复杂的问题。管道服务提供了这种便捷的通信的抽象,留给服务的享受者只有三个概念:管道广告,入口,出口。所以,我想这就是JXTA采用管道作为基本的数据传输手段的一个很重要的思想方式。
参考文献
官方的 Jxta 社区位于 Jxta.org。您可在这里找到最新的规范、文档、源代码和二进制文件
在 developerWorks Java 技术专区查找更多 Java 参考资料。
Early Adopter JXTA 作者 Sing Li
关于作者:
潘大为,华中科技大学计算机学院2000级硕士研究生,研究方向网络对等计算,网络信息系统。 Email: ppalas@sina.com
--摘自IBM网站
http://www-900.ibm.com/developerWorks/cn/java/l-jxta2/index.sHtml