转自醒客视角 autoasm.blog-city.com
一种高性能的消息转发架构
Alex Dou
摘要 本文介绍了一种基于多路选择和逆向路由方式的消息转发架构,该架构具有更大的吞吐量,更好的可伸缩性和更小的响应延迟。
关键词 分布式 消息 转发 高性能 软件架构
1. 消息转发的应用与挑战
无论是在Internet业务中还是电信系统的核心交换系统,都使用了各种类型的消息转发代理。如下图
图1.1
上图表示了一个典型的即时消息服务的架构。由于PC1和PC2都位于带地址转换功能的防火墙后,所以,这两个客户端不能直接建立连接。一种简单的解决方法是利用消息服务器转发。首先,PC1与PC2都与消息服务器建立并保持连接。PC1先将消息发送给消息服务器,然后消息服务器再根据消息中的目的地址信息将消息转发给PC2。
除了即时消息服务外,电信交换系统中的各种网关以及网络游戏服务器等都有转发大量消息的需求。
在不考虑规模可伸缩性的情况下,实现消息转发是非常简单的。下面伪代码描述了一个最简单的实现。
//Get Request
Request req = conn1.getRequest();
//Search route table for destination connection
Connection conn2 = routeTable.getConnByAddr(req.getDestination());
//Forward request
conn2.send(req);
//… Wait for response
Response resp = conn2.getResponse();
//Forward response
conn1.send(resp);
//OK
上面这段代码描述了最基本的转发逻辑。
首先,从请求方获得请求消息,然后根据请求的目的地址,从路由表中获得到目的地址的连接,然后再将消息转发给目的连接。在请求转发完成了以后,还需要等待应答,收到应答之后,再将应答转发给最初的请求一方。
尽管在逻辑上这段代码能够正确无误的工作,但是,这种简单的设计难以处理海量的消息转发业务。
在上述代码中,等待应答(Response)操作的耗时具有不确定性。应答可能在0.5秒内收到,但是也可能过了15秒才收到,甚至干脆就超时。而让一个宝贵的处理任务(负责实现转发逻辑守护进程或者线程)在这里白白等待实在是一种浪费。在等待超时的过程中,任务队列上可能已经挂满了新的转发请求,这些请求对应到现实世界则是若干焦急等待的用户。
没有理由让用户干着急而让处理任务优哉游哉的因为等待应答而睡眠(在多数平台的实现中,在没有收到数据的情况下,同步的IO都将使得调用者所在的进程或者线程挂起,直到接收到数据或者超时事件发生)。因此,我们需要一种能使处理任务更忙碌的设计,新的设计方案要保证不必为一个慢如蜗牛的转发任务耽误其它转发。
2. 可伸缩的架构
鱼与熊掌不可得兼。
这个简单的辩证规律在信息时代同样适用。本文所介绍的转发架构在获得可伸缩性的同时,系统变得更加复杂了。
下图表示了转发架构中的主要参与者之间的关系。
图 2.1
Request和Response类分别表示了抽象的请求和应答消息。每个Request对象具有id属性,每个Response对象具有responseFor属性,如果Response对象的responseFor属性与Request对象的id属性相同,则说明是针对该Request对象的应答。
Connection类表示了抽象的连接。Selector类是一个多路选择器,它具有从一组Connection对象中选择一个激活的连接(即需要处理的)的能力。
ProxyDaemon类表示了消息转发处理任务,在真实的系统中,ProxyDaemon应该是一系列活动对象。
PendingResponse类表示未决应答。该类记录了谁在什么时候给谁发送了请求这些重要信息。这些信息是进行路由应答消息(即逆向路由)和判断应答是否超时的重要依据。
PendingList维护PendingResponse对象的列表。
TimerDaemon主要用于超时检测,该守护任务定时启动,检测PendingList中的未决应答是否超时,如果超时,则向请求方发出超时消息。
3. 请求的处理
ProxyDaemon用于转发请求与应答。ProxyDaemon的逻辑与本文第一节描述的逻辑类似,只不过只是需要添加一个未决的应答,而不需要同步等待应答。下面代码描述了如何处理请求。
//.. Shows how to handle request
Connection conn = selector.getActiveConnection();
Message msg = conn.recv();
if (msg.isRequest()) {
Request req = new Request (msg);
Connection dest = routeTbl.getConnectionByAddr(req.getDestination);
//Forward request
dest.send(req);
//Add pending response
PendingResponse pending = new PendingResponse(req);
//Get reference of sender’s connection
pending.setSender(conn);
pendingList.add(pending);
// Add destination connection into selector, so, once response received, selector can detect it
selector.addConnection(dest);
}
else {
//Response
}
在上面代码中,这个首先根据接收的消息类型来判断是请求还是应答,如果是请求,则转发,然后添加一个未决的应答,该未决应答记录了用于逆向寻址和超时检测(PendingResonse的构造函数中获得时戳)的信息。最后,将请求的目的方,即应答发出一方的连接添加到selector的等待池中。这样如果该连接上有应答消息到达,则selector可以返回该连接对象。
4. 转发响应
转发响应也是ProxyDaemon的主要工作。
在新的架构中,我们使用了多路选择机制代替同步的等待,因此,我们还需要采用一种逆向的寻址和路由机制,使得应答消息能够被正确地转发给请求发起者。
PendingResponse对象记录了用于逆向寻址的信息。通过请求和应答消息的id编号,可以正确地执行请求应答配对;另外,PendingResponse也保留了请求发起者的连接对象,因此可以将应答转发给最初的请求发起者。下面伪代码描述了转发响应得流程。
//…
Connection conn = selector.getActiveConnection();
Message msg = conn.recv();
if(msg.isRequest()) {
//Request
}
else {
//Get Response
Response resp = new Response (msg);
//Get correct pending response by ID
PendngResponse pending = pendingList.getPengResponse(resp);
if(pending != null) {
//Forward response
Pending.getConnection().send(resp);
}
else {
//timeout
}
}
当收到应答消息后,首先根据应答的responseFor属性再未决应答队列中寻找记录了逆向路由信息的未决应答对象。如果该对象没有找到,则说明应答已经超时,未决应答对象已经被删除;如果未决应答对象找到,则根据该对象保留的请求发起者的连接对象应用来转发应答。
5. 超时处理
分布式系统是不可靠的,任何人在设计协议的时候都不能假设消息能够被正确地投递和处理,因此,等待超时是必须要处理的。
在本文第一节所介绍的最简单实现逻辑中,由于是同步等待应答,所以,超时很容易判断,只要recv()方法能支持超时即可。而在本文介绍的架构中,应答也是通过多路选择方式消息驱动的,这给超时的判断与处理带来了复杂性。
我们需要采用TimerDaemon来动态检测未决应答队列。每次TimerDaemon被定时器唤醒后,都要遍历整个未决应答队列,如果有未决应答对象超时,则根据逆向路由信息向请求发起者发送超时消息,并从未决队列中删除该未决应答对象。
//…
while(!stop) {
sleep(500);
while(true) {
PendingResponse pending = pendingList.getNextPending();
if(pending == null) {
break;
}
else {
if(pending.isTimeout()) {
// Send Time out message
Connection sender = pending.getConnection();
sender.send(Message.TIMEOUT);
// Remove pending response from list
pendingList.removePendingResponse(pending);
}
}
}
}
由于处理每个未决应答对象都需要时间,因此,TimerDaemon的睡眠周期应小于协议规定的最小定时器间隔。
6. 结束语
本文所介绍的方法的本质特点是使用异步方式来处理应答,从而避免了漫长的等待。当应答到来之前,处理任务可以继续处理其他转发请求而不会被阻塞,从而大幅度提高了系统的吞吐量,另外,由于处理任务不会被阻塞,也使得系统具有了更好的可伸缩性和更小的响应延迟。