rocketmq双主双从配置(真实环境)

编程

第一 主机配置
双主双从
两台虚拟机A:192.168.1.1 B:192.168.1.2
A上运行nameserverA brokerA brokerBSlave 
B上运行nameserverB brokerB brokerASlave

第二 启动
启动顺序(**非常重要***)先启动nameserver,启动A的主B的主,再启动A的从B的从。
cd /home/rocketmq/rocketmq-all-4.6.0-bin-release/bin/
nohup sh mqnamesrv & 两台都先启动nameserver 
再启动A的主
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-a.properties &
启动B的主
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-b.properties &
启动A的从
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
启动B的从
nohup sh mqbroker -c /home/rocketmq/rocketmq-all-4.6.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &

第三 关闭
先关闭A和B主机的broker  再关闭A主机和B主机的namesrv
sh mqshutdown broker
sh mqshutdown namesrv

相关配置文件

A主机brokerA

namesrvAddr=192.168.1.1:9876;192.168.1.2:9876

rokerClusterName=rocketmq-cluster

brokerIP1=192.168.1.1

brokerName=broker-a

brokerId=0

deleteWhen=04

fileReservedTime=48

brokerRole=SYNC_MASTER

flushDiskType=ASYNC_FLUSH

listenPort=10911

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=false

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=false

storePathRootDir=/home/rocketmq/store

A主机brokerBSlave

#amesrvAddr=rocket_namesrv1:9876;rocket_namesrv2:9876

namesrvAddr=192.168.1.1:9876;192.168.1.2:9876

brokerClusterName=rocketmq-cluster

brokerIP1=192.168.1.1

#brokerIP2=192.168.1.1

brokerName=broker-b

brokerId=1

deleteWhen=04

fileReservedTime=48

brokerRole=SLAVE

flushDiskType=ASYNC_FLUSH

listenPort=10001

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=false

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=false

storePathRootDir=/home/rocketmq/storeb

B主机brokerB

#namesrvAddr=rocket_namesrv1:9876;rocket_namesrv2:9876

namesrvAddr=192.168.1.1:9876;192.168.1.2:9876

brokerIP1=192.168.1.2

#brokerIP2=192.168.1.2

brokerClusterName=rocketmq-cluster

brokerName=broker-b

brokerId=0

deleteWhen=04

fileReservedTime=48

brokerRole=SYNC_MASTER

flushDiskType=ASYNC_FLUSH

listenPort=10911

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=false

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=false

storePathRootDir=/home/rocketmq/store

B主机brokerASlave

# namesrvAddr=rocket_namesrv1:9876;rocket_namesrv2:9876

namesrvAddr=192.168.1.1:9876;192.168.1.2:9876

brokerClusterName=rocketmq-cluster

brokerIP1=192.168.1.2

# brokerIP2=192.168.1.2

brokerName=broker-a

brokerId=1

deleteWhen=04

fileReservedTime=48

brokerRole=SLAVE

flushDiskType=ASYNC_FLUSH

listenPort=10001

#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=false

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=false

storePathRootDir=/home/rocketmq/storeb

效果查看

调用程序java

RocketConsumer.java 消费端

/**

*@Title Test.java

*@description TODO

*@time 2019年12月7日 下午4:42:30

*@author liuyijiao

*@version 1.0

**/

package com.thedatasys.mqTest;

import java.util.Date;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

import org.apache.rocketmq.common.message.MessageExt;

public class RocketConsumer {

public static void main(String[] args) {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer-a");

//consumer.setNamesrvAddr("127.0.0.1:9876");

// consumer.setNamesrvAddr("11.10.143.126:9876");

consumer.setNamesrvAddr("11.10.143.124:9876;11.10.143.126:9876");

// consumer.setNamesrvAddr("192.168.3.33:9876");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

try {

//第二个参数表示消费匹配的tag * 表示topic所有的tag

consumer.subscribe("mytopic1-lyj1","mytag1-lyj1");

//2. 注册消费者监听

consumer.registerMessageListener(new MessageListenerConcurrently() {

/**

* msgs 表示消息体

* @param msgs

* @param context

* @return

*/

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

for(MessageExt messageExt:msgs){

try {

System.out.println(new Date()+new String(messageExt.getBody(),"UTF-8"));

}catch (Exception e){

e.printStackTrace();

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

//3.consumer 启动

consumer.start();

System.out.println("消费端起来了哈.........");

} catch (MQClientException e) {

e.printStackTrace();

}

}

}

生产端java

RocketProducer.java

package com.thedatasys.mqTest;

import org.apache.rocketmq.client.producer.DefaultMQProducer;

import org.apache.rocketmq.client.producer.SendResult;

import org.apache.rocketmq.common.message.Message;

import org.apache.rocketmq.remoting.common.RemotingHelper;

/**

* Hello world!

*

*/

public class RocketProducer {

public static void main(String[] args) {

try {

// 使用ProducerGroup初始化Producer

DefaultMQProducer producer = new DefaultMQProducer("s11");

// 指定namesrv

//producer.setNamesrvAddr("127.0.0.1:9876");

//producer.setNamesrvAddr("10.185.162.116:9876");

// producer.setNamesrvAddr("11.10.143.124:9876;11.10.143.126:9876");

//producer.setNamesrvAddr("11.10.143.43:9876");

// producer.setNamesrvAddr("192.168.3.33:9876");

producer.setNamesrvAddr("11.10.143.126:9876");

//producer.setVipChannelEnabled(true);

producer.start();

System.out.println("teeeeeeeeeee");

for (int i = 0; i < 10; i++) {

// 创建消息

Message msg = new Message("mytopic1-lyj1", "mytag1-lyj1",

("lyj Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );

// 发送消息

SendResult sendResult = producer.send(msg,10000);

System.out.printf("%s%n", sendResult);

}

// 关闭Producer

producer.shutdown();

} catch (Exception e) {

e.printStackTrace();

}

}

}

效果

以上是 rocketmq双主双从配置(真实环境) 的全部内容, 来源链接: utcz.com/z/515704.html

回到顶部