下载地址:http://rocketmq.apache.org/docs/quick-start/ 解压:tar -zxvf rocketmq-all-4.4.0-bin-release.tar.gz 进入HOME目录:cd rocketmq-all-4.4.0-bin-release 开启端口(按"起始端口/结束端口"的范围写法):9876/9876(NameServer)、10909/10912(Broker,其中 10911 为默认监听端口)
cd bin grep "Xmx" * vim runbroker.sh、vim runserver.sh、vim tools.sh 将 JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g" 改为 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=1g"
2. Broker配置
vim conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=你的公网IP:9876
brokerIP1=你的公网IP
# 是否自动创建topic:线上改为false,测试用true
autoCreateTopicEnable=true
1. 启动注册中心 nohup bin/mqnamesrv -n 你的公网IP:9876 > mqnamesrv.log 2>&1 & 2. 检查端口监听是否为0.0.0.0:9876/外网IP:9876 命令 netstat -anpt | grep 9876 3. 启动数据节点 nohup sh bin/mqbroker -n 你的公网IP:9876 -c conf/broker.conf > broker.log 2>&1 & 4. 查看是否注册成功(集群信息) bin/mqadmin clusterList -n 你的公网IP:9876
cd rocketmq-all-4.4.0-bin-release/
---集群相关
查询集群信息 bin/mqadmin clusterList -n localhost:9876
打印Broker配置 bin/mqbroker -m -n localhost:9876
更新Broker配置 bin/mqadmin updateBrokerConfig -c DefaultCluster -k autoCreateTopicEnable -v false -n localhost:9876
查看Broker统计信息 bin/mqadmin brokerStatus -n localhost:9876 -b localhost:10911
---订阅组相关
创建订阅组 bin/mqadmin updateSubGroup -n localhost:9876 -c ClusterName -g GroupName
列出消费组 bin/mqadmin consumerProgress -n localhost:9876
查看消费组IP bin/mqadmin consumerStatus -g GroupName -n localhost:9876
查看消费组数据堆积 bin/mqadmin consumerProgress -n localhost:9876 -g GroupName
删除订阅组 bin/mqadmin deleteSubGroup -n localhost:9876 -c ClusterName -g GroupName
---Topic相关
创建Topic bin/mqadmin updateTopic -c ClusterName -n localhost:9876 -t TopicName
Topic列表 bin/mqadmin topicList -n localhost:9876
发送Topic消息测试 bin/mqadmin checkMsgSendRT -n localhost:9876 -t TopicName -s 1024
打印Topic消息 bin/mqadmin printMsg -n localhost:9876 -t TopicName
Topic详情统计 bin/mqadmin topicstatus -n localhost:9876 -t TopicName
获取Topic的cluster bin/mqadmin topicClusterList -n localhost:9876 -t TopicName
删除Topic bin/mqadmin deleteTopic -n localhost:9876 -t TopicName -c ClusterName
查看Topic路由 bin/mqadmin topicRoute -n localhost:9876 -t TopicName
查看Topic状态 bin/mqadmin topicStatus -n localhost:9876 -t TopicName
根据ID查询消息 bin/mqadmin queryMsgById -i msgId -n localhost:9876
根据偏移量查询消息 bin/mqadmin queryMsgByOffset -b BrokerName -i 3 -n localhost:9876 -o 299 -t TopicName
<!-- Client-side dependencies for the Producer/Consumer examples. -->
<dependencies>
    <!-- RocketMQ Java client. -->
    <!-- NOTE(review): client is 4.5.0 while the broker installed above is 4.4.0; confirm the mix is intended. -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.5.0</version>
    </dependency>
    <!-- RocketMQ common classes, with netty-tcnative excluded. -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>4.5.0</version>
        <exclusions>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty-tcnative</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- NOTE(review): fastjson 1.2.47 is an old release; check security advisories before using it in production. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>1.10</version>
    </dependency>
</dependencies>
public class Producer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("GroupName"); // Specify name server addresses. producer.setNamesrvAddr("IP:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicName" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
public class Consumer { public static void main(String[] args) { String topicName = "TopicName"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupName"); consumer.setNamesrvAddr("IP:9876"); try { consumer.subscribe(topicName, "*"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); /** * 如果是顺序消息,这边的监听就要使用MessageListenerOrderly监听 * 并且,返回结果也要使用ConsumeOrderlyStatus */ consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { //设置自动提交,如果不设置自动提交就算返回SUCCESS,消费者关闭重启 还是会重复消费的 context.setAutoCommit(true); try { for (MessageExt msg : msgs) { String recString = null; try { recString = new String(msg.getBody(), "UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println(recString); } catch (Exception e) { e.printStackTrace(); //如果出现异常,消费失败,挂起消费队列一会会,稍后继续消费 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } //消费成功 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
<!-- Build configuration: compile for Java 1.8 and package a shaded (fat) jar. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <!-- JDK version -->
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <executions>
                <execution>
                    <!-- Run the shade goal during the package phase. -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Fully-qualified name of the main class. -->
                                <!-- NOTE(review): looks like a placeholder; replace with your real package. -->
                                <mainClass>com.package.Consumer</mainClass>
                            </transformer>
                        </transformers>
                        <artifactSet>
                        </artifactSet>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
个人微信,有什么建议、意见或补充,欢迎及时沟通!!!(添加时注明“博客园”,谢谢)
原文:https://www.cnblogs.com/pidgey/p/11733719.html