rocketmq双主双从配置(真实环境)
第一 主机配置
双主双从
两台虚拟机A:192.168.1.1 B:192.168.1.2
A上运行nameserverA brokerA brokerBSlave
B上运行nameserverB brokerB brokerASlave
第二 启动
启动顺序(**非常重要**):先在两台主机上启动nameserver,再启动A的主、B的主,最后启动A的从、B的从。
cd /home/rocketmq/rocketmq-all-4.6.0-bin-release/bin/
nohup sh mqnamesrv & 两台都先启动nameserver
再启动A的主
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-a.properties &
启动B的主
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-b.properties &
启动A的从
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
启动B的从
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
第三 关闭
先关闭A和B主机的broker 再关闭A主机和B主机的namesrv
sh mqshutdown broker
sh mqshutdown namesrv
相关配置文件
A主机brokerA
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
brokerClusterName=rocketmq-cluster
brokerIP1=192.168.1.1
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
storePathRootDir=/home/rocketmq/store
A主机brokerBSlave
#namesrvAddr=rocket_namesrv1:9876;rocket_namesrv2:9876
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
brokerClusterName=rocketmq-cluster
brokerIP1=192.168.1.1
#brokerIP2=192.168.1.1
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10001
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
storePathRootDir=/home/rocketmq/storeb
B主机brokerB
#namesrvAddr=rocket_namesrv1:9876;rocket_namesrv2:9876
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
brokerIP1=192.168.1.2
#brokerIP2=192.168.1.2
brokerClusterName=rocketmq-cluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
storePathRootDir=/home/rocketmq/store
B主机brokerASlave
# namesrvAddr=rocket_namesrv1:9876;rocket_namesrv2:9876
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
brokerClusterName=rocketmq-cluster
brokerIP1=192.168.1.2
# brokerIP2=192.168.1.2
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10001
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=false
storePathRootDir=/home/rocketmq/storeb
效果查看
调用程序java
RocketConsumer.java 消费端
/***@Title Test.java
*@description TODO
*@time 2019年12月7日 下午4:42:30
*@author liuyijiao
*@version 1.0
**/
package com.thedatasys.mqTest;
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-a");
//consumer.setNamesrvAddr("127.0.0.1:9876");
// consumer.setNamesrvAddr("11.10.143.126:9876");
consumer.setNamesrvAddr("11.10.143.124:9876;11.10.143.126:9876");
// consumer.setNamesrvAddr("192.168.3.33:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
//第二个参数表示消费匹配的tag * 表示topic所有的tag
consumer.subscribe("mytopic1-lyj1","mytag1-lyj1");
//2. 注册消费者监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* msgs 表示消息体
* @param msgs
* @param context
* @return
*/
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for(MessageExt messageExt:msgs){
try {
System.out.println(new Date()+new String(messageExt.getBody(),"UTF-8"));
}catch (Exception e){
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//3.consumer 启动
consumer.start();
System.out.println("消费端起来了哈.........");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
生产端java
RocketProducer.java
package com.thedatasys.mqTest;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * Demo producer: starts a producer, synchronously sends ten tagged test
 * messages to {@code mytopic1-lyj1}, prints each send result, and shuts the
 * producer down again.
 */
public class RocketProducer {

    /**
     * Runs the demo. Failures are printed to standard error; the producer is
     * shut down whether or not sending succeeded.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Initialise the producer with its producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("s11");
        // NameServer address the producer connects to.
        producer.setNamesrvAddr("11.10.143.126:9876");
        try {
            producer.start();
            System.out.println("teeeeeeeeeee");
            for (int i = 0; i < 10; i++) {
                // Build the message: topic, tag, body.
                Message msg = new Message("mytopic1-lyj1", "mytag1-lyj1",
                        ("lyj Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send synchronously; the second argument is the send timeout.
                SendResult sendResult = producer.send(msg, 10000);
                System.out.printf("%s%n", sendResult);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Shut down on every path: previously a failed start() or send()
            // skipped shutdown and left the producer's resources unreleased.
            producer.shutdown();
        }
    }
}
效果
以上是 rocketmq双主双从配置(真实环境) 的全部内容, 来源链接: utcz.com/z/515704.html