Implementing publish/subscribe with the IBM MQTTv3 client
The MQTTv3 publisher implementation:
- package com.etrip.mqttv3;
-
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
-
- public class MQTTPub {
-     public static void doTest() {
-         try {
-             // Broker address and client ID; the ID must be unique on the broker.
-             MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "mqttserver-pub");
-             MqttTopic topic = client.getTopic("tokudu/china");
-             // Encode the payload explicitly instead of relying on the platform default charset.
-             MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes("UTF-8"));
-             message.setQos(1); // QoS 1: at-least-once delivery
-             client.connect();
-             // Publish the same message repeatedly until the process is stopped.
-             while (true) {
-                 MqttDeliveryToken token = topic.publish(message);
-                 // Wait in one-second slices until delivery is complete.
-                 while (!token.isComplete()) {
-                     token.waitForCompletion(1000);
-                 }
-             }
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
- }
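The inner loop of the publisher waits for each delivery in one-second slices. The following is a minimal sketch of that waiting pattern using only the JDK. `FakeToken` is a hypothetical stand-in for the delivery token (it is not the IBM class), and unlike the listing it gives up after an assumed `maxSlices` attempts instead of waiting forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class DeliveryWaitSketch {
    /** Hypothetical stand-in for a delivery token; not the IBM MqttDeliveryToken. */
    static class FakeToken {
        private final CountDownLatch done = new CountDownLatch(1);

        boolean isComplete() {
            return done.getCount() == 0;
        }

        /** Blocks until completion or until the timeout elapses, whichever comes first. */
        void waitForCompletion(long timeoutMillis) {
            try {
                done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        }

        void markComplete() {
            done.countDown();
        }
    }

    /** Waits in slices of sliceMillis, at most maxSlices times; returns whether delivery completed. */
    static boolean awaitDelivery(FakeToken token, long sliceMillis, int maxSlices) {
        for (int i = 0; i < maxSlices && !token.isComplete(); i++) {
            token.waitForCompletion(sliceMillis);
        }
        return token.isComplete();
    }

    public static void main(String[] args) throws InterruptedException {
        FakeToken token = new FakeToken();
        // Simulate an acknowledgement arriving from another thread after a short delay.
        Thread ack = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            token.markComplete();
        });
        ack.start();
        System.out.println("delivered=" + awaitDelivery(token, 1000, 5));
        ack.join();
    }
}
```

Bounding the number of slices is a design choice of this sketch: it lets the caller report a failed delivery rather than block indefinitely when no acknowledgement arrives.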
The MQTTv3 subscriber class:
- package com.etrip.mqttv3;
-
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttConnectOptions;
-
- public class MQTTSubsribe {
-     public static String doTest() {
-         try {
-             MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");
-
-             // Register the callback before connecting so that no incoming message is missed.
-             CallBack callback = new CallBack();
-             client.setCallback(callback);
-
-             // cleanSession=false asks the broker to keep this client's subscriptions
-             // and undelivered QoS 1 messages across reconnects.
-             MqttConnectOptions conOptions = new MqttConnectOptions();
-             conOptions.setCleanSession(false);
-             client.connect(conOptions);
-
-             // Subscribe to the publisher's topic at QoS 1.
-             client.subscribe("tokudu/china", 1);
-         } catch (Exception e) {
-             e.printStackTrace();
-             return "failed";
-         }
-         return "success";
-     }
- }
The callback class that handles messages received on the subscription:
- package com.etrip.mqttv3;
-
- import com.ibm.micro.client.mqttv3.MqttCallback;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
-
- public class CallBack implements MqttCallback {
-
-     public CallBack() {
-     }
-
-     // Called for every message that arrives on a subscribed topic.
-     public void messageArrived(MqttTopic topic, MqttMessage message) {
-         try {
-             System.out.println("MQTTSubsribe message: " + message.toString());
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
-
-     // Called when the connection to the broker is lost.
-     public void connectionLost(Throwable cause) {
-         System.out.println("Connection lost: " + cause);
-     }
-
-     // Called when a message published by this client has been delivered; unused here.
-     public void deliveryComplete(MqttDeliveryToken token) {
-     }
- }
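An MQTT payload is a raw byte array: the publisher turns a string into bytes and the callback turns the bytes back into text. As a minimal sketch, assuming both sides agree on UTF-8, the round trip can be made explicit with the JDK alone. `PayloadCodecSketch`, `encode` and `decode` are hypothetical helper names and are not part of the IBM client.

```java
import java.nio.charset.StandardCharsets;

class PayloadCodecSketch {
    /** Encodes text into the byte payload a publisher would send. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received payload back into text using the same charset. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("Hello World. Hello IBM");
        System.out.println(decode(payload)); // prints "Hello World. Hello IBM"
    }
}
```

Naming the charset on both sides keeps non-ASCII payloads intact even when publisher and subscriber run on machines with different default encodings.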
Test class:
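- package com.etrip.mqttv3;
-
- public class MQTTMain {
-     public static void main(String[] args) {
-         // Subscribe first so the subscription exists before anything is published.
-         MQTTSubsribe.doTest();
-         // Then start publishing; this call loops forever and never returns.
-         MQTTPub.doTest();
-     }
- }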
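The test class subscribes before it publishes. The sketch below illustrates why that order matters, using a hypothetical in-memory `TinyBroker` written only for this illustration. It is not MQTT: it has no network, no QoS, no retained messages and no persistent sessions, so a message published before any subscription exists is simply dropped.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class SubscribeFirstSketch {
    /** Hypothetical in-memory stand-in for a broker; keeps no session state. */
    static class TinyBroker {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        /** Registers a callback for an exact topic name. */
        void subscribe(String topic, Consumer<String> callback) {
            subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
        }

        /** Delivers to the current subscribers only; returns how many callbacks ran. */
        int publish(String topic, String payload) {
            List<Consumer<String>> targets = subscribers.getOrDefault(topic, new ArrayList<>());
            for (Consumer<String> callback : targets) {
                callback.accept(payload);
            }
            return targets.size();
        }
    }

    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker();
        List<String> received = new ArrayList<>();

        int early = broker.publish("tokudu/china", "too early");   // nobody is listening yet
        broker.subscribe("tokudu/china", received::add);
        int late = broker.publish("tokudu/china", "Hello World. Hello IBM");

        System.out.println(early + " " + late + " " + received);
        // prints: 0 1 [Hello World. Hello IBM]
    }
}
```

A real broker may behave differently for a client that reconnects with a persistent session, so this only models the first run of the test class.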
MQTT Study Notes (13): A Simple IBM MQTTv3 Publish/Subscribe Example
Original: http://www.cnblogs.com/yudar/p/4615697.html