这篇文章的目的就是写一个简单的基于文本的聊天程序(SimpleChat),包含如下功能:
JGroups 使用 JChannel 作为连接到组、发送和接收消息的主 API,并可通过 JChannel 注册用来处理这些事件(成员加入、退出和发送消息)的侦听器。
而 Messages 是发送的消息,它包含一个字节缓冲区、发送和接受者地址。Addresses 是 org.jgroups.Address 的子类,通常包含一个 IP 地址和端口。
组中的实例列表被成为 View,每个实例包含相同的 View,可通过 View.getMembers() 来获取所有实例的地址列表。
实例 Instances 只能在加入组后才能发送和接收消息。
当一个实例要离开组时,JChannel.disconnect() 或者 JChannel.close() 方法会被调用,后者实际上会判断当连接还保持时调用了 disconnect() 方法来关闭通道。
要加入一个组,我需要使用 JChannel。一个 JChannel 的实例可以通过一个配置来创建,配置中定义了通道的属性。然后通过 connect(String clustername) 来连接到组中。所有通道实例都是调用 connect() 并使用相同的参数来加入相同的组中。下面让我们实际创建一个 JChannel 并连接到名为 ChatCluster 的组中:
import org.jgroups.JChannel; public class SimpleChat { JChannel channel; String user_name=System.getProperty("user.name", "n/a"); private void start() throws Exception { channel=new JChannel(); // use the default config, udp.xml channel.connect("ChatCluster"); } public static void main(String[] args) throws Exception { new SimpleChat().start(); } }
首先我们使用空的构造器来创建一个 JChannel 实例,该方法使用默认的配置。你也可以传递一个 XML 文件来配置这个 JChannel,例如 new JChannel("/home/bela/udp.xml").
connect() 方法加入 ChatCluster 组中。注意我们并不需要事先明确的创建一个组,connect() 方法会判断如果是组中的第一个实例时自动创建该组。之后其他的实例连接过来就加入了相同的组,例如:
这样我们就有三个组:"cluster-one" 包含 ch1 和 ch4, "cluster-two" 包含 ch2 和 ch3, 而 "cluster-three" 只有一个 ch5 实例.
现在我们运行一个事件循环来从标准控制台输入中读取文本消息,然后发送到组中所有成员。当输入 exit 或者 quit 命令时,将会退出循环并关闭通道
private void start() throws Exception { channel=new JChannel(); channel.connect("ChatCluster"); eventLoop(); channel.close(); } private void eventLoop() { BufferedReader in=new BufferedReader(new InputStreamReader(System.in)); while(true) { try { System.out.print("> "); System.out.flush(); String line=in.readLine().toLowerCase(); if(line.startsWith("quit") || line.startsWith("exit")) break; line="[" + user_name + "] " + line; Message msg=new Message(null, null, line); channel.send(msg); } catch(Exception e) { } } }
在这里添加了 eventLoop() 的调用,然后关闭通道的方法。
事件处理循环当输入回车时就会发送消息到组中,这是通过构造一个新的 Message 实例然后调用 Channel.send() 方法来发送。
Message 类构造器的首个参数是目标地址,该参数设置为 null 表示将发送给组中所有成员(非空的参数表示要发送到指定的某个成员)。
第二个参数是发送者的地址,这也是 null,JGroups 会自动使用当前的恰当的地址。
第三个参数是我们从标准控制台输入中读到的键盘输入的内容,这个内容会通过 Java 的序列化机制变成 byte[] 数据并设置为 Message 的内容。注意我们也可以自己来序列化一个对象(而且也推荐这样做),然后给 Message 构造器第三个参数传递 byte[] 值。
到这里应用程序功能就已差不多完整,不过还缺少一样,没有对接收到的消息进行提醒和显示,接下来我们介绍消息的接收。
现在我们要注册一个 Receiver 来接收消息并查看组中成员的变动。我们可以实现 org.jgroups.Receiver,不过这里我选用直接继承 ReceiverAdapter 类,因为该类已经有一些默认的实现方法了。然后只需要重载回调函数 receive() 和 viewChange() 即可:
public class SimpleChat extends ReceiverAdapter {
设置接收器的 start() 方法:
private void start() throws Exception { channel=new JChannel(); channel.setReceiver(this); channel.connect("ChatCluster"); eventLoop(); channel.close(); }实现 receive() 和 viewAccepted() 方法:
public void viewAccepted(View new_view) { System.out.println("** view: " + new_view); } public void receive(Message msg) { System.out.println(msg.getSrc() + ": " + msg.getObject()); }
viewAccepted() 回调函数会在新成员加入组中,或者已有成员崩溃了或离开组时被调用。其 toString() 方法会打印 View ID(也就是成员ID)以及当前成员列表。
在 receive() 方法中我们可以得到一个 Message 的参数,只需要读取其缓冲区内容并输出到控制台,同时我们也把发送者的地址打印出来 (Message.getSrc()).
注意我们也可以通过调用 Message.getBuffer() 来获取消息实体中的 byte[] 数据然后通过自己的反序列化来处理,例如 String line=new String(msg.getBuffer()).
现在我们这个演示程序的所有功能均已完成,可使用如下命令来运行试试(译者注:请自行将 jgroup-xxx.jar 添加到类路径):
[linux]/home/bela$ java SimpleChat ------------------------------------------------------------------- GMS: address=linux-48776, cluster=ChatCluster, physical address=192.168.1.5:42442 ------------------------------------------------------------------- ** view: [linux-48776|0] [linux-48776] >
我们启动的实例名是 linux-48776 ,其物理地址是 192.168.1.5:42442 (IP address:port). 这个名字是由 JGroups 来生成的(如果用户没有设置的话,通常是主机名加一个随机的数字),该名字一直存在并在整个生命周期中保持不变,同时映射到一个基础的 UUID,而 UUID 映射到物理地址。
接下来启动第二个实例:
[linux]/home/bela$ java SimpleChat ------------------------------------------------------------------- GMS: address=linux-37238, cluster=ChatCluster, physical address=192.168.1.5:40710 ------------------------------------------------------------------- ** view: [linux-48776|1] [linux-48776, linux-37238] >
现在组中有两个成员,包括 [linux-48776, linux-37238], 这里显示的是加入组中两个成员的名称。注意第一个成员 (linux-48776) 同样也接收到相同的 view,因此两个成员包含了同样的成员列表,顺序也一致。实例是根据加入组中的先后顺序列表的,因此最早加入的成员排在最前面。
只需要输入消息并按回车键就可以发送消息。消息被发送到组中,因此组中所有成员都将接收到该消息,包括发送者自己。
当输入 exit 或者 quit 并按回车,实例将会退出组。
为了模拟成员崩溃的情况(例如在控制台中按 Ctrl + C),其他的幸存者将接收到一个新的 view ,而当前的组中只有一个成员。
JGroups 另外一个使用场景是可以帮你维护一份可在组中共享的数据。例如,Web 服务器中的所有 HTTP 会话。如果这些会话通过组来复制,那么客户端就可以访问组中的任意成员来获得这些数据,就算某个成员崩溃或者退出了,这些数据依然可用。
对会话的更新也会通过组进行复制,例如某个序列化属性被修改了,那么其他成员也会得知这个修改,这样就使得组中所有成员包含了相同的状态。
可当组中加入一个新成员会怎么样呢?新加入的成员必须通过某些方法来获取这些状态数据,这被称为是“状态传输”。
JGroups 的状态传输是通过两个回调函数来实现的 (getState() and setState()) ,我们需要调用 getState() 方法来获取状态数据。注意,为了在应用中使用状态传输,协议堆栈必须有一个状态传输协议(我们这个演示程序使用了默认的堆栈)。
我们修改一下 start() 方法,加入 getState() 的调用:
private void start() throws Exception { channel=new JChannel(); channel.setReceiver(this); channel.connect("ChatCluster"); channel.getState(null, 10000); eventLoop(); channel.close(); }getState() 方法第一个参数是目标成员,null 表示首个成员(协调者)。第二个参数是超时时间,这里我们设置了 10 秒钟的超时时间,意味着状态传输的时间必须在 10 秒内完成,否则将会抛出异常,0 代表没有超时时间。 本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。 2KB翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务