好久没写过编程的玩意了,搞一篇。
到新公司这半年在项目中遇到很多类似MessageQueue的实现。比如我们要implements一个轻量级的java.awt,其中有一个EventQueue。另外有一个系统上的Navigator的模块,其中Doc/View的实现中doc的Notify也是以这种方式来做的。还有很多,这个模型用处还是非常大的.异步通信的常用方式之一。
比如Windows message,java.awt的EventQueue.于是我试试设计一个相对通用简单的实现,这样其他类似的应用可以直接使用。
总结了一下一个message queue的应用一般由
1.消息:Message
这个Message的定义很广泛,一般定义一个接收者的某个操作,可以包含一些其他的信息。
2.队列结构:queue.
队列的实现有很多种,普通的fifo,优先队列(带优先级的队列)等等。是一种基本的数据结构。提供基本的操作:
getMessage:以队列的规则取出一条Message
putMessage:以队列的规则插入一条Message
peekMessage:和getMessage的结果一致,但是并不取出这条Message,而是“返回”
isEmpty():队列是否为空
isFull():队列是否为满
另外可以有其他的操作,但是最好不要有破坏队列的结构,除非有特殊的应用(比如合并),在我们的设计里,需要用户来实现这些接口以提高灵活性。
3.消息派发线程:Message dispatching thread
为了实现异步的通信,这个角色是不可缺少的,一般情况下它是Message地第一接受者,也是Message的“防火墙”,他的运作是很简单的,不断地在Queue里面取Message,再派发给相应的真正接受者或者根据Message做某些操作。如果Queue是空的,他将等待。
4.Message发送者,Client
客户线程将操作指定为Message发送给消息派发线程。完成操作。
上面是个最基本的消息队列的方式。根据这些需求,我们可以制定一个比较通用的消息派发器接口。
Message的封装:
为了适应各种Message的应用,Message仅将提供一个doOnMessage()的窄接口。这里是Message被执行的具体实现。比如类似一种Event地实现(简单的例子,event是一种操作的标示,不代表操作,也是较常用的用法):
public class EventMessage implements Message {
public final Event event;
public EventMessage(Event e) {
this.event = e;
}
public void doOnMessage() {
e.getSource().dispatchEvent(e);//简单的例子。不要误解:>
}
}
消息派发线程:
public void run() {
while(true) {
synchronized(messageQueue) {
while(messageQueue.isEmpty()) {
try {
messageQueue.wait();
} catch (InterruptedException e) {
return;//interrupted here
}
}
Message msg = messageQueue.get();
}
msg.doOnMessage();//no lock hold here;
}
}
public boolean postMessage(Message m) {
synchronized(messageQueue) {
return messageQueue.putMessage(m);
}
}
还有一类应用,属于发送消息后需要等待到消息处理结束,不过这个应用是有一定的危险性的,比如在消息派发线程的处理中又产生了新的消息,并且等待这个消息,那么将出现死锁。在java.awt.eventQueue中提供了方法invokeAndWait中将判断当前线程是否为dispatching thread,如果是将出现错误。我们也可以效仿这种方式,抛出异常。另外对于等待的实现,我的做法是仍然以接口的形式放到Message中,添加两个:
public void waitForMe(long timeout);
public void notifyMe();
一个简单的实现可以是(客户可以制定其他的等待方式):
增加一个isDone标志,默认false;
public void waitForMe(long timeout) throws ...//在等到超时的情况下抛出异常,或者是其他处理
{
synchronized(this){//message itself
if(isDone)return;
try {
this.wait(timeout);
} catch(InterruptedException e) {
//
}
if(!isDone)throw new ...;//waiting is still not set,timeout.
}
}
public void notitfyMe() {
synchronized(this) {
if(waiting) {
isDone = true;
this.notifyAll();
}
}
}
那么在postMessageAndWait里面:
public void postMessageAndWait(Message m) throws ...{
if(Thread.currentThread() == this)throw new ...//当前线程不允许发送等待消息
postMessage(m);
m.waitForMe();//发送消息线程进入等待
}
消息派发线程在处理的时候,消息处理结束后通过调用Message.notifyMe()解除锁定。
有些消息的等待机制并不一定以这种实现,这个仅仅是个简单的例子。
从接口上看,消息是否需要等待,也可以让用户指定Message为等待的,那么postMessageAndWait就不需要了,postMessage里面来根据Message是否为等待来决定要不要waitForMe.具体实现不再赘述。
这样一个简单的消息派发线程基本完工,不过似乎少了些什么,对阿,退出的处理。一个优雅的实现是定义一个退出的消息,为了方便的退出,Message的doOnMessage可以定义为返回boolean 的值来确定退出,也可以是用其他的方式,反正就是以消息作为推出的标示是一个不错的选择。另外,个人并不推荐Thread.interrupt,虽然这是一个非常不错的退出线程的方法(能够快速的退出等待状态),但是,Message.doOnMessage是完全让用户来实现,interrupt很可能对用户自身的一些操作产生影响。不过总的来说这个方式也不是不可用的,只不过没有推出消息那么“雅观”。当然,如果这个线程的操作并不是一个很“重要”的操作,设置成daemon线程也是可以的。这个可以作为构造函数的参数。
至此,一个简单的MessageQueue的模型就出来了,从后面的讨论需要对前面提到的一些接口做一定的修改,个人所好不同,只要完成基本的功能,都是可以的。另外要提到的是队列的实现,我们可以提供一个默认的普通队列,这样就省得用户再去实现自己的队列了。至于优先队列,有很多的结构可用,比如堆。其实比较常用的是按照优先值来索引:
SubQueue queues[MAXP];//maxp是最大优先值
对于优先队列,一个比较隐蔽的问题是等级低的消息被饿死,比如有很多等级很高的消息过来,然而处理的时间总是不够,结果消息派发线程一直在处理这些高等级的消息。这个问题可以用一些折衷的办法解决,最终目的是不至于那些低等级的消息一致得不到处理。
差不多,就这样了,当然还有很多的东西比如消息合并,并没有提供接口,可以在将来的考虑之中。