Astrotrain是基于阿里巴巴开源项目RocketMQ进行封装的分布式消息中间件系统,提供集群环境下的消息生产和消费功能。
Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。所有的主题和broker节点信息都由Name Server进行维护。
Broker 是主要的功能单元,处理主题的存储和消费逻辑,Broker会定时同步所有信息至Name Server。
Push模式下的消息是由事件触发,有消息到达时监听器会被调用(MessageListener)。
Pull模式下的消息可以由事件触发,也可以应用主动去拉取消息,没有消息可拉取时返回空。
Astrotrain-Client是对RocketMQ的Producer和Consumer的封装,集中解决了RocketMQ诸多配置信息和使用特性,针对特定需求可以进行二次开发来进行扩展。
Basic Component是Astrotrain对RocketMQ的基础封装,里面包含的生产者和消费者处理消息的逻辑,同时处理了RocketMQ的很多配置信息。
JDBC Message Component是消息事务的功能模块,依赖Basic Component的功能实现。
Astrotrain底层依赖RocketMQ提供消息服务。
业务层只面对Astrotrain暴露的服务。
ATClient定义最基础的功能,包括客户端的启动、初始化、关闭的定义。
Pipe是生产者和消费者公共部分的抽象,与具体的Topic进行绑定。
ATConsumer是对消费者的封装,提供基础的消息消费服务(没有具体的Pipe实现,因为消费是针对Listener的)。
对于Astrotrain的使用主要是基于astrotrain-client来实现,目前有两个版本可供使用,1.0和1.0.1,他们之间的区别在于后者提供了一个批量消费的接口,其余的相同,所以以1.0.1的版本为例说。
注意:下面列出的所有配置文件astrotrain-client会默认从ClassPath中进行读取,不需要显式指定。
< dependency > < groupId >com.zj</ groupId > < artifactId >astrotrain-client</ artifactId > < version >1.0.1</ version > </ dependency > |
准备资源配置文件astrotrain-producer.properties,生产者配置
#生产者群组名称 astrotrain.group.name=PleaseRename #应用实例名称 astrotrain.instance.name=ProducerAT #namesrv地址,多个之间以分号 ; 分隔 astrotrain.namesrv.address= 10.10 . 110.51 : 9876 |
准备资源文件astrotrain.properties,应用配置
#应用标志符 astrotrain.appId=app1 |
Java代码,准备POJO。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package com.zj.astrotrain.demo; import java.io.Serializable; import java.util.List; public class Order implements Serializable { /** * */ private static final long serialVersionUID = 1L; private long id; private String orderId; private String cardNo; private List<String> payments; public long getId() { return id; } public void setId( long id) { this .id = id; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this .orderId = orderId; } public String getCardNo() { return cardNo; } public void setCardNo(String cardNo) { this .cardNo = cardNo; } public List<String> getPayments() { return payments; } public void setPayments(List<String> payments) { this .payments = payments; } public String toString(){ return "Order [id=" + this .id + ",orderId=" + this .orderId + ",cardNo=" + this .cardNo + ",payments=" + this .payments == null ? null : this .payments.size() + "]" ; } } |
生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
package com.zj.astrotrain.demo; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import com.alibaba.fastjson.JSON; import com.zj.astrotrain.client.ATMessage; import com.zj.astrotrain.client.ATProducer; import com.zj.astrotrain.client.exceptions.ATException; import com.zj.astrotrain.client.message.ObjectMessage; import com.zj.astrotrain.client.message.StringMessage; import com.zj.astrotrain.client.producer.DefaultATProducer; /** * 生成者示例 * <pre>需要在 ClassPath 路径下准备 astrotrain-producer.properties 具体配置信息参考 src/main/resources下的配置</pre> * * */ public class ProducerDemon { private DefaultATProducer atProducer; public ProducerDemon() { } public void setUp() { this .atProducer = new DefaultATProducer(); try { this .atProducer.start(); } catch (Exception e) { e.printStackTrace(); } } /** * 发送字符类型的消息 */ public void doStringMessage() { //获取一个生成者通道,指定Topic ATProducer producer = this .atProducer.createProducer( "demo" ); DateFormat format = new SimpleDateFormat( "yyyyMMddHHmmss" ); for ( int i = 0 ; i < 100 ; i++){ Order order = new Order(); order.setId(i); order.setOrderId(format.format( new Date()) + "_" + i); order.setCardNo( "6221202000111112222" ); //新建一个StringMessage StringMessage msg = new StringMessage(JSON.toJSONString(order)); //为消息设置一个业务标识符,最好是唯一的,方便在调试程序时进行跟踪,可选属性. msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId()); try { //进行消息发送,不抛异常的都是正常发送.除非服务端程序Crash,不然不会丢失消息 producer.send(msg); } catch (ATException e) { e.printStackTrace(); } } } /** * 发送对象类型的消息 */ public void doObjectMessage() { //使用新的主题创建生产者通道 ATProducer producer = this .atProducer.createProducer( "demo" ); DateFormat format = new SimpleDateFormat( "yyyyMMddHHmmss" ); for ( int i = 0 ; i< 100 ; i++) { //新建一个ObjectMessage ObjectMessage msg = new ObjectMessage(); Order order = new Order(); order.setId(i); order.setOrderId(format.format( new Date()) + "_" + i); order.setCardNo( "6221202000111112222" + i); List<String> payments = new ArrayList<String>(); payments.add( "payment" + i); order.setPayments(payments); //设置对象 msg.putObject(order); //为消息设置一个业务标识符,最好是唯一的,方便在调试程序时进行跟踪,可选属性. msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId()); try { //进行消息发送,不抛异常的都是正常发送.除非服务端程序Crash,不然不会丢失消息 producer.send(msg); } catch (ATException e) { e.printStackTrace(); } } } public void shutdown() { if ( this .atProducer != null ){ this .atProducer.shutdown(); } } public static void main(String[] args) { ProducerDemon demon = new ProducerDemon(); demon.setUp(); demon.doStringMessage(); demon.doObjectMessage(); demon.shutdown(); } } |
准备资源文件astrotrain-consumer.properties,消费者配置。
#消费者群组名称,与生产者群组没有关联 astrotrain.group.name=PleaseRename #消费者示例名称 astrotrain.instance.name=ConsumerATbatch #namesrv的地址,多个以分号 ; 分隔 astrotrain.namesrv.address= 10.10 . 110.51 : 9876 #消费模式,CLUSTERING and BROADCASTING, default is CLUSTERING astrotrain.consumer.messageModel=CLUSTERING #消费者启动时从那个位置开始消费 astrotrain.consumer.consumeFromWhere=CONSUME_FROM_FIRST_OFFSET #消费者线程最小数 astrotrain.consumer.consumeThreadMin= 10 #消费者线程最大数 astrotrain.consumer.consumeThreadMax= 20 #单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置。 #astrotrain.consumer.batchMaxSize= 30 #消费者去broker拉取消息时,一次拉取多少条。可选配置。 #astrotrain.consumer.pullBatchSize= 100 #每次拉取消息的间隔,默认为 0 ,可选配置/ #astrotrain.consumer.pullInterval= 1000 |
准备资源文件astrotrain.properties,应用配置
#应用标志符 astrotrain.appId=app2 |
消费者,单个消息消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
package com.zj.astrotrain.demo; import com.alibaba.fastjson.JSON; import com.alibaba.rocketmq.client.exception.MQClientException; import com.zj.astrotrain.client.ATMessage; import com.zj.astrotrain.client.MessageListener; import com.zj.astrotrain.client.consumer.DefaultATPushConsumer; import com.zj.astrotrain.client.message.ObjectMessage; import com.zj.astrotrain.client.message.StringMessage; /** * 单个消息消费 * * */ public class ConsumerDemon { private DefaultATPushConsumer atPushConsumer; public ConsumerDemon() { } public void setUp() { this .atPushConsumer = new DefaultATPushConsumer(); try { //订阅必须在start之前 this .atPushConsumer.subscribe( "demo" , new DemonMessageListener()); } catch (MQClientException e) { e.printStackTrace(); } } public void start() { if ( this .atPushConsumer != null ) { try { this .atPushConsumer.start(); System.in.read(); //按任意键退出 } catch (Exception e) { e.printStackTrace(); } } } public void shutdown() { if ( this .atPushConsumer != null ){ this .atPushConsumer.shutdown(); } } public static void main(String[] args) { ConsumerDemon demon = new ConsumerDemon(); demon.setUp(); demon.start(); demon.shutdown(); } //单个消息监听接口 public class DemonMessageListener implements MessageListener { @Override public void onMessage(ATMessage message) { try { if (message instanceof StringMessage){ StringMessage msg = (StringMessage) message; Order order = JSON.parseObject(msg.getMsg(), Order. class ); System.out.println(order.getCardNo()); } else if (message instanceof ObjectMessage) { ObjectMessage msg = (ObjectMessage) message; Order order = (Order) msg.getObject(); System.out.println(order.getCardNo()); } } catch (Exception e) { e.printStackTrace(); } } } } |
消费者批量消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
package com.zj.astrotrain.demo; import java.util.List; import com.alibaba.fastjson.JSON; import com.alibaba.rocketmq.client.exception.MQClientException; import com.zj.astrotrain.client.ATMessage; import com.zj.astrotrain.client.ConcurrentlyMessageListener; import com.zj.astrotrain.client.consumer.DefaultATPushConsumer; import com.zj.astrotrain.client.message.ObjectMessage; import com.zj.astrotrain.client.message.StringMessage; /** * 消费者批量消费示例 * <pre>需要在 ClassPath 路径下准备 astrotrain-consumer.properties 具体配置信息参考 src/main/resources下的配置</pre> * * */ public class ConsumerBatchDemon { private DefaultATPushConsumer atPushConsumer; public ConsumerBatchDemon() { } public void setUp() { this .atPushConsumer = new DefaultATPushConsumer(); try { //订阅必须在start之前 this .atPushConsumer.subscribe( "demo" , new DemonConcurrentlyMessageListener()); } catch (MQClientException e) { e.printStackTrace(); } } public void start() { if ( this .atPushConsumer != null ) { try { this .atPushConsumer.start(); System.in.read(); //按任意键退出 } catch (Exception e) { e.printStackTrace(); } } } public void shutdown() { if ( this .atPushConsumer != null ){ this .atPushConsumer.shutdown(); } } public static void main(String[] args) { ConsumerBatchDemon demon = new ConsumerBatchDemon(); demon.setUp(); demon.start(); demon.shutdown(); } /** * 消息监听 * * */ public class DemonConcurrentlyMessageListener implements ConcurrentlyMessageListener{ @Override public void onMessage(List<ATMessage> msgs) { for (ATMessage message : msgs) { try { if (message instanceof StringMessage){ StringMessage msg = (StringMessage) message; Order order = JSON.parseObject(msg.getMsg(), Order. class ); System.out.println(order.getCardNo()); } else if (message instanceof ObjectMessage) { ObjectMessage msg = (ObjectMessage) message; Order order = (Order) msg.getObject(); System.out.println(order.getCardNo()); } } catch (Exception e) { e.printStackTrace(); } } } } } |
原文:http://www.cnblogs.com/it-zhoujian/p/4368983.html