输入输出流的概念在java 1.0中首次被引入。与多媒体流不同,Java数据流以一种标准的方式进行工作,将数据写入目的地和从源读取数据。如:文件,套接字,甚至键盘和屏幕(System.in和System.out)都是作为可以用输入输出流进行通讯的源和目的的普通的例子。实际上,一些对象,如套接字,可以同时既是目的也是源。
Java消息服务是企业级应用在一个分布式的环境中相互通讯的标准方式。虽然这是一个众所周知并大量验证的方法,但是它是复杂的和有时显得很粗笨的消息驱动框架,它缺少了一些简单的流框架就可以提供的能力。MantaRay,一个开源的数据消息项目(JMS提供者),基于点对点,无服务端的架构,它通过融合了JMS和数据流两者的优点来解决上述的问题。
这篇文章将会讨论隐藏在流概念中的能力和由MantaRay开源项目开发的企业级数据流,一种新型的流,它的目的地和源是JMS的topic和queue。
输入输出流的能力
概括
流的最重要的能力特性在于,无论目的和源是什么,通讯API始终保持一致。写入一个套接字和写入一个文件是一样的。一个FileOutputStream对象可以专门为文件的操作提供额外的功能,但基础的读写仍然是一样的。结果是假如你不使用额外的功能,那么你就可以简单的替换掉源/目的而不需要改变你的代码。
图1显示了流是如何使用的
图1.使用流
针对每种目的的流API是相同的和很原始的。举例,InputStream类的read()方法的返回值是一个在0-255之间的int。出于这个原因,Java提供了一组写入这些流更多复杂数据的工具。这些工具包括像writer和reader这样的帮助类,也有包装原始流的包装流。这些工具将在下一节讨论。
流交换和改变
另一个强大的特性是在一个流之上包装另一个流。假如你想写数据到一个文件但希望数据是被压缩的,你只需简单地创建一个FileOutputStream对象并将它包装在一个ZipOutputStream对象内。然后你写数据到ZipOutputSream,它会压缩数据并将它传递给FileOutputStream,它会将其写入到一个物理文件中。类似的,你可以使用FileInputStream和ZipInputStream对象从一个.zip文件中读出内容。
实际上,你可以将多个流链接起来。比如,你可将一个SocketOutputStream包装在一个CipherOutputStream对象内用来加密数据,再包装在一个ZipInputStream对象内用来压缩数据。当数据从一个流传递到另一个流时, 每一个对象都在数据上进行自己的操作。
图2显示了包装流的一个例子
图2.包装流
MantaRay的企业级流
为什么需要企业级流?
与大多数流工作于物理的IO组件不同,MantaRay企业级流工作于JMS的队列(queue)和主题(topic)之上。JMS是通过队列和主题传递消息的一个面向消息的标准,通常用在企业级应用环境中。
举例来说,让我们看看两个想通过队列来进行通讯的应用。一个应用向队列中发送一条消息,另一个应用则接收这条消息。下面的代码演示了这一过程,使用JMS 1.02实现。
// 发送代码:javax.jms.QueueConnectionFactory conFactory = new
...
// look up in JNDI or create an instance
// 在JNDI中查找或创建一个实例javax.jms.QueueConnection con
= conFactory.createQueueConnection();
// 创建一个非事务的自动确认的sessionjavax.jms.QueueSession sendSession
= con.createQueueSession(false
,Session.AUTO_ACKNOWLEDGE);javax.jms.Queue sendQueue
= sendSession.createQueue (sQueue);javax.jms.QueueSender sender
= sendSession.createSender(sendQueue);javax.jms.TextMessage msg
= sendSession.createTextMessage();msg.setText( "some text" );sender.send( msg,
javax.jms.DeliveryMode.NON_PERSISTENT,
javax.jms.Message.DEFAULT_PRIORITY,MESSAGE_TTL);
// 接收代码:javax.jms.QueueConnectionFactory conFactory = new ...
// 在JNDI中查找或创建一个实例javax.jms.QueueConnection con
= conFactory.createQueueConnection();
// 创建一个非事务的自动确认的sessionjavax.jms.QueueSession receiveSession
= con.createQueueSession(false
,Session.AUTO_ACKNOWLEDGE);javax.jms.Queue receiveQueue
= receiveSession.createQueue (rQueue);javax.jms.QueueReceiver qReceiver
= receiveSession.createReceiver(receiveQueue);javax.jms.TextMessageMessage
=(TextMessageMessage)qReceiver.receive();
正如你所看到的,不仅代码有点复杂,而且它也是面向消息而非面向流的。当一个用户向企业级流中写入数据时,流会将数据剪切成包并将他们包装到一个JMS消息中。然后将消息发送到预设的队列或主题中,那里消息将被作为输入流处理,解包数据,并为目的端用户作好读取的预备。
图3 显示了数据是如何在MantaRay企业级流中被处理的。
图3.MantaRay企业级流中数据处理的过程
因为企业级流扩展了InputStream和OutputStream对象,就象一个套接字或文件的Input/OutputStream做的那样,你可以将它们像流一样使用,从而释放出流所特有的能力。
·因为提供了所有流可以共享的接口,所以它们使用起来非常简单。
·你可将它们与其它流包装在一起从而获得如压缩和加密那样的扩展功能。
·你不用担心将数据拆分包,缓存的分配或其它的低层次的针对数据传递的问题
使用企业级流
JMS队列都是点对点的通讯。当在一个队列上使用企业级流时,在同一个队列上应该只有一个输出流和一个输入流。虽然大多数的JMS提供者,包括MantaRay,在同一个队列上启用了多个生产者和消费者,但这并没有在JMS标准中定义,因此当在一个队列上使用企业级流时,这就是一种误用。原因在于,特定队列上的一条消息只能传递给一个消费者;因此,假如有多个消费者,将会‘偷’走其它人的消息。
JMS主题定义了多对多的通讯。当企业级流使用一个主题用作源或目的时,同一个主题只能有一个发布者,同时可以有多个订阅者。原因在于,同一个主题上的多个发布者只会搅乱数据,并向订阅者发送无意义的输出。
但是,实事上基于主题的可以有多个订阅者,可以使用类似广播的一对多的通讯。输出流的用户不需要为每一个对端治理一个输出流。而是,用户只需简单地将数据发送到输出流中,那么所有的主题订阅者将会收到消息。
企业级流使用connect方法来绑定到一个主题或队列上。只有当流联接以后,你才能从中写入或读取。基于队列的流,输出流产生的数据存放在队列中,直到有输入流来‘消费’它。因为有存储数据的能力,联接的顺序是不重要的。输出流可以联接后送入数据,输入流则可以在任何时候联接,并取走从头开始的所有数据。使用主题作为输入流时有些不同,主题只能收到当它联接以后产生的数据。虽然这在某些情况下并不是个问题(比如,一个持续的CPU使用报告),但在某些情况下却很重要(比如,文件传输)。
MantaRay企业级流示例
提供—阅读式图表
考虑一个图型数据提供者组件,它产生持续的数据—可以是内存使用率,股票价格,工厂输出或是地球上某一时刻火星人的数目。这种数据需要显示在一个叫做图表阅读器的几个不同的位置。
因为数据是持续的并且通讯是一对多的类型,这个任务可以被MantaRay企业级流轻松的解决。
图4显示了图表阅读器组件
图4.图表阅读器组件
下面是图表数据提供者组件一个简短的示例:
import org.mr.api.blocks.MantaOutputStream;
.../*** 创建随机的数据并将它发送给图表阅读器
*/public class GraphFeeder {
public static void main(String[] args)
throws Exception {
// 创建一个具有4字节容量的小包的输出流,这个容量可以告诉流要将
// 信息切成小包
MantaOutputStream out =
new MantaOutputStream(4);
// 将企业级输出流联接到一个叫graph的主题上
out.connect("graph",
MantaOutputStream.TOPIC);
// 将数据包装在DataOutputStream中,这样我们可以轻松地写入整数
DataOutputStream dos =
new DataOutputStream(out);
int currentStatus = 0;
for(int rounds =0 ;rounds < 30000
;rounds++){
// 在图表中产生随机的波动
int rand =(int)
(System.currentTimeMillis()%777);
if(rand%2 ==0){
currentStatus++;
}else{
currentStatus--;
}
// 将数据写入流中
dos.writeInt(currentStatus);
// 在产生更多的随机数时睡眠一会儿
Thread.sleep(rand/3);
}
}}
下面是一个图表阅读器组件的简短代码示例:
import org.mr.api.blocks.MantaInputStream;
.../*** 以图表的方式显示从输入流中获取的数据
*/public class GraphViewer {// 运行程序
public static void main(String[] args)
throws IOException {
// 初始化程序
GraphViewer view = new GraphViewer();
view.init();
}
// 程序逻辑
public void init() throws IOException{
// 初始化图表
InitGraph();
// 创建 一个企业级输入流
MantaInputStream in =
new MantaInputStream();
// 将企业级输出流联接到一个叫graph的主题上
in.connect("graph",
MantaOutputStream.TOPIC);
// 将输入流包装到DataOutputStream中,这样我们可以轻松地读出整数
DataInputStream dis =
new DataInputStream(in);
int input =0;
boolean go = true;
while(go){
try{
//读数据
input=dis.readInt();
}catch(IOException e){
e.printStackTrace();
go =false;
}
// 更新图表
updateGraph(input);
}
}}
示例显示了将一个流包装到另一个流中去的这种能力。正如你在示例中所看到的,一个DataInputStream和一个DataOutputStream被用来以向流中写入整数。这个图表阅读器示例的完整代码可以在MantaRay的最新版本的sample目录下找到。这篇文章简化了代码的例子是为了让它适合这篇文章的长度。
JMS提供了增强的特性,比如选择器,它们对于有些任务来说是有用的,但并没用被企业级流支持。另外,面向消息提任务可以使用JMS API来使其变得更简单。上述例子显示了一个像这样的简单的面向流的任务如何从使用MantaRay的企业级InputStream和OutputStream的简单的面向流的API中获益的。
结论
将JMS的能力与流组合在一起是非常强大的。当应用使用J2EE框架作为通讯架构时,可以使用所有流所具有的强大的能力。对于不想与JMS对象打交道的用户来说,流也是很有用的,虽然他们有时稍显复杂和粗笨。
企业级流是MantaRay的“积木块”集合的一部分—简单并且直接的工具简化了编写分布式应用的过程,并扩展了MantaRay为分布式应用通讯所提供的能力。