需求背景
 
ASP站长网将阿里云同一个VPC下的RabbitMQ集群的消息从一个网段集群迁移到另一个网段集群。消息中间件的消息是即时消费,为何还有历史消息,因为是历史遗留问题。故要迁移
 
整个网络拓扑图如下
 
注意:
 
若对于跨VPC网络
 
1. 确保各主机网络互通
 
2. 配置好各主机名
 
两边安全组出方向开发:15672、25672、5672、4369端口
 
否在在加入集群会出现问题
 
 
 
资源清单
 
主机名
 
IP地址
 
角色
 
备注
 
node171
 
172.20.0.171
 
老的MQ集群_1
 
 
node172
 
172.20.0.172
 
老的MQ集群_2
 
 
node173
 
192.168.0.173
 
新的MQ集群_1
 
 
node174
 
192.168.0.174
 
新的MQ集群_2
 
 
基础软件及环境信息
 
操作系统:CentOS Linux release 7.3.1611
 
Erlang:Erlang/OTP 20 [erts-9.3.3.3]
 
RabbitMQ:rabbitmq_server-3.7.8
 
集群的部署
 
node171、node172组成集群A
 
node173、node174组成集群B
 
这里的环境部署略
 
创建测试账户
 
在【node171上进行操作】
 
rabbitmqctl add_user root root123
 
rabbitmqctl add_vhost kcvhost
 
rabbitmqctl set_permissions -p kcvhost root  ".*" ".*" ".*"
 
rabbitmqctl add_user admin admin123
 
rabbitmqctl set_permissions -p kcvhost admin  ".*" ".*" ".*"
 
rabbitmqctl set_user_tags admin administrator
 
rabbitmq-plugins enable rabbitmq_management
 
rabbitmqctl stop_app
 
rabbitmqctl start_app
 
生成测试数据
 
消息生产者代码:
 
package com.zjkj.rabbitmq.demo;
 
import Java.io.IOException;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
/**
 * 消息的生产者
 * @author zjkj
 *
 */
public class Rabbitmq_Producer {
 
private static final String EXCHANGE_NAME = "exchange_test_3";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_test_3";
private static final String IP_ADDRESS = "172.20.0.171";
private static final int PORT = 5672; //RabbitMQ服务默认端口号为5672
 
 
public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection(); //创建连接
Channel channel = connection.createChannel(); // 创建信道
 //创建一个type="direct"、持久化、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true, false, null);
// 创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false,null);
// 将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送一条持久化的消息:hello world!
for(int i=1;i<=100000;i++){
String msg = "交换器_1与队列1绑定:Message_"+i;
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}
// 关闭资源
channel.close();
connection.close();
 
}
 
}
 
 
消费者代码
 
package com.zjkj.rabbitmq.demo;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
/**
 * 消息的消费者
 * @author zjkj
 *
 */
public class Rabbitmq_Consumer {
 
private static final String QUEUE_NAME = "queue_test_3";
private static final String IP_ADDRESS = "192.168.6.171";
private static final  int PORT = 5672;
 
 
public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
Address[] addresses = new Address[]{
new Address(IP_ADDRESS,PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root123");
// 这里的连接方式与生产者的demo略有不同,注意区别
Connection connection = factory.newConnection(addresses); //创建连接
final Channel channel = connection.createChannel(); // 创建信道
channel.basicQos(64);// 设置客户端最多接收未被ack的消息的个数
 
 
/**
 * 这里采用了继承DefaultConsumer的方式来实现消费,有过RabbitMQ使用经验的开发者
 * 可能喜欢使用QueueingConsumer的方式来实现消费
 * 因为使用QueueingConsumer会有一些隐患。
 * 同时在RabbitMQ Java客户端4.0.0版本开始将QueueingConsumer标记为@Deprecated了
 */
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)
  throws IOException{
System.out.println("recv message : " + new String(body));
try{
TimeUnit.SECONDS.sleep(1);
 
}catch(InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME,true, consumer);
//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
 
}
 
}
 
 
查看集群中诸如用户数、交换器数量、队列数量等
 
[root@node171 rabbitmq]# rabbitmqctl list_users
 
Listing users ...
 
admin  [monitoring]
 
guest  [administrator]
 
root    []
 
[root@node171 rabbitmq]# rabbitmqctl list_exchanges
 
Listing exchanges for vhost / ...
 
amq.topic      topic
 
amq.headers    headers
 
exchange_test_3 direct
 
amq.direct      direct
 
exchange_test_2 direct
 
amq.rabbitmq.trace      topic
 
amq.match      headers
 
        direct
 
exchange_test_1 direct
 
amq.fanout      fanout
 
[root@node171 rabbitmq]# rabbitmqctl list_queues
 
Timeout: 60.0 seconds ...
 
Listing queues for vhost / ...
 
queue_test_3    100000
 
queue_test_2    200
 
queue_test_1    10000
 
迁移方案
 
迁移步骤
 
1. 停止所有生产者和消费者的应用程序
 
2. 将集群B中的机器依次一台一台加入集群A中,并确认所有队列镜像完成
 
3. 剔除集群A中的一台一台机器
 
4. 将应用指向集群B
 
方案1【不可行】
 
将集群B中的一台机器加入集群A中,然后再集群B中的另一他机器已加入集群,然后剔除集群A中的一台机器,然后再剔除集群A中的另一台机器
 
此方案对于RabbitMQ的普通集群也即是Cluster模式是无效的
 
1. 停止A集群中的所有连接
 
2. 将集群B中的一台节点加入到A集群中
 
将集群A中的.erlang.cookie的值拷贝到集群B中的node173上
 
[root@node171 rabbitmq]# cat .erlang.cookie
 
ORMTFBMHOXOGFKRLQSPU[root@node171 rabbitmq]#
 
[root@node173 plugins]# cp /var/lib/rabbitmq/.erlang.cookie  /var/lib/rabbitmq/erlang.cookie.bak
 
[root@node173 plugins]# chmod 700 /var/lib/rabbitmq/.erlang.cookie
 
 [root@node173 plugins]# vi /var/lib/rabbitmq/.erlang.cookie
 
 
ORMTFBMHOXOGFKRLQSPU
 
 
[root@node173 plugins]# chmod 400 /var/lib/rabbitmq/.erlang.cookie
 
[root@node173 plugins]# ls -lrth /var/lib/rabbitmq/.erlang.cookie
 
-r-------- 1 rabbitmq rabbitmq 21 Oct 24 18:51 /var/lib/rabbitmq/.erlang.cookie
 
3.将集群B中的node173加入到集群A中
 
[root@node173 rabbitmq]# service rabbitmq-server start
 
Redirecting to /bin/systemctl start  rabbitmq-server.service
 
[root@node173 rabbitmq]# rabbitmqctl stop_app
 
Stopping rabbit application on node mq_173@node173 ...
 
[root@node173 rabbitmq]# rabbitmqctl reset
 
Resetting node mq_173@node173 ...
 
[root@node173 rabbitmq]# rabbitmqctl join_cluster mq171@node171
 
Clustering node mq_173@node173 with mq171@node171
 
[root@node173 rabbitmq]# rabbitmqctl start_app
 
Starting node mq_173@node173 ...
 
 completed with 3 plugins.
 
4. 同样的方法将集群B中的node174加入到集群A中
 
[root@node174 rabbitmq]# rabbitmqctl cluster_status
 
Cluster status of node mq_174@node174 ...
 
[{nodes,[{disc,[mq_174@node174]}]},
 
 {running_nodes,[mq_174@node174]},
 
 {cluster_name,<<"mq_174@node174">>},
 
 {partitions,[]},
 
 {alarms,[{mq_174@node174,[]}]}]
 
 [root@node174 rabbitmq]# rabbitmqctl stop_app
 
Stopping rabbit application on node mq_174@node174 ...
 
[root@node174 rabbitmq]# rabbitmqctl reset
 
Resetting node mq_174@node174 ...
 
[root@node174 rabbitmq]# rabbitmqctl join_cluster mq171@node171
 
Clustering node mq_174@node174 with mq171@node171
 
[root@node174 rabbitmq]# rabbitmqctl start_app
 
Starting node mq_174@node174 ...
 
 completed with 0 plugins.
 
  5.将集群A中的node171剔除集群
 
[root@node171 rabbitmq]# rabbitmqctl stop
 
Stopping and halting node mq171@node171 ...
 
这时访问node172 的Web集群管理

dawei

【声明】:九江站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。