JMS&ActiveMQ教程
基于JMS的消息传送
ActiveMQ与Spring集成
ActiveMQ与SpringBoot集成
ActiveMQ安全机制
ActiveMQ主从集群

ActiveMQ主从集群

什么是集群

集群就是将相同的程序、功能,部署在两台或多台服务器上,这些服务器对外提供的功能是完全一样的。

集群是通过不断横向扩展增加服务器的方式,以提高服务的能力。

为什么需要集群

● 集群可以解决单点故障问题

● 集群可以提高系统的可用性

● 集群可以提高系统的服务能力

ActiveMQ主从集群方式

1、shared filesystem Master-Slave方式主从集群

通过共享存储目录(kahaDB)来实现master和slave的主从信息同步;

所有ActiveMQ的broker都在不断地获取共享目录的控制权,哪个broker抢到了控制权,它就成为master,它将锁定该目录,其他broker就只能成为slave。

当master主出现故障后,剩下的slave从将再进行争夺共享目录的控制权,谁抢到共享目录的控制权,谁就成为主,其他没有抢到控制权的称为从。

由于他们是基于共享目录,所以当主出现故障后,其上没有被消费的消息在接下来产生的新的master主中可以继续进行消费。

这种方式客户端访问的都是主,从只是起到了一个备份访问的作用

(1) 架构图

(2) 实现步骤

A、 安装多个ActiveMQ

因为ActiveMQ的安装和Tomcat一样,解压就可以使用,所以我们直接在/usr/local目录下复制多份,就相当于安装了多个ActiveMQ,我们这里复制3个ActiveMQ出来。

复制前,先将运行的ActiveMQ停止。

B、 打开三个Xshell,分别连接不同的ActiveMQ方便操作

C、 配置每个activeMQ的conf /activemq.xml文件中的共享目录

如果集群搭建在一台机器上需要改端口,如果搭建在多台上就不需要了

如果搭建在多台服务器上,那么存放共享目录的机器需要通过磁盘挂载的方式挂载到主从机器上。

● 修改三个ActiveMQ的共享目录

persistenceAdapter>
    <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
    <kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>

● 修改完持久化目录后,需要在/opt目录下创建该目录

D、 配置每个activeMQ的conf /activemq.xml文件中的端口

为了避免端口号的冲突,前三个地址端口+1,后两个端口地址-1,可以将文件下载下来替换。

一个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

● maximumConnections 最大连接数;

● wireFormat.maxFrameSize 表示一个完整消息的最大数据量,单位byte;

● 0.0.0.0表示任意ip

E、 修改conf/jetty.xml文件的jetty服务器端口(管理控制台)

第一个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>

F、 启动三台ActiveMQ,可以测试验证了

注意:启动后会有一段时间延时,稍等一会;

浏览器访问http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判断主从服务器;

web控制台能访问的是 master,不能访问的是 slave。

G、 修改11-activemq-java中的程序收发消息代码

连接时使用故障转义协议failover

failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

 为了看到效果,发消息和接收消息我们都是用循环方式

//发消息 没有返回值,是非阻塞的
while(true){
    messageProducer.send(message); 
}

查看发送以及接收idea控制台输出,停止ActiveMQ主,看效果

注意:如果是事务消息,被中断那么程序发送程序出错,不能实现,所以我们将消息改为非事务消息进行测试,如果是非事务消息就注释掉session.commit。

2、shared database Master-Slave方式主从集群

该方式与shared filesystem方式类似,只是共享的存储介质由文件系统改成了数据库。

(1) 架构图

(2) 实现步骤

A、 安装多个ActiveMQ(已做)

因为ActiveMQ的安装和Tomcat一样,解压就可以使用,所以我们直接在/usr/local目录下复制多份,就相当于安装了多个ActiveMQ,我们这里复制3个ActiveMQ出来复制前,先将运行的ActiveMQ停止。

B、 打开三个Xshell,分别连接不同的ActiveMQ方便操作(已做)

C、 配置每个activeMQ的conf /activemq.xml文件中的持久化适配器是jdbc数据库方式

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

D、 配置每个数据库连接池

注意:连接池的配置需要配置在的外面

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
</bean>

E、 在每个ActiveMQ的lib目录下加入mysql的驱动包和数据库连接池Druid包,该包在我提供的资料05-ActiveMQ\resources\lib下

可以通过Xftp或者rz命令上传。

F、 启动MySQL数据库,并创建activemq数据库

G、 配置每个activeMQ的conf /activemq.xml文件中的端口(已做)

如果集群搭建在一台机器上需要改端口,如果搭建在多台上就不需要了;

为了避免端口号的冲突,前三个地址端口+1,后两个端口地址-1。

第一个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三个ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

● maximumConnections 最大连接数;

● wireFormat.maxFrameSize 表示一个完整消息的最大数据量,单位byte;

● 0.0.0.0表示任意ip

H、 修改conf/jetty.xml文件的jetty服务器端口(管理控制台) (已做)

第一个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三个ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>

I、启动三台ActiveMQ,可以测试验证了

浏览器访问http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判断主从服务器;

web控制台能访问的是 master,不能访问的是 slave。

J、 修改11-activemq-java中的程序收发消息类(已做)

连接时使用故障转义协议failover

failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址 

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

为了看到效果,发消息和接收消息我们都是用循环方式

//发消息 没有返回值,是非阻塞的
while(true){
    messageProducer.send(message);
    session.commit();
}

查看发送以及接收idea控制台输出,停止ActiveMQ主,看效果

注意:如果是事务消息,被中断那么程序发送程序出错,不能实现,所以我们将消息改为非事务消息进行测试,如果是非事务消息就注释掉session.commit。

3、Replicated LevelDB Store方式主从集群(常用)

基于可复制的LevelDB存储方式的集群;

这种集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper从一组broker中协调选择一个broker作为master主,其他broker作为slave从的模式。所有slave从节点通过复制master主节点的消息来实现消息同步,当主出现故障后,没有被消费的消息在从服务器上也同步了一份,所以不会有消息的丢失。

LevelDB 是 Google开发的一套用于持久化数据的高性能kv数据库,ActiveMQ利用该数据库进行数据的存储。

只有master 接受客户端连接,slave不接受客户端连接,Master的所有存储操作都将被复制到slaves。

在这个模式中,需要有半数以上的broker是正常的,集群才是可用的,超过半数broker故障,ZooKeeper的选举算法将不能选择master,从而导致集群不可用。

(1)架构图

(2) 实现步骤

A、 安装多个ActiveMQ(已做)

因为ActiveMQ的安装和Tomcat一样,解压就可以使用,所以我们直接在/usr/local目录下复制多份,就相当于安装了多个ActiveMQ,我们这里复制3个ActiveMQ出来。

复制前,先将运行的ActiveMQ停止

B、 打开三个Xshell,分别连接不同的ActiveMQ方便操作(已做)

C、 配置每个activeMQ的conf /activemq.xml文件中的持久化适配器replicatedLevelDB方式

<persistenceAdapter>
<replicatedLevelDB
    replicas="3"
    bind="tcp://0.0.0.0:0"
    zkAddress="localhost:2181"/>
</persistenceAdapter>

参数说明

● replicas :集群中存在的节点的数目

● bind :当该节点成为master后,将使用该bind配置的ip和端口进行数据复制

● zkAddress :ZooKeeper的地址

D、 启动ZooKeeper服务器

E、 启动三台ActiveMQ,可以测试验证了

浏览器访问http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判断主从服务器;

web控制台能访问的是 master,不能访问的是 slave。

F、 修改11-activemq-java中的程序收发消息类(已做)

连接时使用故障转义协议failover

ailover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

为了看到效果,发消息和接收消息我们都是用循环方式 

//发消息 没有返回值,是非阻塞的
while(true){
    messageProducer.send(message);
}

查看发送以及接收idea控制台输出,停止ActiveMQ主,看效果。

G、 把其中的一台master关闭,留下两台运行,观察效果

H、 继续关闭下一台master,留下一台运行,观察效果

I、 启动其中一台,让两个运行,再观察效果

(3) 总结

这种方式,不适合集群太大,也就是activemq不能太多,因为多个activemq之间需要复制消息,这个比较耗资源,占用网络,建议3、5台。