SpringBoot之MQTT

标签: 无 分类: 未分类 创建时间:2023-11-16 08:21:18 更新时间:2025-01-17 10:39:22

1.前言

这篇文章其实会很乱,因为我不是从基础开始弄的这个mqtt协议,只能是从别人的代码的基础上进行修改的来的。

对于发布者:

消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。
参考文章:
1.SpringBoot集成MQTT 这篇文章关于mqtt的内容,可以好好的参考示例做一下。这里有一个 mqtt事件监听类 的东西,@EventListener,但是其他的地方好像没有。MqttPahoClientFactory 配置连接;MessageChannel 接收和发送消息通道;MessageProducer 接收消息;@ServiceActivator 接收和发送消息处理;
【2】.Spring Integration 中文手册(完整版) 这里讲了整个的spring integration的东西,包括很多内容,大部分通过xml配置进行解决。
【3】.springboot整合mqtt的详细图文教程
【4】. Spring Integration简介

2.消息接收

参考文章:
1.一文搞懂MQTT,如何在SpringBoot中使用MQTT实现消息的订阅和发布 这里使用了EMQX作为服务器,讲解了各个部分不同的功能,没有用注解,就是建立连接、发送、配置回调
2.MQTT 消息通信工具使用 这里用了python和springboot进行了分别的示例说明。
3.java实现mqtt发送消息和接收消息
4.MQTT接收消息回调

3.消息通道

参考文章:
1.9.2.1 消息通道

4.引入步骤

这里的步骤是一个通道作为所有消息订阅的通道,一个通道作为所有发送消息的通道。

(1)创建Spring Boot Maven工程,引入如下依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.1.3.RELEASE</version>
</dependency>

(2)配置MQTT消费端,添加SpringConfig.java类,添加消息消费Bean。@ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息的channel。当接收到消息时,可以先拿到topic,然后根据不同的topic分别对消息进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Configuration
@IntegrationComponentScan
public class MqttConfiguration {

private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);

@Value("${cloud-sdk.mqtt.inbound-topic: }")
private String inboundTopic;

@Resource
private MqttPahoClientFactory mqttClientFactory;

@Resource(name = ChannelName.INBOUND)
private MessageChannel inboundChannel;


/**
* Clients of inbound message channels.
* @return
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
UUID.randomUUID().toString(), mqttClientFactory, inboundTopic.split(","));
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// use byte types uniformly
converter.setPayloadAsBytes(true);
adapter.setConverter(converter);
adapter.setQos(1);
adapter.setOutputChannel(inboundChannel);
return adapter;
}

/**
* Clients of outbound message channels.
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.OUTBOUND)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
UUID.randomUUID().toString(), mqttClientFactory);
DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
// use byte types uniformly
converter.setPayloadAsBytes(true);

messageHandler.setAsync(true);
messageHandler.setDefaultQos(0);
messageHandler.setConverter(converter);
return messageHandler;
}



/**
* Define a default channel to handle messages that have no effect.
* @return
*/
@Bean
@ServiceActivator(inputChannel = ChannelName.DEFAULT)
public MessageHandler defaultInboundHandler() {
return message -> {
log.info("The default channel does not handle messages." +
"\nTopic: " + message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC) +
"\nPayload: " + message.getPayload() + "\n");
};
}
}

(3)配置MQTT消息发送端,@MessagingGateway是一个用于提供消息网关代理整合的注解,参数defaultRequestChannel指定发送消息绑定的channel。
这里我其实没有做具体的区分,只是借用了大疆里面的一些代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
@MessagingGateway(defaultRequestChannel = ChannelName.OUTBOUND)
public interface IMqttMessageGateway {

/**
* Publish a message to a specific topic.
* @param topic target
* @param payload message
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);

/**
* Use a specific qos to push messages to a specific topic.
* @param topic target
* @param payload message
* @param qos qos
*/
void publish(@Header(MqttHeaders.TOPIC) String topic, byte[] payload, @Header(MqttHeaders.QOS) int qos);
}

(4)创建Rest Controller,通过http请求发送MQTT消息。

参考文章:
1.Spring 整合MQTT @ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息的channel。当接收到消息时,可以先拿到topic,然后根据不同的topic分别对消息进行处理。

问题

(1) 速度特别的慢

参考文章:
【1】.MQTT 为什么有时候很慢? 1.网络问题;2.QoS级别设置过高;3.大量连接;4.执行复杂的订阅操作;5.低性能设备;
【2】. 在Linux中,如何进行网络资源的优先级管理?
【3】.Linux 如何优化网络带宽?
小额赞助
本人提供免费与付费咨询服务,感谢您的支持!赞助请发邮件通知,方便公布您的善意!
**光 3.01 元
Sun 3.00 元
bibichuan 3.00 元
微信公众号
广告位
诚心邀请广大金主爸爸洽谈合作
每日一省
isNaN 和 Number.isNaN 函数的区别?

1.函数 isNaN 接收参数后,会尝试将这个参数转换为数值,任何不能被转换为数值的的值都会返回 true,因此非数字值传入也会返回 true ,会影响 NaN 的判断。

2.函数 Number.isNaN 会首先判断传入参数是否为数字,如果是数字再继续判断是否为 NaN ,不会进行数据类型的转换,这种方法对于 NaN 的判断更为准确。

每日二省
为什么0.1+0.2 ! == 0.3,如何让其相等?

一个直接的解决方法就是设置一个误差范围,通常称为“机器精度”。对JavaScript来说,这个值通常为2-52,在ES6中,提供了Number.EPSILON属性,而它的值就是2-52,只要判断0.1+0.2-0.3是否小于Number.EPSILON,如果小于,就可以判断为0.1+0.2 ===0.3。

每日三省
== 操作符的强制类型转换规则?

1.首先会判断两者类型是否**相同,**相同的话就比较两者的大小。

2.类型不相同的话,就会进行类型转换。

3.会先判断是否在对比 null 和 undefined,是的话就会返回 true。

4.判断两者类型是否为 string 和 number,是的话就会将字符串转换为 number。

5.判断其中一方是否为 boolean,是的话就会把 boolean 转为 number 再进行判断。

6.判断其中一方是否为 object 且另一方为 string、number 或者 symbol,是的话就会把 object 转为原始类型再进行判断。

每日英语
Happiness is time precipitation, smile is the lonely sad.
幸福是年华的沉淀,微笑是寂寞的悲伤。