写在前面的话
ActiveMQ在去年做项目时使用过,当时为了进度,急急忙忙的,木有太多的时间对ActiveMQ深入的了解,只了解其基本的用法,当时和Spring集成,木有做持久化的工作,现在有时间,重新整理、总结、归档一些知识,做点记录
Linux:CentOS 32位。
ActiveMQ:5.9.0,下载地址:http://activemq.apache.org/download.html。
说明:我是在Windows上面做开发,IDE为Eclipse,Maven为构建工具,使用CentOS的虚拟机,IP地址为:192.168.254.150,root帐户,若不使用root帐户,可以自行创建其它用记,并分配权限,然后再安装ActiveMQ,此外,要安装JDK,并修改/etc/hosts文件,将主机名与IP地址对应起来,以免在启动ActiveMQ时报“未知的名称和服务”异常。关掉Linux的防火墙,若不关闭,保证在Windows通过Ping命令Ping通Linux即可。
通过以下命令解压ActiveMQ包,tar –zxvf apache-activemq-5.9.0-linux-bin.tar.gz,安装目录即为当前目录。安装之前要先保证系统中已经安装了JDK,具体可参考我的博客中关于Linux安装JDK的文章。
注:我使用root帐户,所以木有创建用户、目录及授权的过程,若使用其它用户,需要授权等操作。启动ActiveMQ有几种方式,版本不同,略有区别,我使用的是5.9,启动方式有所变化,以前在使用旧版本时,直接运行bin/activemq命令即可,但在5.9下不行,可以看其提示,如下:
[root@wds activemq-5.9.0]# bin/activemq … … Usage: Main [--extdir <dir>] [task] [task-options] [task data] Tasks: browse - Display selected messages in a specified destination. bstat - Performs a predefined query that displays useful statistics regarding the specified broker create - Creates a runnable broker instance in the specified path. decrypt - Decrypts given text dstat Performs a predefined query that displays useful tabular statistics regarding the specified destination type encrypt - Encrypts given text export - Exports a stopped brokers data files to an archive file list - Lists all available brokers in the specified JMX context purge - Delete selected destination‘s messages that matches the message selector query - Display selected broker component‘s attributes and statistics. start - Creates and starts a broker using a configuration file, or a broker URI. stop - Stops a running broker specified by the broker name. … …
通过该命令的提示,就可知其用法,我使用两种方式启动,在介绍这两种方式之前,先看下我的目录结构,如下图:
第一种方式:bin/activemq start,执行结果如下:
第二种方式:bin/activemq console,如下图:
关于console的说明,可输入bin/activemq,给出的说明中就有关于console的介绍,其它的启动方式木有一一去尝试,官方给出的一种还nohup的方式。
通过以下命令查看ps –ef | grep activemq,如下图所示:
看其前面的进程号,3110,可通过kill -9 3110的方式将其关闭。如果是第二种方式启动的,直接按Ctrl+C键关闭。
注意:配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示:
JMS规范定了两种类型的消息,一是点对点的Point-To-Point方式,另一种是发布-订阅的Publish and Subscribe模式。创建一个简单的JMS应用,一般可遵循以下步骤:
1) 获得一个JMS连接工厂。
2) 使用工厂创建JMS连接。
3) 启动连接。
4) 从连接中创建Session。
5) 获得JMS的Destination。
6) 创建JMS消息生产者或创建JMS消息并指向到Destination。
7) 创建JMS消费者或注册监听器
8) 发送或接受JMS消息
9) 关闭JMS资源
public class Producer { private static final Logger logger = LoggerFactory.getLogger(Producer.class); /** * 定义连接地址,TCP协议,IP为:192.168.254.150,端口:61616 */ private final static String brokerURL = "tcp://192.168.254.150:61616"; /** * 定义连接工厂 */ private static transient ConnectionFactory factory; /** * 定义连接 */ private transient Connection connection; /** * 定义Session */ private transient Session session; /** * 定义Producer */ private transient MessageProducer producer; private String[] queueTitles = new String[]{"QueueOne", "QueueTwo"}; private static int index = 10; /** * 初始化上述的几个属性 * @throws JMSException */ public Producer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public static void main(String[] args) throws JMSException { Producer producer = new Producer(); int i = 0; while(i < 10){ producer.sendMsg(i); i++; } producer.close(); } public void close(){ try { this.connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public void sendMsg(int serial) throws JMSException { int idx = 0; while (true) { idx = (int) Math.round(queueTitles.length * Math.random()); if (idx < queueTitles.length) { break; } } String title = queueTitles[idx]; Destination destination = session.createQueue("Queue." + title); for(int i = 0; i < index; i++){ Message message = session.createObjectMessage(title + ", Serial" + serial + ", index" + i); logger.info("Sending: id: " + ((ObjectMessage) message).getObject() + " on queue: " + destination); producer.send(destination, message); } } }Listener
public class Listener implements MessageListener { private static final Logger logger = LoggerFactory.getLogger(Listener.class); public void onMessage(Message message) { try { logger.info("Consumer:" + ((ObjectMessage)message).getObject()); } catch (Exception e) { e.printStackTrace(); } } }Consumer
package com.wds.activemq.sec2.p2p; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { private final static String brokerURL = "tcp://127.0.0.1:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private String[] queueTitles = new String[]{"QueueOne", "QueueTwo"}; public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public static void main(String[] args) throws JMSException { Consumer consumer = new Consumer(); for(String qName : consumer.queueTitles){ Destination destination = consumer.getSession().createQueue("Queue." + qName); MessageConsumer msgConsumer = consumer.getSession().createConsumer(destination); msgConsumer.setMessageListener(new Listener()); } } public void close(){ try { this.connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public Session getSession() { return session; } }
public class Publisher { private static final Logger logger = LoggerFactory.getLogger(Publisher.class); private final static String brokerURL = "tcp://192.168.254.150:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer; private String[] topicTitles = new String[]{"TopicOne", "TopicTwo"}; private static int index = 10; public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); int i = 0; while(i < 10){ publisher.sendMsg(i); i++; } publisher.close(); } public void close(){ try { this.connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public void sendMsg(int serial) throws JMSException { int idx = 0; while (true) { idx = (int) Math.round(topicTitles.length * Math.random()); if (idx < topicTitles.length) { break; } } String title = topicTitles[idx]; Destination destination = session.createTopic("Topic." + title); for(int i = 0; i < index; i++){ Message message = createMessage(title, serial, i); logger.info("Sending:" + title + " Serial:" + serial + " index:" + index + " on topic: " + destination); producer.send(destination, message); } } private Message createMessage(String title, int serial, int i) throws JMSException { MapMessage mapMsg = session.createMapMessage(); mapMsg.setString("title", title); mapMsg.setInt("Serial", serial); mapMsg.setInt("index", i); return mapMsg; } }Listener
public class Listener implements MessageListener { private static final Logger logger = LoggerFactory.getLogger(Listener.class); public void onMessage(Message message) { try { MapMessage map = (MapMessage)message; String title = map.getString("title"); int serial = map.getInt("serial"); int index = map.getInt("index"); logger.info("Consumer:" + title + " Serial:" + serial + " index:" + index); } catch (Exception e) { e.printStackTrace(); } } }Consumer
public class Publisher { private static final Logger logger = LoggerFactory.getLogger(Publisher.class); private final static String brokerURL = "tcp://192.168.254.150:61616"; private static transient ConnectionFactory factory; private transient Connection connection; private transient Session session; private transient MessageProducer producer; private String[] topicTitles = new String[]{"TopicOne", "TopicTwo"}; private static int index = 10; public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public static void main(String[] args) throws JMSException { Publisher publisher = new Publisher(); int i = 0; while(i < 10){ publisher.sendMsg(i); i++; } publisher.close(); } public void close(){ try { this.connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public void sendMsg(int serial) throws JMSException { int idx = 0; while (true) { idx = (int) Math.round(topicTitles.length * Math.random()); if (idx < topicTitles.length) { break; } } String title = topicTitles[idx]; Destination destination = session.createTopic("Topic." + title); for(int i = 0; i < index; i++){ Message message = createMessage(title, serial, i); logger.info("Sending:" + title + " Serial:" + serial + " index:" + index + " on topic: " + destination); producer.send(destination, message); } } private Message createMessage(String title, int serial, int i) throws JMSException { MapMessage mapMsg = session.createMapMessage(); mapMsg.setString("title", title); mapMsg.setInt("Serial", serial); mapMsg.setInt("index", i); return mapMsg; } }代码中使用logback作为日志记录,具体可参考博客中关于logback的使用
原文:http://blog.csdn.net/wangdongsong1229/article/details/19029769