首页 > 其他 > 详细

ActiveMQ学习

时间:2019-04-10 13:37:14      阅读:113      评论:0      收藏:0      [点我收藏+]

ActiveMQ概念:

是Apache开源基金会研发的消息中间件。主要应用在分布式系统架构中,帮助构建高可用,高性能,可伸缩的企业级面向消息服务的系统。

应用场景:

1.异步消息(消息队列),将串行改造成并行(减少请求时间)

2.应用解耦(下单与改库存绑定在一起,修改库存失败同时下单也会收到失败通知,可以将2步骤解耦,将下单成功通知放在消息队列中)

3.流量削峰(将请求放入消息队列排队,逻辑处理根据规则去读取队列中消息进行处理)

安装使用:

1.下载

2.上传到服务器后使用tar -zxvf  上传文件全名 开始解压文件

3.bin目录下启动 sh activemq start

技术分享图片

4.控制台访问:服务器ip+端口号(8161)

技术分享图片

默认账户名密码:admin   admin

登陆进入管理平台:

技术分享图片

 

JMS基本概念:

Java消息服务(Java Message Service)是java平台中关于面向消息中间件的API,用于在两个应用程序之间,

或者分布式系统中发送消息,进行异步通讯。(JMS是一种与厂商无关的api)

 

JMS:是一个与具体平台无关API,绝大多数MOM(Message Oriented Middleware)(面向消息中间件)提供商都对JMS提供支持

其它开源的JMS提供商:

JbossMQ(jboss4),jboss messaging(jboss5),joram,ubermq,mantamq,openjms

MOM:传送消息(协调作用)(异步)

面向消息的中间件,使用消息传送提供者来协调消息传输操作。 MOM需要提供API和管理工具。

客户端调用api,把消息发送到消息传送提供者指定的目的地,在消息发送之后,客户端会继续执行其他的工作。

并且在接收方收到这个消息确认之前。提供者一直保留该消息

技术分享图片

 

简单实现P2P(通过JMS简单实现接收与发送):

技术分享图片
1 <dependency>
2         <groupId>org.apache.activemq</groupId>
3         <artifactId>activemq-all</artifactId>
4         <version>5.15.0</version>
5 </dependency>
引入的maven依赖
技术分享图片
 1 import org.apache.activemq.ActiveMQConnectionFactory;
 2 
 3 import javax.jms.*;
 4 
 5 /**
 6  * 接收端
 7  * @author 开发
 8  *
 9  */
10 public class JmsReceiver {
11 
12     public static void main(String[] args) {
13         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("" +
14                 "tcp://47.107.121.215:61616");
15         Connection connection = null;
16         try {
17             //创建连接
18             connection = connectionFactory.createConnection();
19             connection.start();
20 
21             Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
22 
23             //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
24             //destination表示目的地
25             Destination destination = session.createQueue("first-queue");
26             //创建消息接收者
27             MessageConsumer consumer = session.createConsumer(destination);
28             //接收内容
29             TextMessage textMessage = (TextMessage) consumer.receive();
30             System.out.println(textMessage.getText());
31             session.commit();
32             session.close();
33         } catch (JMSException e) {
34             e.printStackTrace();
35         } finally {
36             if (connection != null) {
37                 try {
38                     connection.close();
39                 } catch (JMSException e) {
40                     e.printStackTrace();
41                 }
42             }
43         }
44     }
45 }
接收端
技术分享图片
 1 import org.apache.activemq.ActiveMQConnectionFactory;
 2 
 3 import javax.jms.*;
 4 
 5 /**
 6  * 发送端
 7  * @author 开发
 8  *
 9  */
10 public class JmsSender {
11 
12     public static void main(String[] args) {
13         ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("" +
14                 "tcp://47.107.121.215:61616");
15         Connection connection=null;
16         try {
17             //创建连接
18             connection=connectionFactory.createConnection();
19             connection.start();
20 
21             Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
22 
23             //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
24             //destination表示目的地
25             Destination destination=session.createQueue("first-queue");
26             //创建消息发送者
27             MessageProducer producer=session.createProducer(destination);
28             //发送的内容
29             TextMessage textMessage=session.createTextMessage("hello,boby");
30             //发送
31             producer.send(textMessage);
32             session.commit();
33             session.close();
34         } catch (JMSException e) {
35             e.printStackTrace();
36         }finally {
37             if(connection!=null){
38                 try {
39                     connection.close();
40                 } catch (JMSException e) {
41                     e.printStackTrace();
42                 }
43             }
44         }
45     }
46 }
发送端

 

消息的传递域:

点对点(p2p)

1.每个消息只能有一个消费者

2.消息的生成者和消费者之间没有时间上的相关性(无论消费者在生成者发送消息的时候是否处于运行状态,都可以提取消息)

发布与订阅(pub/sub)

1.每个消息可以有多个消费者

2.消息的生产者和消费者之间存在时间上的相关性(订阅一个主题的消费者只能消费自它订阅之后发布的消息

JMS规范允许提供客户端创建持久订阅(解决不能消费订阅之前发布消息的问题

 

使用JMS API

ConnectionFactory      连接工厂

Connection        封装客户端与JMS provider之间的一个虚拟的连接

Session          生产和消费消息的一个单线程上下文; 用于创建producer、consumer、message、queue...

Destination        消息发送或者消息接收的目的地

MessageProducer/MessageConsumer    消息生产者/消费者

 

消息组成:

1.消息头

  包含消息的识别信息和路由信息

2.消息体

  TextMessage   文本消息

  MapMessage   键值对消息

  BytesMessage 字节类型消息

  StreamMessage   输入输出流

  ObjectMessage  可序列化对象

3.属性

  单独指定自定义的属性(发送内容的属性)

 

 

JMS的可靠性机制:

  JMS消息之后被确认后,才会认为是被成功消费。消息的消费包含三个阶段: 客户端接收消息、客户端处理消息、消息被确认

事务性会话

技术分享图片

  设置为true的时候,消息会在session.commit(意味着签收,下次运行不至于重复签收)以后自动签收

非事务性会话

技术分享图片

  设置为false的时候,在该模式下,消息何时被确认取决于创建会话时的应答模式(不需要session.commit了)

应答模式:

AUTO_ACKNOWLEDGE

当客户端成功从receive()方法返回以后,或者[MessageListener.onMessage](异步非阻塞返回消息接收方法) 方法成功返回以后,会话会自动确认该消息

CLIENT_ACKNOWLEDGE

客户端通过调用消息的textMessage.acknowledge();确认消息。(被确认的意思就是从队列中清除该条消息

在这种模式中,如果一个消息消费者消费一共是10个消息,那么消费了5个消息,然后在第5个消息通过textMessage.acknowledge()

那么之前的所有消息都会被确认

DUPS_OK_ACKNOWLEDGE

延迟确认

 

 

 

本地事务

技术分享图片
 1 在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS Session 接口提供了commit和rollback方法。
 2 
 3 JMS Provider会缓存每个生产者当前生产的所有消息,直到commit或者rollback,
 4 
 5 commit操作将会导致事务中所有的消息被持久存储;
 6 
 7 rollback意味着JMS Provider将会清除此事务下所有的消息记录。
 8 
 9 在事务未提交之前,消息是不会被持久化存储的,也不会被消费者消费
10 
11 事务提交意味着生产的所有消息都被发送。消费的所有消息都被确认;
12 
13 事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复,也就是下次仍然能够接收到发送端的消息,除非消息已经过期了
View Code

JMS pub/sub)模型

1.订阅可以分为非持久订阅和持久订阅

2.当所有的消息必须接收的时候,则需要用到持久订阅。反之,则用非持久订阅

实例:

技术分享图片
 1 import org.apache.activemq.ActiveMQConnectionFactory;
 2 
 3 import javax.jms.*;
 4 
 5 /**
 6  * 发送者
 7  * @author 开发
 8  *
 9  */
10 public class JmsTopicSender {
11 
12     public static void main(String[] args) {
13         ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("" +
14                 "tcp://47.107.121.215:61616");
15         Connection connection=null;
16         try {
17             //创建连接
18             connection=connectionFactory.createConnection();
19             connection.start();
20 
21             Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
22 
23             //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
24             //destination表示目的地
25             Destination destination=session.createTopic("first-topic");
26             //创建消息发送者
27             MessageProducer producer=session.createProducer(destination);
28             TextMessage textMessage=session.createTextMessage("HelloWorld");
29             producer.send(textMessage);
30             session.commit();
31             session.close();
32         } catch (JMSException e) {
33             e.printStackTrace();
34         }finally {
35             if(connection!=null){
36                 try {
37                     connection.close();
38                 } catch (JMSException e) {
39                     e.printStackTrace();
40                 }
41             }
42         }
43     }
44 }
发送者
技术分享图片
 1 import org.apache.activemq.ActiveMQConnectionFactory;
 2 import javax.jms.*;
 3 
 4 /**
 5  * 接收端
 6  * @author 开发
 7  *
 8  */
 9 public class JmsTopicPersistenteReceiver {
10 
11     public static void main(String[] args) {
12         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("" +
13                 "tcp://47.107.121.215:61616");
14         Connection connection = null;
15         try {
16             //创建连接
17             connection = connectionFactory.createConnection();
18             connection.setClientID("DUBBO-ORDER"); //设置持久订阅
19             connection.start();
20 
21             Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
22 
23             //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
24             //destination表示目的地
25             Topic topic = session.createTopic("first-topic");
26             //创建消息接收者
27             //MessageConsumer consumer = session.createConsumer(destination);
28             MessageConsumer consumer = session.createDurableSubscriber(topic,"DUBBO-ORDER");
29             TextMessage textMessage = (TextMessage) consumer.receive();
30             System.out.println(textMessage.getText());
31             session.commit();
32             session.close();
33         } catch (JMSException e) {
34             e.printStackTrace();
35         } finally {
36             if (connection != null) {
37                 try {
38                     connection.close();
39                 } catch (JMSException e) {
40                     e.printStackTrace();
41                 }
42             }
43         }
44     }
45 }
接收者

JMS  (P2P)模型

1.如果session关闭时,有一些消息已经收到,但还没有被签收,那么当消费者下次连接到相同的队列时,消息还会被签收

2.如果用户在receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中不会被接收

3.队列可以长久保存消息直到消息被消费者签收。消费者不需要担心因为消息丢失而时刻与jms provider保持连接状态

 

Broker:当前服务器启动的进程就是一个Broker实列。

技术分享图片
 1 public static void main(String[] args) {
 2         BrokerService brokerService=new BrokerService();
 3         try {
 4             brokerService.setUseJmx(true);
 5             brokerService.addConnector("tcp://localhost:61616");
 6             brokerService.start();
 7         } catch (Exception e) {
 8             e.printStackTrace();
 9         }
10 }
Broker简单演示

 

ActiveMQ学习

原文:https://www.cnblogs.com/LJing21/p/10671633.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!