SpringBoot之Integration

标签: 无 分类: 未分类 创建时间:2024-03-25 08:18:43 更新时间:2025-01-17 10:39:22

1.前言

在使用大疆API进行开发的时候,我接触到了这个mqtt,我始终都搞不明白这个mqtt协议的东西,看来真是老了啊,资料也很少,看来看去的,都说不到点子上,我的理解能力还真是太弱了。

Spring Integration 涵盖了许多集成场景。试图将所有这些内容都包含在一个章节中,就像试图将大象装进一个信封一样。我将展示一张 Spring Integration 大象的照片,而不是对 Spring Integration 进行全面的讨论,以便让你了解它是如何工作的。然后,将创建一个向 Taco Cloud 应用程序添加功能的集成流。Spring Integration 大致交互逻辑:

  • 对于发布者:

(1)消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。
(2)DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

  • 对于订阅者:

(1)通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel。
(2)同样由 MessageChannel 的实例 DirectChannel 处理消费细节。Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

参考文章:
【1】.Spring Integration 快速入门教程
【2】.springboot集成MQTT Spring Integration 大致交互逻辑
【3】.springboot整合mqtt实现消息发送和消费,以及客户端断线重连之后的消息恢复 从 EMQX 安装到最后的接入,都有一个例子,通过设置回调函数的形式,进行了消息的处理。
【4】.通过源码简析Spring-Integration执行过程 这是源码级别处理,还是要好好的看看。
【5】.Springboot+MQTT集成,解决Callback中不能发布消息问题

2.基础知识

终结点在 DSL 中表示为谓词,以提高可读性。 以下列表包括常见的 DSL 方法名称和关联的 EIP 终结点:

  • 消息通道 → Channel
    Spring Integration 提供了多个管道的实现,包括:PublishSubscribeChannel、QueueChannel、PriorityChannel、RendezvousChannel、DirectChannel、ExecutorChannel、FluxMessageChannel

在 Java 配置和 Java DSL 样式中,输入通道都是自动创建的,默认是 DirectChannel。但是,如果希望使用不同的通道实现,则需要显式地将通道声明为 bean 并在集成流中引用它。

  • 转换 → Transformer
    转换器对消息执行一些操作,通常会产生不同的消息,并且可能会产生不同的负载类型。

  • 过滤器 → Filter
    过滤器可以放置在集成管道的中间,以允许或不允许消息进入流中的下一个步骤。

  • 服务激活器 → ServiceActivator
    服务激活器从输入信道接收消息并发送这些消息给的 MessageHandler。Spring 集成提供了多种的 MessageHandler 实现开箱即用(PayloadTypeRouter 就是 MessageHandler 的实现),但你会经常需要提供一些定制实现充当服务激活。

  • 分割器 → Splitter
    有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter 将为分割并处理这些消息。

  • 聚合 → Aggregator
    与分离器相反,它将来自不同渠道的多条信息组合成一条信息。

  • 路由 → Router
    基于某些路由标准的路由器允许在集成流中进行分支,将消息定向到不同的通道。

  • 网关 → MessagingGateway
    网关是通过一个应用程序可以将数据提交到一个集成信息流和接收这是该流的结果的响应的装置。通过 Spring Integration 实现的,网关是实现为应用程序可以调用将消息发送到集成信息流的接口。令人惊叹的是,没有必要实现这个接口。Spring Integration 自动提供运行时实现,这个实现会使用特定的通道进行数据的发送与接收。

  • 通道适配器 → Adapter
    通道适配器代表集成信息流的入口点和出口点。数据通过入站信道适配器的方式进入到集成流中,通过出站信道适配器的方式离开集成流。

总结:
(1)Spring Integration 允许定义数据在进入或离开应用程序时可以通过的流。

(2)Integration 流可以以 XML、Java 或 Java DSL 配置的风格进行定义。

(3)消息网关和通道适配器充当集成信息流的入口和出口。

(4)消息可以被转化,分割,聚集,路由和由服务活化器在流动的过程中进行处理。

(5)消息通道连接集成流的组件。

参考文章:
【1】.Spring Integration Java 配置和 DSL 这篇文章看起来就像是一篇翻译的文章,很多的地方都不是特别的通顺。

3.引入依赖

探索 Spring Integration 这是一本书《Spring实战》,里面有一章讲的是 Spring Integration 的 Java DSL 定义,根据这个系列教程里面,基本上还能理解整个的过程。

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>

3.创建网关接口

1
2
3
4
5
6
@MessagingGateway(defaultRequestChannel="textInChannel")
public interface FileWriterGateway {
void writeToFile(
@Header(FileHeaders.FILENAME) String filename,
String data);
}

首先会注意到它是由 @MessagingGateway 注解的。这个注解告诉 Spring Integration 在运行时生成这个接口的实现 —— 类似于 Spring Data 如何自动生成存储库接口的实现。当需要编写文件时,代码的其他部分将使用这个接口。

@MessagingGateway 的 defaultRequestChannel 属性表示,对接口方法的调用产生的任何消息都应该发送到给定的消息通道。在本例中,声明从 writeToFile() 调用产生的任何消息都应该发送到名为 textInChannel 的通道。

对于 writeToFile() 方法,它接受一个文件名作为字符串,另一个字符串包含应该写入文件的文本。关于此方法签名,值得注意的是 filename 参数使用 @Header 进行了注解。在本例中,@Header 注解指示传递给 filename 的值应该放在消息头中(指定为 FileHeaders),解析为 file_name 的文件名,而不是在消息有效负载中。另一方面,数据参数值则包含在消息有效负载中。

5.配置集成流

有三种方式配置集成流,一种是 xml,一种是java,最后一种是 DSL 配, 这里我不摘录使用 xml 的方式了,之摘录使用 java 和 DSl 方式,java 方式只是为了更好的理解整个的集成流。
(1) 使用java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class FileWriterIntegrationConfig {

@Bean
@Transformer(inputChannel="textInChannel", outputChannel="fileWriterChannel")
public GenericTransformer<String, String> upperCaseTransformer() {
return text -> text.toUpperCase();
}

@Bean
@ServiceActivator(inputChannel="fileWriterChannel")
public FileWritingMessageHandler fileWriter() {
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/tmp/sia5/files"));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
return handler;
}
}

使用 Java 配置,可以声明两个 bean:一个转换器和一个文件写入消息处理程序。这里转换器是 GenericTransformer。因为 GenericTransformer 是一个函数接口,所以能够以 lambda 的形式提供在消息文本上调用 toUpperCase() 的实现。转换器的 bean使用 @Transformer 进行注解,并将其指定为集成流中的转换器,该转换器接收名为 textInChannel 的通道上的消息,并将消息写入名为 fileWriterChannel 的通道。

至于文件写入 bean,它使用 @ServiceActivator 进行了注解,以指示它将接受来自 fileWriterChannel 的消息,并将这些消息传递给由 FileWritingMessageHandler 实例定义的服务。FileWritingMessageHandler 是一个消息处理程序,它使用消息的 file_name 头中指定的文件名将消息有效负载写入指定目录中的文件。与 XML 示例一样,将 FileWritingMessageHandler 配置为用换行符附加到文件中。

FileWritingMessageHandler bean 配置的一个独特之处是调用 setExpectReply(false) 来指示服务激活器不应该期望应答通道(通过该通道可以将值返回到流中的上游组件)。如果不调用 setExpectReply(),则文件写入 bean 默认为 true,尽管管道仍按预期工作,但将看到记录了一些错误,说明没有配置应答通道。

还会看到不需要显式地声明通道。如果不存在具有这些名称的 bean,就会自动创建 textInChannel 和 fileWriterChannel 通道。但是,如果希望对通道的配置方式有更多的控制,可以像这样显式地将它们构造为 bean

1
2
3
4
5
6
7
8
9
@Bean
public MessageChannel textInChannel() {
return new DirectChannel();
}
...
@Bean
public MessageChannel fileWriterChannel() {
return new DirectChannel();
}

(2) 使用 DSL 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
public class FileWriterIntegrationConfig {

@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}
}

用一个 bean 方法捕获整个流。IntegrationFlows 类初始化了这个构建者 API,可以从该 API 声明流。

在程序清单中,首先从名为 textInChannel 的通道接收消息,然后该通道转到一个转换器,使消息有效负载大写。在转换器之后,消息由出站通道适配器处理,该适配器是根据 Spring Integration 的文件模块中提供的文件类型创建的。最后,调用 get() 构建要返回的 IntegrationFlow。简而言之,这个 bean 方法定义了与 XML 和 Java 配置示例相同的集成流。

注意,与 Java 配置示例一样,不需要显式地声明通道 bean。虽然引用了 textInChannel,但它是由 Spring Integration 自动创建的,因为没有使用该名称的现有通道 bean。但是如果需要,可以显式地声明通道 bean。

至于连接转换器和外部通道适配器的通道,甚至不需要通过名称引用它。如果需要显式配置通道,可以在流定义中通过调用 channel() 的名称引用:

1
2
3
4
5
6
7
8
9
10
11
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlows
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("fileWriterChannel"))
.handle(Files.outboundAdapter(new File("/tmp/sia5/files"))
.fileExistsMode(FileExistsMode.APPEND)
.appendNewLine(true))
.get();
}

问题

(1)Error creating bean with name ‘drcUpRouterFlow’ defined in class path resource [com/dji/sdk/mqtt/drc/DrcUpRouter.class]: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method ‘drcUpRouterFlow’ threw exception with message: org/springframework/integration/transformer/GenericTransformer

我将其他地方的代码复制到jeecg上的时候,结果总是出现问题,无法启动程序,但是光看错误,却一点头绪都没有,不知道问题到底出在了哪里,网上的资料又特别的少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class DrcUpRouter {

@Bean
public IntegrationFlow drcUpRouterFlow() {
return IntegrationFlows
.from(ChannelName.INBOUND_DRC_UP)
.transform(Message.class, source -> {
try {
TopicDrcRequest data = Common.getObjectMapper().readValue((byte[]) source.getPayload(), TopicDrcRequest.class);
return data.setData(Common.getObjectMapper().convertValue(data.getData(), DrcUpMethodEnum.find(data.getMethod()).getClassType()));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
.<TopicDrcRequest, DrcUpMethodEnum>route(
response -> DrcUpMethodEnum.find(response.getMethod()),
mapping -> Arrays.stream(DrcUpMethodEnum.values()).forEach(
methodEnum -> mapping.channelMapping(methodEnum, methodEnum.getChannelName())))
.get();
}
}

后来还有问题:Invocation of destroy method failed on bean with name ‘mqttInbound’: java.lang.NullPointerException: Cannot invoke “org.eclipse.paho.client.mqttv3.IMqttAsyncClient.close()” because “this.client” is null

【尝试方法】
(1)我尝试了进行版本升级,从 spring-integration-mqtt 的 5.5.5 升级到了 6.1.3 版本,一直没有效果。

1
2
3
4
5
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.2.1</version>
</dependency>

(2)我把相关的代码进行了注释,也还是无法启动,其他的 IntegrationFlows.from 也是同样的问题。也就是说,问题其实并不是出在这个地方。

1
2
3
4
5
6
7
8
@Bean
public IntegrationFlow drcUpRouterFlow() {
log.error(ChannelName.INBOUND_DRC_UP);
return IntegrationFlows
.from(ChannelName.INBOUND_DRC_UP)
.transform(Object::toString)
.get();
}

(3)我尝试把 jeecgboot 的代码降低到 springboot 2.7,废了 九牛二虎之力终于算是搞定了这个东西。

【解决方案】
最后几经周折之后,我发现了可能是 springboot 的版本问题。 我的 mqtt 所在的模块中,使用了springboot 2.7.12

1
2
3
4
5
6
7
8
9
10
11
12
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<type>pom</type>
<scope>import</scope>
</dependency>

</dependencies>
</dependencyManagement>

我的 jeecgboot 的版本用了 3.1.5, 这就可能导致了冲突。

1
2
3
4
5
6
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/>
</parent>
参考文章:
【1】.Spring Integration Java DSL
【2】.Revise configuration code for better Spring Native support
【3】.spring cloud gateway集成druid报异常:Error creating bean with name ‘statViewServletRegistrationBean‘ 这是由于druid需要servlet容器,而spring cloud gateway不能使用servlet,为此在配置文件中将stat-view-servlet 和 web-stat-filter 关闭:
【4】.Create an email integration flow

(2) ‘org.springframework.integration.dsl.IntegrationFlows’ is deprecated since version 6.0 and marked for removal

我用了 springboot 3.x 版本,在idea中就一直出现这个提示,后来 mqtt 换成 5.5.5 也是没有用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Bean
public IntegrationFlow osdRouterFlow() {
return IntegrationFlows
.from(ChannelName.INBOUND_OSD)
.transform(Message.class, source -> {
try {
TopicOsdRequest response = Common.getObjectMapper().readValue((byte[]) source.getPayload(),
new TypeReference<TopicOsdRequest>() {
});
String topic = String.valueOf(source.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
return response.setFrom(topic.substring((THING_MODEL_PRE + PRODUCT).length(),
topic.indexOf(OSD_SUF)));
} catch (IOException e) {
throw new CloudSDKException(e);
}
}, null)
.<TopicOsdRequest>handle((response, headers) -> {
// 注册网关
String gateway_sn=response.getGateway();
GatewayManager gateway=droneCommon.registerDevice(gateway_sn);
if(gateway==null){
gateway=gateway= SDKManager.getDeviceSDK(gateway_sn);
}
OsdDeviceTypeEnum typeEnum = OsdDeviceTypeEnum.find(gateway.getType(),
response.getFrom().equals(response.getGateway()));
Map<String, Object> data = (Map<String, Object>) response.getData();
if (!typeEnum.isGateway()) {
List payloadData = (List) data.getOrDefault(PayloadModelConst.PAYLOAD_KEY, new ArrayList<>());
PayloadModelConst.getAllIndexWithPosition().stream().filter(data::containsKey)
.map(data::get).forEach(payloadData::add);
data.put(PayloadModelConst.PAYLOAD_KEY, payloadData);
}
// log.error(JSONObject.toJSONString(data));
return response.setData(Common.getObjectMapper().convertValue(data, typeEnum.getClassType()));
})
.<TopicOsdRequest, OsdDeviceTypeEnum>route(response -> OsdDeviceTypeEnum.find(response.getData().getClass()),
mapping -> Arrays.stream(OsdDeviceTypeEnum.values()).forEach(key -> mapping.channelMapping(key, key.getChannelName())))
.get();
}

【解决方案】
IntegrationFlows.from 改为 IntegrationFlow.from

(3).failed to look up MessageChannel with name ‘inboundOsdDock’ in the BeanFactory.

failed to look up MessageChannel with name ‘inboundOsdDock’ in the BeanFactory. 我始终无法理解到底为什么会这个样子,尝试了很多的方法,我以为是 aop 的问题,还以为是其他的问题,结果都不是。

【解决方法】
解决方法,其实是我 没有实现

1
2
3
4
5
6
7
8
9
10
11
12
// 这里我注释掉了,其实应该打开的
// @Service

// 正确的方法
@Service
@Slf4j
public class SDKDeviceService extends AbstractDeviceService {
@ServiceActivator(inputChannel = ChannelName.INBOUND_OSD_DOCK)
public void osdDock(TopicOsdRequest<OsdDock> request, MessageHeaders headers) {

}
}
小额赞助
本人提供免费与付费咨询服务,感谢您的支持!赞助请发邮件通知,方便公布您的善意!
**光 3.01 元
Sun 3.00 元
bibichuan 3.00 元
微信公众号
广告位
诚心邀请广大金主爸爸洽谈合作
每日一省
isNaN 和 Number.isNaN 函数的区别?

1.函数 isNaN 接收参数后,会尝试将这个参数转换为数值,任何不能被转换为数值的的值都会返回 true,因此非数字值传入也会返回 true ,会影响 NaN 的判断。

2.函数 Number.isNaN 会首先判断传入参数是否为数字,如果是数字再继续判断是否为 NaN ,不会进行数据类型的转换,这种方法对于 NaN 的判断更为准确。

每日二省
为什么0.1+0.2 ! == 0.3,如何让其相等?

一个直接的解决方法就是设置一个误差范围,通常称为“机器精度”。对JavaScript来说,这个值通常为2-52,在ES6中,提供了Number.EPSILON属性,而它的值就是2-52,只要判断0.1+0.2-0.3是否小于Number.EPSILON,如果小于,就可以判断为0.1+0.2 ===0.3。

每日三省
== 操作符的强制类型转换规则?

1.首先会判断两者类型是否**相同,**相同的话就比较两者的大小。

2.类型不相同的话,就会进行类型转换。

3.会先判断是否在对比 null 和 undefined,是的话就会返回 true。

4.判断两者类型是否为 string 和 number,是的话就会将字符串转换为 number。

5.判断其中一方是否为 boolean,是的话就会把 boolean 转为 number 再进行判断。

6.判断其中一方是否为 object 且另一方为 string、number 或者 symbol,是的话就会把 object 转为原始类型再进行判断。

每日英语
Happiness is time precipitation, smile is the lonely sad.
幸福是年华的沉淀,微笑是寂寞的悲伤。