
Publisher的代码:
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.DeliveryMode;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MapMessage;
- import javax.jms.MessageProducer;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class Publisher {
-
-
-
-
- private ConnectionFactory connectionFactory;
-
- private Connection connection;
-
- private Session session;
-
- private MessageProducer messageProducer;
-
- public Publisher() {
-
- try {
- this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",
- "123", "tcp://localhost:61616");
- this.connection = connectionFactory.createConnection();
- this.connection.start();
-
-
- this.session = this.connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- this.messageProducer = this.session.createProducer(null);
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public Session getSession() {
- return this.session;
- }
-
- public void send1(
- try {
-
- Destination destination = this.session.createTopic("topic1");
- MapMessage msg1 = this.session.createMapMessage();
- msg1.setString("name", "张三");
- msg1.setInt("age", 22);
-
- MapMessage msg2 = this.session.createMapMessage();
- msg2.setString("name", "李四");
- msg2.setInt("age", 25);
-
- MapMessage msg3 = this.session.createMapMessage();
- msg3.setString("name", "张三");
- msg3.setInt("age", 30);
-
-
- this.messageProducer.send(destination, msg1,
- DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
- this.messageProducer.send(destination, msg2,
- DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
- this.messageProducer.send(destination, msg3,
- DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
-
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void send2() {
- try {
- Destination destination = this.session.createTopic("topic1");
- TextMessage message = this.session.createTextMessage("我是一个字符串");
-
- this.messageProducer.send(destination, message,
- DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public static void main(String[] args) {
- Publisher producer = new Publisher();
- producer.send1();
-
- }
-
- }
Subscriber的代码:
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MapMessage;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Session;
- import javax.jms.TextMessage;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- public class Subscriber {
-
-
-
-
- private ConnectionFactory connectionFactory;
-
- private Connection connection;
-
- private Session session;
-
- private MessageConsumer messageConsumer;
-
- private Destination destination;
-
- public Subscriber() {
-
- try {
- this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",
- "123", "tcp://localhost:61616");
- this.connection = connectionFactory.createConnection();
- this.connection.start();
-
-
- this.session = this.connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- this.destination = this.session.createTopic("topic1");
- this.messageConsumer = this.session.createConsumer(destination);
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public Session getSession() {
- return this.session;
- }
-
-
- class MyLister implements MessageListener {
-
- @Override
- public void onMessage(Message message) {
- try {
- if (message instanceof TextMessage) {
-
- }
- if (message instanceof MapMessage) {
- MapMessage ret = (MapMessage) message;
- System.out.println(ret.toString());
- System.out.println(ret.getString("name"));
- System.out.println(ret.getInt("age"));
-
- message.acknowledge();
- }
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
-
- public void receiver() {
- try {
- this.messageConsumer.setMessageListener(new MyLister());
- } catch (JMSException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static void main(String[] args) {
- Subscriber conmuser = new Subscriber();
- conmuser.receiver();
-
- }
-
- }



先启动订阅者(Subscriber,先订阅后消费),再启动发布者(Publisher)。非持久订阅只能收到订阅之后发布的消息,如果先启动发布者,订阅者将收不到这些消息。

JMS-ActiveMQ发布订阅模式(非持久订阅)
原文:http://www.cnblogs.com/austinspark-jessylu/p/7825257.html