1、消息可靠性
消息默认是持久化的
2、队列的持久化
3、TOPIC的持久化
一定要运行消费者,等于向MQ注册,然后再运行生产者,此时无论消费者在不在线,都会接收到,不在线的话,下次连接也会接收到消息
消费者:
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * Created by Administrator on 2020/2/11. */ public class JmsConsumerTopicPersist { public static final String ACTIVE_MQ_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "topic-persist"; public static void main(String[] args) throws JMSException, IOException { System.out.println("消息订阅者z3"); //1、创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL); //2、通过连接工厂,获取连接的connection并启动访问 QueueConnection connection = connectionFactory.createQueueConnection(); connection.setClientID("z3"); //3创建会话session (事物、签收) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地具体是主題 Topic topic = session.createTopic(TOPIC_NAME); TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "备注信息--"); connection.start(); Message message = topicSubscriber.receive(); while( null != message){ TextMessage textMessage = (TextMessage) message; System.out.println("收到持久化的Topic--" + textMessage.getText()); message = topicSubscriber.receive(4000l); } session.close(); connection.close(); } }
生产者:
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by Administrator on 2020/2/11. */ public class JmsProduceTopicPersist { public static final String ACTIVE_MQ_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "topic-persist"; public static void main(String[] args) throws JMSException { //1、创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL); QueueConnection connection = connectionFactory.createQueueConnection(); //3创建会话session (事物、签收) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、创建目的地具体是主題 Topic topic = session.createTopic(TOPIC_NAME); // 5\创建消息的生产者 MessageProducer messageProducer = session.createProducer(topic); // 设置主体持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i =0; i<3;i++){ // 创建消息 TextMessage textMessage = session.createTextMessage("Topic-message=" + i); // 通过messageProducer发送消息 messageProducer.send(textMessage); } // 6、关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("Topic消息发送到MQ完成"); } }
原文:https://www.cnblogs.com/ts-sd/p/12293981.html