1、在官网下载RocketMQ,下载完解压即可
2、添加环境变量:
ROCKETMQ_HOME="D:\rocketmq" NAMESRV_ADDR="localhost:9876"
3、启动Name Server
mqnamesrv.cmd
4、启动Broker,如果启动失败,删除C:\Users\"当前系统用户名"\store下的所有文件,注意要添加autoCreateTopicEnable=true 否则在创建消息组时会报错:MQClientException: No route info of this topic
mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
5、创建Maven工程,添加RocketMQ依赖,注意版本要和自己的服务器(下载的办法一致),否则可能会出现MQClientException: No route info of this topic这个错误
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>
6、provider:
package provider;
import com.sun.xml.internal.bind.api.impl.NameConverter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Pro {
public static void main(String [] args){
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("Demo");
defaultMQProducer.setNamesrvAddr("localhost:9876");
try {
defaultMQProducer.start();
for(int i = 0;i<100;i++){
try {
Message message = new Message("Topic1","Tag1",
("Hello World"+i).getBytes("UTF-8"));
SendResult sendResult = defaultMQProducer.send(message);
//defaultMQProducer.sendOneway(message);
} catch (Exception e) {
e.printStackTrace();
}
}
defaultMQProducer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
7、consumer
package consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String [] args){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerDemo");
consumer.setNamesrvAddr("localhost:9876");
try {
consumer.subscribe("Topic1","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt messageExt :list){
String str = new String(messageExt.getBody());
System.out.println(str);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// consumer.registerMessageListener(new MessageListenerOrderly() {
// public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
// for(MessageExt messageExt :list){
// String str = new String(messageExt.getBody());
// System.out.println(str);
// }
// return ConsumeOrderlyStatus.SUCCESS;
// }
// });
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
7、输出结果,可以看到输出的结果并不是很一致。

先看看RocketMQ架构图

1,启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
2,Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
3,收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
4,Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
5,Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
原文:https://www.cnblogs.com/minblog/p/13328874.html