分享
 
 
 

如何利用jgroups实现分布式环境下消息的接受和发送

王朝java/jsp·作者佚名  2006-01-09
窄屏简体版  字體: |||超大  

为了提高应用的性能,我们准备实现分布式cache,所以我特别研究了oscache关于分布式实现的部分.

我们知道为了实现分布式环境下消息的通知,目前两种比较流行的做法是使用JavaGroups[http://www.jgroups.org]和JMS。这两种方式都在底层实现了广播发布消息。

由于JGroups可以提供可靠的广播通信.所以我们准备采用JGroups.

我自己写了一个JavaGroupBroadcastingManager.java类实现消息的管理(包括发送和接收),代码参考了oscache的相关代码,在其基础上进行了改进.

代码如下:

1、JavaGroupBroadcastingManager.java

package com.yz;

import com.opensymphony.oscache.base.FinalizationException;

import com.opensymphony.oscache.base.InitializationException;

import com.opensymphony.oscache.plugins.clustersupport.JavaGroupsBroadcastingListener;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.jgroups.Address;

import org.jgroups.Channel;

import org.jgroups.blocks.NotificationBus;

import java.io.Serializable;

import java.util.Properties;

/**

* @author yangzheng

* @version $Revision$

* @since 2005-7-14

*/

public class JavaGroupBroadcastingManager

implements NotificationBus.Consumer {

private static final Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class);

private static final String BUS_NAME = "OSCacheBus";

private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";

private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";

private NotificationBus bus;

/**

* Initializes the broadcasting listener by starting up a JavaGroups notification

* bus instance to handle incoming and outgoing messages.

*

*/

public synchronized void initialize(Properties config) throws InitializationException {

String properties = config.getProperty(CHANNEL_PROPERTIES);

String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);

if (log.isInfoEnabled()) {

log.info("Starting a new JavaGroups broadcasting listener with properties="

+ properties);

}

try {

bus = new NotificationBus(BUS_NAME, properties);

bus.start();

bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));

bus.setConsumer(this);

log.info("JavaGroups clustering support started successfully");

} catch (Exception e) {

throw new InitializationException("Initialization failed: " + e);

}

}

/**

* Shuts down the JavaGroups being managed

*/

public synchronized void finialize() throws FinalizationException {

if (log.isInfoEnabled()) {

log.info("JavaGroups shutting down...");

}

bus.stop();

bus = null;

if (log.isInfoEnabled()) {

log.info("JavaGroups shutdown complete.");

}

}

/**

* Uses JavaGroups to broadcast the supplied notification message across the cluster.

*

*/

protected void sendNotification(Serializable message) {

bus.sendNotification(message);

}

/**

* Handles incoming notification messages from JavaGroups. This method should

* never be called directly.

*

*/

public void handleNotification(Serializable serializable) {

log.info("An cluster notification message received message " + serializable.toString()

+ "). Notification ignored.");

}

/**

* We are not using the caching, so we just return something that identifies

* us. This method should never be called directly.

*/

public Serializable getCache() {

return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress();

}

/**

* A callback that is fired when a new member joins the cluster. This

* method should never be called directly.

*

* @param address The address of the member who just joined.

*/

public void memberJoined(Address address) {

if (log.isInfoEnabled()) {

log.info("A new member at address '" + address + "' has joined the cluster");

}

}

/**

* A callback that is fired when an existing member leaves the cluster.

* This method should never be called directly.

*

* @param address The address of the member who left.

*/

public void memberLeft(Address address) {

if (log.isInfoEnabled()) {

log.info("Member at address '" + address + "' left the cluster");

}

}

}

2、发送消息的程序:

package com.yz;

import java.io.FileInputStream;

import java.util.Properties;

/**

* @author yangzheng

* @version $Revision$

* @since 2005-7-14

*/

public class TestJavaGroupBroadcastSend {

public static void main(String[] args) throws Exception {

JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();

Properties properties = new Properties();

properties.load(new FileInputStream("javagroup.properties"));

javaGroupBroadcastingManager.initialize(properties);

String message = "hello world!";

while (true) {

Thread.sleep(1000);

javaGroupBroadcastingManager.sendNotification(message);

}

}

}

3、接受消息的程序:

package com.yz;

import java.io.FileInputStream;

import java.util.Properties;

/**

* @author yangzheng

* @version $Revision$

* @since 2005-7-14

*/

public class TestJavaGroupBroadcastReceive {

public static void main(String[] args) throws Exception {

JavaGroupBroadcastingManager javaGroupBroadcastingManager = new JavaGroupBroadcastingManager();

Properties properties = new Properties();

properties.load(new FileInputStream("javagroup.properties"));

javaGroupBroadcastingManager.initialize(properties);

Thread.sleep(100000000);

}

}

4、配置文件:(基本上不用改动)

javagroup.properties

cache.cluster.properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)

cache.cluster.multicast.ip=231.12.21.132

5、所需要的jar包

commons-logging-1.0.4.jar

jgroups-2.2.8.jar concurrent.jar 属于jgroups的包

6、说明:

1、发送消息和接受消息的程序都需要调用JavaGroupBroadcastingManager.initialize()方法初始化jgroup。

2、运行环境的多台服务器要在同一个局域网内,同时hosts中不要将127.0.0.1写入,以便jgroup获得本机的ip,而不是获得127.0.0.1

7、程序运行的结果:

接受端:

Jul 14, 2005 1:29:09 PM com.yz.JavaGroupBroadcastingManager initialize

INFO: Starting a new JavaGroups broadcasting listener with properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)

Jul 14, 2005 1:29:12 PM org.jgroups.protocols.UDP createSockets

INFO: sockets will use interface 10.0.99.99

Jul 14, 2005 1:29:12 PM org.jgroups.protocols.UDP createSockets

INFO: socket information:

local_addr=10.0.99.99:33637, mcast_addr=231.12.21.132:45566, bind_addr=/10.0.99.99, ttl=32

sock: bound to 10.0.99.99:33637, receive buffer size=64000, send buffer size=32000

mcast_recv_sock: bound to 10.0.99.99:45566, send buffer size=131071, receive buffer size=80000

mcast_send_sock: bound to 10.0.99.99:33638, send buffer size=131071, receive buffer size=80000

-------------------------------------------------------

GMS: address is 10.0.99.99:33637

-------------------------------------------------------

Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager initialize

INFO: JavaGroups clustering support started successfully

Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager memberJoined

INFO: A new member at address '10.0.99.99:33617' has joined the cluster

Jul 14, 2005 1:29:14 PM com.yz.JavaGroupBroadcastingManager memberJoined

INFO: A new member at address '10.0.99.99:33637' has joined the cluster

Jul 14, 2005 1:30:24 PM com.yz.JavaGroupBroadcastingManager memberJoined

INFO: A new member at address '10.0.99.98:33648' has joined the cluster // 监控到发送端服务器加入cluster

Jul 14, 2005 1:30:25 PM com.yz.JavaGroupBroadcastingManager handleNotification //接受到消息

INFO: An cluster notification message received message hello world!). Notification ignored.

发送端

Jul 14, 2005 1:20:15 PM com.yz.JavaGroupBroadcastingManager initialize

INFO: Starting a new JavaGroups broadcasting listener with properties=UDP(mcast_addr=231.12.21.132;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)

Jul 14, 2005 1:20:16 PM org.jgroups.protocols.UDP createSockets

INFO: sockets will use interface 10.0.99.98

Jul 14, 2005 1:20:16 PM org.jgroups.protocols.UDP createSockets

INFO: socket information:

local_addr=10.0.99.98:33648, mcast_addr=231.12.21.132:45566, bind_addr=/10.0.99.98, ttl=32

sock: bound to 10.0.99.98:33648, receive buffer size=64000, send buffer size=32000

mcast_recv_sock: bound to 10.0.99.98:45566, send buffer size=131071, receive buffer size=80000

mcast_send_sock: bound to 10.0.99.98:33649, send buffer size=131071, receive buffer size=80000

-------------------------------------------------------

GMS: address is 10.0.99.98:33648

-------------------------------------------------------

Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager initialize

INFO: JavaGroups clustering support started successfully

Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined

INFO: A new member at address '10.0.99.99:33617' has joined the cluster

Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined

INFO: A new member at address '10.0.99.99:33637' has joined the cluster

Jul 14, 2005 1:20:18 PM com.yz.JavaGroupBroadcastingManager memberJoined

INFO: A new member at address '10.0.99.98:33648' has joined the cluster // 监控到接受端服务器加入cluster

Jul 14, 2005 1:20:27 PM com.yz.JavaGroupBroadcastingManager memberLeft

INFO: Member at address '10.0.99.99:33637' left the cluster // 监控到接受端服务器的程序退出

现在程序已经可以正常运行,有了这个基础,分布式cache的实现指日可待.

 
 
 
免责声明:本文为网络用户发布,其观点仅代表作者个人观点,与本站无关,本站仅提供信息存储服务。文中陈述内容未经本站证实,其真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
2023年上半年GDP全球前十五强
 百态   2023-10-24
美众议院议长启动对拜登的弹劾调查
 百态   2023-09-13
上海、济南、武汉等多地出现不明坠落物
 探索   2023-09-06
印度或要将国名改为“巴拉特”
 百态   2023-09-06
男子为女友送行,买票不登机被捕
 百态   2023-08-20
手机地震预警功能怎么开?
 干货   2023-08-06
女子4年卖2套房花700多万做美容:不但没变美脸,面部还出现变形
 百态   2023-08-04
住户一楼被水淹 还冲来8头猪
 百态   2023-07-31
女子体内爬出大量瓜子状活虫
 百态   2023-07-25
地球连续35年收到神秘规律性信号,网友:不要回答!
 探索   2023-07-21
全球镓价格本周大涨27%
 探索   2023-07-09
钱都流向了那些不缺钱的人,苦都留给了能吃苦的人
 探索   2023-07-02
倩女手游刀客魅者强控制(强混乱强眩晕强睡眠)和对应控制抗性的关系
 百态   2020-08-20
美国5月9日最新疫情:美国确诊人数突破131万
 百态   2020-05-09
荷兰政府宣布将集体辞职
 干货   2020-04-30
倩女幽魂手游师徒任务情义春秋猜成语答案逍遥观:鹏程万里
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案神机营:射石饮羽
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案昆仑山:拔刀相助
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案天工阁:鬼斧神工
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案丝路古道:单枪匹马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:与虎谋皮
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:李代桃僵
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案镇郊荒野:指鹿为马
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:小鸟依人
 干货   2019-11-12
倩女幽魂手游师徒任务情义春秋猜成语答案金陵:千金买邻
 干货   2019-11-12
 
推荐阅读
 
 
 
>>返回首頁<<
 
靜靜地坐在廢墟上,四周的荒凉一望無際,忽然覺得,淒涼也很美
© 2005- 王朝網路 版權所有