feat(messaging/mqtt): 新增 MQTT 消息模块

This commit is contained in:
liquor
2025-11-26 01:41:44 +00:00
committed by Charles7c
parent 730b39d18e
commit ee75e849e2
24 changed files with 2513 additions and 0 deletions

View File

@@ -190,6 +190,13 @@
<version>${revision}</version> <version>${revision}</version>
</dependency> </dependency>
<!-- 消息模块 - MQTT -->
<dependency>
<groupId>top.continew.starter</groupId>
<artifactId>continew-starter-messaging-mqtt</artifactId>
<version>${revision}</version>
</dependency>
<!-- 消息模块 - WebSocket --> <!-- 消息模块 - WebSocket -->
<dependency> <dependency>
<groupId>top.continew.starter</groupId> <groupId>top.continew.starter</groupId>

View File

@@ -124,6 +124,11 @@ public class PropertiesConstants {
*/ */
public static final String MESSAGING_WEBSOCKET = MESSAGING + StringConstants.DOT + "websocket"; public static final String MESSAGING_WEBSOCKET = MESSAGING + StringConstants.DOT + "websocket";
/**
* MQTT 配置
*/
public static final String MESSAGING_MQTT = MESSAGING + StringConstants.DOT + "mqtt";
/** /**
* 日志配置 * 日志配置
*/ */

View File

@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>top.continew.starter</groupId>
<artifactId>continew-starter-messaging</artifactId>
<version>${revision}</version>
</parent>
<artifactId>continew-starter-messaging-mqtt</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>ContiNew Starter 消息模块 - MQTT</description>
<dependencies>
<!--spring MQTT消息模块-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,79 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.annotation;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* mqtt topic 监听器
*
* @author echo
* @since 2.15.0
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Component
public @interface MqttListener {
/**
* 要监听的 MQTT 主题
* <p>支持以下配置方式:
* <pre>{@code
* 方式1: 直接指定主题
* @MqttListener(topic = "sensor/temperature")
*
* 方式2: 使用配置文件占位符
*
* @MqttListener(topic = "${mqtt.topic}")
*
* 方式3: 使用通配符
* @MqttListener(topic = "sensor/+/temperature") // 单级通配符
* @MqttListener(topic = "sensor/#") // 多级通配符
* }</pre>
*
* <p><b>通配符说明:</b>
* <ul>
* <li>{@code +} - 单级通配符,匹配一个层级的任意内容</li>
* <li>{@code #} - 多级通配符,匹配零个或多个层级,只能用在主题末尾</li>
* </ul>
*
* @return MQTT 主题字符串或配置占位符表达式
*/
String topic();
/**
* QoS - 消息传输可靠性等级
* <p>支持以下配置方式:
* <ul>
* <li>直接指定: {@code qos = "0"}, {@code qos = "1"}, {@code qos = "2"}</li>
* <li>使用占位符: {@code qos = "${mqtt.qos}"}</li>
* </ul>
* <p><b>QoS 等级说明:</b>
* <ul>
* <li>{@code 0} - 最多一次,消息可能丢失</li>
* <li>{@code 1} - 至少一次,消息可能重复</li>
* <li>{@code 2} - 恰好一次,消息不丢失且不重复</li>
* </ul>
*
* @return QoS 等级字符串或配置占位符,默认为 "0"
*/
String qos() default "0";
}

View File

@@ -0,0 +1,336 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.autoconfigure;
import cn.hutool.core.util.ObjectUtil;
import jakarta.annotation.PostConstruct;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.StringUtils;
import top.continew.starter.core.constant.PropertiesConstants;
import top.continew.starter.core.constant.StringConstants;
import top.continew.starter.messaging.mqtt.autoconfigure.properties.*;
import top.continew.starter.messaging.mqtt.constant.MqttConstant;
import top.continew.starter.messaging.mqtt.handler.MqttMessageInboundHandler;
import top.continew.starter.messaging.mqtt.handler.MqttShutdownHandler;
import top.continew.starter.messaging.mqtt.msg.MqttMessageConsumer;
import top.continew.starter.messaging.mqtt.msg.MqttMessageProducer;
import top.continew.starter.messaging.mqtt.strategy.MqttOptions;
import top.continew.starter.messaging.mqtt.strategy.MqttTemplate;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
/**
* MQTT 自动配置类
* <p>
* 用于配置 MQTT 连接参数、入站/出站通道及消息处理器等组件。
* </p>
*
* @author echo
* @since 2.15.0
*/
@AutoConfiguration
@EnableConfigurationProperties(MqttProperties.class)
@ConditionalOnProperty(prefix = PropertiesConstants.MESSAGING_MQTT, value = PropertiesConstants.ENABLED, havingValue = "true")
public class MqttAutoConfiguration {
private static final Logger log = LoggerFactory.getLogger(MqttAutoConfiguration.class);
private final MqttProperties mqttProperties;
public MqttAutoConfiguration(MqttProperties mqttProperties) {
this.mqttProperties = mqttProperties;
}
/**
* 配置 MQTT 客户端连接选项
* <p>
* 该方法创建并配置 {@link MqttConnectOptions} 实例,用于建立 MQTT 客户端与服务器的连接。
* 包含认证信息、连接参数、重连策略、SSL/TLS 配置以及遗嘱消息等完整配置。
* </p>
*
* @return 配置完成的 MQTT 连接选项对象
* @see MqttConnectOptions
* @see MqttProperties
*/
@Bean
public MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置 MQTT 服务器地址,支持 tcp://host:port 或 ssl://host:port 格式
mqttConnectOptions.setServerURIs(new String[] {mqttProperties.getHost()});
// 设置用户名(用于认证)
mqttConnectOptions.setUserName(mqttProperties.getUsername());
// 设置密码(转为 char[] 以增强安全性)
mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());
// 设置连接超时时间(单位:秒),默认 30 秒
mqttConnectOptions.setConnectionTimeout(mqttProperties.getConnectionTimeout());
// 设置心跳间隔(单位:秒),用于维持长连接,默认 60 秒
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
// 设置是否清除会话
// true每次连接时清除上次会话状态
// false保留订阅信息与未确认消息
mqttConnectOptions.setCleanSession(mqttProperties.getCleanSession());
// 启用自动重连机制,默认 true
mqttConnectOptions.setAutomaticReconnect(mqttProperties.getAutomaticReconnect());
// 设置最大重连延迟(单位:毫秒),防止频繁重连,默认 128000 毫秒
mqttConnectOptions.setMaxReconnectDelay(mqttProperties.getMaxReconnectDelay());
// 设置最大允许未确认的 QoS>0 消息数量,控制并发发送能力,默认 10
mqttConnectOptions.setMaxInflight(mqttProperties.getMaxInflight());
// 设置自定义 WebSocket 请求头(仅在使用 ws:// 或 wss:// 协议时生效)
mqttConnectOptions.setCustomWebSocketHeaders(mqttProperties.getCustomWebSocketHeaders());
// 启用 HTTPS 主机名验证(适用于 SSL/TLS
mqttConnectOptions.setHttpsHostnameVerificationEnabled(mqttProperties.getHttpsHostnameVerificationEnabled());
// 设置 SSL 连接所需的客户端属性,如证书、密钥、信任库等
mqttConnectOptions.setSSLProperties(mqttProperties.getSslClientProps());
// 设置关闭 ExecutorService 的超时时间(单位:秒),默认 1 秒
mqttConnectOptions.setExecutorServiceTimeout(mqttProperties.getExecutorServiceTimeout());
// 设置遗嘱消息(当客户端异常断开连接时由服务器自动发送)
MqttWillProperties will = mqttProperties.getWill();
if (ObjectUtil.isNotEmpty(will)) {
// 设置遗嘱主题
// 设置遗嘱消息内容(字节数组)
// 设置 QoS 等级
// 设置是否保留该消息true 表示新订阅者会收到)
mqttConnectOptions.setWill(will.getTopic(), will.getPayload().getBytes(), will.getQos(), will
.getRetained());
}
return mqttConnectOptions;
}
/**
* 配置 MQTT 客户端工厂,关联连接参数。
*/
@Bean
public MqttPahoClientFactory mqttPahoClientFactory(MqttConnectOptions mqttConnectOptions) {
DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
clientFactory.setConnectionOptions(mqttConnectOptions);
return clientFactory;
}
/**
* 配置入站消息通道。
* - 若 consumer.async = true则使用线程池处理异步
* - 否则使用 DirectChannel同步
*/
@Bean(name = MqttConstant.MQTT_INPUT_CHANNEL_NAME)
public MessageChannel mqttInputChannel() {
MqttConsumerProperties consumer = mqttProperties.getConsumer();
Boolean async = consumer.getAsync();
if (Boolean.TRUE.equals(async)) {
return new ExecutorChannel(mqttConsumerExecutor());
} else {
return new DirectChannel();
}
}
/**
* 配置 MQTT 消息消费处理通道(队列模式),供消费者拉取使用。
*/
@Bean(name = MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME)
public MessageChannel consumerChannel() {
MqttProducerProperties producer = mqttProperties.getProducer();
Boolean async = producer.getAsync();
if (Boolean.TRUE.equals(async)) {
return new ExecutorChannel(mqttProducerExecutor());
} else {
return new DirectChannel();
}
}
/**
* 配置 MQTT 出站通道,向 broker 发送消息。
*/
@Bean(name = MqttConstant.CONSUMER_CHANNEL_NAME)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* 配置 MQTT 入站适配器,接收来自 MQTT broker 的消息。
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter(MqttPahoClientFactory mqttPahoClientFactory,
@Qualifier(MqttConstant.MQTT_INPUT_CHANNEL_NAME) MessageChannel mqttInputChannel,
Environment environment) {
MqttConsumerProperties consumer = mqttProperties.getConsumer();
String clientId = consumer.getClientId();
if (!StringUtils.hasText(clientId)) {
clientId = getClientId(environment);
}
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttPahoClientFactory);
adapter.setAutoStartup(consumer.getAutoStartUp());
adapter.setOutputChannel(mqttInputChannel);
adapter.setQos(consumer.getQos());
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setCompletionTimeout(consumer.getCompletionTimeout());
adapter.setDisconnectCompletionTimeout(consumer.getDisconnectCompletionTimeout());
return adapter;
}
/**
* 配置 MQTT 出站消息处理器,用于发布消息。
*/
@Bean
@ServiceActivator(inputChannel = MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME)
public MqttPahoMessageHandler mqttOutbound(MqttPahoClientFactory mqttPahoClientFactory, Environment environment) {
MqttProducerProperties producer = mqttProperties.getProducer();
String clientId = producer.getClientId();
if (!StringUtils.hasText(clientId)) {
clientId = getClientId(environment);
}
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttPahoClientFactory);
messageHandler.setAsync(producer.getAsync());
messageHandler.setAsyncEvents(producer.getAsyncEvents());
messageHandler.setDefaultTopic(producer.getDefaultTopic());
messageHandler.setDefaultQos(producer.getDefaultQos());
messageHandler.setConverter(new DefaultPahoMessageConverter());
messageHandler.setDefaultRetained(producer.getDefaultRetained());
return messageHandler;
}
/**
* 构造封装的 MqttTemplate 工具类,提供更易用的发送/订阅能力。
*/
@Bean
public MqttOptions mqttOptions(MqttPahoMessageDrivenChannelAdapter adapter) {
return new MqttTemplate(adapter);
}
/**
* 构造入站消息处理器,分发到自定义监听器中。
*/
@Bean
public MqttMessageInboundHandler mqttMessageInboundHandler(List<MqttMessageConsumer> messageListeners,
MqttOptions mqttOptions,
Environment environment) {
return new MqttMessageInboundHandler(messageListeners, mqttOptions, environment);
}
/**
* 配置 MQTT 消息生产者网关
*/
@Bean
public GatewayProxyFactoryBean<?> mqttMessageProducer(@Qualifier(MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME) MessageChannel outboundChannel) {
GatewayProxyFactoryBean<?> factoryBean = new GatewayProxyFactoryBean<>(MqttMessageProducer.class);
factoryBean.setDefaultRequestChannel(outboundChannel);
return factoryBean;
}
/**
* 消费者异步线程池
*/
public ThreadPoolTaskExecutor mqttConsumerExecutor() {
MqttExecutorProperties executorProperties = mqttProperties.getConsumer().getExecutor();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
executor.setQueueCapacity(executorProperties.getQueueCapacity());
executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("mqtt-consumer-");
executor.initialize();
return executor;
}
/**
* 生产者异步线程池
*
* @return {@link ThreadPoolTaskExecutor }
*/
public ThreadPoolTaskExecutor mqttProducerExecutor() {
MqttExecutorProperties executorProperties = mqttProperties.getProducer().getExecutor();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaxPoolSize(executorProperties.getMaxPoolSize());
executor.setQueueCapacity(executorProperties.getQueueCapacity());
executor.setKeepAliveSeconds(executorProperties.getKeepAliveSeconds());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("mqtt-producer-");
executor.initialize();
return executor;
}
/**
* 自动生成客户端 ID避免冲突。
*/
private String getClientId(Environment environment) {
String applicationName = environment.getProperty("spring.application.name", "mqtt");
return applicationName + StringConstants.DASHED + UUID.randomUUID()
.toString()
.replace(StringConstants.DASHED, "");
}
/**
* mqtt关闭处理程序
*
* @param inboundAdapter Mqtt Paho消息驱动通道适配器
* @param outboundHandler MQTT Paho 消息处理器
* @return {@link MqttShutdownHandler }
*/
@Bean
public MqttShutdownHandler mqttShutdownHandler(MqttPahoMessageDrivenChannelAdapter inboundAdapter,
MqttPahoMessageHandler outboundHandler) {
return new MqttShutdownHandler(inboundAdapter, outboundHandler);
}
/**
* 自动配置类完成初始化时打印日志。
*/
@PostConstruct
public void postConstruct() {
log.info("[ContiNew Starter] - Auto Configuration 'MQTT' completed initialization.");
}
}

View File

@@ -0,0 +1,139 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.autoconfigure.properties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import org.springframework.integration.mqtt.core.ClientManager;
import top.continew.starter.messaging.mqtt.enums.MqttQoS;
/**
* 消费者属性
*
* @author echo
* @since 2.15.0
*/
public class MqttConsumerProperties {
/**
* MQTT 服务质量等级QoS, Quality of Service
* 0最多一次AT_MOST_ONCE不保证消息到达
* 1至少一次AT_LEAST_ONCE可能重复
* 2只有一次EXACTLY_ONCE确保消息仅到达一次。
* 默认使用 QoS 0。
*/
private Integer qos = MqttQoS.AT_MOST_ONCE.value();
/**
* 消息发送完成等待超时时间(单位:毫秒)。
* 控制发送消息时,等待 broker 响应的最大时长,超过将报错。
* 默认值参考 ClientManager.DEFAULT_COMPLETION_TIMEOUT。
*/
private Long completionTimeout = ClientManager.DEFAULT_COMPLETION_TIMEOUT;
/**
* 是否自动启动客户端连接。
* 设置为 true 则在应用启动时自动连接 MQTT 服务器并订阅 Topic。
* 默认值为 true。
*/
private Boolean autoStartUp = true;
/**
* MQTT 客户端 ID。
* 用于唯一标识客户端连接,同一 broker 下不能重复。
* 如果为空,可能由系统自动生成。
*/
private String clientId;
/**
* 是否启用异步消息发送。
* 设置为 true 则消息发送不阻塞当前线程,适用于高吞吐场景;
* 设置为 false 则同步发送,便于确认是否成功。
* 默认值为 false。
*/
private Boolean async = false;
/**
* 客户端断开连接时的完成超时时间(单位:毫秒)。
* 用于控制断连操作的最长等待时间。
* 默认值参考 ClientManager.DISCONNECT_COMPLETION_TIMEOUT。
*/
private Long disconnectCompletionTimeout = ClientManager.DISCONNECT_COMPLETION_TIMEOUT;
/**
* MQTT 消息处理线程池配置。
* 包含核心线程数、最大线程数、队列容量等,控制消费者消息处理能力。
* 默认配置为 new MqttExecutorProperties()。
*/
@NestedConfigurationProperty
private MqttExecutorProperties executor = new MqttExecutorProperties();
public Integer getQos() {
return qos;
}
public void setQos(Integer qos) {
this.qos = qos;
}
public Long getCompletionTimeout() {
return completionTimeout;
}
public void setCompletionTimeout(Long completionTimeout) {
this.completionTimeout = completionTimeout;
}
public Boolean getAutoStartUp() {
return autoStartUp;
}
public void setAutoStartUp(Boolean autoStartUp) {
this.autoStartUp = autoStartUp;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public Boolean getAsync() {
return async;
}
public void setAsync(Boolean async) {
this.async = async;
}
public Long getDisconnectCompletionTimeout() {
return disconnectCompletionTimeout;
}
public void setDisconnectCompletionTimeout(Long disconnectCompletionTimeout) {
this.disconnectCompletionTimeout = disconnectCompletionTimeout;
}
public MqttExecutorProperties getExecutor() {
return executor;
}
public void setExecutor(MqttExecutorProperties executor) {
this.executor = executor;
}
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.autoconfigure.properties;
/**
* 连接池属性
*
* @author echo
* @since 2.15.0
*/
public class MqttExecutorProperties {
/**
* 线程池核心线程数,表示即使线程处于空闲状态,也会保留的最小线程数。
* 通常设置为常规负载下的并发处理数量,默认值为 5。
*/
private Integer corePoolSize = 5;
/**
* 线程池最大线程数,表示线程池允许创建的最大线程数量。
* 超过 corePoolSize 后,如果任务队列已满,会继续创建线程直到该值。
* 默认值为 10。
*/
private Integer maxPoolSize = 10;
/**
* 线程池中线程最大空闲时间(单位:秒)。
* 当线程数量超过 corePoolSize 且处于空闲状态超过该时间时会被销毁。
* 默认值为 60 秒。
*/
private Integer keepAliveSeconds = 60;
/**
* 线程池的任务队列容量。
* 用于缓冲提交但尚未执行的任务,超过该容量时新任务会触发拒绝策略。
* 默认值为 512。
*/
private Integer queueCapacity = 512;
public Integer getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(Integer corePoolSize) {
this.corePoolSize = corePoolSize;
}
public Integer getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(Integer maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public Integer getKeepAliveSeconds() {
return keepAliveSeconds;
}
public void setKeepAliveSeconds(Integer keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
public Integer getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(Integer queueCapacity) {
this.queueCapacity = queueCapacity;
}
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.autoconfigure.properties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import top.continew.starter.messaging.mqtt.enums.MqttQoS;
/**
* 生产者属性
*
* @author echo
* @since 2.15.0
*/
public class MqttProducerProperties {
/**
* 默认的消息服务质量等级QoS, Quality of Service
* 0最多一次AT_MOST_ONCE不保证送达
* 1至少一次AT_LEAST_ONCE可能重复
* 2只有一次EXACTLY_ONCE确保仅送达一次。
* 可在发送时指定不同 QoS此为默认值。
* 默认值0AT_MOST_ONCE
*/
private Integer defaultQos = MqttQoS.AT_MOST_ONCE.value();
/**
* MQTT 客户端 ID。
* 用于标识连接的唯一客户端,在 broker 中必须唯一;
* 若为空,通常系统会自动生成。
*/
private String clientId;
/**
* 默认发布的 Topic。
* 当未指定 topic 时使用此 topic 发送消息。
* 默认值:"producer"。
*/
private String defaultTopic = "producer";
/**
* 是否启用异步发送模式。
* true 表示消息发送不会阻塞当前线程;
* false 表示同步等待发送完成。
* 默认值为 false。
*/
private Boolean async = false;
/**
* 是否异步触发发送事件(例如发送回调等)。
* 仅在 `async = true` 时生效;
* 设置为 true 可提升事件处理性能。
* 默认值为 false。
*/
private Boolean asyncEvents = false;
/**
* 是否设置消息为保留Retained消息。
* Retained 消息在发送后 broker 会保留并在新订阅者订阅时立刻推送;
* 可用于设备初始状态等场景。
* 默认值为 false不保留
*/
private Boolean defaultRetained = false;
/**
* MQTT 消息处理线程池配置。
* 包含核心线程数、最大线程数、队列容量等,控制消费者消息处理能力。
* 默认配置为 new MqttExecutorProperties()。
*/
@NestedConfigurationProperty
private MqttExecutorProperties executor = new MqttExecutorProperties();
public Integer getDefaultQos() {
return defaultQos;
}
public void setDefaultQos(Integer defaultQos) {
this.defaultQos = defaultQos;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public Boolean getAsync() {
return async;
}
public void setAsync(Boolean async) {
this.async = async;
}
public Boolean getAsyncEvents() {
return asyncEvents;
}
public void setAsyncEvents(Boolean asyncEvents) {
this.asyncEvents = asyncEvents;
}
public Boolean getDefaultRetained() {
return defaultRetained;
}
public void setDefaultRetained(Boolean defaultRetained) {
this.defaultRetained = defaultRetained;
}
public MqttExecutorProperties getExecutor() {
return executor;
}
public void setExecutor(MqttExecutorProperties executor) {
this.executor = executor;
}
}

View File

@@ -0,0 +1,288 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.autoconfigure.properties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import top.continew.starter.core.constant.PropertiesConstants;
import javax.net.ssl.HostnameVerifier;
import java.util.Properties;
/**
* 配置参数
*
* @author echo
* @since 2.15.0
*/
@ConfigurationProperties(prefix = PropertiesConstants.MESSAGING_MQTT)
public class MqttProperties {
/**
* 开关
*/
private boolean enabled;
/**
* 地址 格式 tcp://192.168.20.95:1883
*/
private String host;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 保持连接的间隔时间(秒)。客户端会按照此间隔向服务器发送心跳,以维持连接。
* 默认值60 秒。
*/
private Integer keepAliveInterval = 60;
/**
* 客户端允许同时存在的最大未确认消息数。
* 如果超出此数量,新的消息将被阻塞直到有确认消息返回。
* 默认值10。
*/
private Integer maxInflight = 10;
/**
* 遗嘱消息内容。客户端异常断开连接时,由服务器向指定主题发送的消息。
* 可设置 topic payload、QOS、retained 等属性。
*/
@NestedConfigurationProperty
private MqttWillProperties will;
/**
* 配置 SSL 连接所需的客户端属性。
* 例如:证书路径、密钥密码等。
*/
@NestedConfigurationProperty
private Properties sslClientProps;
/**
* 是否启用 HTTPS 主机名验证。
* 若为 true将校验服务端证书中的主机名是否与实际连接地址一致。
* 默认值false。
*/
private Boolean httpsHostnameVerificationEnabled = false;
/**
* 自定义的主机名校验器,用于验证 SSL/TLS 连接时服务端主机名是否合法。
* 可用于替换默认的验证策略。
*/
private HostnameVerifier sslHostnameVerifier;
/**
* 是否使用清洁会话。
* true 表示连接建立时清除之前的会话信息(订阅、未送达消息等),
* false 表示会话持久化。
* 默认值false持久会话
*/
private Boolean cleanSession = false;
/**
* 连接超时时间(秒)。客户端尝试连接服务器的最长等待时间。
* 默认值30 秒。
*/
private Integer connectionTimeout = 30;
/**
* 是否启用自动重连。当连接断开时,是否自动尝试重新连接。
* 默认值true。
*/
private Boolean automaticReconnect = true;
/**
* 最大重连延迟时间(毫秒)。用于自动重连时的退避策略上限。
* 默认值128000约 2 分钟)。
*/
private Integer maxReconnectDelay = 128000;
/**
* 自定义 WebSocket 请求头。
* 用于配置使用 WebSocket 协议连接时的额外 HTTP 请求头参数。
*/
@NestedConfigurationProperty
private Properties customWebSocketHeaders;
/**
* 终止执行服务时等待多长时间(以秒为单位)
*/
private Integer executorServiceTimeout = 1;
/**
* 生产者
*/
@NestedConfigurationProperty
private MqttProducerProperties producer = new MqttProducerProperties();
/**
* 消费者
*/
@NestedConfigurationProperty
private MqttConsumerProperties consumer = new MqttConsumerProperties();
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public Integer getKeepAliveInterval() {
return keepAliveInterval;
}
public void setKeepAliveInterval(Integer keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}
public Integer getMaxInflight() {
return maxInflight;
}
public void setMaxInflight(Integer maxInflight) {
this.maxInflight = maxInflight;
}
public MqttWillProperties getWill() {
return will;
}
public void setWill(MqttWillProperties will) {
this.will = will;
}
public Properties getSslClientProps() {
return sslClientProps;
}
public void setSslClientProps(Properties sslClientProps) {
this.sslClientProps = sslClientProps;
}
public Boolean getHttpsHostnameVerificationEnabled() {
return httpsHostnameVerificationEnabled;
}
public void setHttpsHostnameVerificationEnabled(Boolean httpsHostnameVerificationEnabled) {
this.httpsHostnameVerificationEnabled = httpsHostnameVerificationEnabled;
}
public HostnameVerifier getSslHostnameVerifier() {
return sslHostnameVerifier;
}
public void setSslHostnameVerifier(HostnameVerifier sslHostnameVerifier) {
this.sslHostnameVerifier = sslHostnameVerifier;
}
public Boolean getCleanSession() {
return cleanSession;
}
public void setCleanSession(Boolean cleanSession) {
this.cleanSession = cleanSession;
}
public Integer getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(Integer connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public Boolean getAutomaticReconnect() {
return automaticReconnect;
}
public void setAutomaticReconnect(Boolean automaticReconnect) {
this.automaticReconnect = automaticReconnect;
}
public Integer getMaxReconnectDelay() {
return maxReconnectDelay;
}
public void setMaxReconnectDelay(Integer maxReconnectDelay) {
this.maxReconnectDelay = maxReconnectDelay;
}
public Properties getCustomWebSocketHeaders() {
return customWebSocketHeaders;
}
public void setCustomWebSocketHeaders(Properties customWebSocketHeaders) {
this.customWebSocketHeaders = customWebSocketHeaders;
}
public Integer getExecutorServiceTimeout() {
return executorServiceTimeout;
}
public void setExecutorServiceTimeout(Integer executorServiceTimeout) {
this.executorServiceTimeout = executorServiceTimeout;
}
public MqttProducerProperties getProducer() {
return producer;
}
public void setProducer(MqttProducerProperties producer) {
this.producer = producer;
}
public MqttConsumerProperties getConsumer() {
return consumer;
}
public void setConsumer(MqttConsumerProperties consumer) {
this.consumer = consumer;
}
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.autoconfigure.properties;
import top.continew.starter.messaging.mqtt.enums.MqttQoS;
/**
* 遗嘱消息属性
*
* @author echo
* @since 2.15.0
*/
public class MqttWillProperties {
/**
* 遗嘱消息的目标主题。
* 当客户端异常断开时,将向该主题发布遗嘱消息。
*/
private String topic;
/**
* 遗嘱消息内容
*/
private String payload;
/**
* 遗嘱消息的 QoS 等级
* 0最多一次1至少一次2只有一次
* 默认值0
*/
private Integer qos = MqttQoS.AT_MOST_ONCE.value();
/**
* 是否设置为保留消息
* true新订阅者会立即收到该消息
* 默认值false
*/
private Boolean retained = false;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getPayload() {
return payload;
}
public void setPayload(String payload) {
this.payload = payload;
}
public Integer getQos() {
return qos;
}
public void setQos(Integer qos) {
this.qos = qos;
}
public Boolean getRetained() {
return retained;
}
public void setRetained(Boolean retained) {
this.retained = retained;
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.constant;
/**
* mqtt常量
*
* @author echo
* @since 2.15.0
*/
public class MqttConstant {
/**
* MQTT 入站通道名称(消费者使用,接收消息的入口)
*/
public static final String MQTT_INPUT_CHANNEL_NAME = "mqttInputChannel";
/**
* MQTT 出站通道名称(生产者使用,发送消息的出口)
*/
public static final String MQTT_OUT_BOUND_CHANNEL_NAME = "mqttOutboundChannel";
/**
* 应用级消息消费处理通道名称(接收到 MQTT 消息后转发到此通道供业务处理)
*/
public static final String CONSUMER_CHANNEL_NAME = "consumerChannel";
}

View File

@@ -0,0 +1,70 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.enums;
import top.continew.starter.messaging.mqtt.exception.MqttException;
/**
* qos 消息质量等级枚举
*
* @author echo
* @since 2.15.0
*/
public enum MqttQoS {
/**
* QoS level 0 至多发送一次,发送即丢弃。没有确认消息,也不知道对方是否收到。
*/
AT_MOST_ONCE(0),
/**
* QoS level 1 至少一次都要在可变头部中附加一个16位的消息IDSUBSCRIBE 和 UNSUBSCRIBE 消息使用 QoS level 1。
*/
AT_LEAST_ONCE(1),
/**
* QoS level 2 确保只有一次,仅仅在 PUBLISH 类型消息中出现要求在可变头部中要附加消息ID。
*/
EXACTLY_ONCE(2),
/**
* 失败
*/
FAILURE(0x80);
private final int value;
MqttQoS(int value) {
this.value = value;
}
public int value() {
return value;
}
public static MqttQoS valueOf(int value) {
return switch (value) {
case 0 -> AT_MOST_ONCE;
case 1 -> AT_LEAST_ONCE;
case 2 -> EXACTLY_ONCE;
case 0x80 -> FAILURE;
default -> throw new MqttException("无效的 QoS: " + value);
};
}
@Override
public String toString() {
return "QoS" + value;
}
}

View File

@@ -0,0 +1,124 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.enums;
import top.continew.starter.messaging.mqtt.exception.MqttException;
import top.continew.starter.messaging.mqtt.util.TopicUtils;
/**
* topic 筛选器类型
*
* @author echo
* @since 2.15.0
*/
public enum TopicFilterType {
/**
* 默认 TopicFilter
*/
NONE {
@Override
public boolean match(String topicFilter, String topicName) {
return TopicUtils.match(topicFilter, topicName);
}
},
/**
* $queue/ 为前缀的共享订阅是不带群组的共享订阅
*/
QUEUE {
@Override
public boolean match(String topicFilter, String topicName) {
int prefixLen = TopicFilterType.SHARE_QUEUE_PREFIX.length();
return TopicUtils.match(topicFilter.substring(prefixLen), topicName);
}
},
/**
* $share/{group-name}/ 为前缀的共享订阅是带群组的共享订阅
*/
SHARE {
@Override
public boolean match(String topicFilter, String topicName) {
// 去除前缀 $share/<group-name>/ ,匹配 topicName / 前缀
int prefixLen = TopicFilterType.findShareTopicIndex(topicFilter);
return TopicUtils.match(topicFilter.substring(prefixLen), topicName);
}
};
/**
* 共享订阅的 topic
*/
public static final String SHARE_QUEUE_PREFIX = "$queue/";
public static final String SHARE_GROUP_PREFIX = "$share/";
/**
* 判断 topicFilter 和 topicName 匹配情况
*
* @param topicFilter topicFilter
* @param topicName topicName
* @return 是否匹配
*/
public abstract boolean match(String topicFilter, String topicName);
/**
* 获取 topicFilter 类型
*
* @param topicFilter topicFilter
* @return TopicFilterType
*/
public static TopicFilterType getType(String topicFilter) {
if (topicFilter.startsWith(TopicFilterType.SHARE_QUEUE_PREFIX)) {
return TopicFilterType.QUEUE;
} else if (topicFilter.startsWith(TopicFilterType.SHARE_GROUP_PREFIX)) {
return TopicFilterType.SHARE;
} else {
return TopicFilterType.NONE;
}
}
/**
* 读取共享订阅的分组名
*
* @param topicFilter topicFilter
* @return 共享订阅分组名
*/
public static String getShareGroupName(String topicFilter) {
int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length();
int topicFilterLength = topicFilter.length();
for (int i = prefixLength; i < topicFilterLength; i++) {
char ch = topicFilter.charAt(i);
if ('/' == ch) {
return topicFilter.substring(prefixLength, i);
}
}
throw new MqttException("分享订阅topic过滤器: " + topicFilter + " 不符合 $share/<group-name>/xxx");
}
private static int findShareTopicIndex(String topicFilter) {
int prefixLength = TopicFilterType.SHARE_GROUP_PREFIX.length();
int topicFilterLength = topicFilter.length();
for (int i = prefixLength; i < topicFilterLength; i++) {
char ch = topicFilter.charAt(i);
if ('/' == ch) {
return i + 1;
}
}
throw new MqttException("分享订阅主题过滤器: " + topicFilter + " 不符合 $share/<group-name>/xxx");
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.exception;
import top.continew.starter.core.exception.BaseException;
import java.io.Serial;
/**
* mqtt异常
*
* @author echo
* @since 2.15.0
*/
public class MqttException extends BaseException {
@Serial
private static final long serialVersionUID = 1L;
public MqttException() {
}
public MqttException(String message) {
super(message);
}
public MqttException(Throwable cause) {
super(cause);
}
public MqttException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,231 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.Environment;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import top.continew.starter.messaging.mqtt.annotation.MqttListener;
import top.continew.starter.messaging.mqtt.constant.MqttConstant;
import top.continew.starter.messaging.mqtt.enums.TopicFilterType;
import top.continew.starter.messaging.mqtt.exception.MqttException;
import top.continew.starter.messaging.mqtt.model.MqttMessage;
import top.continew.starter.messaging.mqtt.msg.MqttMessageConsumer;
import top.continew.starter.messaging.mqtt.strategy.MqttOptions;
import top.continew.starter.messaging.mqtt.util.TopicUtils;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
* 消息调度器 路由分发中心
*
* @author echo
* @since 2.15.0
*/
public class MqttMessageInboundHandler implements MessageHandler, InitializingBean, ApplicationContextAware {
private static final Logger log = LoggerFactory.getLogger(MqttMessageInboundHandler.class);
// 精确匹配的topic -> 监听器映射(用于@MqttListener注解的监听器
private final Map<String, MqttMessageConsumer> annotatedListenerMap = new ConcurrentHashMap<>();
// 所有实现了MqttMessageListener的Bean列表用于动态订阅
private final List<MqttMessageConsumer> allListeners;
private final MqttOptions mqttOptions;
private final Environment environment;
@Nullable
private ApplicationContext applicationContext;
public MqttMessageInboundHandler(List<MqttMessageConsumer> mqttMessageConsumerList,
MqttOptions mqttOptions,
Environment environment) {
this.allListeners = mqttMessageConsumerList;
this.mqttOptions = mqttOptions;
this.environment = environment;
}
@Override
@ServiceActivator(inputChannel = MqttConstant.MQTT_INPUT_CHANNEL_NAME)
public void handleMessage(Message<?> message) throws MessagingException {
MqttMessage mqttMessage = MqttMessage.of(message);
String topic = mqttMessage.getTopic();
// 1. 优先处理通过@MqttListener注解订阅的精确匹配
boolean handled = handleAnnotatedListeners(topic, mqttMessage);
// 2. 如果没有精确匹配的注解监听器,则广播给所有监听器
if (!handled) {
broadcastToAllListeners(mqttMessage);
}
}
/**
* 处理通过@MqttListener注解订阅的监听器
*
* @return true if message was handled by annotated listener
*/
private boolean handleAnnotatedListeners(String topic, MqttMessage mqttMessage) {
boolean handled = false;
for (Map.Entry<String, MqttMessageConsumer> entry : annotatedListenerMap.entrySet()) {
String topicFilter = entry.getKey();
try {
if (isTopicMatch(topicFilter, topic)) {
entry.getValue().onMessage(mqttMessage);
handled = true;
}
} catch (Exception e) {
log.error("注解监听器处理消息时发生错误: {}", entry.getValue().getClass().getSimpleName(), e);
}
}
return handled;
}
/**
* 广播消息给所有监听器(用于动态订阅的场景)
*/
private void broadcastToAllListeners(MqttMessage mqttMessage) {
// 获取当前订阅的所有topics
List<String> subscribedTopics = mqttOptions.listTopics();
// 检查消息的topic是否在订阅列表中
boolean isSubscribed = subscribedTopics.stream()
.anyMatch(subscribedTopic -> isTopicMatch(subscribedTopic, mqttMessage.getTopic()));
if (!isSubscribed) {
return;
}
// 广播给所有没有@MqttListener注解的监听器
for (MqttMessageConsumer listener : allListeners) {
// 跳过已经通过注解处理的监听器
if (isAnnotatedListener(listener)) {
continue;
}
try {
listener.onMessage(mqttMessage);
} catch (Exception e) {
log.error("监听器处理消息时发生错误: {}", listener.getClass().getSimpleName(), e);
}
}
}
/**
* 检查是否是注解监听器
*/
private boolean isAnnotatedListener(MqttMessageConsumer listener) {
return annotatedListenerMap.containsValue(listener);
}
/**
* 判断topic是否匹配
*/
private boolean isTopicMatch(String topicFilter, String actualTopic) {
// 精确匹配
if (topicFilter.equals(actualTopic)) {
return true;
}
// 通配符匹配
if (TopicUtils.isTopicFilter(topicFilter) && TopicUtils.match(topicFilter, actualTopic)) {
return true;
}
// 共享订阅匹配
return TopicFilterType.SHARE.equals(TopicFilterType.getType(topicFilter)) && TopicFilterType.SHARE
.match(topicFilter, actualTopic);
}
@Override
public void afterPropertiesSet() {
Assert.notNull(applicationContext, "applicationContext不能为null");
if (CollectionUtils.isEmpty(allListeners)) {
return;
}
// 处理带有@MqttListener注解的监听器
for (MqttMessageConsumer mqttMessageConsumer : allListeners) {
Class<?> clazz = ClassUtils.getUserClass(mqttMessageConsumer);
MqttListener mqttListener = AnnotationUtils.findAnnotation(clazz, MqttListener.class);
if (Objects.nonNull(mqttListener)) {
String topic = resolvePlaceholder(mqttListener.topic());
int qos = resolveQos(mqttListener.qos());
mqttOptions.addTopic(topic, qos);
annotatedListenerMap.put(topic, mqttMessageConsumer);
} else {
log.info("发现无注解监听器: {},将接收所有动态订阅的消息", clazz.getSimpleName());
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 解析占位符,支持 ${} 和直接字符串
*/
private String resolvePlaceholder(String value) {
if (StringUtils.hasText(value)) {
return environment.resolvePlaceholders(value);
}
return value;
}
/**
* 解析 QoS 配置,支持占位符和直接数字
*
* @param qosValue QoS 字符串值或占位符
* @return QoS 等级 (0, 1, 或 2)
* @throws MqttException 如果 QoS 值无效
*/
private int resolveQos(String qosValue) {
try {
String resolved = resolvePlaceholder(qosValue);
int qos = Integer.parseInt(resolved);
// ✅ 验证 QoS 值是否合法
if (qos < 0 || qos > 2) {
throw new MqttException("QoS 值必须在 0-2 之间, 当前值: " + qos);
}
return qos;
} catch (NumberFormatException e) {
throw new MqttException("无法解析 QoS 值: " + qosValue + ", 必须是 0, 1 或 2", e);
}
}
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.handler;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
/**
* mqtt关闭处理程序
*
* @author echo
* @since 2.15.0
*/
@SuppressWarnings("ClassCanBeRecord")
public class MqttShutdownHandler implements DisposableBean, ApplicationListener<ContextClosedEvent> {
private final MqttPahoMessageDrivenChannelAdapter inboundAdapter;
private final MqttPahoMessageHandler outboundHandler;
public MqttShutdownHandler(MqttPahoMessageDrivenChannelAdapter inboundAdapter,
MqttPahoMessageHandler outboundHandler) {
this.inboundAdapter = inboundAdapter;
this.outboundHandler = outboundHandler;
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
stopMqttConnections();
}
@Override
public void destroy() throws Exception {
stopMqttConnections();
}
private void stopMqttConnections() {
try {
if (inboundAdapter != null && inboundAdapter.isRunning()) {
inboundAdapter.stop();
}
if (outboundHandler != null) {
outboundHandler.stop();
}
// 给一点时间让连接正常关闭
Thread.sleep(500);
} catch (Exception ignored) {
}
}
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.model;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import top.continew.starter.messaging.mqtt.enums.MqttQoS;
import java.io.Serializable;
import java.util.Objects;
/**
* mqtt消息实体
*
* @author echo
* @since 2.15.0
*/
public class MqttMessage implements Serializable {
/**
* 消息头信息,包含 MQTT 消息的元数据,如 topic、QOS、retained 等。
*/
private MessageHeaders messageHeaders;
/**
* 消息体(负载),可以是字符串、字节数组或其他对象类型。
*/
private Object payload;
/**
* MQTT 主题,用于标识消息的发布/订阅通道。
*/
private String topic;
/**
* 消息服务质量等级QOS
* 0 - 最多一次,消息可能会丢失;
* 1 - 至少一次,消息可能重复;
* 2 - 只有一次,确保消息不重复也不丢失。
*/
private Integer qos;
/**
* 是否保留该消息Retained
* true 表示该消息会保留在 MQTT 服务器上,供新订阅者立即获取;
* false 表示仅当前订阅者接收到该消息。
*/
private Boolean retained;
public MqttMessage(Message<?> message) {
this.messageHeaders = message.getHeaders();
this.topic = (String)Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC));
this.qos = (Integer)Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_QOS));
this.retained = (Boolean)Objects.requireNonNull(message.getHeaders().get(MqttHeaders.RECEIVED_RETAINED));
this.payload = message.getPayload();
}
public MqttMessage(Object payload, String topic) {
this.payload = payload;
this.topic = topic;
this.qos = MqttQoS.AT_MOST_ONCE.value();
this.retained = false;
}
public MqttMessage(Object payload, String topic, Integer qos) {
this.payload = payload;
this.topic = topic;
this.qos = qos;
this.retained = false;
}
public MqttMessage(Object payload, String topic, Integer qos, Boolean retained) {
this.payload = payload;
this.topic = topic;
this.qos = qos;
this.retained = retained;
}
public static MqttMessage of(Message<?> message) {
return new MqttMessage(message);
}
public static MqttMessage of(Object payload, String topic, Integer qos) {
return new MqttMessage(payload, topic, qos);
}
public MessageHeaders getMessageHeaders() {
return messageHeaders;
}
public void setMessageHeaders(MessageHeaders messageHeaders) {
this.messageHeaders = messageHeaders;
}
public Object getPayload() {
return payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getQos() {
return qos;
}
public void setQos(Integer qos) {
this.qos = qos;
}
public Boolean getRetained() {
return retained;
}
public void setRetained(Boolean retained) {
this.retained = retained;
}
}

View File

@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.msg;
import top.continew.starter.messaging.mqtt.handler.MqttMessageInboundHandler;
import top.continew.starter.messaging.mqtt.model.MqttMessage;
/**
* 消息监听 - 消费者
*
* @author echo
* @since 2.15.0
*/
public interface MqttMessageConsumer {
/**
* 消息订阅
*
* @param message {@link MqttMessage}
* @see MqttMessageInboundHandler
*/
void onMessage(MqttMessage message);
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.msg;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import top.continew.starter.messaging.mqtt.constant.MqttConstant;
/**
* 消息发送 - 生产者
*
* @author echo
* @since 2.15.0
**/
@MessagingGateway(defaultRequestChannel = MqttConstant.MQTT_OUT_BOUND_CHANNEL_NAME)
public interface MqttMessageProducer {
/**
* 消息发送 - 默认topic
*
* @param payload 消息体
*/
void sendToMqtt(String payload);
/**
* 指定topic进行消息发送
*
* @param topic topic
* @param payload 消息体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Payload String payload);
/**
* 指定topic进行消息发送
*
* @param topic topic
* @param qos qos
* @param payload 消息体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
@Header(MqttHeaders.RETAINED) boolean retained,
@Payload String payload);
/**
* 指定topic进行消息发送
*
* @param topic topic
* @param qos qos
* @param payload 消息体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
@Header(MqttHeaders.RETAINED) boolean retained,
@Payload byte[] payload);
}

View File

@@ -0,0 +1,133 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.strategy;
import java.util.Collection;
import java.util.List;
/**
* MQTT 客户端操作接口
* <p>
* 提供 topic 管理与消息发布能力。
* 实现类可基于不同 MQTT 客户端或通信方式进行扩展。
* </p>
*
* @author echo
* @since 2.15.0
*/
public interface MqttOptions {
/**
* 添加topic
*
* @param topic topic
*/
void addTopic(String topic);
/**
* 添加topic
*
* @param topic topic
* @param qos qos
*/
void addTopic(String topic, int qos);
/**
* 删除topic
*
* @param topic topic
*/
void removeTopic(String topic);
/**
* 查询订阅的所有topic
*
* @return topic list
*/
List<String> listTopics();
/**
* 批量添加订阅主题(默认 QoS
*
* @param topics 订阅的主题列表
*/
void addTopics(String... topics);
/**
* 批量添加订阅主题,指定统一 QoS 等级
*
* @param qos 消息服务质量等级0/1/2
* @param topics 订阅的主题列表
*/
void addTopics(int qos, String... topics);
/**
* 批量添加订阅主题(默认 QoS
*
* @param topics 订阅的主题集合List 形式)
*/
void addTopics(List<String> topics);
/**
* 批量添加订阅主题,指定统一 QoS 等级
*
* @param topics 订阅的主题集合List 形式)
* @param qos 消息服务质量等级0/1/2
*/
void addTopics(List<String> topics, int qos);
/**
* 批量取消订阅主题
*
* @param topics 要取消订阅的主题列表
*/
void removeTopics(String... topics);
/**
* 批量取消订阅主题
*
* @param topics 要取消订阅的主题集合List 形式)
*/
void removeTopics(List<String> topics);
/**
* 批量取消订阅主题
*
* @param topics 要取消订阅的主题集合Collection 形式)
*/
void removeTopics(Collection<String> topics);
/**
* 判断是否已订阅指定主题
*
* @param topic 主题名称
* @return 如果已订阅该主题,返回 true否则返回 false
*/
boolean containsTopic(String topic);
/**
* 清空所有已订阅的主题
*/
void clearAllTopics();
/**
* 获取当前已订阅的主题数量
*
* @return 已订阅的主题数量
*/
int getTopicCount();
}

View File

@@ -0,0 +1,142 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.strategy;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import top.continew.starter.messaging.mqtt.enums.MqttQoS;
import java.util.Collection;
import java.util.List;
import java.util.Set;
/**
* MQTT 客户端发布+订阅封装器
*
* @author echo
* @since 2.15.0
**/
@SuppressWarnings("ClassCanBeRecord")
public class MqttTemplate implements MqttOptions {
private final MqttPahoMessageDrivenChannelAdapter adapter;
public MqttTemplate(MqttPahoMessageDrivenChannelAdapter adapter) {
this.adapter = adapter;
}
@Override
public void addTopic(String topic) {
this.addTopic(topic, MqttQoS.AT_MOST_ONCE.value());
}
@Override
public void addTopic(String topic, int qos) {
Set<String> topics = Set.of(adapter.getTopic());
if (topics.contains(topic)) {
return;
}
adapter.addTopic(topic, qos);
}
@Override
public void removeTopic(String topic) {
Set<String> topics = Set.of(adapter.getTopic());
if (!topics.contains(topic)) {
return;
}
adapter.removeTopic(topic);
}
@Override
public List<String> listTopics() {
return List.of(adapter.getTopic());
}
@Override
public void addTopics(String... topics) {
addTopics(MqttQoS.AT_MOST_ONCE.value(), topics);
}
@Override
public void addTopics(int qos, String... topics) {
if (topics == null) {
return;
}
for (String topic : topics) {
addTopic(topic, qos);
}
}
@Override
public void addTopics(List<String> topics) {
addTopics(topics, MqttQoS.AT_MOST_ONCE.value());
}
@Override
public void addTopics(List<String> topics, int qos) {
if (topics == null || topics.isEmpty()) {
return;
}
topics.forEach(topic -> addTopic(topic, qos));
}
@Override
public void removeTopics(String... topics) {
if (topics == null) {
return;
}
for (String topic : topics) {
removeTopic(topic);
}
}
@Override
public void removeTopics(List<String> topics) {
if (topics == null || topics.isEmpty()) {
return;
}
topics.forEach(this::removeTopic);
}
@Override
public void removeTopics(Collection<String> topics) {
if (topics == null || topics.isEmpty()) {
return;
}
topics.forEach(this::removeTopic);
}
@Override
public boolean containsTopic(String topic) {
if (topic == null || topic.trim().isEmpty()) {
return false;
}
Set<String> topics = Set.of(adapter.getTopic());
return topics.contains(topic);
}
@Override
public void clearAllTopics() {
List<String> currentTopics = listTopics();
currentTopics.forEach(this::removeTopic);
}
@Override
public int getTopicCount() {
return listTopics().size();
}
}

View File

@@ -0,0 +1,212 @@
/*
* Copyright (c) 2022-present Charles7c Authors. All Rights Reserved.
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package top.continew.starter.messaging.mqtt.util;
import top.continew.starter.messaging.mqtt.exception.MqttException;
import java.util.List;
/**
* 消息主题工具类
*
* @author echo
* @since 2.15.0
*/
public class TopicUtils {
public static final char TOPIC_WILDCARDS_ONE = '+';
public static final char TOPIC_WILDCARDS_MORE = '#';
public TopicUtils() {
}
/**
* 校验 topicFilter
*
* @param topicFilterList topicFilter 集合
*/
public static void validateTopicFilter(List<String> topicFilterList) {
for (String topicFilter : topicFilterList) {
validateTopicFilter(topicFilter);
}
}
/**
* 校验 topicFilter
*
* @param topicFilter topicFilter
*/
public static void validateTopicFilter(String topicFilter) throws MqttException {
if (topicFilter == null || topicFilter.isEmpty()) {
throw new MqttException("TopicFilter is blank:" + topicFilter);
}
char[] topicFilterChars = topicFilter.toCharArray();
int topicFilterLength = topicFilterChars.length;
int topicFilterIdxEnd = topicFilterLength - 1;
char ch;
for (int i = 0; i < topicFilterLength; i++) {
ch = topicFilterChars[i];
if (Character.isWhitespace(ch)) {
throw new MqttException("Mqtt subscribe topicFilter has white space:" + topicFilter);
} else if (ch == TOPIC_WILDCARDS_MORE) {
// 校验: # 通配符只能在最后一位
if (i < topicFilterIdxEnd) {
throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
} else if (ch == TOPIC_WILDCARDS_ONE) {
// 校验: 单独 + 是允许的,判断 + 号前一位是否为 /,如果有后一位也必须为 /
if ((i > 0 && topicFilterChars[i - 1] != '/') || (i < topicFilterIdxEnd && topicFilterChars[i + 1] != '/')) {
throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
}
}
}
/**
* 判断是否 topic filter
*
* @param topicFilter topicFilter
* @return 是否 topic filter
*/
public static boolean isTopicFilter(String topicFilter) {
char[] topicFilterChars = topicFilter.toCharArray();
for (char ch : topicFilterChars) {
if (TOPIC_WILDCARDS_ONE == ch || TOPIC_WILDCARDS_MORE == ch) {
return true;
}
}
return false;
}
/**
* 校验 topicName
*
* @param topicName topicName
*/
public static void validateTopicName(String topicName) throws MqttException {
if (topicName.isEmpty()) {
throw new MqttException("Topic is blank:" + topicName);
}
if (isTopicFilter(topicName)) {
throw new MqttException("Topic has wildcards char [+] or [#], topicName:" + topicName);
}
}
/**
* 判断 topicFilter topicName 是否匹配
*
* @param topicFilter topicFilter
* @param topicName topicName
* @return 是否匹配
*/
public static boolean match(String topicFilter, String topicName) {
char[] topicFilterChars = topicFilter.toCharArray();
char[] topicNameChars = topicName.toCharArray();
int topicFilterLength = topicFilterChars.length;
int topicNameLength = topicNameChars.length;
int topicFilterIdxEnd = topicFilterLength - 1;
int topicNameIdxEnd = topicNameLength - 1;
char ch;
// 是否进入 + 号层级通配符
boolean inLayerWildcard = false;
int wildcardCharLen = 0;
topicFilterLoop:
for (int i = 0; i < topicFilterLength; i++) {
ch = topicFilterChars[i];
if (ch == TOPIC_WILDCARDS_MORE) {
// 校验: # 通配符只能在最后一位
if (i < topicFilterIdxEnd) {
throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
return true;
} else if (ch == TOPIC_WILDCARDS_ONE) {
// 校验: 单独 + 是允许的,判断 + 号前一位是否为 /,如果有后一位也必须为 /
if ((i > 0 && topicFilterChars[i - 1] != '/') || (i < topicFilterIdxEnd && topicFilterChars[i + 1] != '/')) {
throw new MqttException("Mqtt subscribe topicFilter illegal:" + topicFilter);
}
// 如果 + 是最后一位,判断 topicName 中是否还存在层级 /
// topicName index
int topicNameIdx = i + wildcardCharLen;
if (i == topicFilterIdxEnd && topicNameLength > topicNameIdx) {
for (int j = topicNameIdx; j < topicNameLength; j++) {
if (topicNameChars[j] == '/') {
return false;
}
}
return true;
}
inLayerWildcard = true;
} else if (ch == '/') {
if (inLayerWildcard) {
inLayerWildcard = false;
}
// 预读下一位,如果是 #,并且 topicName 位数已经不足
int next = i + 1;
if ((topicFilterLength > next) && topicFilterChars[next] == '#' && topicNameLength < next) {
return true;
}
}
// topicName 长度不够了
if (topicNameIdxEnd < i) {
return false;
}
// 进入通配符
if (inLayerWildcard) {
for (int j = i + wildcardCharLen; j < topicNameLength; j++) {
if (topicNameChars[j] == '/') {
wildcardCharLen--;
continue topicFilterLoop;
} else {
wildcardCharLen++;
}
}
}
// topicName index
int topicNameIdx = i + wildcardCharLen;
// topic 已经完成topicName 还有数据
if (topicNameIdx > topicNameIdxEnd) {
return false;
}
if (ch != topicNameChars[topicNameIdx]) {
return false;
}
}
// 判断 topicName 是否还有数据
return topicFilterLength + wildcardCharLen + 1 > topicNameLength;
}
/**
* 获取处理完成之后的 topic
*
* @param topicTemplate topic 模板
* @return 获取处理完成之后的 topic
*/
public static String getTopicFilter(String topicTemplate) {
// 替换 ${name} 为 +
StringBuilder sb = new StringBuilder(topicTemplate.length());
int cursor = 0;
for (int start, end; (start = topicTemplate.indexOf("${", cursor)) != -1 && (end = topicTemplate
.indexOf('}', start)) != -1;) {
sb.append(topicTemplate, cursor, start);
sb.append('+');
cursor = end + 1;
}
sb.append(topicTemplate.substring(cursor));
return sb.toString();
}
}

View File

@@ -0,0 +1 @@
top.continew.starter.messaging.mqtt.autoconfigure.MqttAutoConfiguration

View File

@@ -18,6 +18,7 @@
<modules> <modules>
<module>continew-starter-messaging-mail</module> <module>continew-starter-messaging-mail</module>
<module>continew-starter-messaging-websocket</module> <module>continew-starter-messaging-websocket</module>
<module>continew-starter-messaging-mqtt</module>
</modules> </modules>
<dependencies> <dependencies>