在这个例子中,我们会设计和编写一个分布式JXTA应用,可以解决并行计算问题。我们将用一个反复使用的形式建立这个应用,在每步中扩大它的能力和 使用到的API集。这些原代码只能覆盖最重要的部分,需要全部代码可以到网站上查询
一些大型计算问题的子集可以用并行的方法解决。一个工作的并行执行意味着你可以将一个问题分解为几个小的子问题, 这些子问题又可以被同时的被执行。当一个子问题结束后,它将返回自己的结果到主程序,主程序将这些返回的小结果组合为更大的答案。
举个例子,考虑将任何2个整数间的质数列出来的工作。质数就是那些只能被它自己和1整除的自然数。 那些可以被1和其他比自己小的数整除的叫合数。因此,最简单的方法产生一个质数列表就是在自然数列表中消除所有的合数,剩余的就是质数了。
这个方法反复消除一系列自然数中的合数。它将列表中的每个数都用2到它的平方根去除。如果任何一个数被这样除了之后没有余数,那么这个数就是合数,并且对它作上标记。 当这些反复(工作)结束了之后,所有的被标记的数将被消除,剩下的就是质数了。
但是如果是一个很长的列表,有数百万的长度,那么我们将它分为几个小一点的列表,然后对每个列表执行上述方法。每个子计算被分发到网络上不同的机器去执行,充分利用分布的计算资源。质数的查询是这些可以被并行分解的大型问题集中的一个,现在流行的P2P风格的软件有不少,比如SETI@HOME工程,它的目标是对来自外太空的信号解码,寻找外星智能生物,类似的工程还有让用户共享出他们空闲CPU的资源去模拟蛋白质的折叠或者解码DNA串。
在这个应用中,主程序将向用户请求2个数,然后产生一个在2数之间包含所以质数的数列。这个主程序将首先在JXTA网络上尝试找到提供质数查询服务的的其他peer,然后将列表的片段分发给它们去执行。当一个peer完成了它的那一部分,它将返回这个片段中质数的数组。
为了这样的分配能够进行,我们要让所有peer能够在网络上通告(advertise)它具备质数查询的能力以让其他的peer可以找到并连接它。
JXTA Application Design
可能这个应用最不平常的一个方面就是,每个peer既扮演主程序(master process)的角色和一个从属程序的角色(slave),也有可能一个从属程序还要决定是否进一步的分解这个问题到的子任务。这种服务模式/客户模式操作是P2P程序设计的实质。我们将它定义为SM/CM操作
消息定义
当我们设计一个JXTA程序的时候,我们必须忍受JXTA是一个基于消息的系统:2个peer中的主要协议(contract)主要是通过消息。这样,设计程序的第一个工作就是定义消息传递。在这个质数查询应用中,一个peer传递一条包含2个边界数的消息给另外一个peer,接受的一方计算在这2个边界数中的质数,返回到以前的peer
net.jxta.endpoint.Message 类 概括出一个消息的感念,它允许赋予任何一个消息集一个键(KEY)。我们将在下表中用键-值 来表示这个类的实例
Table 16.1. Request Message
Key
Value
ServiceConstants.LOW_INT
Lower boundary of the (sub)list
ServiceConstants.HIGH_INT
Upper boundary of the (sub)list
Table 16.2. Response Message
Key
Value
ServiceConstants.LOW_INT
Lower boundary of the (sub)list
ServiceConstants.HIGH_INT
Upper boundary of the (sub)list
ServiceConstants.PRIMELIST
A string containing all primes between the bounds of the list. The primes are separated by ; characters.
服务的定义与发现
下一步,我们必须定义一个让主程序找到从属程序的方法,换句话说,我们比如让peer预先了解到提供这个服务的其他peer。
就象早先提及的那样,一个JXTA服务是用它的module类和specification定义的。因此, 我们将为这个质数查询服务module和specification定义一个通告. 然后让一个peer提供一个让这些通告在JXTA网络中传播的服务。这个质数查询module类将采用JXTACLASS:com.sams.p2p.primecruncher这个名字,module的spec将采用the name JXTASPEC:com.sams.p2p.primecruncher这个名字
主程序将用这个名字去发现通告module的说明。
因此, 除了消息的定义, 服务名字字符串也是peer在设计阶段应该获取的信息。所有 peer交互的信息将在运行的时候被发现。
服务实现
当一个质数查询服务开始的时候,它将初始化JXTA平台以得到去World and Net Peer Group的通道。当初始化只后,peer将创造并发布它的通告,包括它的module类和module说明通告。
模板说明通告将包含一个管道通告。 那些发现有此服务的一个模板说明通告的客户端必须得到这个管道通告,并且通过这个管道连接到那个服务。
发布了通告之后,我们的服务打开一个输入通道并且对进来的消息进行监听。当一个消息到达之后,这个服务尝试从这个消息中获得high与low这2个边界数字,然后将它们传递到一个仅产生质数链表的组件中。 当那个组件返回了结果(一个含质数的数组),这个质数查讯服务将产生一个包含结果的消息并将此消息送回客户端。在最先的反复中,服务将简单地打印出它接受到的消息。接下来的提炼中,它将打开一个管道将结果送回客户端。客户端将把这些从各个peer中得到的结果组合起来并将最后的数列存入文件中。.
Listing 16.2 Outline of PrimePeer and Initialization of a JXTA Peer
package primecruncher;
import net.jxta.peergroup.PeerGroup;
import net.jxta.peergroup.PeerGroupFactory;
import net.jxta.peergroup.PeerGroupID;
import net.jxta.discovery.DiscoveryService;
import net.jxta.pipe.PipeService;
import net.jxta.pipe.InputPipe;
import net.jxta.pipe.PipeID;
import net.jxta.exception.PeerGroupException;
import net.jxta.protocol.ModuleClassAdvertisement;
import net.jxta.protocol.ModuleSpecAdvertisement;
import net.jxta.protocol.PipeAdvertisement;
import net.jxta.document.*;
import net.jxta.platform.ModuleClassID;
import net.jxta.platform.ModuleSpecID;
import net.jxta.id.IDFactory;
import net.jxta.endpoint.Message;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.StringWriter;
public class PrimePeer {
private static PeerGroup group;
private static DiscoveryService discoSvc;
private static PipeService pipeSvc;
private InputPipe inputPipe;
private static final String PIPE_ADV_FILE = "primeserver_pipe.adv";
public static void main(String[] argv) {
PrimePeer pp = new PrimePeer();
pp.startJxta();
pp.doAdvertise();
pp.startService();
}
public PrimePeer() {
}
private void startJxta() {
try {
group = PeerGroupFactory.newNetPeerGroup();
discoSvc = group.getDiscoveryService();
pipeSvc = group.getPipeService();
} catch (PeerGroupException e) {
System.out.println("Cannot create Net Peer Group: " + e.getMessage(
));
System.exit(-1);
}
}
/**
* Create and propagate advertisements
*/
private void doAdvertise() {
...
}
/*
* Start up the service, listen for incoming messages on the service's input pipe.
*/
private void startService() {
...
}
/**
* Compute the requested list of prime numbers.
*/
private void processInput(String high, String low) {
...
}
}
在这个startJxta()服务初始化方法中,我们首先获得一个通往World Peer Group的引用(reference):这是通过一个静态PeerGroupFacrory实现的。调用这个方法将为JXTA的运行作好准备。下一步中,我们将获得一个到2个Net Peer Group提供的peer group服务的引用:DiscoveryService和the PipeService。我们将在创建服务通告的时候用到它们2个。